Commit d0c801be authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

Misc

parent ed07191e
......@@ -2,12 +2,14 @@ pub mod algo;
pub mod sat;
pub mod util;
#[derive(Debug)]
#[derive(Debug, Clone, Copy)]
pub enum Consistency {
RepeatableRead,
ReadCommitted,
RepeatableRead,
ReadAtomic,
Causal,
Prefix,
SnapshotIsolation,
Serializable,
Inc,
}
use hashbrown::{HashMap, HashSet};
use std::fs;
use std::fs::{File, OpenOptions};
use std::default::Default;
......@@ -43,7 +44,7 @@ impl CNF {
self.clauses.push(Vec::new());
}
fn write_to_file(&self, path: &PathBuf) {
fn write_to_file(&mut self, path: &PathBuf) {
let mut file = BufWriter::new(
OpenOptions::new()
.write(true)
......@@ -55,9 +56,9 @@ impl CNF {
writeln!(file, "p cnf {} {}", self.n_variable, self.clauses.len() - 1)
.expect("failed to write parameters");
for clause in self.clauses.iter().rev().skip(1) {
for clause in self.clauses.drain(..).rev().skip(1) {
for (sign, literal) in clause {
if *sign {
if sign {
write!(file, "{} ", literal).expect("failed to write cnf to file");
} else {
write!(file, "-{} ", literal).expect("failed to write cnf to file");
......@@ -95,7 +96,7 @@ impl Sat {
}
}
for (_, mut wr_map) in write_variable.iter_mut() {
for (_, wr_map) in write_variable.iter_mut() {
wr_map.entry((0, 0)).or_insert_with(Default::default);
}
......@@ -261,7 +262,7 @@ impl Sat {
self.add_clauses(&clauses);
}
pub fn solve(&self, path: &PathBuf) -> Option<Vec<(usize, usize)>> {
pub fn solve(&mut self, path: &PathBuf) -> Option<Vec<(usize, usize)>> {
let inp_cnf = path.join("history.cnf");
let out_cnf = path.join("result.cnf");
self.cnf.write_to_file(&inp_cnf);
......@@ -277,6 +278,8 @@ impl Sat {
panic!("failed to execute process")
}
fs::remove_file(inp_cnf).expect("couldn't delete input cnf");
// println!("status: {}", output.status);
// println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
// println!("stderr: {}", String::from_utf8_lossy(&output.stderr));
......
......@@ -7,7 +7,7 @@ use rand::Rng;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use chrono::{DateTime, Local};
use chrono::{DateTime, Duration, Local};
#[derive(Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct Event {
......@@ -144,6 +144,10 @@ impl History {
pub fn get_cloned_params(&self) -> HistParams {
self.params.clone()
}
pub fn get_duration(&self) -> Duration {
self.end - self.start
}
}
pub fn generate_single_history(
......@@ -154,17 +158,23 @@ pub fn generate_single_history(
) -> Vec<Session> {
let mut counters = HashMap::new();
let mut random_generator = rand::thread_rng();
let variable_range = Uniform::from(0..n_variable);
let read_variable_range = Uniform::from(0..n_variable);
let jump = (n_variable as f64 / n_node as f64).ceil() as usize;
(0..n_node)
.map(|_| {
.map(|i_node| {
let i = i_node * jump;
let j = std::cmp::min((i_node + 1) * jump, n_variable);
// let write_variable_range = Uniform::from(i..j);
(0..n_transaction)
.map(|_| Transaction {
events: (0..n_event)
.map(|_| {
let variable = variable_range.sample(&mut random_generator);
if random_generator.gen() {
let variable = read_variable_range.sample(&mut random_generator);
Event::read(variable)
} else {
let variable = read_variable_range.sample(&mut random_generator);
// let variable = write_variable_range.sample(&mut random_generator);
let value = {
let entry = counters.entry(variable).or_insert(0);
*entry += 1;
......
......@@ -98,19 +98,11 @@ fn main() {
.help("Use BiComponent"),
)
.arg(
Arg::with_name("serializable")
.long("ser")
.help("Check for Serializablity"),
)
.arg(
Arg::with_name("snapshot_isolation")
.long("si")
.help("Check for Snapshot Isolation"),
)
.arg(
Arg::with_name("causal")
.long("cc")
.help("Check for Causality"),
Arg::with_name("consistency")
.long("cons")
.short("c")
.takes_value(true)
.help("Check for mentioned consistency"),
)
.about("Verifies histories"),
])
......@@ -135,27 +127,62 @@ fn main() {
);
for hist in histories.drain(..) {
let mut file = File::create(dir.join(format!("hist-{:05}.bincode", hist.get_id())))
let file = File::create(dir.join(format!("hist-{:05}.bincode", hist.get_id())))
.expect("couldn't create bincode file");
let mut buf_writer = BufWriter::new(file);
let buf_writer = BufWriter::new(file);
bincode::serialize_into(buf_writer, &hist)
.expect("dumping history to bincode file went wrong");
}
}
("verify", Some(matches)) => {
let v_dir = Path::new(matches.value_of("v_directory").unwrap());
let histories: Vec<History> = fs::read_dir(v_dir)
.expect("couldn't read history directory")
.filter_map(|entry_res| match entry_res {
Ok(ref entry) if entry.path().is_dir() => {
let file = File::open(entry.path().join("history.bincode")).unwrap();
let buf_reader = BufReader::new(file);
Some(bincode::deserialize_from(buf_reader).unwrap())
}
_ => None,
})
.collect();
// let v_dir = Path::new(matches.value_of("v_directory").unwrap());
//
// let histories: Vec<History> = fs::read_dir(v_dir)
// .expect("couldn't read history directory")
// .filter_map(|entry_res| match entry_res {
// Ok(ref entry) if entry.path().is_dir() => {
// let file = File::open(entry.path().join("history.bincode")).unwrap();
// let buf_reader = BufReader::new(file);
// Some(bincode::deserialize_from(buf_reader).unwrap())
// }
// _ => None,
// })
// .collect();
//
// let o_dir = Path::new(matches.value_of("o_directory").unwrap());
//
// if !o_dir.is_dir() {
// fs::create_dir_all(o_dir).expect("failed to create directory");
// }
//
// histories.iter().for_each(|ref hist| {
// let curr_dir = o_dir.join(format!("hist-{:05}", hist.get_id()));
//
// let mut verifier = Verifier::new(curr_dir.to_path_buf());
//
// if matches.is_present("causal") {
// verifier.model("cc");
// } else if matches.is_present("snapshot_isolation") {
// verifier.model("si");
// } else if matches.is_present("serializable") {
// verifier.model("ser");
// }
//
// verifier.sat(matches.is_present("sat"));
// verifier.bicomponent(matches.is_present("bicomponent"));
//
// if !verifier.transactional_history_verify(hist.get_data()) {
// println!("hist-{:05} failed", hist.get_id());
// } else {
// println!("hist-{:05} done", hist.get_id());
// }
// });
let v_path =
Path::new(matches.value_of("v_directory").unwrap()).join("history.bincode");
let file = File::open(v_path).unwrap();
let buf_reader = BufReader::new(file);
let hist: History = bincode::deserialize_from(buf_reader).unwrap();
let o_dir = Path::new(matches.value_of("o_directory").unwrap());
......@@ -163,26 +190,29 @@ fn main() {
fs::create_dir_all(o_dir).expect("failed to create directory");
}
histories.iter().for_each(|ref hist| {
let curr_dir = o_dir.join(format!("hist-{:05}", hist.get_id()));
// let curr_dir = o_dir.join(format!("hist-{:05}", hist.get_id()));
let mut verifier = Verifier::new(curr_dir.to_path_buf());
let mut verifier = Verifier::new(o_dir.to_path_buf());
if matches.is_present("causal") {
verifier.model("cc");
} else if matches.is_present("snapshot_isolation") {
verifier.model("si");
} else if matches.is_present("serializable") {
verifier.model("ser");
}
match matches.value_of("consistency") {
Some("cc") => verifier.model("cc"),
Some("si") => verifier.model("si"),
Some("ser") => verifier.model("ser"),
None => verifier.model(""),
_ => unreachable!(),
};
verifier.sat(matches.is_present("sat"));
verifier.bicomponent(matches.is_present("bicomponent"));
verifier.sat(matches.is_present("sat"));
verifier.bicomponent(matches.is_present("bicomponent"));
if !verifier.transactional_history_verify(hist.get_data()) {
println!("hist-{:05} failed", hist.get_id());
}
});
match verifier.verify(hist.get_data()) {
Some(level) => println!(
"hist-{:05} failed - minimum level failed {:?}",
hist.get_id(),
level
),
None => println!("hist-{:05} done", hist.get_id()),
}
}
_ => unreachable!(),
}
......
use hashbrown::{HashMap, HashSet};
use std::fs;
// use std::fs;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
......@@ -8,7 +8,9 @@ use consistency::sat::Sat;
use consistency::Consistency;
use db::history::Session;
use consistency::algo::{AtomicHistoryPO, SerializableHistory, SnapshotIsolationHistory};
use consistency::algo::{
AtomicHistoryPO, PrefixConsistentHistory, SerializableHistory, SnapshotIsolationHistory,
};
use consistency::util::ConstrainedLinearization;
mod util;
......@@ -27,11 +29,11 @@ pub struct Verifier {
impl Verifier {
pub fn new(dir: PathBuf) -> Self {
fs::create_dir(&dir).unwrap();
// fs::create_dir(&dir).unwrap();
let log_file = File::create(dir.join("result_log.json")).unwrap();
Verifier {
log: Self::get_logger(BufWriter::new(log_file)),
log: Self::get_logger(log_file),
consistency_model: Consistency::Serializable,
use_sat: false,
use_bicomponent: false,
......@@ -43,11 +45,13 @@ impl Verifier {
self.consistency_model = match model {
"rc" => Consistency::ReadCommitted,
"rr" => Consistency::RepeatableRead,
"ra" => Consistency::ReadAtomic,
"cc" => Consistency::Causal,
"pre" => Consistency::Prefix,
"si" => Consistency::SnapshotIsolation,
"ser" => Consistency::Serializable,
_ => unreachable!(),
"" => Consistency::Inc,
&_ => unreachable!(),
}
}
......@@ -102,7 +106,29 @@ impl Verifier {
write_map
}
pub fn transactional_history_verify(&self, histories: &[Session]) -> bool {
pub fn verify(&mut self, histories: &[Session]) -> Option<Consistency> {
let moment = std::time::Instant::now();
let decision = self.transactional_history_verify(histories);
let duration = moment.elapsed();
info!(
self.log,
#"information",
"the algorithm finished";
"model" => format!("{:?}", self.consistency_model),
"sat" => self.use_sat,
"bicomponent" => self.use_bicomponent,
"duration" => duration.as_secs() as f64 + f64::from(duration.subsec_nanos()) * 1e-9,
"minViolation" => match decision {
Some(e) => format!("{:?}",e),
None => format!("ok")
},
);
decision
}
pub fn transactional_history_verify(&mut self, histories: &[Session]) -> Option<Consistency> {
let write_map = Self::gen_write_map(histories);
for (i_node_r, session) in histories.iter().enumerate() {
......@@ -129,12 +155,13 @@ impl Verifier {
(i_node, i_transaction, i_event),
);
info!(self.log, "finished early"; "reason" => "DIRTY READ", "description" => "read from uncommitted/aborted transaction");
return false;
return Some(Consistency::ReadCommitted);
}
}
} else {
info!(self.log, "finished early"; "reason" => "NO WRITE WITH SAME (VARIABLE, VALUE)");
return false;
panic!("In consistent write");
// return false;
}
}
}
......@@ -188,7 +215,7 @@ impl Verifier {
(i_node + 1, i_transaction, i_event)
);
info!(self.log, "finished early"; "reason" => "LOST UPDATE", "description" => "did not read the latest write within transaction");
return false;
return Some(Consistency::ReadCommitted);
}
} else {
if event.value != 0 {
......@@ -201,7 +228,7 @@ impl Verifier {
!= wr_i_event
{
info!(self.log, "finished early"; "reason" => "UNCOMMITTED READ", "description" => "read some non-last write from other transaction");
return false;
return Some(Consistency::ReadCommitted);
}
}
......@@ -214,7 +241,7 @@ impl Verifier {
&& (*wr_i_event2 == wr_i_event))
{
info!(self.log, "finished early"; "reason" => "NON REPEATABLE READ", "description" => "did not read same as latest read which is after lastest write");
return false;
return Some(Consistency::RepeatableRead);
}
}
}
......@@ -278,6 +305,10 @@ impl Verifier {
.is_none());
}
info!(self.log, "atleast not read commmitted";
"number of transactions" => format!("{}", transaction_infos.len())
);
if self.use_sat {
info!(self.log, "using SAT");
}
......@@ -286,9 +317,7 @@ impl Verifier {
info!(self.log, "using bicomponent");
}
let moment = std::time::Instant::now();
let decision = if self.use_bicomponent {
if self.use_bicomponent {
// communication graph
info!(self.log, "doing bicomponent decomposition");
let mut access_map = HashMap::new();
......@@ -329,30 +358,19 @@ impl Verifier {
let biconnected_components = biconn.get_biconnected_vertex_components();
biconnected_components.iter().all(|component| {
if biconnected_components.iter().all(|component| {
info!(self.log, "doing for component {:?}", component);
let restrict_infos = self.restrict(&transaction_infos, component);
self.do_hard_verification(&restrict_infos)
})
self.do_hard_verification(&restrict_infos).is_none()
}) {
None
} else {
Some(self.consistency_model)
}
} else {
self.do_hard_verification(&transaction_infos)
};
let duration = moment.elapsed();
info!(
self.log,
#"information",
"the algorithm finished";
"model" => format!("{:?}", self.consistency_model),
"sat" => self.use_sat,
"bicomponent" => self.use_bicomponent,
"duration" => duration.as_secs() as f64 + f64::from(duration.subsec_nanos()) * 1e-9,
"result" => decision
);
decision
}
}
fn restrict(
......@@ -375,12 +393,12 @@ impl Verifier {
}
fn do_hard_verification(
&self,
&mut self,
transaction_infos: &HashMap<
(usize, usize),
(HashMap<usize, (usize, usize)>, HashSet<usize>),
>,
) -> bool {
) -> Option<Consistency> {
if self.use_sat {
let mut sat_solver = Sat::new(&transaction_infos);
......@@ -406,11 +424,33 @@ impl Verifier {
_ => unreachable!(),
}
sat_solver.solve(&self.dir).is_some()
if sat_solver.solve(&self.dir).is_some() {
None
} else {
Some(self.consistency_model)
}
} else {
info!(self.log, "using our algorithms");
match self.consistency_model {
Consistency::ReadAtomic => {
let mut ra_hist = AtomicHistoryPO::new(transaction_infos.clone());
let wr = ra_hist.get_wr();
ra_hist.vis_includes(&wr);
// ra_hist.vis_is_trans();
let ww = ra_hist.causal_ww();
for (_, ww_x) in ww.iter() {
ra_hist.vis_includes(ww_x);
}
// ra_hist.vis_is_trans();
if ra_hist.vis.has_cycle() {
Some(self.consistency_model)
} else {
None
}
}
Consistency::Causal => {
let mut causal_hist = AtomicHistoryPO::new(transaction_infos.clone());
......@@ -423,7 +463,34 @@ impl Verifier {
}
causal_hist.vis_is_trans();
!causal_hist.vis.has_cycle()
if causal_hist.vis.has_cycle() {
Some(self.consistency_model)
} else {
None
}
}
Consistency::Prefix => {
let mut pre_hist =
PrefixConsistentHistory::new(transaction_infos.clone(), self.log.clone());
let wr = pre_hist.history.get_wr();
pre_hist.history.vis_includes(&wr);
pre_hist.history.vis_is_trans();
let ww = pre_hist.history.causal_ww();
for (_, ww_x) in ww.iter() {
pre_hist.history.vis_includes(ww_x);
}
pre_hist.history.vis_is_trans();
if pre_hist.history.vis.has_cycle() {
Some(self.consistency_model)
} else {
if pre_hist.get_linearization().is_some() {
None
} else {
Some(self.consistency_model)
}
}
}
Consistency::SnapshotIsolation => {
let mut si_hist =
......@@ -439,9 +506,13 @@ impl Verifier {
si_hist.history.vis_is_trans();
if si_hist.history.vis.has_cycle() {
false
Some(self.consistency_model)
} else {
si_hist.get_linearization().is_some()
if si_hist.get_linearization().is_some() {
None
} else {
Some(self.consistency_model)
}
}
}
Consistency::Serializable => {
......@@ -458,37 +529,74 @@ impl Verifier {
ser_hist.history.vis_is_trans();
if ser_hist.history.vis.has_cycle() {
false
Some(self.consistency_model)
} else {
let lin_o = ser_hist.get_linearization();
{
// checking correctness
if let Some(ref lin) = lin_o {
let mut curr_value_map: HashMap<usize, (usize, usize)> =
Default::default();
for txn_id in lin.iter() {
let (read_info, write_info) =
transaction_infos.get(txn_id).unwrap();
for (x, txn1) in read_info.iter() {
match curr_value_map.get(&x) {
Some(txn1_) => assert_eq!(txn1_, txn1),
_ => unreachable!(),
}
}
for &x in write_info.iter() {
curr_value_map.insert(x, *txn_id);
}
// if !write_info.is_empty() {
// println!("{:?}", txn_id);
// println!("{:?}", curr_value_map);
// }
}
}
// let lin_o = ser_hist.get_linearization();
// {
// // checking correctness
// if let Some(ref lin) = lin_o {
// let mut curr_value_map: HashMap<usize, (usize, usize)> =
// Default::default();
// for txn_id in lin.iter() {
// let (read_info, write_info) =
// transaction_infos.get(txn_id).unwrap();
// for (x, txn1) in read_info.iter() {
// match curr_value_map.get(&x) {
// Some(txn1_) => assert_eq!(txn1_, txn1),
// _ => unreachable!(),
// }
// }
// for &x in write_info.iter() {
// curr_value_map.insert(x, *txn_id);
// }
// // if !write_info.is_empty() {
// // println!("{:?}", txn_id);
// // println!("{:?}", curr_value_map);
// // }
// }
// }
// }
// lin_o.is_some();
if ser_hist.get_linearization().is_some() {
None
} else {
Some(self.consistency_model)
}
lin_o.is_some()
}
}
_ => unreachable!(),
Consistency::Inc => {
self.consistency_model = Consistency::ReadAtomic;
let decision = self.do_hard_verification(transaction_infos);
if decision.is_some() {
return decision;
}
self.consistency_model = Consistency::Causal;
let decision = self.do_hard_verification(transaction_infos);