Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions src/StackExchange.Redis/PhysicalBridge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -737,12 +737,15 @@ internal WriteResult WriteMessageTakingWriteLockSync(PhysicalConnection physical
#pragma warning restore CS0618
}

UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
catch (Exception ex) { return HandleWriteException(message, ex); }
finally { token.Dispose(); }
finally
{
UnmarkActiveMessage(message);
token.Dispose();
}

}

Expand Down Expand Up @@ -887,7 +890,6 @@ private void ProcessBacklog()
}

_backlogStatus = BacklogStatus.MarkingInactive;
UnmarkActiveMessage(message);
if (result != WriteResult.Success)
{
_backlogStatus = BacklogStatus.RecordingWriteFailure;
Expand All @@ -901,6 +903,10 @@ private void ProcessBacklog()
_backlogStatus = BacklogStatus.RecordingFault;
HandleWriteException(message, ex);
}
finally
{
UnmarkActiveMessage(message);
}
}
_backlogStatus = BacklogStatus.SettingIdle;
physical.SetIdle();
Expand Down Expand Up @@ -985,21 +991,25 @@ internal ValueTask<WriteResult> WriteMessageTakingWriteLockAsync(PhysicalConnect

result = flush.Result; // we know it was completed, this is fine
}

UnmarkActiveMessage(message);

physical.SetIdle();

return new ValueTask<WriteResult>(result);
}
catch (Exception ex) { return new ValueTask<WriteResult>(HandleWriteException(message, ex)); }
finally
{
if (releaseLock & token.Success)
if (token.Success)
{
UnmarkActiveMessage(message);

if (releaseLock)
{
#if DEBUG
RecordLockDuration(lockTaken);
RecordLockDuration(lockTaken);
#endif
token.Dispose();
token.Dispose();
}
}
}
}
Expand Down Expand Up @@ -1029,8 +1039,7 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{
result = await physical.FlushAsync(false).ForAwait();
}

UnmarkActiveMessage(message);

physical.SetIdle();

#if DEBUG
Expand All @@ -1043,6 +1052,10 @@ private async ValueTask<WriteResult> WriteMessageTakingWriteLockAsync_Awaited(Va
{
return HandleWriteException(message, ex);
}
finally
{
UnmarkActiveMessage(message);
}
}

private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken lockToken, ValueTask<WriteResult> flush, Message message, int lockTaken)
Expand All @@ -1052,7 +1065,6 @@ private async ValueTask<WriteResult> CompleteWriteAndReleaseLockAsync(LockToken
try
{
var result = await flush.ForAwait();
UnmarkActiveMessage(message);
physical.SetIdle();
return result;
}
Expand Down