2828)
2929from opentelemetry .sdk ._logs .export import (
3030 BatchLogRecordProcessor ,
31+ InMemoryLogExporter ,
3132)
3233from opentelemetry .sdk .util .instrumentation import InstrumentationScope
3334
3940
4041# BatchLogRecodpRocessor initializes / uses BatchProcessor.
4142@pytest .mark .parametrize (
42- "batch_processor_class,telemetry" , [(BatchLogRecordProcessor , EMPTY_LOG )]
43+ "batch_processor_class,telemetry,in_memory_exporter" ,
44+ [(BatchLogRecordProcessor , EMPTY_LOG , InMemoryLogExporter )],
4345)
4446class TestBatchProcessor :
4547 # pylint: disable=no-self-use
4648 def test_telemetry_exported_once_batch_size_reached (
47- self , batch_processor_class , telemetry
49+ self , batch_processor_class , telemetry , in_memory_exporter
4850 ):
4951 exporter = Mock ()
5052 batch_processor = batch_processor_class (
@@ -67,7 +69,7 @@ def test_telemetry_exported_once_batch_size_reached(
6769
6870 # pylint: disable=no-self-use
6971 def test_telemetry_exported_once_schedule_delay_reached (
70- self , batch_processor_class , telemetry
72+ self , batch_processor_class , telemetry , in_memory_exporter
7173 ):
7274 exporter = Mock ()
7375 batch_processor = batch_processor_class (
@@ -82,7 +84,7 @@ def test_telemetry_exported_once_schedule_delay_reached(
8284 exporter .export .assert_called_once_with ([telemetry ])
8385
8486 def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown (
85- self , batch_processor_class , telemetry , caplog
87+ self , batch_processor_class , telemetry , in_memory_exporter , caplog
8688 ):
8789 exporter = Mock ()
8890 batch_processor = batch_processor_class (
@@ -107,7 +109,7 @@ def test_telemetry_flushed_before_shutdown_and_dropped_after_shutdown(
107109
108110 # pylint: disable=no-self-use
109111 def test_force_flush_flushes_telemetry (
110- self , batch_processor_class , telemetry
112+ self , batch_processor_class , telemetry , in_memory_exporter
111113 ):
112114 exporter = Mock ()
113115 batch_processor = batch_processor_class (
@@ -123,8 +125,10 @@ def test_force_flush_flushes_telemetry(
123125 batch_processor .force_flush ()
124126 exporter .export .assert_called_once_with ([telemetry for _ in range (10 )])
125127
126- def test_with_multiple_threads (self , batch_processor_class , telemetry ):
127- exporter = Mock ()
128+ def test_with_multiple_threads (
129+ self , batch_processor_class , telemetry , in_memory_exporter
130+ ):
131+ exporter = in_memory_exporter ()
128132 batch_processor = batch_processor_class (
129133 exporter = exporter ,
130134 max_queue_size = 3000 ,
@@ -141,19 +145,16 @@ def bulk_emit_and_flush(num_emit):
141145 with ThreadPoolExecutor (max_workers = 69 ) as executor :
142146 for idx in range (69 ):
143147 executor .submit (bulk_emit_and_flush , idx + 1 )
144- time .sleep (2 )
145148
146149 executor .shutdown ()
147-
148- # 69 force flush calls, should result in 69 export calls.
149- assert exporter .export .call_count == 69
150+ assert len (exporter .get_finished_logs ()) == 2415
150151
151152 @unittest .skipUnless (
152153 hasattr (os , "fork" ),
153154 "needs *nix" ,
154155 )
155156 def test_batch_telemetry_record_processor_fork (
156- self , batch_processor_class , telemetry
157+ self , batch_processor_class , telemetry , in_memory_exporter
157158 ):
158159 exporter = Mock ()
159160 batch_processor = batch_processor_class (
0 commit comments