From 7784003472df77bfb513e7af924b4342fc5c93bc Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Mon, 13 Nov 2023 17:41:00 -0800 Subject: [PATCH 1/4] outline --- BitFaster.Caching/Atomic/AtomicFactory.cs | 37 ++++++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/BitFaster.Caching/Atomic/AtomicFactory.cs b/BitFaster.Caching/Atomic/AtomicFactory.cs index 2873dbe5..ddaa97b0 100644 --- a/BitFaster.Caching/Atomic/AtomicFactory.cs +++ b/BitFaster.Caching/Atomic/AtomicFactory.cs @@ -1,6 +1,8 @@ using System; using System.Collections.Generic; using System.Diagnostics; +using System.Diagnostics.CodeAnalysis; +using System.Runtime.ExceptionServices; using System.Threading; namespace BitFaster.Caching.Atomic @@ -98,8 +100,17 @@ private V CreateValue(K key, TFactory valueFactory) where TFactory : s if (init != null) { - value = init.CreateValue(key, valueFactory); - Volatile.Write(ref initializer, null); // volatile write must occur after setting value + try + { + value = init.CreateValue(key, valueFactory); + Volatile.Write(ref initializer, null); // volatile write must occur after setting value + } + catch + { + // Overwrite the initializer with a fresh copy. New threads will start from a clean state. + Volatile.Write(ref initializer, new Initializer()); + throw; + } } return value; @@ -138,6 +149,7 @@ private class Initializer { private bool isInitialized; private V value; + private ExceptionDispatchInfo exceptionDispatch; public V CreateValue(K key, TFactory valueFactory) where TFactory : struct, IValueFactory { @@ -148,9 +160,24 @@ public V CreateValue(K key, TFactory valueFactory) where TFactory : st return value; } - value = valueFactory.Create(key); - isInitialized = true; - return value; + // If a previous thread called the factory and failed, throw the same error instead + // of calling the factory again. + if (exceptionDispatch != null) + { + exceptionDispatch.Throw(); + } + + try + { + value = valueFactory.Create(key); + isInitialized = true; + return value; + } + catch (Exception ex) + { + exceptionDispatch = ExceptionDispatchInfo.Capture(ex); + throw; + } } } } From 94f0d95752de2ae4fa2c24f59a21dc769c7991a7 Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Mon, 13 Nov 2023 18:06:53 -0800 Subject: [PATCH 2/4] docs --- BitFaster.Caching/Atomic/AtomicFactory.cs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/BitFaster.Caching/Atomic/AtomicFactory.cs b/BitFaster.Caching/Atomic/AtomicFactory.cs index ddaa97b0..446358d2 100644 --- a/BitFaster.Caching/Atomic/AtomicFactory.cs +++ b/BitFaster.Caching/Atomic/AtomicFactory.cs @@ -1,15 +1,14 @@ using System; using System.Collections.Generic; using System.Diagnostics; -using System.Diagnostics.CodeAnalysis; using System.Runtime.ExceptionServices; using System.Threading; namespace BitFaster.Caching.Atomic { /// - /// A class that provides simple, lightweight exactly once initialization for values - /// stored in a cache. + /// A class that provides simple, lightweight exactly once initialization for values stored + /// in a cache. Exceptions are propogated to the caller. /// /// The type of the key. /// The type of the value. @@ -94,6 +93,18 @@ public V ValueIfCreated } } + /// + /// Note the failure case works like this: + /// 1. Thread A enters AtomicFactory.CreateValue then Initializer.CreateValue and holds the lock. + /// 2. Thread B enters AtomicFactory.CreateValue then Initializer.CreateValue and queues on the lock. + /// 3. Thread A calls value factory, and after 1 second throws an exception. The exception is + /// captured in exceptionDispatch, lock is released, and an exeption is thrown. + /// 4. AtomicFactory.CreateValue catches the exception and creates a fresh initializer. + /// 5. Thread B enters the lock, finds exceptionDispatch is populated and immediately throws. + /// 6. Thread C can now start from a clean state. + /// This mitigates lock convoys where many queued threads will fail slowly one by one, introducing delays + /// and multiplying the number of calls to the failing resource. + /// private V CreateValue(K key, TFactory valueFactory) where TFactory : struct, IValueFactory { var init = Volatile.Read(ref initializer); From 67bb85db8c54f200fb1c299bb604eeae5d80896c Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Mon, 13 Nov 2023 18:24:05 -0800 Subject: [PATCH 3/4] unit test --- .../Atomic/AtomicFactoryTests.cs | 88 +++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs b/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs index eae14265..e609bd49 100644 --- a/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs +++ b/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs @@ -1,4 +1,5 @@  +using System; using System.Threading; using System.Threading.Tasks; using BitFaster.Caching.Atomic; @@ -159,5 +160,92 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner() winnerCount.Should().Be(1); } + + [Fact] + public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() + { + var enter = new ManualResetEvent(false); + var resume = new ManualResetEvent(false); + + var atomicFactory = new AtomicFactory(); + var throwCount = 0; + + Task first = Task.Run(() => + { + return atomicFactory.GetValue(1, k => + { + enter.Set(); + resume.WaitOne(); + + Interlocked.Increment(ref throwCount); + throw new Exception(); + }); + }); + + Task second = Task.Run(() => + { + return atomicFactory.GetValue(1, k => + { + enter.Set(); + resume.WaitOne(); + + Interlocked.Increment(ref throwCount); + throw new Exception(); + }); + }); + + enter.WaitOne(); + resume.Set(); + + Func act1 = () => first; + Func act2 = () => second; + + await act1.Should().ThrowAsync(); + await act2.Should().ThrowAsync(); + + // verify only one exception was thrown + throwCount.Should().Be(1); + } + + [Fact] + public async Task WhenCallersRunConcurrentlyAndFailNewCallerStartsClean() + { + var enter = new ManualResetEvent(false); + var resume = new ManualResetEvent(false); + + var atomicFactory = new AtomicFactory(); + + Task first = Task.Run(() => + { + return atomicFactory.GetValue(1, k => + { + enter.Set(); + resume.WaitOne(); + throw new Exception(); + }); + }); + + Task second = Task.Run(() => + { + return atomicFactory.GetValue(1, k => + { + enter.Set(); + resume.WaitOne(); + throw new Exception(); + }); + }); + + enter.WaitOne(); + resume.Set(); + + Func act1 = () => first; + Func act2 = () => second; + + await act1.Should().ThrowAsync(); + await act2.Should().ThrowAsync(); + + // verify exception is no longer cached + atomicFactory.GetValue(1, k => k).Should().Be(1); + } } } From 24a79aafa7fc36dee7abde7f141ca3c9244d94f9 Mon Sep 17 00:00:00 2001 From: Alex Peck Date: Mon, 13 Nov 2023 19:01:23 -0800 Subject: [PATCH 4/4] stricter --- .../Atomic/AtomicFactoryTests.cs | 42 +++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs b/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs index e609bd49..08b8f228 100644 --- a/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs +++ b/BitFaster.Caching.UnitTests/Atomic/AtomicFactoryTests.cs @@ -119,7 +119,9 @@ public void WhenArgObjectValuesAreSameEqualsTrue() [Fact] public async Task WhenCallersRunConcurrentlyResultIsFromWinner() { - var enter = new ManualResetEvent(false); + var enter1 = new ManualResetEvent(false); + var enter2 = new ManualResetEvent(false); + var factory = new ManualResetEvent(false); var resume = new ManualResetEvent(false); var atomicFactory = new AtomicFactory(); @@ -128,9 +130,10 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner() Task first = Task.Run(() => { + enter1.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); result = 1; @@ -141,9 +144,10 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner() Task second = Task.Run(() => { + enter2.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); result = 2; @@ -152,7 +156,9 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner() }); }); - enter.WaitOne(); + enter1.WaitOne(); + enter2.WaitOne(); + factory.WaitOne(); resume.Set(); (await first).Should().Be(result); @@ -164,7 +170,9 @@ public async Task WhenCallersRunConcurrentlyResultIsFromWinner() [Fact] public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() { - var enter = new ManualResetEvent(false); + var enter1 = new ManualResetEvent(false); + var enter2 = new ManualResetEvent(false); + var factory = new ManualResetEvent(false); var resume = new ManualResetEvent(false); var atomicFactory = new AtomicFactory(); @@ -172,9 +180,10 @@ public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() Task first = Task.Run(() => { + enter1.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); Interlocked.Increment(ref throwCount); @@ -184,9 +193,10 @@ public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() Task second = Task.Run(() => { + enter2.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); Interlocked.Increment(ref throwCount); @@ -194,7 +204,9 @@ public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() }); }); - enter.WaitOne(); + enter1.WaitOne(); + enter2.WaitOne(); + factory.WaitOne(); resume.Set(); Func act1 = () => first; @@ -210,16 +222,19 @@ public async Task WhenCallersRunConcurrentlyAndFailExceptionIsPropogated() [Fact] public async Task WhenCallersRunConcurrentlyAndFailNewCallerStartsClean() { - var enter = new ManualResetEvent(false); + var enter1 = new ManualResetEvent(false); + var enter2 = new ManualResetEvent(false); + var factory = new ManualResetEvent(false); var resume = new ManualResetEvent(false); var atomicFactory = new AtomicFactory(); Task first = Task.Run(() => { + enter1.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); throw new Exception(); }); @@ -227,15 +242,18 @@ public async Task WhenCallersRunConcurrentlyAndFailNewCallerStartsClean() Task second = Task.Run(() => { + enter2.Set(); return atomicFactory.GetValue(1, k => { - enter.Set(); + factory.Set(); resume.WaitOne(); throw new Exception(); }); }); - enter.WaitOne(); + enter1.WaitOne(); + enter2.WaitOne(); + factory.WaitOne(); resume.Set(); Func act1 = () => first;