15
15
# specific language governing permissions and limitations
16
16
# under the License.
17
17
18
+ import contextlib
19
+
18
20
from elasticsearch .exceptions import ApiError
19
21
from elasticsearch .helpers import scan
20
22
@@ -88,6 +90,8 @@ def scan(self):
88
90
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
89
91
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
90
92
93
+ The ``iterate()`` method should be preferred, as it provides similar
94
+ functionality using a point in time.
91
95
"""
92
96
es = get_connection (self ._using )
93
97
@@ -105,6 +109,55 @@ def delete(self):
105
109
es .delete_by_query (index = self ._index , body = self .to_dict (), ** self ._params )
106
110
)
107
111
112
@contextlib.contextmanager
def point_in_time(self, keep_alive="1m"):
    """
    Open a point in time (pit) that can be used across several searches.

    This method implements a context manager that returns a search object
    configured to operate within the created pit. The pit is closed when
    the ``with`` block exits, whether it finishes normally or is left
    because of an exception.

    :arg keep_alive: the time to live for the point in time, renewed with each search request

    The following example shows how to paginate through all the documents of an index::

        page_size = 10
        with Search(index="my-index")[:page_size].point_in_time() as s:
            while True:
                r = s.execute()  # get a page of results
                # ... do something with r.hits

                if len(r.hits) < page_size:
                    break  # we reached the end
                s = r.search_after()
    """
    es = get_connection(self._using)

    # Fall back to all indices ("*") when this search has no index set.
    pit = es.open_point_in_time(index=self._index or "*", keep_alive=keep_alive)
    pit_id = pit["id"]
    try:
        # NOTE(review): ``index()`` is called with no arguments, presumably
        # to drop the index from the derived search because the pit already
        # identifies its targets -- confirm against ``Search.index``.
        search = self.index().extra(pit={"id": pit_id, "keep_alive": keep_alive})
        if not search._sort:
            # No sort configured by the caller: use ``_shard_doc`` so the
            # ``search_after()`` pagination shown above has a sort to work with.
            search = search.sort("_shard_doc")
        yield search
    finally:
        # Always release the pit, even if the caller's block raised or the
        # consuming generator was closed early; otherwise it would stay
        # open on the server until ``keep_alive`` expires.
        es.close_point_in_time(id=pit_id)
142
+
143
def iterate(self, keep_alive="1m"):
    """
    Generate, one hit at a time, every document that matches the query.

    The search runs inside a point in time, so results stay consistent even
    while the index is being modified. Prefer this method over ``scan()``.

    :arg keep_alive: the time to live for the point in time, renewed with each new search request
    """
    with self.point_in_time(keep_alive=keep_alive) as page_search:
        while True:
            response = page_search.execute()
            # Hand out every hit of the current page before fetching more.
            yield from response
            if len(response.hits) == 0:
                # An empty page means there is nothing left to fetch.
                return
            # Continue from the last hit of the page that was just consumed.
            page_search = response.search_after()
160
+
108
161
109
162
class MultiSearch (MultiSearchBase ):
110
163
"""
0 commit comments