Commit 78cf3c99 authored by Ranadeep Biswas

old code

parent 5cee2b77
@@ -7,3 +7,8 @@ authors = ["Ranadeep Biswas <ranadip.bswas@gmail.com>"]
mysql = "12.3.1"
clap = "2.31.2"
rand = "0.4"
pbr = "1.0.0"
time = "0.1"
[profile.release]
opt-level = 3
fn reachable(root: u64, read_map: &HashMap<u64, HashMap<usize, u64>>) -> HashSet<u64> {
let mut stack = Vec::new();
let mut seen = HashSet::new();
stack.push(root);
// seen.insert(root);
while let Some(u) = stack.pop() {
if let Some(vs) = read_map.get(&u) {
for &v in vs.values() {
if seen.insert(v) {
stack.push(v);
}
}
}
}
seen
}
fn is_irreflexive(read_map: &HashMap<u64, HashMap<usize, u64>>) -> bool {
for &e in read_map.keys() {
let r = reachable(e, read_map);
if r.contains(&e) {
println!("found {} {:?}", e, r);
return false;
}
}
true
}
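For orientation, a minimal sketch (hypothetical data, not part of this commit) of how these two helpers fit together: if transaction 1 reads from transaction 2 and transaction 2 reads from transaction 1, the read-from relation has a cycle, so is_irreflexive rejects it.

let mut read_map: HashMap<u64, HashMap<usize, u64>> = HashMap::new();
read_map.insert(1, [(0usize, 2u64)].iter().cloned().collect()); // at position 0, txn 1 read from txn 2
read_map.insert(2, [(0usize, 1u64)].iter().cloned().collect()); // at position 0, txn 2 read from txn 1
assert!(!is_irreflexive(&read_map)); // 1 -> 2 -> 1 is a cycle, so this returns false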
extern crate mysql;
extern crate pbr;
use self::pbr::ProgressBar;
use algo::txn;
use db::op;
use db::slowq;
use std::collections::{HashMap, HashSet};
use std::iter::FromIterator;
use std::net::Ipv4Addr;
use std::sync::{Arc, Mutex};
use std::thread;
pub fn do_bench(conn_str: String) {
let mut conn = mysql::Pool::new(conn_str.clone())
.unwrap()
.get_conn()
.unwrap();
let n_vars = 1000;
let n_txn = 10;
let n_evts_per_txn = 10;
op::create_table(&mut conn);
op::create_vars(n_vars, &mut conn);
slowq::turn_on_slow_query(&mut conn);
slowq::clean_slow_query(&mut conn);
slowq::increase_max_connections(100000, &mut conn);
let mut threads = vec![];
let mut txns = vec![];
let mut conn_ids = vec![];
for _ in 0..n_txn {
txns.push(Arc::new(Mutex::new(txn::create_txn(
n_vars,
n_evts_per_txn,
))));
conn_ids.push(Arc::new(Mutex::new(0)));
}
for i in 0..n_txn {
let conn_str_ = conn_str.clone();
let curr_txn = txns[i].clone();
let curr_conn_id = conn_ids[i].clone();
threads.push(thread::spawn(move || {
let mut loc_conn = mysql::Pool::new(conn_str_).unwrap().get_conn().unwrap();
let mut curr_conn_id_ = curr_conn_id.lock().unwrap();
*curr_conn_id_ = op::get_connection_id(&mut loc_conn);
let mut curr_txn_ = curr_txn.lock().unwrap();
op::do_transaction(&mut curr_txn_, &mut loc_conn);
}));
}
for t in threads {
t.join().expect("thread failed");
}
for i in 0..n_txn {
println!(
"Connection id: {}\n{:?}\n",
*conn_ids[i].lock().unwrap(),
*txns[i].lock().unwrap()
)
}
}
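As committed, this harness is driven by a single call with a MySQL connection string, for example (hypothetical address, following the mysql://user@host format used elsewhere in this commit):

do_bench("mysql://root@172.18.0.11".to_string());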
fn get_wr_map(execution: &Vec<txn::Transaction>) -> HashMap<usize, HashMap<usize, Vec<usize>>> {
let mut write_val = HashMap::new();
execution.iter().enumerate().for_each(|(txn_id, txn)| {
if txn.commit {
txn.events.iter().for_each(|ev| {
if ev.is_write() {
write_val.insert(ev.var.clone(), txn_id + 1);
}
});
}
});
let mut write_read_x_vec = HashMap::new();
execution.iter().enumerate().for_each(|(txn_id, txn)| {
if txn.commit {
txn.events.iter().for_each(|ev| {
if ev.is_read() {
let write_txn = if ev.var.val == 0 {
0
} else {
write_val[&ev.var]
};
let var_entry = write_read_x_vec.entry(ev.var.id).or_insert(HashMap::new());
let write_entry = var_entry.entry(write_txn).or_insert(Vec::new());
write_entry.push(txn_id);
}
});
}
});
write_read_x_vec
}
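For intuition, a hypothetical value of the map get_wr_map builds (not output from this commit): writers are stored as their index plus one so that 0 can stand for the initial value, while readers keep their zero-based index. If the transaction at index 1 wrote the value of variable 3 that transactions 0 and 4 read, and transaction 3 read variable 3's initial value, the result would look like:

// { 3: { 2: [0, 4], 0: [3] } }    // var id -> writing txn (0 = initial value) -> reading txns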
fn get_ww_order(execution: &Vec<txn::Transaction>) -> HashMap<usize, Vec<usize>> {
let mut write_x_map: HashMap<usize, HashSet<usize>> = HashMap::new();
execution.iter().enumerate().for_each(|(txn_id, txn)| {
if txn.commit {
txn.events.iter().for_each(|ev| {
if ev.is_write() {
let var_id = ev.var.id;
let entry = write_x_map.entry(var_id).or_insert(HashSet::new());
entry.insert(txn_id + 1);
}
});
}
});
HashMap::from_iter(write_x_map.into_iter().map(|(var, txn_ids)| {
let mut vec = Vec::from_iter(txn_ids.into_iter());
vec.sort_by(|&a, &b| {
execution[a - 1]
.end
.query_time
.cmp(&execution[b - 1].end.query_time)
});
(var, vec)
}))
}
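get_ww_order similarly maps each variable to its committed writers (again stored as index plus one), sorted by the query_time recorded for each transaction's end statement; a hypothetical result:

// { 3: [2, 5, 1] }    // writers of var 3 in commit-time order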
pub fn do_single_node(node: usize, vars: &Vec<usize>) {
let mysql_addr = format!(
"mysql://{}@{}",
"root",
Ipv4Addr::new(172, 18, 0, 11 + node as u8)
);
let mut conn = mysql::Pool::new(mysql_addr).unwrap().get_conn().unwrap();
let mut rng = rand::thread_rng();
let mut v = Vec::new();
for wr_txn in 0..n_txn {
for wr_pos in 0..n_evts_per_txn {
if rng.gen() {
// do read
v.push(Event::read(Variable::new(id, 0)));
} else {
// do write
counters[id] += 1;
v.push(Event::write(Variable::new(id, counters[id])));
}
}
}
}
pub fn single_bench(nodes: &Vec<String>, vars: &Vec<usize>) {
let n_vars = 5;
let n_txn = 6;
let n_evts_per_txn = 4;
let n_iter = 100;
{
let mut conn = mysql::Pool::new(nodes[0].clone())
.unwrap()
.get_conn()
.unwrap();
op::create_vars(vars, &mut conn);
}
for node in 0..6 {
do_single_node(node, vars);
}
}
pub fn do_bench() {
let n_vars = 5;
let n_txn = 6;
let n_evts_per_txn = 4;
let n_iter = 100;
{
// let mut tc = mysql::Pool::new(conn_str.clone())
// .unwrap()
// .get_conn()
// .unwrap();
// slowq::increase_max_connections(1000000, &mut tc);
// slowq::turn_on_slow_query(&mut tc);
}
let mut nodes = Vec::with_capacity(6);
for i in 0..6 {
nodes.push(format!(
"mysql://{}@{}",
"root",
Ipv4Addr::new(172, 18, 0, 11 + i as u8)
));
}
let mut threads = Vec::new();
let mut session_histories = Vec::with_capacity(6);
for _ in 0..6 {
session_histories.push(Arc::new(Mutex::new(Vec::new())));
}
for i in 0..6 {
println!("{}", nodes[i]);
let history = session_histories[i].clone();
let node_addr = nodes[i].clone();
threads.push(thread::spawn(move || {
let mut loc_conn = curr_conn.lock().unwrap();
let mut loc_txn = curr_txn.lock().unwrap();
op::do_transaction(&mut loc_txn, &mut loc_conn);
}));
}
{
let mut conn = mysql::Pool::new(nodes[0].clone())
.unwrap()
.get_conn()
.unwrap();
op::create_table(&mut conn);
}
//
// let mut conns = vec![];
// let mut conn_ids = vec![];
// let mut executed_txns = vec![];
//
// let txns = {
// let mut txns_ = vec![];
// let mut counters = vec![0; n_vars + 1];
//
// for _ in 0..n_txn {
// txns_.push(Arc::new(Mutex::new(txn::create_txn(
// n_vars,
// n_evts_per_txn,
// &mut counters,
// ))));
// }
//
// txns_
// };
//
// for _ in 0..n_txn {
// let mut txn_conn = mysql::Pool::new(&conn_str).unwrap().get_conn().unwrap();
// conn_ids.push(op::get_connection_id(&mut txn_conn));
// conns.push(Arc::new(Mutex::new(txn_conn)));
// }
//
// let mut pb = ProgressBar::new(n_iter);
// pb.format("╢▌▌░╟");
//
// for _ in 0..n_iter {
// op::clean_table(&mut conn);
// slowq::clean_slow_query(&mut conn);
// let mut threads = vec![];
// for i in 0..n_txn {
// let curr_txn = txns[i].clone();
// let curr_conn = conns[i].clone();
// threads.push(thread::spawn(move || {
// let mut loc_conn = curr_conn.lock().unwrap();
// let mut loc_txn = curr_txn.lock().unwrap();
//
// op::do_transaction(&mut loc_txn, &mut loc_conn);
// }));
// }
//
// for t in threads {
// t.join().expect("thread failed");
// }
//
// pb.inc();
//
// let mut curr_txns = txns.iter()
// .map(|x| x.lock().unwrap().clone())
// .collect::<Vec<_>>();
//
// for i in 0..n_txn {
// let conn_id = conn_ids[i];
// curr_txns[i].start = slowq::get_start_txn_durations(conn_id, &mut conn);
// curr_txns[i].end = slowq::get_end_txn_durations(conn_id, &mut conn);
// let mut access_durs = slowq::get_access_durations(conn_id, &mut conn);
// for j in 0..curr_txns[i].events.len() {
// curr_txns[i].events[j].dur = access_durs[j].clone();
// }
// }
// executed_txns.push(curr_txns);
// }
//
// println!("\n\n");
//
// // TODO: use cpupool
// executed_txns.iter().for_each(|each_execution| {
// let wr_map = get_wr_map(&each_execution);
// let ww_order = get_ww_order(&each_execution);
// println!("{:?} ||||| {:?}", wr_map, ww_order);
// });
// println!("{:?}", conn_ids);
// println!("{:#?}", executed_txns.first().unwrap());
// op::drop_database(&mut conn);
}
extern crate rand;
use self::rand::{seq, Rng};
use algo::var::{Event, MySQLDur, Variable};
use std::fmt;
pub fn create_txn(lim: u64, n_op: usize) -> Transaction {
let mut rng = rand::thread_rng();
let mut v = vec![];
for _ in 0..n_op {
if rng.gen() {
let id = rng.gen::<u64>() % lim + 1;
v.push(Event::read(Variable::new(id, 0)));
} else {
let id = rng.gen::<u64>() % lim + 1;
let val = rng.gen::<u64>() % lim + 1;
v.push(Event::write(Variable::new(id, val)));
}
}
Transaction {
events: v,
commit: rng.gen(),
}
}
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct Transaction {
pub commit: bool,
pub events: Vec<Event>,
pub start: MySQLDur,
pub end: MySQLDur,
}
impl fmt::Debug for Transaction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
// "<{:?}, {}, {:?}, {:?}>",
"<{:?}, {}, {:?}>",
self.events,
if self.commit { "COMMIT" } else { "ROLLBACK" },
// self.start,
self.end,
)
}
}
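With the Debug impls for Event and Variable further down in this diff, a transaction prints as its event list, its commit status and its end timing record; for example (hypothetical values, assuming the scalar-valued Variable):

// <[R(3:0), W(3:1)], COMMIT, MySQLDur { .. }>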
// impl Transaction {
// pub fn is_acyclic_visibility(&self) -> bool {
//
// }
// }
pub fn create_txn(n_var: usize, n_op: usize, counters: &mut Vec<usize>) -> Transaction {
let mut rng = rand::thread_rng();
let mut v = vec![];
for id in seq::sample_iter(&mut rng, 1..n_var + 1, n_op).unwrap() {
if rng.gen() {
v.push(Event::read(Variable::new(id, 0)));
} else {
counters[id] += 1;
v.push(Event::write(Variable::new(id, counters[id])));
}
}
Transaction {
events: v,
// commit: rng.gen(),
commit: true,
start: MySQLDur::new(),
end: MySQLDur::new(),
}
}
pub fn create_txns(n: usize, lim: u64, n_op: usize) -> Vec<Transaction> {
(0..n).map(|_| create_txn(lim, n_op)).collect()
}
pub fn create_txns(
n_txn: usize,
n_var: usize,
n_op: usize,
counters: &mut Vec<usize>,
) -> Vec<Transaction> {
(0..n_txn)
.map(|_| create_txn(n_var, n_op, counters))
.collect()
}
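The counters vector threaded through this pair of functions is what keeps written values distinct: counters[id] is incremented before every write, so three writes to variable 3 anywhere in the generated history carry the values 1, 2 and 3 regardless of which transaction issued them, which is what lets get_wr_map above map an observed value back to the transaction that wrote it. A minimal call with hypothetical sizes (not from the commit):

let mut counters = vec![0; 5 + 1]; // one slot per variable id (ids are 1-based)
let history = create_txns(6, 5, 4, &mut counters); // 6 txns over 5 vars, 4 events each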
use std::fmt;
use mysql::time::Timespec;
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum EventType {
WRITE,
READ,
}
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MySQLDur {
pub start_time: Timespec,
pub lock_time: Timespec,
pub query_time: Timespec,
}
impl MySQLDur {
pub fn new() -> Self {
MySQLDur {
start_time: Timespec::new(0, 0),
lock_time: Timespec::new(0, 0),
query_time: Timespec::new(0, 0),
}
}
}
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct Variable {
pub id: usize,
pub val: (usize, usize, usize, usize),
}
impl Variable {
pub fn new(id: usize, val: (usize, usize, usize, usize)) -> Self {
Variable { id: id, val: val }
}
pub fn is_zero(&self) -> bool {
self.val.0 == 0
}
}
impl fmt::Debug for Variable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}:{:?}", self.id, self.val)
}
}
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct Event {
pub ev_type: EventType,
pub var: Variable,
pub dur: MySQLDur,
}
impl Event {
@@ -27,32 +58,31 @@ impl Event {
Event {
ev_type: EventType::READ,
var: var,
dur: MySQLDur::new(),
}
}
pub fn write(var: Variable) -> Self {
Event {
ev_type: EventType::WRITE,
var: var,
dur: MySQLDur::new(),
}
}
pub fn is_write(&self) -> bool {
self.ev_type == EventType::WRITE
}
pub fn is_read(&self) -> bool {
self.ev_type == EventType::READ
}
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.ev_type {
EventType::READ => write!(f, "{}({:?})", 'R', self.var),
EventType::WRITE => write!(f, "{}({:?})", 'W', self.var),
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct Transaction {
pub commit: bool,
pub events: Vec<Event>,
}
pub fn create_txn() -> Transaction {
let mut v = vec![];
for i in 1..6 {
v.push(Event {
ev_type: EventType::READ,
var: Variable { id: i, val: 10 },
})
}
Transaction {
commit: true,
events: v,
}
}
extern crate mysql;
use algo::txn::Transaction;
use algo::var::{EventType, Variable};
pub fn create_table(conn: &mut mysql::PooledConn) {
// drop_database(conn);
conn.query("CREATE DATABASE IF NOT EXISTS dbcop").unwrap();
conn.query("DROP TABLE dbcop.variables").unwrap();
conn.query(
"CREATE TABLE IF NOT EXISTS dbcop.variables (id BIGINT(64) UNSIGNED NOT NULL PRIMARY KEY, val BIGINT(64) UNSIGNED NOT NULL, wr_node BIGINT(64) UNSIGNED NOT NULL, wr_txn BIGINT(64) UNSIGNED NOT NULL, wr_pos BIGINT(64) UNSIGNED NOT NULL)",
).unwrap();
// conn.query("TRUNCATE TABLE dbcop.variables").unwrap();
conn.query("USE dbcop").unwrap();
}
pub fn create_vars(limit: u64, conn: &mut mysql::PooledConn) {
for mut stmt in conn.prepare("INSERT INTO dbcop.variables (val) values (?)")
.into_iter()
{
for _ in 0..limit {
stmt.execute((0,)).unwrap();
}
}
}
pub fn create_vars(offset: usize, n_var: usize, conn: &mut mysql::PooledConn) {
for mut stmt in conn.prepare(
"INSERT INTO dbcop.variables (id, val, wr_node, wr_txn, wr_pos) values (?, ?, 0, 0, 0)",
).into_iter()
{
for v in offset..(offset + n_var) {
stmt.execute((v, 0)).unwrap();
}
}
}
pub fn clean_table(conn: &mut mysql::PooledConn) {
conn.query("UPDATE dbcop.variables SET val=0, wr_node=0, wr_txn=0, wr_pos=0")
.unwrap();
}
pub fn drop_database(conn: &mut mysql::PooledConn) {
conn.query("DROP DATABASE dbcop").unwrap();
}
pub fn write_var(var: u64, val: u64, conn: &mut mysql::PooledConn) {
for mut stmt in conn.prepare("UPDATE dbcop.variables SET val=? WHERE id=?")
.into_iter()
{
stmt.execute((val, var)).unwrap();
}
}
pub fn write_var(var: u64, val: u64, action_id: (u64, u64, u64), conn: &mut mysql::PooledConn) {
for mut stmt in conn.prepare(
"UPDATE dbcop.variables SET val=?, wr_node=?, wr_txn=?, wr_pos=? WHERE id=?",
).into_iter()
{
stmt.execute((val, action_id.0, action_id.1, action_id.2, var))
.unwrap();
}
}
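A small end-to-end sketch of the new schema-aware helpers (hypothetical connection string and ids, not part of the commit): create the table, seed variables 1 through 100 with value 0, then record a write of value 7 to variable 3 tagged as (node 1, txn 2, position 0) via the new wr_* columns.

let mut conn = mysql::Pool::new("mysql://root@172.18.0.11")
.unwrap()
.get_conn()
.unwrap();
create_table(&mut conn);
create_vars(1, 100, &mut conn);
write_var(3, 7, (1, 2, 0), &mut conn);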