From a9438aed768ff33fee8f7ee041d0b3f9aacb7c94 Mon Sep 17 00:00:00 2001 From: David Feuer Date: Thu, 24 May 2018 17:04:14 -0400 Subject: [PATCH] Use a real-time queue for TQueue This is another alternative design for `TQueue`. Instead of an amortized queue, this uses a real-time one based on Okasaki's scheduled banker's queues. We limit contention by using two independent schedules. --- Control/Concurrent/STM/TQueue.hs | 169 +++++++++++++++++++------------ 1 file changed, 104 insertions(+), 65 deletions(-) diff --git a/Control/Concurrent/STM/TQueue.hs b/Control/Concurrent/STM/TQueue.hs index 483db15..c83464f 100644 --- a/Control/Concurrent/STM/TQueue.hs +++ b/Control/Concurrent/STM/TQueue.hs @@ -1,5 +1,6 @@ {-# OPTIONS_GHC -fno-warn-name-shadowing #-} {-# LANGUAGE CPP, DeriveDataTypeable #-} +{-# LANGUAGE BangPatterns #-} #if __GLASGOW_HASKELL__ >= 701 {-# LANGUAGE Trustworthy #-} @@ -17,16 +18,14 @@ -- -- A 'TQueue' is like a 'TChan', with two important differences: -- --- * it has faster throughput than both 'TChan' and 'Chan' (although --- the costs are amortised, so the cost of individual operations --- can vary a lot). +-- * it has faster throughput than both 'TChan' and 'Chan' -- -- * it does /not/ provide equivalents of the 'dupTChan' and -- 'cloneTChan' operations. -- --- The implementation is based on the traditional purely-functional --- queue representation that uses two lists to obtain amortised /O(1)/ --- enqueue and dequeue operations. +-- The implementation is based on Okasaki's scheduled banker's queues, +-- but it uses *two* schedules so there's only contention between the +-- reader and writer when the queue needs to be rotated. -- -- @since 2.4 ----------------------------------------------------------------------------- @@ -44,63 +43,109 @@ module Control.Concurrent.STM.TQueue ( writeTQueue, unGetTQueue, isEmptyTQueue, - ) where + ) where import GHC.Conc import Control.Monad (unless) import Data.Typeable (Typeable) +data End a = + End [a] -- list + [a] -- schedule + -- | 'TQueue' is an abstract type representing an unbounded FIFO channel. -- -- @since 2.4 -data TQueue a = TQueue {-# UNPACK #-} !(TVar [a]) - {-# UNPACK #-} !(TVar [a]) +data TQueue a = TQueue {-# UNPACK #-} !(TVar (End a)) + {-# UNPACK #-} !(TVar (End a)) deriving Typeable +{- +Invariant: + +Given front list, rear list, front schedule, and rear schedule called +front, rear, fsched, and rsched, respectively, + + 2 * (|front| - |rear|) = |fsched| + |rsched| + +Note that because lengths cannot be negative, this implies that + + |front| >= |rear| + +We rotate the queue when either schedule is empty. This preserves +the invariant and ensures that the spine of the front list is +fully realized when a rotation occurs. The spine of the rear list +is *always* fully realized. We could use a strict-spined list for +the rear, but it doesn't really seem to be worth the trouble. +-} instance Eq (TQueue a) where TQueue a _ == TQueue b _ = a == b --- |Build and returns a new instance of 'TQueue' +-- | Build and returns a new instance of 'TQueue' newTQueue :: STM (TQueue a) newTQueue = do - read <- newTVar [] - write <- newTVar [] + read <- newTVar (End [] []) + write <- newTVar (End [] []) return (TQueue read write) --- |@IO@ version of 'newTQueue'. This is useful for creating top-level +-- | @IO@ version of 'newTQueue'. This is useful for creating top-level -- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using -- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't -- possible. newTQueueIO :: IO (TQueue a) newTQueueIO = do - read <- newTVarIO [] - write <- newTVarIO [] + read <- newTVarIO (End [] []) + write <- newTVarIO (End [] []) return (TQueue read write) --- |Write a value to a 'TQueue'. +-- rotate front end = front ++ reverse rear, but the reverse is performed +-- incrementally as the append proceeds. +-- +-- Precondition: |front| + 1 >= |rear|. This ensures that when the front +-- list is empty, the rear list has at most one element, so we don't need +-- to reverse it. +rotate :: [a] -> [a] -> [a] +rotate = go [] + where + go acc [] rear = rear ++ acc + go acc (x:xs) (r:rs) + = x : go (r:acc) xs rs + go acc xs [] = xs ++ acc + +-- | Write a value to a 'TQueue'. writeTQueue :: TQueue a -> a -> STM () -writeTQueue (TQueue _read write) a = do - listend <- readTVar write - writeTVar write (a:listend) - --- |Read the next value from the 'TQueue'. +writeTQueue (TQueue read write) a = do + End listend rsched <- readTVar write + let listend' = a : listend + case rsched of + -- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2 + _:_:rsched' -> writeTVar write (End listend' rsched') + + -- Rotate the queue; the invariant holds trivially. + _ -> do + End listfront _fsched <- readTVar read + let !front' = rotate listfront listend' + writeTVar read (End front' front') + writeTVar write (End [] front') + +-- | Read the next value from the 'TQueue'. readTQueue :: TQueue a -> STM a readTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (x:xs') -> do - writeTVar read xs' - return x - [] -> do - ys <- readTVar write - case ys of - [] -> retry - _ -> do - let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be - -- short, otherwise it will conflict - writeTVar write [] - writeTVar read zs - return z + End listfront fsched <- readTVar read + case listfront of + [] -> retry + x:front' -> + case fsched of + -- Reduce |front|-|rear| by 1; reduce |fsched|+|rsched| by 2 + _:_:fsched' -> writeTVar read (End front' fsched') >> return x + + -- Rotate the queue; the invariant holds trivially. + _ -> do + End listend _rsched <- readTVar write + let !front'' = rotate front' listend + writeTVar read (End front'' front'') + writeTVar write (End [] front'') + return x -- | A version of 'readTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. @@ -113,44 +158,38 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing -- @since 2.4.5 flushTQueue :: TQueue a -> STM [a] flushTQueue (TQueue read write) = do - xs <- readTVar read - ys <- readTVar write - unless (null xs) $ writeTVar read [] - unless (null ys) $ writeTVar write [] - return (xs ++ reverse ys) + End front fsched <- readTVar read + End rear rsched <- readTVar write + unless (null front && null fsched) $ writeTVar read (End [] []) + unless (null rear && null rsched) $ writeTVar write (End [] []) + return (rotate front rear) -- | Get the next value from the @TQueue@ without removing it, -- retrying if the channel is empty. peekTQueue :: TQueue a -> STM a -peekTQueue c = do - x <- readTQueue c - unGetTQueue c x - return x +peekTQueue (TQueue read _write) = do + End front _fsched <- readTVar read + case front of + x:_ -> return x + [] -> retry -- | A version of 'peekTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryPeekTQueue :: TQueue a -> STM (Maybe a) -tryPeekTQueue c = do - m <- tryReadTQueue c - case m of - Nothing -> return Nothing - Just x -> do - unGetTQueue c x - return m - --- |Put a data item back onto a channel, where it will be the next item read. +tryPeekTQueue (TQueue read _write) = do + End front _fsched <- readTVar read + case front of + x:_ -> return (Just x) + [] -> return Nothing + +-- | Put a data item back onto a channel, where it will be the next item read. unGetTQueue :: TQueue a -> a -> STM () unGetTQueue (TQueue read _write) a = do - xs <- readTVar read - writeTVar read (a:xs) + End front fsched <- readTVar read + writeTVar read (End (a:front) (a:a:fsched)) --- |Returns 'True' if the supplied 'TQueue' is empty. +-- | Returns 'True' if the supplied 'TQueue' is empty. isEmptyTQueue :: TQueue a -> STM Bool -isEmptyTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (_:_) -> return False - [] -> do ys <- readTVar write - case ys of - [] -> return True - _ -> return False +isEmptyTQueue (TQueue read _write) = do + End front _fsched <- readTVar read + return $! null front