diff --git a/segment/analytics/consumer.py b/segment/analytics/consumer.py index 1c47157..157e3c9 100644 --- a/segment/analytics/consumer.py +++ b/segment/analytics/consumer.py @@ -14,9 +14,11 @@ # lower to leave space for extra data that will be added later, eg. "sentAt". BATCH_SIZE_LIMIT = 475000 + class FatalError(Exception): def __init__(self, message): self.message = message + def __str__(self): msg = "[Segment] {0})" return msg.format(self.message) @@ -81,7 +83,7 @@ def upload(self): # mark items as acknowledged from queue for _ in batch: self.queue.task_done() - return success + return success def next(self): """Return the next batch of items to upload.""" @@ -132,14 +134,26 @@ def fatal_exception(exc): # retry on all other errors (eg. network) return False + attempt_count = 0 + @backoff.on_exception( backoff.expo, Exception, max_tries=self.retries + 1, - giveup=fatal_exception) + giveup=fatal_exception, + on_backoff=lambda details: self.log.debug( + f"Retry attempt {details['tries']}/{self.retries + 1} after {details['elapsed']:.2f}s" + )) def send_request(): - post(self.write_key, self.host, gzip=self.gzip, - timeout=self.timeout, batch=batch, proxies=self.proxies, - oauth_manager=self.oauth_manager) + nonlocal attempt_count + attempt_count += 1 + try: + return post(self.write_key, self.host, gzip=self.gzip, + timeout=self.timeout, batch=batch, proxies=self.proxies, + oauth_manager=self.oauth_manager) + except Exception as e: + if attempt_count >= self.retries + 1: + self.log.error(f"All {self.retries} retries exhausted. Final error: {e}") + raise send_request() diff --git a/segment/analytics/request.py b/segment/analytics/request.py index 4118e2c..ab92b80 100644 --- a/segment/analytics/request.py +++ b/segment/analytics/request.py @@ -54,9 +54,8 @@ def post(write_key, host=None, gzip=False, timeout=15, proxies=None, oauth_manag try: res = _session.post(url, **kwargs) except Exception as e: - log.error(e) raise e - + if res.status_code == 200: log.debug('data uploaded successfully') return res