Skip to content

prepare 6.1.0 release #90

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jun 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a755854
switch from requests to urllib3 in event processor
eli-darkly Jun 12, 2018
2995909
reduce urllib3 version for compatibility with requests
eli-darkly Jun 12, 2018
16a8b98
Merge branch 'master' into eb/ch18801/remove-requests
eli-darkly Jun 12, 2018
83feeed
substitute urllib3 for requests
eli-darkly Jun 12, 2018
33b2455
exact version requirement for urllib3
eli-darkly Jun 12, 2018
dd3d98a
fix version format
eli-darkly Jun 12, 2018
1ea5fab
add check for presence of etag
eli-darkly Jun 12, 2018
cd32e01
fail fast in initialization if there's a 401
eli-darkly Jun 13, 2018
402c063
clearer test condition
eli-darkly Jun 13, 2018
017d3a3
Merge pull request #60 from launchdarkly/eb/ch18860/fail-fast-init
eli-darkly Jun 13, 2018
a2c5b49
Merge branch 'master' into eb/ch18801/remove-requests-urllib3
eli-darkly Jun 13, 2018
f257b49
verify certificates using certifi package
eli-darkly Jun 14, 2018
dc6ba26
remove unused pytest-timeout
eli-darkly Jun 14, 2018
fd5c153
don't let urllib3 do its own connection retries for the stream
eli-darkly Jun 14, 2018
75e4fac
disable verbose stacktrace of I/O errors
eli-darkly Jun 14, 2018
859028f
fail permanently on most 4xx errors
eli-darkly Jun 14, 2018
5f86557
rm unused
eli-darkly Jun 14, 2018
a347d9f
400 error should not shut things down
eli-darkly Jun 15, 2018
c0aa058
Merge pull request #59 from launchdarkly/eb/ch18801/remove-requests-u…
eli-darkly Jun 15, 2018
09aee6c
Merge pull request #61 from launchdarkly/eb/ch18901/4xx-errors
eli-darkly Jun 15, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import hmac
import threading

import requests
from builtins import object

from ldclient.config import Config as Config
Expand Down Expand Up @@ -42,7 +41,6 @@ def __init__(self, sdk_key=None, config=None, start_wait=5):
self._config = config or Config.default()
self._config._validate()

self._session = CacheControl(requests.Session())
self._event_processor = None
self._lock = Lock()

Expand Down
73 changes: 30 additions & 43 deletions ldclient/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import errno
import jsonpickle
from threading import Event, Lock, Thread
import six
import time
import urllib3

# noinspection PyBroadException
try:
Expand All @@ -14,19 +16,17 @@
# noinspection PyUnresolvedReferences,PyPep8Naming
import Queue as queue

import requests
from requests.packages.urllib3.exceptions import ProtocolError

import six

from ldclient.event_summarizer import EventSummarizer
from ldclient.fixed_thread_pool import FixedThreadPool
from ldclient.lru_cache import SimpleLRUCache
from ldclient.user_filter import UserFilter
from ldclient.interfaces import EventProcessor
from ldclient.repeating_timer import RepeatingTimer
from ldclient.util import UnsuccessfulResponseException
from ldclient.util import _headers
from ldclient.util import create_http_pool_manager
from ldclient.util import log
from ldclient.util import http_error_message, is_http_error_recoverable, throw_if_unsuccessful_response


__MAX_FLUSH_THREADS__ = 5
Expand Down Expand Up @@ -144,8 +144,8 @@ def make_summary_event(self, summary):


class EventPayloadSendTask(object):
def __init__(self, session, config, formatter, payload, response_fn):
self._session = session
def __init__(self, http, config, formatter, payload, response_fn):
self._http = http
self._config = config
self._formatter = formatter
self._payload = payload
Expand All @@ -154,43 +154,30 @@ def __init__(self, session, config, formatter, payload, response_fn):
def run(self):
try:
output_events = self._formatter.make_output_events(self._payload.events, self._payload.summary)
resp = self._do_send(output_events, True)
if resp is not None:
self._response_fn(resp)
resp = self._do_send(output_events)
except Exception:
log.warning(
'Unhandled exception in event processor. Analytics events were not processed.',
exc_info=True)

def _do_send(self, output_events, should_retry):
def _do_send(self, output_events):
# noinspection PyBroadException
try:
json_body = jsonpickle.encode(output_events, unpicklable=False)
log.debug('Sending events payload: ' + json_body)
hdrs = _headers(self._config.sdk_key)
hdrs['X-LaunchDarkly-Event-Schema'] = str(__CURRENT_EVENT_SCHEMA__)
uri = self._config.events_uri
r = self._session.post(uri,
r = self._http.request('POST', uri,
headers=hdrs,
timeout=(self._config.connect_timeout, self._config.read_timeout),
data=json_body)
r.raise_for_status()
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
body=json_body,
retries=1)
self._response_fn(r)
return r
except ProtocolError as e:
if e.args is not None and len(e.args) > 1 and e.args[1] is not None:
inner = e.args[1]
if inner.errno is not None and inner.errno == errno.ECONNRESET and should_retry:
log.warning(
'ProtocolError exception caught while sending events. Retrying.')
self._do_send(output_events, False)
else:
log.warning(
'Unhandled exception in event processor. Analytics events were not processed.',
exc_info=True)
except Exception:
except Exception as e:
log.warning(
'Unhandled exception in event processor. Analytics events were not processed.',
exc_info=True)
'Unhandled exception in event processor. Analytics events were not processed. [%s]', e)


FlushPayload = namedtuple('FlushPayload', ['events', 'summary'])
Expand Down Expand Up @@ -224,11 +211,11 @@ def clear(self):


class EventDispatcher(object):
def __init__(self, queue, config, session):
def __init__(self, queue, config, http_client):
self._queue = queue
self._config = config
self._session = requests.Session() if session is None else session
self._close_session = (session is None) # so we know whether to close it later
self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl) if http_client is None else http_client
self._close_http = (http_client is None) # so we know whether to close it later
self._disabled = False
self._buffer = EventBuffer(config.events_max_pending)
self._user_keys = SimpleLRUCache(config.user_keys_capacity)
Expand Down Expand Up @@ -261,7 +248,6 @@ def _run_main_loop(self):
return
except Exception:
log.error('Unhandled exception in event processor', exc_info=True)
self._session.close()

def _process_event(self, event):
if self._disabled:
Expand Down Expand Up @@ -320,7 +306,7 @@ def _trigger_flush(self):
return
payload = self._buffer.get_payload()
if len(payload.events) > 0 or len(payload.summary.counters) > 0:
task = EventPayloadSendTask(self._session, self._config, self._formatter, payload,
task = EventPayloadSendTask(self._http, self._config, self._formatter, payload,
self._handle_response)
if self._flush_workers.execute(task.run):
# The events have been handed off to a flush worker; clear them from our buffer.
Expand All @@ -330,34 +316,35 @@ def _trigger_flush(self):
pass

def _handle_response(self, r):
server_date_str = r.headers.get('Date')
server_date_str = r.getheader('Date')
if server_date_str is not None:
server_date = parsedate(server_date_str)
if server_date is not None:
timestamp = int(time.mktime(server_date) * 1000)
self._last_known_past_time = timestamp
if r.status_code == 401:
log.error('Received 401 error, no further events will be posted since SDK key is invalid')
self._disabled = True
return
if r.status > 299:
log.error(http_error_message(r.status, "event delivery", "some events were dropped"))
if not is_http_error_recoverable(r.status):
self._disabled = True
return

def _do_shutdown(self):
self._flush_workers.stop()
self._flush_workers.wait()
if self._close_session:
self._session.close()
if self._close_http:
self._http.clear()


class DefaultEventProcessor(EventProcessor):
def __init__(self, config, session=None):
def __init__(self, config, http=None):
self._queue = queue.Queue(config.events_max_pending)
self._flush_timer = RepeatingTimer(config.flush_interval, self.flush)
self._users_flush_timer = RepeatingTimer(config.user_keys_flush_interval, self._flush_users)
self._flush_timer.start()
self._users_flush_timer.start()
self._close_lock = Lock()
self._closed = False
EventDispatcher(self._queue, config, session)
EventDispatcher(self._queue, config, http)

def send_event(self, event):
event['creationDate'] = int(time.time() * 1000)
Expand Down
65 changes: 37 additions & 28 deletions ldclient/feature_requester.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,61 @@
from __future__ import absolute_import

import requests
from cachecontrol import CacheControl
from collections import namedtuple
import json
import urllib3

from ldclient.interfaces import FeatureRequester
from ldclient.util import UnsuccessfulResponseException
from ldclient.util import _headers
from ldclient.util import create_http_pool_manager
from ldclient.util import log
from ldclient.util import throw_if_unsuccessful_response
from ldclient.versioned_data_kind import FEATURES, SEGMENTS


LATEST_ALL_URI = '/sdk/latest-all'


CacheEntry = namedtuple('CacheEntry', ['data', 'etag'])


class FeatureRequesterImpl(FeatureRequester):
def __init__(self, config):
self._session_cache = CacheControl(requests.Session())
self._session_no_cache = requests.Session()
self._cache = dict()
self._http = create_http_pool_manager(num_pools=1, verify_ssl=config.verify_ssl)
self._config = config

def get_all_data(self):
hdrs = _headers(self._config.sdk_key)
uri = self._config.base_uri + LATEST_ALL_URI
r = self._session_cache.get(uri,
headers=hdrs,
timeout=(
self._config.connect_timeout,
self._config.read_timeout))
r.raise_for_status()
all_data = r.json()
log.debug("Get All flags response status:[%d] From cache?[%s] ETag:[%s]",
r.status_code, r.from_cache, r.headers.get('ETag'))
all_data = self._do_request(self._config.base_uri + LATEST_ALL_URI, True)
return {
FEATURES: all_data['flags'],
SEGMENTS: all_data['segments']
}

def get_one(self, kind, key):
return self._do_request(kind.request_api_path + '/' + key, False)

def _do_request(self, uri, allow_cache):
hdrs = _headers(self._config.sdk_key)
path = kind.request_api_path + '/' + key
uri = config.base_uri + path
log.debug("Getting %s from %s using uri: %s", key, kind['namespace'], uri)
r = self._session_no_cache.get(uri,
headers=hdrs,
timeout=(
self._config.connect_timeout,
self._config.read_timeout))
r.raise_for_status()
obj = r.json()
log.debug("%s response status:[%d] key:[%s] version:[%d]",
path, r.status_code, key, segment.get("version"))
return obj
if allow_cache:
cache_entry = self._cache.get(uri)
if cache_entry is not None:
hdrs['If-None-Match'] = cache_entry.etag
r = self._http.request('GET', uri,
headers=hdrs,
timeout=urllib3.Timeout(connect=self._config.connect_timeout, read=self._config.read_timeout),
retries=1)
throw_if_unsuccessful_response(r)
if r.status == 304 and cache_entry is not None:
data = cache_entry.data
etag = cache_entry.etag
from_cache = True
else:
data = json.loads(r.data.decode('UTF-8'))
etag = r.getheader('ETag')
from_cache = False
if allow_cache and etag is not None:
self._cache[uri] = CacheEntry(data=data, etag=etag)
log.debug("%s response status:[%d] From cache? [%s] ETag:[%s]",
uri, r.status, from_cache, etag)
return data
15 changes: 8 additions & 7 deletions ldclient/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from ldclient.interfaces import UpdateProcessor
from ldclient.util import log
from requests import HTTPError
from ldclient.util import UnsuccessfulResponseException, http_error_message, is_http_error_recoverable

import time


Expand All @@ -28,15 +29,15 @@ def run(self):
if not self._ready.is_set() is True and self._store.initialized is True:
log.info("PollingUpdateProcessor initialized ok")
self._ready.set()
except HTTPError as e:
log.error('Received unexpected status code %d from polling request' % e.response.status_code)
if e.response.status_code == 401:
log.error('Received 401 error, no further polling requests will be made since SDK key is invalid')
except UnsuccessfulResponseException as e:
log.error(http_error_message(e.status, "polling request"))
if not is_http_error_recoverable(e.status):
self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited
self.stop()
break
except Exception:
except Exception as e:
log.exception(
'Error: Exception encountered when updating flags.')
'Error: Exception encountered when updating flags. %s' % e)

elapsed = time.time() - start_time
if elapsed < self._config.poll_interval:
Expand Down
31 changes: 18 additions & 13 deletions ldclient/sse_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import six

import requests
import urllib3

from ldclient.util import create_http_pool_manager
from ldclient.util import throw_if_unsuccessful_response

# Inspired by: https://bitbucket.org/btubbs/sseclient/src/a47a380a3d7182a205c0f1d5eb470013ce796b4d/sseclient.py?at=default&fileviewer=file-view-default

Expand All @@ -16,18 +19,19 @@


class SSEClient(object):
def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, chunk_size=10000, session=None, **kwargs):
def __init__(self, url, last_id=None, retry=3000, connect_timeout=10, read_timeout=300, chunk_size=10000,
verify_ssl=False, http=None, **kwargs):
self.url = url
self.last_id = last_id
self.retry = retry
self._connect_timeout = connect_timeout
self._read_timeout = read_timeout
self._chunk_size = chunk_size

# Optional support for passing in a requests.Session()
self.session = session
# Optional support for passing in an HTTP client
self.http = create_http_pool_manager(num_pools=1, verify_ssl=verify_ssl)

# Any extra kwargs will be fed into the requests.get call later.
# Any extra kwargs will be fed into the request call later.
self.requests_kwargs = kwargs

# The SSE spec requires making requests with Cache-Control: nocache
Expand All @@ -48,21 +52,22 @@ def _connect(self):
self.requests_kwargs['headers']['Last-Event-ID'] = self.last_id

# Use session if set. Otherwise fall back to requests module.
requester = self.session or requests
self.resp = requester.get(
self.resp = self.http.request(
'GET',
self.url,
stream=True,
timeout=(self._connect_timeout, self._read_timeout),
timeout=urllib3.Timeout(connect=self._connect_timeout, read=self._read_timeout),
preload_content=False,
retries=0, # caller is responsible for implementing appropriate retry semantics, e.g. backoff
**self.requests_kwargs)

# Raw readlines doesn't work because we may be missing newline characters until the next chunk
# For some reason, we also need to specify a chunk size because stream=True doesn't seem to guarantee
# that we get the newlines in a timeline manner
self.resp_file = self.resp.iter_content(chunk_size=self._chunk_size, decode_unicode=True)
self.resp_file = self.resp.stream(amt=self._chunk_size)

# TODO: Ensure we're handling redirects. Might also stick the 'origin'
# attribute on Events like the Javascript spec requires.
self.resp.raise_for_status()
throw_if_unsuccessful_response(self.resp)

def _event_complete(self):
return re.search(end_of_field, self.buf[len(self.buf)-self._chunk_size-10:]) is not None # Just search the last chunk plus a bit
Expand All @@ -77,8 +82,8 @@ def __next__(self):
# There are some bad cases where we don't always get a line: https://github.com/requests/requests/pull/2431
if not nextline:
raise EOFError()
self.buf += nextline
except (StopIteration, requests.RequestException, EOFError) as e:
self.buf += nextline.decode("utf-8")
except (StopIteration, EOFError) as e:
time.sleep(self.retry / 1000.0)
self._connect()

Expand Down
Loading