From c61a445760aeadcc02833fd683a076d1cc1bdd9c Mon Sep 17 00:00:00 2001 From: Lyn Date: Mon, 27 Feb 2023 18:51:32 -0800 Subject: [PATCH 1/2] ref(post-process-forwarder): Don't use --commit-batch-size as futures queue length Deprecate the --commit-batch-size and --commit-batch-timeout-ms arguments to the post process forwarder. These were not being used for committing anyway. For some reason, commit batch size was used as the max queue length in the executor but it's not really necessary to configure this anyway. --- src/sentry/eventstream/base.py | 2 -- src/sentry/eventstream/kafka/backend.py | 12 +++--------- src/sentry/eventstream/kafka/consumer_strategy.py | 8 ++------ src/sentry/runner/commands/devserver.py | 6 ------ src/sentry/runner/commands/run.py | 8 +++----- .../eventstream/kafka/test_consumer_strategy.py | 4 ++-- 6 files changed, 10 insertions(+), 30 deletions(-) diff --git a/src/sentry/eventstream/base.py b/src/sentry/eventstream/base.py index f5a53a5f2ed7ad..7cacb8d1b46e07 100644 --- a/src/sentry/eventstream/base.py +++ b/src/sentry/eventstream/base.py @@ -216,8 +216,6 @@ def run_post_process_forwarder( topic: Optional[str], commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: bool, diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 56539d4cc021d5..4b721a38b09c7d 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -29,7 +29,7 @@ from sentry import options from sentry.eventstream.base import EventStreamEventType, GroupStates, PostProcessForwarderType from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory -from sentry.eventstream.kafka.synchronized import SynchronizedConsumer as ArroyoSynchronizedConsumer +from sentry.eventstream.kafka.synchronized import
SynchronizedConsumer from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.killswitches import killswitch_matches_context from sentry.utils import json, metrics @@ -234,8 +234,6 @@ def _build_streaming_consumer( topic: str, commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: Optional[bool], @@ -261,14 +259,14 @@ def _build_streaming_consumer( ) ) - synchronized_consumer = ArroyoSynchronizedConsumer( + synchronized_consumer = SynchronizedConsumer( consumer=consumer, commit_log_consumer=commit_log_consumer, commit_log_topic=Topic(commit_log_topic), commit_log_groups={synchronize_commit_group}, ) - strategy_factory = PostProcessForwarderStrategyFactory(concurrency, commit_batch_size) + strategy_factory = PostProcessForwarderStrategyFactory(concurrency) return StreamProcessor( synchronized_consumer, Topic(topic), strategy_factory, ONCE_PER_SECOND @@ -281,8 +279,6 @@ def run_post_process_forwarder( topic: Optional[str], commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: bool, @@ -302,8 +298,6 @@ def run_post_process_forwarder( topic or default_topic, commit_log_topic, synchronize_commit_group, - commit_batch_size, - commit_batch_timeout_ms, concurrency, initial_offset_reset, strict_offset_reset, diff --git a/src/sentry/eventstream/kafka/consumer_strategy.py b/src/sentry/eventstream/kafka/consumer_strategy.py index d7f2e8706e8320..ce29f3047783d0 100644 --- a/src/sentry/eventstream/kafka/consumer_strategy.py +++ b/src/sentry/eventstream/kafka/consumer_strategy.py @@ -96,13 +96,9 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None: class 
PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): - def __init__( - self, - concurrency: int, - max_pending_futures: int, - ): + def __init__(self, concurrency: int): self.__concurrency = concurrency - self.__max_pending_futures = max_pending_futures + self.__max_pending_futures = concurrency + 1000 def create_with_partitions( self, diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 5b8e8dbebe7c15..91fd6d29ddbe1b 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -36,8 +36,6 @@ "post-process-forwarder", "--entity=errors", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--no-strict-offset-reset", ], "post-process-forwarder-transactions": [ @@ -46,8 +44,6 @@ "post-process-forwarder", "--entity=transactions", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--commit-log-topic=snuba-transactions-commit-log", "--synchronize-commit-group=transactions_group", "--no-strict-offset-reset", @@ -58,8 +54,6 @@ "post-process-forwarder", "--entity=search_issues", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--commit-log-topic=snuba-generic-events-commit-log", "--synchronize-commit-group=generic_events_group", "--no-strict-offset-reset", diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 388d09c544d15c..4ecc87804ae9e4 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -384,8 +384,6 @@ def post_process_forwarder(**options): topic=options["topic"], commit_log_topic=options["commit_log_topic"], synchronize_commit_group=options["synchronize_commit_group"], - commit_batch_size=options["commit_batch_size"], - commit_batch_timeout_ms=options["commit_batch_timeout_ms"], concurrency=options["concurrency"], initial_offset_reset=options["initial_offset_reset"], 
strict_offset_reset=options["strict_offset_reset"], @@ -407,15 +405,15 @@ def post_process_forwarder(**options): @click.option("--topic", default=None, help="Topic to get subscription updates from.") @click.option( "--commit-batch-size", - default=100, + default=1000, type=int, - help="How many messages to process before committing offsets.", + help="Deprecated. Remove once no longer passed in production.", ) @click.option( "--commit-batch-timeout-ms", default=5000, type=int, - help="Time (in milliseconds) to wait before closing current batch and committing offsets.", + help="Deprecated. Remove once no longer passed in production.", ) @click.option( "--initial-offset-reset", diff --git a/tests/sentry/eventstream/kafka/test_consumer_strategy.py b/tests/sentry/eventstream/kafka/test_consumer_strategy.py index 5fbfc0d520cad1..4b3a051d7d5d1c 100644 --- a/tests/sentry/eventstream/kafka/test_consumer_strategy.py +++ b/tests/sentry/eventstream/kafka/test_consumer_strategy.py @@ -70,7 +70,7 @@ def get_occurrence_kafka_payload() -> KafkaPayload: def test_dispatch_task(mock_dispatch: Mock) -> None: commit = Mock() partition = Partition(Topic("test"), 0) - factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10) + factory = PostProcessForwarderStrategyFactory(concurrency=2) strategy = factory.create_with_partitions(commit, {partition: 0}) strategy.submit(Message(BrokerValue(get_kafka_payload(), partition, 1, datetime.now()))) @@ -104,7 +104,7 @@ def test_dispatch_task(mock_dispatch: Mock) -> None: def test_dispatch_task_with_occurrence(mock_post_process_group: Mock) -> None: commit = Mock() partition = Partition(Topic("test-occurrence"), 0) - factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10) + factory = PostProcessForwarderStrategyFactory(concurrency=2) strategy = factory.create_with_partitions(commit, {partition: 0}) strategy.submit( From b5a799abd8ae0be0462de06a65a09bc4e225cf02 Mon Sep 17 00:00:00 2001 From: Lyn 
Date: Tue, 28 Feb 2023 15:22:54 -0800 Subject: [PATCH 2/2] update test --- tests/sentry/eventstream/kafka/test_consumer.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/sentry/eventstream/kafka/test_consumer.py b/tests/sentry/eventstream/kafka/test_consumer.py index 8653b2feb2a521..1c93619d0042fb 100644 --- a/tests/sentry/eventstream/kafka/test_consumer.py +++ b/tests/sentry/eventstream/kafka/test_consumer.py @@ -118,8 +118,6 @@ def test_post_process_forwarder_streaming_consumer(self, dispatch_post_process_g topic=self.events_topic, commit_log_topic=self.commit_log_topic, synchronize_commit_group=synchronize_commit_group, - commit_batch_size=1, - commit_batch_timeout_ms=100, concurrency=1, initial_offset_reset="earliest", strict_offset_reset=None,