Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class Config(object):
def __init__(self,
base_uri='https://app.launchdarkly.com',
events_uri='https://events.launchdarkly.com',
connect_timeout=2,
read_timeout=10,
connect_timeout=10,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These were lower than what we've seen in the field (5 seconds just for dns lookup) so I'm upping them here.

read_timeout=15,
events_upload_max_batch_size=100,
events_max_pending=10000,
stream_uri='https://stream.launchdarkly.com',
Expand Down Expand Up @@ -149,10 +149,11 @@ def __init__(self, sdk_key, config=None, start_wait=5):
log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...")
update_processor_ready.wait(start_wait)

if self._update_processor.initialized:
if self._update_processor.initialized() is True:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was not evaluating correctly.. it would indicate success even when the streaming processor had not been initialized.

log.info("Started LaunchDarkly Client: OK")
else:
log.info("Initialization timeout exceeded for LaunchDarkly Client. Feature Flags may not yet be available.")
log.warn("Initialization timeout exceeded for LaunchDarkly Client or an error occurred. "
"Feature Flags may not yet be available.")

@property
def sdk_key(self):
Expand Down Expand Up @@ -215,7 +216,7 @@ def send_event(value, version=None):
'user': user, 'value': value, 'default': default, 'version': version})

if not self.is_initialized():
log.warn("Feature Flag evaluation attempted before client has finished initializing! Returning default: "
log.warn("Feature Flag evaluation attempted before client has initialized! Returning default: "
+ str(default) + " for feature key: " + key)
send_event(default)
return default
Expand Down
4 changes: 2 additions & 2 deletions ldclient/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ def run(self):
while self._running:
start_time = time.time()
self._store.init(self._requester.get_all())
if not self._ready.is_set() and self._store.initialized:
if not self._ready.is_set() is True and self._store.initialized is True:
log.info("PollingUpdateProcessor initialized ok")
self._ready.set()
elapsed = time.time() - start_time
if elapsed < self._config.poll_interval:
time.sleep(self._config.poll_interval - elapsed)

def initialized(self):
return self._running and self._ready.is_set() and self._store.initialized
return self._running and self._ready.is_set() is True and self._store.initialized is True

def stop(self):
log.info("Stopping PollingUpdateProcessor")
Expand Down
43 changes: 24 additions & 19 deletions ldclient/streaming.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from __future__ import absolute_import

import json
from threading import Thread

import time
import backoff
import requests
from sseclient import SSEClient

from ldclient.interfaces import UpdateProcessor
from ldclient.util import _stream_headers, log

Expand All @@ -13,44 +15,47 @@ def __init__(self, sdk_key, config, requester, store, ready):
Thread.__init__(self)
self.daemon = True
self._sdk_key = sdk_key
self._uri = config.stream_uri
self._config = config
self._requester = requester
self._store = store
self._running = False
self._ready = ready
self._headers = _stream_headers(self._sdk_key)

def run(self):
log.info("Starting StreamingUpdateProcessor connecting to uri: " + self._config.stream_uri)
log.info("Starting StreamingUpdateProcessor connecting to uri: " + self._uri)
self._running = True
hdrs = _stream_headers(self._sdk_key)
uri = self._config.stream_uri
while self._running:
try:
messages = SSEClient(uri, verify=self._config.verify_ssl, headers=hdrs)
for msg in messages:
if not self._running:
break
if self.process_message(self._store, self._requester, msg, self._ready) is True:
self._ready.set()
except Exception as e:
log.error("Could not connect to LaunchDarkly stream: " + str(e.message) +
" waiting 1 second before trying again.")
time.sleep(1)
self._connect()

def _backoff_expo():
return backoff.expo(max_value=30)

@backoff.on_exception(_backoff_expo, requests.exceptions.RequestException, max_tries=None, jitter=backoff.full_jitter)
def _connect(self):
messages = SSEClient(self._uri, verify=self._config.verify_ssl, headers=self._headers)
for msg in messages:
if not self._running:
break
message_ok = self.process_message(self._store, self._requester, msg, self._ready)
if message_ok is True and self._ready.is_set() is False:
self._ready.set()

def stop(self):
log.info("Stopping StreamingUpdateProcessor")
self._running = False

def initialized(self):
return self._running and self._ready.is_set() and self._store.initialized
return self._running and self._ready.is_set() is True and self._store.initialized is True

@staticmethod
def process_message(store, requester, msg, ready):
log.debug("Received stream event {} with data: {}".format(msg.event, msg.data))
if msg.event == 'put':
payload = json.loads(msg.data)
store.init(payload)
if not ready.is_set() and store.initialized:
if not ready.is_set() is True and store.initialized is True:
log.info("StreamingUpdateProcessor initialized ok")
return True
elif msg.event == 'patch':
Expand All @@ -63,7 +68,7 @@ def process_message(store, requester, msg, ready):
store.upsert(key, requester.get_one(key))
elif msg.event == "indirect/put":
store.init(requester.get_all())
if not ready.is_set() and store.initialized:
if not ready.is_set() is True and store.initialized is True:
log.info("StreamingUpdateProcessor initialized ok")
return True
elif msg.event == 'delete':
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
backoff>=1.3.1
CacheControl>=0.10.2
requests>=2.10.0
sseclient>=0.0.12
Expand Down