Skip to content
4 changes: 2 additions & 2 deletions OptimizelySDK.Tests/OdpTests/OdpEventManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public void ShouldAddAdditionalInformationToEachEvent()
}

[Test]
public void ShouldAttemptToFlushAnEmptyQueueAtFlushInterval()
public void ShouldNotAttemptToFlushAnEmptyQueueAtFlushInterval()
{
var eventManager = new OdpEventManager.Builder().
WithOdpEventApiManager(_mockApiManager.Object).
Expand All @@ -327,7 +327,7 @@ public void ShouldAttemptToFlushAnEmptyQueueAtFlushInterval()
eventManager.Stop();

_mockLogger.Verify(l => l.Log(LogLevel.DEBUG, "Flushing queue."),
Times.AtLeast(3));
Times.Never);
}

[Test]
Expand Down
32 changes: 21 additions & 11 deletions OptimizelySDK/Odp/OdpEventManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,35 +146,45 @@ protected virtual void Run()
{
while (true)
{
if (DateTime.Now.MillisecondsSince1970() > _flushingIntervalDeadline)
object item;
// If batch has events, set the timeout to remaining time for flush interval,
// otherwise wait for the new event indefinitely
if (_currentBatch.Count > 0)
{
_logger.Log(LogLevel.DEBUG, $"Flushing queue.");
FlushQueue();
_eventQueue.TryTake(out item, (int)(_flushingIntervalDeadline - DateTime.Now.MillisecondsSince1970()));
}
else
{
item = _eventQueue.Take();
Thread.Sleep(1); // TODO: need to figure out why this is allowing item to read shutdown signal.
}

if (!_eventQueue.TryTake(out object item, 50))
if (item == null)
{
Thread.Sleep(50);
// null means no new events received and flush interval is over, dispatch whatever is in the batch.
if (_currentBatch.Count != 0)
{
_logger.Log(LogLevel.DEBUG, $"Flushing queue.");
FlushQueue();
}
continue;
}

if (item == _shutdownSignal)
else if (item == _shutdownSignal)
{
_logger.Log(LogLevel.INFO, "Received shutdown signal.");
break;
}

if (item == _flushSignal)
else if (item == _flushSignal)
{
_logger.Log(LogLevel.DEBUG, "Received flush signal.");
FlushQueue();
continue;
}

if (item is OdpEvent odpEvent)
else if (item is OdpEvent odpEvent)
{
AddToBatch(odpEvent);
}

}
}
catch (InvalidOperationException ioe)
Expand Down