Commit 88b0a79f authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

added communication graph

parent e11e46b4
......@@ -18,7 +18,7 @@ petgraph = "*"
rayon = "*"
slog = "*"
slog-term = "*"
slog-stream = "*"
slog-json = "*"
chrono = { version = "*", features = ["serde"] }
antidotedb = { path = "../antidotedb" }
......
......@@ -21,3 +21,31 @@ services:
environment:
SHORT_NAME: "true"
NODE_NAME: antidote@antidote3
antidote4:
image: antidotedb/antidote
container_name: antidote4
hostname: antidote4
environment:
SHORT_NAME: "true"
NODE_NAME: antidote@antidote4
antidote5:
image: antidotedb/antidote
container_name: antidote5
hostname: antidote5
environment:
SHORT_NAME: "true"
NODE_NAME: antidote@antidote5
antidote6:
image: antidotedb/antidote
container_name: antidote6
hostname: antidote6
environment:
SHORT_NAME: "true"
NODE_NAME: antidote@antidote6
antidote7:
image: antidotedb/antidote
container_name: antidote7
hostname: antidote7
environment:
SHORT_NAME: "true"
NODE_NAME: antidote@antidote7
rpc:call(antidote@antidote1, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote2, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote3, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote4, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote5, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote6, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote7, inter_dc_manager, start_bg_processes, [stable]),
{ok, Desc1} = rpc:call(antidote@antidote1, inter_dc_manager, get_descriptor, []),
{ok, Desc2} = rpc:call(antidote@antidote2, inter_dc_manager, get_descriptor, []),
{ok, Desc3} = rpc:call(antidote@antidote3, inter_dc_manager, get_descriptor, []),
Descriptors = [Desc1, Desc2, Desc3],
{ok, Desc4} = rpc:call(antidote@antidote4, inter_dc_manager, get_descriptor, []),
{ok, Desc5} = rpc:call(antidote@antidote5, inter_dc_manager, get_descriptor, []),
{ok, Desc6} = rpc:call(antidote@antidote6, inter_dc_manager, get_descriptor, []),
{ok, Desc7} = rpc:call(antidote@antidote7, inter_dc_manager, get_descriptor, []),
Descriptors = [
Desc1,
Desc2,
Desc3,
Desc4,
Desc5,
Desc6,
Desc7
],
rpc:call(antidote@antidote1, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote2, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote3, inter_dc_manager, observe_dcs_sync, [Descriptors]).
rpc:call(antidote@antidote3, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote4, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote5, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote6, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote7, inter_dc_manager, observe_dcs_sync, [Descriptors]).
version: "3.7"
services:
antidote1:
cockroach1:
image: cockroachdb/cockroach:v2.1.0
container_name: roach1
hostname: roach1
command:
- start
- --insecure
antidote2:
cockroach2:
image: cockroachdb/cockroach:v2.1.0
container_name: roach2
hostname: roach2
......@@ -15,7 +15,7 @@ services:
- start
- --insecure
- --join=roach1
antidote3:
cockroach3:
image: cockroachdb/cockroach:v2.1.0
container_name: roach3
hostname: roach3
......
version: "3.7"
version: "3.7"
services:
parent:
image: galera:arch
command: ["--wsrep-node-name=dbcop-test", "--wsrep-cluster-address=gcomm://"]
hostname: parent
child:
image: galera:arch
depends_on:
- parent
command: ["--wsrep-node-name=dbcop-test", "--wsrep-cluster-address=gcomm://parent"]
deploy:
replicas: 4
galera1:
image: galera:arch
hostname: galera1
container_name: galera1
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://
galera2:
image: galera:arch
hostname: galera2
container_name: galera2
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
galera3:
image: galera:arch
hostname: galera3
container_name: galera3
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
galera4:
image: galera:arch
hostname: galera4
container_name: galera4
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
galera5:
image: galera:arch
hostname: galera5
container_name: galera5
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
galera6:
image: galera:arch
hostname: galera6
container_name: galera6
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
galera7:
image: galera:arch
hostname: galera7
container_name: galera7
command:
- --wsrep-node-name=dbcop-test
- --wsrep-cluster-address=gcomm://galera1
depends_on:
- galera1
......@@ -3,6 +3,7 @@ extern crate byteorder;
extern crate clap;
extern crate dbcop;
use std::fs;
use std::path::Path;
use dbcop::db::cluster::{Cluster, ClusterNode, Node};
......@@ -149,7 +150,7 @@ impl AntidoteCluster {
}
});
println!("zero init is done");
// println!("zero init is done");
}
fn drop_database(&self) {}
......@@ -213,11 +214,11 @@ fn main() {
let hist_dir = Path::new(matches.value_of("hist_dir").unwrap());
let hist_out = Path::new(matches.value_of("hist_out").unwrap());
fs::create_dir_all(hist_out).expect("couldn't create directory");
let ips: Vec<_> = matches.values_of("ips").unwrap().collect();
let mut cluster = AntidoteCluster::new(&ips);
cluster.setup();
cluster.execute_all(hist_dir, hist_out);
cluster.execute_all(hist_dir, hist_out, 50);
}
......@@ -201,11 +201,11 @@ fn main() {
let hist_dir = Path::new(matches.value_of("hist_dir").unwrap());
let hist_out = Path::new(matches.value_of("hist_out").unwrap());
fs::create_dir_all(hist_out).expect("couldn't create directory");
let ips: Vec<_> = matches.values_of("ips").unwrap().collect();
let mut cluster = CockroachCluster::new(&ips);
cluster.setup();
cluster.execute_all(hist_dir, hist_out);
cluster.execute_all(hist_dir, hist_out, 500);
}
......@@ -7,6 +7,8 @@ use std::path::Path;
use dbcop::db::cluster::{Cluster, ClusterNode, Node};
use dbcop::db::history::{HistParams, Transaction};
use std::fs;
use clap::{App, Arg};
#[derive(Debug)]
......@@ -109,7 +111,7 @@ impl GaleraCluster {
).unwrap();
// conn.query("USE dbcop").unwrap();
Ok(true)
}).is_ok(),
}).expect("problem creating database"),
_ => false,
}
}
......@@ -199,11 +201,11 @@ fn main() {
let hist_dir = Path::new(matches.value_of("hist_dir").unwrap());
let hist_out = Path::new(matches.value_of("hist_out").unwrap());
fs::create_dir_all(hist_out).expect("couldn't create directory");
let ips: Vec<_> = matches.values_of("ips").unwrap().collect();
let mut cluster = GaleraCluster::new(&ips);
cluster.setup();
cluster.execute_all(hist_dir, hist_out);
cluster.execute_all(hist_dir, hist_out, 500);
}
......@@ -227,7 +227,7 @@ impl Sat {
self.add_clauses(&clauses);
}
pub fn causal(&mut self) {
pub fn read_atomic(&mut self) {
let mut clauses = Vec::new();
for (&x, ref wr_map) in self.write_variable.iter() {
......@@ -283,11 +283,13 @@ impl Sat {
}
pub fn solve(&self, path: &PathBuf) -> bool {
self.cnf.write_to_file(&path.join("hist.cnf"));
let inp_cnf = path.join("history.cnf");
let out_cnf = path.join("result.cnf");
self.cnf.write_to_file(&inp_cnf);
if let Ok(mut child) = Command::new("minisat")
.arg("hist.cnf")
.arg("result.cnf")
.arg(&inp_cnf)
.arg(&out_cnf)
.stdout(Stdio::null())
.spawn()
{
......@@ -300,7 +302,7 @@ impl Sat {
// println!("stdout: {}", String::from_utf8_lossy(&output.stdout));
// println!("stderr: {}", String::from_utf8_lossy(&output.stderr));
let result = File::open("result.cnf").expect("file couldn't open");
let result = File::open(&out_cnf).expect("file couldn't open");
let reader = BufReader::new(&result);
......
......@@ -13,6 +13,8 @@ use std::net::IpAddr;
// use rand::distributions::{Distribution, Uniform};
// use rand::Rng;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
// use std::convert::From;
......@@ -50,16 +52,26 @@ where
.collect()
}
fn execute_all(&mut self, r_dir: &Path, o_dir: &Path) -> Option<usize> {
let histories: Vec<History> = fs::read_dir(r_dir)
.unwrap()
.filter_map(|entry_res| match entry_res {
Ok(ref entry) if !&entry.path().is_dir() => {
let file = File::open(entry.path()).unwrap();
let buf_reader = BufReader::new(file);
Some(serde_json::from_reader(buf_reader).unwrap())
}
_ => None,
fn execute_all(&mut self, r_dir: &Path, o_dir: &Path, millisec: u64) -> Option<usize> {
// let histories: Vec<History> = fs::read_dir(r_dir)
// .unwrap()
// .take(100)
// .filter_map(|entry_res| match entry_res {
// Ok(ref entry) if !&entry.path().is_dir() => {
// let file = File::open(entry.path()).unwrap();
// let buf_reader = BufReader::new(file);
// Some(serde_json::from_reader(buf_reader).unwrap())
// }
// _ => None,
// })
// .collect();
let histories: Vec<History> = (0..100)
.flat_map(|id| {
let filename = format!("hist-{:05}.json", id);
let file = File::open(r_dir.join(filename)).unwrap();
let buf_reader = BufReader::new(file);
serde_json::from_reader(buf_reader)
})
.collect();
......@@ -67,12 +79,15 @@ where
let curr_dir = o_dir.join(format!("hist-{:05}", history.get_id()));
fs::create_dir(&curr_dir).expect("couldn't create dir");
self.execute(history, &curr_dir);
sleep(Duration::from_millis(millisec));
}
None
}
fn execute(&mut self, hist: &History, dir: &Path) -> Option<usize> {
self.setup();
self.setup_test(hist.get_params());
let mut exec = hist.get_cloned_data();
......@@ -83,6 +98,8 @@ where
let end_time = chrono::Local::now();
self.cleanup();
let exec_hist = History::new(
hist.get_cloned_params(),
self.info(),
......@@ -95,7 +112,6 @@ where
let buf_writer = BufWriter::new(file);
serde_json::to_writer_pretty(buf_writer, &exec_hist).expect("dumping to json went wrong");
self.cleanup();
None
}
......
......@@ -20,3 +20,4 @@ extern crate serde_derive;
#[macro_use]
extern crate slog;
extern crate slog_term;
extern crate slog_json;
......@@ -49,7 +49,7 @@ fn main() {
)
.arg(
Arg::with_name("n_variable")
.long("nval")
.long("nvar")
.short("v")
.default_value("5")
.help("Number of variables per history"),
......@@ -84,7 +84,32 @@ fn main() {
.short("o")
.takes_value(true)
.required(true)
.help("Directory to output the results."),
.help("Directory to output the results"),
)
.arg(
Arg::with_name("sat")
.long("sat")
.help("Use MiniSAT as backend"),
)
.arg(
Arg::with_name("bicomponent")
.long("bic")
.help("Use BiComponent"),
)
.arg(
Arg::with_name("serializable")
.long("ser")
.help("Check for Serializablity"),
)
.arg(
Arg::with_name("snapshot_isolation")
.long("si")
.help("Check for Snapshot Isolation"),
)
.arg(
Arg::with_name("causal")
.long("cc")
.help("Check for Causality"),
)
.about("Verifies histories"),
])
......@@ -92,53 +117,76 @@ fn main() {
let app_matches = app.get_matches();
if let Some(matches) = app_matches.subcommand_matches("generate") {
let dir = Path::new(matches.value_of("g_directory").unwrap());
if !dir.is_dir() {}
let mut histories = generate_mult_histories(
matches.value_of("n_history").unwrap().parse().unwrap(),
matches.value_of("n_node").unwrap().parse().unwrap(),
matches.value_of("n_variable").unwrap().parse().unwrap(),
matches.value_of("n_transaction").unwrap().parse().unwrap(),
matches.value_of("n_event").unwrap().parse().unwrap(),
);
for hist in histories.drain(..) {
let mut file =
File::create(dir.join(format!("hist-{:05}.json", hist.get_id()))).unwrap();
let mut buf_writer = BufWriter::new(file);
serde_json::to_writer_pretty(buf_writer, &hist)
.expect("dumping history to json file went wrong");
match app_matches.subcommand() {
("generate", Some(matches)) => {
let dir = Path::new(matches.value_of("g_directory").unwrap());
if !dir.is_dir() {
fs::create_dir_all(dir).expect("failed to create directory");
}
let mut histories = generate_mult_histories(
matches.value_of("n_history").unwrap().parse().unwrap(),
matches.value_of("n_node").unwrap().parse().unwrap(),
matches.value_of("n_variable").unwrap().parse().unwrap(),
matches.value_of("n_transaction").unwrap().parse().unwrap(),
matches.value_of("n_event").unwrap().parse().unwrap(),
);
for hist in histories.drain(..) {
let mut file =
File::create(dir.join(format!("hist-{:05}.json", hist.get_id()))).unwrap();
let mut buf_writer = BufWriter::new(file);
serde_json::to_writer_pretty(buf_writer, &hist)
.expect("dumping history to json file went wrong");
}
}
} else if let Some(matches) = app_matches.subcommand_matches("verify") {
let v_dir = Path::new(matches.value_of("v_directory").unwrap());
if !v_dir.is_dir() {}
let histories: Vec<History> = fs::read_dir(v_dir)
.unwrap()
.filter_map(|entry_res| match entry_res {
Ok(ref entry) if entry.path().is_dir() => {
let file = File::open(entry.path().join("history.json")).unwrap();
let buf_reader = BufReader::new(file);
Some(serde_json::from_reader(buf_reader).unwrap())
("verify", Some(matches)) => {
let v_dir = Path::new(matches.value_of("v_directory").unwrap());
if !v_dir.is_dir() {}
let histories: Vec<History> = fs::read_dir(v_dir)
.unwrap()
.filter_map(|entry_res| match entry_res {
Ok(ref entry) if entry.path().is_dir() => {
let file = File::open(entry.path().join("history.json")).unwrap();
let buf_reader = BufReader::new(file);
Some(serde_json::from_reader(buf_reader).unwrap())
}
_ => None,
})
.collect();
// println!("{:?}", histories);
let o_dir = Path::new(matches.value_of("o_directory").unwrap());
if !o_dir.is_dir() {
fs::create_dir_all(o_dir).expect("failed to create directory");
}
histories.iter().for_each(|ref hist| {
let curr_dir = o_dir.join(format!("hist-{:05}", hist.get_id()));
let mut verifier = Verifier::new(curr_dir.to_path_buf());
if matches.is_present("causal") {
verifier.model("cc");
} else if matches.is_present("snapshot_isolation") {
verifier.model("si");
} else if matches.is_present("serializable") {
verifier.model("ser");
}
_ => None,
})
.collect();
// println!("{:?}", histories);
let o_dir = Path::new(matches.value_of("o_directory").unwrap());
histories.iter().for_each(|ref hist| {
let curr_dir = o_dir.join(format!("hist-{:05}", hist.get_id()));
verifier.sat(matches.is_present("sat"));
verifier.bicomponent(matches.is_present("bicomponent"));
let verifier = Verifier::new(curr_dir.to_path_buf());
verifier.transactional_history_verify(hist.get_data());
});
if !verifier.transactional_history_verify(hist.get_data()) {
println!("hist-{:05} failed", hist.get_id());
}
});
}
_ => unreachable!(),
}
}
......@@ -10,30 +10,56 @@ use consistency::ser::Chains;
use consistency::si::SIChains;
use db::history::Transaction;
mod util;
use self::util::{BiConn, UGraph};
use slog::{Drain, Logger};
pub struct Verifier {
log: slog::Logger,
consistency_model: String,
use_sat: bool,
use_bicomponent: bool,
dir: PathBuf,
}
impl Verifier {
pub fn new(dir: PathBuf) -> Self {
fs::create_dir(&dir).unwrap();
let log_file = File::create(dir.join("result.log")).unwrap();
let log_file = File::create(dir.join("result_log.json")).unwrap();
Verifier {
log: Self::get_logger(BufWriter::new(log_file)),
consistency_model: "ser".to_string(),
use_sat: false,
use_bicomponent: false,
dir,
}
}
pub fn model(&mut self, model: &str) {
self.consistency_model = model.to_string();
}
pub fn sat(&mut self, flag: bool) {
self.use_sat = flag;
}
pub fn bicomponent(&mut self, flag: bool) {
self.use_bicomponent = flag;
}
pub fn get_logger<W>(io: W) -> Logger
where
W: Write + Send + 'static,
{
let plain = slog_term::PlainSyncDecorator::new(io);
let root_logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!());
// let plain = slog_term::PlainSyncDecorator::new(io);
// let root_logger = Logger::root(slog_term::FullFormat::new(plain).build().fuse(), o!());
let root_logger = Logger::root(
std::sync::Mutex::new(slog_json::Json::default(io)).map(slog::Fuse),
o!(),
);
info!(root_logger, "Application started";
"started_at" => format!("{}", chrono::Local::now()));
......@@ -66,7 +92,7 @@ impl Verifier {
write_map
}
pub fn transactional_history_verify(&self, histories: &Vec<Vec<Transaction>>) {
pub fn transactional_history_verify(&self, histories: &Vec<Vec<Transaction>>) -> bool {
let write_map = self.gen_write_map(histories);