15
15
# specific language governing permissions and limitations
16
16
# under the License.
17
17
18
+ import contextlib
19
+
18
20
from elasticsearch .exceptions import ApiError
19
21
from elasticsearch .helpers import scan
20
22
@@ -88,6 +90,8 @@ def scan(self):
88
90
pass to the underlying ``scan`` helper from ``elasticsearch-py`` -
89
91
https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
90
92
93
+ The ``iterate()`` method should be preferred, as it provides similar
94
+ functionality using a point in time.
91
95
"""
92
96
es = get_connection (self ._using )
93
97
@@ -105,6 +109,54 @@ def delete(self):
105
109
es .delete_by_query (index = self ._index , body = self .to_dict (), ** self ._params )
106
110
)
107
111
112
+ @contextlib .contextmanager
113
+ def point_in_time (self , keep_alive = "1m" ):
114
+ """
115
+ Open a point in time (pit) that can be used across several searches.
116
+
117
+ :arg keep_alive: the time to live for the point in time, renewed with each new search request
118
+
119
+ This method implements a context manager that returns a search object
120
+ configured to operate within the created pit. The following example
121
+ shows how to paginate through all the documents of an index::
122
+
123
+ page_size = 10
124
+ with Search(index="my-index")[:page_size].point_in_time() as s:
125
+ while True:
126
+ r = s.execute() # get a page of results
127
+ // ... do something with r.hits
128
+
129
+ if len(r.hits) < page_size:
130
+ break # we reached the end
131
+ s = r.search_after()
132
+ """
133
+ es = get_connection (self ._using )
134
+
135
+ pit = es .open_point_in_time (index = self ._index or "*" , keep_alive = keep_alive )
136
+ search = self .index ().extra (pit = {"id" : pit ["id" ], "keep_alive" : keep_alive })
137
+ if not search ._sort :
138
+ search = search .sort ("_shard_doc" )
139
+ yield search
140
+ es .close_point_in_time (id = pit ["id" ])
141
+
142
+ def iterate (self , keep_alive = "1m" ):
143
+ """
144
+ Return a generator that iterates over all the documents matching the query.
145
+
146
+ :arg keep_alive: the time to live for the point in time, renewed with each new search request
147
+
148
+ This method uses a point in time to provide consistent results even when
149
+ the underlying index is changing. It should be preferred over ``scan()``.
150
+ """
151
+ with self .point_in_time (keep_alive = keep_alive ) as s :
152
+ while True :
153
+ r = s .execute ()
154
+ for hit in r :
155
+ yield self ._get_result (hit )
156
+ if len (r .hits ) == 0 :
157
+ break
158
+ s = r .search_after ()
159
+
108
160
109
161
class MultiSearch (MultiSearchBase ):
110
162
"""
0 commit comments