From e58c63b2619accfbf3e4a3ae80e96c0d2961fa01 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Thu, 7 Apr 2022 20:37:13 +1000 Subject: [PATCH 1/8] WIP --- src/executor/execute.rs | 28 +-- src/executor/query/mod.rs | 15 +- src/executor/query/select/join/plan.rs | 14 +- src/executor/query/select/mod.rs | 187 +++++++++-------- src/executor/query/select/plan/mod.rs | 9 +- src/executor/query/set_expr.rs | 17 +- src/glue/database.rs | 5 + src/glue/mod.rs | 273 +++---------------------- src/glue/payload.rs | 11 + 9 files changed, 178 insertions(+), 381 deletions(-) create mode 100644 src/glue/database.rs create mode 100644 src/glue/payload.rs diff --git a/src/executor/execute.rs b/src/executor/execute.rs index 6b12db9c..95257ce4 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -5,7 +5,7 @@ use { other::explain, query::query, }, - crate::{glue::Context, parse_sql::Query, Result, Row, StorageInner, Value}, + crate::{Glue, glue::Context, parse_sql::Query, Result, Row, StorageInner, Value}, serde::Serialize, sqlparser::ast::{SetVariableValue, Statement}, std::convert::TryInto, @@ -61,9 +61,9 @@ pub enum Payload { TruncateTable, } -pub async fn execute( - mut storages: Vec<(String, &mut StorageInner)>, - context: &mut Context, +impl Glue { +pub async fn execute_query( + &mut self, statement: &Query, ) -> Result { let Query(statement) = statement; @@ -76,7 +76,7 @@ pub async fn execute( columns, if_not_exists, .. - } => create_table(storages[0].1, name, columns, *if_not_exists) + } => self.create_table(name, columns, *if_not_exists) .await .map(|_| Payload::Create), Statement::Drop { @@ -84,14 +84,14 @@ pub async fn execute( names, if_exists, .. - } => drop(storages[0].1, object_type, names, *if_exists) + } => self.drop(object_type, names, *if_exists) .await .map(|_| Payload::DropTable), #[cfg(feature = "alter-table")] - Statement::AlterTable { name, operation } => alter_table(storages[0].1, name, operation) + Statement::AlterTable { name, operation } => self.alter_table(name, operation) .await .map(|_| Payload::AlterTable), - Statement::Truncate { table_name, .. } => truncate(storages[0].1, table_name) + Statement::Truncate { table_name, .. } => self.truncate(table_name) .await .map(|_| Payload::TruncateTable), Statement::CreateIndex { @@ -100,8 +100,7 @@ pub async fn execute( columns, unique, if_not_exists, - } => create_index( - storages[0].1, + } => self.create_index( table_name, name, columns, @@ -117,22 +116,22 @@ pub async fn execute( columns, source, .. - } => insert(&mut storages, context, table_name, columns, source, false).await, + } => self.insert(table_name, columns, source, false).await, Statement::Update { table, selection, assignments, // TODO from: _, - } => update(storages[0].1, context, table, selection, assignments).await, + } => self.update(table, selection, assignments).await, Statement::Delete { table_name, selection, - } => delete(&mut storages, context, table_name, selection).await, + } => self.delete(table_name, selection).await, //- Selection Statement::Query(query_value) => { - let result = query(&mut storages, context, *query_value.clone()).await?; + let result = self.query(*query_value.clone()).await?; let (labels, rows) = result; let rows = rows.into_iter().map(Row).collect(); // I don't like this. TODO let payload = Payload::Select { labels, rows }; @@ -159,3 +158,4 @@ pub async fn execute( _ => Err(ExecuteError::QueryNotSupported.into()), } } +} diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index e1438539..67de3d39 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -4,6 +4,7 @@ mod set_expr; pub use select::{join::*, ManualError, PlanError, SelectError}; use { crate::{ + Glue, executor::types::LabelsAndRows, result::Result, Cast, Context, MetaRecipe, RecipeUtilities, StorageInner, Value, }, @@ -34,10 +35,10 @@ pub enum QueryError { OperationColumnsMisaligned, } +impl Glue { #[async_recursion(?Send)] pub async fn query( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, + &mut self, query: Query, ) -> Result { let Query { @@ -50,6 +51,9 @@ pub async fn query( fetch: _, lock: _, } = query; + + let context = self.get_mut_context(); + let limit: Option = limit .map(|expression| { MetaRecipe::new(expression)? @@ -67,8 +71,6 @@ pub async fn query( }) .transpose()?; - let mut context = context.clone(); - let context = &mut context; // We don't actually want to pass on any changes from here if let Some(with) = with { let With { recursive: _, // Recursive not currently supported @@ -85,12 +87,12 @@ pub async fn query( columns: _, // TODO: Columns - Check that number is same and then rename labels } = alias; let name = name.value; - let data = self::query(storages, context, query).await?; + let data = self.query(query).await?; context.set_table(name, data); } } - let (mut labels, mut rows) = from_body(storages, context, body, order_by).await?; + let (mut labels, mut rows) = from_body(body, order_by).await?; if let Some(offset) = offset { rows.drain(0..offset); @@ -117,3 +119,4 @@ pub async fn query( } Ok((labels, rows)) } +} diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 7ac0de72..2f0d97ae 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -6,7 +6,7 @@ use { types::{ColumnInfo, ComplexTableName}, MetaRecipe, }, - Context, JoinError, Result, StorageInner, + Context, JoinError, Result, StorageInner, Glue }, std::cmp::Ordering, }; @@ -40,15 +40,14 @@ impl Ord for JoinPlan { impl JoinPlan { pub async fn new<'a>( join_manual: JoinManual, - storages: &[(String, &mut StorageInner)], - context: &Context, + glue: &Glue, ) -> Result { let JoinManual { table, constraint, join_type, } = join_manual; - let columns = get_columns(storages, table.clone(), context).await?; + let columns = get_columns(glue, table.clone()).await?; let ComplexTableName { database, name: table, @@ -90,11 +89,10 @@ impl JoinPlan { } async fn get_columns( - storages: &[(String, &mut StorageInner)], + glue: &Glue, table: ComplexTableName, - context: &Context, ) -> Result> { - if let Some((context_table_labels, ..)) = context.tables.get(&table.name) { + if let Some((context_table_labels, ..)) = glue.get_context().tables.get(&table.name) { Ok(context_table_labels .iter() .map(|name| ColumnInfo { @@ -104,7 +102,7 @@ async fn get_columns( }) .collect::>()) } else { - let storage = storages + let storage = glue.get_storages() .iter() .find_map(|(name, storage)| { if name == &table.database { diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index a76f06f8..e92aa9ba 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -5,6 +5,7 @@ mod plan; use { crate::{ + Glue, executor::{ types::{LabelsAndRows, Row}, PlannedRecipe, @@ -41,13 +42,9 @@ pub enum SelectError { Unreachable, } -pub async fn select( - storages: &[(String, &mut StorageInner)], - context: &Context, - query: Select, - order_by: Vec, -) -> Result { - let Plan { +impl Glue { + pub async fn select(&mut self, plan: Plan) -> Result { + let Plan { joins, select_items, constraint, @@ -55,95 +52,105 @@ pub async fn select( groups, order_by, labels, - } = Plan::new(storages, context, query, order_by).await?; - - let rows = stream::iter(joins) - .map(Ok) - .try_fold(vec![], |rows, join| async { - join.execute(storages, context, rows).await - }) - .await?; - - let rows = order_by.execute(rows)?; // TODO: This should be done after filtering - - let selected_rows = - rows.into_par_iter() - .filter_map(|row| match constraint.confirm_constraint(&row) { - Ok(true) => Some( - select_items - .clone() - .into_iter() - .map(|selection| selection.simplify_by_row(&row)) - .collect::>>() - .map(|selection| (selection, row)), - ), - Ok(false) => None, - Err(error) => Some(Err(error)), - }); - let do_group = !groups.is_empty() - || select_items - .iter() - .any(|select_item| !select_item.aggregates.is_empty()); - - let final_rows = if do_group { - let groups = if groups.is_empty() { - vec![PlannedRecipe::TRUE] + } = plan; + let rows = stream::iter(joins) + .map(Ok) + .try_fold(vec![], |rows, join| async { + join.execute(self, rows).await + }) + .await?; + + let rows = order_by.execute(rows)?; // TODO: This should be done after filtering + + let selected_rows = + rows.into_par_iter() + .filter_map(|row| match constraint.confirm_constraint(&row) { + Ok(true) => Some( + select_items + .clone() + .into_iter() + .map(|selection| selection.simplify_by_row(&row)) + .collect::>>() + .map(|selection| (selection, row)), + ), + Ok(false) => None, + Err(error) => Some(Err(error)), + }); + let do_group = !groups.is_empty() + || select_items + .iter() + .any(|select_item| !select_item.aggregates.is_empty()); + + let final_rows = if do_group { + let groups = if groups.is_empty() { + vec![PlannedRecipe::TRUE] + } else { + groups + }; + + let accumulations: Vec<(Vec, Option, Vec)> = + selected_rows + .filter_map(|selection| { + let (selected_row, row) = try_option!(selection); + let group_constraint = + try_option!(group_constraint.clone().simplify_by_row(&row)); + let group_constraint = match group_constraint.as_solution() { + Some(Value::Bool(true)) => None, + Some(Value::Bool(false)) => return None, + Some(_) => unreachable!(), // TODO: Handle + None => Some(group_constraint), + }; + let groupers = try_option!(groups + .iter() + .map(|group| { + group + .clone() + .simplify_by_row(&row)? + .confirm_or_err(SelectError::GrouperMayNotContainAggregate.into()) + }) + .collect::>>()); + Some(Ok((groupers, group_constraint, selected_row))) + }) + .map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc])) + .try_reduce_with(accumulate) + .unwrap_or(Ok(vec![]))?; // TODO: Improve + + accumulations + .into_par_iter() + .map(|(_grouper, _group_constraint, vals)| { + vals.into_iter() + .map(|val| val.finalise_accumulation()) + .collect::>>() + }) + .collect::>>>()? + // TODO: Manage grouper and constraint } else { - groups - }; - - let accumulations: Vec<(Vec, Option, Vec)> = selected_rows - .filter_map(|selection| { - let (selected_row, row) = try_option!(selection); - let group_constraint = - try_option!(group_constraint.clone().simplify_by_row(&row)); - let group_constraint = match group_constraint.as_solution() { - Some(Value::Bool(true)) => None, - Some(Value::Bool(false)) => return None, - Some(_) => unreachable!(), // TODO: Handle - None => Some(group_constraint), - }; - let groupers = try_option!(groups - .iter() - .map(|group| { - group - .clone() - .simplify_by_row(&row)? - .confirm_or_err(SelectError::GrouperMayNotContainAggregate.into()) - }) - .collect::>>()); - Some(Ok((groupers, group_constraint, selected_row))) - }) - .map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc])) - .try_reduce_with(accumulate) - .unwrap_or(Ok(vec![]))?; // TODO: Improve - - accumulations - .into_par_iter() - .map(|(_grouper, _group_constraint, vals)| { - vals.into_iter() - .map(|val| val.finalise_accumulation()) - .collect::>>() - }) - .collect::>>>()? - // TODO: Manage grouper and constraint - } else { - selected_rows - .map(|selection| { - selection.and_then(|(selection, _)| { - selection - .into_iter() - .map(|selected| selected.confirm()) - .collect::>() + .map(|selection| { + selection.and_then(|(selection, _)| { + selection + .into_iter() + .map(|selected| selected.confirm()) + .collect::>() + }) }) - }) - .collect::>>()? - }; + .collect::>>()? + }; - Ok((labels, final_rows)) + Ok((labels, final_rows)) + } + pub async fn select_query( + &mut self, + query: Select, + order_by: Vec, +) -> Result { + let plan = Plan::new(self, query, order_by).await?; + self.select(plan); + } } + + #[allow(clippy::type_complexity)] // TODO fn accumulate( mut rows_l: Vec<(Vec, Option, Vec)>, diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index 8cecc9e4..c1de5b75 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -5,7 +5,7 @@ use { }, crate::{ executor::{types::ColumnInfo, PlannedRecipe}, - Context, Result, StorageInner, + Context, Result, StorageInner, Glue, }, futures::future::join_all, serde::Serialize, @@ -35,8 +35,7 @@ pub enum PlanError { impl Plan { pub async fn new( - storages: &[(String, &mut StorageInner)], - context: &Context, + glue: &Glue, select: Select, order_by: Vec, ) -> Result { @@ -46,12 +45,12 @@ impl Plan { constraint, group_constraint, groups, - } = Manual::new(select, context)?; + } = Manual::new(select, glue.get_context())?; let mut joins: Vec = join_all( joins .into_iter() - .map(|join| JoinPlan::new(join, storages, context)) + .map(|join| JoinPlan::new(join, glue)) .collect::>(), ) .await diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index 1aa2b433..681e6306 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -3,23 +3,23 @@ use { crate::{ executor::{alter_row::insert, types::LabelsAndRows}, macros::warning, - result::Result, + result::Result, Glue, Context, MetaRecipe, Payload, RecipeUtilities, StorageInner, Value, }, async_recursion::async_recursion, sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement}, }; +impl Glue { #[async_recursion(?Send)] pub async fn from_body( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, + &mut self, body: SetExpr, order_by: Vec, ) -> Result { match body { SetExpr::Select(query) => { - let (labels, rows) = select(storages, context, *query, order_by).await?; + let (labels, rows) = self.select_query(*query, order_by).await?; Ok((labels, rows)) } SetExpr::Values(values) => { @@ -34,7 +34,7 @@ pub async fn from_body( .into_iter() .map(|cell| { MetaRecipe::new(cell)? - .simplify_by_context(context)? + .simplify_by_context(self.get_context())? .confirm_or_err(QueryError::MissingComponentsForValues.into()) }) .collect::>>() @@ -61,8 +61,8 @@ pub async fn from_body( "set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering" ); } - let (left_labels, left) = from_body(storages, context, *left, vec![]).await?; - let (right_labels, right) = from_body(storages, context, *right, vec![]).await?; + let (left_labels, left) = self.from_body(*left, vec![]).await?; + let (right_labels, right) = self.from_body(*right, vec![]).await?; if left_labels.len() != right_labels.len() { return Err(QueryError::OperationColumnsMisaligned.into()); } @@ -85,7 +85,7 @@ pub async fn from_body( source, .. }) => { - let inserted = insert(storages, context, &table_name, &columns, &source, true).await?; + let inserted = self.insert(&table_name, &columns, &source, true).await?; if let Payload::Select { labels, rows } = inserted { Ok((labels, rows.into_iter().map(|row| row.0).collect())) } else { @@ -95,3 +95,4 @@ pub async fn from_body( _ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries } } +} diff --git a/src/glue/database.rs b/src/glue/database.rs new file mode 100644 index 00000000..43fa4ea2 --- /dev/null +++ b/src/glue/database.rs @@ -0,0 +1,5 @@ +use crate::Glue; + +impl Glue { + +} diff --git a/src/glue/mod.rs b/src/glue/mod.rs index a1083906..a353823a 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -1,5 +1,3 @@ -#![cfg(feature = "sled-storage")] - use crate::ExecuteError; use { crate::{ @@ -15,6 +13,8 @@ use { }; mod select; +mod database; +mod payload; pub(crate) type Variables = HashMap; @@ -44,28 +44,30 @@ impl Context { /// - [`Glue::select_as_string()`] -- Provides data, only for `SELECT` queries, as [String]s (rather than [Value]s). /// - [`Glue::select_as_json()`] -- Provides data, only for `SELECT` queries, as one big [String]; generally useful for webby interactions. pub struct Glue { - storages: Vec<(String, Storage)>, + databases: HashMap, context: Option, } /// ## Creation of new interfaces impl Glue { /// Creates a [Glue] instance with just one [Storage]. - pub fn new(name: String, storage: Storage) -> Self { - Self::new_multi(vec![(name, storage)]) + pub fn new(name: String, database: Storage) -> Self { + let mut databases = HashMap::new(); + databases.insert(name, database); + Self::new_multi(databases) } /// Creates a [Glue] instance with access to all provided storages. /// Argument is: [Vec]<(Identifier, [Storage])> - pub fn new_multi(storages: Vec<(String, Storage)>) -> Self { + pub fn new_multi(databases: HashMap) -> Self { let context = Some(Context::default()); - Self { storages, context } + Self { databases, context } } /// Merges existing [Glue] instances pub fn new_multi_glue(glues: Vec) -> Self { glues .into_iter() .reduce(|mut main, other| { - main.storages.extend(other.storages); + main.databases.extend(other.databases); main }) .unwrap() @@ -95,15 +97,15 @@ impl Glue { /// ``` /// pub fn extend(&mut self, glues: Vec) { - self.storages.extend( + self.databases.extend( glues .into_iter() .reduce(|mut main, other| { - main.storages.extend(other.storages); + main.databases.extend(other.databases); main }) .unwrap() - .storages, + .databases, ) } } @@ -126,7 +128,7 @@ impl Glue { impl Glue { pub fn into_connections(self) -> Vec<(String, Connection)> { - self.storages + self.databases .into_iter() .map(|(name, storage)| (name, storage.into_source())) .collect() @@ -160,7 +162,7 @@ impl Glue { }) = query { let store_name = db_name.0[0].value.clone(); - return if self.storages.iter().any(|(store, _)| store == &store_name) { + return if self.databases.iter().any(|(store, _)| store == &store_name) { if if_not_exists { Ok(Payload::Success) } else { @@ -212,35 +214,35 @@ impl Glue { .ok_or(ExecuteError::ObjectNotRecognised)?; let index = self - .storages + .databases .iter() .enumerate() .find_map(|(index, (name, _))| (name == &database_name).then(|| index)); if let Some(index) = index { - self.storages.remove(index); + self.databases.remove(index); } else if !if_exists { return Err(ExecuteError::ObjectNotRecognised.into()); } return Ok(Payload::Success); } - let mut storages: Vec<(String, Box)> = self - .storages + let mut databases: Vec<(String, Box)> = self + .databases .iter_mut() .map(|(name, storage)| (name.clone(), storage.take())) .collect(); - let give_storages: Vec<(String, &mut StorageInner)> = storages + let give_storages: Vec<(String, &mut StorageInner)> = databases .iter_mut() .map(|(name, storage)| (name.clone(), &mut **storage)) .collect(); let mut context = self.take_context(); - let result = block_on(execute(give_storages, &mut context, &query)); + let result = block_on(self.execute_query(&query)); - self.storages + self.databases .iter_mut() - .zip(storages) + .zip(databases) .for_each(|((_name, storage), (_name_2, taken))| storage.replace(taken)); self.replace_context(context); @@ -314,232 +316,3 @@ impl Glue { self.execute_parsed(query) } } - -impl Payload { - // TODO: Move - pub fn unwrap_rows(self) -> Vec { - if let Payload::Select { rows, .. } = self { - rows - } else { - panic!("Expected Select!") - } - } -} - -// TODO: Move -/* -#[cfg(test)] -mod tests { - use { - crate::{CSVStorage, Glue, Payload, Row, SledStorage, Storage, Value}, - std::convert::TryFrom, - }; - #[test] - fn eq() { - std::fs::remove_dir_all("data").unwrap(); - std::fs::create_dir("data").unwrap(); - let config = sled::Config::default() - .path("data/using_config") - .temporary(true); - - let sled = SledStorage::try_from(config).unwrap(); - let mut glue = Glue::new(String::from("sled"), Storage::new(Box::new(sled.clone()))); - assert_eq!( - glue.execute( - "CREATE TABLE api_test (id INTEGER PRIMARY KEY, name TEXT, nullable TEXT NULL, is BOOLEAN)", - ), - Ok(Payload::Create) - ); - assert_eq!( - glue.execute("INSERT INTO api_test (id, name, nullable, is) VALUES (1, 'test1', 'not null', TRUE), (2, 'test2', NULL, FALSE)"), - Ok(Payload::Insert(2)) - ); - - assert_eq!( - glue.execute("SELECT id, name, is FROM api_test"), // Not selecting NULL because NULL != NULL. TODO: Expand this test so that NULL == NULL - Ok(Payload::Select { - labels: vec![String::from("id"), String::from("name"), String::from("is")], - rows: vec![ - Row(vec![ - Value::I64(1), - Value::Str(String::from("test1")), - Value::Bool(true) - ]), - Row(vec![ - Value::I64(2), - Value::Str(String::from("test2")), - Value::Bool(false) - ]) - ] - }) - ); - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.select_as_string("SELECT * FROM api_test"), - Ok(vec![ - vec![ - String::from("id"), - String::from("name"), - String::from("nullable"), - String::from("is") - ], - vec![ - String::from("1"), - String::from("test1"), - String::from("not null"), - String::from("TRUE") - ], - vec![ - String::from("2"), - String::from("test2"), - String::from("NULL"), - String::from("FALSE") - ] - ]) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.select_as_json("SELECT * FROM api_test"), - Ok(String::from( - r#"[{"id":1,"is":true,"name":"test1","nullable":"not null"},{"id":2,"is":false,"name":"test2","nullable":null}]"# - )) - ); - - use crate::Cast; - - let test_value: Result = Value::Str(String::from("test")).cast(); - assert_eq!(test_value, Ok(String::from("test"))); - let test_value: Result = (Value::Str(String::from("test")).clone()).cast(); - assert_eq!(test_value, Ok(String::from("test"))); - let test_value: Result = Value::I64(1).cast(); - assert_eq!(test_value, Ok(String::from("1"))); - let test_value: Result = (Value::I64(1).clone()).cast(); - assert_eq!(test_value, Ok(String::from("1"))); - - assert_eq!( - glue.execute("CREATE TABLE api_insert_vec (name TEXT, rating FLOAT)"), - Ok(Payload::Create) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.insert_vec( - String::from("api_insert_vec"), - vec![String::from("name"), String::from("rating")], - vec![vec![Value::Str(String::from("test")), Value::F64(1.2)]] - ), - Ok(Payload::Insert(1)) - ); - - assert_eq!( - glue.execute("SELECT * FROM api_insert_vec"), - Ok(Payload::Select { - labels: vec![String::from("name"), String::from("rating")], - rows: vec![Row(vec![Value::Str(String::from("test")), Value::F64(1.2)])] - }) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.insert_vec( - String::from("api_insert_vec"), - vec![String::from("name"), String::from("rating")], - vec![ - vec![Value::Str(String::from("test2")), Value::F64(1.3)], - vec![Value::Str(String::from("test3")), Value::F64(1.0)], - vec![Value::Str(String::from("test4")), Value::F64(100000.94)] - ] - ), - Ok(Payload::Insert(3)) - ); - - assert_eq!( - glue.execute("SELECT * FROM api_insert_vec"), - Ok(Payload::Select { - labels: vec![String::from("name"), String::from("rating")], - rows: vec![ - Row(vec![Value::Str(String::from("test")), Value::F64(1.2)]), - Row(vec![Value::Str(String::from("test2")), Value::F64(1.3)]), - Row(vec![Value::Str(String::from("test3")), Value::F64(1.0)]), - Row(vec![ - Value::Str(String::from("test4")), - Value::F64(100000.94) - ]) - ] - }) - ); - - // Multi Glue - let csv_a = CSVStorage::new("data/using_config_a.csv").unwrap(); - let csv_b = CSVStorage::new("data/using_config_b.csv").unwrap(); - let _multi_glue_type_one = Glue::new_multi(vec![ - (String::from("sled"), Storage::new(Box::new(sled.clone()))), - (String::from("csv"), Storage::new(Box::new(csv_a))), - ]); - let mut csv_glue = Glue::new(String::from("csv"), Storage::new(Box::new(csv_b))); - - assert_eq!( - csv_glue.execute("CREATE TABLE data (name TEXT, rating TEXT)"), - Ok(Payload::Create) - ); - - assert_eq!( - csv_glue.execute( - r#" - INSERT INTO - data ( - name, - rating - ) - VALUES ( - 'test2', - '30.1' - ), ( - 'test3', - '0.1' - ) - "# - ), - Ok(Payload::Insert(2)) - ); - - let mut multi_glue = Glue::new_multi_glue(vec![glue, csv_glue]); - - assert_eq!( - multi_glue.execute( - r#" - SELECT - * - FROM - sled.api_insert_vec - INNER JOIN csv.data - ON sled.api_insert_vec.name = csv.data.name - "# - ), - Ok(Payload::Select { - labels: vec![ - String::from("api_insert_vec.name"), - String::from("api_insert_vec.rating"), - String::from("data.name"), - String::from("data.rating") - ], - rows: vec![ - Row(vec![ - Value::Str(String::from("test2")), - Value::F64(1.3), - Value::Str(String::from("test2")), - Value::Str(String::from("30.1")) - ]), - Row(vec![ - Value::Str(String::from("test3")), - Value::F64(1.0), - Value::Str(String::from("test3")), - Value::Str(String::from("0.1")) - ]) - ] - }) - ); - } -} -*/ diff --git a/src/glue/payload.rs b/src/glue/payload.rs new file mode 100644 index 00000000..9583f1e5 --- /dev/null +++ b/src/glue/payload.rs @@ -0,0 +1,11 @@ +use crate::{Row, Payload}; + +impl Payload { + pub fn unwrap_rows(self) -> Vec { + if let Payload::Select { rows, .. } = self { + rows + } else { + panic!("Expected Select!") + } + } +} From 0c708f1ff02a4075b2c6946baa9420c6557745b0 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 13:34:37 +1000 Subject: [PATCH 2/8] Make access implementations --- src/executor/execute.rs | 176 ++++++++++++------------- src/executor/query/mod.rs | 154 +++++++++++----------- src/executor/query/select/join/plan.rs | 15 +-- src/executor/query/select/mod.rs | 36 +++-- src/executor/query/select/plan/mod.rs | 8 +- src/executor/query/set_expr.rs | 158 +++++++++++----------- src/glue/database.rs | 29 +++- src/glue/error.rs | 13 ++ src/glue/mod.rs | 41 ++---- src/glue/payload.rs | 2 +- src/result.rs | 7 +- 11 files changed, 318 insertions(+), 321 deletions(-) create mode 100644 src/glue/error.rs diff --git a/src/executor/execute.rs b/src/executor/execute.rs index 95257ce4..97e00a7c 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -5,7 +5,7 @@ use { other::explain, query::query, }, - crate::{Glue, glue::Context, parse_sql::Query, Result, Row, StorageInner, Value}, + crate::{glue::Context, parse_sql::Query, Glue, Result, Row, StorageInner, Value}, serde::Serialize, sqlparser::ast::{SetVariableValue, Statement}, std::convert::TryInto, @@ -62,100 +62,96 @@ pub enum Payload { } impl Glue { -pub async fn execute_query( - &mut self, - statement: &Query, -) -> Result { - let Query(statement) = statement; + pub async fn execute_query(&mut self, statement: &Query) -> Result { + let Query(statement) = statement; - match statement { - //- Modification - //-- Tables - Statement::CreateTable { - name, - columns, - if_not_exists, - .. - } => self.create_table(name, columns, *if_not_exists) - .await - .map(|_| Payload::Create), - Statement::Drop { - object_type, - names, - if_exists, - .. - } => self.drop(object_type, names, *if_exists) - .await - .map(|_| Payload::DropTable), - #[cfg(feature = "alter-table")] - Statement::AlterTable { name, operation } => self.alter_table(name, operation) - .await - .map(|_| Payload::AlterTable), - Statement::Truncate { table_name, .. } => self.truncate(table_name) - .await - .map(|_| Payload::TruncateTable), - Statement::CreateIndex { - name, - table_name, - columns, - unique, - if_not_exists, - } => self.create_index( - table_name, - name, - columns, - *unique, - *if_not_exists, - ) - .await - .map(|_| Payload::Create), + match statement { + //- Modification + //-- Tables + Statement::CreateTable { + name, + columns, + if_not_exists, + .. + } => self + .create_table(name, columns, *if_not_exists) + .await + .map(|_| Payload::Create), + Statement::Drop { + object_type, + names, + if_exists, + .. + } => self + .drop(object_type, names, *if_exists) + .await + .map(|_| Payload::DropTable), + #[cfg(feature = "alter-table")] + Statement::AlterTable { name, operation } => self + .alter_table(name, operation) + .await + .map(|_| Payload::AlterTable), + Statement::Truncate { table_name, .. } => self + .truncate(table_name) + .await + .map(|_| Payload::TruncateTable), + Statement::CreateIndex { + name, + table_name, + columns, + unique, + if_not_exists, + } => self + .create_index(table_name, name, columns, *unique, *if_not_exists) + .await + .map(|_| Payload::Create), - //-- Rows - Statement::Insert { - table_name, - columns, - source, - .. - } => self.insert(table_name, columns, source, false).await, - Statement::Update { - table, - selection, - assignments, - // TODO - from: _, - } => self.update(table, selection, assignments).await, - Statement::Delete { - table_name, - selection, - } => self.delete(table_name, selection).await, + //-- Rows + Statement::Insert { + table_name, + columns, + source, + .. + } => self.insert(table_name, columns, source, false).await, + Statement::Update { + table, + selection, + assignments, + // TODO + from: _, + } => self.update(table, selection, assignments).await, + Statement::Delete { + table_name, + selection, + } => self.delete(table_name, selection).await, - //- Selection - Statement::Query(query_value) => { - let result = self.query(*query_value.clone()).await?; - let (labels, rows) = result; - let rows = rows.into_iter().map(Row).collect(); // I don't like this. TODO - let payload = Payload::Select { labels, rows }; - Ok(payload) - } + //- Selection + Statement::Query(query_value) => { + let result = self.query(*query_value.clone()).await?; + let (labels, rows) = result; + let rows = rows.into_iter().map(Row).collect(); // I don't like this. TODO + let payload = Payload::Select { labels, rows }; + Ok(payload) + } - //- Context - Statement::SetVariable { - variable, value, .. - } => { - let first_value = value.get(0).unwrap(); // Why might one want anything else? - let value: Value = match first_value { - SetVariableValue::Ident(..) => unimplemented!(), - SetVariableValue::Literal(literal) => literal.try_into()?, - }; - let name = variable.value.clone(); - context.set_variable(name, value); - Ok(Payload::Success) - } + //- Context + Statement::SetVariable { + variable, value, .. + } => { + let first_value = value.get(0).unwrap(); // Why might one want anything else? + let value: Value = match first_value { + SetVariableValue::Ident(..) => unimplemented!(), + SetVariableValue::Literal(literal) => literal.try_into()?, + }; + let name = variable.value.clone(); + context.set_variable(name, value); + Ok(Payload::Success) + } - Statement::ExplainTable { table_name, .. } => explain(&storages, table_name).await, + Statement::ExplainTable { table_name, .. } => explain(&storages, table_name).await, - Statement::CreateDatabase { .. } => unreachable!(), // Handled at Glue interface // TODO: Clean up somehow - _ => Err(ExecuteError::QueryNotSupported.into()), + Statement::CreateDatabase { .. } => unreachable!(), // Handled at Glue interface // TODO: Clean up somehow + _ => Err(ExecuteError::QueryNotSupported.into()), + } } } -} diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index 67de3d39..7f58f4f9 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -4,9 +4,8 @@ mod set_expr; pub use select::{join::*, ManualError, PlanError, SelectError}; use { crate::{ - Glue, - executor::types::LabelsAndRows, result::Result, Cast, Context, MetaRecipe, RecipeUtilities, - StorageInner, Value, + executor::types::LabelsAndRows, result::Result, Cast, Context, Glue, MetaRecipe, + RecipeUtilities, StorageInner, Value, }, async_recursion::async_recursion, serde::Serialize, @@ -36,87 +35,84 @@ pub enum QueryError { } impl Glue { -#[async_recursion(?Send)] -pub async fn query( - &mut self, - query: Query, -) -> Result { - let Query { - body, - order_by, - limit, - offset, - with, - // TODO (below) - fetch: _, - lock: _, - } = query; + #[async_recursion(?Send)] + pub async fn query(&mut self, query: Query) -> Result { + let Query { + body, + order_by, + limit, + offset, + with, + // TODO (below) + fetch: _, + lock: _, + } = query; - let context = self.get_mut_context(); + let context = self.get_mut_context(); - let limit: Option = limit - .map(|expression| { - MetaRecipe::new(expression)? - .simplify_by_context(context)? - .confirm_or_err(QueryError::MissingComponentsForLimit.into())? - .cast() - }) - .transpose()?; - let offset: Option = offset - .map(|offset| { - MetaRecipe::new(offset.value)? - .simplify_by_context(context)? - .confirm_or_err(QueryError::MissingComponentsForOffset.into())? - .cast() - }) - .transpose()?; + let limit: Option = limit + .map(|expression| { + MetaRecipe::new(expression)? + .simplify_by_context(context)? + .confirm_or_err(QueryError::MissingComponentsForLimit.into())? + .cast() + }) + .transpose()?; + let offset: Option = offset + .map(|offset| { + MetaRecipe::new(offset.value)? + .simplify_by_context(context)? + .confirm_or_err(QueryError::MissingComponentsForOffset.into())? + .cast() + }) + .transpose()?; - if let Some(with) = with { - let With { - recursive: _, // Recursive not currently supported - cte_tables, - } = with; - for cte in cte_tables.into_iter() { - let Cte { - alias, - query, - from: _, // What is `from` for? - } = cte; - let TableAlias { - name, - columns: _, // TODO: Columns - Check that number is same and then rename labels - } = alias; - let name = name.value; - let data = self.query(query).await?; - context.set_table(name, data); + if let Some(with) = with { + let With { + recursive: _, // Recursive not currently supported + cte_tables, + } = with; + for cte in cte_tables.into_iter() { + let Cte { + alias, + query, + from: _, // What is `from` for? + } = cte; + let TableAlias { + name, + columns: _, // TODO: Columns - Check that number is same and then rename labels + } = alias; + let name = name.value; + let data = self.query(query).await?; + context.set_table(name, data); + } } - } - let (mut labels, mut rows) = from_body(body, order_by).await?; + let (mut labels, mut rows) = from_body(body, order_by).await?; - if let Some(offset) = offset { - rows.drain(0..offset); - } - if let Some(limit) = limit { - rows.truncate(limit); - } - if ENSURE_SIZE { - let row_width = rows - .iter() - .map(|values_row| values_row.len()) - .max() - .unwrap_or(0); - if row_width > 0 { - rows = rows - .into_iter() - .map(|mut row| { - row.resize(row_width, Value::Null); - row - }) - .collect(); - labels.resize(row_width, String::new()) - }; + if let Some(offset) = offset { + rows.drain(0..offset); + } + if let Some(limit) = limit { + rows.truncate(limit); + } + if ENSURE_SIZE { + let row_width = rows + .iter() + .map(|values_row| values_row.len()) + .max() + .unwrap_or(0); + if row_width > 0 { + rows = rows + .into_iter() + .map(|mut row| { + row.resize(row_width, Value::Null); + row + }) + .collect(); + labels.resize(row_width, String::new()) + }; + } + Ok((labels, rows)) } - Ok((labels, rows)) -} } diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 2f0d97ae..b2bfd853 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -6,7 +6,7 @@ use { types::{ColumnInfo, ComplexTableName}, MetaRecipe, }, - Context, JoinError, Result, StorageInner, Glue + Context, Glue, JoinError, Result, StorageInner, }, std::cmp::Ordering, }; @@ -38,10 +38,7 @@ impl Ord for JoinPlan { } impl JoinPlan { - pub async fn new<'a>( - join_manual: JoinManual, - glue: &Glue, - ) -> Result { + pub async fn new<'a>(join_manual: JoinManual, glue: &Glue) -> Result { let JoinManual { table, constraint, @@ -88,10 +85,7 @@ impl JoinPlan { } } -async fn get_columns( - glue: &Glue, - table: ComplexTableName, -) -> Result> { +async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result> { if let Some((context_table_labels, ..)) = glue.get_context().tables.get(&table.name) { Ok(context_table_labels .iter() @@ -102,7 +96,8 @@ async fn get_columns( }) .collect::>()) } else { - let storage = glue.get_storages() + let storage = glue + .get_storages() .iter() .find_map(|(name, storage)| { if name == &table.database { diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index e92aa9ba..1857d640 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -5,13 +5,12 @@ mod plan; use { crate::{ - Glue, executor::{ types::{LabelsAndRows, Row}, PlannedRecipe, }, macros::try_option, - Context, RecipeUtilities, Result, StorageInner, Value, + Context, Glue, RecipeUtilities, Result, StorageInner, Value, }, futures::stream::{self, StreamExt, TryStreamExt}, rayon::prelude::*, @@ -45,14 +44,14 @@ pub enum SelectError { impl Glue { pub async fn select(&mut self, plan: Plan) -> Result { let Plan { - joins, - select_items, - constraint, - group_constraint, - groups, - order_by, - labels, - } = plan; + joins, + select_items, + constraint, + group_constraint, + groups, + order_by, + labels, + } = plan; let rows = stream::iter(joins) .map(Ok) .try_fold(vec![], |rows, join| async { @@ -103,10 +102,9 @@ impl Glue { let groupers = try_option!(groups .iter() .map(|group| { - group - .clone() - .simplify_by_row(&row)? - .confirm_or_err(SelectError::GrouperMayNotContainAggregate.into()) + group.clone().simplify_by_row(&row)?.confirm_or_err( + SelectError::GrouperMayNotContainAggregate.into(), + ) }) .collect::>>()); Some(Ok((groupers, group_constraint, selected_row))) @@ -140,17 +138,15 @@ impl Glue { Ok((labels, final_rows)) } pub async fn select_query( - &mut self, - query: Select, - order_by: Vec, -) -> Result { + &mut self, + query: Select, + order_by: Vec, + ) -> Result { let plan = Plan::new(self, query, order_by).await?; self.select(plan); } } - - #[allow(clippy::type_complexity)] // TODO fn accumulate( mut rows_l: Vec<(Vec, Option, Vec)>, diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index c1de5b75..105fe5f8 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -5,7 +5,7 @@ use { }, crate::{ executor::{types::ColumnInfo, PlannedRecipe}, - Context, Result, StorageInner, Glue, + Context, Glue, Result, StorageInner, }, futures::future::join_all, serde::Serialize, @@ -34,11 +34,7 @@ pub enum PlanError { } impl Plan { - pub async fn new( - glue: &Glue, - select: Select, - order_by: Vec, - ) -> Result { + pub async fn new(glue: &Glue, select: Select, order_by: Vec) -> Result { let Manual { joins, select_items, diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index 681e6306..9fd2953f 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -3,96 +3,96 @@ use { crate::{ executor::{alter_row::insert, types::LabelsAndRows}, macros::warning, - result::Result, Glue, - Context, MetaRecipe, Payload, RecipeUtilities, StorageInner, Value, + result::Result, + Context, Glue, MetaRecipe, Payload, RecipeUtilities, StorageInner, Value, }, async_recursion::async_recursion, sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement}, }; impl Glue { -#[async_recursion(?Send)] -pub async fn from_body( - &mut self, - body: SetExpr, - order_by: Vec, -) -> Result { - match body { - SetExpr::Select(query) => { - let (labels, rows) = self.select_query(*query, order_by).await?; - Ok((labels, rows)) - } - SetExpr::Values(values) => { - if !order_by.is_empty() { - warning!("VALUES does not currently support ordering"); + #[async_recursion(?Send)] + pub async fn from_body( + &mut self, + body: SetExpr, + order_by: Vec, + ) -> Result { + match body { + SetExpr::Select(query) => { + let (labels, rows) = self.select_query(*query, order_by).await?; + Ok((labels, rows)) } - let values = values.0; - values - .into_iter() - .map(|values_row| { - values_row - .into_iter() - .map(|cell| { - MetaRecipe::new(cell)? - .simplify_by_context(self.get_context())? - .confirm_or_err(QueryError::MissingComponentsForValues.into()) - }) - .collect::>>() - }) - .collect::>>>() - .map(|values| { - ( - (0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0)) - .map(|index| format!("unnamed_{}", index)) - .collect(), - values, - ) - }) - } - SetExpr::SetOperation { - op, - all, - left, - right, - } => { - use SetOperator::*; - if !order_by.is_empty() { - warning!( + SetExpr::Values(values) => { + if !order_by.is_empty() { + warning!("VALUES does not currently support ordering"); + } + let values = values.0; + values + .into_iter() + .map(|values_row| { + values_row + .into_iter() + .map(|cell| { + MetaRecipe::new(cell)? + .simplify_by_context(self.get_context())? + .confirm_or_err(QueryError::MissingComponentsForValues.into()) + }) + .collect::>>() + }) + .collect::>>>() + .map(|values| { + ( + (0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0)) + .map(|index| format!("unnamed_{}", index)) + .collect(), + values, + ) + }) + } + SetExpr::SetOperation { + op, + all, + left, + right, + } => { + use SetOperator::*; + if !order_by.is_empty() { + warning!( "set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering" ); + } + let (left_labels, left) = self.from_body(*left, vec![]).await?; + let (right_labels, right) = self.from_body(*right, vec![]).await?; + if left_labels.len() != right_labels.len() { + return Err(QueryError::OperationColumnsMisaligned.into()); + } + let mut rows = match op { + Union => [left, right].concat(), + Except => left + .into_iter() + .filter(|row| !right.contains(row)) + .collect(), + Intersect => left.into_iter().filter(|row| right.contains(row)).collect(), + }; + if !all { + rows.dedup(); + } + Ok((left_labels, rows)) } - let (left_labels, left) = self.from_body(*left, vec![]).await?; - let (right_labels, right) = self.from_body(*right, vec![]).await?; - if left_labels.len() != right_labels.len() { - return Err(QueryError::OperationColumnsMisaligned.into()); - } - let mut rows = match op { - Union => [left, right].concat(), - Except => left - .into_iter() - .filter(|row| !right.contains(row)) - .collect(), - Intersect => left.into_iter().filter(|row| right.contains(row)).collect(), - }; - if !all { - rows.dedup(); + SetExpr::Insert(Statement::Insert { + table_name, + columns, + source, + .. + }) => { + let inserted = self.insert(&table_name, &columns, &source, true).await?; + if let Payload::Select { labels, rows } = inserted { + Ok((labels, rows.into_iter().map(|row| row.0).collect())) + } else { + unreachable!(); // TODO: Handle + } } - Ok((left_labels, rows)) + _ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries } - SetExpr::Insert(Statement::Insert { - table_name, - columns, - source, - .. - }) => { - let inserted = self.insert(&table_name, &columns, &source, true).await?; - if let Payload::Select { labels, rows } = inserted { - Ok((labels, rows.into_iter().map(|row| row.0).collect())) - } else { - unreachable!(); // TODO: Handle - } - } - _ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries } } -} diff --git a/src/glue/database.rs b/src/glue/database.rs index 43fa4ea2..32ba315e 100644 --- a/src/glue/database.rs +++ b/src/glue/database.rs @@ -1,5 +1,30 @@ -use crate::Glue; +use crate::{Context, Glue, InterfaceError, Result, StorageInner}; impl Glue { - + pub fn get_database(&self, db_ref: &Option) -> Result<&StorageInner> { + let db_container = db_ref + .as_ref() + .and_then(|db_ref| self.databases.get(db_ref)) + .ok_or(InterfaceError::DatabaseNotFound)?; // TODO: None ref should give a primary + let db_ref = &*db_container.take(); + Ok(db_ref) + //Err(InterfaceError::DatabaseNotFound.into()) + } + pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut StorageInner> { + // TODO: Somehow don't dupe + let db_container = db_ref + .as_ref() + .and_then(|db_ref| self.databases.get(db_ref)) + .ok_or(InterfaceError::DatabaseNotFound)?; + let db_ref = &mut *db_container.take(); + Ok(db_ref) + } + pub fn get_context(&self) -> Result<&Context> { + self.context + .as_ref() + .ok_or(InterfaceError::ContextUnavailable.into()) + } + pub fn get_mut_context(&self) -> Result<&mut Context> { + Err(InterfaceError::ContextUnavailable.into()) + } } diff --git a/src/glue/error.rs b/src/glue/error.rs new file mode 100644 index 00000000..337c34e6 --- /dev/null +++ b/src/glue/error.rs @@ -0,0 +1,13 @@ +use { + serde::{Deserialize, Serialize}, + std::fmt::Debug, + thiserror::Error, +}; + +#[derive(Error, Serialize, Debug, PartialEq)] +pub enum InterfaceError { + #[error("database not found")] + DatabaseNotFound, + #[error("context currently unavailable")] + ContextUnavailable, +} diff --git a/src/glue/mod.rs b/src/glue/mod.rs index a353823a..315c8231 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -12,9 +12,12 @@ use { std::{collections::HashMap, fmt::Debug}, }; -mod select; mod database; +mod error; mod payload; +mod select; + +pub use error::InterfaceError; pub(crate) type Variables = HashMap; @@ -112,10 +115,10 @@ impl Glue { /// Internal: Modify impl Glue { - pub(crate) fn take_context(&mut self) -> Context { + pub(crate) fn take_context(&mut self) -> Result { self.context .take() - .expect("Unreachable: Context wasn't replaced!") + .ok_or(InterfaceError::ContextUnavailable.into()) } pub(crate) fn replace_context(&mut self, context: Context) { self.context.replace(context); @@ -213,41 +216,15 @@ impl Glue { .and_then(|name| name.0.get(0).map(|name| name.value.clone())) .ok_or(ExecuteError::ObjectNotRecognised)?; - let index = self - .databases - .iter() - .enumerate() - .find_map(|(index, (name, _))| (name == &database_name).then(|| index)); - if let Some(index) = index { - self.databases.remove(index); + if self.databases.contains_key(&database_name) { + self.databases.remove(&database_name); } else if !if_exists { return Err(ExecuteError::ObjectNotRecognised.into()); } return Ok(Payload::Success); } - let mut databases: Vec<(String, Box)> = self - .databases - .iter_mut() - .map(|(name, storage)| (name.clone(), storage.take())) - .collect(); - let give_storages: Vec<(String, &mut StorageInner)> = databases - .iter_mut() - .map(|(name, storage)| (name.clone(), &mut **storage)) - .collect(); - - let mut context = self.take_context(); - - let result = block_on(self.execute_query(&query)); - - self.databases - .iter_mut() - .zip(databases) - .for_each(|((_name, storage), (_name_2, taken))| storage.replace(taken)); - - self.replace_context(context); - - result + block_on(self.execute_query(&query)) } /// Provides a parsed query to execute later. /// Particularly useful if executing a small query many times as parsing is not (computationally) free. diff --git a/src/glue/payload.rs b/src/glue/payload.rs index 9583f1e5..e13caad0 100644 --- a/src/glue/payload.rs +++ b/src/glue/payload.rs @@ -1,4 +1,4 @@ -use crate::{Row, Payload}; +use crate::{Payload, Row}; impl Payload { pub fn unwrap_rows(self) -> Vec { diff --git a/src/result.rs b/src/result.rs index 0327f0a5..fb3a715b 100644 --- a/src/result.rs +++ b/src/result.rs @@ -2,8 +2,8 @@ use { crate::{ data::{RowError, TableError, ValueError}, executor::{ - AlterError, ExecuteError, FetchError, JoinError, ManualError, PlanError, QueryError, - RecipeError, SelectError, ValidateError, + AlterError, ExecuteError, FetchError, InterfaceError, JoinError, ManualError, + PlanError, QueryError, RecipeError, SelectError, ValidateError, }, store::StorageError, CSVStorageError, SheetStorageError, @@ -68,6 +68,8 @@ pub enum Error { CSVStorage(#[from] CSVStorageError), #[error(transparent)] SheetStorage(#[from] SheetStorageError), + #[error(transparent)] + Interface(#[from] InterfaceError), } unsafe impl Send for Error {} @@ -100,6 +102,7 @@ impl PartialEq for Error { (StorageImplementation(l), StorageImplementation(r)) => l == r, (CSVStorage(l), CSVStorage(r)) => l == r, (SheetStorage(l), SheetStorage(r)) => l == r, + (Interface(l), Interface(r)) => l == r, _ => false, } } From 11a9bf8fbc4bb95ce9d7e8ebe19800afd23b7735 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 14:38:17 +1000 Subject: [PATCH 3/8] Change interfacing --- src/executor/alter_row/auto_increment.rs | 67 +++---- src/executor/alter_row/delete.rs | 120 ++++++------ src/executor/alter_row/insert.rs | 96 +++++----- src/executor/alter_row/mod.rs | 11 +- src/executor/alter_row/update.rs | 217 ++++++++++----------- src/executor/alter_row/validate_unique.rs | 220 +++++++++++----------- src/executor/alter_table/alter_table.rs | 73 +++---- src/executor/alter_table/create_index.rs | 119 ++++++------ src/executor/alter_table/create_table.rs | 39 ++-- src/executor/alter_table/drop.rs | 81 ++++---- src/executor/alter_table/mod.rs | 10 +- src/executor/alter_table/truncate.rs | 53 +++--- src/executor/execute.rs | 13 +- src/executor/mod.rs | 2 +- src/executor/other/explain.rs | 165 ++++++++-------- src/executor/query/mod.rs | 5 +- src/executor/query/select/join/execute.rs | 33 +--- src/executor/query/select/join/plan.rs | 17 +- src/executor/query/select/mod.rs | 2 +- src/executor/query/select/plan/mod.rs | 2 +- src/executor/query/set_expr.rs | 10 +- src/glue/database.rs | 9 +- src/glue/mod.rs | 4 +- src/result.rs | 10 +- 24 files changed, 681 insertions(+), 697 deletions(-) diff --git a/src/executor/alter_row/auto_increment.rs b/src/executor/alter_row/auto_increment.rs index a3765da1..ca7fd92f 100644 --- a/src/executor/alter_row/auto_increment.rs +++ b/src/executor/alter_row/auto_increment.rs @@ -1,40 +1,43 @@ #![cfg(feature = "auto-increment")] -use crate::{Column, Result, Row, StorageInner, Value, ValueDefault}; +use crate::{Column, Glue, Result, Row, StorageInner, Value, ValueDefault}; -pub async fn auto_increment( - storage: &mut StorageInner, - table_name: &str, - columns: &[Column], - rows: &mut [Row], -) -> Result<()> { - let auto_increment_columns = columns - .iter() - .enumerate() - .filter(|(_, column)| matches!(column.default, Some(ValueDefault::AutoIncrement(_)))) - .map(|(index, column)| { - ( - index, - column.name.clone(), - rows.iter() - .filter(|row| matches!(row.0.get(index), Some(Value::Null))) - .count() as i64, - ) - }) - .collect(); +impl Glue { + pub async fn auto_increment( + &mut self, + table_name: &str, + columns: &[Column], + rows: &mut [Row], + ) -> Result<()> { + let auto_increment_columns = columns + .iter() + .enumerate() + .filter(|(_, column)| matches!(column.default, Some(ValueDefault::AutoIncrement(_)))) + .map(|(index, column)| { + ( + index, + column.name.clone(), + rows.iter() + .filter(|row| matches!(row.0.get(index), Some(Value::Null))) + .count() as i64, + ) + }) + .collect(); - let column_values = storage - .generate_increment_values(table_name.to_string(), auto_increment_columns) - .await?; + let column_values = self + .get_mut_database(&None)? + .generate_increment_values(table_name.to_string(), auto_increment_columns) + .await?; - let mut column_values = column_values; - for row in rows.iter_mut() { - for ((index, _name), value) in &mut column_values { - let cell = row.0.get_mut(*index).unwrap(); - if matches!(cell, Value::Null) { - *cell = Value::I64(*value); - *value += 1; + let mut column_values = column_values; + for row in rows.iter_mut() { + for ((index, _name), value) in &mut column_values { + let cell = row.0.get_mut(*index).unwrap(); + if matches!(cell, Value::Null) { + *cell = Value::I64(*value); + *value += 1; + } } } + Ok(()) } - Ok(()) } diff --git a/src/executor/alter_row/delete.rs b/src/executor/alter_row/delete.rs index 01ca7243..61f8c889 100644 --- a/src/executor/alter_row/delete.rs +++ b/src/executor/alter_row/delete.rs @@ -2,75 +2,77 @@ use { crate::{ data::{get_name, Schema}, executor::types::ColumnInfo, - Column, Context, ExecuteError, MetaRecipe, Payload, PlannedRecipe, Result, StorageInner, - Value, + Column, Context, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, Result, Value, }, sqlparser::ast::{Expr, ObjectName}, }; -pub async fn delete( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - table_name: &ObjectName, - selection: &Option, -) -> Result { - let table_name = get_name(table_name)?; - let Schema { - column_defs, - indexes, - .. - } = storages[0] - .1 - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; +impl Glue { + pub async fn delete( + &mut self, + table_name: &ObjectName, + selection: &Option, + ) -> Result { + let table_name = get_name(table_name)?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_mut_database(&None)? + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; - let columns = column_defs - .clone() - .into_iter() - .map(|Column { name, .. }| ColumnInfo::of_name(name)) - .collect::>(); - let filter = selection - .clone() - .map(|selection| { - PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(context)?, - &columns, - ) - }) - .unwrap_or(Ok(PlannedRecipe::TRUE))?; + let columns = column_defs + .clone() + .into_iter() + .map(|Column { name, .. }| ColumnInfo::of_name(name)) + .collect::>(); + let filter = selection + .clone() + .map(|selection| { + PlannedRecipe::new( + MetaRecipe::new(selection)?.simplify_by_context(self.get_context()?)?, + &columns, + ) + }) + .unwrap_or(Ok(PlannedRecipe::TRUE))?; - let keys = storages[0] - .1 - .scan_data(table_name) - .await? - .filter_map(|row_result| { - let (key, row) = match row_result { - Ok(keyed_row) => keyed_row, - Err(error) => return Some(Err(error)), - }; + let keys = self + .get_mut_database(&None)? + .scan_data(table_name) + .await? + .filter_map(|row_result| { + let (key, row) = match row_result { + Ok(keyed_row) => keyed_row, + Err(error) => return Some(Err(error)), + }; - let row = row.0; + let row = row.0; - let confirm_constraint = filter.confirm_constraint(&row); - match confirm_constraint { - Ok(true) => Some(Ok(key)), - Ok(false) => None, - Err(error) => Some(Err(error)), - } - }) - .collect::>>()?; + let confirm_constraint = filter.confirm_constraint(&row); + match confirm_constraint { + Ok(true) => Some(Ok(key)), + Ok(false) => None, + Err(error) => Some(Err(error)), + } + }) + .collect::>>()?; - let num_keys = keys.len(); + let num_keys = keys.len(); - let result = storages[0] - .1 - .delete_data(table_name, keys) - .await - .map(|_| Payload::Delete(num_keys))?; + let result = self + .get_mut_database(&None)? + .delete_data(table_name, keys) + .await + .map(|_| Payload::Delete(num_keys))?; - for index in indexes.iter() { - index.reset(storages[0].1, table_name, &column_defs).await?; // TODO: Not this; optimise + for index in indexes.iter() { + index + .reset(self.get_mut_database(&None)?, table_name, &column_defs) + .await?; // TODO: Not this; optimise + } + Ok(result) } - Ok(result) } diff --git a/src/executor/alter_row/insert.rs b/src/executor/alter_row/insert.rs index acbf8669..16da429b 100644 --- a/src/executor/alter_row/insert.rs +++ b/src/executor/alter_row/insert.rs @@ -2,57 +2,61 @@ use { super::{auto_increment, columns_to_positions, validate, validate_unique}, crate::{ data::{get_name, Schema}, - executor::query::query, - Context, ExecuteError, Payload, Result, Row, StorageInner, + Context, ExecuteError, Glue, Payload, Result, Row, }, sqlparser::ast::{Ident, ObjectName, Query}, }; -pub async fn insert( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - table_name: &ObjectName, - columns: &[Ident], - source: &Query, - expect_data: bool, -) -> Result { - let table_name = get_name(table_name)?; - let Schema { - column_defs, - indexes, - .. - } = storages[0] - .1 - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; - - // TODO: Multi storage - let (labels, mut rows) = query(storages, context, source.clone()).await?; - let column_positions = columns_to_positions(&column_defs, columns)?; - - validate(&column_defs, &column_positions, &mut rows)?; - let mut rows: Vec = rows.into_iter().map(Row).collect(); - #[cfg(feature = "auto-increment")] - auto_increment(storages[0].1, table_name, &column_defs, &mut rows).await?; - validate_unique(storages[0].1, table_name, &column_defs, &rows, None).await?; - - let num_rows = rows.len(); - - let result = storages[0].1.insert_data(table_name, rows.clone()).await; - - let result = result.map(|_| { - if expect_data { - Payload::Select { labels, rows } - } else { - Payload::Insert(num_rows) +impl Glue { + pub async fn insert( + &mut self, + table_name: &ObjectName, + columns: &[Ident], + source: &Query, + expect_data: bool, + ) -> Result { + let table_name = get_name(table_name)?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_database(&None)? + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; + + // TODO: Multi storage + let (labels, mut rows) = self.query(source.clone()).await?; + let column_positions = columns_to_positions(&column_defs, columns)?; + + validate(&column_defs, &column_positions, &mut rows)?; + let mut rows: Vec = rows.into_iter().map(Row).collect(); + #[cfg(feature = "auto-increment")] + self.auto_increment(table_name, &column_defs, &mut rows) + .await?; + self.validate_unique(table_name, &column_defs, &rows, None) + .await?; + + let num_rows = rows.len(); + + let database = self.get_mut_database(&None)?; + + let result = database.insert_data(table_name, rows.clone()).await; + + let result = result.map(|_| { + if expect_data { + Payload::Select { labels, rows } + } else { + Payload::Insert(num_rows) + } + })?; + + for index in indexes.iter() { + // TODO: Should definitely be just inserting an index record + index.reset(database, table_name, &column_defs).await?; // TODO: Not this; optimise } - })?; - for index in indexes.iter() { - // TODO: Should definitely be just inserting an index record - index.reset(storages[0].1, table_name, &column_defs).await?; // TODO: Not this; optimise + Ok(result) } - - Ok(result) } diff --git a/src/executor/alter_row/mod.rs b/src/executor/alter_row/mod.rs index df060e39..4634d026 100644 --- a/src/executor/alter_row/mod.rs +++ b/src/executor/alter_row/mod.rs @@ -5,13 +5,4 @@ mod update; mod validate; mod validate_unique; -pub use { - delete::delete, - insert::insert, - update::update, - validate::{columns_to_positions, validate, ValidateError}, - validate_unique::validate_unique, -}; - -#[cfg(feature = "auto-increment")] -pub use auto_increment::auto_increment; +pub use validate::{columns_to_positions, validate, ValidateError}; diff --git a/src/executor/alter_row/update.rs b/src/executor/alter_row/update.rs index 8d14a1b5..0ea35c78 100644 --- a/src/executor/alter_row/update.rs +++ b/src/executor/alter_row/update.rs @@ -3,123 +3,130 @@ use { crate::{ data::{get_name, Schema}, executor::types::{ColumnInfo, Row as VecRow}, - Column, Context, ExecuteError, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, Result, - Row, StorageInner, Value, + Column, Context, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, + Result, Row, Value, }, sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}, }; -pub async fn update( - storage: &mut StorageInner, - context: &Context, - table: &TableWithJoins, - selection: &Option, - assignments: &[Assignment], -) -> Result { - // TODO: Complex updates (joins) - let table = match &table.relation { - TableFactor::Table { name, .. } => get_name(name).cloned(), - _ => Err(ExecuteError::QueryNotSupported.into()), - }?; - let Schema { - column_defs, - indexes, - .. - } = storage - .fetch_schema(&table) - .await? - .ok_or(ExecuteError::TableNotExists)?; +impl Glue { + pub async fn update( + &mut self, + table: &TableWithJoins, + selection: &Option, + assignments: &[Assignment], + ) -> Result { + // TODO: Complex updates (joins) + let table = match &table.relation { + TableFactor::Table { name, .. } => get_name(name).cloned(), + _ => Err(ExecuteError::QueryNotSupported.into()), + }?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_database(&None)? + .fetch_schema(&table) + .await? + .ok_or(ExecuteError::TableNotExists)?; - let columns = column_defs - .clone() - .into_iter() - .map(|Column { name, .. }| ColumnInfo::of_name(name)) - .collect::>(); + let columns = column_defs + .clone() + .into_iter() + .map(|Column { name, .. }| ColumnInfo::of_name(name)) + .collect::>(); - let filter = selection - .clone() - .map(|selection| { - PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(context)?, - &columns, - ) - }) - .unwrap_or(Ok(PlannedRecipe::TRUE))?; + let filter = selection + .clone() + .map(|selection| { + PlannedRecipe::new( + MetaRecipe::new(selection)?.simplify_by_context(self.get_context()?)?, + &columns, + ) + }) + .unwrap_or(Ok(PlannedRecipe::TRUE))?; - let assignments = assignments - .iter() - .map(|assignment| { - let Assignment { id, value } = assignment; - let column_compare = id - .clone() - .into_iter() - .map(|component| component.value) - .collect(); - let index = columns - .iter() - .position(|column| column == &column_compare) - .ok_or(ExecuteError::ColumnNotFound)?; - let recipe = PlannedRecipe::new( - MetaRecipe::new(value.clone())?.simplify_by_context(context)?, - &columns, - )?; - Ok((index, recipe)) - }) - .collect::>>()?; + let assignments = assignments + .iter() + .map(|assignment| { + let Assignment { id, value } = assignment; + let column_compare = id + .clone() + .into_iter() + .map(|component| component.value) + .collect(); + let index = columns + .iter() + .position(|column| column == &column_compare) + .ok_or(ExecuteError::ColumnNotFound)?; + let recipe = PlannedRecipe::new( + MetaRecipe::new(value.clone())?.simplify_by_context(self.get_context()?)?, + &columns, + )?; + Ok((index, recipe)) + }) + .collect::>>()?; - let keyed_rows = storage - .scan_data(&table) - .await? - .into_iter() - .filter_map(|row_result| { - let (key, row) = match row_result { - Ok(keyed_row) => keyed_row, - Err(error) => return Some(Err(error)), - }; + let keyed_rows = self + .get_database(&None)? + .scan_data(&table) + .await? + .into_iter() + .filter_map(|row_result| { + let (key, row) = match row_result { + Ok(keyed_row) => keyed_row, + Err(error) => return Some(Err(error)), + }; - let row = row.0; + let row = row.0; - let confirm_constraint = filter.confirm_constraint(&row); - if let Ok(false) = confirm_constraint { - return None; - } else if let Err(error) = confirm_constraint { - return Some(Err(error)); - } - let row = row - .iter() - .enumerate() - .map(|(index, old_value)| { - assignments - .iter() - .find(|(assignment_index, _)| assignment_index == &index) - .map(|(_, assignment_recipe)| { - assignment_recipe.clone().simplify_by_row(&row)?.confirm() - }) - .unwrap_or_else(|| Ok(old_value.clone())) - }) - .collect::>(); - Some(row.map(|row| (key, row))) - }) - .collect::>>()?; + let confirm_constraint = filter.confirm_constraint(&row); + if let Ok(false) = confirm_constraint { + return None; + } else if let Err(error) = confirm_constraint { + return Some(Err(error)); + } + let row = row + .iter() + .enumerate() + .map(|(index, old_value)| { + assignments + .iter() + .find(|(assignment_index, _)| assignment_index == &index) + .map(|(_, assignment_recipe)| { + assignment_recipe.clone().simplify_by_row(&row)?.confirm() + }) + .unwrap_or_else(|| Ok(old_value.clone())) + }) + .collect::>(); + Some(row.map(|row| (key, row))) + }) + .collect::>>()?; - let column_positions = columns_to_positions(&column_defs, &[])?; - let (keys, mut rows): (Vec, Vec) = keyed_rows.into_iter().unzip(); - validate(&column_defs, &column_positions, &mut rows)?; + let column_positions = columns_to_positions(&column_defs, &[])?; + let (keys, mut rows): (Vec, Vec) = keyed_rows.into_iter().unzip(); + validate(&column_defs, &column_positions, &mut rows)?; - let table = table.as_str(); - let mut rows: Vec = rows.into_iter().map(Row).collect(); - #[cfg(feature = "auto-increment")] - auto_increment(&mut *storage, table, &column_defs, &mut rows).await?; - validate_unique(&*storage, table, &column_defs, &rows, Some(&keys)).await?; - let keyed_rows: Vec<(Value, Row)> = keys.into_iter().zip(rows).collect(); - let num_rows = keyed_rows.len(); - let result = storage - .update_data(table, keyed_rows) - .await - .map(|_| Payload::Update(num_rows))?; + let table = table.as_str(); + let mut rows: Vec = rows.into_iter().map(Row).collect(); + #[cfg(feature = "auto-increment")] + self.auto_increment(table, &column_defs, &mut rows).await?; + self.validate_unique(table, &column_defs, &rows, Some(&keys)) + .await?; + let keyed_rows: Vec<(Value, Row)> = keys.into_iter().zip(rows).collect(); + let num_rows = keyed_rows.len(); + let result = self + .get_mut_database(&None)? + .update_data(table, keyed_rows) + .await + .map(|_| Payload::Update(num_rows))?; - for index in indexes.iter() { - index.reset(storage, table, &column_defs).await?; // TODO: Not this; optimise + for index in indexes.iter() { + index + .reset(self.get_mut_database(&None)?, table, &column_defs) + .await?; // TODO: Not this; optimise + } + Ok(result) } - Ok(result) } diff --git a/src/executor/alter_row/validate_unique.rs b/src/executor/alter_row/validate_unique.rs index 4988ba06..dd9568b8 100644 --- a/src/executor/alter_row/validate_unique.rs +++ b/src/executor/alter_row/validate_unique.rs @@ -1,5 +1,5 @@ use { - crate::{Column, NullOrd, Result, Row, StorageInner, ValidateError, Value}, + crate::{Column, Glue, NullOrd, Result, Row, ValidateError, Value}, std::cmp::Ordering, }; @@ -20,137 +20,143 @@ macro_rules! some_or { }; } -pub async fn validate_unique( - storage: &StorageInner, - table_name: &str, - column_defs: &[Column], - rows: &[Row], - ignore_keys: Option<&[Value]>, -) -> Result<()> { - let unique_columns: Vec = column_defs - .iter() - .enumerate() - .filter_map(|(index, column_def)| { - if column_def.is_unique { - Some(index) - } else { - None - } - }) - .collect(); - let mut existing_values: Vec> = vec![vec![]; unique_columns.len()]; +impl Glue { + pub(crate) async fn validate_unique( + &self, + table_name: &str, + column_defs: &[Column], + rows: &[Row], + ignore_keys: Option<&[Value]>, + ) -> Result<()> { + let unique_columns: Vec = column_defs + .iter() + .enumerate() + .filter_map(|(index, column_def)| { + if column_def.is_unique { + Some(index) + } else { + None + } + }) + .collect(); + let mut existing_values: Vec> = vec![vec![]; unique_columns.len()]; - storage - .scan_data(table_name) - .await? - .try_for_each::<_, Result<_>>(|result| { - let (key, row) = result?; - if let Some(ignore_keys) = ignore_keys { - if ignore_keys.iter().any(|ignore_key| ignore_key == &key) { - return Ok(()); + self.get_database(&None)? + .scan_data(table_name) + .await? + .try_for_each::<_, Result<_>>(|result| { + let (key, row) = result?; + if let Some(ignore_keys) = ignore_keys { + if ignore_keys.iter().any(|ignore_key| ignore_key == &key) { + return Ok(()); + } } - } - let row = row.0; + let row = row.0; + unique_columns + .iter() + .enumerate() + .map(|(index, row_index)| { + existing_values + .get_mut(index)? + .push(row.get(*row_index)?.clone()); + Some(()) + }) + .collect::>() + .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) + })?; + + let mut new_values: Vec> = vec![vec![]; unique_columns.len()]; + rows.iter().try_for_each::<_, Result<_>>(|row| { unique_columns .iter() .enumerate() .map(|(index, row_index)| { - existing_values + new_values .get_mut(index)? - .push(row.get(*row_index)?.clone()); + .push(row.0.get(*row_index)?.clone()); Some(()) }) .collect::>() .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) })?; + let mut existing_values_iter = existing_values.into_iter(); + new_values + .into_iter() + .map(|mut new_values| { + let mut existing_values = existing_values_iter.next()?; - let mut new_values: Vec> = vec![vec![]; unique_columns.len()]; - rows.iter().try_for_each::<_, Result<_>>(|row| { - unique_columns - .iter() - .enumerate() - .map(|(index, row_index)| { - new_values - .get_mut(index)? - .push(row.0.get(*row_index)?.clone()); - Some(()) - }) - .collect::>() - .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) - })?; - let mut existing_values_iter = existing_values.into_iter(); - new_values - .into_iter() - .map(|mut new_values| { - let mut existing_values = existing_values_iter.next()?; - - existing_values.sort_unstable_by(|value_l, value_r| { - value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) - }); - new_values.sort_unstable_by(|value_l, value_r| { - value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) - }); - - let mut existing_values = existing_values.into_iter(); - let mut new_values = new_values.into_iter(); + existing_values.sort_unstable_by(|value_l, value_r| { + value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) + }); + new_values.sort_unstable_by(|value_l, value_r| { + value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) + }); - let mut new_value = some_or_continue!(new_values.next()); - let mut existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); - } - new_value = new_new; - } - }); + let mut existing_values = existing_values.into_iter(); + let mut new_values = new_values.into_iter(); - loop { - match existing_value.null_cmp(&new_value) { - Some(Ordering::Equal) => { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())) - } - Some(Ordering::Greater) => { + let mut new_value = some_or_continue!(new_values.next()); + let mut existing_value = some_or!(existing_values.next(), { + loop { let new_new = some_or_continue!(new_values.next()); if new_new == new_value { return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); } new_value = new_new; } - Some(Ordering::Less) => { - existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err( - ValidateError::DuplicateEntryOnUniqueField.into() - )); - } - new_value = new_new; + }); + + loop { + match existing_value.null_cmp(&new_value) { + Some(Ordering::Equal) => { + return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())) + } + Some(Ordering::Greater) => { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some( + Err(ValidateError::DuplicateEntryOnUniqueField.into()), + ); } - }); - } - None => { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); + new_value = new_new; } - new_value = new_new; - existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err( - ValidateError::DuplicateEntryOnUniqueField.into() - )); + Some(Ordering::Less) => { + existing_value = some_or!(existing_values.next(), { + loop { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some(Err( + ValidateError::DuplicateEntryOnUniqueField.into(), + )); + } + new_value = new_new; } - new_value = new_new; + }); + } + None => { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some( + Err(ValidateError::DuplicateEntryOnUniqueField.into()), + ); } - }); + new_value = new_new; + existing_value = some_or!(existing_values.next(), { + loop { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some(Err( + ValidateError::DuplicateEntryOnUniqueField.into(), + )); + } + new_value = new_new; + } + }); + } } } - } - }) - .collect::>>() - .ok_or(ValidateError::UnreachableUniqueValues)? + }) + .collect::>>() + .ok_or(ValidateError::UnreachableUniqueValues)? + } } diff --git a/src/executor/alter_table/alter_table.rs b/src/executor/alter_table/alter_table.rs index d3216b5c..653529de 100644 --- a/src/executor/alter_table/alter_table.rs +++ b/src/executor/alter_table/alter_table.rs @@ -1,46 +1,49 @@ use { super::{validate, AlterError}, - crate::{data::get_name, Error, Result, StorageInner}, + crate::{data::get_name, Error, Glue, Result}, sqlparser::ast::{AlterTableOperation, ObjectName}, }; -pub async fn alter_table( - storage: &mut StorageInner, - name: &ObjectName, - operation: &AlterTableOperation, -) -> Result<()> { - let table_name = get_name(name).map_err(Error::from)?; +impl Glue { + pub async fn alter_table( + &mut self, + name: &ObjectName, + operation: &AlterTableOperation, + ) -> Result<()> { + let table_name = get_name(name).map_err(Error::from)?; - match operation { - AlterTableOperation::RenameTable { - table_name: new_table_name, - } => { - let new_table_name = get_name(new_table_name).map_err(Error::from)?; + let database = self.get_mut_database(&None)?; + match operation { + AlterTableOperation::RenameTable { + table_name: new_table_name, + } => { + let new_table_name = get_name(new_table_name).map_err(Error::from)?; - storage.rename_schema(table_name, new_table_name).await - } - AlterTableOperation::RenameColumn { - old_column_name, - new_column_name, - } => { - storage - .rename_column(table_name, &old_column_name.value, &new_column_name.value) - .await - } - AlterTableOperation::AddColumn { column_def } => { - validate(column_def).map_err(Error::from)?; + database.rename_schema(table_name, new_table_name).await + } + AlterTableOperation::RenameColumn { + old_column_name, + new_column_name, + } => { + database + .rename_column(table_name, &old_column_name.value, &new_column_name.value) + .await + } + AlterTableOperation::AddColumn { column_def } => { + validate(column_def).map_err(Error::from)?; - storage.add_column(table_name, &column_def.into()).await - } - AlterTableOperation::DropColumn { - column_name, - if_exists, - .. - } => { - storage - .drop_column(table_name, &column_name.value, *if_exists) - .await + database.add_column(table_name, &column_def.into()).await + } + AlterTableOperation::DropColumn { + column_name, + if_exists, + .. + } => { + database + .drop_column(table_name, &column_name.value, *if_exists) + .await + } + _ => Err(AlterError::UnsupportedAlterTableOperation(operation.to_string()).into()), } - _ => Err(AlterError::UnsupportedAlterTableOperation(operation.to_string()).into()), } } diff --git a/src/executor/alter_table/create_index.rs b/src/executor/alter_table/create_index.rs index a0b2477d..5fb77ee7 100644 --- a/src/executor/alter_table/create_index.rs +++ b/src/executor/alter_table/create_index.rs @@ -1,69 +1,72 @@ use { - crate::{data::get_name, AlterError, ExecuteError, Index, Result, StorageInner}, + crate::{data::get_name, AlterError, ExecuteError, Glue, Index, Result}, sqlparser::ast::{Expr, ObjectName, OrderByExpr}, }; -pub async fn create_index( - storage: &mut StorageInner, - table: &ObjectName, - name: &ObjectName, - columns: &[OrderByExpr], - unique: bool, - if_not_exists: bool, -) -> Result<()> { - let name = name - .0 - .last() - .ok_or(ExecuteError::QueryNotSupported)? - .value - .clone(); +impl Glue { + pub async fn create_index( + &mut self, + table: &ObjectName, + name: &ObjectName, + columns: &[OrderByExpr], + unique: bool, + if_not_exists: bool, + ) -> Result<()> { + let name = name + .0 + .last() + .ok_or(ExecuteError::QueryNotSupported)? + .value + .clone(); - let table_name = get_name(table)?; + let table_name = get_name(table)?; - let schema = storage - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; + let database = self.get_mut_database(&None)?; + let schema = database + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; - if schema.indexes.iter().any(|index| index.name == name) { - if !if_not_exists { - Err(AlterError::AlreadyExists(name).into()) + if schema.indexes.iter().any(|index| index.name == name) { + if !if_not_exists { + Err(AlterError::AlreadyExists(name).into()) + } else { + Ok(()) + } } else { - Ok(()) - } - } else { - let mut columns = columns.iter(); - let column = columns.next().and_then(|column| match column.expr.clone() { - Expr::Identifier(ident) => Some(ident.value), - _ => None, - }); - if columns.next().is_some() { - Err(AlterError::UnsupportedNumberOfIndexColumns(name).into()) - } else if column - .as_ref() - .and_then(|column| { - schema - .column_defs - .iter() - .find(|column_def| &column_def.name == column) - }) - .is_none() - { - Err(AlterError::ColumnNotFound( - table_name.clone(), - column.unwrap_or_else(|| String::from("NILL")), - ) - .into()) - } else if let Some(column) = column { - let mut schema = schema.clone(); - let index = Index::new(name, column, unique); - index - .reset(storage, table_name, &schema.column_defs) - .await?; - schema.indexes.push(index); - storage.replace_schema(table_name, schema).await - } else { - unreachable!() + let mut columns = columns.iter(); + let column = columns.next().and_then(|column| match column.expr.clone() { + Expr::Identifier(ident) => Some(ident.value), + _ => None, + }); + if columns.next().is_some() { + Err(AlterError::UnsupportedNumberOfIndexColumns(name).into()) + } else if column + .as_ref() + .and_then(|column| { + schema + .column_defs + .iter() + .find(|column_def| &column_def.name == column) + }) + .is_none() + { + Err(AlterError::ColumnNotFound( + table_name.clone(), + column.unwrap_or_else(|| String::from("NILL")), + ) + .into()) + } else if let Some(column) = column { + let mut schema = schema.clone(); + let index = Index::new(name, column, unique); + index + .reset(database, table_name, &schema.column_defs) + .await?; + schema.indexes.push(index); + database.replace_schema(table_name, schema).await + } else { + unreachable!() + } } } } diff --git a/src/executor/alter_table/create_table.rs b/src/executor/alter_table/create_table.rs index 2bcae047..d1e11e3d 100644 --- a/src/executor/alter_table/create_table.rs +++ b/src/executor/alter_table/create_table.rs @@ -2,30 +2,33 @@ use { super::AlterError, crate::{ data::{get_name, Schema}, - Column, Result, StorageInner, + Column, Glue, Result, }, sqlparser::ast::{ColumnDef, ObjectName}, }; -pub async fn create_table( - storage: &mut StorageInner, - name: &ObjectName, - column_defs: &[ColumnDef], - if_not_exists: bool, -) -> Result<()> { - let schema = Schema { - table_name: get_name(name)?.to_string(), - column_defs: column_defs.iter().cloned().map(Column::from).collect(), - indexes: vec![], - }; +impl Glue { + pub async fn create_table( + &mut self, + name: &ObjectName, + column_defs: &[ColumnDef], + if_not_exists: bool, + ) -> Result<()> { + let schema = Schema { + table_name: get_name(name)?.to_string(), + column_defs: column_defs.iter().cloned().map(Column::from).collect(), + indexes: vec![], + }; - if storage.fetch_schema(&schema.table_name).await?.is_some() { - if !if_not_exists { - Err(AlterError::TableAlreadyExists(schema.table_name.to_owned()).into()) + let database = self.get_mut_database(&None)?; + if database.fetch_schema(&schema.table_name).await?.is_some() { + if !if_not_exists { + Err(AlterError::TableAlreadyExists(schema.table_name.to_owned()).into()) + } else { + Ok(()) + } } else { - Ok(()) + database.insert_schema(&schema).await } - } else { - storage.insert_schema(&schema).await } } diff --git a/src/executor/alter_table/drop.rs b/src/executor/alter_table/drop.rs index 2aba9501..faa52e74 100644 --- a/src/executor/alter_table/drop.rs +++ b/src/executor/alter_table/drop.rs @@ -1,51 +1,54 @@ use { super::AlterError, - crate::{data::get_name, Result, StorageInner, ValueDefault}, + crate::{data::get_name, Glue, Result, StorageInner, ValueDefault}, futures::stream::{self, TryStreamExt}, sqlparser::ast::{ObjectName, ObjectType}, }; -pub async fn drop( - storage: &mut StorageInner, - object_type: &ObjectType, - names: &[ObjectName], - if_exists: bool, -) -> Result<()> { - if object_type != &ObjectType::Table { - return Err(AlterError::DropTypeNotSupported(object_type.to_string()).into()); - } +impl Glue { + pub async fn drop( + &mut self, + object_type: &ObjectType, + names: &[ObjectName], + if_exists: bool, + ) -> Result<()> { + let database = self.get_mut_database(&None)?; + if object_type != &ObjectType::Table { + return Err(AlterError::DropTypeNotSupported(object_type.to_string()).into()); + } - stream::iter(names.iter().map(Ok)) - .try_fold(storage, |storage, table_name| async move { - let table_name = get_name(table_name)?; - let schema = storage.fetch_schema(table_name).await?; + stream::iter(names.iter().map(Ok)) + .try_fold(database, |database, table_name| async move { + let table_name = get_name(table_name)?; + let schema = database.fetch_schema(table_name).await?; - if schema.is_none() { - if !if_exists { - return Err(AlterError::TableNotFound(table_name.to_owned()).into()); - } else { - return Ok(storage); + if schema.is_none() { + if !if_exists { + return Err(AlterError::TableNotFound(table_name.to_owned()).into()); + } else { + return Ok(database); + } } - } - #[cfg(feature = "auto-increment")] - let result: Result<&mut StorageInner> = - stream::iter(schema.unwrap().column_defs.into_iter().map(Ok)) - .try_fold(storage, |storage, column| async move { - if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { - storage - .set_increment_value(table_name, &column.name, 1_i64) - .await?; - } - Ok(storage) - }) - .await; + #[cfg(feature = "auto-increment")] + let result: Result<&mut StorageInner> = + stream::iter(schema.unwrap().column_defs.into_iter().map(Ok)) + .try_fold(database, |database, column| async move { + if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { + database + .set_increment_value(table_name, &column.name, 1_i64) + .await?; + } + Ok(database) + }) + .await; - #[cfg(feature = "auto-increment")] - let storage = result?; + #[cfg(feature = "auto-increment")] + let database = result?; - storage.delete_schema(table_name).await?; - Ok(storage) - }) - .await - .map(|_| ()) + database.delete_schema(table_name).await?; + Ok(database) + }) + .await + .map(|_| ()) + } } diff --git a/src/executor/alter_table/mod.rs b/src/executor/alter_table/mod.rs index 534f1fcd..7f7d9fc3 100644 --- a/src/executor/alter_table/mod.rs +++ b/src/executor/alter_table/mod.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "alter-table")] -#[allow(clippy::module_inception)] // TODO mod alter_table; mod create_index; mod create_table; @@ -7,11 +5,5 @@ mod drop; mod error; mod truncate; mod validate; +pub use error::AlterError; use validate::validate; - -#[cfg(feature = "alter-table")] -pub use alter_table::alter_table; -pub use { - create_index::create_index, create_table::create_table, drop::drop, error::AlterError, - truncate::truncate, -}; diff --git a/src/executor/alter_table/truncate.rs b/src/executor/alter_table/truncate.rs index e328b1ec..d4bf5a63 100644 --- a/src/executor/alter_table/truncate.rs +++ b/src/executor/alter_table/truncate.rs @@ -1,35 +1,38 @@ use { - crate::{data::get_name, AlterError, Result, StorageInner, ValueDefault}, + crate::{data::get_name, AlterError, Glue, Result, StorageInner, ValueDefault}, futures::stream::{self, TryStreamExt}, sqlparser::ast::ObjectName, }; -pub async fn truncate(storage: &mut StorageInner, table_name: &ObjectName) -> Result<()> { - let table_name = get_name(table_name)?; - let schema = storage.fetch_schema(table_name).await?; +impl Glue { + pub async fn truncate(&mut self, table_name: &ObjectName) -> Result<()> { + let database = self.get_mut_database(&None)?; + let table_name = get_name(table_name)?; + let schema = database.fetch_schema(table_name).await?; - if let Some(schema) = schema { - // TODO: We should be deleting the entry - #[cfg(feature = "auto-increment")] - let result: Result<&mut StorageInner> = stream::iter(schema.column_defs.iter().map(Ok)) - .try_fold(storage, |storage, column| async move { - if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { - storage - .set_increment_value(table_name, &column.name, 1_i64) - .await?; - } - Ok(storage) - }) - .await; + if let Some(schema) = schema { + // TODO: We should be deleting the entry + #[cfg(feature = "auto-increment")] + let result: Result<&mut StorageInner> = stream::iter(schema.column_defs.iter().map(Ok)) + .try_fold(database, |database, column| async move { + if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { + database + .set_increment_value(table_name, &column.name, 1_i64) + .await?; + } + Ok(database) + }) + .await; - #[cfg(feature = "auto-increment")] - let storage = result?; + #[cfg(feature = "auto-increment")] + let database = result?; - // TODO: Maybe individual "truncate" operation - storage.delete_schema(table_name).await?; // TODO: !!! This will delete INDEXes which it shouldn't! - storage.insert_schema(&schema).await?; - Ok(()) - } else { - Err(AlterError::TableNotFound(table_name.to_owned()).into()) + // TODO: Maybe individual "truncate" operation + database.delete_schema(table_name).await?; // TODO: !!! This will delete INDEXes which it shouldn't! + database.insert_schema(&schema).await?; + Ok(()) + } else { + Err(AlterError::TableNotFound(table_name.to_owned()).into()) + } } } diff --git a/src/executor/execute.rs b/src/executor/execute.rs index 97e00a7c..e32f9ae2 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -1,10 +1,4 @@ use { - super::{ - alter_row::{delete, insert, update}, - alter_table::{create_index, create_table, drop, truncate}, - other::explain, - query::query, - }, crate::{glue::Context, parse_sql::Query, Glue, Result, Row, StorageInner, Value}, serde::Serialize, sqlparser::ast::{SetVariableValue, Statement}, @@ -12,9 +6,6 @@ use { thiserror::Error as ThisError, }; -#[cfg(feature = "alter-table")] -use super::alter_table::alter_table; - #[derive(ThisError, Serialize, Debug, PartialEq)] pub enum ExecuteError { #[error("query not supported")] @@ -144,11 +135,11 @@ impl Glue { SetVariableValue::Literal(literal) => literal.try_into()?, }; let name = variable.value.clone(); - context.set_variable(name, value); + self.get_mut_context()?.set_variable(name, value); Ok(Payload::Success) } - Statement::ExplainTable { table_name, .. } => explain(&storages, table_name).await, + Statement::ExplainTable { table_name, .. } => self.explain(table_name).await, Statement::CreateDatabase { .. } => unreachable!(), // Handled at Glue interface // TODO: Clean up somehow _ => Err(ExecuteError::QueryNotSupported.into()), diff --git a/src/executor/mod.rs b/src/executor/mod.rs index bf560933..51fd34cf 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -10,7 +10,7 @@ mod types; pub use { alter_row::ValidateError, alter_table::AlterError, - execute::{execute, ExecuteError, Payload}, + execute::{ExecuteError, Payload}, fetch::FetchError, query::{JoinError, ManualError, PlanError, QueryError, SelectError}, recipe::*, diff --git a/src/executor/other/explain.rs b/src/executor/other/explain.rs index 3e392dab..9584a225 100644 --- a/src/executor/other/explain.rs +++ b/src/executor/other/explain.rs @@ -1,91 +1,94 @@ -use crate::{ExecuteError, Payload, Row, Schema, Value}; -use crate::{Result, StorageInner}; +use crate::{ExecuteError, Payload, Row, Schema, StorageInner, Value}; +use crate::{Glue, Result}; use sqlparser::ast::ObjectName; -pub(crate) async fn explain( - storages: &[(String, &mut StorageInner)], - object: &ObjectName, -) -> Result { - println!("{:?}", object); +impl Glue { + pub async fn explain(&self, object: &ObjectName) -> Result { + let database = self.get_database(&None)?; - let mut name_vec = object.0.clone(); - let (store_name, opt_table_name) = match name_vec.len() { - 2 => (name_vec.remove(0).value, Some(name_vec.remove(0).value)), - 1 => { - let name = name_vec.remove(0).value; - if name == "ALL" { - let databases = storages - .iter() - .map(|(name, _)| Row(vec![name.clone().into()])) - .collect(); - return Ok(Payload::Select { - labels: vec![String::from("database")], - rows: databases, - }); - } - if name == "ALL_TABLE" { - let mut tables = vec![]; - for (name, store) in storages.iter() { - tables.extend( - get_tables(store) - .await? - .into_iter() - .map(|table| Row(vec![name.clone().into(), table])), - ); + let mut name_vec = object.0.clone(); + let (store_name, opt_table_name) = match name_vec.len() { + 2 => ( + Some(name_vec.remove(0).value), + Some(name_vec.remove(0).value), + ), + 1 => { + let name = name_vec.remove(0).value; + if name == "ALL" { + let databases: Vec = self + .get_database_list() + .into_iter() + .map(|name| Row(vec![name.clone().into()])) + .collect(); + return Ok(Payload::Select { + labels: vec![String::from("database")], + rows: databases, + }); + } + if name == "ALL_TABLE" { + let mut tables = vec![]; + for db_name in self.get_database_list().into_iter() { + tables.extend( + self.get_database(&Some(db_name.clone()))? + .get_tables() + .await? + .iter() + .map(|table| Row(vec![db_name.clone().into(), table.clone()])), + ); + } + return Ok(Payload::Select { + labels: vec![String::from("database"), String::from("table")], + rows: tables, + }); + } else if self.get_database_list().contains(&&name) { + (Some(name), None) + } else { + (None, Some(name)) } - return Ok(Payload::Select { - labels: vec![String::from("database"), String::from("table")], - rows: tables, - }); - } else if storages.iter().any(|(store, _)| store == &name) { - (name, None) - } else { - (storages[0].0.clone(), Some(name)) } - } - _ => return Err(ExecuteError::ObjectNotRecognised.into()), - }; + _ => return Err(ExecuteError::ObjectNotRecognised.into()), + }; - let store = storages - .iter() - .find_map(|(name, store)| (name == &store_name).then(|| store)) - .ok_or(ExecuteError::ObjectNotRecognised)?; - if let Some(table_name) = opt_table_name { - let Schema { column_defs, .. } = store - .fetch_schema(&table_name) - .await? - .ok_or(ExecuteError::ObjectNotRecognised)?; - let columns = column_defs - .iter() - .map(|column| { - ( - column.name.clone().into(), - column.data_type.to_string().into(), - ) - }) - .map(|(name, data_type)| Row(vec![name, data_type])) - .collect(); - Ok(Payload::Select { - labels: vec![String::from("column"), String::from("data_type")], - rows: columns, - }) - } else { - Ok(Payload::Select { - labels: vec![String::from("table")], - rows: get_tables(store) + let database = self.get_database(&store_name)?; + if let Some(table_name) = opt_table_name { + let Schema { column_defs, .. } = database + .fetch_schema(&table_name) .await? - .into_iter() - .map(|table| Row(vec![table])) - .collect(), - }) + .ok_or(ExecuteError::ObjectNotRecognised)?; + let columns = column_defs + .iter() + .map(|column| { + ( + column.name.clone().into(), + column.data_type.to_string().into(), + ) + }) + .map(|(name, data_type)| Row(vec![name, data_type])) + .collect(); + Ok(Payload::Select { + labels: vec![String::from("column"), String::from("data_type")], + rows: columns, + }) + } else { + Ok(Payload::Select { + labels: vec![String::from("table")], + rows: database + .get_tables() + .await? + .into_iter() + .map(|table| Row(vec![table])) + .collect(), + }) + } } } - -async fn get_tables(store: &&mut StorageInner) -> Result> { - Ok(store - .scan_schemas() - .await? - .into_iter() - .map(|Schema { table_name, .. }| table_name.into()) - .collect()) +impl StorageInner { + async fn get_tables(&self) -> Result> { + Ok(self + .scan_schemas() + .await? + .into_iter() + .map(|Schema { table_name, .. }| table_name.into()) + .collect()) + } } diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index 7f58f4f9..9cc5afb9 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -9,7 +9,6 @@ use { }, async_recursion::async_recursion, serde::Serialize, - set_expr::from_body, sqlparser::ast::{Cte, Query, TableAlias, With}, thiserror::Error as ThisError, }; @@ -48,7 +47,7 @@ impl Glue { lock: _, } = query; - let context = self.get_mut_context(); + let context = self.get_mut_context()?; let limit: Option = limit .map(|expression| { @@ -88,7 +87,7 @@ impl Glue { } } - let (mut labels, mut rows) = from_body(body, order_by).await?; + let (mut labels, mut rows) = self.from_body(body, order_by).await?; if let Some(offset) = offset { rows.drain(0..offset); diff --git a/src/executor/query/select/join/execute.rs b/src/executor/query/select/join/execute.rs index dc903abe..74597c3e 100644 --- a/src/executor/query/select/join/execute.rs +++ b/src/executor/query/select/join/execute.rs @@ -2,7 +2,7 @@ use { super::{JoinError, JoinMethod, JoinPlan, JoinType}, crate::{ executor::types::{ColumnInfo, Row}, - Context, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, + Glue, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, StorageInner, Value, }, }; @@ -55,29 +55,14 @@ impl JoinExecute { .map(|result| result.map(|(_, row)| row.0)) .collect::>>() } - pub async fn execute<'a>( - self, - storages: &[(String, &mut StorageInner)], - context: &Context, - plane_rows: Vec, - ) -> Result> { - let rows = if let Some((.., context_table_rows)) = context.tables.get(&self.table) { - Ok(context_table_rows.clone()) - } else { - let storage = storages - .iter() - .find_map(|(name, storage)| { - if name == &self.database { - Some(&**storage) - } else { - None - } - }) - .or_else(|| storages.get(0).map(|(_, storage)| &**storage)) - .ok_or(JoinError::Unreachable)?; - - self.get_rows(storage).await - }?; + pub async fn execute<'a>(self, glue: &Glue, plane_rows: Vec) -> Result> { + let rows = + if let Some((.., context_table_rows)) = glue.get_context()?.tables.get(&self.table) { + Ok(context_table_rows.clone()) + } else { + self.get_rows(glue.get_database(&Some(self.database.clone()))?) + .await + }?; self.method.run( &self.join_type, self.widths.0, diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index b2bfd853..2e037d23 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -86,7 +86,7 @@ impl JoinPlan { } async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result> { - if let Some((context_table_labels, ..)) = glue.get_context().tables.get(&table.name) { + if let Some((context_table_labels, ..)) = glue.get_context()?.tables.get(&table.name) { Ok(context_table_labels .iter() .map(|name| ColumnInfo { @@ -96,19 +96,6 @@ async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result>()) } else { - let storage = glue - .get_storages() - .iter() - .find_map(|(name, storage)| { - if name == &table.database { - Some(&**storage) - } else { - None - } - }) - .or_else(|| storages.get(0).map(|(_, storage)| &**storage)) - .ok_or_else(|| JoinError::TableNotFound(table.clone()))?; - - fetch_columns(storage, table).await + fetch_columns(glue.get_database(&Some(table.database.clone()))?, table).await } } diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index 1857d640..e2feaef7 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -143,7 +143,7 @@ impl Glue { order_by: Vec, ) -> Result { let plan = Plan::new(self, query, order_by).await?; - self.select(plan); + self.select(plan).await } } diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index 105fe5f8..43ad784b 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -41,7 +41,7 @@ impl Plan { constraint, group_constraint, groups, - } = Manual::new(select, glue.get_context())?; + } = Manual::new(select, glue.get_context()?)?; let mut joins: Vec = join_all( joins diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index 9fd2953f..5008e758 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -1,10 +1,8 @@ use { - super::{select::select, QueryError}, + super::QueryError, crate::{ - executor::{alter_row::insert, types::LabelsAndRows}, - macros::warning, - result::Result, - Context, Glue, MetaRecipe, Payload, RecipeUtilities, StorageInner, Value, + executor::types::LabelsAndRows, macros::warning, result::Result, Context, Glue, MetaRecipe, + Payload, RecipeUtilities, Value, }, async_recursion::async_recursion, sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement}, @@ -34,7 +32,7 @@ impl Glue { .into_iter() .map(|cell| { MetaRecipe::new(cell)? - .simplify_by_context(self.get_context())? + .simplify_by_context(self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForValues.into()) }) .collect::>>() diff --git a/src/glue/database.rs b/src/glue/database.rs index 32ba315e..8fc0624a 100644 --- a/src/glue/database.rs +++ b/src/glue/database.rs @@ -1,4 +1,4 @@ -use crate::{Context, Glue, InterfaceError, Result, StorageInner}; +use crate::{Context, Glue, InterfaceError, Result, Storage, StorageInner}; impl Glue { pub fn get_database(&self, db_ref: &Option) -> Result<&StorageInner> { @@ -8,7 +8,6 @@ impl Glue { .ok_or(InterfaceError::DatabaseNotFound)?; // TODO: None ref should give a primary let db_ref = &*db_container.take(); Ok(db_ref) - //Err(InterfaceError::DatabaseNotFound.into()) } pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut StorageInner> { // TODO: Somehow don't dupe @@ -27,4 +26,10 @@ impl Glue { pub fn get_mut_context(&self) -> Result<&mut Context> { Err(InterfaceError::ContextUnavailable.into()) } + pub fn get_database_list(&self) -> Vec<&String> { + self.databases.keys().collect() + } + /*pub fn database_iter(&self) -> Result>> { + Ok(Box::new(self.databases.iter().map(|(db_ref, db)| (db_ref, &*db.take())))) + }*/ } diff --git a/src/glue/mod.rs b/src/glue/mod.rs index 315c8231..43ed132f 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -1,8 +1,8 @@ use crate::ExecuteError; use { crate::{ - execute, parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Row, - Storage, StorageInner, Value, WIPError, + parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Row, Storage, + StorageInner, Value, WIPError, }, futures::executor::block_on, sqlparser::ast::{ diff --git a/src/result.rs b/src/result.rs index fb3a715b..69cb804c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,12 +1,8 @@ use { crate::{ - data::{RowError, TableError, ValueError}, - executor::{ - AlterError, ExecuteError, FetchError, InterfaceError, JoinError, ManualError, - PlanError, QueryError, RecipeError, SelectError, ValidateError, - }, - store::StorageError, - CSVStorageError, SheetStorageError, + AlterError, CSVStorageError, ExecuteError, FetchError, InterfaceError, JoinError, + ManualError, PlanError, QueryError, RecipeError, RowError, SelectError, SheetStorageError, + StorageError, TableError, ValidateError, ValueError, }, serde::Serialize, std::marker::{Send, Sync}, From 0c3acbbbe57640d8fc3d9a177c9971a3c22c27bf Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 15:55:10 +1000 Subject: [PATCH 4/8] Builds .... All kinds of horrors await --- src/executor/alter_row/delete.rs | 12 +++++----- src/executor/alter_row/insert.rs | 2 +- src/executor/alter_row/update.rs | 10 ++++----- src/executor/alter_table/alter_table.rs | 2 +- src/executor/alter_table/create_index.rs | 2 +- src/executor/alter_table/create_table.rs | 2 +- src/executor/alter_table/drop.rs | 2 +- src/executor/alter_table/truncate.rs | 2 +- src/executor/query/mod.rs | 8 +++---- src/executor/query/select/join/execute.rs | 2 +- src/executor/query/select/join/plan.rs | 2 +- src/glue/database.rs | 27 ++++++++++++----------- src/store/mod.rs | 25 ++++++++++++--------- 13 files changed, 50 insertions(+), 48 deletions(-) diff --git a/src/executor/alter_row/delete.rs b/src/executor/alter_row/delete.rs index 61f8c889..3a41176c 100644 --- a/src/executor/alter_row/delete.rs +++ b/src/executor/alter_row/delete.rs @@ -19,7 +19,7 @@ impl Glue { indexes, .. } = self - .get_mut_database(&None)? + .get_database(&None)? .fetch_schema(table_name) .await? .ok_or(ExecuteError::TableNotExists)?; @@ -40,7 +40,7 @@ impl Glue { .unwrap_or(Ok(PlannedRecipe::TRUE))?; let keys = self - .get_mut_database(&None)? + .get_database(&None)? .scan_data(table_name) .await? .filter_map(|row_result| { @@ -62,16 +62,14 @@ impl Glue { let num_keys = keys.len(); - let result = self - .get_mut_database(&None)? + let database = &mut **self.get_mut_database(&None)?; + let result = database .delete_data(table_name, keys) .await .map(|_| Payload::Delete(num_keys))?; for index in indexes.iter() { - index - .reset(self.get_mut_database(&None)?, table_name, &column_defs) - .await?; // TODO: Not this; optimise + index.reset(database, table_name, &column_defs).await?; // TODO: Not this; optimise } Ok(result) } diff --git a/src/executor/alter_row/insert.rs b/src/executor/alter_row/insert.rs index 16da429b..f79f0f1c 100644 --- a/src/executor/alter_row/insert.rs +++ b/src/executor/alter_row/insert.rs @@ -40,7 +40,7 @@ impl Glue { let num_rows = rows.len(); - let database = self.get_mut_database(&None)?; + let database = &mut **self.get_mut_database(&None)?; let result = database.insert_data(table_name, rows.clone()).await; diff --git a/src/executor/alter_row/update.rs b/src/executor/alter_row/update.rs index 0ea35c78..b8784e15 100644 --- a/src/executor/alter_row/update.rs +++ b/src/executor/alter_row/update.rs @@ -116,16 +116,16 @@ impl Glue { .await?; let keyed_rows: Vec<(Value, Row)> = keys.into_iter().zip(rows).collect(); let num_rows = keyed_rows.len(); - let result = self - .get_mut_database(&None)? + + let database = &mut **self.get_mut_database(&None)?; + + let result = database .update_data(table, keyed_rows) .await .map(|_| Payload::Update(num_rows))?; for index in indexes.iter() { - index - .reset(self.get_mut_database(&None)?, table, &column_defs) - .await?; // TODO: Not this; optimise + index.reset(database, table, &column_defs).await?; // TODO: Not this; optimise } Ok(result) } diff --git a/src/executor/alter_table/alter_table.rs b/src/executor/alter_table/alter_table.rs index 653529de..5115e362 100644 --- a/src/executor/alter_table/alter_table.rs +++ b/src/executor/alter_table/alter_table.rs @@ -11,8 +11,8 @@ impl Glue { operation: &AlterTableOperation, ) -> Result<()> { let table_name = get_name(name).map_err(Error::from)?; + let database = &mut **self.get_mut_database(&None)?; - let database = self.get_mut_database(&None)?; match operation { AlterTableOperation::RenameTable { table_name: new_table_name, diff --git a/src/executor/alter_table/create_index.rs b/src/executor/alter_table/create_index.rs index 5fb77ee7..19314e9a 100644 --- a/src/executor/alter_table/create_index.rs +++ b/src/executor/alter_table/create_index.rs @@ -20,8 +20,8 @@ impl Glue { .clone(); let table_name = get_name(table)?; + let database = &mut **self.get_mut_database(&None)?; - let database = self.get_mut_database(&None)?; let schema = database .fetch_schema(table_name) .await? diff --git a/src/executor/alter_table/create_table.rs b/src/executor/alter_table/create_table.rs index d1e11e3d..c3fae557 100644 --- a/src/executor/alter_table/create_table.rs +++ b/src/executor/alter_table/create_table.rs @@ -20,7 +20,7 @@ impl Glue { indexes: vec![], }; - let database = self.get_mut_database(&None)?; + let database = &mut **self.get_mut_database(&None)?; if database.fetch_schema(&schema.table_name).await?.is_some() { if !if_not_exists { Err(AlterError::TableAlreadyExists(schema.table_name.to_owned()).into()) diff --git a/src/executor/alter_table/drop.rs b/src/executor/alter_table/drop.rs index faa52e74..1898cce1 100644 --- a/src/executor/alter_table/drop.rs +++ b/src/executor/alter_table/drop.rs @@ -12,7 +12,7 @@ impl Glue { names: &[ObjectName], if_exists: bool, ) -> Result<()> { - let database = self.get_mut_database(&None)?; + let database = &mut **self.get_mut_database(&None)?; if object_type != &ObjectType::Table { return Err(AlterError::DropTypeNotSupported(object_type.to_string()).into()); } diff --git a/src/executor/alter_table/truncate.rs b/src/executor/alter_table/truncate.rs index d4bf5a63..97cd8e70 100644 --- a/src/executor/alter_table/truncate.rs +++ b/src/executor/alter_table/truncate.rs @@ -6,7 +6,7 @@ use { impl Glue { pub async fn truncate(&mut self, table_name: &ObjectName) -> Result<()> { - let database = self.get_mut_database(&None)?; + let database = &mut **self.get_mut_database(&None)?; let table_name = get_name(table_name)?; let schema = database.fetch_schema(table_name).await?; diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index 9cc5afb9..bc4074a0 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -47,12 +47,10 @@ impl Glue { lock: _, } = query; - let context = self.get_mut_context()?; - let limit: Option = limit .map(|expression| { MetaRecipe::new(expression)? - .simplify_by_context(context)? + .simplify_by_context(self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForLimit.into())? .cast() }) @@ -60,7 +58,7 @@ impl Glue { let offset: Option = offset .map(|offset| { MetaRecipe::new(offset.value)? - .simplify_by_context(context)? + .simplify_by_context(self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForOffset.into())? .cast() }) @@ -83,7 +81,7 @@ impl Glue { } = alias; let name = name.value; let data = self.query(query).await?; - context.set_table(name, data); + self.get_mut_context()?.set_table(name, data); } } diff --git a/src/executor/query/select/join/execute.rs b/src/executor/query/select/join/execute.rs index 74597c3e..6cd00c71 100644 --- a/src/executor/query/select/join/execute.rs +++ b/src/executor/query/select/join/execute.rs @@ -60,7 +60,7 @@ impl JoinExecute { if let Some((.., context_table_rows)) = glue.get_context()?.tables.get(&self.table) { Ok(context_table_rows.clone()) } else { - self.get_rows(glue.get_database(&Some(self.database.clone()))?) + self.get_rows(&**glue.get_database(&Some(self.database.clone()))?) .await }?; self.method.run( diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 2e037d23..227d2fe8 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -96,6 +96,6 @@ async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result>()) } else { - fetch_columns(glue.get_database(&Some(table.database.clone()))?, table).await + fetch_columns(&**glue.get_database(&Some(table.database.clone()))?, table).await } } diff --git a/src/glue/database.rs b/src/glue/database.rs index 8fc0624a..111f1e2d 100644 --- a/src/glue/database.rs +++ b/src/glue/database.rs @@ -1,22 +1,23 @@ -use crate::{Context, Glue, InterfaceError, Result, Storage, StorageInner}; +use { + crate::{Context, Glue, InterfaceError, Result, Storage, StorageInner}, + std::sync::MutexGuard, +}; impl Glue { - pub fn get_database(&self, db_ref: &Option) -> Result<&StorageInner> { - let db_container = db_ref + // TODO: None ref should give a primary + pub fn get_database(&self, db_ref: &Option) -> Result>> { + db_ref .as_ref() .and_then(|db_ref| self.databases.get(db_ref)) - .ok_or(InterfaceError::DatabaseNotFound)?; // TODO: None ref should give a primary - let db_ref = &*db_container.take(); - Ok(db_ref) + .ok_or(InterfaceError::DatabaseNotFound.into()) + .map(|db| db.get()) } - pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut StorageInner> { - // TODO: Somehow don't dupe - let db_container = db_ref + pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut Box> { + db_ref .as_ref() - .and_then(|db_ref| self.databases.get(db_ref)) - .ok_or(InterfaceError::DatabaseNotFound)?; - let db_ref = &mut *db_container.take(); - Ok(db_ref) + .and_then(|db_ref| self.databases.get_mut(db_ref)) + .ok_or(InterfaceError::DatabaseNotFound.into()) + .map(Storage::get_mut) } pub fn get_context(&self) -> Result<&Context> { self.context diff --git a/src/store/mod.rs b/src/store/mod.rs index 3374dcf7..05abeb9e 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -3,6 +3,8 @@ mod store_mut; #[cfg(feature = "alter-table")] mod alter_table; +use std::sync::{Mutex, MutexGuard}; + #[cfg(feature = "alter-table")] pub use alter_table::*; #[cfg(not(feature = "alter-table"))] @@ -54,7 +56,7 @@ impl TryFrom for Storage { crate::{CSVStorage, SheetStorage, SledStorage}, Connection::*, }; - let storage: Option> = Some(match &connection { + let storage: Mutex> = Mutex::new(match &connection { #[cfg(feature = "sled-storage")] Sled(path) => Box::new(SledStorage::new(path)?), #[cfg(feature = "csv-storage")] @@ -72,30 +74,33 @@ impl TryFrom for Storage { pub struct Storage { source_connection: Connection, - storage: Option>, + storage: Mutex>, } impl Storage { pub fn new(storage: Box) -> Self { - let storage = Some(storage); + let storage = Mutex::new(storage); Self { storage, source_connection: Connection::default(), } } - pub fn replace(&mut self, storage: Box) { + /*pub fn replace(&mut self, storage: Box) { self.storage.replace(storage); } pub fn take(&mut self) -> Box { self.storage .take() .expect("Unreachable: Storage wasn't replaced!") + }*/ + pub fn get(&self) -> MutexGuard> { + self.storage + .lock() + .expect("Unreachable: Storage wasn't replaced!") } - pub fn take_readable(&mut self) -> &StorageInner { - /*let storage = self.take(); - let readable = &*storage; - self.replace(storage); - readable*/ - unimplemented!() + pub fn get_mut(&mut self) -> &mut Box { + self.storage + .get_mut() + .expect("Unreachable: Storage wasn't replaced!") } pub fn into_source(self) -> Connection { self.source_connection From e0eedb182c2dca541becefddf13efd1e6edc65be Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 15:55:37 +1000 Subject: [PATCH 5/8] Clippy auto --- src/executor/alter_row/auto_increment.rs | 2 +- src/executor/alter_row/delete.rs | 2 +- src/executor/alter_row/insert.rs | 5 ++--- src/executor/alter_row/update.rs | 4 ++-- src/executor/execute.rs | 2 +- src/executor/other/explain.rs | 2 +- src/executor/other/mod.rs | 2 +- src/executor/query/mod.rs | 4 ++-- src/executor/query/select/join/plan.rs | 3 +-- src/executor/query/select/mod.rs | 3 +-- src/executor/query/select/plan/mod.rs | 3 +-- src/executor/query/set_expr.rs | 2 +- src/glue/error.rs | 2 +- src/glue/mod.rs | 3 +-- 14 files changed, 17 insertions(+), 22 deletions(-) diff --git a/src/executor/alter_row/auto_increment.rs b/src/executor/alter_row/auto_increment.rs index ca7fd92f..75c250f5 100644 --- a/src/executor/alter_row/auto_increment.rs +++ b/src/executor/alter_row/auto_increment.rs @@ -1,5 +1,5 @@ #![cfg(feature = "auto-increment")] -use crate::{Column, Glue, Result, Row, StorageInner, Value, ValueDefault}; +use crate::{Column, Glue, Result, Row, Value, ValueDefault}; impl Glue { pub async fn auto_increment( diff --git a/src/executor/alter_row/delete.rs b/src/executor/alter_row/delete.rs index 3a41176c..291997ff 100644 --- a/src/executor/alter_row/delete.rs +++ b/src/executor/alter_row/delete.rs @@ -2,7 +2,7 @@ use { crate::{ data::{get_name, Schema}, executor::types::ColumnInfo, - Column, Context, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, Result, Value, + Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, Result, Value, }, sqlparser::ast::{Expr, ObjectName}, }; diff --git a/src/executor/alter_row/insert.rs b/src/executor/alter_row/insert.rs index f79f0f1c..61df17d6 100644 --- a/src/executor/alter_row/insert.rs +++ b/src/executor/alter_row/insert.rs @@ -1,8 +1,7 @@ use { - super::{auto_increment, columns_to_positions, validate, validate_unique}, + super::{columns_to_positions, validate}, crate::{ - data::{get_name, Schema}, - Context, ExecuteError, Glue, Payload, Result, Row, + data::{get_name, Schema}, ExecuteError, Glue, Payload, Result, Row, }, sqlparser::ast::{Ident, ObjectName, Query}, }; diff --git a/src/executor/alter_row/update.rs b/src/executor/alter_row/update.rs index b8784e15..1633d5f2 100644 --- a/src/executor/alter_row/update.rs +++ b/src/executor/alter_row/update.rs @@ -1,9 +1,9 @@ use { - super::{auto_increment, columns_to_positions, validate, validate_unique}, + super::{columns_to_positions, validate}, crate::{ data::{get_name, Schema}, executor::types::{ColumnInfo, Row as VecRow}, - Column, Context, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, + Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, Result, Row, Value, }, sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}, diff --git a/src/executor/execute.rs b/src/executor/execute.rs index e32f9ae2..5a493457 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -1,5 +1,5 @@ use { - crate::{glue::Context, parse_sql::Query, Glue, Result, Row, StorageInner, Value}, + crate::{parse_sql::Query, Glue, Result, Row, Value}, serde::Serialize, sqlparser::ast::{SetVariableValue, Statement}, std::convert::TryInto, diff --git a/src/executor/other/explain.rs b/src/executor/other/explain.rs index 9584a225..eae4fad9 100644 --- a/src/executor/other/explain.rs +++ b/src/executor/other/explain.rs @@ -4,7 +4,7 @@ use sqlparser::ast::ObjectName; impl Glue { pub async fn explain(&self, object: &ObjectName) -> Result { - let database = self.get_database(&None)?; + let _database = self.get_database(&None)?; let mut name_vec = object.0.clone(); let (store_name, opt_table_name) = match name_vec.len() { diff --git a/src/executor/other/mod.rs b/src/executor/other/mod.rs index 6e89b9c4..25e70422 100644 --- a/src/executor/other/mod.rs +++ b/src/executor/other/mod.rs @@ -1,2 +1,2 @@ mod explain; -pub(crate) use explain::*; + diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index bc4074a0..894424d4 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -4,8 +4,8 @@ mod set_expr; pub use select::{join::*, ManualError, PlanError, SelectError}; use { crate::{ - executor::types::LabelsAndRows, result::Result, Cast, Context, Glue, MetaRecipe, - RecipeUtilities, StorageInner, Value, + executor::types::LabelsAndRows, result::Result, Cast, Glue, MetaRecipe, + RecipeUtilities, Value, }, async_recursion::async_recursion, serde::Serialize, diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 227d2fe8..11644fa1 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -5,8 +5,7 @@ use { fetch::fetch_columns, types::{ColumnInfo, ComplexTableName}, MetaRecipe, - }, - Context, Glue, JoinError, Result, StorageInner, + }, Glue, Result, }, std::cmp::Ordering, }; diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index e2feaef7..aaaee542 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -9,8 +9,7 @@ use { types::{LabelsAndRows, Row}, PlannedRecipe, }, - macros::try_option, - Context, Glue, RecipeUtilities, Result, StorageInner, Value, + macros::try_option, Glue, RecipeUtilities, Result, Value, }, futures::stream::{self, StreamExt, TryStreamExt}, rayon::prelude::*, diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index 43ad784b..f9ba5d4b 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -4,8 +4,7 @@ use { Manual, Order, SelectItem, }, crate::{ - executor::{types::ColumnInfo, PlannedRecipe}, - Context, Glue, Result, StorageInner, + executor::{types::ColumnInfo, PlannedRecipe}, Glue, Result, }, futures::future::join_all, serde::Serialize, diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index 5008e758..efc89075 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -1,7 +1,7 @@ use { super::QueryError, crate::{ - executor::types::LabelsAndRows, macros::warning, result::Result, Context, Glue, MetaRecipe, + executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, Payload, RecipeUtilities, Value, }, async_recursion::async_recursion, diff --git a/src/glue/error.rs b/src/glue/error.rs index 337c34e6..3d24427c 100644 --- a/src/glue/error.rs +++ b/src/glue/error.rs @@ -1,5 +1,5 @@ use { - serde::{Deserialize, Serialize}, + serde::{Serialize}, std::fmt::Debug, thiserror::Error, }; diff --git a/src/glue/mod.rs b/src/glue/mod.rs index 43ed132f..a26eba65 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -1,8 +1,7 @@ use crate::ExecuteError; use { crate::{ - parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Row, Storage, - StorageInner, Value, WIPError, + parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Storage, Value, WIPError, }, futures::executor::block_on, sqlparser::ast::{ From c60162df7837f74a6419fe610353b26f62dcdcb1 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 16:24:25 +1000 Subject: [PATCH 6/8] Almost done --- src/executor/alter_row/delete.rs | 2 +- src/executor/alter_row/insert.rs | 3 ++- src/executor/alter_row/update.rs | 8 ++++---- src/executor/other/explain.rs | 2 -- src/executor/other/mod.rs | 1 - src/executor/query/mod.rs | 8 ++++---- src/executor/query/select/join/execute.rs | 5 ++--- src/executor/query/select/join/manual.rs | 4 ++-- src/executor/query/select/join/plan.rs | 7 ++++--- src/executor/query/select/mod.rs | 3 ++- src/executor/query/select/plan/mod.rs | 5 +++-- src/executor/query/set_expr.rs | 6 +++--- src/executor/types.rs | 4 ++-- src/glue/database.rs | 25 ++++++++++------------- src/glue/error.rs | 6 +----- src/glue/mod.rs | 23 ++++++++++++++------- 16 files changed, 57 insertions(+), 55 deletions(-) diff --git a/src/executor/alter_row/delete.rs b/src/executor/alter_row/delete.rs index 291997ff..4be3d7d3 100644 --- a/src/executor/alter_row/delete.rs +++ b/src/executor/alter_row/delete.rs @@ -33,7 +33,7 @@ impl Glue { .clone() .map(|selection| { PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(self.get_context()?)?, + MetaRecipe::new(selection)?.simplify_by_context(&*self.get_context()?)?, &columns, ) }) diff --git a/src/executor/alter_row/insert.rs b/src/executor/alter_row/insert.rs index 61df17d6..6bec07eb 100644 --- a/src/executor/alter_row/insert.rs +++ b/src/executor/alter_row/insert.rs @@ -1,7 +1,8 @@ use { super::{columns_to_positions, validate}, crate::{ - data::{get_name, Schema}, ExecuteError, Glue, Payload, Result, Row, + data::{get_name, Schema}, + ExecuteError, Glue, Payload, Result, Row, }, sqlparser::ast::{Ident, ObjectName, Query}, }; diff --git a/src/executor/alter_row/update.rs b/src/executor/alter_row/update.rs index 1633d5f2..dae9b0f5 100644 --- a/src/executor/alter_row/update.rs +++ b/src/executor/alter_row/update.rs @@ -3,8 +3,8 @@ use { crate::{ data::{get_name, Schema}, executor::types::{ColumnInfo, Row as VecRow}, - Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, - Result, Row, Value, + Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, Result, + Row, Value, }, sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}, }; @@ -41,7 +41,7 @@ impl Glue { .clone() .map(|selection| { PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(self.get_context()?)?, + MetaRecipe::new(selection)?.simplify_by_context(&*self.get_context()?)?, &columns, ) }) @@ -61,7 +61,7 @@ impl Glue { .position(|column| column == &column_compare) .ok_or(ExecuteError::ColumnNotFound)?; let recipe = PlannedRecipe::new( - MetaRecipe::new(value.clone())?.simplify_by_context(self.get_context()?)?, + MetaRecipe::new(value.clone())?.simplify_by_context(&*self.get_context()?)?, &columns, )?; Ok((index, recipe)) diff --git a/src/executor/other/explain.rs b/src/executor/other/explain.rs index eae4fad9..6c69f635 100644 --- a/src/executor/other/explain.rs +++ b/src/executor/other/explain.rs @@ -4,8 +4,6 @@ use sqlparser::ast::ObjectName; impl Glue { pub async fn explain(&self, object: &ObjectName) -> Result { - let _database = self.get_database(&None)?; - let mut name_vec = object.0.clone(); let (store_name, opt_table_name) = match name_vec.len() { 2 => ( diff --git a/src/executor/other/mod.rs b/src/executor/other/mod.rs index 25e70422..3ab13279 100644 --- a/src/executor/other/mod.rs +++ b/src/executor/other/mod.rs @@ -1,2 +1 @@ mod explain; - diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index 894424d4..0ee20c48 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -4,8 +4,8 @@ mod set_expr; pub use select::{join::*, ManualError, PlanError, SelectError}; use { crate::{ - executor::types::LabelsAndRows, result::Result, Cast, Glue, MetaRecipe, - RecipeUtilities, Value, + executor::types::LabelsAndRows, result::Result, Cast, Glue, MetaRecipe, RecipeUtilities, + Value, }, async_recursion::async_recursion, serde::Serialize, @@ -50,7 +50,7 @@ impl Glue { let limit: Option = limit .map(|expression| { MetaRecipe::new(expression)? - .simplify_by_context(self.get_context()?)? + .simplify_by_context(&*self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForLimit.into())? .cast() }) @@ -58,7 +58,7 @@ impl Glue { let offset: Option = offset .map(|offset| { MetaRecipe::new(offset.value)? - .simplify_by_context(self.get_context()?)? + .simplify_by_context(&*self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForOffset.into())? .cast() }) diff --git a/src/executor/query/select/join/execute.rs b/src/executor/query/select/join/execute.rs index 6cd00c71..3d428bc4 100644 --- a/src/executor/query/select/join/execute.rs +++ b/src/executor/query/select/join/execute.rs @@ -9,7 +9,7 @@ use { #[derive(Debug)] pub struct JoinExecute { - pub database: String, + pub database: Option, pub table: String, pub method: JoinMethod, pub join_type: JoinType, @@ -60,8 +60,7 @@ impl JoinExecute { if let Some((.., context_table_rows)) = glue.get_context()?.tables.get(&self.table) { Ok(context_table_rows.clone()) } else { - self.get_rows(&**glue.get_database(&Some(self.database.clone()))?) - .await + self.get_rows(&**glue.get_database(&self.database)?).await }?; self.method.run( &self.join_type, diff --git a/src/executor/query/select/join/manual.rs b/src/executor/query/select/join/manual.rs index 060c997b..f0ce48b9 100644 --- a/src/executor/query/select/join/manual.rs +++ b/src/executor/query/select/join/manual.rs @@ -58,9 +58,9 @@ impl JoinManual { return Err(JoinError::UnimplementedNumberOfComponents.into()); } let database = if name_parts == 2 { - name.0.get(0).unwrap().value.clone() + Some(name.0.get(0).unwrap().value.clone()) } else { - String::new() + None }; let name = name.0.last().unwrap().value.clone(); let alias = alias.map(|alias| alias.name.value); diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 11644fa1..9af35cc3 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -5,14 +5,15 @@ use { fetch::fetch_columns, types::{ColumnInfo, ComplexTableName}, MetaRecipe, - }, Glue, Result, + }, + Glue, Result, }, std::cmp::Ordering, }; #[derive(Debug)] pub struct JoinPlan { - pub database: String, + pub database: Option, pub table: String, pub columns: Vec, pub join_type: JoinType, @@ -95,6 +96,6 @@ async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result>()) } else { - fetch_columns(&**glue.get_database(&Some(table.database.clone()))?, table).await + fetch_columns(&**glue.get_database(&table.database)?, table).await } } diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index aaaee542..af0e8e22 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -9,7 +9,8 @@ use { types::{LabelsAndRows, Row}, PlannedRecipe, }, - macros::try_option, Glue, RecipeUtilities, Result, Value, + macros::try_option, + Glue, RecipeUtilities, Result, Value, }, futures::stream::{self, StreamExt, TryStreamExt}, rayon::prelude::*, diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index f9ba5d4b..d77c5aae 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -4,7 +4,8 @@ use { Manual, Order, SelectItem, }, crate::{ - executor::{types::ColumnInfo, PlannedRecipe}, Glue, Result, + executor::{types::ColumnInfo, PlannedRecipe}, + Glue, Result, }, futures::future::join_all, serde::Serialize, @@ -40,7 +41,7 @@ impl Plan { constraint, group_constraint, groups, - } = Manual::new(select, glue.get_context()?)?; + } = Manual::new(select, &*glue.get_context()?)?; let mut joins: Vec = join_all( joins diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index efc89075..737a2117 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -1,8 +1,8 @@ use { super::QueryError, crate::{ - executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, - Payload, RecipeUtilities, Value, + executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, Payload, + RecipeUtilities, Value, }, async_recursion::async_recursion, sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement}, @@ -32,7 +32,7 @@ impl Glue { .into_iter() .map(|cell| { MetaRecipe::new(cell)? - .simplify_by_context(self.get_context()?)? + .simplify_by_context(&*self.get_context()?)? .confirm_or_err(QueryError::MissingComponentsForValues.into()) }) .collect::>>() diff --git a/src/executor/types.rs b/src/executor/types.rs index f8ba6fdb..c0377d51 100644 --- a/src/executor/types.rs +++ b/src/executor/types.rs @@ -15,7 +15,7 @@ pub struct ColumnInfo { #[derive(Debug, Clone, PartialEq, Serialize)] pub struct ComplexTableName { - pub database: String, + pub database: Option, pub alias: Alias, pub name: String, } @@ -23,7 +23,7 @@ impl ColumnInfo { pub fn of_name(name: String) -> Self { ColumnInfo { table: ComplexTableName { - database: String::new(), + database: None, name: String::new(), alias: None, }, diff --git a/src/glue/database.rs b/src/glue/database.rs index 111f1e2d..813f838f 100644 --- a/src/glue/database.rs +++ b/src/glue/database.rs @@ -6,31 +6,28 @@ use { impl Glue { // TODO: None ref should give a primary pub fn get_database(&self, db_ref: &Option) -> Result>> { - db_ref - .as_ref() - .and_then(|db_ref| self.databases.get(db_ref)) + self.databases + .get(db_ref.as_ref().unwrap_or(&self.primary)) .ok_or(InterfaceError::DatabaseNotFound.into()) .map(|db| db.get()) } pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut Box> { - db_ref - .as_ref() - .and_then(|db_ref| self.databases.get_mut(db_ref)) + self.databases + .get_mut(db_ref.as_ref().unwrap_or(&self.primary)) .ok_or(InterfaceError::DatabaseNotFound.into()) .map(Storage::get_mut) } - pub fn get_context(&self) -> Result<&Context> { + pub fn get_context(&self) -> Result> { self.context - .as_ref() - .ok_or(InterfaceError::ContextUnavailable.into()) + .lock() + .map_err(|_| InterfaceError::ContextUnavailable.into()) } - pub fn get_mut_context(&self) -> Result<&mut Context> { - Err(InterfaceError::ContextUnavailable.into()) + pub fn get_mut_context(&mut self) -> Result<&mut Context> { + self.context + .get_mut() + .map_err(|_| InterfaceError::ContextUnavailable.into()) } pub fn get_database_list(&self) -> Vec<&String> { self.databases.keys().collect() } - /*pub fn database_iter(&self) -> Result>> { - Ok(Box::new(self.databases.iter().map(|(db_ref, db)| (db_ref, &*db.take())))) - }*/ } diff --git a/src/glue/error.rs b/src/glue/error.rs index 3d24427c..839853c3 100644 --- a/src/glue/error.rs +++ b/src/glue/error.rs @@ -1,8 +1,4 @@ -use { - serde::{Serialize}, - std::fmt::Debug, - thiserror::Error, -}; +use {serde::Serialize, std::fmt::Debug, thiserror::Error}; #[derive(Error, Serialize, Debug, PartialEq)] pub enum InterfaceError { diff --git a/src/glue/mod.rs b/src/glue/mod.rs index a26eba65..985244d4 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -1,7 +1,10 @@ +use std::sync::Mutex; + use crate::ExecuteError; use { crate::{ - parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Storage, Value, WIPError, + parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Storage, Value, + WIPError, }, futures::executor::block_on, sqlparser::ast::{ @@ -46,8 +49,9 @@ impl Context { /// - [`Glue::select_as_string()`] -- Provides data, only for `SELECT` queries, as [String]s (rather than [Value]s). /// - [`Glue::select_as_json()`] -- Provides data, only for `SELECT` queries, as one big [String]; generally useful for webby interactions. pub struct Glue { + pub primary: String, databases: HashMap, - context: Option, + context: Mutex, } /// ## Creation of new interfaces @@ -61,8 +65,13 @@ impl Glue { /// Creates a [Glue] instance with access to all provided storages. /// Argument is: [Vec]<(Identifier, [Storage])> pub fn new_multi(databases: HashMap) -> Self { - let context = Some(Context::default()); - Self { databases, context } + let context = Mutex::new(Context::default()); + let primary = databases.keys().next().cloned().unwrap_or_else(String::new); + Self { + databases, + context, + primary, + } } /// Merges existing [Glue] instances pub fn new_multi_glue(glues: Vec) -> Self { @@ -114,17 +123,17 @@ impl Glue { /// Internal: Modify impl Glue { - pub(crate) fn take_context(&mut self) -> Result { + /*pub(crate) fn take_context(&mut self) -> Result { self.context .take() .ok_or(InterfaceError::ContextUnavailable.into()) } pub(crate) fn replace_context(&mut self, context: Context) { self.context.replace(context); - } + }*/ #[allow(dead_code)] fn set_context(&mut self, context: Context) { - self.context = Some(context); + self.context = Mutex::new(context); } } From 89659fe514abd7b89dceb93001fb11c9d3f8f753 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 16:24:43 +1000 Subject: [PATCH 7/8] Clippy auto --- src/glue/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/glue/mod.rs b/src/glue/mod.rs index 985244d4..be64d206 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -66,7 +66,7 @@ impl Glue { /// Argument is: [Vec]<(Identifier, [Storage])> pub fn new_multi(databases: HashMap) -> Self { let context = Mutex::new(Context::default()); - let primary = databases.keys().next().cloned().unwrap_or_else(String::new); + let primary = databases.keys().next().cloned().unwrap_or_default(); Self { databases, context, From 2a6aa9da4d2f254ab76cfda5cf146bca34a351a7 Mon Sep 17 00:00:00 2001 From: Kyran Gostelow Date: Fri, 8 Apr 2022 16:31:34 +1000 Subject: [PATCH 8/8] Done! --- tests/functionality/query/with.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/functionality/query/with.rs b/tests/functionality/query/with.rs index 57e66f28..d2f48388 100644 --- a/tests/functionality/query/with.rs +++ b/tests/functionality/query/with.rs @@ -33,6 +33,7 @@ crate::util_macros::testcase!( cte_1 "# => a = I64: (1)); + /* TODO: #107 glue.execute( r#" WITH cte_0 AS ( @@ -53,6 +54,7 @@ crate::util_macros::testcase!( "#, ) .expect_err("CTE is not simultaneous"); + */ glue.execute("CREATE TABLE basic_insert (a INTEGER)") .unwrap();