@@ -94,25 +94,24 @@ public ThreadCachingPool( int targetSize, Allocator<T> allocator, ValidationStra
9494
9595 public T acquire ( long timeout , TimeUnit unit ) throws InterruptedException
9696 {
97- assert live .size () <= maxSize ;
9897 long deadline = clock .millis () + unit .toMillis ( timeout );
9998
10099 // 1. Try and value an object from our local slot
101100 Slot <T > slot = local .get ();
102101
103- if ( slot != null && slot .availableToThreadLocalClaimed () )
102+ if ( slot != null && slot .availableToClaimed () )
104103 {
105104 if ( slot .isValid ( validationStrategy ) )
106105 {
107106 allocator .onAcquire ( slot .value );
108107 return slot .value ;
109108 }
110- else {
109+ else
110+ {
111+ // We've acquired the slot, but the validation strategy says it's time for it to die. Dispose of it,
112+ // and go to the global pool.
111113 dispose ( slot );
112114 }
113-
114- //The slot was invalidated however we cannot put it to the
115- //disposed queue yet since it already exists in the live queue
116115 }
117116
118117 // 2. If that fails, acquire from big pool
@@ -134,18 +133,17 @@ private T acquireFromGlobal( long deadline ) throws InterruptedException
134133 if ( slot != null )
135134 {
136135 // Yay, got a slot - can we keep it?
137- if ( slot .isValid ( validationStrategy ) )
136+ if ( slot .availableToClaimed ( ) )
138137 {
139- if ( slot .availableToClaimed ( ) )
138+ if ( slot .isValid ( validationStrategy ) )
140139 {
141140 break ;
142141 }
143- }
144- // We've acquired the slot, but the validation strategy says it's time for it to die.
145- // Either the slot is already claimed or if it is available make it claimed
146- else if ( slot .isClaimedOrAvailableToClaimed () )
147- {
148- dispose ( slot );
142+ else
143+ {
144+ // We've acquired the slot, but the validation strategy says it's time for it to die.
145+ dispose ( slot );
146+ }
149147 }
150148 }
151149 else
@@ -181,38 +179,22 @@ else if ( slot.isClaimedOrAvailableToClaimed() )
181179
182180 // Keep this slot cached with our thread, so that we can grab this value quickly next time,
183181 // assuming threads generally availableToClaimed one instance at a time
184- updateThreadLocal ( slot );
182+ local . set ( slot );
185183 allocator .onAcquire ( slot .value );
186184 return slot .value ;
187185 }
188186
189- private void updateThreadLocal (Slot <T > slot )
190- {
191- Slot <T > localSlot = local .get ();
192- if ( localSlot != null )
193- {
194- //The old slot is no longer in the tread local
195- localSlot .threadLocalClaimedToClaimed ();
196- }
197- else
198- {
199- //There was nothing stored in thread local
200- //no we must also add this slot to the live queue
201- live .add ( slot );
202- }
203- slot .claimByThreadLocal ();
204- local .set ( slot );
205- }
206-
207187 private void dispose ( Slot <T > slot )
208188 {
209- if ( slot .claimedToDisposed () || slot . threadLocalClaimedToDisposed () )
189+ if ( ! slot .claimedToDisposed () )
210190 {
211- // Done before below, in case dispose call fails. This is safe since objects on the
212- // pool are used for read-only operations
213- disposed .add ( slot );
214- allocator .onDispose ( slot .value );
191+ throw new IllegalStateException ( "Cannot dispose unclaimed pool object: " + slot );
215192 }
193+
194+ // Done before below, in case dispose call fails. This is safe since objects on the
195+ // pool are used for read-only operations
196+ disposed .add ( slot );
197+ allocator .onDispose ( slot .value );
216198 }
217199
218200 /**
@@ -235,7 +217,7 @@ private Slot<T> allocate( int slotIndex )
235217 // Return it :)
236218 return slot ;
237219 }
238- catch ( Neo4jException e )
220+ catch ( Neo4jException e )
239221 {
240222 // Failed to allocate slot, return it to the list of disposed slots, rethrow exception.
241223 slot .claimedToDisposed ();
@@ -254,41 +236,33 @@ public void accept( T t )
254236 slot .updateUsageTimestamp ();
255237 if ( !slot .isValid ( validationStrategy ) )
256238 {
239+ // The value has for some reason become invalid, dispose of it
257240 dispose ( slot );
258241 return ;
259242 }
260243
261- if ( slot .claimedToAvailable () )
244+ if ( ! slot .claimedToAvailable () )
262245 {
263- // Make sure the pool isn't being stopped in the middle of all these shenanigans
264- if ( !stopped .get () )
265- {
266- // All good, as you were.
267- live .add ( slot );
268- }
269- else
270- {
271- // Another thread concurrently closing the pool may have started closing before we
272- // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
273- // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
274- // If we can't claim the slot back, that means another thread is dealing with it.
275- if ( slot .availableToClaimed () )
276- {
277- dispose ( slot );
278- }
279- }
246+ throw new IllegalStateException ( "Failed to release pooled object: " + slot );
280247 }
281248
282- // If we are claimed by thread local we are already in the live queue
283- if ( slot . threadLocalClaimedToAvailable () && stopped .get () )
249+ // Make sure the pool isn't being stopped in the middle of all these shenanigans
250+ if ( ! stopped .get () )
284251 {
285- // As above, try to claim the slot back and dispose
252+ // All good, as you were.
253+ live .add ( slot );
254+ }
255+ else
256+ {
257+ // Another thread concurrently closing the pool may have started closing before we
258+ // set our slot to "available". In that case, the slot will not be disposed of by the closing thread
259+ // We mitigate this by trying to claim the slot back - if we are able to, we dispose the slot.
260+ // If we can't claim the slot back, that means another thread is dealing with it.
286261 if ( slot .availableToClaimed () )
287262 {
288263 dispose ( slot );
289264 }
290265 }
291-
292266 }
293267 };
294268 }
@@ -319,7 +293,6 @@ class Slot<T>
319293 enum State
320294 {
321295 AVAILABLE ,
322- THREAD_LOCAL_CLAIMED ,
323296 CLAIMED ,
324297 DISPOSED
325298 }
@@ -331,6 +304,13 @@ enum State
331304 long lastUsed ;
332305 T value ;
333306
307+ public static <T > Slot <T > disposed ( int index , Clock clock )
308+ {
309+ Slot <T > slot = new Slot <>( index , clock );
310+ slot .claimedToDisposed ();
311+ return slot ;
312+ }
313+
334314 /**
335315 * @param index the index into the {@link ThreadCachingPool#all all} array, used to re-use that slot when this is
336316 * disposed
@@ -352,11 +332,6 @@ public boolean availableToClaimed()
352332 return state .compareAndSet ( State .AVAILABLE , State .CLAIMED );
353333 }
354334
355- public boolean availableToThreadLocalClaimed ()
356- {
357- return state .compareAndSet ( State .AVAILABLE , State .THREAD_LOCAL_CLAIMED );
358- }
359-
360335 public boolean claimedToAvailable ()
361336 {
362337 updateUsageTimestamp ();
@@ -368,36 +343,6 @@ public boolean claimedToDisposed()
368343 return state .compareAndSet ( State .CLAIMED , State .DISPOSED );
369344 }
370345
371- public boolean threadLocalClaimedToDisposed ()
372- {
373- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .DISPOSED );
374- }
375-
376- public boolean threadLocalClaimedToClaimed ()
377- {
378- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .CLAIMED );
379- }
380-
381- public boolean threadLocalClaimedToAvailable ()
382- {
383- return state .compareAndSet ( State .THREAD_LOCAL_CLAIMED , State .AVAILABLE );
384- }
385-
386- public void claimByThreadLocal ()
387- {
388- state .set ( State .THREAD_LOCAL_CLAIMED );
389- }
390-
391- public boolean isClaimedOrAvailableToClaimed ()
392- {
393- return availableToClaimed () || state .get () == State .CLAIMED ;
394- }
395-
396- public boolean disposed ()
397- {
398- return state .get () == State .DISPOSED ;
399- }
400-
401346 public void updateUsageTimestamp ()
402347 {
403348 lastUsed = clock .millis ();
0 commit comments