3
3
import hashlib
4
4
import hmac
5
5
import threading
6
- import time
7
6
8
7
import requests
9
8
from builtins import object
10
9
11
10
from ldclient .config import Config as Config
11
+ from ldclient .event_processor import NullEventProcessor
12
12
from ldclient .feature_requester import FeatureRequesterImpl
13
13
from ldclient .flag import evaluate
14
14
from ldclient .polling import PollingUpdateProcessor
21
21
import queue
22
22
except :
23
23
# noinspection PyUnresolvedReferences,PyPep8Naming
24
- import Queue as queue
24
+ import Queue as queue # Python 3
25
25
26
26
from cachecontrol import CacheControl
27
27
from threading import Lock
@@ -43,46 +43,46 @@ def __init__(self, sdk_key=None, config=None, start_wait=5):
43
43
self ._config ._validate ()
44
44
45
45
self ._session = CacheControl (requests .Session ())
46
- self ._queue = queue .Queue (self ._config .events_max_pending )
47
- self ._event_consumer = None
46
+ self ._event_processor = None
48
47
self ._lock = Lock ()
49
48
50
49
self ._store = self ._config .feature_store
51
50
""" :type: FeatureStore """
52
51
52
+ if self ._config .offline or not self ._config .send_events :
53
+ self ._event_processor = NullEventProcessor ()
54
+ else :
55
+ self ._event_processor = self ._config .event_processor_class (self ._config )
56
+
53
57
if self ._config .offline :
54
58
log .info ("Started LaunchDarkly Client in offline mode" )
55
59
return
56
60
57
- if self ._config .send_events :
58
- self ._event_consumer = self ._config .event_consumer_class (self ._queue , self ._config )
59
- self ._event_consumer .start ()
60
-
61
61
if self ._config .use_ldd :
62
62
log .info ("Started LaunchDarkly Client in LDD mode" )
63
63
return
64
64
65
- if self ._config .feature_requester_class :
66
- self ._feature_requester = self ._config .feature_requester_class (self ._config )
67
- else :
68
- self ._feature_requester = FeatureRequesterImpl (self ._config )
69
- """ :type: FeatureRequester """
70
-
71
65
update_processor_ready = threading .Event ()
72
66
73
67
if self ._config .update_processor_class :
74
68
log .info ("Using user-specified update processor: " + str (self ._config .update_processor_class ))
75
69
self ._update_processor = self ._config .update_processor_class (
76
- self ._config , self ._feature_requester , self . _store , update_processor_ready )
70
+ self ._config , self ._store , update_processor_ready )
77
71
else :
72
+ if self ._config .feature_requester_class :
73
+ feature_requester = self ._config .feature_requester_class (self ._config )
74
+ else :
75
+ feature_requester = FeatureRequesterImpl (self ._config )
76
+ """ :type: FeatureRequester """
77
+
78
78
if self ._config .stream :
79
79
self ._update_processor = StreamingUpdateProcessor (
80
- self ._config , self . _feature_requester , self ._store , update_processor_ready )
80
+ self ._config , feature_requester , self ._store , update_processor_ready )
81
81
else :
82
82
log .info ("Disabling streaming API" )
83
83
log .warn ("You should only disable the streaming API if instructed to do so by LaunchDarkly support" )
84
84
self ._update_processor = PollingUpdateProcessor (
85
- self ._config , self . _feature_requester , self ._store , update_processor_ready )
85
+ self ._config , feature_requester , self ._store , update_processor_ready )
86
86
""" :type: UpdateProcessor """
87
87
88
88
self ._update_processor .start ()
@@ -102,19 +102,13 @@ def close(self):
102
102
log .info ("Closing LaunchDarkly client.." )
103
103
if self .is_offline ():
104
104
return
105
- if self ._event_consumer and self . _event_consumer . is_alive () :
106
- self ._event_consumer .stop ()
105
+ if self ._event_processor :
106
+ self ._event_processor .stop ()
107
107
if self ._update_processor and self ._update_processor .is_alive ():
108
108
self ._update_processor .stop ()
109
109
110
110
def _send_event (self , event ):
111
- if self ._config .offline or not self ._config .send_events :
112
- return
113
- event ['creationDate' ] = int (time .time () * 1000 )
114
- if self ._queue .full ():
115
- log .warning ("Event queue is full-- dropped an event" )
116
- else :
117
- self ._queue .put (event )
111
+ self ._event_processor .send_event (event )
118
112
119
113
def track (self , event_name , user , data = None ):
120
114
self ._sanitize_user (user )
@@ -135,24 +129,26 @@ def is_initialized(self):
135
129
return self .is_offline () or self ._config .use_ldd or self ._update_processor .initialized ()
136
130
137
131
def flush (self ):
138
- if self ._config .offline or not self . _config . send_events :
132
+ if self ._config .offline :
139
133
return
140
- return self ._event_consumer .flush ()
134
+ return self ._event_processor .flush ()
141
135
142
136
def toggle (self , key , user , default ):
143
137
log .warn ("Deprecated method: toggle() called. Use variation() instead." )
144
138
return self .variation (key , user , default )
145
139
146
140
def variation (self , key , user , default ):
147
141
default = self ._config .get_default (key , default )
148
- self ._sanitize_user (user )
142
+ if user is not None :
143
+ self ._sanitize_user (user )
149
144
150
145
if self ._config .offline :
151
146
return default
152
147
153
148
def send_event (value , version = None ):
154
- self ._send_event ({'kind' : 'feature' , 'key' : key ,
155
- 'user' : user , 'value' : value , 'default' : default , 'version' : version })
149
+ self ._send_event ({'kind' : 'feature' , 'key' : key , 'user' : user , 'variation' : None ,
150
+ 'value' : value , 'default' : default , 'version' : version ,
151
+ 'trackEvents' : False , 'debugEventsUntilDate' : None })
156
152
157
153
if not self .is_initialized ():
158
154
if self ._store .initialized :
@@ -163,12 +159,7 @@ def send_event(value, version=None):
163
159
send_event (default )
164
160
return default
165
161
166
- if user is None or user .get ('key' ) is None :
167
- log .warn ("Missing user or user key when evaluating Feature Flag key: " + key + ". Returning default." )
168
- send_event (default )
169
- return default
170
-
171
- if user .get ('key' , "" ) == "" :
162
+ if user is not None and user .get ('key' , "" ) == "" :
172
163
log .warn ("User key is blank. Flag evaluation will proceed, but the user will not be stored in LaunchDarkly." )
173
164
174
165
def cb (flag ):
@@ -182,6 +173,7 @@ def cb(flag):
182
173
183
174
except Exception as e :
184
175
log .error ("Exception caught in variation: " + e .message + " for flag key: " + key + " and user: " + str (user ))
176
+ send_event (default )
185
177
186
178
return default
187
179
@@ -191,14 +183,22 @@ def _evaluate(self, flag, user):
191
183
return evaluate (flag , user , self ._store )
192
184
193
185
def _evaluate_and_send_events (self , flag , user , default ):
194
- value , events = self ._evaluate (flag , user )
195
- for event in events or []:
196
- self ._send_event (event )
197
-
198
- if value is None :
186
+ if user is None or user .get ('key' ) is None :
187
+ log .warn ("Missing user or user key when evaluating Feature Flag key: " + flag .get ('key' ) + ". Returning default." )
199
188
value = default
189
+ variation = None
190
+ else :
191
+ result = evaluate (flag , user , self ._store )
192
+ for event in result .events or []:
193
+ self ._send_event (event )
194
+ value = default if result .value is None else result .value
195
+ variation = result .variation
196
+
200
197
self ._send_event ({'kind' : 'feature' , 'key' : flag .get ('key' ),
201
- 'user' : user , 'value' : value , 'default' : default , 'version' : flag .get ('version' )})
198
+ 'user' : user , 'variation' : variation , 'value' : value ,
199
+ 'default' : default , 'version' : flag .get ('version' ),
200
+ 'trackEvents' : flag .get ('trackEvents' ),
201
+ 'debugEventsUntilDate' : flag .get ('debugEventsUntilDate' )})
202
202
return value
203
203
204
204
def all_flags (self , user ):
@@ -227,7 +227,7 @@ def cb(all_flags):
227
227
return self ._store .all (FEATURES , cb )
228
228
229
229
def _evaluate_multi (self , user , flags ):
230
- return dict ([(k , self ._evaluate (v , user )[ 0 ] ) for k , v in flags .items () or {}])
230
+ return dict ([(k , self ._evaluate (v , user ). value ) for k , v in flags .items () or {}])
231
231
232
232
def secure_mode_hash (self , user ):
233
233
if user .get ('key' ) is None or self ._config .sdk_key is None :
0 commit comments