Commit db434c50 authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

Support for Causal Consistency

parent af522bc2
use std::collections::{HashMap, HashSet};
use consistency::ser::Chains;
use consistency::util::EdgeClosure;
pub struct Causal {
chains: Chains,
co_closure: EdgeClosure,
}
impl Causal {
pub fn new(
n_sizes: &Vec<usize>,
txns_info: &HashMap<(usize, usize), (HashMap<usize, (usize, usize)>, HashSet<usize>)>,
) -> Self {
let mut chains = Chains::new(&n_sizes, &txns_info);
chains.preprocess_wr();
Causal {
chains: chains,
co_closure: EdgeClosure::new(),
}
}
pub fn preprocess_vis(&mut self) -> bool {
for po in self.chains.tuple_to_id.iter().skip(1) {
for &id in po.iter().rev().skip(1) {
if self.chains.vis_closure.contains(id + 1, id) {
println!("found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(id, id + 1);
self.co_closure.add_edge(id, id + 1);
}
if let Some(&u) = po.first() {
if self.chains.vis_closure.contains(u, self.chains.root_txn_id) {
println!("found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(self.chains.root_txn_id, u);
self.co_closure.add_edge(self.chains.root_txn_id, u);
}
}
for (_, info) in self.chains.wr_order.iter() {
for (&u, vs) in info {
for &v in vs.iter() {
if self.chains.vis_closure.contains(v, u) {
println!("found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(u, v);
self.co_closure.add_edge(u, v);
}
}
}
return true;
}
pub fn preprocess_co(&mut self) -> bool {
loop {
// let mut new_rw_edge = Vec::new();
let mut new_ww_edge = Vec::new();
for (&_x, wr_x) in self.chains.wr_order.iter() {
for (&u, vs) in wr_x.iter() {
for &v in vs.iter() {
for (&u_, _) in wr_x.iter() {
if u != u_ && v != u_ {
// if self.chains.vis_closure.contains(u, u_) {
// println!(
// "adding RW ({1}, {2}), WR_{3}({0}, {1}), {3} in W({2}), VIS({0}, {2})",
// u, v, u_, _x
// );
// if self.chains.vis_closure.contains(u_, v) {
// // println!("cycle: {0} -> {1} -> {0}", v, u_);
// println!(
// "VIS*-RW cycle: {0:?} -> {1:?} -> {0:?}",
// self.id_to_tuple[v], self.id_to_tuple[u_]
// );
// return false;
// }
// new_rw_edge.push((v, u_));
// }
if self.chains.vis_closure.contains(u_, v) {
println!(
"adding WW ({2}, {0}), WR_{3}({0}, {1}), {3} in W({2}), VIS({2}, {1})",
u, v, u_, _x
);
if self.chains.vis_closure.contains(u, u_) {
// println!("cycle: {0} -> {1} -> {0}", u_, u);
println!(
"cycle: {0:?} -> {1:?} -> {0:?}",
self.chains.id_to_tuple[u_], self.chains.id_to_tuple[u]
);
return false;
}
new_ww_edge.push((u_, u));
}
}
}
}
}
}
let mut is_converged = true;
for (u, v) in new_ww_edge {
if self.co_closure.contains(v, u) {
// println!("cycle: {0} -> {1} -> {0}", u, v);
println!(
"cycle: {0:?} -> {1:?} -> {0:?}",
self.chains.id_to_tuple[u], self.chains.id_to_tuple[v]
);
return false;
}
is_converged &= !self.co_closure.add_edge(u, v);
}
if is_converged {
break;
}
}
return true;
}
}
pub mod causal;
pub mod ser;
pub mod si;
pub mod util;
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone)]
struct EdgeClosure {
forward_edge: HashMap<usize, HashSet<usize>>,
backward_edge: HashMap<usize, HashSet<usize>>,
}
impl EdgeClosure {
pub fn new() -> Self {
EdgeClosure {
forward_edge: HashMap::new(),
backward_edge: HashMap::new(),
}
}
pub fn contains(&self, u: usize, v: usize) -> bool {
self.forward_edge
.get(&u)
.and_then(|vs| Some(vs.contains(&v)))
== Some(true)
}
pub fn add_edge(&mut self, u: usize, v: usize) -> bool {
// returns true if new edge added
if !self.contains(u, v) {
let mut new_edge = Vec::new();
{
let opt_prevs_u = self.backward_edge.get(&u);
let opt_nexts_v = self.forward_edge.get(&v);
if let Some(prevs_u) = opt_prevs_u {
if let Some(nexts_v) = opt_nexts_v {
for &prev_u in prevs_u.iter() {
for &next_v in nexts_v.iter() {
if !self.contains(prev_u, next_v) {
new_edge.push((prev_u, next_v));
}
}
}
}
}
if let Some(prevs_u) = opt_prevs_u {
for &prev_u in prevs_u.iter() {
if !self.contains(prev_u, v) {
new_edge.push((prev_u, v));
}
}
}
if let Some(nexts_v) = opt_nexts_v {
for &next_v in nexts_v.iter() {
if !self.contains(u, next_v) {
new_edge.push((u, next_v));
}
}
}
new_edge.push((u, v));
}
for (u_, v_) in new_edge {
let ent_u = self.forward_edge.entry(u_).or_insert_with(HashSet::new);
ent_u.insert(v_);
let ent_v = self.backward_edge.entry(v_).or_insert_with(HashSet::new);
ent_v.insert(u_);
}
true
} else {
false
}
}
}
use consistency::util::EdgeClosure;
#[derive(Debug)]
pub struct Chains {
n_sizes: Vec<usize>,
root_txn_id: usize,
txns: Vec<(HashMap<usize, usize>, HashSet<usize>)>,
pub n_sizes: Vec<usize>,
pub root_txn_id: usize,
pub txns: Vec<(HashMap<usize, usize>, HashSet<usize>)>,
tuple_to_id: Vec<Vec<usize>>,
id_to_tuple: Vec<(usize, usize)>,
wr_order: HashMap<usize, HashMap<usize, HashSet<usize>>>,
wr_order_by_txn: HashMap<usize, HashMap<usize, HashSet<usize>>>,
vis_closure: EdgeClosure,
pub tuple_to_id: Vec<Vec<usize>>,
pub id_to_tuple: Vec<(usize, usize)>,
pub wr_order: HashMap<usize, HashMap<usize, HashSet<usize>>>,
pub wr_order_by_txn: HashMap<usize, HashMap<usize, HashSet<usize>>>,
pub vis_closure: EdgeClosure,
}
impl Chains {
......@@ -185,14 +119,22 @@ impl Chains {
// }
}
pub fn preprocess_vis(&mut self) {
pub fn preprocess_vis(&mut self) -> bool {
for po in self.tuple_to_id.iter().skip(1) {
for (j, &id) in po.iter().enumerate() {
if j < po.len() - 1 {
if self.vis_closure.contains(id + 1, id) {
println!("found cycles in VIS");
return false;
}
self.vis_closure.add_edge(id, id + 1);
}
}
if let Some(&u) = po.first() {
if self.vis_closure.contains(u, self.root_txn_id) {
println!("found cycles in VIS");
return false;
}
self.vis_closure.add_edge(self.root_txn_id, u);
}
}
......@@ -200,10 +142,15 @@ impl Chains {
for (_, info) in self.wr_order.iter() {
for (&u, vs) in info {
for &v in vs.iter() {
if self.vis_closure.contains(v, u) {
println!("found cycles in VIS");
return false;
}
self.vis_closure.add_edge(u, v);
}
}
}
return true;
}
pub fn preprocess_ww_rw(&mut self) -> bool {
......@@ -274,8 +221,7 @@ impl Chains {
pub fn preprocess(&mut self) -> bool {
self.preprocess_wr();
self.preprocess_vis();
self.preprocess_ww_rw()
self.preprocess_vis() && self.preprocess_ww_rw()
}
pub fn _serializable_order_dfs(
......
use std::collections::{HashMap, HashSet};
#[derive(Debug, Clone)]
pub struct EdgeClosure {
pub forward_edge: HashMap<usize, HashSet<usize>>,
pub backward_edge: HashMap<usize, HashSet<usize>>,
}
impl EdgeClosure {
pub fn new() -> Self {
EdgeClosure {
forward_edge: HashMap::new(),
backward_edge: HashMap::new(),
}
}
pub fn contains(&self, u: usize, v: usize) -> bool {
self.forward_edge
.get(&u)
.and_then(|vs| Some(vs.contains(&v)))
== Some(true)
}
pub fn add_edge(&mut self, u: usize, v: usize) -> bool {
// returns true if new edge added
if !self.contains(u, v) {
let mut new_edge = Vec::new();
{
let opt_prevs_u = self.backward_edge.get(&u);
let opt_nexts_v = self.forward_edge.get(&v);
if let Some(prevs_u) = opt_prevs_u {
if let Some(nexts_v) = opt_nexts_v {
for &prev_u in prevs_u.iter() {
for &next_v in nexts_v.iter() {
if !self.contains(prev_u, next_v) {
new_edge.push((prev_u, next_v));
}
}
}
}
}
if let Some(prevs_u) = opt_prevs_u {
for &prev_u in prevs_u.iter() {
if !self.contains(prev_u, v) {
new_edge.push((prev_u, v));
}
}
}
if let Some(nexts_v) = opt_nexts_v {
for &next_v in nexts_v.iter() {
if !self.contains(u, next_v) {
new_edge.push((u, next_v));
}
}
}
new_edge.push((u, v));
}
for (u_, v_) in new_edge {
let ent_u = self.forward_edge.entry(u_).or_insert_with(HashSet::new);
ent_u.insert(v_);
let ent_v = self.backward_edge.entry(v_).or_insert_with(HashSet::new);
ent_v.insert(u_);
}
true
} else {
false
}
}
}
......@@ -10,6 +10,8 @@ use std::thread;
use std::convert::From;
use serde_yaml;
#[derive(Debug, Clone)]
pub struct Node {
pub ip: IpAddr,
......@@ -87,6 +89,11 @@ where
}
println!();
}
println!("# yaml");
println!("{}", serde_yaml::to_string(&hist).unwrap());
println!();
transactional_history_verify(&hist, &id_vec);
self.cleanup();
None
......
use ansi_term::Style;
use std::fmt;
#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub struct Event {
pub id: usize,
pub write: bool,
......@@ -9,6 +10,12 @@ pub struct Event {
pub success: bool,
}
#[derive(Serialize, Deserialize, Eq, PartialEq)]
pub struct Transaction {
pub events: Vec<Event>,
pub success: bool,
}
impl fmt::Debug for Event {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let repr = format!(
......@@ -55,11 +62,6 @@ impl Event {
}
}
pub struct Transaction {
pub events: Vec<Event>,
pub success: bool,
}
impl fmt::Debug for Transaction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let repr = format!("{:?}", self.events);
......
use std::collections::{HashMap, HashSet};
use consistency::causal::Causal;
use consistency::ser::Chains;
use consistency::si::SIChains;
use db::history::Transaction;
......@@ -23,7 +24,7 @@ pub fn transactional_history_verify(
let event2 = &transaction2.events[i_event];
// println!("{:?}\n{:?}", event, event2);
if !transaction2.success {
println!("UNCOMMITTED READ");
println!("DIRTY READ");
return;
}
if !event2.write {
......@@ -189,54 +190,67 @@ pub fn transactional_history_verify(
}
{
println!("Doing serializable consistency check");
let mut chains = Chains::new(&n_sizes, &transaction_infos);
println!("{:?}", chains);
if !chains.preprocess() {
println!("found cycle while processing wr and po order");
}
// println!("{:?}", chains);
// println!("{:?}", chains.serializable_order_dfs());
match chains.serializable_order_dfs() {
Some(order) => {
println!("Serializable progress of transactions");
for node_id in order {
print!("{} ", node_id);
}
println!();
println!("SER")
}
None => {
println!("No valid SER history");
{
println!("Doing causal consistency check");
let mut causal = Causal::new(&n_sizes, &transaction_infos);
if causal.preprocess_vis() && causal.preprocess_co() {
println!("History is causal consistent!");
println!();
{
println!("Doing snapshot isolation check");
let mut chains = SIChains::new(&n_sizes, &transaction_infos);
println!("{:?}", chains);
if !chains.preprocess() {
println!("found cycle while processing wr and po order");
println!("Doing serializable consistency check");
let mut chains = Chains::new(&n_sizes, &transaction_infos);
println!("{:?}", chains);
if !chains.preprocess() {
println!("found cycle while processing wr and po order");
}
// println!("{:?}", chains);
// println!("{:?}", chains.serializable_order_dfs());
match chains.serializable_order_dfs() {
Some(order) => {
println!("Serializable progress of transactions");
for node_id in order {
print!("{} ", node_id);
}
println!();
println!("SER")
}
// println!("{:?}", chains);
match chains.serializable_order_dfs() {
Some(order) => {
let mut rw_map = HashMap::new();
println!("SI progress of transactions (broken in read and write)");
for node_id in order {
let ent = rw_map.entry(node_id).or_insert(true);
if *ent {
print!("{}R ", node_id);
*ent = false;
} else {
print!("{}W ", node_id);
*ent = true;
None => {
println!("No valid SER history");
println!();
{
println!("Doing snapshot isolation check");
let mut chains = SIChains::new(&n_sizes, &transaction_infos);
println!("{:?}", chains);
if !chains.preprocess() {
println!("found cycle while processing wr and po order");
}
// println!("{:?}", chains);
match chains.serializable_order_dfs() {
Some(order) => {
let mut rw_map = HashMap::new();
println!(
"SI progress of transactions (broken in read and write)"
);
for node_id in order {
let ent = rw_map.entry(node_id).or_insert(true);
if *ent {
print!("{}R ", node_id);
*ent = false;
} else {
print!("{}W ", node_id);
*ent = true;
}
}
println!();
println!("SI")
}
None => println!("No valid SI history\nNON-SI"),
}
println!();
println!("SI")
}
None => println!("No valid SI history\nNON-SI"),
}
}
} else {
println!("no valid causal consistent history");
println!("NON-CC");
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment