From 1d8b5e36642704b1c74b9bf07df964175fef8981 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 20:35:45 -0400 Subject: [PATCH 01/17] first pass --- Cargo.lock | 465 +++++++++++++++++++++++++++++++++++++++++++++++-- Cargo.toml | 6 +- src/context.rs | 74 ++++++++ 3 files changed, 527 insertions(+), 18 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 41cdd3a2f..1d2d4a532 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -188,6 +188,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bumpalo" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1ad822118d20d2c234f427000d5acc36eabe1e29a348c89b63dd60b13f28e5d" + [[package]] name = "byteorder" version = "1.4.3" @@ -224,6 +230,7 @@ dependencies = [ "libc", "num-integer", "num-traits", + "serde", "winapi", ] @@ -441,9 +448,11 @@ dependencies = [ "datafusion-expr", "futures", "mimalloc", + "object_store", "pyo3", "rand 0.7.3", "tokio", + "url", "uuid 0.8.2", ] @@ -497,6 +506,15 @@ version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f107b87b6afc2a64fd13cac55fe06d6c8859f12d4b14cbcdd2c67d0976781be" +[[package]] +name = "encoding_rs" +version = "0.8.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9852635589dc9f9ea1b6fe9f05b50ef208c85c834a562f0c6abb1c475736ec2b" +dependencies = [ + "cfg-if", +] + [[package]] name = "fastrand" version = "1.7.0" @@ -527,13 +545,18 @@ dependencies = [ "miniz_oxide", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "form_urlencoded" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" +checksum = "a9c384f161156f5260c24a097c56119f9be8c798586aecc13afbcbe7b7e26bf8" dependencies = [ - "matches", "percent-encoding", ] @@ -664,6 +687,25 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574" +[[package]] +name = "h2" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca32592cf21ac7ccab1825cd87f6c9b3d9022c44d086172ed0966bec8af30be" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "half" version = "2.0.0" @@ -694,13 +736,83 @@ dependencies = [ "libc", ] +[[package]] +name = "http" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.2", +] + +[[package]] +name = "http-body" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "httparse" +version = "1.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" + +[[package]] +name = "httpdate" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" + +[[package]] +name = "hyper" +version = "0.14.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "httpdate", + "itoa 1.0.2", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +dependencies = [ + "http", + "hyper", + "rustls", + "tokio", + "tokio-rustls", +] + [[package]] name = "idna" -version = "0.2.3" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "418a0a6fab821475f634efe3ccc45c013f742efe03d853e8d3355d5cb850ecf8" +checksum = "e14ddfc70884202db2244c223200c204c2bda1bc6e0998d11b5e024d657209e6" dependencies = [ - "matches", "unicode-bidi", "unicode-normalization", ] @@ -736,6 +848,12 @@ version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48dc51180a9b377fd75814d0cc02199c20f8e99433d6762f650d39cdbbd3b56f" +[[package]] +name = "ipnet" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "879d54834c8c76457ef4293a689b2a8c59b076067ad77b15efafbb05f92a592b" + [[package]] name = "itertools" version = "0.10.3" @@ -766,6 +884,15 @@ dependencies = [ "libc", ] +[[package]] +name = "js-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49409df3e3bf0856b916e2ceaca09ee28e6871cf7d9ce97a692cacfdb2a25a47" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -890,12 +1017,6 @@ dependencies = [ "libc", ] -[[package]] -name = "matches" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" - [[package]] name = "md-5" version = "0.10.1" @@ -929,6 +1050,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + [[package]] name = "miniz_oxide" version = "0.5.3" @@ -938,6 +1065,18 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +dependencies = [ + "libc", + "log", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys", +] + [[package]] name = "multiversion" version = "0.6.1" @@ -1051,12 +1190,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2168fee79ee3e7695905bc3a48777d807f82d956f821186fa7a2601c1295a73e" dependencies = [ "async-trait", + "base64", "bytes", "chrono", "futures", "itertools", "parking_lot", "percent-encoding", + "quick-xml", + "rand 0.8.5", + "reqwest", + "ring", + "rustls-pemfile", + "serde", + "serde_json", "snafu", "tokio", "tracing", @@ -1155,9 +1302,9 @@ checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc" [[package]] name = "percent-encoding" -version = "2.1.0" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" [[package]] name = "pin-project-lite" @@ -1252,6 +1399,16 @@ dependencies = [ "syn", ] +[[package]] +name = "quick-xml" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37dddbbe9df96afafcb8027fcf263971b726530e12f0787f620a7ba5b4846081" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.18" @@ -1373,6 +1530,82 @@ dependencies = [ "winapi", ] +[[package]] +name = "reqwest" +version = "0.11.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "hyper", + "hyper-rustls", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls", + "rustls-pemfile", + "serde", + "serde_json", + "serde_urlencoded", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "webpki-roots", + "winreg", +] + +[[package]] +name = "ring" +version = "0.16.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3053cf52e236a3ed746dfc745aa9cacf1b791d846bdaf412f60a8d7d6e17c8fc" +dependencies = [ + "cc", + "libc", + "once_cell", + "spin", + "untrusted", + "web-sys", + "winapi", +] + +[[package]] +name = "rustls" +version = "0.20.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aab8ee6c7097ed6057f43c187a62418d0c05a4bd5f18b3571db50ee0f9ce033" +dependencies = [ + "log", + "ring", + "sct", + "webpki", +] + +[[package]] +name = "rustls-pemfile" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0864aeff53f8c05aa08d86e5ef839d3dfcf07aeba2db32f12db0ef716e87bd55" +dependencies = [ + "base64", +] + [[package]] name = "rustversion" version = "1.0.6" @@ -1400,6 +1633,16 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "sct" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d53dcdb7c9f8158937a7981b48accfd39a43af418591a5d008c7b22b5e1b7ca4" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "seq-macro" version = "0.3.1" @@ -1437,6 +1680,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa 1.0.2", + "ryu", + "serde", +] + [[package]] name = "sha2" version = "0.10.2" @@ -1488,6 +1743,22 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "45456094d1983e2ee2a18fdfebce3189fa451699d0502cb8e3b49dba5ba41451" +[[package]] +name = "socket2" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +dependencies = [ + "libc", + "winapi", +] + +[[package]] +name = "spin" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" + [[package]] name = "sqlparser" version = "0.23.0" @@ -1632,12 +1903,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" dependencies = [ "bytes", + "libc", "memchr", + "mio", "num_cpus", "once_cell", "parking_lot", "pin-project-lite", + "socket2", "tokio-macros", + "winapi", ] [[package]] @@ -1651,6 +1926,17 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-rustls" +version = "0.23.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" +dependencies = [ + "rustls", + "tokio", + "webpki", +] + [[package]] name = "tokio-stream" version = "0.1.9" @@ -1662,6 +1948,26 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-util" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", + "tracing", +] + +[[package]] +name = "tower-service" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52" + [[package]] name = "tracing" version = "0.1.35" @@ -1694,6 +2000,12 @@ dependencies = [ "once_cell", ] +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + [[package]] name = "typenum" version = "1.15.0" @@ -1739,15 +2051,20 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52fee519a3e570f7df377a06a1a7775cdbfb7aa460be7e08de2b1f0e69973a44" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "url" -version = "2.2.2" +version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a507c383b2d33b5fc35d1861e77e6b383d158b2da5e14fe51b83dfedf6fd578c" +checksum = "0d68c799ae75762b8c3fe375feb6600ef5602c883c5d21eb51c09f22b83c4643" dependencies = [ "form_urlencoded", "idna", - "matches", "percent-encoding", ] @@ -1786,6 +2103,16 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1798,6 +2125,101 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eaf9f5aceeec8be17c128b2e93e031fb8a4d469bb9c4ae2d7dc1888b26887268" +dependencies = [ + "cfg-if", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8ffb332579b0557b52d268b91feab8df3615f265d5270fec2a8c95b17c1142" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23639446165ca5a5de86ae1d8896b737ae80319560fbaa4c2887b7da6e7ebd7d" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "052be0f94026e6cbc75cdefc9bae13fd6052cdcaf532fa6c45e7ae33a1e6c810" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bc0c051dc5f23e307b13285f9d75df86bfdf816c5721e573dec1f9b8aa193c" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" + +[[package]] +name = "web-sys" +version = "0.3.60" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bcda906d8be16e728fd5adc5b729afad4e444e106ab28cd1c7256e54fa61510f" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "webpki" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f095d78192e208183081cc07bc5515ef55216397af48b873e5edcd72637fa1bd" +dependencies = [ + "ring", + "untrusted", +] + +[[package]] +name = "webpki-roots" +version = "0.22.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" +dependencies = [ + "webpki", +] + [[package]] name = "winapi" version = "0.3.9" @@ -1872,6 +2294,15 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +[[package]] +name = "winreg" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" +dependencies = [ + "winapi", +] + [[package]] name = "zstd" version = "0.11.2+zstd.1.5.2" diff --git a/Cargo.toml b/Cargo.toml index 217ac1c7b..3d2e71489 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,17 +31,21 @@ rust-version = "1.57" default = ["mimalloc"] [dependencies] -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } + rand = "0.7" pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] } datafusion = { version = "^12.0.0", features = ["pyarrow"] } datafusion-expr = { version = "^12.0.0" } datafusion-common = { version = "^12.0.0", features = ["pyarrow"] } uuid = { version = "0.8", features = ["v4"] } +url = "2.3" mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" futures = "0.3" +object_store = { version = "0.5.0", features = ["aws", "gcp"] } + [lib] name = "datafusion_python" crate-type = ["cdylib", "rlib"] diff --git a/src/context.rs b/src/context.rs index 25d08ef8e..7d7dbdb07 100644 --- a/src/context.rs +++ b/src/context.rs @@ -18,6 +18,7 @@ use std::path::PathBuf; use std::{collections::HashSet, sync::Arc}; +use url::Url; use uuid::Uuid; use pyo3::exceptions::{PyKeyError, PyValueError}; @@ -25,11 +26,16 @@ use pyo3::prelude::*; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::Result; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; +use object_store::aws::{AmazonS3, AmazonS3Builder}; +use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; +use object_store::{ObjectMeta, ObjectStore}; + use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; @@ -92,6 +98,45 @@ impl PySessionContext { } } + /// Register an object store the datafusion runtime environment + /// + /// Returns the scheme of the registered object store (e.g. "s3" or "gs") + fn register_object_store(&mut self, object_store_url: String) -> PyResult { + let uri = Url::parse(&object_store_url) + .map_err(|_| DataFusionError::Common("failed to parse uri".to_string()))?; + let bucket_name = uri + .host_str() + .ok_or_else(|| DataFusionError::Common("failed to get bucket name".to_string()))?; + let scheme = uri.scheme(); + + let store: Arc = match scheme { + "s3" => Arc::new( + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + AmazonS3Builder::from_env() + .build() + .expect("failed to build s3 client from env") + }), + ), + "gcs" => Arc::new( + GoogleCloudStorageBuilder::new() + .with_bucket_name(bucket_name) + .build() + .map_err(|_| { + DataFusionError::Common("failed to build gcs client".to_string()) + })?, + ), + _ => unimplemented!(), + }; + + self.ctx + .runtime_env() + .register_object_store(scheme, &bucket_name, store); + + Ok(scheme.to_string()) + } + /// Returns a PyDataFrame whose plan corresponds to the SQL statement. fn sql(&mut self, query: &str, py: Python) -> PyResult { let result = self.ctx.sql(query); @@ -265,3 +310,32 @@ impl PySessionContext { Ok(self.ctx.session_id()) } } + +async fn build_s3_from_sdk_config(bucket_name: &str, sdk_config: &SdkConfig) -> Result { + let credentials_providder = sdk_config + .credentials_provider() + .expect("could not find credentials provider"); + let credentials = credentials_providder + .provide_credentials() + .await + .expect("could not load credentials"); + + let s3_builder = AmazonS3Builder::from_env() + .with_bucket_name(bucket_name) + .with_region( + sdk_config + .region() + .expect("could not find region") + .to_string(), + ) + .with_access_key_id(credentials.access_key_id()) + .with_secret_access_key(credentials.secret_access_key()); + + let s3 = match credentials.session_token() { + Some(session_token) => s3_builder.with_token(session_token), + None => s3_builder, + } + .build()?; + + Ok(s3) +} From b515e29fa88619a0e4c4c9251dc04921cb6c6127 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 20:48:15 -0400 Subject: [PATCH 02/17] remove function --- src/context.rs | 31 +------------------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/src/context.rs b/src/context.rs index 7d7dbdb07..824f78f12 100644 --- a/src/context.rs +++ b/src/context.rs @@ -98,7 +98,7 @@ impl PySessionContext { } } - /// Register an object store the datafusion runtime environment + /// Register an object store w/ the datafusion runtime environment /// /// Returns the scheme of the registered object store (e.g. "s3" or "gs") fn register_object_store(&mut self, object_store_url: String) -> PyResult { @@ -310,32 +310,3 @@ impl PySessionContext { Ok(self.ctx.session_id()) } } - -async fn build_s3_from_sdk_config(bucket_name: &str, sdk_config: &SdkConfig) -> Result { - let credentials_providder = sdk_config - .credentials_provider() - .expect("could not find credentials provider"); - let credentials = credentials_providder - .provide_credentials() - .await - .expect("could not load credentials"); - - let s3_builder = AmazonS3Builder::from_env() - .with_bucket_name(bucket_name) - .with_region( - sdk_config - .region() - .expect("could not find region") - .to_string(), - ) - .with_access_key_id(credentials.access_key_id()) - .with_secret_access_key(credentials.secret_access_key()); - - let s3 = match credentials.session_token() { - Some(session_token) => s3_builder.with_token(session_token), - None => s3_builder, - } - .build()?; - - Ok(s3) -} From 1c7702f8cfb0ff50f1c2a1e468bed3c12d9a8b50 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 21:00:39 -0400 Subject: [PATCH 03/17] cleanup --- Cargo.toml | 4 +--- src/context.rs | 9 ++++----- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3d2e71489..f09b272ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,8 +31,7 @@ rust-version = "1.57" default = ["mimalloc"] [dependencies] -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] } - +tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } rand = "0.7" pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] } datafusion = { version = "^12.0.0", features = ["pyarrow"] } @@ -43,7 +42,6 @@ url = "2.3" mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" futures = "0.3" - object_store = { version = "0.5.0", features = ["aws", "gcp"] } [lib] diff --git a/src/context.rs b/src/context.rs index 824f78f12..665c512a7 100644 --- a/src/context.rs +++ b/src/context.rs @@ -26,15 +26,14 @@ use pyo3::prelude::*; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::record_batch::RecordBatch; -use datafusion::common::Result; use datafusion::datasource::datasource::TableProvider; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; -use object_store::aws::{AmazonS3, AmazonS3Builder}; -use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::aws::AmazonS3Builder; +use object_store::gcp::GoogleCloudStorageBuilder; +use object_store::ObjectStore; use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; @@ -100,7 +99,7 @@ impl PySessionContext { /// Register an object store w/ the datafusion runtime environment /// - /// Returns the scheme of the registered object store (e.g. "s3" or "gs") + /// Returns the scheme of the registered object store (e.g. "s3" or "gcs") fn register_object_store(&mut self, object_store_url: String) -> PyResult { let uri = Url::parse(&object_store_url) .map_err(|_| DataFusionError::Common("failed to parse uri".to_string()))?; From b2f39ea3776744402360f66be3e150fd5f53d64e Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 21:01:27 -0400 Subject: [PATCH 04/17] add doc stub --- docs/source/python/generated/datafusion.SessionContext.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/source/python/generated/datafusion.SessionContext.rst b/docs/source/python/generated/datafusion.SessionContext.rst index 86b942f20..d09712774 100644 --- a/docs/source/python/generated/datafusion.SessionContext.rst +++ b/docs/source/python/generated/datafusion.SessionContext.rst @@ -19,6 +19,7 @@ datafusion.SessionContext ~SessionContext.deregister_table ~SessionContext.empty_table ~SessionContext.register_csv + ~SessionContext.register_object_store ~SessionContext.register_parquet ~SessionContext.register_record_batches ~SessionContext.register_table From 1576a4f0cb424fd3837617db6f0bdb1480578587 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 21:03:57 -0400 Subject: [PATCH 05/17] remove superfluous move --- src/context.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/context.rs b/src/context.rs index 665c512a7..6e1cc8a9d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -112,7 +112,7 @@ impl PySessionContext { "s3" => Arc::new( tokio::runtime::Runtime::new() .unwrap() - .block_on(async move { + .block_on(async { AmazonS3Builder::from_env() .build() .expect("failed to build s3 client from env") From bb9dbcf585b72e2e89373d8bdce7806f6d3da9ae Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 22 Sep 2022 22:22:09 -0400 Subject: [PATCH 06/17] ensure a default region is set (us-east-1) --- src/context.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/context.rs b/src/context.rs index 6e1cc8a9d..aa2d69a09 100644 --- a/src/context.rs +++ b/src/context.rs @@ -114,6 +114,8 @@ impl PySessionContext { .unwrap() .block_on(async { AmazonS3Builder::from_env() + .with_bucket_name(bucket_name) + .with_region(std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-east-1".to_string())) .build() .expect("failed to build s3 client from env") }), From 6e9980c92dedb7e97cf350f1d51004ebaef5aa80 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Fri, 23 Sep 2022 14:57:16 -0400 Subject: [PATCH 07/17] refactor API --- datafusion/store.py | 23 ++++++++++++ src/context.rs | 50 +++--------------------- src/lib.rs | 6 +++ src/store.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 44 deletions(-) create mode 100644 datafusion/store.py create mode 100644 src/store.rs diff --git a/datafusion/store.py b/datafusion/store.py new file mode 100644 index 000000000..d21c3f4f8 --- /dev/null +++ b/datafusion/store.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +from ._internal import store + + +def __getattr__(name): + return getattr(store, name) diff --git a/src/context.rs b/src/context.rs index aa2d69a09..be8f4d079 100644 --- a/src/context.rs +++ b/src/context.rs @@ -18,7 +18,6 @@ use std::path::PathBuf; use std::{collections::HashSet, sync::Arc}; -use url::Url; use uuid::Uuid; use pyo3::exceptions::{PyKeyError, PyValueError}; @@ -31,10 +30,6 @@ use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; -use object_store::aws::AmazonS3Builder; -use object_store::gcp::GoogleCloudStorageBuilder; -use object_store::ObjectStore; - use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; @@ -42,6 +37,8 @@ use crate::errors::DataFusionError; use crate::udaf::PyAggregateUDF; use crate::udf::PyScalarUDF; use crate::utils::wait_for_future; +use crate::store::PyAmazonS3Context; + /// `PySessionContext` is able to plan and execute DataFusion plans. /// It has a powerful optimizer, a physical planner for local execution, and a @@ -97,45 +94,10 @@ impl PySessionContext { } } - /// Register an object store w/ the datafusion runtime environment - /// - /// Returns the scheme of the registered object store (e.g. "s3" or "gcs") - fn register_object_store(&mut self, object_store_url: String) -> PyResult { - let uri = Url::parse(&object_store_url) - .map_err(|_| DataFusionError::Common("failed to parse uri".to_string()))?; - let bucket_name = uri - .host_str() - .ok_or_else(|| DataFusionError::Common("failed to get bucket name".to_string()))?; - let scheme = uri.scheme(); - - let store: Arc = match scheme { - "s3" => Arc::new( - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async { - AmazonS3Builder::from_env() - .with_bucket_name(bucket_name) - .with_region(std::env::var("AWS_DEFAULT_REGION").unwrap_or_else(|_| "us-east-1".to_string())) - .build() - .expect("failed to build s3 client from env") - }), - ), - "gcs" => Arc::new( - GoogleCloudStorageBuilder::new() - .with_bucket_name(bucket_name) - .build() - .map_err(|_| { - DataFusionError::Common("failed to build gcs client".to_string()) - })?, - ), - _ => unimplemented!(), - }; - - self.ctx - .runtime_env() - .register_object_store(scheme, &bucket_name, store); - - Ok(scheme.to_string()) + /// Register a an object store with the given name + fn register_object_store(&mut self, scheme: &str, bucket_name: &str, store: &PyAmazonS3Context) -> PyResult<()> { + self.ctx.runtime_env().register_object_store(scheme, bucket_name, store.store.clone()); + Ok(()) } /// Returns a PyDataFrame whose plan corresponds to the SQL statement. diff --git a/src/lib.rs b/src/lib.rs index 268dc0434..f9ad13196 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,8 @@ mod udaf; #[allow(clippy::borrow_deref_ref)] mod udf; pub mod utils; +pub mod store; + #[cfg(feature = "mimalloc")] #[global_allocator] @@ -64,5 +66,9 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { functions::init_module(funcs)?; m.add_submodule(funcs)?; + let store = PyModule::new(py, "store")?; + store::init_module(store)?; + m.add_submodule(store)?; + Ok(()) } diff --git a/src/store.rs b/src/store.rs new file mode 100644 index 000000000..dfa5bb9e8 --- /dev/null +++ b/src/store.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use pyo3::prelude::*; + +use object_store::aws::{AmazonS3, AmazonS3Builder}; + +#[pyclass(name = "AmazonS3", module = "datafusion.store", subclass, unsendable)] +#[derive(Debug)] +pub(crate) struct PyAmazonS3Context { + pub store: Arc, +} + +#[pymethods] +impl PyAmazonS3Context { + #[allow(clippy::too_many_arguments)] + #[args( + region = "None", + access_key_id = "None", + secret_access_key = "None", + endpoint = "None", + imdsv1_fallback = "false", + allow_http = "false" + )] + #[new] + fn new( + bucket_name: String, + region: Option, + access_key_id: Option, + secret_access_key: Option, + endpoint: Option, + //retry_config: RetryConfig, + allow_http: bool, + imdsv1_fallback: bool, + ) -> Self { + // start w/ the options that come directly from the environment + let mut builder = AmazonS3Builder::from_env(); + + if let Some(region) = region { + builder = builder.with_region(region); + } + + if let Some(access_key_id) = access_key_id { + builder = builder.with_access_key_id(access_key_id); + }; + + if let Some(secret_access_key) = secret_access_key { + builder = builder.with_secret_access_key(secret_access_key); + }; + + if let Some(endpoint) = endpoint { + builder = builder.with_endpoint(endpoint); + }; + + if imdsv1_fallback { + builder = builder.with_imdsv1_fallback(); + }; + + let store = builder + .with_bucket_name(bucket_name) + //.with_retry_config(retry_config) #TODO: add later + .with_allow_http(allow_http) + .build() + .expect("failed to build AmazonS3"); + + Self { + store: Arc::new(store), + } + } +} + + +pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { + m.add_class::()?; + Ok(()) +} \ No newline at end of file From b52dcb5498be952527e1bc639585e7ed14169053 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Fri, 23 Sep 2022 15:58:03 -0400 Subject: [PATCH 08/17] make "host" optional; more closely resemble rust API --- src/context.rs | 10 ++++++++-- src/store.rs | 4 +++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/context.rs b/src/context.rs index be8f4d079..2841671b8 100644 --- a/src/context.rs +++ b/src/context.rs @@ -95,8 +95,14 @@ impl PySessionContext { } /// Register a an object store with the given name - fn register_object_store(&mut self, scheme: &str, bucket_name: &str, store: &PyAmazonS3Context) -> PyResult<()> { - self.ctx.runtime_env().register_object_store(scheme, bucket_name, store.store.clone()); + fn register_object_store(&mut self, scheme: &str, store: &PyAmazonS3Context, host: Option<&str>) -> PyResult<()> { + let derived_host = match host { + Some(b) => b, + None => { + &store.bucket_name + } + }; + self.ctx.runtime_env().register_object_store(scheme, derived_host, store.store.clone()); Ok(()) } diff --git a/src/store.rs b/src/store.rs index dfa5bb9e8..5b43bba78 100644 --- a/src/store.rs +++ b/src/store.rs @@ -25,6 +25,7 @@ use object_store::aws::{AmazonS3, AmazonS3Builder}; #[derive(Debug)] pub(crate) struct PyAmazonS3Context { pub store: Arc, + pub bucket_name: String, } #[pymethods] @@ -73,7 +74,7 @@ impl PyAmazonS3Context { }; let store = builder - .with_bucket_name(bucket_name) + .with_bucket_name(bucket_name.clone()) //.with_retry_config(retry_config) #TODO: add later .with_allow_http(allow_http) .build() @@ -81,6 +82,7 @@ impl PyAmazonS3Context { Self { store: Arc::new(store), + bucket_name, } } } From a9ea2eadbdefe8e8d3b4f7ca7115afdba7025984 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Fri, 23 Sep 2022 17:02:15 -0400 Subject: [PATCH 09/17] nit: change module name --- datafusion/{store.py => object_store.py} | 4 ++-- src/lib.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) rename datafusion/{store.py => object_store.py} (91%) diff --git a/datafusion/store.py b/datafusion/object_store.py similarity index 91% rename from datafusion/store.py rename to datafusion/object_store.py index d21c3f4f8..70ecbd2bb 100644 --- a/datafusion/store.py +++ b/datafusion/object_store.py @@ -16,8 +16,8 @@ # under the License. -from ._internal import store +from ._internal import object_store def __getattr__(name): - return getattr(store, name) + return getattr(object_store, name) diff --git a/src/lib.rs b/src/lib.rs index f9ad13196..c770113d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,7 +66,7 @@ fn _internal(py: Python, m: &PyModule) -> PyResult<()> { functions::init_module(funcs)?; m.add_submodule(funcs)?; - let store = PyModule::new(py, "store")?; + let store = PyModule::new(py, "object_store")?; store::init_module(store)?; m.add_submodule(store)?; From 0b7700f1c25ab0bb0c984bfb6b4898e74c510655 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Mon, 26 Sep 2022 13:20:37 -0400 Subject: [PATCH 10/17] add unified API via &PyAny --- src/context.rs | 40 +++++++++++++++++++++++++++---------- src/store.rs | 53 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 77 insertions(+), 16 deletions(-) diff --git a/src/context.rs b/src/context.rs index 2841671b8..d6282c61d 100644 --- a/src/context.rs +++ b/src/context.rs @@ -15,9 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::path::PathBuf; -use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; +use object_store::ObjectStore; use uuid::Uuid; use pyo3::exceptions::{PyKeyError, PyValueError}; @@ -34,11 +36,10 @@ use crate::catalog::{PyCatalog, PyTable}; use crate::dataframe::PyDataFrame; use crate::dataset::Dataset; use crate::errors::DataFusionError; +use crate::store::StorageContexts; use crate::udaf::PyAggregateUDF; use crate::udf::PyScalarUDF; use crate::utils::wait_for_future; -use crate::store::PyAmazonS3Context; - /// `PySessionContext` is able to plan and execute DataFusion plans. /// It has a powerful optimizer, a physical planner for local execution, and a @@ -95,14 +96,33 @@ impl PySessionContext { } /// Register a an object store with the given name - fn register_object_store(&mut self, scheme: &str, store: &PyAmazonS3Context, host: Option<&str>) -> PyResult<()> { - let derived_host = match host { - Some(b) => b, - None => { - &store.bucket_name - } + fn register_object_store( + &mut self, + scheme: &str, + store: &PyAny, + host: Option<&str>, + ) -> PyResult<()> { + let res: Result<(Arc, String), PyErr> = + match StorageContexts::extract(store) { + Ok(store) => match store { + StorageContexts::AmazonS3(s3) => Ok((s3.inner, s3.bucket_name)), + StorageContexts::GoogleCloudStorage(gcs) => Ok((gcs.inner, gcs.bucket_name)), + }, + Err(_e) => Err(PyValueError::new_err("Invalid object store")), + }; + + // for most stores the "host" is the bucket name + let (store, upstream_host) = res?; + + let derived_host = if let Some(host) = host { + host + } else { + &upstream_host }; - self.ctx.runtime_env().register_object_store(scheme, derived_host, store.store.clone()); + + self.ctx + .runtime_env() + .register_object_store(scheme, derived_host, store); Ok(()) } diff --git a/src/store.rs b/src/store.rs index 5b43bba78..28da2764d 100644 --- a/src/store.rs +++ b/src/store.rs @@ -20,11 +20,53 @@ use std::sync::Arc; use pyo3::prelude::*; use object_store::aws::{AmazonS3, AmazonS3Builder}; +use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; + +#[derive(FromPyObject)] +pub enum StorageContexts { + AmazonS3(PyAmazonS3Context), + GoogleCloudStorage(PyGoogleCloudContext), +} + +#[pyclass( + name = "GoogleCloud", + module = "datafusion.store", + subclass, + unsendable +)] +#[derive(Debug, Clone)] +pub struct PyGoogleCloudContext { + pub inner: Arc, + pub bucket_name: String, +} + +#[pymethods] +impl PyGoogleCloudContext { + #[allow(clippy::too_many_arguments)] + #[args(service_account_path = "None")] + #[new] + fn new(bucket_name: String, service_account_path: Option) -> Self { + let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(&bucket_name); + + if let Some(credential_path) = service_account_path { + builder = builder.with_service_account_path(credential_path); + } + + Self { + inner: Arc::new( + builder + .build() + .expect("Could not create Google Cloud Storage"), + ), + bucket_name, + } + } +} #[pyclass(name = "AmazonS3", module = "datafusion.store", subclass, unsendable)] -#[derive(Debug)] -pub(crate) struct PyAmazonS3Context { - pub store: Arc, +#[derive(Debug, Clone)] +pub struct PyAmazonS3Context { + pub inner: Arc, pub bucket_name: String, } @@ -81,14 +123,13 @@ impl PyAmazonS3Context { .expect("failed to build AmazonS3"); Self { - store: Arc::new(store), + inner: Arc::new(store), bucket_name, } } } - pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; Ok(()) -} \ No newline at end of file +} From 31b759992d097089a5fd7d9cb35a286ef92a1259 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Mon, 26 Sep 2022 13:52:26 -0400 Subject: [PATCH 11/17] add azure support --- Cargo.toml | 2 +- src/context.rs | 7 ++-- src/store.rs | 89 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 95 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f09b272ba..c859f9111 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ url = "2.3" mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" futures = "0.3" -object_store = { version = "0.5.0", features = ["aws", "gcp"] } +object_store = { version = "0.5.0", features = ["aws", "gcp", "azure"] } [lib] name = "datafusion_python" diff --git a/src/context.rs b/src/context.rs index d6282c61d..f5ae741ef 100644 --- a/src/context.rs +++ b/src/context.rs @@ -107,13 +107,16 @@ impl PySessionContext { Ok(store) => match store { StorageContexts::AmazonS3(s3) => Ok((s3.inner, s3.bucket_name)), StorageContexts::GoogleCloudStorage(gcs) => Ok((gcs.inner, gcs.bucket_name)), + StorageContexts::MicrosoftAzure(azure) => { + Ok((azure.inner, azure.container_name)) + } }, Err(_e) => Err(PyValueError::new_err("Invalid object store")), }; - // for most stores the "host" is the bucket name + // for most stores the "host" is the bucket name and can be inferred from the store let (store, upstream_host) = res?; - + // let users override the host to match the api signature from upstream let derived_host = if let Some(host) = host { host } else { diff --git a/src/store.rs b/src/store.rs index 28da2764d..0bbc5503c 100644 --- a/src/store.rs +++ b/src/store.rs @@ -20,12 +20,101 @@ use std::sync::Arc; use pyo3::prelude::*; use object_store::aws::{AmazonS3, AmazonS3Builder}; +use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; #[derive(FromPyObject)] pub enum StorageContexts { AmazonS3(PyAmazonS3Context), GoogleCloudStorage(PyGoogleCloudContext), + MicrosoftAzure(PyMicrosoftAzureContext), +} + +#[pyclass( + name = "MicrosoftAzure", + module = "datafusion.store", + subclass, + unsendable +)] +#[derive(Debug, Clone)] +pub struct PyMicrosoftAzureContext { + pub inner: Arc, + pub container_name: String, +} + +#[pymethods] +impl PyMicrosoftAzureContext { + #[allow(clippy::too_many_arguments)] + #[args( + account = "None", + access_key = "None", + bearer_token = "None", + client_id = "None", + client_secret = "None", + tenant_id = "None", + sas_query_pairs = "None", + use_emulator = "None", + allow_http = "None" + )] + #[new] + fn new( + container_name: String, + account: Option, + access_key: Option, + bearer_token: Option, + client_id: Option, + client_secret: Option, + tenant_id: Option, + sas_query_pairs: Option>, + use_emulator: Option, + allow_http: Option, + ) -> Self { + let mut builder = MicrosoftAzureBuilder::from_env().with_container_name(&container_name); + + if let Some(account) = account { + builder = builder.with_account(account); + } + + if let Some(access_key) = access_key { + builder = builder.with_access_key(access_key); + } + + if let Some(bearer_token) = bearer_token { + builder = builder.with_bearer_token_authorization(bearer_token); + } + + match (client_id, client_secret, tenant_id) { + (Some(client_id), Some(client_secret), Some(tenant_id)) => { + builder = + builder.with_client_secret_authorization(client_id, client_secret, tenant_id); + } + (None, None, None) => {} + _ => { + panic!("client_id, client_secret, tenat_id must be all set or all None"); + } + } + + if let Some(sas_query_pairs) = sas_query_pairs { + builder = builder.with_sas_authorization(sas_query_pairs); + } + + if let Some(use_emulator) = use_emulator { + builder = builder.with_use_emulator(use_emulator); + } + + if let Some(allow_http) = allow_http { + builder = builder.with_allow_http(allow_http); + } + + Self { + inner: Arc::new( + builder + .build() + .expect("Could not create Google Cloud Storage"), + ), + container_name, + } + } } #[pyclass( From 14a01cd1e28efe1188a1a45974877afa0096b128 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Mon, 26 Sep 2022 13:53:58 -0400 Subject: [PATCH 12/17] fix error message --- src/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store.rs b/src/store.rs index 0bbc5503c..935c1ebdf 100644 --- a/src/store.rs +++ b/src/store.rs @@ -110,7 +110,7 @@ impl PyMicrosoftAzureContext { inner: Arc::new( builder .build() - .expect("Could not create Google Cloud Storage"), + .expect("Could not create Azure Storage context"), //TODO: change these to PyErr ), container_name, } From 640bd1d681808b1e578c3cf0360836b9f51c2acf Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Tue, 27 Sep 2022 14:05:57 -0400 Subject: [PATCH 13/17] drop url as dep --- Cargo.lock | 5 ++--- Cargo.toml | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d2d4a532..106b346bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -452,7 +452,6 @@ dependencies = [ "pyo3", "rand 0.7.3", "tokio", - "url", "uuid 0.8.2", ] @@ -2213,9 +2212,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.22.4" +version = "0.22.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1c760f0d366a6c24a02ed7816e23e691f5d92291f94d15e836006fd11b04daf" +checksum = "368bfe657969fb01238bb756d351dcade285e0f6fcbd36dcb23359a5169975be" dependencies = [ "webpki", ] diff --git a/Cargo.toml b/Cargo.toml index c859f9111..a9a017cbf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ datafusion = { version = "^12.0.0", features = ["pyarrow"] } datafusion-expr = { version = "^12.0.0" } datafusion-common = { version = "^12.0.0", features = ["pyarrow"] } uuid = { version = "0.8", features = ["v4"] } -url = "2.3" mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" futures = "0.3" From 9f0b40805d0979278c36b68230b439d43e204db2 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 13 Oct 2022 09:38:02 -0400 Subject: [PATCH 14/17] run cargo fmt --- src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c770113d5..b866da2fe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,13 +33,12 @@ mod expression; #[allow(clippy::borrow_deref_ref)] mod functions; mod pyarrow_filter_expression; +pub mod store; #[allow(clippy::borrow_deref_ref)] mod udaf; #[allow(clippy::borrow_deref_ref)] mod udf; pub mod utils; -pub mod store; - #[cfg(feature = "mimalloc")] #[global_allocator] From 26c43899a7850679d64d2ba7a157e35c75d23aab Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 13 Oct 2022 15:31:57 -0400 Subject: [PATCH 15/17] bump to 0.5.1 of object_store --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 79d3d5a44..afdb9a987 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ uuid = { version = "0.8", features = ["v4"] } mimalloc = { version = "*", optional = true, default-features = false } async-trait = "0.1" futures = "0.3" -object_store = { version = "0.5.0", features = ["aws", "gcp", "azure"] } +object_store = { version = "0.5.1", features = ["aws", "gcp", "azure"] } [lib] name = "datafusion_python" From 0d6980441df060fd2d7785913b0745b0d70f8e94 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Tue, 25 Oct 2022 20:57:59 -0400 Subject: [PATCH 16/17] add local store; test register store --- .gitignore | 1 + datafusion/tests/test_store.py | 45 ++++++++++++++++++++++++++++++++++ src/context.rs | 1 + src/store.rs | 36 +++++++++++++++++++++++++++ 4 files changed, 83 insertions(+) create mode 100644 datafusion/tests/test_store.py diff --git a/.gitignore b/.gitignore index cbd980ebc..5b6cf361b 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ venv apache-rat-*.jar *rat.txt +.env diff --git a/datafusion/tests/test_store.py b/datafusion/tests/test_store.py new file mode 100644 index 000000000..7c0823df8 --- /dev/null +++ b/datafusion/tests/test_store.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest, os + +from datafusion import SessionContext +from datafusion.object_store import LocalFileSystem + + +@pytest.fixture +def local(): + return LocalFileSystem() + + +@pytest.fixture +def ctx(local): + ctx = SessionContext() + ctx.register_object_store("local", local, None) + return ctx + + +def test_read_parquet(ctx): + ctx.register_parquet( + "test", + f"file://{os.getcwd()}/testing/data/parquet", + [], + True, + ".parquet", + ) + df = ctx.sql("SELECT * FROM test") + assert isinstance(df.collect(), list) diff --git a/src/context.rs b/src/context.rs index ef1598521..ec7d8e674 100644 --- a/src/context.rs +++ b/src/context.rs @@ -111,6 +111,7 @@ impl PySessionContext { StorageContexts::MicrosoftAzure(azure) => { Ok((azure.inner, azure.container_name)) } + StorageContexts::LocalFileSystem(local) => Ok((local.inner, "".to_string())), }, Err(_e) => Err(PyValueError::new_err("Invalid object store")), }; diff --git a/src/store.rs b/src/store.rs index 935c1ebdf..2e8c9eb25 100644 --- a/src/store.rs +++ b/src/store.rs @@ -22,12 +22,45 @@ use pyo3::prelude::*; use object_store::aws::{AmazonS3, AmazonS3Builder}; use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; use object_store::gcp::{GoogleCloudStorage, GoogleCloudStorageBuilder}; +use object_store::local::LocalFileSystem; #[derive(FromPyObject)] pub enum StorageContexts { AmazonS3(PyAmazonS3Context), GoogleCloudStorage(PyGoogleCloudContext), MicrosoftAzure(PyMicrosoftAzureContext), + LocalFileSystem(PyLocalFileSystemContext), +} + +#[pyclass( + name = "LocalFileSystem", + module = "datafusion.store", + subclass, + unsendable +)] +#[derive(Debug, Clone)] +pub struct PyLocalFileSystemContext { + pub inner: Arc, +} + +#[pymethods] +impl PyLocalFileSystemContext { + #[args(prefix = "None")] + #[new] + fn new(prefix: Option) -> Self { + if let Some(prefix) = prefix { + Self { + inner: Arc::new( + LocalFileSystem::new_with_prefix(prefix) + .expect("Could not create local LocalFileSystem"), + ), + } + } else { + Self { + inner: Arc::new(LocalFileSystem::new()), + } + } + } } #[pyclass( @@ -220,5 +253,8 @@ impl PyAmazonS3Context { pub(crate) fn init_module(m: &PyModule) -> PyResult<()> { m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } From a32c65d745e6b488c9c1e10784adf0a7fb7436f0 Mon Sep 17 00:00:00 2001 From: Will Eaton Date: Thu, 3 Nov 2022 14:45:24 -0400 Subject: [PATCH 17/17] move import to new line to make the linter happy --- datafusion/tests/test_store.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/tests/test_store.py b/datafusion/tests/test_store.py index 7c0823df8..d6f0db583 100644 --- a/datafusion/tests/test_store.py +++ b/datafusion/tests/test_store.py @@ -15,7 +15,8 @@ # specific language governing permissions and limitations # under the License. -import pytest, os +import os +import pytest from datafusion import SessionContext from datafusion.object_store import LocalFileSystem