Skip to content

Commit e9df810

Browse files
authored
docs: add sample for stale reads (#539)
* docs: add sample for stale reads Adds a sample and tests for executing stale reads on Spanner. Using stale reads can improve performance when the application does not require the guarantees that are given by strong reads. Fixes #495 * chore: remove GetSession requests
1 parent 108d965 commit e9df810

File tree

4 files changed

+298
-0
lines changed

4 files changed

+298
-0
lines changed

samples/noxfile.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ def transaction(session):
5757
_sample(session)
5858

5959

60+
@nox.session()
61+
def stale_read(session):
62+
_sample(session)
63+
64+
6065
@nox.session()
6166
def read_only_transaction(session):
6267
_sample(session)

samples/stale_read_sample.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import uuid
16+
from sqlalchemy import create_engine, Engine, select, text
17+
from sqlalchemy.orm import Session
18+
from sample_helper import run_sample
19+
from model import Singer
20+
21+
22+
# Shows how to execute stale reads on Spanner using SQLAlchemy.
23+
def stale_read_sample():
24+
engine = create_engine(
25+
"spanner:///projects/sample-project/"
26+
"instances/sample-instance/"
27+
"databases/sample-database",
28+
echo=True,
29+
)
30+
# First get the current database timestamp. We can use this timestamp to
31+
# query the database at a point in time where we know it was empty.
32+
with Session(engine.execution_options(isolation_level="AUTOCOMMIT")) as session:
33+
timestamp = session.execute(select(text("current_timestamp"))).one()[0]
34+
print(timestamp)
35+
36+
# Insert a few test rows.
37+
insert_test_data(engine)
38+
39+
# Create a session that uses a read-only transaction with a strong timestamp
40+
# bound. This means that it will read all data that has been committed at the
41+
# time this transaction starts.
42+
# Read-only transactions do not take locks, and are therefore preferred
43+
# above read/write transactions for workloads that only read data on Spanner.
44+
with Session(engine.execution_options(read_only=True)) as session:
45+
print("Found singers with strong timestamp bound:")
46+
singers = session.query(Singer).order_by(Singer.last_name).all()
47+
for singer in singers:
48+
print("Singer: ", singer.full_name)
49+
50+
# Create a session that uses a read-only transaction that selects data in
51+
# the past. We'll use the timestamp that we retrieved before inserting the
52+
# test data for this transaction.
53+
with Session(
54+
engine.execution_options(
55+
read_only=True, staleness={"read_timestamp": timestamp}
56+
)
57+
) as session:
58+
print("Searching for singers using a read timestamp in the past:")
59+
singers = session.query(Singer).order_by(Singer.last_name).all()
60+
if singers:
61+
for singer in singers:
62+
print("Singer: ", singer.full_name)
63+
else:
64+
print("No singers found.")
65+
66+
# Spanner also supports min_read_timestamp and max_staleness as staleness
67+
# options. These can only be used in auto-commit mode.
68+
# Spanner will choose a read timestamp that satisfies the given restriction
69+
# and that can be served as efficiently as possible.
70+
with Session(
71+
engine.execution_options(
72+
isolation_level="AUTOCOMMIT", staleness={"max_staleness": {"seconds": 15}}
73+
)
74+
) as session:
75+
print("Searching for singers using a max staleness of 15 seconds:")
76+
singers = session.query(Singer).order_by(Singer.last_name).all()
77+
if singers:
78+
for singer in singers:
79+
print("Singer: ", singer.full_name)
80+
else:
81+
print("No singers found.")
82+
83+
84+
def insert_test_data(engine: Engine):
85+
with Session(engine) as session:
86+
session.add_all(
87+
[
88+
Singer(id=str(uuid.uuid4()), first_name="John", last_name="Doe"),
89+
Singer(id=str(uuid.uuid4()), first_name="Jane", last_name="Doe"),
90+
]
91+
)
92+
session.commit()
93+
94+
95+
if __name__ == "__main__":
96+
run_sample(stale_read_sample)
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from sqlalchemy import String, BigInteger
16+
from sqlalchemy.orm import DeclarativeBase
17+
from sqlalchemy.orm import Mapped
18+
from sqlalchemy.orm import mapped_column
19+
20+
21+
class Base(DeclarativeBase):
22+
pass
23+
24+
25+
class Singer(Base):
26+
__tablename__ = "singers"
27+
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
28+
name: Mapped[str] = mapped_column(String)
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
# Copyright 2024 Google LLC All rights reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import datetime
16+
from sqlalchemy import create_engine, select
17+
from sqlalchemy.orm import Session
18+
from sqlalchemy.testing import eq_, is_instance_of
19+
from google.cloud.spanner_v1 import (
20+
FixedSizePool,
21+
BatchCreateSessionsRequest,
22+
ExecuteSqlRequest,
23+
BeginTransactionRequest,
24+
TransactionOptions,
25+
)
26+
from test.mockserver_tests.mock_server_test_base import MockServerTestBase
27+
from test.mockserver_tests.mock_server_test_base import add_result
28+
import google.cloud.spanner_v1.types.type as spanner_type
29+
import google.cloud.spanner_v1.types.result_set as result_set
30+
31+
32+
class TestStaleReads(MockServerTestBase):
33+
def test_stale_read_multi_use(self):
34+
from test.mockserver_tests.stale_read_model import Singer
35+
36+
add_singer_query_result("SELECT singers.id, singers.name \n" + "FROM singers")
37+
engine = create_engine(
38+
"spanner:///projects/p/instances/i/databases/d",
39+
echo=True,
40+
connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
41+
)
42+
43+
timestamp = datetime.datetime.fromtimestamp(1733328910)
44+
for i in range(2):
45+
with Session(
46+
engine.execution_options(
47+
read_only=True,
48+
staleness={"read_timestamp": timestamp},
49+
)
50+
) as session:
51+
# Execute two queries in a read-only transaction.
52+
session.scalars(select(Singer)).all()
53+
session.scalars(select(Singer)).all()
54+
55+
# Verify the requests that we got.
56+
requests = self.spanner_service.requests
57+
eq_(7, len(requests))
58+
is_instance_of(requests[0], BatchCreateSessionsRequest)
59+
is_instance_of(requests[1], BeginTransactionRequest)
60+
is_instance_of(requests[2], ExecuteSqlRequest)
61+
is_instance_of(requests[3], ExecuteSqlRequest)
62+
is_instance_of(requests[4], BeginTransactionRequest)
63+
is_instance_of(requests[5], ExecuteSqlRequest)
64+
is_instance_of(requests[6], ExecuteSqlRequest)
65+
# Verify that the transaction is a read-only transaction.
66+
for index in [1, 4]:
67+
begin_request: BeginTransactionRequest = requests[index]
68+
eq_(
69+
TransactionOptions(
70+
dict(
71+
read_only=TransactionOptions.ReadOnly(
72+
dict(
73+
read_timestamp={"seconds": 1733328910},
74+
return_read_timestamp=True,
75+
)
76+
)
77+
)
78+
),
79+
begin_request.options,
80+
)
81+
82+
def test_stale_read_single_use(self):
83+
from test.mockserver_tests.stale_read_model import Singer
84+
85+
add_singer_query_result("SELECT singers.id, singers.name\n" + "FROM singers")
86+
engine = create_engine(
87+
"spanner:///projects/p/instances/i/databases/d",
88+
echo=True,
89+
connect_args={"client": self.client, "pool": FixedSizePool(size=10)},
90+
)
91+
92+
with Session(
93+
engine.execution_options(
94+
isolation_level="AUTOCOMMIT",
95+
staleness={"max_staleness": {"seconds": 15}},
96+
)
97+
) as session:
98+
# Execute two queries in autocommit.
99+
session.scalars(select(Singer)).all()
100+
session.scalars(select(Singer)).all()
101+
102+
# Verify the requests that we got.
103+
requests = self.spanner_service.requests
104+
eq_(3, len(requests))
105+
is_instance_of(requests[0], BatchCreateSessionsRequest)
106+
is_instance_of(requests[1], ExecuteSqlRequest)
107+
is_instance_of(requests[2], ExecuteSqlRequest)
108+
# Verify that the requests use a stale read.
109+
for index in [1, 2]:
110+
execute_request: ExecuteSqlRequest = requests[index]
111+
eq_(
112+
TransactionOptions(
113+
dict(
114+
read_only=TransactionOptions.ReadOnly(
115+
dict(
116+
max_staleness={"seconds": 15},
117+
return_read_timestamp=True,
118+
)
119+
)
120+
)
121+
),
122+
execute_request.transaction.single_use,
123+
)
124+
125+
126+
def add_singer_query_result(sql: str):
127+
result = result_set.ResultSet(
128+
dict(
129+
metadata=result_set.ResultSetMetadata(
130+
dict(
131+
row_type=spanner_type.StructType(
132+
dict(
133+
fields=[
134+
spanner_type.StructType.Field(
135+
dict(
136+
name="singers_id",
137+
type=spanner_type.Type(
138+
dict(code=spanner_type.TypeCode.INT64)
139+
),
140+
)
141+
),
142+
spanner_type.StructType.Field(
143+
dict(
144+
name="singers_name",
145+
type=spanner_type.Type(
146+
dict(code=spanner_type.TypeCode.STRING)
147+
),
148+
)
149+
),
150+
]
151+
)
152+
)
153+
)
154+
),
155+
)
156+
)
157+
result.rows.extend(
158+
[
159+
(
160+
"1",
161+
"Jane Doe",
162+
),
163+
(
164+
"2",
165+
"John Doe",
166+
),
167+
]
168+
)
169+
add_result(sql, result)

0 commit comments

Comments
 (0)