diff --git a/Cargo.toml b/Cargo.toml
index 19b79daf8..aa8aa6b38 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -70,12 +70,12 @@ codegen-units = 1
 # We cannot publish to crates.io with any patches in the below section. Developers
 # must remove any entries in this section before creating a release candidate.
 [patch.crates-io]
-datafusion = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-catalog = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-functions-window = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
-datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f", submodules = false }
+datafusion = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-substrait = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-proto = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-ffi = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-catalog = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-functions-aggregate = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-functions-window = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
+datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "35749607f585b3bf25b66b7d2289c56c18d03e4f" }
diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index 200b6470b..f67989f67 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -49,6 +49,7 @@ use datafusion::prelude::{
 };
 use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
 use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
+use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
 use datafusion_ffi::execution::FFI_TaskContextProvider;
 use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
 use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
@@ -185,6 +186,37 @@ impl PySessionConfig {
     fn set(&self, key: &str, value: &str) -> Self {
         Self::from(self.config.clone().set_str(key, value))
     }
+
+    /// Returns a copy of this configuration with `extension` registered.
+    ///
+    /// `extension` must expose a `__datafusion_extension_options__` method
+    /// returning a `PyCapsule` named `datafusion_extension_options`. If an
+    /// `FFI_ExtensionOptions` entry is already registered, it is merged into
+    /// the incoming options before they are inserted.
+    pub fn with_extension(&self, extension: Bound<'_, PyAny>) -> PyResult<Self> {
+        let capsule = extension.call_method0("__datafusion_extension_options__")?;
+        let capsule = capsule.cast::<PyCapsule>().map_err(py_datafusion_err)?;
+
+        let extension: NonNull<FFI_ExtensionOptions> = capsule
+            .pointer_checked(Some(c_str!("datafusion_extension_options")))?
+            .cast();
+        // SAFETY: `pointer_checked` returned a non-null pointer for a capsule
+        // carrying the expected name. NOTE(review): this relies on the
+        // provider storing an `FFI_ExtensionOptions` in that capsule; confirm.
+        let mut extension = unsafe { extension.as_ref() }.clone();
+
+        let mut config = self.config.clone();
+        let options = config.options_mut();
+        if let Some(prior_extension) = options.extensions.get::<FFI_ExtensionOptions>() {
+            extension
+                .merge(prior_extension)
+                .map_err(py_datafusion_err)?;
+        }
+
+        options.extensions.insert(extension);
+
+        Ok(Self::from(config))
+    }
 }
 
 /// Runtime options for a SessionContext
diff --git a/crates/core/src/dataset_exec.rs b/crates/core/src/dataset_exec.rs
index e3c058c07..a7dd1500d 100644
--- a/crates/core/src/dataset_exec.rs
+++ b/crates/core/src/dataset_exec.rs
@@ -111,7 +111,7 @@ impl DatasetExec {
 
         let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;
 
-        let schema = Arc::new(
+        let schema: SchemaRef = Arc::new(
             scanner
                 .getattr("projected_schema")?
                 .extract::>()?
diff --git a/examples/datafusion-ffi-example/python/tests/_test_config.py b/examples/datafusion-ffi-example/python/tests/_test_config.py
new file mode 100644
index 000000000..1f8f7eea8
--- /dev/null
+++ b/examples/datafusion-ffi-example/python/tests/_test_config.py
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from datafusion import SessionConfig, SessionContext
+from datafusion_ffi_example import MyConfig
+
+
+def test_catalog_provider():  # NOTE(review): tests config extensions; rename?
+    config = MyConfig()
+    config = SessionConfig(
+        {"datafusion.catalog.information_schema": "true"}
+    ).with_extension(config)
+    config.set("my_config.baz_count", "42")  # updates `config` in place
+    ctx = SessionContext(config)
+
+    result = ctx.sql("SHOW my_config.baz_count;").collect()
+    assert result[0][1][0].as_py() == "42"  # value set through SessionConfig.set
+
+    ctx.sql("SET my_config.baz_count=1;")  # override the same key through SQL
+    result = ctx.sql("SHOW my_config.baz_count;").collect()
+    assert result[0][1][0].as_py() == "1"
diff --git a/examples/datafusion-ffi-example/src/config.rs b/examples/datafusion-ffi-example/src/config.rs
new file mode 100644
index 000000000..6cdb8aa83
--- /dev/null
+++ b/examples/datafusion-ffi-example/src/config.rs
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+
+use datafusion_common::config::{
+    ConfigEntry, ConfigExtension, ConfigField, ExtensionOptions, Visit,
+};
+use datafusion_common::{DataFusionError, config_err};
+use datafusion_ffi::config::extension_options::FFI_ExtensionOptions;
+use pyo3::exceptions::PyRuntimeError;
+use pyo3::types::PyCapsule;
+use pyo3::{Bound, PyResult, Python, pyclass, pymethods};
+
+/// Example configuration extension, registered under the `my_config` prefix.
+#[pyclass(
+    from_py_object,
+    name = "MyConfig",
+    module = "datafusion_ffi_example",
+    subclass
+)]
+#[derive(Clone, Debug)]
+pub struct MyConfig {
+    /// Should "foo" be replaced by "bar"?
+    pub foo_to_bar: bool,
+
+    /// How many "baz" should be created?
+    pub baz_count: usize,
+}
+
+#[pymethods]
+impl MyConfig {
+    #[new]
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn __datafusion_extension_options__<'py>(
+        &self,
+        py: Python<'py>,
+    ) -> PyResult<Bound<'py, PyCapsule>> {
+        let name = cr"datafusion_extension_options".into();
+
+        let mut config = FFI_ExtensionOptions::default();
+        config
+            .add_config(self)
+            .map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
+
+        PyCapsule::new(py, config, Some(name))
+    }
+}
+
+impl Default for MyConfig {
+    fn default() -> Self {
+        Self {
+            foo_to_bar: true,
+            baz_count: 1337,
+        }
+    }
+}
+
+impl ConfigExtension for MyConfig {
+    const PREFIX: &'static str = "my_config";
+}
+
+impl ExtensionOptions for MyConfig {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn as_any_mut(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn cloned(&self) -> Box<dyn ExtensionOptions> {
+        Box::new(self.clone())
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> datafusion_common::Result<()> {
+        datafusion_common::config::ConfigField::set(self, key, value)
+    }
+
+    fn entries(&self) -> Vec<ConfigEntry> {
+        vec![
+            ConfigEntry {
+                key: "foo_to_bar".to_owned(),
+                value: Some(self.foo_to_bar.to_string()),
+                description: "foo to bar",
+            },
+            ConfigEntry {
+                key: "baz_count".to_owned(),
+                value: Some(self.baz_count.to_string()),
+                description: "baz count",
+            },
+        ]
+    }
+}
+
+impl ConfigField for MyConfig {
+    fn visit<V: Visit>(&self, v: &mut V, _key: &str, _description: &'static str) {
+        let key = "foo_to_bar";
+        let desc = "foo to bar";
+        self.foo_to_bar.visit(v, key, desc);
+
+        let key = "baz_count";
+        let desc = "baz count";
+        self.baz_count.visit(v, key, desc);
+    }
+
+    fn set(&mut self, key: &str, value: &str) -> Result<(), DataFusionError> {
+        let (key, rem) = key.split_once('.').unwrap_or((key, "")); // field, rest
+        match key {
+            "foo_to_bar" => self.foo_to_bar.set(rem, value),
+            "baz_count" => self.baz_count.set(rem, value),
+
+            _ => config_err!("Config value \"{}\" not found on MyConfig", key),
+        }
+    }
+}
diff --git a/examples/datafusion-ffi-example/src/lib.rs b/examples/datafusion-ffi-example/src/lib.rs
index 68120a4cd..e708c49cc 100644
--- a/examples/datafusion-ffi-example/src/lib.rs
+++ b/examples/datafusion-ffi-example/src/lib.rs
@@ -19,6 +19,7 @@ use pyo3::prelude::*;
 
 use crate::aggregate_udf::MySumUDF;
 use crate::catalog_provider::{FixedSchemaProvider, MyCatalogProvider, MyCatalogProviderList};
+use crate::config::MyConfig;
 use crate::scalar_udf::IsNullUDF;
 use crate::table_function::MyTableFunction;
 use crate::table_provider::MyTableProvider;
@@ -27,6 +28,7 @@ use crate::window_udf::MyRankUDF;
 
 pub(crate) mod aggregate_udf;
 pub(crate) mod catalog_provider;
+pub(crate) mod config;
 pub(crate) mod scalar_udf;
 pub(crate) mod table_function;
 pub(crate) mod table_provider;
@@ -46,5 +48,6 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::()?;
     m.add_class::()?;
     m.add_class::()?;
+    m.add_class::<MyConfig>()?;
     Ok(())
 }
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index ba9290a58..56415eb6a 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -296,6 +296,20 @@ def set(self, key: str, value: str) -> SessionConfig:
         self.config_internal = self.config_internal.set(key, value)
         return self
 
+    def with_extension(self, extension: Any) -> SessionConfig:
+        """Register a configuration extension on this configuration.
+
+        Args:
+            extension: A custom configuration extension object, typically
+                provided by another DataFusion extension library. It must
+                implement ``__datafusion_extension_options__``.
+
+        Returns:
+            This :py:class:`SessionConfig` object, updated in place.
+        """
+        self.config_internal = self.config_internal.with_extension(extension)
+        return self
+
 
 class RuntimeEnvBuilder:
     """Runtime configuration options."""