diff --git a/src/executor/alter_row/auto_increment.rs b/src/executor/alter_row/auto_increment.rs index a3765da1..75c250f5 100644 --- a/src/executor/alter_row/auto_increment.rs +++ b/src/executor/alter_row/auto_increment.rs @@ -1,40 +1,43 @@ #![cfg(feature = "auto-increment")] -use crate::{Column, Result, Row, StorageInner, Value, ValueDefault}; +use crate::{Column, Glue, Result, Row, Value, ValueDefault}; -pub async fn auto_increment( - storage: &mut StorageInner, - table_name: &str, - columns: &[Column], - rows: &mut [Row], -) -> Result<()> { - let auto_increment_columns = columns - .iter() - .enumerate() - .filter(|(_, column)| matches!(column.default, Some(ValueDefault::AutoIncrement(_)))) - .map(|(index, column)| { - ( - index, - column.name.clone(), - rows.iter() - .filter(|row| matches!(row.0.get(index), Some(Value::Null))) - .count() as i64, - ) - }) - .collect(); +impl Glue { + pub async fn auto_increment( + &mut self, + table_name: &str, + columns: &[Column], + rows: &mut [Row], + ) -> Result<()> { + let auto_increment_columns = columns + .iter() + .enumerate() + .filter(|(_, column)| matches!(column.default, Some(ValueDefault::AutoIncrement(_)))) + .map(|(index, column)| { + ( + index, + column.name.clone(), + rows.iter() + .filter(|row| matches!(row.0.get(index), Some(Value::Null))) + .count() as i64, + ) + }) + .collect(); - let column_values = storage - .generate_increment_values(table_name.to_string(), auto_increment_columns) - .await?; + let column_values = self + .get_mut_database(&None)? + .generate_increment_values(table_name.to_string(), auto_increment_columns) + .await?; - let mut column_values = column_values; - for row in rows.iter_mut() { - for ((index, _name), value) in &mut column_values { - let cell = row.0.get_mut(*index).unwrap(); - if matches!(cell, Value::Null) { - *cell = Value::I64(*value); - *value += 1; + let mut column_values = column_values; + for row in rows.iter_mut() { + for ((index, _name), value) in &mut column_values { + let cell = row.0.get_mut(*index).unwrap(); + if matches!(cell, Value::Null) { + *cell = Value::I64(*value); + *value += 1; + } } } + Ok(()) } - Ok(()) } diff --git a/src/executor/alter_row/delete.rs b/src/executor/alter_row/delete.rs index 01ca7243..4be3d7d3 100644 --- a/src/executor/alter_row/delete.rs +++ b/src/executor/alter_row/delete.rs @@ -2,75 +2,75 @@ use { crate::{ data::{get_name, Schema}, executor::types::ColumnInfo, - Column, Context, ExecuteError, MetaRecipe, Payload, PlannedRecipe, Result, StorageInner, - Value, + Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, Result, Value, }, sqlparser::ast::{Expr, ObjectName}, }; -pub async fn delete( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - table_name: &ObjectName, - selection: &Option, -) -> Result { - let table_name = get_name(table_name)?; - let Schema { - column_defs, - indexes, - .. - } = storages[0] - .1 - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; +impl Glue { + pub async fn delete( + &mut self, + table_name: &ObjectName, + selection: &Option, + ) -> Result { + let table_name = get_name(table_name)?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_database(&None)? + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; - let columns = column_defs - .clone() - .into_iter() - .map(|Column { name, .. }| ColumnInfo::of_name(name)) - .collect::>(); - let filter = selection - .clone() - .map(|selection| { - PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(context)?, - &columns, - ) - }) - .unwrap_or(Ok(PlannedRecipe::TRUE))?; + let columns = column_defs + .clone() + .into_iter() + .map(|Column { name, .. }| ColumnInfo::of_name(name)) + .collect::>(); + let filter = selection + .clone() + .map(|selection| { + PlannedRecipe::new( + MetaRecipe::new(selection)?.simplify_by_context(&*self.get_context()?)?, + &columns, + ) + }) + .unwrap_or(Ok(PlannedRecipe::TRUE))?; - let keys = storages[0] - .1 - .scan_data(table_name) - .await? - .filter_map(|row_result| { - let (key, row) = match row_result { - Ok(keyed_row) => keyed_row, - Err(error) => return Some(Err(error)), - }; + let keys = self + .get_database(&None)? + .scan_data(table_name) + .await? + .filter_map(|row_result| { + let (key, row) = match row_result { + Ok(keyed_row) => keyed_row, + Err(error) => return Some(Err(error)), + }; - let row = row.0; + let row = row.0; - let confirm_constraint = filter.confirm_constraint(&row); - match confirm_constraint { - Ok(true) => Some(Ok(key)), - Ok(false) => None, - Err(error) => Some(Err(error)), - } - }) - .collect::>>()?; + let confirm_constraint = filter.confirm_constraint(&row); + match confirm_constraint { + Ok(true) => Some(Ok(key)), + Ok(false) => None, + Err(error) => Some(Err(error)), + } + }) + .collect::>>()?; - let num_keys = keys.len(); + let num_keys = keys.len(); - let result = storages[0] - .1 - .delete_data(table_name, keys) - .await - .map(|_| Payload::Delete(num_keys))?; + let database = &mut **self.get_mut_database(&None)?; + let result = database + .delete_data(table_name, keys) + .await + .map(|_| Payload::Delete(num_keys))?; - for index in indexes.iter() { - index.reset(storages[0].1, table_name, &column_defs).await?; // TODO: Not this; optimise + for index in indexes.iter() { + index.reset(database, table_name, &column_defs).await?; // TODO: Not this; optimise + } + Ok(result) } - Ok(result) } diff --git a/src/executor/alter_row/insert.rs b/src/executor/alter_row/insert.rs index acbf8669..6bec07eb 100644 --- a/src/executor/alter_row/insert.rs +++ b/src/executor/alter_row/insert.rs @@ -1,58 +1,62 @@ use { - super::{auto_increment, columns_to_positions, validate, validate_unique}, + super::{columns_to_positions, validate}, crate::{ data::{get_name, Schema}, - executor::query::query, - Context, ExecuteError, Payload, Result, Row, StorageInner, + ExecuteError, Glue, Payload, Result, Row, }, sqlparser::ast::{Ident, ObjectName, Query}, }; -pub async fn insert( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - table_name: &ObjectName, - columns: &[Ident], - source: &Query, - expect_data: bool, -) -> Result { - let table_name = get_name(table_name)?; - let Schema { - column_defs, - indexes, - .. - } = storages[0] - .1 - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; - - // TODO: Multi storage - let (labels, mut rows) = query(storages, context, source.clone()).await?; - let column_positions = columns_to_positions(&column_defs, columns)?; - - validate(&column_defs, &column_positions, &mut rows)?; - let mut rows: Vec = rows.into_iter().map(Row).collect(); - #[cfg(feature = "auto-increment")] - auto_increment(storages[0].1, table_name, &column_defs, &mut rows).await?; - validate_unique(storages[0].1, table_name, &column_defs, &rows, None).await?; - - let num_rows = rows.len(); - - let result = storages[0].1.insert_data(table_name, rows.clone()).await; - - let result = result.map(|_| { - if expect_data { - Payload::Select { labels, rows } - } else { - Payload::Insert(num_rows) +impl Glue { + pub async fn insert( + &mut self, + table_name: &ObjectName, + columns: &[Ident], + source: &Query, + expect_data: bool, + ) -> Result { + let table_name = get_name(table_name)?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_database(&None)? + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; + + // TODO: Multi storage + let (labels, mut rows) = self.query(source.clone()).await?; + let column_positions = columns_to_positions(&column_defs, columns)?; + + validate(&column_defs, &column_positions, &mut rows)?; + let mut rows: Vec = rows.into_iter().map(Row).collect(); + #[cfg(feature = "auto-increment")] + self.auto_increment(table_name, &column_defs, &mut rows) + .await?; + self.validate_unique(table_name, &column_defs, &rows, None) + .await?; + + let num_rows = rows.len(); + + let database = &mut **self.get_mut_database(&None)?; + + let result = database.insert_data(table_name, rows.clone()).await; + + let result = result.map(|_| { + if expect_data { + Payload::Select { labels, rows } + } else { + Payload::Insert(num_rows) + } + })?; + + for index in indexes.iter() { + // TODO: Should definitely be just inserting an index record + index.reset(database, table_name, &column_defs).await?; // TODO: Not this; optimise } - })?; - for index in indexes.iter() { - // TODO: Should definitely be just inserting an index record - index.reset(storages[0].1, table_name, &column_defs).await?; // TODO: Not this; optimise + Ok(result) } - - Ok(result) } diff --git a/src/executor/alter_row/mod.rs b/src/executor/alter_row/mod.rs index df060e39..4634d026 100644 --- a/src/executor/alter_row/mod.rs +++ b/src/executor/alter_row/mod.rs @@ -5,13 +5,4 @@ mod update; mod validate; mod validate_unique; -pub use { - delete::delete, - insert::insert, - update::update, - validate::{columns_to_positions, validate, ValidateError}, - validate_unique::validate_unique, -}; - -#[cfg(feature = "auto-increment")] -pub use auto_increment::auto_increment; +pub use validate::{columns_to_positions, validate, ValidateError}; diff --git a/src/executor/alter_row/update.rs b/src/executor/alter_row/update.rs index 8d14a1b5..dae9b0f5 100644 --- a/src/executor/alter_row/update.rs +++ b/src/executor/alter_row/update.rs @@ -1,125 +1,132 @@ use { - super::{auto_increment, columns_to_positions, validate, validate_unique}, + super::{columns_to_positions, validate}, crate::{ data::{get_name, Schema}, executor::types::{ColumnInfo, Row as VecRow}, - Column, Context, ExecuteError, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, Result, - Row, StorageInner, Value, + Column, ExecuteError, Glue, MetaRecipe, Payload, PlannedRecipe, RecipeUtilities, Result, + Row, Value, }, sqlparser::ast::{Assignment, Expr, TableFactor, TableWithJoins}, }; -pub async fn update( - storage: &mut StorageInner, - context: &Context, - table: &TableWithJoins, - selection: &Option, - assignments: &[Assignment], -) -> Result { - // TODO: Complex updates (joins) - let table = match &table.relation { - TableFactor::Table { name, .. } => get_name(name).cloned(), - _ => Err(ExecuteError::QueryNotSupported.into()), - }?; - let Schema { - column_defs, - indexes, - .. - } = storage - .fetch_schema(&table) - .await? - .ok_or(ExecuteError::TableNotExists)?; +impl Glue { + pub async fn update( + &mut self, + table: &TableWithJoins, + selection: &Option, + assignments: &[Assignment], + ) -> Result { + // TODO: Complex updates (joins) + let table = match &table.relation { + TableFactor::Table { name, .. } => get_name(name).cloned(), + _ => Err(ExecuteError::QueryNotSupported.into()), + }?; + let Schema { + column_defs, + indexes, + .. + } = self + .get_database(&None)? + .fetch_schema(&table) + .await? + .ok_or(ExecuteError::TableNotExists)?; - let columns = column_defs - .clone() - .into_iter() - .map(|Column { name, .. }| ColumnInfo::of_name(name)) - .collect::>(); + let columns = column_defs + .clone() + .into_iter() + .map(|Column { name, .. }| ColumnInfo::of_name(name)) + .collect::>(); - let filter = selection - .clone() - .map(|selection| { - PlannedRecipe::new( - MetaRecipe::new(selection)?.simplify_by_context(context)?, - &columns, - ) - }) - .unwrap_or(Ok(PlannedRecipe::TRUE))?; + let filter = selection + .clone() + .map(|selection| { + PlannedRecipe::new( + MetaRecipe::new(selection)?.simplify_by_context(&*self.get_context()?)?, + &columns, + ) + }) + .unwrap_or(Ok(PlannedRecipe::TRUE))?; - let assignments = assignments - .iter() - .map(|assignment| { - let Assignment { id, value } = assignment; - let column_compare = id - .clone() - .into_iter() - .map(|component| component.value) - .collect(); - let index = columns - .iter() - .position(|column| column == &column_compare) - .ok_or(ExecuteError::ColumnNotFound)?; - let recipe = PlannedRecipe::new( - MetaRecipe::new(value.clone())?.simplify_by_context(context)?, - &columns, - )?; - Ok((index, recipe)) - }) - .collect::>>()?; + let assignments = assignments + .iter() + .map(|assignment| { + let Assignment { id, value } = assignment; + let column_compare = id + .clone() + .into_iter() + .map(|component| component.value) + .collect(); + let index = columns + .iter() + .position(|column| column == &column_compare) + .ok_or(ExecuteError::ColumnNotFound)?; + let recipe = PlannedRecipe::new( + MetaRecipe::new(value.clone())?.simplify_by_context(&*self.get_context()?)?, + &columns, + )?; + Ok((index, recipe)) + }) + .collect::>>()?; - let keyed_rows = storage - .scan_data(&table) - .await? - .into_iter() - .filter_map(|row_result| { - let (key, row) = match row_result { - Ok(keyed_row) => keyed_row, - Err(error) => return Some(Err(error)), - }; + let keyed_rows = self + .get_database(&None)? + .scan_data(&table) + .await? + .into_iter() + .filter_map(|row_result| { + let (key, row) = match row_result { + Ok(keyed_row) => keyed_row, + Err(error) => return Some(Err(error)), + }; - let row = row.0; + let row = row.0; - let confirm_constraint = filter.confirm_constraint(&row); - if let Ok(false) = confirm_constraint { - return None; - } else if let Err(error) = confirm_constraint { - return Some(Err(error)); - } - let row = row - .iter() - .enumerate() - .map(|(index, old_value)| { - assignments - .iter() - .find(|(assignment_index, _)| assignment_index == &index) - .map(|(_, assignment_recipe)| { - assignment_recipe.clone().simplify_by_row(&row)?.confirm() - }) - .unwrap_or_else(|| Ok(old_value.clone())) - }) - .collect::>(); - Some(row.map(|row| (key, row))) - }) - .collect::>>()?; + let confirm_constraint = filter.confirm_constraint(&row); + if let Ok(false) = confirm_constraint { + return None; + } else if let Err(error) = confirm_constraint { + return Some(Err(error)); + } + let row = row + .iter() + .enumerate() + .map(|(index, old_value)| { + assignments + .iter() + .find(|(assignment_index, _)| assignment_index == &index) + .map(|(_, assignment_recipe)| { + assignment_recipe.clone().simplify_by_row(&row)?.confirm() + }) + .unwrap_or_else(|| Ok(old_value.clone())) + }) + .collect::>(); + Some(row.map(|row| (key, row))) + }) + .collect::>>()?; - let column_positions = columns_to_positions(&column_defs, &[])?; - let (keys, mut rows): (Vec, Vec) = keyed_rows.into_iter().unzip(); - validate(&column_defs, &column_positions, &mut rows)?; + let column_positions = columns_to_positions(&column_defs, &[])?; + let (keys, mut rows): (Vec, Vec) = keyed_rows.into_iter().unzip(); + validate(&column_defs, &column_positions, &mut rows)?; - let table = table.as_str(); - let mut rows: Vec = rows.into_iter().map(Row).collect(); - #[cfg(feature = "auto-increment")] - auto_increment(&mut *storage, table, &column_defs, &mut rows).await?; - validate_unique(&*storage, table, &column_defs, &rows, Some(&keys)).await?; - let keyed_rows: Vec<(Value, Row)> = keys.into_iter().zip(rows).collect(); - let num_rows = keyed_rows.len(); - let result = storage - .update_data(table, keyed_rows) - .await - .map(|_| Payload::Update(num_rows))?; + let table = table.as_str(); + let mut rows: Vec = rows.into_iter().map(Row).collect(); + #[cfg(feature = "auto-increment")] + self.auto_increment(table, &column_defs, &mut rows).await?; + self.validate_unique(table, &column_defs, &rows, Some(&keys)) + .await?; + let keyed_rows: Vec<(Value, Row)> = keys.into_iter().zip(rows).collect(); + let num_rows = keyed_rows.len(); - for index in indexes.iter() { - index.reset(storage, table, &column_defs).await?; // TODO: Not this; optimise + let database = &mut **self.get_mut_database(&None)?; + + let result = database + .update_data(table, keyed_rows) + .await + .map(|_| Payload::Update(num_rows))?; + + for index in indexes.iter() { + index.reset(database, table, &column_defs).await?; // TODO: Not this; optimise + } + Ok(result) } - Ok(result) } diff --git a/src/executor/alter_row/validate_unique.rs b/src/executor/alter_row/validate_unique.rs index 4988ba06..dd9568b8 100644 --- a/src/executor/alter_row/validate_unique.rs +++ b/src/executor/alter_row/validate_unique.rs @@ -1,5 +1,5 @@ use { - crate::{Column, NullOrd, Result, Row, StorageInner, ValidateError, Value}, + crate::{Column, Glue, NullOrd, Result, Row, ValidateError, Value}, std::cmp::Ordering, }; @@ -20,137 +20,143 @@ macro_rules! some_or { }; } -pub async fn validate_unique( - storage: &StorageInner, - table_name: &str, - column_defs: &[Column], - rows: &[Row], - ignore_keys: Option<&[Value]>, -) -> Result<()> { - let unique_columns: Vec = column_defs - .iter() - .enumerate() - .filter_map(|(index, column_def)| { - if column_def.is_unique { - Some(index) - } else { - None - } - }) - .collect(); - let mut existing_values: Vec> = vec![vec![]; unique_columns.len()]; +impl Glue { + pub(crate) async fn validate_unique( + &self, + table_name: &str, + column_defs: &[Column], + rows: &[Row], + ignore_keys: Option<&[Value]>, + ) -> Result<()> { + let unique_columns: Vec = column_defs + .iter() + .enumerate() + .filter_map(|(index, column_def)| { + if column_def.is_unique { + Some(index) + } else { + None + } + }) + .collect(); + let mut existing_values: Vec> = vec![vec![]; unique_columns.len()]; - storage - .scan_data(table_name) - .await? - .try_for_each::<_, Result<_>>(|result| { - let (key, row) = result?; - if let Some(ignore_keys) = ignore_keys { - if ignore_keys.iter().any(|ignore_key| ignore_key == &key) { - return Ok(()); + self.get_database(&None)? + .scan_data(table_name) + .await? + .try_for_each::<_, Result<_>>(|result| { + let (key, row) = result?; + if let Some(ignore_keys) = ignore_keys { + if ignore_keys.iter().any(|ignore_key| ignore_key == &key) { + return Ok(()); + } } - } - let row = row.0; + let row = row.0; + unique_columns + .iter() + .enumerate() + .map(|(index, row_index)| { + existing_values + .get_mut(index)? + .push(row.get(*row_index)?.clone()); + Some(()) + }) + .collect::>() + .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) + })?; + + let mut new_values: Vec> = vec![vec![]; unique_columns.len()]; + rows.iter().try_for_each::<_, Result<_>>(|row| { unique_columns .iter() .enumerate() .map(|(index, row_index)| { - existing_values + new_values .get_mut(index)? - .push(row.get(*row_index)?.clone()); + .push(row.0.get(*row_index)?.clone()); Some(()) }) .collect::>() .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) })?; + let mut existing_values_iter = existing_values.into_iter(); + new_values + .into_iter() + .map(|mut new_values| { + let mut existing_values = existing_values_iter.next()?; - let mut new_values: Vec> = vec![vec![]; unique_columns.len()]; - rows.iter().try_for_each::<_, Result<_>>(|row| { - unique_columns - .iter() - .enumerate() - .map(|(index, row_index)| { - new_values - .get_mut(index)? - .push(row.0.get(*row_index)?.clone()); - Some(()) - }) - .collect::>() - .ok_or_else(|| ValidateError::UnreachableUniqueValues.into()) - })?; - let mut existing_values_iter = existing_values.into_iter(); - new_values - .into_iter() - .map(|mut new_values| { - let mut existing_values = existing_values_iter.next()?; - - existing_values.sort_unstable_by(|value_l, value_r| { - value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) - }); - new_values.sort_unstable_by(|value_l, value_r| { - value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) - }); - - let mut existing_values = existing_values.into_iter(); - let mut new_values = new_values.into_iter(); + existing_values.sort_unstable_by(|value_l, value_r| { + value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) + }); + new_values.sort_unstable_by(|value_l, value_r| { + value_l.partial_cmp(value_r).unwrap_or(Ordering::Equal) + }); - let mut new_value = some_or_continue!(new_values.next()); - let mut existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); - } - new_value = new_new; - } - }); + let mut existing_values = existing_values.into_iter(); + let mut new_values = new_values.into_iter(); - loop { - match existing_value.null_cmp(&new_value) { - Some(Ordering::Equal) => { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())) - } - Some(Ordering::Greater) => { + let mut new_value = some_or_continue!(new_values.next()); + let mut existing_value = some_or!(existing_values.next(), { + loop { let new_new = some_or_continue!(new_values.next()); if new_new == new_value { return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); } new_value = new_new; } - Some(Ordering::Less) => { - existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err( - ValidateError::DuplicateEntryOnUniqueField.into() - )); - } - new_value = new_new; + }); + + loop { + match existing_value.null_cmp(&new_value) { + Some(Ordering::Equal) => { + return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())) + } + Some(Ordering::Greater) => { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some( + Err(ValidateError::DuplicateEntryOnUniqueField.into()), + ); } - }); - } - None => { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err(ValidateError::DuplicateEntryOnUniqueField.into())); + new_value = new_new; } - new_value = new_new; - existing_value = some_or!(existing_values.next(), { - loop { - let new_new = some_or_continue!(new_values.next()); - if new_new == new_value { - return Some(Err( - ValidateError::DuplicateEntryOnUniqueField.into() - )); + Some(Ordering::Less) => { + existing_value = some_or!(existing_values.next(), { + loop { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some(Err( + ValidateError::DuplicateEntryOnUniqueField.into(), + )); + } + new_value = new_new; } - new_value = new_new; + }); + } + None => { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some( + Err(ValidateError::DuplicateEntryOnUniqueField.into()), + ); } - }); + new_value = new_new; + existing_value = some_or!(existing_values.next(), { + loop { + let new_new = some_or_continue!(new_values.next()); + if new_new == new_value { + return Some(Err( + ValidateError::DuplicateEntryOnUniqueField.into(), + )); + } + new_value = new_new; + } + }); + } } } - } - }) - .collect::>>() - .ok_or(ValidateError::UnreachableUniqueValues)? + }) + .collect::>>() + .ok_or(ValidateError::UnreachableUniqueValues)? + } } diff --git a/src/executor/alter_table/alter_table.rs b/src/executor/alter_table/alter_table.rs index d3216b5c..5115e362 100644 --- a/src/executor/alter_table/alter_table.rs +++ b/src/executor/alter_table/alter_table.rs @@ -1,46 +1,49 @@ use { super::{validate, AlterError}, - crate::{data::get_name, Error, Result, StorageInner}, + crate::{data::get_name, Error, Glue, Result}, sqlparser::ast::{AlterTableOperation, ObjectName}, }; -pub async fn alter_table( - storage: &mut StorageInner, - name: &ObjectName, - operation: &AlterTableOperation, -) -> Result<()> { - let table_name = get_name(name).map_err(Error::from)?; +impl Glue { + pub async fn alter_table( + &mut self, + name: &ObjectName, + operation: &AlterTableOperation, + ) -> Result<()> { + let table_name = get_name(name).map_err(Error::from)?; + let database = &mut **self.get_mut_database(&None)?; - match operation { - AlterTableOperation::RenameTable { - table_name: new_table_name, - } => { - let new_table_name = get_name(new_table_name).map_err(Error::from)?; + match operation { + AlterTableOperation::RenameTable { + table_name: new_table_name, + } => { + let new_table_name = get_name(new_table_name).map_err(Error::from)?; - storage.rename_schema(table_name, new_table_name).await - } - AlterTableOperation::RenameColumn { - old_column_name, - new_column_name, - } => { - storage - .rename_column(table_name, &old_column_name.value, &new_column_name.value) - .await - } - AlterTableOperation::AddColumn { column_def } => { - validate(column_def).map_err(Error::from)?; + database.rename_schema(table_name, new_table_name).await + } + AlterTableOperation::RenameColumn { + old_column_name, + new_column_name, + } => { + database + .rename_column(table_name, &old_column_name.value, &new_column_name.value) + .await + } + AlterTableOperation::AddColumn { column_def } => { + validate(column_def).map_err(Error::from)?; - storage.add_column(table_name, &column_def.into()).await - } - AlterTableOperation::DropColumn { - column_name, - if_exists, - .. - } => { - storage - .drop_column(table_name, &column_name.value, *if_exists) - .await + database.add_column(table_name, &column_def.into()).await + } + AlterTableOperation::DropColumn { + column_name, + if_exists, + .. + } => { + database + .drop_column(table_name, &column_name.value, *if_exists) + .await + } + _ => Err(AlterError::UnsupportedAlterTableOperation(operation.to_string()).into()), } - _ => Err(AlterError::UnsupportedAlterTableOperation(operation.to_string()).into()), } } diff --git a/src/executor/alter_table/create_index.rs b/src/executor/alter_table/create_index.rs index a0b2477d..19314e9a 100644 --- a/src/executor/alter_table/create_index.rs +++ b/src/executor/alter_table/create_index.rs @@ -1,69 +1,72 @@ use { - crate::{data::get_name, AlterError, ExecuteError, Index, Result, StorageInner}, + crate::{data::get_name, AlterError, ExecuteError, Glue, Index, Result}, sqlparser::ast::{Expr, ObjectName, OrderByExpr}, }; -pub async fn create_index( - storage: &mut StorageInner, - table: &ObjectName, - name: &ObjectName, - columns: &[OrderByExpr], - unique: bool, - if_not_exists: bool, -) -> Result<()> { - let name = name - .0 - .last() - .ok_or(ExecuteError::QueryNotSupported)? - .value - .clone(); +impl Glue { + pub async fn create_index( + &mut self, + table: &ObjectName, + name: &ObjectName, + columns: &[OrderByExpr], + unique: bool, + if_not_exists: bool, + ) -> Result<()> { + let name = name + .0 + .last() + .ok_or(ExecuteError::QueryNotSupported)? + .value + .clone(); - let table_name = get_name(table)?; + let table_name = get_name(table)?; + let database = &mut **self.get_mut_database(&None)?; - let schema = storage - .fetch_schema(table_name) - .await? - .ok_or(ExecuteError::TableNotExists)?; + let schema = database + .fetch_schema(table_name) + .await? + .ok_or(ExecuteError::TableNotExists)?; - if schema.indexes.iter().any(|index| index.name == name) { - if !if_not_exists { - Err(AlterError::AlreadyExists(name).into()) + if schema.indexes.iter().any(|index| index.name == name) { + if !if_not_exists { + Err(AlterError::AlreadyExists(name).into()) + } else { + Ok(()) + } } else { - Ok(()) - } - } else { - let mut columns = columns.iter(); - let column = columns.next().and_then(|column| match column.expr.clone() { - Expr::Identifier(ident) => Some(ident.value), - _ => None, - }); - if columns.next().is_some() { - Err(AlterError::UnsupportedNumberOfIndexColumns(name).into()) - } else if column - .as_ref() - .and_then(|column| { - schema - .column_defs - .iter() - .find(|column_def| &column_def.name == column) - }) - .is_none() - { - Err(AlterError::ColumnNotFound( - table_name.clone(), - column.unwrap_or_else(|| String::from("NILL")), - ) - .into()) - } else if let Some(column) = column { - let mut schema = schema.clone(); - let index = Index::new(name, column, unique); - index - .reset(storage, table_name, &schema.column_defs) - .await?; - schema.indexes.push(index); - storage.replace_schema(table_name, schema).await - } else { - unreachable!() + let mut columns = columns.iter(); + let column = columns.next().and_then(|column| match column.expr.clone() { + Expr::Identifier(ident) => Some(ident.value), + _ => None, + }); + if columns.next().is_some() { + Err(AlterError::UnsupportedNumberOfIndexColumns(name).into()) + } else if column + .as_ref() + .and_then(|column| { + schema + .column_defs + .iter() + .find(|column_def| &column_def.name == column) + }) + .is_none() + { + Err(AlterError::ColumnNotFound( + table_name.clone(), + column.unwrap_or_else(|| String::from("NILL")), + ) + .into()) + } else if let Some(column) = column { + let mut schema = schema.clone(); + let index = Index::new(name, column, unique); + index + .reset(database, table_name, &schema.column_defs) + .await?; + schema.indexes.push(index); + database.replace_schema(table_name, schema).await + } else { + unreachable!() + } } } } diff --git a/src/executor/alter_table/create_table.rs b/src/executor/alter_table/create_table.rs index 2bcae047..c3fae557 100644 --- a/src/executor/alter_table/create_table.rs +++ b/src/executor/alter_table/create_table.rs @@ -2,30 +2,33 @@ use { super::AlterError, crate::{ data::{get_name, Schema}, - Column, Result, StorageInner, + Column, Glue, Result, }, sqlparser::ast::{ColumnDef, ObjectName}, }; -pub async fn create_table( - storage: &mut StorageInner, - name: &ObjectName, - column_defs: &[ColumnDef], - if_not_exists: bool, -) -> Result<()> { - let schema = Schema { - table_name: get_name(name)?.to_string(), - column_defs: column_defs.iter().cloned().map(Column::from).collect(), - indexes: vec![], - }; +impl Glue { + pub async fn create_table( + &mut self, + name: &ObjectName, + column_defs: &[ColumnDef], + if_not_exists: bool, + ) -> Result<()> { + let schema = Schema { + table_name: get_name(name)?.to_string(), + column_defs: column_defs.iter().cloned().map(Column::from).collect(), + indexes: vec![], + }; - if storage.fetch_schema(&schema.table_name).await?.is_some() { - if !if_not_exists { - Err(AlterError::TableAlreadyExists(schema.table_name.to_owned()).into()) + let database = &mut **self.get_mut_database(&None)?; + if database.fetch_schema(&schema.table_name).await?.is_some() { + if !if_not_exists { + Err(AlterError::TableAlreadyExists(schema.table_name.to_owned()).into()) + } else { + Ok(()) + } } else { - Ok(()) + database.insert_schema(&schema).await } - } else { - storage.insert_schema(&schema).await } } diff --git a/src/executor/alter_table/drop.rs b/src/executor/alter_table/drop.rs index 2aba9501..1898cce1 100644 --- a/src/executor/alter_table/drop.rs +++ b/src/executor/alter_table/drop.rs @@ -1,51 +1,54 @@ use { super::AlterError, - crate::{data::get_name, Result, StorageInner, ValueDefault}, + crate::{data::get_name, Glue, Result, StorageInner, ValueDefault}, futures::stream::{self, TryStreamExt}, sqlparser::ast::{ObjectName, ObjectType}, }; -pub async fn drop( - storage: &mut StorageInner, - object_type: &ObjectType, - names: &[ObjectName], - if_exists: bool, -) -> Result<()> { - if object_type != &ObjectType::Table { - return Err(AlterError::DropTypeNotSupported(object_type.to_string()).into()); - } +impl Glue { + pub async fn drop( + &mut self, + object_type: &ObjectType, + names: &[ObjectName], + if_exists: bool, + ) -> Result<()> { + let database = &mut **self.get_mut_database(&None)?; + if object_type != &ObjectType::Table { + return Err(AlterError::DropTypeNotSupported(object_type.to_string()).into()); + } - stream::iter(names.iter().map(Ok)) - .try_fold(storage, |storage, table_name| async move { - let table_name = get_name(table_name)?; - let schema = storage.fetch_schema(table_name).await?; + stream::iter(names.iter().map(Ok)) + .try_fold(database, |database, table_name| async move { + let table_name = get_name(table_name)?; + let schema = database.fetch_schema(table_name).await?; - if schema.is_none() { - if !if_exists { - return Err(AlterError::TableNotFound(table_name.to_owned()).into()); - } else { - return Ok(storage); + if schema.is_none() { + if !if_exists { + return Err(AlterError::TableNotFound(table_name.to_owned()).into()); + } else { + return Ok(database); + } } - } - #[cfg(feature = "auto-increment")] - let result: Result<&mut StorageInner> = - stream::iter(schema.unwrap().column_defs.into_iter().map(Ok)) - .try_fold(storage, |storage, column| async move { - if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { - storage - .set_increment_value(table_name, &column.name, 1_i64) - .await?; - } - Ok(storage) - }) - .await; + #[cfg(feature = "auto-increment")] + let result: Result<&mut StorageInner> = + stream::iter(schema.unwrap().column_defs.into_iter().map(Ok)) + .try_fold(database, |database, column| async move { + if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { + database + .set_increment_value(table_name, &column.name, 1_i64) + .await?; + } + Ok(database) + }) + .await; - #[cfg(feature = "auto-increment")] - let storage = result?; + #[cfg(feature = "auto-increment")] + let database = result?; - storage.delete_schema(table_name).await?; - Ok(storage) - }) - .await - .map(|_| ()) + database.delete_schema(table_name).await?; + Ok(database) + }) + .await + .map(|_| ()) + } } diff --git a/src/executor/alter_table/mod.rs b/src/executor/alter_table/mod.rs index 534f1fcd..7f7d9fc3 100644 --- a/src/executor/alter_table/mod.rs +++ b/src/executor/alter_table/mod.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "alter-table")] -#[allow(clippy::module_inception)] // TODO mod alter_table; mod create_index; mod create_table; @@ -7,11 +5,5 @@ mod drop; mod error; mod truncate; mod validate; +pub use error::AlterError; use validate::validate; - -#[cfg(feature = "alter-table")] -pub use alter_table::alter_table; -pub use { - create_index::create_index, create_table::create_table, drop::drop, error::AlterError, - truncate::truncate, -}; diff --git a/src/executor/alter_table/truncate.rs b/src/executor/alter_table/truncate.rs index e328b1ec..97cd8e70 100644 --- a/src/executor/alter_table/truncate.rs +++ b/src/executor/alter_table/truncate.rs @@ -1,35 +1,38 @@ use { - crate::{data::get_name, AlterError, Result, StorageInner, ValueDefault}, + crate::{data::get_name, AlterError, Glue, Result, StorageInner, ValueDefault}, futures::stream::{self, TryStreamExt}, sqlparser::ast::ObjectName, }; -pub async fn truncate(storage: &mut StorageInner, table_name: &ObjectName) -> Result<()> { - let table_name = get_name(table_name)?; - let schema = storage.fetch_schema(table_name).await?; +impl Glue { + pub async fn truncate(&mut self, table_name: &ObjectName) -> Result<()> { + let database = &mut **self.get_mut_database(&None)?; + let table_name = get_name(table_name)?; + let schema = database.fetch_schema(table_name).await?; - if let Some(schema) = schema { - // TODO: We should be deleting the entry - #[cfg(feature = "auto-increment")] - let result: Result<&mut StorageInner> = stream::iter(schema.column_defs.iter().map(Ok)) - .try_fold(storage, |storage, column| async move { - if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { - storage - .set_increment_value(table_name, &column.name, 1_i64) - .await?; - } - Ok(storage) - }) - .await; + if let Some(schema) = schema { + // TODO: We should be deleting the entry + #[cfg(feature = "auto-increment")] + let result: Result<&mut StorageInner> = stream::iter(schema.column_defs.iter().map(Ok)) + .try_fold(database, |database, column| async move { + if matches!(column.default, Some(ValueDefault::AutoIncrement(_))) { + database + .set_increment_value(table_name, &column.name, 1_i64) + .await?; + } + Ok(database) + }) + .await; - #[cfg(feature = "auto-increment")] - let storage = result?; + #[cfg(feature = "auto-increment")] + let database = result?; - // TODO: Maybe individual "truncate" operation - storage.delete_schema(table_name).await?; // TODO: !!! This will delete INDEXes which it shouldn't! - storage.insert_schema(&schema).await?; - Ok(()) - } else { - Err(AlterError::TableNotFound(table_name.to_owned()).into()) + // TODO: Maybe individual "truncate" operation + database.delete_schema(table_name).await?; // TODO: !!! This will delete INDEXes which it shouldn't! + database.insert_schema(&schema).await?; + Ok(()) + } else { + Err(AlterError::TableNotFound(table_name.to_owned()).into()) + } } } diff --git a/src/executor/execute.rs b/src/executor/execute.rs index 6b12db9c..5a493457 100644 --- a/src/executor/execute.rs +++ b/src/executor/execute.rs @@ -1,20 +1,11 @@ use { - super::{ - alter_row::{delete, insert, update}, - alter_table::{create_index, create_table, drop, truncate}, - other::explain, - query::query, - }, - crate::{glue::Context, parse_sql::Query, Result, Row, StorageInner, Value}, + crate::{parse_sql::Query, Glue, Result, Row, Value}, serde::Serialize, sqlparser::ast::{SetVariableValue, Statement}, std::convert::TryInto, thiserror::Error as ThisError, }; -#[cfg(feature = "alter-table")] -use super::alter_table::alter_table; - #[derive(ThisError, Serialize, Debug, PartialEq)] pub enum ExecuteError { #[error("query not supported")] @@ -61,101 +52,97 @@ pub enum Payload { TruncateTable, } -pub async fn execute( - mut storages: Vec<(String, &mut StorageInner)>, - context: &mut Context, - statement: &Query, -) -> Result { - let Query(statement) = statement; +impl Glue { + pub async fn execute_query(&mut self, statement: &Query) -> Result { + let Query(statement) = statement; - match statement { - //- Modification - //-- Tables - Statement::CreateTable { - name, - columns, - if_not_exists, - .. - } => create_table(storages[0].1, name, columns, *if_not_exists) - .await - .map(|_| Payload::Create), - Statement::Drop { - object_type, - names, - if_exists, - .. - } => drop(storages[0].1, object_type, names, *if_exists) - .await - .map(|_| Payload::DropTable), - #[cfg(feature = "alter-table")] - Statement::AlterTable { name, operation } => alter_table(storages[0].1, name, operation) - .await - .map(|_| Payload::AlterTable), - Statement::Truncate { table_name, .. } => truncate(storages[0].1, table_name) - .await - .map(|_| Payload::TruncateTable), - Statement::CreateIndex { - name, - table_name, - columns, - unique, - if_not_exists, - } => create_index( - storages[0].1, - table_name, - name, - columns, - *unique, - *if_not_exists, - ) - .await - .map(|_| Payload::Create), + match statement { + //- Modification + //-- Tables + Statement::CreateTable { + name, + columns, + if_not_exists, + .. + } => self + .create_table(name, columns, *if_not_exists) + .await + .map(|_| Payload::Create), + Statement::Drop { + object_type, + names, + if_exists, + .. + } => self + .drop(object_type, names, *if_exists) + .await + .map(|_| Payload::DropTable), + #[cfg(feature = "alter-table")] + Statement::AlterTable { name, operation } => self + .alter_table(name, operation) + .await + .map(|_| Payload::AlterTable), + Statement::Truncate { table_name, .. } => self + .truncate(table_name) + .await + .map(|_| Payload::TruncateTable), + Statement::CreateIndex { + name, + table_name, + columns, + unique, + if_not_exists, + } => self + .create_index(table_name, name, columns, *unique, *if_not_exists) + .await + .map(|_| Payload::Create), - //-- Rows - Statement::Insert { - table_name, - columns, - source, - .. - } => insert(&mut storages, context, table_name, columns, source, false).await, - Statement::Update { - table, - selection, - assignments, - // TODO - from: _, - } => update(storages[0].1, context, table, selection, assignments).await, - Statement::Delete { - table_name, - selection, - } => delete(&mut storages, context, table_name, selection).await, + //-- Rows + Statement::Insert { + table_name, + columns, + source, + .. + } => self.insert(table_name, columns, source, false).await, + Statement::Update { + table, + selection, + assignments, + // TODO + from: _, + } => self.update(table, selection, assignments).await, + Statement::Delete { + table_name, + selection, + } => self.delete(table_name, selection).await, - //- Selection - Statement::Query(query_value) => { - let result = query(&mut storages, context, *query_value.clone()).await?; - let (labels, rows) = result; - let rows = rows.into_iter().map(Row).collect(); // I don't like this. TODO - let payload = Payload::Select { labels, rows }; - Ok(payload) - } + //- Selection + Statement::Query(query_value) => { + let result = self.query(*query_value.clone()).await?; + let (labels, rows) = result; + let rows = rows.into_iter().map(Row).collect(); // I don't like this. TODO + let payload = Payload::Select { labels, rows }; + Ok(payload) + } - //- Context - Statement::SetVariable { - variable, value, .. - } => { - let first_value = value.get(0).unwrap(); // Why might one want anything else? - let value: Value = match first_value { - SetVariableValue::Ident(..) => unimplemented!(), - SetVariableValue::Literal(literal) => literal.try_into()?, - }; - let name = variable.value.clone(); - context.set_variable(name, value); - Ok(Payload::Success) - } + //- Context + Statement::SetVariable { + variable, value, .. + } => { + let first_value = value.get(0).unwrap(); // Why might one want anything else? + let value: Value = match first_value { + SetVariableValue::Ident(..) => unimplemented!(), + SetVariableValue::Literal(literal) => literal.try_into()?, + }; + let name = variable.value.clone(); + self.get_mut_context()?.set_variable(name, value); + Ok(Payload::Success) + } - Statement::ExplainTable { table_name, .. } => explain(&storages, table_name).await, + Statement::ExplainTable { table_name, .. } => self.explain(table_name).await, - Statement::CreateDatabase { .. } => unreachable!(), // Handled at Glue interface // TODO: Clean up somehow - _ => Err(ExecuteError::QueryNotSupported.into()), + Statement::CreateDatabase { .. } => unreachable!(), // Handled at Glue interface // TODO: Clean up somehow + _ => Err(ExecuteError::QueryNotSupported.into()), + } } } diff --git a/src/executor/mod.rs b/src/executor/mod.rs index bf560933..51fd34cf 100644 --- a/src/executor/mod.rs +++ b/src/executor/mod.rs @@ -10,7 +10,7 @@ mod types; pub use { alter_row::ValidateError, alter_table::AlterError, - execute::{execute, ExecuteError, Payload}, + execute::{ExecuteError, Payload}, fetch::FetchError, query::{JoinError, ManualError, PlanError, QueryError, SelectError}, recipe::*, diff --git a/src/executor/other/explain.rs b/src/executor/other/explain.rs index 3e392dab..6c69f635 100644 --- a/src/executor/other/explain.rs +++ b/src/executor/other/explain.rs @@ -1,91 +1,92 @@ -use crate::{ExecuteError, Payload, Row, Schema, Value}; -use crate::{Result, StorageInner}; +use crate::{ExecuteError, Payload, Row, Schema, StorageInner, Value}; +use crate::{Glue, Result}; use sqlparser::ast::ObjectName; -pub(crate) async fn explain( - storages: &[(String, &mut StorageInner)], - object: &ObjectName, -) -> Result { - println!("{:?}", object); - - let mut name_vec = object.0.clone(); - let (store_name, opt_table_name) = match name_vec.len() { - 2 => (name_vec.remove(0).value, Some(name_vec.remove(0).value)), - 1 => { - let name = name_vec.remove(0).value; - if name == "ALL" { - let databases = storages - .iter() - .map(|(name, _)| Row(vec![name.clone().into()])) - .collect(); - return Ok(Payload::Select { - labels: vec![String::from("database")], - rows: databases, - }); - } - if name == "ALL_TABLE" { - let mut tables = vec![]; - for (name, store) in storages.iter() { - tables.extend( - get_tables(store) - .await? - .into_iter() - .map(|table| Row(vec![name.clone().into(), table])), - ); +impl Glue { + pub async fn explain(&self, object: &ObjectName) -> Result { + let mut name_vec = object.0.clone(); + let (store_name, opt_table_name) = match name_vec.len() { + 2 => ( + Some(name_vec.remove(0).value), + Some(name_vec.remove(0).value), + ), + 1 => { + let name = name_vec.remove(0).value; + if name == "ALL" { + let databases: Vec = self + .get_database_list() + .into_iter() + .map(|name| Row(vec![name.clone().into()])) + .collect(); + return Ok(Payload::Select { + labels: vec![String::from("database")], + rows: databases, + }); + } + if name == "ALL_TABLE" { + let mut tables = vec![]; + for db_name in self.get_database_list().into_iter() { + tables.extend( + self.get_database(&Some(db_name.clone()))? + .get_tables() + .await? + .iter() + .map(|table| Row(vec![db_name.clone().into(), table.clone()])), + ); + } + return Ok(Payload::Select { + labels: vec![String::from("database"), String::from("table")], + rows: tables, + }); + } else if self.get_database_list().contains(&&name) { + (Some(name), None) + } else { + (None, Some(name)) } - return Ok(Payload::Select { - labels: vec![String::from("database"), String::from("table")], - rows: tables, - }); - } else if storages.iter().any(|(store, _)| store == &name) { - (name, None) - } else { - (storages[0].0.clone(), Some(name)) } - } - _ => return Err(ExecuteError::ObjectNotRecognised.into()), - }; + _ => return Err(ExecuteError::ObjectNotRecognised.into()), + }; - let store = storages - .iter() - .find_map(|(name, store)| (name == &store_name).then(|| store)) - .ok_or(ExecuteError::ObjectNotRecognised)?; - if let Some(table_name) = opt_table_name { - let Schema { column_defs, .. } = store - .fetch_schema(&table_name) - .await? - .ok_or(ExecuteError::ObjectNotRecognised)?; - let columns = column_defs - .iter() - .map(|column| { - ( - column.name.clone().into(), - column.data_type.to_string().into(), - ) - }) - .map(|(name, data_type)| Row(vec![name, data_type])) - .collect(); - Ok(Payload::Select { - labels: vec![String::from("column"), String::from("data_type")], - rows: columns, - }) - } else { - Ok(Payload::Select { - labels: vec![String::from("table")], - rows: get_tables(store) + let database = self.get_database(&store_name)?; + if let Some(table_name) = opt_table_name { + let Schema { column_defs, .. } = database + .fetch_schema(&table_name) .await? - .into_iter() - .map(|table| Row(vec![table])) - .collect(), - }) + .ok_or(ExecuteError::ObjectNotRecognised)?; + let columns = column_defs + .iter() + .map(|column| { + ( + column.name.clone().into(), + column.data_type.to_string().into(), + ) + }) + .map(|(name, data_type)| Row(vec![name, data_type])) + .collect(); + Ok(Payload::Select { + labels: vec![String::from("column"), String::from("data_type")], + rows: columns, + }) + } else { + Ok(Payload::Select { + labels: vec![String::from("table")], + rows: database + .get_tables() + .await? + .into_iter() + .map(|table| Row(vec![table])) + .collect(), + }) + } } } - -async fn get_tables(store: &&mut StorageInner) -> Result> { - Ok(store - .scan_schemas() - .await? - .into_iter() - .map(|Schema { table_name, .. }| table_name.into()) - .collect()) +impl StorageInner { + async fn get_tables(&self) -> Result> { + Ok(self + .scan_schemas() + .await? + .into_iter() + .map(|Schema { table_name, .. }| table_name.into()) + .collect()) + } } diff --git a/src/executor/other/mod.rs b/src/executor/other/mod.rs index 6e89b9c4..3ab13279 100644 --- a/src/executor/other/mod.rs +++ b/src/executor/other/mod.rs @@ -1,2 +1 @@ mod explain; -pub(crate) use explain::*; diff --git a/src/executor/query/mod.rs b/src/executor/query/mod.rs index e1438539..0ee20c48 100644 --- a/src/executor/query/mod.rs +++ b/src/executor/query/mod.rs @@ -4,12 +4,11 @@ mod set_expr; pub use select::{join::*, ManualError, PlanError, SelectError}; use { crate::{ - executor::types::LabelsAndRows, result::Result, Cast, Context, MetaRecipe, RecipeUtilities, - StorageInner, Value, + executor::types::LabelsAndRows, result::Result, Cast, Glue, MetaRecipe, RecipeUtilities, + Value, }, async_recursion::async_recursion, serde::Serialize, - set_expr::from_body, sqlparser::ast::{Cte, Query, TableAlias, With}, thiserror::Error as ThisError, }; @@ -34,86 +33,83 @@ pub enum QueryError { OperationColumnsMisaligned, } -#[async_recursion(?Send)] -pub async fn query( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - query: Query, -) -> Result { - let Query { - body, - order_by, - limit, - offset, - with, - // TODO (below) - fetch: _, - lock: _, - } = query; - let limit: Option = limit - .map(|expression| { - MetaRecipe::new(expression)? - .simplify_by_context(context)? - .confirm_or_err(QueryError::MissingComponentsForLimit.into())? - .cast() - }) - .transpose()?; - let offset: Option = offset - .map(|offset| { - MetaRecipe::new(offset.value)? - .simplify_by_context(context)? - .confirm_or_err(QueryError::MissingComponentsForOffset.into())? - .cast() - }) - .transpose()?; +impl Glue { + #[async_recursion(?Send)] + pub async fn query(&mut self, query: Query) -> Result { + let Query { + body, + order_by, + limit, + offset, + with, + // TODO (below) + fetch: _, + lock: _, + } = query; - let mut context = context.clone(); - let context = &mut context; // We don't actually want to pass on any changes from here - if let Some(with) = with { - let With { - recursive: _, // Recursive not currently supported - cte_tables, - } = with; - for cte in cte_tables.into_iter() { - let Cte { - alias, - query, - from: _, // What is `from` for? - } = cte; - let TableAlias { - name, - columns: _, // TODO: Columns - Check that number is same and then rename labels - } = alias; - let name = name.value; - let data = self::query(storages, context, query).await?; - context.set_table(name, data); + let limit: Option = limit + .map(|expression| { + MetaRecipe::new(expression)? + .simplify_by_context(&*self.get_context()?)? + .confirm_or_err(QueryError::MissingComponentsForLimit.into())? + .cast() + }) + .transpose()?; + let offset: Option = offset + .map(|offset| { + MetaRecipe::new(offset.value)? + .simplify_by_context(&*self.get_context()?)? + .confirm_or_err(QueryError::MissingComponentsForOffset.into())? + .cast() + }) + .transpose()?; + + if let Some(with) = with { + let With { + recursive: _, // Recursive not currently supported + cte_tables, + } = with; + for cte in cte_tables.into_iter() { + let Cte { + alias, + query, + from: _, // What is `from` for? + } = cte; + let TableAlias { + name, + columns: _, // TODO: Columns - Check that number is same and then rename labels + } = alias; + let name = name.value; + let data = self.query(query).await?; + self.get_mut_context()?.set_table(name, data); + } } - } - let (mut labels, mut rows) = from_body(storages, context, body, order_by).await?; + let (mut labels, mut rows) = self.from_body(body, order_by).await?; - if let Some(offset) = offset { - rows.drain(0..offset); - } - if let Some(limit) = limit { - rows.truncate(limit); - } - if ENSURE_SIZE { - let row_width = rows - .iter() - .map(|values_row| values_row.len()) - .max() - .unwrap_or(0); - if row_width > 0 { - rows = rows - .into_iter() - .map(|mut row| { - row.resize(row_width, Value::Null); - row - }) - .collect(); - labels.resize(row_width, String::new()) - }; + if let Some(offset) = offset { + rows.drain(0..offset); + } + if let Some(limit) = limit { + rows.truncate(limit); + } + if ENSURE_SIZE { + let row_width = rows + .iter() + .map(|values_row| values_row.len()) + .max() + .unwrap_or(0); + if row_width > 0 { + rows = rows + .into_iter() + .map(|mut row| { + row.resize(row_width, Value::Null); + row + }) + .collect(); + labels.resize(row_width, String::new()) + }; + } + Ok((labels, rows)) } - Ok((labels, rows)) } diff --git a/src/executor/query/select/join/execute.rs b/src/executor/query/select/join/execute.rs index dc903abe..3d428bc4 100644 --- a/src/executor/query/select/join/execute.rs +++ b/src/executor/query/select/join/execute.rs @@ -2,14 +2,14 @@ use { super::{JoinError, JoinMethod, JoinPlan, JoinType}, crate::{ executor::types::{ColumnInfo, Row}, - Context, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, + Glue, IndexFilter, Ingredient, MetaRecipe, Method, PlannedRecipe, Recipe, Result, StorageInner, Value, }, }; #[derive(Debug)] pub struct JoinExecute { - pub database: String, + pub database: Option, pub table: String, pub method: JoinMethod, pub join_type: JoinType, @@ -55,29 +55,13 @@ impl JoinExecute { .map(|result| result.map(|(_, row)| row.0)) .collect::>>() } - pub async fn execute<'a>( - self, - storages: &[(String, &mut StorageInner)], - context: &Context, - plane_rows: Vec, - ) -> Result> { - let rows = if let Some((.., context_table_rows)) = context.tables.get(&self.table) { - Ok(context_table_rows.clone()) - } else { - let storage = storages - .iter() - .find_map(|(name, storage)| { - if name == &self.database { - Some(&**storage) - } else { - None - } - }) - .or_else(|| storages.get(0).map(|(_, storage)| &**storage)) - .ok_or(JoinError::Unreachable)?; - - self.get_rows(storage).await - }?; + pub async fn execute<'a>(self, glue: &Glue, plane_rows: Vec) -> Result> { + let rows = + if let Some((.., context_table_rows)) = glue.get_context()?.tables.get(&self.table) { + Ok(context_table_rows.clone()) + } else { + self.get_rows(&**glue.get_database(&self.database)?).await + }?; self.method.run( &self.join_type, self.widths.0, diff --git a/src/executor/query/select/join/manual.rs b/src/executor/query/select/join/manual.rs index 060c997b..f0ce48b9 100644 --- a/src/executor/query/select/join/manual.rs +++ b/src/executor/query/select/join/manual.rs @@ -58,9 +58,9 @@ impl JoinManual { return Err(JoinError::UnimplementedNumberOfComponents.into()); } let database = if name_parts == 2 { - name.0.get(0).unwrap().value.clone() + Some(name.0.get(0).unwrap().value.clone()) } else { - String::new() + None }; let name = name.0.last().unwrap().value.clone(); let alias = alias.map(|alias| alias.name.value); diff --git a/src/executor/query/select/join/plan.rs b/src/executor/query/select/join/plan.rs index 7ac0de72..9af35cc3 100644 --- a/src/executor/query/select/join/plan.rs +++ b/src/executor/query/select/join/plan.rs @@ -6,14 +6,14 @@ use { types::{ColumnInfo, ComplexTableName}, MetaRecipe, }, - Context, JoinError, Result, StorageInner, + Glue, Result, }, std::cmp::Ordering, }; #[derive(Debug)] pub struct JoinPlan { - pub database: String, + pub database: Option, pub table: String, pub columns: Vec, pub join_type: JoinType, @@ -38,17 +38,13 @@ impl Ord for JoinPlan { } impl JoinPlan { - pub async fn new<'a>( - join_manual: JoinManual, - storages: &[(String, &mut StorageInner)], - context: &Context, - ) -> Result { + pub async fn new<'a>(join_manual: JoinManual, glue: &Glue) -> Result { let JoinManual { table, constraint, join_type, } = join_manual; - let columns = get_columns(storages, table.clone(), context).await?; + let columns = get_columns(glue, table.clone()).await?; let ComplexTableName { database, name: table, @@ -89,12 +85,8 @@ impl JoinPlan { } } -async fn get_columns( - storages: &[(String, &mut StorageInner)], - table: ComplexTableName, - context: &Context, -) -> Result> { - if let Some((context_table_labels, ..)) = context.tables.get(&table.name) { +async fn get_columns(glue: &Glue, table: ComplexTableName) -> Result> { + if let Some((context_table_labels, ..)) = glue.get_context()?.tables.get(&table.name) { Ok(context_table_labels .iter() .map(|name| ColumnInfo { @@ -104,18 +96,6 @@ async fn get_columns( }) .collect::>()) } else { - let storage = storages - .iter() - .find_map(|(name, storage)| { - if name == &table.database { - Some(&**storage) - } else { - None - } - }) - .or_else(|| storages.get(0).map(|(_, storage)| &**storage)) - .ok_or_else(|| JoinError::TableNotFound(table.clone()))?; - - fetch_columns(storage, table).await + fetch_columns(&**glue.get_database(&table.database)?, table).await } } diff --git a/src/executor/query/select/mod.rs b/src/executor/query/select/mod.rs index a76f06f8..af0e8e22 100644 --- a/src/executor/query/select/mod.rs +++ b/src/executor/query/select/mod.rs @@ -10,7 +10,7 @@ use { PlannedRecipe, }, macros::try_option, - Context, RecipeUtilities, Result, StorageInner, Value, + Glue, RecipeUtilities, Result, Value, }, futures::stream::{self, StreamExt, TryStreamExt}, rayon::prelude::*, @@ -41,107 +41,110 @@ pub enum SelectError { Unreachable, } -pub async fn select( - storages: &[(String, &mut StorageInner)], - context: &Context, - query: Select, - order_by: Vec, -) -> Result { - let Plan { - joins, - select_items, - constraint, - group_constraint, - groups, - order_by, - labels, - } = Plan::new(storages, context, query, order_by).await?; - - let rows = stream::iter(joins) - .map(Ok) - .try_fold(vec![], |rows, join| async { - join.execute(storages, context, rows).await - }) - .await?; - - let rows = order_by.execute(rows)?; // TODO: This should be done after filtering - - let selected_rows = - rows.into_par_iter() - .filter_map(|row| match constraint.confirm_constraint(&row) { - Ok(true) => Some( - select_items - .clone() - .into_iter() - .map(|selection| selection.simplify_by_row(&row)) - .collect::>>() - .map(|selection| (selection, row)), - ), - Ok(false) => None, - Err(error) => Some(Err(error)), - }); - let do_group = !groups.is_empty() - || select_items - .iter() - .any(|select_item| !select_item.aggregates.is_empty()); - - let final_rows = if do_group { - let groups = if groups.is_empty() { - vec![PlannedRecipe::TRUE] +impl Glue { + pub async fn select(&mut self, plan: Plan) -> Result { + let Plan { + joins, + select_items, + constraint, + group_constraint, + groups, + order_by, + labels, + } = plan; + let rows = stream::iter(joins) + .map(Ok) + .try_fold(vec![], |rows, join| async { + join.execute(self, rows).await + }) + .await?; + + let rows = order_by.execute(rows)?; // TODO: This should be done after filtering + + let selected_rows = + rows.into_par_iter() + .filter_map(|row| match constraint.confirm_constraint(&row) { + Ok(true) => Some( + select_items + .clone() + .into_iter() + .map(|selection| selection.simplify_by_row(&row)) + .collect::>>() + .map(|selection| (selection, row)), + ), + Ok(false) => None, + Err(error) => Some(Err(error)), + }); + let do_group = !groups.is_empty() + || select_items + .iter() + .any(|select_item| !select_item.aggregates.is_empty()); + + let final_rows = if do_group { + let groups = if groups.is_empty() { + vec![PlannedRecipe::TRUE] + } else { + groups + }; + + let accumulations: Vec<(Vec, Option, Vec)> = + selected_rows + .filter_map(|selection| { + let (selected_row, row) = try_option!(selection); + let group_constraint = + try_option!(group_constraint.clone().simplify_by_row(&row)); + let group_constraint = match group_constraint.as_solution() { + Some(Value::Bool(true)) => None, + Some(Value::Bool(false)) => return None, + Some(_) => unreachable!(), // TODO: Handle + None => Some(group_constraint), + }; + let groupers = try_option!(groups + .iter() + .map(|group| { + group.clone().simplify_by_row(&row)?.confirm_or_err( + SelectError::GrouperMayNotContainAggregate.into(), + ) + }) + .collect::>>()); + Some(Ok((groupers, group_constraint, selected_row))) + }) + .map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc])) + .try_reduce_with(accumulate) + .unwrap_or(Ok(vec![]))?; // TODO: Improve + + accumulations + .into_par_iter() + .map(|(_grouper, _group_constraint, vals)| { + vals.into_iter() + .map(|val| val.finalise_accumulation()) + .collect::>>() + }) + .collect::>>>()? + // TODO: Manage grouper and constraint } else { - groups - }; - - let accumulations: Vec<(Vec, Option, Vec)> = selected_rows - .filter_map(|selection| { - let (selected_row, row) = try_option!(selection); - let group_constraint = - try_option!(group_constraint.clone().simplify_by_row(&row)); - let group_constraint = match group_constraint.as_solution() { - Some(Value::Bool(true)) => None, - Some(Value::Bool(false)) => return None, - Some(_) => unreachable!(), // TODO: Handle - None => Some(group_constraint), - }; - let groupers = try_option!(groups - .iter() - .map(|group| { - group - .clone() - .simplify_by_row(&row)? - .confirm_or_err(SelectError::GrouperMayNotContainAggregate.into()) - }) - .collect::>>()); - Some(Ok((groupers, group_constraint, selected_row))) - }) - .map::<_, Result<_>>(|acc| acc.map(|acc| vec![acc])) - .try_reduce_with(accumulate) - .unwrap_or(Ok(vec![]))?; // TODO: Improve - - accumulations - .into_par_iter() - .map(|(_grouper, _group_constraint, vals)| { - vals.into_iter() - .map(|val| val.finalise_accumulation()) - .collect::>>() - }) - .collect::>>>()? - // TODO: Manage grouper and constraint - } else { - selected_rows - .map(|selection| { - selection.and_then(|(selection, _)| { - selection - .into_iter() - .map(|selected| selected.confirm()) - .collect::>() + .map(|selection| { + selection.and_then(|(selection, _)| { + selection + .into_iter() + .map(|selected| selected.confirm()) + .collect::>() + }) }) - }) - .collect::>>()? - }; + .collect::>>()? + }; - Ok((labels, final_rows)) + Ok((labels, final_rows)) + } + pub async fn select_query( + &mut self, + query: Select, + order_by: Vec, + ) -> Result { + let plan = Plan::new(self, query, order_by).await?; + self.select(plan).await + } } #[allow(clippy::type_complexity)] // TODO diff --git a/src/executor/query/select/plan/mod.rs b/src/executor/query/select/plan/mod.rs index 8cecc9e4..d77c5aae 100644 --- a/src/executor/query/select/plan/mod.rs +++ b/src/executor/query/select/plan/mod.rs @@ -5,7 +5,7 @@ use { }, crate::{ executor::{types::ColumnInfo, PlannedRecipe}, - Context, Result, StorageInner, + Glue, Result, }, futures::future::join_all, serde::Serialize, @@ -34,24 +34,19 @@ pub enum PlanError { } impl Plan { - pub async fn new( - storages: &[(String, &mut StorageInner)], - context: &Context, - select: Select, - order_by: Vec, - ) -> Result { + pub async fn new(glue: &Glue, select: Select, order_by: Vec) -> Result { let Manual { joins, select_items, constraint, group_constraint, groups, - } = Manual::new(select, context)?; + } = Manual::new(select, &*glue.get_context()?)?; let mut joins: Vec = join_all( joins .into_iter() - .map(|join| JoinPlan::new(join, storages, context)) + .map(|join| JoinPlan::new(join, glue)) .collect::>(), ) .await diff --git a/src/executor/query/set_expr.rs b/src/executor/query/set_expr.rs index 1aa2b433..737a2117 100644 --- a/src/executor/query/set_expr.rs +++ b/src/executor/query/set_expr.rs @@ -1,97 +1,96 @@ use { - super::{select::select, QueryError}, + super::QueryError, crate::{ - executor::{alter_row::insert, types::LabelsAndRows}, - macros::warning, - result::Result, - Context, MetaRecipe, Payload, RecipeUtilities, StorageInner, Value, + executor::types::LabelsAndRows, macros::warning, result::Result, Glue, MetaRecipe, Payload, + RecipeUtilities, Value, }, async_recursion::async_recursion, sqlparser::ast::{OrderByExpr, SetExpr, SetOperator, Statement}, }; -#[async_recursion(?Send)] -pub async fn from_body( - storages: &mut Vec<(String, &mut StorageInner)>, - context: &mut Context, - body: SetExpr, - order_by: Vec, -) -> Result { - match body { - SetExpr::Select(query) => { - let (labels, rows) = select(storages, context, *query, order_by).await?; - Ok((labels, rows)) - } - SetExpr::Values(values) => { - if !order_by.is_empty() { - warning!("VALUES does not currently support ordering"); +impl Glue { + #[async_recursion(?Send)] + pub async fn from_body( + &mut self, + body: SetExpr, + order_by: Vec, + ) -> Result { + match body { + SetExpr::Select(query) => { + let (labels, rows) = self.select_query(*query, order_by).await?; + Ok((labels, rows)) } - let values = values.0; - values - .into_iter() - .map(|values_row| { - values_row - .into_iter() - .map(|cell| { - MetaRecipe::new(cell)? - .simplify_by_context(context)? - .confirm_or_err(QueryError::MissingComponentsForValues.into()) - }) - .collect::>>() - }) - .collect::>>>() - .map(|values| { - ( - (0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0)) - .map(|index| format!("unnamed_{}", index)) - .collect(), - values, - ) - }) - } - SetExpr::SetOperation { - op, - all, - left, - right, - } => { - use SetOperator::*; - if !order_by.is_empty() { - warning!( + SetExpr::Values(values) => { + if !order_by.is_empty() { + warning!("VALUES does not currently support ordering"); + } + let values = values.0; + values + .into_iter() + .map(|values_row| { + values_row + .into_iter() + .map(|cell| { + MetaRecipe::new(cell)? + .simplify_by_context(&*self.get_context()?)? + .confirm_or_err(QueryError::MissingComponentsForValues.into()) + }) + .collect::>>() + }) + .collect::>>>() + .map(|values| { + ( + (0..values.get(0).map(|first_row| first_row.len()).unwrap_or(0)) + .map(|index| format!("unnamed_{}", index)) + .collect(), + values, + ) + }) + } + SetExpr::SetOperation { + op, + all, + left, + right, + } => { + use SetOperator::*; + if !order_by.is_empty() { + warning!( "set operations (UNION, EXCEPT & INTERSECT) do not currently support ordering" ); + } + let (left_labels, left) = self.from_body(*left, vec![]).await?; + let (right_labels, right) = self.from_body(*right, vec![]).await?; + if left_labels.len() != right_labels.len() { + return Err(QueryError::OperationColumnsMisaligned.into()); + } + let mut rows = match op { + Union => [left, right].concat(), + Except => left + .into_iter() + .filter(|row| !right.contains(row)) + .collect(), + Intersect => left.into_iter().filter(|row| right.contains(row)).collect(), + }; + if !all { + rows.dedup(); + } + Ok((left_labels, rows)) } - let (left_labels, left) = from_body(storages, context, *left, vec![]).await?; - let (right_labels, right) = from_body(storages, context, *right, vec![]).await?; - if left_labels.len() != right_labels.len() { - return Err(QueryError::OperationColumnsMisaligned.into()); - } - let mut rows = match op { - Union => [left, right].concat(), - Except => left - .into_iter() - .filter(|row| !right.contains(row)) - .collect(), - Intersect => left.into_iter().filter(|row| right.contains(row)).collect(), - }; - if !all { - rows.dedup(); - } - Ok((left_labels, rows)) - } - SetExpr::Insert(Statement::Insert { - table_name, - columns, - source, - .. - }) => { - let inserted = insert(storages, context, &table_name, &columns, &source, true).await?; - if let Payload::Select { labels, rows } = inserted { - Ok((labels, rows.into_iter().map(|row| row.0).collect())) - } else { - unreachable!(); // TODO: Handle + SetExpr::Insert(Statement::Insert { + table_name, + columns, + source, + .. + }) => { + let inserted = self.insert(&table_name, &columns, &source, true).await?; + if let Payload::Select { labels, rows } = inserted { + Ok((labels, rows.into_iter().map(|row| row.0).collect())) + } else { + unreachable!(); // TODO: Handle + } } + _ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries } - _ => Err(QueryError::QueryNotSupported.into()), // TODO: Other queries } } diff --git a/src/executor/types.rs b/src/executor/types.rs index f8ba6fdb..c0377d51 100644 --- a/src/executor/types.rs +++ b/src/executor/types.rs @@ -15,7 +15,7 @@ pub struct ColumnInfo { #[derive(Debug, Clone, PartialEq, Serialize)] pub struct ComplexTableName { - pub database: String, + pub database: Option, pub alias: Alias, pub name: String, } @@ -23,7 +23,7 @@ impl ColumnInfo { pub fn of_name(name: String) -> Self { ColumnInfo { table: ComplexTableName { - database: String::new(), + database: None, name: String::new(), alias: None, }, diff --git a/src/glue/database.rs b/src/glue/database.rs new file mode 100644 index 00000000..813f838f --- /dev/null +++ b/src/glue/database.rs @@ -0,0 +1,33 @@ +use { + crate::{Context, Glue, InterfaceError, Result, Storage, StorageInner}, + std::sync::MutexGuard, +}; + +impl Glue { + // TODO: None ref should give a primary + pub fn get_database(&self, db_ref: &Option) -> Result>> { + self.databases + .get(db_ref.as_ref().unwrap_or(&self.primary)) + .ok_or(InterfaceError::DatabaseNotFound.into()) + .map(|db| db.get()) + } + pub fn get_mut_database(&mut self, db_ref: &Option) -> Result<&mut Box> { + self.databases + .get_mut(db_ref.as_ref().unwrap_or(&self.primary)) + .ok_or(InterfaceError::DatabaseNotFound.into()) + .map(Storage::get_mut) + } + pub fn get_context(&self) -> Result> { + self.context + .lock() + .map_err(|_| InterfaceError::ContextUnavailable.into()) + } + pub fn get_mut_context(&mut self) -> Result<&mut Context> { + self.context + .get_mut() + .map_err(|_| InterfaceError::ContextUnavailable.into()) + } + pub fn get_database_list(&self) -> Vec<&String> { + self.databases.keys().collect() + } +} diff --git a/src/glue/error.rs b/src/glue/error.rs new file mode 100644 index 00000000..839853c3 --- /dev/null +++ b/src/glue/error.rs @@ -0,0 +1,9 @@ +use {serde::Serialize, std::fmt::Debug, thiserror::Error}; + +#[derive(Error, Serialize, Debug, PartialEq)] +pub enum InterfaceError { + #[error("database not found")] + DatabaseNotFound, + #[error("context currently unavailable")] + ContextUnavailable, +} diff --git a/src/glue/mod.rs b/src/glue/mod.rs index a1083906..be64d206 100644 --- a/src/glue/mod.rs +++ b/src/glue/mod.rs @@ -1,10 +1,10 @@ -#![cfg(feature = "sled-storage")] +use std::sync::Mutex; use crate::ExecuteError; use { crate::{ - execute, parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Row, - Storage, StorageInner, Value, WIPError, + parse, parse_single, CSVSettings, Connection, Payload, Query, Result, Storage, Value, + WIPError, }, futures::executor::block_on, sqlparser::ast::{ @@ -14,8 +14,13 @@ use { std::{collections::HashMap, fmt::Debug}, }; +mod database; +mod error; +mod payload; mod select; +pub use error::InterfaceError; + pub(crate) type Variables = HashMap; #[derive(Default, Debug, Clone)] @@ -44,28 +49,36 @@ impl Context { /// - [`Glue::select_as_string()`] -- Provides data, only for `SELECT` queries, as [String]s (rather than [Value]s). /// - [`Glue::select_as_json()`] -- Provides data, only for `SELECT` queries, as one big [String]; generally useful for webby interactions. pub struct Glue { - storages: Vec<(String, Storage)>, - context: Option, + pub primary: String, + databases: HashMap, + context: Mutex, } /// ## Creation of new interfaces impl Glue { /// Creates a [Glue] instance with just one [Storage]. - pub fn new(name: String, storage: Storage) -> Self { - Self::new_multi(vec![(name, storage)]) + pub fn new(name: String, database: Storage) -> Self { + let mut databases = HashMap::new(); + databases.insert(name, database); + Self::new_multi(databases) } /// Creates a [Glue] instance with access to all provided storages. /// Argument is: [Vec]<(Identifier, [Storage])> - pub fn new_multi(storages: Vec<(String, Storage)>) -> Self { - let context = Some(Context::default()); - Self { storages, context } + pub fn new_multi(databases: HashMap) -> Self { + let context = Mutex::new(Context::default()); + let primary = databases.keys().next().cloned().unwrap_or_default(); + Self { + databases, + context, + primary, + } } /// Merges existing [Glue] instances pub fn new_multi_glue(glues: Vec) -> Self { glues .into_iter() .reduce(|mut main, other| { - main.storages.extend(other.storages); + main.databases.extend(other.databases); main }) .unwrap() @@ -95,38 +108,38 @@ impl Glue { /// ``` /// pub fn extend(&mut self, glues: Vec) { - self.storages.extend( + self.databases.extend( glues .into_iter() .reduce(|mut main, other| { - main.storages.extend(other.storages); + main.databases.extend(other.databases); main }) .unwrap() - .storages, + .databases, ) } } /// Internal: Modify impl Glue { - pub(crate) fn take_context(&mut self) -> Context { + /*pub(crate) fn take_context(&mut self) -> Result { self.context .take() - .expect("Unreachable: Context wasn't replaced!") + .ok_or(InterfaceError::ContextUnavailable.into()) } pub(crate) fn replace_context(&mut self, context: Context) { self.context.replace(context); - } + }*/ #[allow(dead_code)] fn set_context(&mut self, context: Context) { - self.context = Some(context); + self.context = Mutex::new(context); } } impl Glue { pub fn into_connections(self) -> Vec<(String, Connection)> { - self.storages + self.databases .into_iter() .map(|(name, storage)| (name, storage.into_source())) .collect() @@ -160,7 +173,7 @@ impl Glue { }) = query { let store_name = db_name.0[0].value.clone(); - return if self.storages.iter().any(|(store, _)| store == &store_name) { + return if self.databases.iter().any(|(store, _)| store == &store_name) { if if_not_exists { Ok(Payload::Success) } else { @@ -211,41 +224,15 @@ impl Glue { .and_then(|name| name.0.get(0).map(|name| name.value.clone())) .ok_or(ExecuteError::ObjectNotRecognised)?; - let index = self - .storages - .iter() - .enumerate() - .find_map(|(index, (name, _))| (name == &database_name).then(|| index)); - if let Some(index) = index { - self.storages.remove(index); + if self.databases.contains_key(&database_name) { + self.databases.remove(&database_name); } else if !if_exists { return Err(ExecuteError::ObjectNotRecognised.into()); } return Ok(Payload::Success); } - let mut storages: Vec<(String, Box)> = self - .storages - .iter_mut() - .map(|(name, storage)| (name.clone(), storage.take())) - .collect(); - let give_storages: Vec<(String, &mut StorageInner)> = storages - .iter_mut() - .map(|(name, storage)| (name.clone(), &mut **storage)) - .collect(); - - let mut context = self.take_context(); - - let result = block_on(execute(give_storages, &mut context, &query)); - - self.storages - .iter_mut() - .zip(storages) - .for_each(|((_name, storage), (_name_2, taken))| storage.replace(taken)); - - self.replace_context(context); - - result + block_on(self.execute_query(&query)) } /// Provides a parsed query to execute later. /// Particularly useful if executing a small query many times as parsing is not (computationally) free. @@ -314,232 +301,3 @@ impl Glue { self.execute_parsed(query) } } - -impl Payload { - // TODO: Move - pub fn unwrap_rows(self) -> Vec { - if let Payload::Select { rows, .. } = self { - rows - } else { - panic!("Expected Select!") - } - } -} - -// TODO: Move -/* -#[cfg(test)] -mod tests { - use { - crate::{CSVStorage, Glue, Payload, Row, SledStorage, Storage, Value}, - std::convert::TryFrom, - }; - #[test] - fn eq() { - std::fs::remove_dir_all("data").unwrap(); - std::fs::create_dir("data").unwrap(); - let config = sled::Config::default() - .path("data/using_config") - .temporary(true); - - let sled = SledStorage::try_from(config).unwrap(); - let mut glue = Glue::new(String::from("sled"), Storage::new(Box::new(sled.clone()))); - assert_eq!( - glue.execute( - "CREATE TABLE api_test (id INTEGER PRIMARY KEY, name TEXT, nullable TEXT NULL, is BOOLEAN)", - ), - Ok(Payload::Create) - ); - assert_eq!( - glue.execute("INSERT INTO api_test (id, name, nullable, is) VALUES (1, 'test1', 'not null', TRUE), (2, 'test2', NULL, FALSE)"), - Ok(Payload::Insert(2)) - ); - - assert_eq!( - glue.execute("SELECT id, name, is FROM api_test"), // Not selecting NULL because NULL != NULL. TODO: Expand this test so that NULL == NULL - Ok(Payload::Select { - labels: vec![String::from("id"), String::from("name"), String::from("is")], - rows: vec![ - Row(vec![ - Value::I64(1), - Value::Str(String::from("test1")), - Value::Bool(true) - ]), - Row(vec![ - Value::I64(2), - Value::Str(String::from("test2")), - Value::Bool(false) - ]) - ] - }) - ); - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.select_as_string("SELECT * FROM api_test"), - Ok(vec![ - vec![ - String::from("id"), - String::from("name"), - String::from("nullable"), - String::from("is") - ], - vec![ - String::from("1"), - String::from("test1"), - String::from("not null"), - String::from("TRUE") - ], - vec![ - String::from("2"), - String::from("test2"), - String::from("NULL"), - String::from("FALSE") - ] - ]) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.select_as_json("SELECT * FROM api_test"), - Ok(String::from( - r#"[{"id":1,"is":true,"name":"test1","nullable":"not null"},{"id":2,"is":false,"name":"test2","nullable":null}]"# - )) - ); - - use crate::Cast; - - let test_value: Result = Value::Str(String::from("test")).cast(); - assert_eq!(test_value, Ok(String::from("test"))); - let test_value: Result = (Value::Str(String::from("test")).clone()).cast(); - assert_eq!(test_value, Ok(String::from("test"))); - let test_value: Result = Value::I64(1).cast(); - assert_eq!(test_value, Ok(String::from("1"))); - let test_value: Result = (Value::I64(1).clone()).cast(); - assert_eq!(test_value, Ok(String::from("1"))); - - assert_eq!( - glue.execute("CREATE TABLE api_insert_vec (name TEXT, rating FLOAT)"), - Ok(Payload::Create) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.insert_vec( - String::from("api_insert_vec"), - vec![String::from("name"), String::from("rating")], - vec![vec![Value::Str(String::from("test")), Value::F64(1.2)]] - ), - Ok(Payload::Insert(1)) - ); - - assert_eq!( - glue.execute("SELECT * FROM api_insert_vec"), - Ok(Payload::Select { - labels: vec![String::from("name"), String::from("rating")], - rows: vec![Row(vec![Value::Str(String::from("test")), Value::F64(1.2)])] - }) - ); - - #[cfg(feature = "expanded-api")] - assert_eq!( - glue.insert_vec( - String::from("api_insert_vec"), - vec![String::from("name"), String::from("rating")], - vec![ - vec![Value::Str(String::from("test2")), Value::F64(1.3)], - vec![Value::Str(String::from("test3")), Value::F64(1.0)], - vec![Value::Str(String::from("test4")), Value::F64(100000.94)] - ] - ), - Ok(Payload::Insert(3)) - ); - - assert_eq!( - glue.execute("SELECT * FROM api_insert_vec"), - Ok(Payload::Select { - labels: vec![String::from("name"), String::from("rating")], - rows: vec![ - Row(vec![Value::Str(String::from("test")), Value::F64(1.2)]), - Row(vec![Value::Str(String::from("test2")), Value::F64(1.3)]), - Row(vec![Value::Str(String::from("test3")), Value::F64(1.0)]), - Row(vec![ - Value::Str(String::from("test4")), - Value::F64(100000.94) - ]) - ] - }) - ); - - // Multi Glue - let csv_a = CSVStorage::new("data/using_config_a.csv").unwrap(); - let csv_b = CSVStorage::new("data/using_config_b.csv").unwrap(); - let _multi_glue_type_one = Glue::new_multi(vec![ - (String::from("sled"), Storage::new(Box::new(sled.clone()))), - (String::from("csv"), Storage::new(Box::new(csv_a))), - ]); - let mut csv_glue = Glue::new(String::from("csv"), Storage::new(Box::new(csv_b))); - - assert_eq!( - csv_glue.execute("CREATE TABLE data (name TEXT, rating TEXT)"), - Ok(Payload::Create) - ); - - assert_eq!( - csv_glue.execute( - r#" - INSERT INTO - data ( - name, - rating - ) - VALUES ( - 'test2', - '30.1' - ), ( - 'test3', - '0.1' - ) - "# - ), - Ok(Payload::Insert(2)) - ); - - let mut multi_glue = Glue::new_multi_glue(vec![glue, csv_glue]); - - assert_eq!( - multi_glue.execute( - r#" - SELECT - * - FROM - sled.api_insert_vec - INNER JOIN csv.data - ON sled.api_insert_vec.name = csv.data.name - "# - ), - Ok(Payload::Select { - labels: vec![ - String::from("api_insert_vec.name"), - String::from("api_insert_vec.rating"), - String::from("data.name"), - String::from("data.rating") - ], - rows: vec![ - Row(vec![ - Value::Str(String::from("test2")), - Value::F64(1.3), - Value::Str(String::from("test2")), - Value::Str(String::from("30.1")) - ]), - Row(vec![ - Value::Str(String::from("test3")), - Value::F64(1.0), - Value::Str(String::from("test3")), - Value::Str(String::from("0.1")) - ]) - ] - }) - ); - } -} -*/ diff --git a/src/glue/payload.rs b/src/glue/payload.rs new file mode 100644 index 00000000..e13caad0 --- /dev/null +++ b/src/glue/payload.rs @@ -0,0 +1,11 @@ +use crate::{Payload, Row}; + +impl Payload { + pub fn unwrap_rows(self) -> Vec { + if let Payload::Select { rows, .. } = self { + rows + } else { + panic!("Expected Select!") + } + } +} diff --git a/src/result.rs b/src/result.rs index 0327f0a5..69cb804c 100644 --- a/src/result.rs +++ b/src/result.rs @@ -1,12 +1,8 @@ use { crate::{ - data::{RowError, TableError, ValueError}, - executor::{ - AlterError, ExecuteError, FetchError, JoinError, ManualError, PlanError, QueryError, - RecipeError, SelectError, ValidateError, - }, - store::StorageError, - CSVStorageError, SheetStorageError, + AlterError, CSVStorageError, ExecuteError, FetchError, InterfaceError, JoinError, + ManualError, PlanError, QueryError, RecipeError, RowError, SelectError, SheetStorageError, + StorageError, TableError, ValidateError, ValueError, }, serde::Serialize, std::marker::{Send, Sync}, @@ -68,6 +64,8 @@ pub enum Error { CSVStorage(#[from] CSVStorageError), #[error(transparent)] SheetStorage(#[from] SheetStorageError), + #[error(transparent)] + Interface(#[from] InterfaceError), } unsafe impl Send for Error {} @@ -100,6 +98,7 @@ impl PartialEq for Error { (StorageImplementation(l), StorageImplementation(r)) => l == r, (CSVStorage(l), CSVStorage(r)) => l == r, (SheetStorage(l), SheetStorage(r)) => l == r, + (Interface(l), Interface(r)) => l == r, _ => false, } } diff --git a/src/store/mod.rs b/src/store/mod.rs index 3374dcf7..05abeb9e 100644 --- a/src/store/mod.rs +++ b/src/store/mod.rs @@ -3,6 +3,8 @@ mod store_mut; #[cfg(feature = "alter-table")] mod alter_table; +use std::sync::{Mutex, MutexGuard}; + #[cfg(feature = "alter-table")] pub use alter_table::*; #[cfg(not(feature = "alter-table"))] @@ -54,7 +56,7 @@ impl TryFrom for Storage { crate::{CSVStorage, SheetStorage, SledStorage}, Connection::*, }; - let storage: Option> = Some(match &connection { + let storage: Mutex> = Mutex::new(match &connection { #[cfg(feature = "sled-storage")] Sled(path) => Box::new(SledStorage::new(path)?), #[cfg(feature = "csv-storage")] @@ -72,30 +74,33 @@ impl TryFrom for Storage { pub struct Storage { source_connection: Connection, - storage: Option>, + storage: Mutex>, } impl Storage { pub fn new(storage: Box) -> Self { - let storage = Some(storage); + let storage = Mutex::new(storage); Self { storage, source_connection: Connection::default(), } } - pub fn replace(&mut self, storage: Box) { + /*pub fn replace(&mut self, storage: Box) { self.storage.replace(storage); } pub fn take(&mut self) -> Box { self.storage .take() .expect("Unreachable: Storage wasn't replaced!") + }*/ + pub fn get(&self) -> MutexGuard> { + self.storage + .lock() + .expect("Unreachable: Storage wasn't replaced!") } - pub fn take_readable(&mut self) -> &StorageInner { - /*let storage = self.take(); - let readable = &*storage; - self.replace(storage); - readable*/ - unimplemented!() + pub fn get_mut(&mut self) -> &mut Box { + self.storage + .get_mut() + .expect("Unreachable: Storage wasn't replaced!") } pub fn into_source(self) -> Connection { self.source_connection diff --git a/tests/functionality/query/with.rs b/tests/functionality/query/with.rs index 57e66f28..d2f48388 100644 --- a/tests/functionality/query/with.rs +++ b/tests/functionality/query/with.rs @@ -33,6 +33,7 @@ crate::util_macros::testcase!( cte_1 "# => a = I64: (1)); + /* TODO: #107 glue.execute( r#" WITH cte_0 AS ( @@ -53,6 +54,7 @@ crate::util_macros::testcase!( "#, ) .expect_err("CTE is not simultaneous"); + */ glue.execute("CREATE TABLE basic_insert (a INTEGER)") .unwrap();