Commit 6c3df1bf authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

Merge branch 'wip'

parents ef7628d8 88b0a79f
......@@ -4,6 +4,23 @@ version = "0.1.0"
authors = ["Ranadeep Biswas <ranadip.bswas@gmail.com>"]
[dependencies]
mysql = "12.3.1"
clap = "2.31.2"
rand = "0.4"
mysql = "*"
postgres = "*"
rand = "*"
clap = "*"
ansi_term = "*"
serde = "*"
serde_derive = "*"
serde_json = "*"
serde_yaml = "*"
byteorder = "*"
petgraph = "*"
rayon = "*"
slog = "*"
slog-term = "*"
slog-json = "*"
chrono = { version = "*", features = ["serde"] }
antidotedb = { path = "../antidotedb" }
[profile.release]
opt-level = 3
......@@ -6,25 +6,19 @@
1. Clone it.
```
git clone git@gitlab.math.univ-paris-diderot.fr:ranadeep/dbcop.git
git clone -b wip git@gitlab.math.univ-paris-diderot.fr:ranadeep/dbcop.git
```
2. Compile and install using `cargo` and run.
Make sure `~/.cargo/bin` is in your system path and MySQL server is running on `localhost` at `3306` port.
2. Fire up Galera cluster using `docker-compose`.
```
cd dbcop
cargo install
./dbcop -u root -p <root_password>
```
or, after changing into the directory, you can simply run
```
cargo run -- -u root -p <root_password>
cd docker
sudo docker-compose up
```
3. Slow query log will be available at `mysql.slow_log` table.
4. You can fetch the queries of a particular thread using,
3. Compile and install using `cargo` and run.
Make sure `~/.cargo/bin` is in your system path.
```
SELECT * FROM mysql.slow_log WHERE thread_id = ?
cd dbcop
cargo install
dbcop
```
Replace `?` with the number corresponding to a thread from `dbcop` output.
......@@ -21,3 +21,13 @@ parse
final
delete slow_query_log
delete table
--------
VoltDB
MongoDB
communication graph, in TPC
# docker-compose definition for a 7-node AntidoteDB test cluster.
# Each container runs one Antidote Erlang node; SHORT_NAME plus NODE_NAME
# give the node the distributed-Erlang name antidote@antidoteN, matching
# the hostnames used by the inter-DC connect script.
version: "3.7"
services:
  antidote1:
    image: antidotedb/antidote
    container_name: antidote1
    hostname: antidote1
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote1
  antidote2:
    image: antidotedb/antidote
    container_name: antidote2
    hostname: antidote2
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote2
  antidote3:
    image: antidotedb/antidote
    container_name: antidote3
    hostname: antidote3
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote3
  antidote4:
    image: antidotedb/antidote
    container_name: antidote4
    hostname: antidote4
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote4
  antidote5:
    image: antidotedb/antidote
    container_name: antidote5
    hostname: antidote5
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote5
  antidote6:
    image: antidotedb/antidote
    container_name: antidote6
    hostname: antidote6
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote6
  antidote7:
    image: antidotedb/antidote
    container_name: antidote7
    hostname: antidote7
    environment:
      SHORT_NAME: "true"
      NODE_NAME: antidote@antidote7
% Connects the 7 dockerized Antidote nodes into one multi-DC deployment.
% Intended to be pasted into an Erlang shell that can reach the
% antidote@antidoteN nodes (names defined in the compose file).
% Step 1: start the background inter-DC processes on every node.
rpc:call(antidote@antidote1, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote2, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote3, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote4, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote5, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote6, inter_dc_manager, start_bg_processes, [stable]),
rpc:call(antidote@antidote7, inter_dc_manager, start_bg_processes, [stable]),
% Step 2: ask every node for its data-center descriptor.
{ok, Desc1} = rpc:call(antidote@antidote1, inter_dc_manager, get_descriptor, []),
{ok, Desc2} = rpc:call(antidote@antidote2, inter_dc_manager, get_descriptor, []),
{ok, Desc3} = rpc:call(antidote@antidote3, inter_dc_manager, get_descriptor, []),
{ok, Desc4} = rpc:call(antidote@antidote4, inter_dc_manager, get_descriptor, []),
{ok, Desc5} = rpc:call(antidote@antidote5, inter_dc_manager, get_descriptor, []),
{ok, Desc6} = rpc:call(antidote@antidote6, inter_dc_manager, get_descriptor, []),
{ok, Desc7} = rpc:call(antidote@antidote7, inter_dc_manager, get_descriptor, []),
Descriptors = [
Desc1,
Desc2,
Desc3,
Desc4,
Desc5,
Desc6,
Desc7
],
% Step 3: make every node observe all descriptors so each DC replicates
% to and from all the others.
rpc:call(antidote@antidote1, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote2, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote3, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote4, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote5, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote6, inter_dc_manager, observe_dcs_sync, [Descriptors]),
rpc:call(antidote@antidote7, inter_dc_manager, observe_dcs_sync, [Descriptors]).
# docker-compose definition for a 3-node CockroachDB cluster (insecure mode,
# i.e. no TLS). roach1 bootstraps the cluster; roach2/roach3 join it.
version: "3.7"
services:
  cockroach1:
    image: cockroachdb/cockroach:v2.1.0
    container_name: roach1
    hostname: roach1
    command:
      - start
      - --insecure
  cockroach2:
    image: cockroachdb/cockroach:v2.1.0
    container_name: roach2
    hostname: roach2
    command:
      - start
      - --insecure
      - --join=roach1
  cockroach3:
    image: cockroachdb/cockroach:v2.1.0
    container_name: roach3
    hostname: roach3
    command:
      - start
      - --insecure
      - --join=roach1
# Arch Linux image running one Galera-clustered MariaDB (mysqld) node.
FROM base/archlinux
# Refresh the system, then install build tools and the Galera stack.
RUN pacman -Syu --noconfirm
RUN pacman -S base-devel --noconfirm
RUN pacman -S galera rsync lsof --noconfirm
RUN pacman -S iproute2 --noconfirm
# Initialize the MariaDB data directory.
RUN mysql_install_db --user=mysql --basedir=/usr --datadir=/var/lib/mysql
# Rewrite wsrep_provider in wsrep.cnf to point at the Galera provider library
# and append the transformed config to my.cnf.
RUN sed -e 's|wsrep_provider=.*|wsrep_provider=/usr/lib64/libgalera_smm.so|g' /etc/mysql/wsrep.cnf >> /etc/mysql/my.cnf
# First-boot SQL (root account setup) executed through --init-file below.
COPY mysql_first_time.sql '/tmp/mysql-first-time.sql'
EXPOSE 3306
ENTRYPOINT ["mysqld_safe", "--init-file=/tmp/mysql-first-time.sql"]
-- First-boot init script: recreate the root account for remote access.
-- What's done in this file shouldn't be replicated
-- or products like mysql-fabric won't work
SET @@SESSION.SQL_LOG_BIN=0;
-- Drop every predefined account, then recreate root reachable from any host.
DELETE FROM mysql.user ;
-- NOTE(review): root gets an EMPTY password and full grants from '%' —
-- acceptable only for throwaway test containers, never for real deployments.
CREATE USER 'root'@'%' IDENTIFIED BY '' ;
GRANT ALL ON *.* TO 'root'@'%' WITH GRANT OPTION ;
DROP DATABASE IF EXISTS test ;
FLUSH PRIVILEGES ;
# docker-compose definition for a 7-node Galera cluster using the locally
# built galera:arch image. galera1 bootstraps (empty gcomm:// address);
# the others join through it.
# NOTE(review): every node uses the same --wsrep-node-name=dbcop-test —
# node names are normally unique per member; confirm this is intended.
version: "3.7"
services:
  galera1:
    image: galera:arch
    hostname: galera1
    container_name: galera1
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://
  galera2:
    image: galera:arch
    hostname: galera2
    container_name: galera2
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
  galera3:
    image: galera:arch
    hostname: galera3
    container_name: galera3
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
  galera4:
    image: galera:arch
    hostname: galera4
    container_name: galera4
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
  galera5:
    image: galera:arch
    hostname: galera5
    container_name: galera5
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
  galera6:
    image: galera:arch
    hostname: galera6
    container_name: galera6
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
  galera7:
    image: galera:arch
    hostname: galera7
    container_name: galera7
    command:
      - --wsrep-node-name=dbcop-test
      - --wsrep-cluster-address=gcomm://galera1
    depends_on:
      - galera1
# Swarm-style Galera stack: one bootstrap node ("parent") plus 4 replica
# "child" containers that join through it (deploy.replicas is honored by
# `docker stack deploy`, not by plain docker-compose).
version: "3.7"
services:
  parent:
    image: erkules/galera
    command: ["--wsrep-node-name=dbcop-test", "--wsrep-cluster-address=gcomm://"]
    hostname: parent
  child:
    image: erkules/galera
    depends_on:
      - parent
    command: ["--wsrep-node-name=dbcop-test", "--wsrep-cluster-address=gcomm://parent"]
    deploy:
      replicas: 4
extern crate antidotedb;
extern crate byteorder;
extern crate clap;
extern crate dbcop;
use std::fs;
use std::path::Path;
use dbcop::db::cluster::{Cluster, ClusterNode, Node};
use dbcop::db::history::{HistParams, Transaction};
use clap::{App, Arg};
use byteorder::{BigEndian, ReadBytesExt};
use std::io::Cursor;
use antidotedb::crdt::{Operation, LWWREG};
use antidotedb::AntidoteDB;
/// One AntidoteDB cluster member plus the per-node client state needed to
/// run sessions against it.
#[derive(Debug, Clone)]
pub struct AntidoteNode {
    // Underlying cluster node description.
    node: Node,
    // "ip:8087" address the Antidote client connects to.
    addr: String,
    // Node id copied from `node`.
    id: usize,
    // Commit timestamp of the last transaction this handle has seen
    // (`None` before any commit); passed to `start_transaction` so later
    // transactions start from that snapshot.
    timestamp: Option<Vec<u8>>,
}
impl From<Node> for AntidoteNode {
    /// Wraps a bare cluster `Node`, pointing the client address at port
    /// 8087 on the node's IP and starting with no commit timestamp.
    fn from(node: Node) -> Self {
        let addr = format!("{}:8087", node.ip);
        let id = node.id;
        AntidoteNode {
            node: node.clone(),
            addr,
            id,
            timestamp: None,
        }
    }
}
impl ClusterNode for AntidoteNode {
    /// Replays a session's transactions against this node over a single
    /// connection, threading the commit timestamp of each successful commit
    /// into the start of the next transaction. Event `value`/`success` and
    /// transaction `success` are recorded in place.
    fn exec_session(&self, hist: &mut Vec<Transaction>) {
        let mut conn = AntidoteDB::connect_with_string(&self.addr);
        // Causal snapshot carried from one committed transaction to the next.
        let mut timestamp = self.timestamp.clone();
        for transaction in hist.iter_mut() {
            let db_txn = conn.start_transaction(timestamp.as_ref());
            for event in transaction.events.iter_mut() {
                let obj = LWWREG::new(&format!("{}", event.variable), "dbcop");
                if event.write {
                    let op = obj.set(event.value as u64);
                    match conn.mult_update_in_transaction(&[op], &db_txn) {
                        Ok(_) => event.success = true,
                        Err(_e) => {
                            // A failed write must not have been marked ok.
                            assert_eq!(event.success, false);
                        }
                    }
                } else {
                    match conn.mult_read_in_transaction(&[obj.clone()], &db_txn) {
                        Ok(values) => {
                            // Register values come back as big-endian u64 bytes.
                            let bytes = values[0].get_reg().get_value();
                            event.value =
                                Cursor::new(bytes).read_u64::<BigEndian>().unwrap() as usize;
                            event.success = true;
                        }
                        Err(_) => assert!(!event.success),
                    }
                }
            }
            match conn.commit_transaction(&db_txn) {
                Ok(commit_time) => {
                    transaction.success = true;
                    timestamp = Some(commit_time);
                }
                Err(_e) => {
                    assert_eq!(transaction.success, false);
                    println!("{:?} -- COMMIT ERROR", transaction);
                }
            }
        }
    }
}
/// Handle to a whole AntidoteDB cluster: one `AntidoteNode` per member.
#[derive(Debug)]
pub struct AntidoteCluster(Vec<AntidoteNode>);
impl AntidoteCluster {
fn new(ips: &Vec<&str>) -> Self {
let mut v = AntidoteCluster::node_vec(ips);
let k: Vec<_> = v.drain(..).map(|x| From::from(x)).collect();
AntidoteCluster(k)
}
fn create_table(&self) -> bool {
true
}
fn create_variables(&mut self, n_variable: usize) {
let mut conn = AntidoteDB::connect_with_string(&self.get_antidote_addr(0).unwrap());
let db_transaction = conn.start_transaction(None);
let ops: Vec<_> = (0..n_variable)
.map(|variable| LWWREG::new(&format!("{}", variable), "dbcop").set(0))
.collect();
conn.mult_update_in_transaction(&ops, &db_transaction)
.expect("error to init zero values");
match conn.commit_transaction(&db_transaction) {
Ok(commit_time) => {
self.0.iter_mut().for_each(|x| {
x.timestamp = Some(commit_time.clone());
});
}
Err(_e) => {
println!("COMMIT ERROR while init");
}
}
self.0.iter_mut().for_each(|x| {
let mut conn = AntidoteDB::connect_with_string(&x.addr);
let timestamp = x.timestamp.clone();
// println!("{:?}", timestamp);
let db_transaction = conn.start_transaction(timestamp.as_ref());
let objs: Vec<_> = (0..n_variable)
.map(|variable| LWWREG::new(&format!("{}", variable), "dbcop"))
.collect();
match conn.mult_read_in_transaction(&objs, &db_transaction) {
Ok(values) => assert!((0..n_variable).all(|var| {
let bytes = values[var].get_reg().get_value();
Cursor::new(bytes).read_u64::<BigEndian>().unwrap() == 0
})),
Err(_) => unreachable!(),
}
match conn.commit_transaction(&db_transaction) {
Ok(commit_time) => {}
Err(_e) => unreachable!(),
}
});
// println!("zero init is done");
}
fn drop_database(&self) {}
fn get_antidote_addr(&self, i: usize) -> Option<String> {
self.0.get(i).map(|ref node| node.addr.clone())
}
}
impl Cluster<AntidoteNode> for AntidoteCluster {
    /// Number of nodes in the cluster.
    fn n_node(&self) -> usize {
        self.0.len()
    }

    /// Schema setup; delegates to the no-op `create_table`.
    fn setup(&self) -> bool {
        self.create_table()
    }

    /// Raw description of node `id` (panics if out of range).
    fn get_node(&self, id: usize) -> Node {
        self.0[id].node.clone()
    }

    /// Full client handle for node `id` (panics if out of range).
    fn get_cluster_node(&self, id: usize) -> AntidoteNode {
        self.0[id].clone()
    }

    /// Per-test initialization: zero out all variables used by the history.
    fn setup_test(&mut self, p: &HistParams) {
        self.create_variables(p.get_n_variable());
    }

    /// Post-test cleanup; a no-op for AntidoteDB.
    fn cleanup(&self) {
        self.drop_database();
    }

    /// Human-readable cluster label.
    fn info(&self) -> String {
        String::from("AntidoteDB")
    }
}
/// CLI entry point: parses the history input directory, the output
/// directory, and one or more cluster IPs, then executes all histories
/// against the Antidote cluster.
fn main() {
    let app = App::new("Antidote")
        .version("1.0")
        .author("Ranadeep")
        .about("executes histories on AntidoteDB")
        .arg(
            Arg::with_name("hist_dir")
                .long("dir")
                .short("d")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("hist_out")
                .long("out")
                .short("o")
                .takes_value(true)
                .required(true),
        )
        .arg(
            Arg::with_name("ips")
                .help("Cluster ips")
                .multiple(true)
                .required(true),
        );
    let matches = app.get_matches();

    // Required args: unwrap cannot fail once clap has accepted the input.
    let hist_dir = Path::new(matches.value_of("hist_dir").unwrap());
    let hist_out = Path::new(matches.value_of("hist_out").unwrap());
    fs::create_dir_all(hist_out).expect("couldn't create directory");

    let ips: Vec<_> = matches.values_of("ips").unwrap().collect();
    let mut cluster = AntidoteCluster::new(&ips);
    cluster.execute_all(hist_dir, hist_out, 50);
}
extern crate clap;
extern crate dbcop;
extern crate postgres;
use std::path::Path;
use dbcop::db::cluster::{Cluster, ClusterNode, Node};
use dbcop::db::history::{HistParams, Transaction};
use clap::{App, Arg};
use postgres::{transaction, Connection, TlsMode};
/// One CockroachDB cluster member, addressed through its PostgreSQL-wire
/// endpoint.
#[derive(Debug)]
pub struct CockroachNode {
    // "postgresql://root@<ip>:26257" connection string.
    addr: String,
    // Node id copied from the cluster `Node`.
    id: usize,
}
impl From<Node> for CockroachNode {
    /// Derives a connection string (root user, CockroachDB's default SQL
    /// port 26257) from a bare cluster node description.
    fn from(node: Node) -> Self {
        let addr = format!("postgresql://{}@{}:26257", "root", node.ip);
        CockroachNode { addr, id: node.id }
    }
}
impl ClusterNode for CockroachNode {
    /// Executes every transaction of a session against this node over a
    /// single connection, mutating `hist` in place: each event's `value` /
    /// `success` and each transaction's `success` record what the database
    /// actually did.
    fn exec_session(&self, hist: &mut Vec<Transaction>) {
        match Connection::connect(self.addr.clone(), TlsMode::None) {
            Ok(conn) => hist.iter_mut().for_each(|transaction| {
                // One SQL transaction per logical transaction, SERIALIZABLE.
                let mut config = transaction::Config::new();
                config.isolation_level(transaction::IsolationLevel::Serializable);
                match conn.transaction_with(&config) {
                    Ok(sqltxn) => {
                        transaction.events.iter_mut().for_each(|event| {
                            if event.write {
                                match sqltxn.execute(
                                    "UPDATE dbcop.variables SET val=$1 WHERE var=$2",
                                    &[&(event.value as i64), &(event.variable as i64)],
                                ) {
                                    Ok(_) => event.success = true,
                                    Err(_e) => {
                                        // A failed write must not already be
                                        // marked successful.
                                        assert_eq!(event.success, false);
                                        // println!("WRITE ERR -- {:?}", _e);
                                    }
                                }
                            } else {
                                match sqltxn.query(
                                    "SELECT * FROM dbcop.variables WHERE var=$1",
                                    &[&(event.variable as i64)],
                                ) {
                                    Ok(result) => {
                                        if !result.is_empty() {
                                            let mut row = result.get(0);
                                            let value : i64 = row.get("val");
                                            event.value = value as usize;
                                            event.success = true;
                                        } else {
                                            // may be diverged
                                            assert_eq!(event.success, false);
                                        }
                                    }
                                    Err(_e) => {
                                        // println!("READ ERR -- {:?}", _e);
                                        assert_eq!(event.success, false);
                                    }
                                }
                            }
                        });
                        match sqltxn.commit() {
                            Ok(_) => {
                                transaction.success = true;
                            }
                            Err(_e) => {
                                assert_eq!(transaction.success, false);
                                println!("{:?} -- COMMIT ERROR {}", transaction, _e);
                            }
                        }
                    }
                    Err(e) => println!("{:?} - TRANSACTION ERROR", e),
                }
            }),
            Err(_e) => {
                // Connection failed: no transaction can have succeeded.
                hist.iter().for_each(|transaction| {
                    assert_eq!(transaction.success, false);
                });
                // println!("CONNECTION ERROR {}", _e);}
            }
        }
    }
}
/// Handle to a CockroachDB cluster, holding the raw node descriptions.
#[derive(Debug)]
pub struct CockroachCluster(Vec<Node>);
impl CockroachCluster {
fn new(ips: &Vec<&str>) -> Self {
CockroachCluster(CockroachCluster::node_vec(ips))