Commit 02e4af3b authored by Ranadeep Biswas's avatar Ranadeep Biswas
Browse files

trait based linearization

parent eb07eb03
use hashbrown::{HashMap, HashSet};
use consistency::util::{ConstrainedLinearization, DiGraph};
use slog::Logger;
type TransactionId = (usize, usize);
type TransactionInfo = (HashMap<usize, TransactionId>, HashSet<usize>);
type Variable = usize;
#[derive(Debug, Default)]
pub struct AtomicHistoryPO {
pub so: DiGraph<TransactionId>,
pub vis: DiGraph<TransactionId>,
pub root: TransactionId,
pub txns_info: HashMap<TransactionId, TransactionInfo>,
pub wr_rel: HashMap<Variable, DiGraph<TransactionId>>,
}
impl AtomicHistoryPO {
pub fn new(
n_sizes: &[usize],
txns_info: HashMap<TransactionId, TransactionInfo>,
) -> AtomicHistoryPO {
let root = (0, 0);
let mut so: DiGraph<TransactionId> = Default::default();
{
let v: Vec<_> = n_sizes
.iter()
.enumerate()
.filter_map(|(node_i, node_len)| {
if node_len > &0 {
Some((node_i + 1, 0))
} else {
None
}
})
.collect();
so.add_edges(root, &v);
}
{
for (node_i, &node_len) in n_sizes.iter().enumerate() {
for transaction_i in 1..node_len {
so.add_edge((node_i + 1, transaction_i - 1), (node_i + 1, transaction_i));
}
}
}
so.take_closure();
let mut wr_rel: HashMap<Variable, DiGraph<TransactionId>> = Default::default();
for (&txn_id, txn_info) in txns_info.iter() {
for &var in txn_info.1.iter() {
wr_rel.entry(var).or_insert_with(Default::default);
}
for (&var, &txn_id2) in txn_info.0.iter() {
let entry = wr_rel.entry(var).or_insert_with(Default::default);
entry.add_edge(txn_id2, txn_id);
}
}
AtomicHistoryPO {
vis: so.clone(),
so,
root,
txns_info,
wr_rel,
}
}
pub fn get_wr(&self) -> DiGraph<TransactionId> {
let mut wr: DiGraph<TransactionId> = Default::default();
for (_, wr_x) in self.wr_rel.iter() {
wr.union_with(wr_x);
}
wr
}
pub fn vis_includes(&mut self, g: &DiGraph<TransactionId>) {
self.vis.union_with(g);
}
pub fn vis_is_trans(&mut self) {
self.vis = self.vis.take_closure();
}
pub fn causal_ww(&mut self) -> HashMap<Variable, DiGraph<TransactionId>> {
let mut ww: HashMap<Variable, DiGraph<TransactionId>> = Default::default();
for (&x, wr_x) in self.wr_rel.iter() {
let mut ww_x: DiGraph<TransactionId> = Default::default();
for (t1, t3s) in wr_x.adj_map.iter() {
for (t2, _) in wr_x.adj_map.iter() {
if t1 != t2
&& (self.vis.has_edge(t2, t1)
|| t3s.iter().any(|t3| self.vis.has_edge(t2, t3)))
{
ww_x.add_edge(*t2, *t1);
}
}
}
ww.insert(x, ww_x);
}
ww
}
}
#[derive(Debug)]
pub struct PrefixConsistentHistory {
pub history: AtomicHistoryPO,
pub active_write: HashMap<Variable, HashSet<TransactionId>>,
log: Logger,
}
impl PrefixConsistentHistory {
pub fn new(
n_sizes: &[usize],
txns_info: HashMap<TransactionId, TransactionInfo>,
log: Logger,
) -> Self {
Self {
history: AtomicHistoryPO::new(n_sizes, txns_info),
active_write: Default::default(),
log,
}
}
}
impl ConstrainedLinearization for PrefixConsistentHistory {
type Vertex = (TransactionId, bool);
fn get_root(&self) -> Self::Vertex {
((0, 0), false)
}
fn children_of(&self, u: &Self::Vertex) -> Option<Vec<Self::Vertex>> {
if u.1 {
self.history
.vis
.adj_map
.get(&u.0)
.map(|vs| vs.iter().map(|&v| (v, false)).collect())
} else {
Some(vec![(u.0, true)])
}
}
fn forward_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(&curr_txn.0).unwrap();
if curr_txn.1 {
for &x in curr_txn_info.1.iter() {
let read_by = self
.history
.wr_rel
.get(&x)
.unwrap()
.adj_map
.get(&curr_txn.0)
.unwrap();
self.active_write.insert(x, read_by.clone());
}
} else {
for (&x, _) in curr_txn_info.0.iter() {
assert!(self
.active_write
.entry(x)
.or_insert_with(Default::default)
.remove(&curr_txn.0));
}
self.active_write.retain(|_, ts| ts.len() > 0);
}
}
fn backtrack_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(&curr_txn.0).unwrap();
if curr_txn.1 {
for &x in curr_txn_info.1.iter() {
assert!(self.active_write.remove(&x).is_some());
}
} else {
for (&x, _) in curr_txn_info.0.iter() {
self.active_write
.entry(x)
.or_insert_with(Default::default)
.insert(curr_txn.0);
}
}
}
fn allow_next(&self, _linearization: &[Self::Vertex], v: &Self::Vertex) -> bool {
if v.1 {
let curr_txn_info = self.history.txns_info.get(&v.0).unwrap();
curr_txn_info
.1
.iter()
.all(|x| match self.active_write.get(x) {
Some(ts) if ts.len() == 1 => ts.iter().next().unwrap() == &v.0,
None => true,
_ => false,
})
} else {
true
}
}
fn vertices(&self) -> Vec<Self::Vertex> {
self.history
.txns_info
.keys()
.map(|&u| vec![(u, false), (u, true)])
.flatten()
.collect()
}
}
#[derive(Debug)]
pub struct SnapshotIsolationHistory {
pub history: AtomicHistoryPO,
pub active_write: HashMap<Variable, HashSet<TransactionId>>,
pub active_variable: HashSet<Variable>,
log: Logger,
}
impl SnapshotIsolationHistory {
pub fn new(
n_sizes: &[usize],
txns_info: HashMap<TransactionId, TransactionInfo>,
log: Logger,
) -> Self {
Self {
history: AtomicHistoryPO::new(n_sizes, txns_info),
active_write: Default::default(),
active_variable: Default::default(),
log,
}
}
}
impl ConstrainedLinearization for SnapshotIsolationHistory {
type Vertex = (TransactionId, bool);
fn get_root(&self) -> Self::Vertex {
((0, 0), false)
}
fn children_of(&self, u: &Self::Vertex) -> Option<Vec<Self::Vertex>> {
if u.1 {
self.history
.vis
.adj_map
.get(&u.0)
.map(|vs| vs.iter().map(|&v| (v, false)).collect())
} else {
Some(vec![(u.0, true)])
}
}
fn forward_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(&curr_txn.0).unwrap();
if curr_txn.1 {
for &x in curr_txn_info.1.iter() {
let read_by = self
.history
.wr_rel
.get(&x)
.unwrap()
.adj_map
.get(&curr_txn.0)
.unwrap();
self.active_write.insert(x, read_by.clone());
}
self.active_variable = self
.active_variable
.difference(&curr_txn_info.1)
.cloned()
.collect();
} else {
for (&x, _) in curr_txn_info.0.iter() {
assert!(self
.active_write
.entry(x)
.or_insert_with(Default::default)
.remove(&curr_txn.0));
self.active_write.retain(|_, ts| ts.len() > 0);
}
self.active_variable = self
.active_variable
.union(&curr_txn_info.1)
.cloned()
.collect();
}
}
fn backtrack_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(&curr_txn.0).unwrap();
if curr_txn.1 {
for &x in curr_txn_info.1.iter() {
assert!(self.active_write.remove(&x).is_some());
}
self.active_variable = self
.active_variable
.union(&curr_txn_info.1)
.cloned()
.collect();
} else {
for (&x, _) in curr_txn_info.0.iter() {
self.active_write
.entry(x)
.or_insert_with(Default::default)
.insert(curr_txn.0);
}
self.active_variable = self
.active_variable
.difference(&curr_txn_info.1)
.cloned()
.collect();
}
}
fn allow_next(&self, _linearization: &[Self::Vertex], v: &Self::Vertex) -> bool {
if v.1 {
let curr_txn_info = self.history.txns_info.get(&v.0).unwrap();
curr_txn_info
.1
.iter()
.all(|x| match self.active_write.get(x) {
Some(ts) if ts.len() == 1 => ts.iter().next().unwrap() == &v.0,
None => true,
_ => false,
})
} else {
self.active_variable
.intersection(&self.history.txns_info.get(&v.0).unwrap().1)
.next()
.is_none()
}
}
fn vertices(&self) -> Vec<Self::Vertex> {
self.history
.txns_info
.keys()
.map(|&u| vec![(u, false), (u, true)])
.flatten()
.collect()
}
}
#[derive(Debug)]
pub struct SerializableHistory {
pub history: AtomicHistoryPO,
pub active_write: HashMap<Variable, HashSet<TransactionId>>,
log: Logger,
}
impl SerializableHistory {
pub fn new(
n_sizes: &[usize],
txns_info: HashMap<TransactionId, TransactionInfo>,
log: Logger,
) -> Self {
Self {
history: AtomicHistoryPO::new(n_sizes, txns_info),
active_write: Default::default(),
log,
}
}
}
impl ConstrainedLinearization for SerializableHistory {
type Vertex = TransactionId;
fn get_root(&self) -> Self::Vertex {
(0, 0)
}
fn forward_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(curr_txn).unwrap();
for (&x, _) in curr_txn_info.0.iter() {
assert!(self
.active_write
.entry(x)
.or_insert_with(Default::default)
.remove(curr_txn));
}
self.active_write.retain(|_, ts| ts.len() > 0);
for &x in curr_txn_info.1.iter() {
let read_by = self
.history
.wr_rel
.get(&x)
.unwrap()
.adj_map
.get(curr_txn)
.unwrap();
self.active_write.insert(x, read_by.clone());
}
}
fn backtrack_book_keeping(&mut self, linearization: &[Self::Vertex]) {
let curr_txn = linearization.last().unwrap();
let curr_txn_info = self.history.txns_info.get(curr_txn).unwrap();
for &x in curr_txn_info.1.iter() {
assert!(self.active_write.remove(&x).is_some());
}
for (&x, _) in curr_txn_info.0.iter() {
self.active_write
.entry(x)
.or_insert_with(Default::default)
.insert(*curr_txn);
}
}
fn children_of(&self, u: &Self::Vertex) -> Option<Vec<Self::Vertex>> {
self.history
.vis
.adj_map
.get(u)
.map(|vs| vs.iter().cloned().collect())
}
fn allow_next(&self, _linearization: &[Self::Vertex], v: &Self::Vertex) -> bool {
let curr_txn_info = self.history.txns_info.get(v).unwrap();
curr_txn_info
.1
.iter()
.all(|x| match self.active_write.get(x) {
Some(ts) if ts.len() == 1 => ts.iter().next().unwrap() == v,
None => true,
_ => false,
})
}
fn vertices(&self) -> Vec<Self::Vertex> {
self.history.txns_info.keys().cloned().collect()
}
}
use std::collections::{HashMap, HashSet};
use consistency::ser::Chains;
use consistency::util::EdgeClosure;
use petgraph::algo::astar;
// use petgraph::dot::{Config, Dot};
use petgraph::graph::node_index;
use petgraph::Graph;
use slog::Logger;
pub struct Causal {
chains: Chains,
co_closure: EdgeClosure,
pub vis_pg: Graph<usize, usize>,
pub co_pg: Graph<usize, usize>,
log: Logger,
}
impl Causal {
pub fn new(
n_sizes: &Vec<usize>,
txns_info: &HashMap<(usize, usize), (HashMap<usize, (usize, usize)>, HashSet<usize>)>,
log: Logger,
) -> Self {
let mut chains = Chains::new(&n_sizes, &txns_info, log.clone());
chains.preprocess_wr();
Causal {
chains: chains,
co_closure: EdgeClosure::new(),
vis_pg: Graph::new(),
co_pg: Graph::new(),
log,
}
}
pub fn preprocess_vis(&mut self) -> bool {
for po in self.chains.tuple_to_id.iter().skip(1) {
for &id in po.iter().rev().skip(1) {
if self.chains.vis_closure.contains(id + 1, id) {
info!(self.log, "found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(id, id + 1);
self.co_closure.add_edge(id, id + 1);
self.vis_pg
.extend_with_edges(&[(id as u32, (id + 1) as u32, 1)]);
self.co_pg
.extend_with_edges(&[(id as u32, (id + 1) as u32, 1)]);
}
if let Some(&u) = po.first() {
if self.chains.vis_closure.contains(u, self.chains.root_txn_id) {
info!(self.log, "found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(self.chains.root_txn_id, u);
self.co_closure.add_edge(self.chains.root_txn_id, u);
self.vis_pg
.extend_with_edges(&[(self.chains.root_txn_id as u32, u as u32, 1)]);
self.co_pg
.extend_with_edges(&[(self.chains.root_txn_id as u32, u as u32, 1)]);
}
}
for (_, info) in self.chains.wr_order.iter() {
for (&u, vs) in info {
for &v in vs.iter() {
if self.chains.vis_closure.contains(v, u) {
info!(self.log, "found cycle in WR");
return false;
}
self.chains.vis_closure.add_edge(u, v);
self.co_closure.add_edge(u, v);
self.vis_pg.extend_with_edges(&[(u as u32, v as u32, 1)]);
self.co_pg.extend_with_edges(&[(u as u32, v as u32, 1)]);
}
}
}
return true;
}
pub fn preprocess_co(&mut self) -> bool {
let mut ww_reason = HashMap::new();
loop {
// let mut new_rw_edge = Vec::new();
let mut new_ww_edge = Vec::new();
for (&_x, wr_x) in self.chains.wr_order.iter() {
for (&u, vs) in wr_x.iter() {
for &v in vs.iter() {
for (&u_, _) in wr_x.iter() {
if u != u_ && v != u_ {
// if self.chains.vis_closure.contains(u, u_) {
// info!(self.log,
// "adding RW ({1}, {2}), WR_{3}({0}, {1}), {3} in W({2}), VIS({0}, {2})",
// u, v, u_, _x
// );
// if self.chains.vis_closure.contains(u_, v) {
// // info!(self.log,"cycle: {0} -> {1} -> {0}", v, u_);
// info!(self.log,
// "VIS*-RW cycle: {0:?} -> {1:?} -> {0:?}",
// self.id_to_tuple[v], self.id_to_tuple[u_]
// );
// return false;
// }
// new_rw_edge.push((v, u_));
// }
if self.chains.vis_closure.contains(u_, v) {
// info!(self.log,
// "adding WW ({2}, {0}), WR_{3}({0}, {1}), {3} in W({2}), VIS({2}, {1})",
// u, v, u_, _x
// );
if self.chains.vis_closure.contains(u, u_) {
// info!(self.log,"cycle: {0} -> {1} -> {0}", u_, u);
info!(
self.log,
"cycle: {0:?} co {1:?} vis {0:?}",
self.chains.id_to_tuple[u_],
self.chains.id_to_tuple[u]
);
return false;
}
new_ww_edge.push((u_, u));
ww_reason.insert((u_, u), (u, v, u_, _x));
}
}
}
}
}
}
let mut is_converged = true;
for (u, v) in new_ww_edge {
if self.co_closure.contains(v, u) {
// info!(self.log,"cycle: {0} -> {1} -> {0}", u, v);
info!(
self.log,
"cycle: {0:?} co {1:?} co {0:?}",
self.chains.id_to_tuple[u],
self.chains.id_to_tuple[v]
);
let co_path = astar(
&self.co_pg,
node_index(v),