Commit d0f1f468 authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

Added source

parent d294335d
[package]
name = "dbcop"
version = "0.1.0"
authors = ["Ranadeep Biswas <ranadip.bswas@gmail.com>"]
[dependencies]
mysql = "12.3.1"
clap = "2.31.2"
rand = "0.4"
main
load arguments
do parse
do
parse
pre-prepare
connect database
turn on slow_query_log
prepare
do pre-prepare
create new table
insert variables
benchmark
create k threads
for each thread
for a sufficiently long iteration
do multiple transactions
choose some variables - randomly - and read or write it, also record
parse slow_query_log
final
delete slow_query_log
delete table
extern crate mysql;
use db::slowq;
use db::op;
use algo::txn;
use std::thread;
pub fn do_bench(conn_str: String) {
let pool = mysql::Pool::new(conn_str.clone()).unwrap();
let n_vars = 1000;
op::create_table(&pool);
op::create_vars(n_vars, &pool);
slowq::turn_on_slow_query(&pool);
slowq::clean_slow_query(&pool);
let mut threads = vec![];
for i in 0..5 {
let conn_str_ = conn_str.clone();
threads.push(
thread::Builder::new()
.name(format!("thread-{}", i))
.spawn(move || {
let n_txns = 10;
let n_evts = 10;
let mut txns = txn::create_txns(n_txns, n_vars, n_evts);
let loc_pool = mysql::Pool::new(conn_str_).unwrap();
println!(
"thread-{} using connection_id {}",
i,
op::get_connection_id(&loc_pool)
);
for ref mut txn in txns.iter_mut() {
op::do_transaction(txn, &loc_pool);
}
})
.unwrap(),
);
}
for t in threads {
t.join().expect("thread failed");
}
op::drop_database(&pool);
}
pub mod var;
pub mod txn;
pub mod bench;
extern crate rand;
use self::rand::Rng;
use algo::var::{Event, Transaction, Variable};
pub fn create_txn(lim: u64, n_op: usize) -> Transaction {
let mut rng = rand::thread_rng();
let mut v = vec![];
for _ in 0..n_op {
if rng.gen() {
let id = rng.gen::<u64>() % lim + 1;
v.push(Event::read(Variable::new(id, 0)));
} else {
let id = rng.gen::<u64>() % lim + 1;
let val = rng.gen::<u64>() % lim + 1;
v.push(Event::write(Variable::new(id, val)));
}
}
Transaction {
events: v,
commit: rng.gen(),
}
}
pub fn create_txns(n: usize, lim: u64, n_op: usize) -> Vec<Transaction> {
(0..n).map(|_| create_txn(lim, n_op)).collect()
}
#[derive(Debug, PartialEq, Eq)]
pub enum EventType {
WRITE,
READ,
}
#[derive(Debug, PartialEq, Eq)]
pub struct Variable {
pub id: u64,
pub val: u64,
}
impl Variable {
pub fn new(id: u64, val: u64) -> Self {
Variable { id: id, val: val }
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct Event {
pub ev_type: EventType,
pub var: Variable,
}
impl Event {
pub fn read(var: Variable) -> Self {
Event {
ev_type: EventType::READ,
var: var,
}
}
pub fn write(var: Variable) -> Self {
Event {
ev_type: EventType::WRITE,
var: var,
}
}
}
#[derive(Debug, PartialEq, Eq)]
pub struct Transaction {
pub commit: bool,
pub events: Vec<Event>,
}
pub fn create_txn() -> Transaction {
let mut v = vec![];
for i in 1..6 {
v.push(Event {
ev_type: EventType::READ,
var: Variable { id: i, val: 10 },
})
}
Transaction {
commit: true,
events: v,
}
}
pub mod op;
pub mod slowq;
extern crate mysql;
use algo::var::{EventType, Transaction, Variable};
pub fn create_table(pool: &mysql::Pool) {
let mut conn = pool.get_conn().unwrap();
conn.query("CREATE DATABASE IF NOT EXISTS dbcop").unwrap();
conn.query(
"CREATE TABLE IF NOT EXISTS dbcop.variables (id BIGINT(64) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, val BIGINT(64) UNSIGNED NOT NULL)",
).unwrap();
conn.query("TRUNCATE TABLE dbcop.variables").unwrap();
conn.query("USE dbcop").unwrap();
}
pub fn create_vars(limit: u64, pool: &mysql::Pool) {
for mut stmt in pool.prepare("INSERT INTO dbcop.variables (val) values (?)")
.into_iter()
{
for _ in 0..limit {
stmt.execute((0,)).unwrap();
}
}
}
pub fn drop_database(pool: &mysql::Pool) {
let mut conn = pool.get_conn().unwrap();
conn.query("DROP DATABASE dbcop").unwrap();
}
pub fn write_var(var: u64, val: u64, pool: &mysql::Pool) {
for mut stmt in pool.prepare("UPDATE dbcop.variables SET val=? WHERE id=?")
.into_iter()
{
stmt.execute((val, var)).unwrap();
}
}
pub fn read_var(var: u64, pool: &mysql::Pool) -> Variable {
pool.first_exec("SELECT * FROM dbcop.variables WHERE id=?", (var,))
.map(|result| {
let mut row = result.unwrap();
Variable {
id: row.take("id").unwrap(),
val: row.take("val").unwrap(),
}
})
.unwrap()
}
pub fn get_connection_id(pool: &mysql::Pool) -> u64 {
pool.first_exec("SELECT connection_id()", ())
.map(|result| {
let mut row = result.unwrap();
row.take("connection_id()").unwrap()
})
.unwrap()
}
pub fn do_transaction(txn: &mut Transaction, pool: &mysql::Pool) {
for mut sqltxn in pool.start_transaction(false, None, None) {
for ref mut e in txn.events.iter_mut() {
if e.ev_type == EventType::WRITE {
sqltxn
.prep_exec(
"UPDATE dbcop.variables SET val=? WHERE id=?",
(e.var.val, e.var.id),
)
.unwrap();
} else if e.ev_type == EventType::READ {
sqltxn
.prep_exec("SELECT * FROM dbcop.variables WHERE id=?", (e.var.id,))
.and_then(|mut rows| {
let mut row = rows.next().unwrap().unwrap();
// assert_eq!(e.var.id, row.take::<u64, &str>("id").unwrap());
e.var.val = row.take("val").unwrap();
Ok(())
})
.unwrap();
}
}
if txn.commit {
sqltxn.commit().unwrap();
} else {
sqltxn.rollback().unwrap();
}
}
}
extern crate mysql;
use mysql::time::Timespec;
use mysql::time::Duration;
#[derive(Debug, PartialEq, Eq)]
struct LogRow {
start_time: Option<Timespec>,
user_host: Option<String>,
query_time: Option<Duration>,
lock_time: Option<Duration>,
rows_sent: Option<i64>,
rows_examined: Option<i64>,
db: Option<String>,
last_insert_id: Option<i64>,
insert_id: Option<i64>,
server_id: Option<i64>,
sql_text: Option<String>,
thread_id: Option<i64>,
rows_affected: Option<i64>,
}
pub fn turn_on_slow_query(pool: &mysql::Pool) {
let mut conn = pool.get_conn().unwrap();
conn.query("SET GLOBAL slow_query_log = 'ON'").unwrap();
conn.query("SET GLOBAL long_query_time = 0").unwrap();
conn.query("SET GLOBAL log_output = 'TABLE'").unwrap();
}
pub fn clean_slow_query(pool: &mysql::Pool) {
let mut conn = pool.get_conn().unwrap();
conn.query("TRUNCATE TABLE mysql.slow_log").unwrap();
}
fn get_slow_query(mut conn: mysql::PooledConn) {
let slow_log: Vec<_> = conn.prep_exec("SELECT * FROM mysql.slow_log WHERE db=?", ("test",))
.map(|result| {
result
.map(|x| x.unwrap())
.map(|mut row| LogRow {
start_time: row.take("start_time"),
user_host: row.take("user_host"),
query_time: row.take("query_time"),
lock_time: row.take("lock_time"),
rows_sent: row.take("rows_sent"),
rows_examined: row.take("rows_examined"),
db: row.take("db"),
last_insert_id: row.take("last_insert_id"),
insert_id: row.take("insert_id"),
server_id: row.take("server_id"),
sql_text: row.take("sql_text"),
thread_id: row.take("thread_id"),
rows_affected: row.take("rows_affected"),
})
.collect()
})
.unwrap();
for ref e in slow_log.iter() {
println!(
"{:?} {:?} {:?} {:?} {:?}",
e.start_time.unwrap(),
e.query_time,
e.lock_time,
e.thread_id,
e.sql_text
);
}
}
#![allow(dead_code)]
extern crate clap;
extern crate mysql;
mod algo;
mod db;
use clap::{App, Arg};
fn main() {
let matches = App::new("DBcop")
.arg(
Arg::with_name("mysql_ip")
.short("i")
.long("ip")
.help("MySQL ip")
.takes_value(true),
)
.arg(
Arg::with_name("mysql_port")
.short("p")
.long("port")
.help("MySQL port")
.takes_value(true),
)
.arg(
Arg::with_name("mysql_username")
.short("u")
.long("username")
.help("MySQL username")
.required(true)
.takes_value(true),
)
.arg(
Arg::with_name("mysql_secret")
.short("s")
.long("secret")
.help("MySQL secret")
.required(true)
.takes_value(true),
)
.get_matches();
let ip = matches.value_of("mysql_ip").unwrap_or("localhost");
let port = matches.value_of("mysql_port").unwrap_or("3306");
let user = matches.value_of("mysql_username").unwrap();
let sec = matches.value_of("mysql_secret").unwrap();
println!("{} {} {:?} {:?}", ip, port, user, sec);
let conn_str = format!("mysql://{}:{}@{}:{}", user, sec, ip, port);
println!("{}", conn_str);
algo::bench::do_bench(conn_str);
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment