2 changes: 0 additions & 2 deletions src/sentry/eventstream/base.py
@@ -216,8 +216,6 @@ def run_post_process_forwarder(
         topic: Optional[str],
         commit_log_topic: str,
         synchronize_commit_group: str,
-        commit_batch_size: int,
-        commit_batch_timeout_ms: int,
         concurrency: int,
         initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
         strict_offset_reset: bool,
12 changes: 3 additions & 9 deletions src/sentry/eventstream/kafka/backend.py
@@ -29,7 +29,7 @@
 from sentry import options
 from sentry.eventstream.base import EventStreamEventType, GroupStates, PostProcessForwarderType
 from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory
-from sentry.eventstream.kafka.synchronized import SynchronizedConsumer as ArroyoSynchronizedConsumer
+from sentry.eventstream.kafka.synchronized import SynchronizedConsumer
 from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
 from sentry.killswitches import killswitch_matches_context
 from sentry.utils import json, metrics
@@ -234,8 +234,6 @@ def _build_streaming_consumer(
         topic: str,
         commit_log_topic: str,
         synchronize_commit_group: str,
-        commit_batch_size: int,
-        commit_batch_timeout_ms: int,
         concurrency: int,
         initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
         strict_offset_reset: Optional[bool],
@@ -261,14 +259,14 @@ def _build_streaming_consumer(
             )
         )

-        synchronized_consumer = ArroyoSynchronizedConsumer(
+        synchronized_consumer = SynchronizedConsumer(
             consumer=consumer,
             commit_log_consumer=commit_log_consumer,
             commit_log_topic=Topic(commit_log_topic),
             commit_log_groups={synchronize_commit_group},
         )

-        strategy_factory = PostProcessForwarderStrategyFactory(concurrency, commit_batch_size)
+        strategy_factory = PostProcessForwarderStrategyFactory(concurrency)

         return StreamProcessor(
             synchronized_consumer, Topic(topic), strategy_factory, ONCE_PER_SECOND
@@ -281,8 +279,6 @@ def run_post_process_forwarder(
         topic: Optional[str],
         commit_log_topic: str,
         synchronize_commit_group: str,
-        commit_batch_size: int,
-        commit_batch_timeout_ms: int,
         concurrency: int,
         initial_offset_reset: Union[Literal["latest"], Literal["earliest"]],
         strict_offset_reset: bool,
@@ -302,8 +298,6 @@ def run_post_process_forwarder(
             topic or default_topic,
             commit_log_topic,
             synchronize_commit_group,
-            commit_batch_size,
-            commit_batch_timeout_ms,
             concurrency,
             initial_offset_reset,
             strict_offset_reset,
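
With the name clash gone, SynchronizedConsumer is imported under its own name and the strategy factory is built from concurrency alone. A minimal sketch of the wiring this diff leaves behind, assuming Arroyo's public import paths and that the two Kafka consumers are constructed elsewhere:

    from arroyo.commit import ONCE_PER_SECOND
    from arroyo.processing import StreamProcessor
    from arroyo.types import Topic

    from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory
    from sentry.eventstream.kafka.synchronized import SynchronizedConsumer

    def build_forwarder(consumer, commit_log_consumer, topic: str,
                        commit_log_topic: str, synchronize_commit_group: str,
                        concurrency: int) -> StreamProcessor:
        # Hold the main consumer back until the commit log shows the
        # synchronize_commit_group has committed past each offset.
        synchronized = SynchronizedConsumer(
            consumer=consumer,
            commit_log_consumer=commit_log_consumer,
            commit_log_topic=Topic(commit_log_topic),
            commit_log_groups={synchronize_commit_group},
        )
        # No commit_batch_size any more: the factory takes only concurrency.
        factory = PostProcessForwarderStrategyFactory(concurrency)
        return StreamProcessor(synchronized, Topic(topic), factory, ONCE_PER_SECOND)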
8 changes: 2 additions & 6 deletions src/sentry/eventstream/kafka/consumer_strategy.py
@@ -96,13 +96,9 @@ def _get_task_kwargs_and_dispatch(message: Message[KafkaPayload]) -> None:


 class PostProcessForwarderStrategyFactory(ProcessingStrategyFactory[KafkaPayload]):
-    def __init__(
-        self,
-        concurrency: int,
-        max_pending_futures: int,
-    ):
+    def __init__(self, concurrency: int):
         self.__concurrency = concurrency
-        self.__max_pending_futures = max_pending_futures
+        self.__max_pending_futures = concurrency + 1000

     def create_with_partitions(
         self,
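
The pending-futures cap used to be a separate tuning knob; it is now derived as concurrency + 1000, so one fewer parameter threads through the call chain. A hypothetical, self-contained illustration (not Sentry's implementation) of how such a cap bounds in-flight dispatches:

    from collections import deque
    from concurrent.futures import Future, ThreadPoolExecutor

    class BoundedDispatcher:
        """Illustrative only: refuse new submissions past a pending cap."""

        def __init__(self, concurrency: int) -> None:
            self._executor = ThreadPoolExecutor(max_workers=concurrency)
            # Same rule the factory now applies: the cap scales with concurrency
            # instead of being tuned independently.
            self._max_pending = concurrency + 1000
            self._pending: deque[Future] = deque()

        def submit(self, fn, *args) -> None:
            # Drop finished futures, then block on the oldest if still full,
            # so upstream consumption pauses instead of queueing without bound.
            while self._pending and self._pending[0].done():
                self._pending.popleft()
            if len(self._pending) >= self._max_pending:
                self._pending.popleft().result()
            self._pending.append(self._executor.submit(fn, *args))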
6 changes: 0 additions & 6 deletions src/sentry/runner/commands/devserver.py
@@ -36,8 +36,6 @@
         "post-process-forwarder",
         "--entity=errors",
         "--loglevel=debug",
-        "--commit-batch-size=100",
-        "--commit-batch-timeout-ms=1000",
         "--no-strict-offset-reset",
     ],
     "post-process-forwarder-transactions": [
@@ -46,8 +44,6 @@
         "post-process-forwarder",
         "--entity=transactions",
         "--loglevel=debug",
-        "--commit-batch-size=100",
-        "--commit-batch-timeout-ms=1000",
         "--commit-log-topic=snuba-transactions-commit-log",
         "--synchronize-commit-group=transactions_group",
         "--no-strict-offset-reset",
@@ -58,8 +54,6 @@
         "post-process-forwarder",
         "--entity=search_issues",
         "--loglevel=debug",
-        "--commit-batch-size=100",
-        "--commit-batch-timeout-ms=1000",
         "--commit-log-topic=snuba-generic-events-commit-log",
         "--synchronize-commit-group=generic_events_group",
         "--no-strict-offset-reset",
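
With the batch-tuning flags dropped, the devserver's errors forwarder invocation reduces to roughly the following (assuming the usual sentry run prefix from the truncated head of each daemon list):

    sentry run post-process-forwarder --entity=errors --loglevel=debug --no-strict-offset-reset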
8 changes: 3 additions & 5 deletions src/sentry/runner/commands/run.py
@@ -384,8 +384,6 @@ def post_process_forwarder(**options):
         topic=options["topic"],
         commit_log_topic=options["commit_log_topic"],
         synchronize_commit_group=options["synchronize_commit_group"],
-        commit_batch_size=options["commit_batch_size"],
-        commit_batch_timeout_ms=options["commit_batch_timeout_ms"],
         concurrency=options["concurrency"],
         initial_offset_reset=options["initial_offset_reset"],
         strict_offset_reset=options["strict_offset_reset"],
@@ -407,15 +405,15 @@ def post_process_forwarder(**options):
 @click.option("--topic", default=None, help="Topic to get subscription updates from.")
 @click.option(
     "--commit-batch-size",
-    default=100,
+    default=1000,
     type=int,
-    help="How many messages to process before committing offsets.",
+    help="Deprecated. Remove once no longer passed in production.",
 )
 @click.option(
     "--commit-batch-timeout-ms",
     default=5000,
     type=int,
-    help="Time (in milliseconds) to wait before closing current batch and committing offsets.",
+    help="Deprecated. Remove once no longer passed in production.",
 )
 @click.option(
     "--initial-offset-reset",
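
Rather than deleting the flags outright, the command keeps accepting them so production invocations that still pass --commit-batch-size or --commit-batch-timeout-ms don't start failing; the values are simply never read. A minimal sketch of that click pattern, trimmed from the diff (the --concurrency default here is illustrative, not taken from the source):

    import click

    @click.command()
    @click.option(
        "--commit-batch-size",
        default=1000,
        type=int,
        help="Deprecated. Remove once no longer passed in production.",
    )
    @click.option("--concurrency", default=5, type=int)
    def post_process_forwarder(commit_batch_size: int, concurrency: int) -> None:
        # The deprecated value is accepted for compatibility but never read.
        click.echo(f"starting forwarder with concurrency={concurrency}")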
2 changes: 0 additions & 2 deletions tests/sentry/eventstream/kafka/test_consumer.py
@@ -118,8 +118,6 @@ def test_post_process_forwarder_streaming_consumer(self, dispatch_post_process_g
             topic=self.events_topic,
             commit_log_topic=self.commit_log_topic,
             synchronize_commit_group=synchronize_commit_group,
-            commit_batch_size=1,
-            commit_batch_timeout_ms=100,
             concurrency=1,
             initial_offset_reset="earliest",
             strict_offset_reset=None,
4 changes: 2 additions & 2 deletions tests/sentry/eventstream/kafka/test_consumer_strategy.py
@@ -70,7 +70,7 @@ def get_occurrence_kafka_payload() -> KafkaPayload:
 def test_dispatch_task(mock_dispatch: Mock) -> None:
     commit = Mock()
     partition = Partition(Topic("test"), 0)
-    factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10)
+    factory = PostProcessForwarderStrategyFactory(concurrency=2)
     strategy = factory.create_with_partitions(commit, {partition: 0})

     strategy.submit(Message(BrokerValue(get_kafka_payload(), partition, 1, datetime.now())))
@@ -104,7 +104,7 @@ def test_dispatch_task(mock_dispatch: Mock) -> None:
 def test_dispatch_task_with_occurrence(mock_post_process_group: Mock) -> None:
     commit = Mock()
     partition = Partition(Topic("test-occurrence"), 0)
-    factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10)
+    factory = PostProcessForwarderStrategyFactory(concurrency=2)
     strategy = factory.create_with_partitions(commit, {partition: 0})

     strategy.submit(
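
One side effect worth noting: with the keyword removed from __init__, any straggling caller fails fast instead of silently tuning a dead knob, as standard Python keyword checking shows:

    from sentry.eventstream.kafka.consumer_strategy import PostProcessForwarderStrategyFactory

    factory = PostProcessForwarderStrategyFactory(concurrency=2, max_pending_futures=10)
    # TypeError: __init__() got an unexpected keyword argument 'max_pending_futures'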