From 0590cd0a446af3fe47ff254fd8b97d7f5117273f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 16 Nov 2021 13:18:50 +0300 Subject: [PATCH 1/4] feat: support stale reads --- .../cloud/sqlalchemy_spanner/sqlalchemy_spanner.py | 13 +++++++++++++ test/test_suite.py | 12 +++++++++++- 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py index d65fc0a9..118672e4 100644 --- a/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py +++ b/google/cloud/sqlalchemy_spanner/sqlalchemy_spanner.py @@ -25,7 +25,9 @@ from sqlalchemy import ForeignKeyConstraint, types, util from sqlalchemy.engine.base import Engine from sqlalchemy.engine.default import DefaultDialect, DefaultExecutionContext +from sqlalchemy.event import listens_for from sqlalchemy.ext.compiler import compiles +from sqlalchemy.pool import Pool from sqlalchemy.sql.compiler import ( selectable, DDLCompiler, @@ -38,6 +40,13 @@ from google.cloud import spanner_dbapi from google.cloud.sqlalchemy_spanner._opentelemetry_tracing import trace_call + +@listens_for(Pool, "reset") +def reset_connection(dbapi_conn, connection_record): + """An event of returning a connection back to a pool.""" + dbapi_conn.connection.staleness = None + + # Spanner-to-SQLAlchemy types map _type_map = { "BOOL": types.Boolean, @@ -128,6 +137,10 @@ def pre_exec(self): if read_only is not None: self._dbapi_connection.connection.read_only = read_only + staleness = self.execution_options.get("staleness", None) + if staleness is not None: + self._dbapi_connection.connection.staleness = staleness + class SpannerIdentifierPreparer(IdentifierPreparer): """Identifiers compiler. diff --git a/test/test_suite.py b/test/test_suite.py index ea40bf09..e9c556d0 100644 --- a/test/test_suite.py +++ b/test/test_suite.py @@ -1578,7 +1578,7 @@ class ExecutionOptionsTest(fixtures.TestBase): """ def setUp(self): - self._engine = create_engine(get_db_url()) + self._engine = create_engine(get_db_url(), pool_size=1) self._metadata = MetaData(bind=self._engine) self._table = Table( @@ -1594,3 +1594,13 @@ def test_read_only(self): with self._engine.connect().execution_options(read_only=True) as connection: connection.execute(select(["*"], from_obj=self._table)).fetchall() assert connection.connection.read_only is True + + def test_staleness(self): + with self._engine.connect().execution_options( + staleness={"max_staleness": 5} + ) as connection: + connection.execute(select(["*"], from_obj=self._table)).fetchall() + assert connection.connection.staleness == {"max_staleness": 5} + + with self._engine.connect() as connection: + assert connection.connection.staleness is None From 1de41a7b8a424001a8f2a1efa6306aa49fe5734e Mon Sep 17 00:00:00 2001 From: Ilya Gurov Date: Tue, 16 Nov 2021 13:28:19 +0300 Subject: [PATCH 2/4] Update README.md --- README.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/README.md b/README.md index b8c557b6..78ac39f1 100644 --- a/README.md +++ b/README.md @@ -167,6 +167,14 @@ Note that execution options are applied lazily - on the `execute()` method call, ReadOnly/ReadWrite mode of a connection can't be changed while a transaction is in progress - first you must commit or rollback it. +### Stale reads +To use the Spanner [Stale Reads](https://cloud.google.com/spanner/docs/reads#perform-stale-read) with SQLAlchemy you can tweak the connection execution options with a wanted staleness value. For example: +```python +with engine.connect().execution_options(staleness={"max_staleness": 5}) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` +Note that the set option will be dropped when the connection is returned back to the pool. + ### DDL and transactions DDL statements are executed outside the regular transactions mechanism, which means DDL statements will not be rolled back on normal transaction rollback. From 4796b9cb184ff0ffdb0f829f7f963b5894983000 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 17 Nov 2021 12:46:08 +0300 Subject: [PATCH 3/4] fix --- test/test_suite.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/test_suite.py b/test/test_suite.py index e9c556d0..74c5d447 100644 --- a/test/test_suite.py +++ b/test/test_suite.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from datetime import timezone +import datetime import decimal import operator import os @@ -975,7 +975,9 @@ def test_row_w_scalar_select(self): eq_( row["somelabel"], - DatetimeWithNanoseconds(2006, 5, 12, 12, 0, 0, tzinfo=timezone.utc), + DatetimeWithNanoseconds( + 2006, 5, 12, 12, 0, 0, tzinfo=datetime.timezone.utc + ), ) @@ -1597,10 +1599,12 @@ def test_read_only(self): def test_staleness(self): with self._engine.connect().execution_options( - staleness={"max_staleness": 5} + read_only=True, staleness={"max_staleness": datetime.timedelta(seconds=5)} ) as connection: connection.execute(select(["*"], from_obj=self._table)).fetchall() - assert connection.connection.staleness == {"max_staleness": 5} + assert connection.connection.staleness == { + "max_staleness": datetime.timedelta(seconds=5) + } with self._engine.connect() as connection: assert connection.connection.staleness is None From 848dc585d019064f07a88eaaa3143c213fd18a0e Mon Sep 17 00:00:00 2001 From: Ilya Gurov Date: Wed, 17 Nov 2021 13:01:37 +0300 Subject: [PATCH 4/4] Update README.md --- README.md | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 78ac39f1..e88babe9 100644 --- a/README.md +++ b/README.md @@ -170,7 +170,38 @@ ReadOnly/ReadWrite mode of a connection can't be changed while a transaction is ### Stale reads To use the Spanner [Stale Reads](https://cloud.google.com/spanner/docs/reads#perform-stale-read) with SQLAlchemy you can tweak the connection execution options with a wanted staleness value. For example: ```python -with engine.connect().execution_options(staleness={"max_staleness": 5}) as connection: +# maximum staleness +with engine.connect().execution_options( + read_only=True, + staleness={"max_staleness": datetime.timedelta(seconds=5)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# exact staleness +with engine.connect().execution_options( + read_only=True, + staleness={"exact_staleness": datetime.timedelta(seconds=5)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# min read timestamp +with engine.connect().execution_options( + read_only=True, + staleness={"min_read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)} +) as connection: + connection.execute(select(["*"], from_obj=table)).fetchall() +``` + +```python +# read timestamp +with engine.connect().execution_options( + read_only=True, + staleness={"read_timestamp": datetime.datetime(2021, 11, 17, 12, 55, 30)} +) as connection: connection.execute(select(["*"], from_obj=table)).fetchall() ``` Note that the set option will be dropped when the connection is returned back to the pool.