Skip to content

Commit b27ef04

Browse files
authored
ref(ppf): Don't use --commit-batch-size for futures queue length (#45182)
Deprecate the --commit-batch-size and --commit-batch-time arguments to the post process forwarder. These were not being used for committing anyway. For some reason, commit batch size was used as the max queue length in the executor but it's not really necessary to configure this anyway.
1 parent 8199782 commit b27ef04

File tree

7 files changed

+10
-32
lines changed

7 files changed

+10
-32
lines changed

src/sentry/eventstream/base.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,6 @@ def run_post_process_forwarder(
216216
topic: Optional[str],
217217
commit_log_topic: str,
218218
synchronize_commit_group: str,
219-
commit_batch_size: int,
220-
commit_batch_timeout_ms: int,
221219
concurrency: int,
222220
initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
223221
strict_offset_reset: bool,

src/sentry/eventstream/kafka/backend.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
from sentry import options
3030
from sentry.eventstream.base import EventStreamEventType, GroupStates, PostProcessForwarderType
3131
from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory
32-
from sentry.eventstream.kafka.synchronized import SynchronizedConsumer as ArroyoSynchronizedConsumer
32+
from sentry.eventstream.kafka.synchronized import SynchronizedConsumer
3333
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
3434
from sentry.killswitches import killswitch_matches_context
3535
from sentry.utils import json, metrics
@@ -234,8 +234,6 @@ def _build_streaming_consumer(
234234
topic: str,
235235
commit_log_topic: str,
236236
synchronize_commit_group: str,
237-
commit_batch_size: int,
238-
commit_batch_timeout_ms: int,
239237
concurrency: int,
240238
initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
241239
strict_offset_reset: Optional[bool],
@@ -261,14 +259,14 @@ def _build_streaming_consumer(
261259
)
262260
)
263261

264-
synchronized_consumer = ArroyoSynchronizedConsumer(
262+
synchronized_consumer = SynchronizedConsumer(
265263
consumer=consumer,
266264
commit_log_consumer=commit_log_consumer,
267265
commit_log_topic=Topic(commit_log_topic),
268266
commit_log_groups={synchronize_commit_group},
269267
)
270268

271-
strategy_factory = PostProcessForwarderStrategyFactory(concurrency, commit_batch_size)
269+
strategy_factory = PostProcessForwarderStrategyFactory(concurrency)
272270

273271
return StreamProcessor(
274272
synchronized_consumer, Topic(topic), strategy_factory, ONCE_PER_SECOND
@@ -281,8 +279,6 @@ def run_post_process_forwarder(
281279
topic: Optional[str],
282280
commit_log_topic: str,
283281
synchronize_commit_group: str,
284-
commit_batch_size: int,
285-
commit_batch_timeout_ms: int,
286282
concurrency: int,
287283
initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
288284
strict_offset_reset: bool,
@@ -302,8 +298,6 @@ def run_post_process_forwarder(
302298
topic or default_topic,
303299
commit_log_topic,
304300
synchronize_commit_group,
305-
commit_batch_size,
306-
commit_batch_timeout_ms,
307301
concurrency,
308302
initial_offset_reset,
309303
strict_offset_reset,

src/sentry/eventstream/kafka/consumer_strategy.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,9 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None:
9696

9797

9898
class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
99-
def __init__(
100-
self,
101-
concurrency: int,
102-
max_pending_futures: int,
103-
):
99+
def __init__(self, concurrency: int):
104100
self.__concurrency = concurrency
105-
self.__max_pending_futures = max_pending_futures
101+
self.__max_pending_futures = concurrency + 1000
106102

107103
def create_with_partitions(
108104
self,

src/sentry/runner/commands/devserver.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@
3636
"post-process-forwarder",
3737
"--entity=errors",
3838
"--loglevel=debug",
39-
"--commit-batch-size=100",
40-
"--commit-batch-timeout-ms=1000",
4139
"--no-strict-offset-reset",
4240
],
4341
"post-process-forwarder-transactions": [
@@ -46,8 +44,6 @@
4644
"post-process-forwarder",
4745
"--entity=transactions",
4846
"--loglevel=debug",
49-
"--commit-batch-size=100",
50-
"--commit-batch-timeout-ms=1000",
5147
"--commit-log-topic=snuba-transactions-commit-log",
5248
"--synchronize-commit-group=transactions_group",
5349
"--no-strict-offset-reset",
@@ -58,8 +54,6 @@
5854
"post-process-forwarder",
5955
"--entity=search_issues",
6056
"--loglevel=debug",
61-
"--commit-batch-size=100",
62-
"--commit-batch-timeout-ms=1000",
6357
"--commit-log-topic=snuba-generic-events-commit-log",
6458
"--synchronize-commit-group=generic_events_group",
6559
"--no-strict-offset-reset",

src/sentry/runner/commands/run.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -384,8 +384,6 @@ def post_process_forwarder(**options):
384384
topic=options["topic"],
385385
commit_log_topic=options["commit_log_topic"],
386386
synchronize_commit_group=options["synchronize_commit_group"],
387-
commit_batch_size=options["commit_batch_size"],
388-
commit_batch_timeout_ms=options["commit_batch_timeout_ms"],
389387
concurrency=options["concurrency"],
390388
initial_offset_reset=options["initial_offset_reset"],
391389
strict_offset_reset=options["strict_offset_reset"],
@@ -407,15 +405,15 @@ def post_process_forwarder(**options):
407405
@click.option("--topic", default=None, help="Topic to get subscription updates from.")
408406
@click.option(
409407
"--commit-batch-size",
410-
default=100,
408+
default=1000,
411409
type=int,
412-
help="How many messages to process before committing offsets.",
410+
help="Deprecated. Remove once no longer passed in production.",
413411
)
414412
@click.option(
415413
"--commit-batch-timeout-ms",
416414
default=5000,
417415
type=int,
418-
help="Time (in milliseconds) to wait before closing current batch and committing offsets.",
416+
help="Deprecated. Remove once no longer passed in production.",
419417
)
420418
@click.option(
421419
"--initial-offset-reset",

tests/sentry/eventstream/kafka/test_consumer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ def test_post_process_forwarder_streaming_consumer(self, dispatch_post_process_g
118118
topic=self.events_topic,
119119
commit_log_topic=self.commit_log_topic,
120120
synchronize_commit_group=synchronize_commit_group,
121-
commit_batch_size=1,
122-
commit_batch_timeout_ms=100,
123121
concurrency=1,
124122
initial_offset_reset="earliest",
125123
strict_offset_reset=None,

tests/sentry/eventstream/kafka/test_consumer_strategy.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ def get_occurrence_kafka_payload() -> KafkaPayload:
7070
def test_dispatch_task(mock_dispatch: Mock) -> None:
7171
commit = Mock()
7272
partition = Partition(Topic("test"), 0)
73-
factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10)
73+
factory = PostProcessForwarderStrategyFactory(concurrency=2)
7474
strategy = factory.create_with_partitions(commit, {partition: 0})
7575

7676
strategy.submit(Message(BrokerValue(get_kafka_payload(), partition, 1, datetime.now())))
@@ -104,7 +104,7 @@ def test_dispatch_task(mock_dispatch: Mock) -> None:
104104
def test_dispatch_task_with_occurrence(mock_post_process_group: Mock) -> None:
105105
commit = Mock()
106106
partition = Partition(Topic("test-occurrence"), 0)
107-
factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10)
107+
factory = PostProcessForwarderStrategyFactory(concurrency=2)
108108
strategy = factory.create_with_partitions(commit, {partition: 0})
109109

110110
strategy.submit(

0 commit comments

Comments
 (0)