77# Copyright (c) Microsoft Corporation. All rights reserved.
88# Licensed under the MIT License.
99from typing import Dict , List
10- from threading import Semaphore
10+ from threading import Lock
1111import json
1212
1313from azure .cosmos import documents , http_constants
@@ -29,7 +29,9 @@ def __init__(
2929 database_id : str = None ,
3030 container_id : str = None ,
3131 cosmos_client_options : dict = None ,
32- container_throughput : int = None ,
32+ container_throughput : int = 400 ,
33+ key_suffix : str = "" ,
34+ compatibility_mode : bool = False ,
3335 ** kwargs ,
3436 ):
3537 """Create the Config object.
@@ -41,6 +43,10 @@ def __init__(
4143 :param cosmos_client_options: The options for the CosmosClient. Currently only supports connection_policy and
4244 consistency_level
4345 :param container_throughput: The throughput set when creating the Container. Defaults to 400.
46+ :param key_suffix: The suffix to be added to every key. The keySuffix must contain only valid ComosDb
47+ key characters. (e.g. not: '\\ ', '?', '/', '#', '*')
48+ :param compatibility_mode: True if keys should be truncated in order to support previous CosmosDb
49+ max key length of 255.
4450 :return CosmosDbPartitionedConfig:
4551 """
4652 self .__config_file = kwargs .get ("filename" )
@@ -56,6 +62,8 @@ def __init__(
5662 self .container_throughput = container_throughput or kwargs .get (
5763 "container_throughput"
5864 )
65+ self .key_suffix = key_suffix or kwargs .get ("key_suffix" )
66+ self .compatibility_mode = compatibility_mode or kwargs .get ("compatibility_mode" )
5967
6068
6169class CosmosDbPartitionedStorage (Storage ):
@@ -71,7 +79,21 @@ def __init__(self, config: CosmosDbPartitionedConfig):
7179 self .client = None
7280 self .database = None
7381 self .container = None
74- self .__semaphore = Semaphore ()
82+ self .compatability_mode_partition_key = False
83+ # Lock used for synchronizing container creation
84+ self .__lock = Lock ()
85+ if config .key_suffix is None :
86+ config .key_suffix = ""
87+ if not config .key_suffix .__eq__ ("" ):
88+ if config .compatibility_mode :
89+ raise Exception (
90+ "compatibilityMode cannot be true while using a keySuffix."
91+ )
92+ suffix_escaped = CosmosDbKeyEscape .sanitize_key (config .key_suffix )
93+ if not suffix_escaped .__eq__ (config .key_suffix ):
94+ raise Exception (
95+ f"Cannot use invalid Row Key characters: { config .key_suffix } in keySuffix."
96+ )
7597
7698 async def read (self , keys : List [str ]) -> Dict [str , object ]:
7799 """Read storeitems from storage.
@@ -88,10 +110,12 @@ async def read(self, keys: List[str]) -> Dict[str, object]:
88110
89111 for key in keys :
90112 try :
91- escaped_key = CosmosDbKeyEscape .sanitize_key (key )
113+ escaped_key = CosmosDbKeyEscape .sanitize_key (
114+ key , self .config .key_suffix , self .config .compatibility_mode
115+ )
92116
93117 read_item_response = self .client .ReadItem (
94- self .__item_link (escaped_key ), { "partitionKey" : escaped_key }
118+ self .__item_link (escaped_key ), self . __get_partition_key ( escaped_key )
95119 )
96120 document_store_item = read_item_response
97121 if document_store_item :
@@ -128,7 +152,9 @@ async def write(self, changes: Dict[str, object]):
128152 for (key , change ) in changes .items ():
129153 e_tag = change .get ("e_tag" , None )
130154 doc = {
131- "id" : CosmosDbKeyEscape .sanitize_key (key ),
155+ "id" : CosmosDbKeyEscape .sanitize_key (
156+ key , self .config .key_suffix , self .config .compatibility_mode
157+ ),
132158 "realId" : key ,
133159 "document" : self .__create_dict (change ),
134160 }
@@ -161,11 +187,13 @@ async def delete(self, keys: List[str]):
161187 await self .initialize ()
162188
163189 for key in keys :
164- escaped_key = CosmosDbKeyEscape .sanitize_key (key )
190+ escaped_key = CosmosDbKeyEscape .sanitize_key (
191+ key , self .config .key_suffix , self .config .compatibility_mode
192+ )
165193 try :
166194 self .client .DeleteItem (
167195 document_link = self .__item_link (escaped_key ),
168- options = { "partitionKey" : escaped_key } ,
196+ options = self . __get_partition_key ( escaped_key ) ,
169197 )
170198 except cosmos_errors .HTTPFailure as err :
171199 if (
@@ -188,41 +216,57 @@ async def initialize(self):
188216 )
189217
190218 if not self .database :
191- with self .__semaphore :
219+ with self .__lock :
192220 try :
193- self .database = self .client .CreateDatabase (
194- {"id" : self .config .database_id }
195- )
221+ if not self .database :
222+ self .database = self .client .CreateDatabase (
223+ {"id" : self .config .database_id }
224+ )
196225 except cosmos_errors .HTTPFailure :
197226 self .database = self .client .ReadDatabase (
198227 "dbs/" + self .config .database_id
199228 )
200229
201- if not self .container :
202- with self .__semaphore :
203- container_def = {
204- "id" : self .config .container_id ,
205- "partitionKey" : {
206- "paths" : ["/id" ],
207- "kind" : documents .PartitionKind .Hash ,
208- },
209- }
210- try :
211- self .container = self .client .CreateContainer (
212- "dbs/" + self .database ["id" ],
213- container_def ,
214- {"offerThroughput" : 400 },
215- )
216- except cosmos_errors .HTTPFailure as err :
217- if err .status_code == http_constants .StatusCodes .CONFLICT :
218- self .container = self .client .ReadContainer (
219- "dbs/"
220- + self .database ["id" ]
221- + "/colls/"
222- + container_def ["id" ]
230+ self .__get_or_create_container ()
231+
232+ def __get_or_create_container (self ):
233+ with self .__lock :
234+ container_def = {
235+ "id" : self .config .container_id ,
236+ "partitionKey" : {
237+ "paths" : ["/id" ],
238+ "kind" : documents .PartitionKind .Hash ,
239+ },
240+ }
241+ try :
242+ if not self .container :
243+ self .container = self .client .CreateContainer (
244+ "dbs/" + self .database ["id" ],
245+ container_def ,
246+ {"offerThroughput" : self .config .container_throughput },
247+ )
248+ except cosmos_errors .HTTPFailure as err :
249+ if err .status_code == http_constants .StatusCodes .CONFLICT :
250+ self .container = self .client .ReadContainer (
251+ "dbs/" + self .database ["id" ] + "/colls/" + container_def ["id" ]
252+ )
253+ if "partitionKey" not in self .container :
254+ self .compatability_mode_partition_key = True
255+ else :
256+ paths = self .container ["partitionKey" ]["paths" ]
257+ if "/partitionKey" in paths :
258+ self .compatability_mode_partition_key = True
259+ elif "/id" not in paths :
260+ raise Exception (
261+ f"Custom Partition Key Paths are not supported. { self .config .container_id } "
262+ "has a custom Partition Key Path of {paths[0]}."
223263 )
224- else :
225- raise err
264+
265+ else :
266+ raise err
267+
268+ def __get_partition_key (self , key : str ) -> str :
269+ return None if self .compatability_mode_partition_key else {"partitionKey" : key }
226270
227271 @staticmethod
228272 def __create_si (result ) -> object :
0 commit comments