diff --git a/src/sentry/eventstream/base.py b/src/sentry/eventstream/base.py index f5a53a5f2ed7ad..7cacb8d1b46e07 100644 --- a/src/sentry/eventstream/base.py +++ b/src/sentry/eventstream/base.py @@ -216,8 +216,6 @@ def run_post_process_forwarder( topic: Optional[str], commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: bool, diff --git a/src/sentry/eventstream/kafka/backend.py b/src/sentry/eventstream/kafka/backend.py index 56539d4cc021d5..4b721a38b09c7d 100644 --- a/src/sentry/eventstream/kafka/backend.py +++ b/src/sentry/eventstream/kafka/backend.py @@ -29,7 +29,7 @@ from sentry import options from sentry.eventstream.base import EventStreamEventType, GroupStates, PostProcessForwarderType from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory -from sentry.eventstream.kafka.synchronized import SynchronizedConsumer as ArroyoSynchronizedConsumer +from sentry.eventstream.kafka.synchronized import SynchronizedConsumer from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream from sentry.killswitches import killswitch_matches_context from sentry.utils import json, metrics @@ -234,8 +234,6 @@ def _build_streaming_consumer( topic: str, commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: Optional[bool], @@ -261,14 +259,14 @@ def _build_streaming_consumer( ) ) - synchronized_consumer = ArroyoSynchronizedConsumer( + synchronized_consumer = SynchronizedConsumer( consumer=consumer, commit_log_consumer=commit_log_consumer, commit_log_topic=Topic(commit_log_topic), commit_log_groups={synchronize_commit_group}, ) - strategy_factory = PostProcessForwarderStrategyFactory(concurrency, commit_batch_size) + strategy_factory = PostProcessForwarderStrategyFactory(concurrency) return StreamProcessor( synchronized_consumer, Topic(topic), strategy_factory, ONCE_PER_SECOND @@ -281,8 +279,6 @@ def run_post_process_forwarder( topic: Optional[str], commit_log_topic: str, synchronize_commit_group: str, - commit_batch_size: int, - commit_batch_timeout_ms: int, concurrency: int, initial_offset_reset: Union[Literal["latest"], Literal["earliest"]], strict_offset_reset: bool, @@ -302,8 +298,6 @@ def run_post_process_forwarder( topic or default_topic, commit_log_topic, synchronize_commit_group, - commit_batch_size, - commit_batch_timeout_ms, concurrency, initial_offset_reset, strict_offset_reset, diff --git a/src/sentry/eventstream/kafka/consumer_strategy.py b/src/sentry/eventstream/kafka/consumer_strategy.py index d7f2e8706e8320..ce29f3047783d0 100644 --- a/src/sentry/eventstream/kafka/consumer_strategy.py +++ b/src/sentry/eventstream/kafka/consumer_strategy.py @@ -96,13 +96,9 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None: class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload]): - def __init__( - self, - concurrency: int, - max_pending_futures: int, - ): + def __init__(self, concurrency: int): self.__concurrency = concurrency - self.__max_pending_futures = max_pending_futures + self.__max_pending_futures = concurrency + 1000 def create_with_partitions( self, diff --git a/src/sentry/runner/commands/devserver.py b/src/sentry/runner/commands/devserver.py index 5b8e8dbebe7c15..91fd6d29ddbe1b 100644 --- a/src/sentry/runner/commands/devserver.py +++ b/src/sentry/runner/commands/devserver.py @@ -36,8 +36,6 @@ "post-process-forwarder", "--entity=errors", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--no-strict-offset-reset", ], "post-process-forwarder-transactions": [ @@ -46,8 +44,6 @@ "post-process-forwarder", "--entity=transactions", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--commit-log-topic=snuba-transactions-commit-log", "--synchronize-commit-group=transactions_group", "--no-strict-offset-reset", @@ -58,8 +54,6 @@ "post-process-forwarder", "--entity=search_issues", "--loglevel=debug", - "--commit-batch-size=100", - "--commit-batch-timeout-ms=1000", "--commit-log-topic=snuba-generic-events-commit-log", "--synchronize-commit-group=generic_events_group", "--no-strict-offset-reset", diff --git a/src/sentry/runner/commands/run.py b/src/sentry/runner/commands/run.py index 388d09c544d15c..4ecc87804ae9e4 100644 --- a/src/sentry/runner/commands/run.py +++ b/src/sentry/runner/commands/run.py @@ -384,8 +384,6 @@ def post_process_forwarder(**options): topic=options["topic"], commit_log_topic=options["commit_log_topic"], synchronize_commit_group=options["synchronize_commit_group"], - commit_batch_size=options["commit_batch_size"], - commit_batch_timeout_ms=options["commit_batch_timeout_ms"], concurrency=options["concurrency"], initial_offset_reset=options["initial_offset_reset"], strict_offset_reset=options["strict_offset_reset"], @@ -407,15 +405,15 @@ def post_process_forwarder(**options): @click.option("--topic", default=None, help="Topic to get subscription updates from.") @click.option( "--commit-batch-size", - default=100, + default=1000, type=int, - help="How many messages to process before committing offsets.", + help="Deprecated. Remove once no longer passed in production.", ) @click.option( "--commit-batch-timeout-ms", default=5000, type=int, - help="Time (in milliseconds) to wait before closing current batch and committing offsets.", + help="Deprecated. Remove once no longer passed in production.", ) @click.option( "--initial-offset-reset", diff --git a/tests/sentry/eventstream/kafka/test_consumer.py b/tests/sentry/eventstream/kafka/test_consumer.py index 8653b2feb2a521..1c93619d0042fb 100644 --- a/tests/sentry/eventstream/kafka/test_consumer.py +++ b/tests/sentry/eventstream/kafka/test_consumer.py @@ -118,8 +118,6 @@ def test_post_process_forwarder_streaming_consumer(self, dispatch_post_process_g topic=self.events_topic, commit_log_topic=self.commit_log_topic, synchronize_commit_group=synchronize_commit_group, - commit_batch_size=1, - commit_batch_timeout_ms=100, concurrency=1, initial_offset_reset="earliest", strict_offset_reset=None, diff --git a/tests/sentry/eventstream/kafka/test_consumer_strategy.py b/tests/sentry/eventstream/kafka/test_consumer_strategy.py index 5fbfc0d520cad1..4b3a051d7d5d1c 100644 --- a/tests/sentry/eventstream/kafka/test_consumer_strategy.py +++ b/tests/sentry/eventstream/kafka/test_consumer_strategy.py @@ -70,7 +70,7 @@ def get_occurrence_kafka_payload() -> KafkaPayload: def test_dispatch_task(mock_dispatch: Mock) -> None: commit = Mock() partition = Partition(Topic("test"), 0) - factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10) + factory = PostProcessForwarderStrategyFactory(concurrency=2) strategy = factory.create_with_partitions(commit, {partition: 0}) strategy.submit(Message(BrokerValue(get_kafka_payload(), partition, 1, datetime.now()))) @@ -104,7 +104,7 @@ def test_dispatch_task(mock_dispatch: Mock) -> None: def test_dispatch_task_with_occurrence(mock_post_process_group: Mock) -> None: commit = Mock() partition = Partition(Topic("test-occurrence"), 0) - factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10) + factory = PostProcessForwarderStrategyFactory(concurrency=2) strategy = factory.create_with_partitions(commit, {partition: 0}) strategy.submit(