Skip to content

Commit 4404fad

Browse files
committed
Make TQueue persist work across transactions
Previously, `TQueue` could build up a large write list, leading to the reader having to do too much work reversing it and aborting. Rotate the queue more frequently so the reversal work will effectively be saved even when a transaction aborts.
1 parent 92af455 commit 4404fad

File tree

1 file changed

+66
-53
lines changed

1 file changed

+66
-53
lines changed

Control/Concurrent/STM/TQueue.hs

Lines changed: 66 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
22
{-# LANGUAGE CPP, DeriveDataTypeable #-}
3+
{-# LANGUAGE BangPatterns #-}
34

45
#if __GLASGOW_HASKELL__ >= 701
56
{-# LANGUAGE Trustworthy #-}
@@ -50,57 +51,72 @@ import GHC.Conc
5051
import Control.Monad (unless)
5152
import Data.Typeable (Typeable)
5253

54+
data End a = End !Int [a]
55+
5356
-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
5457
--
5558
-- @since 2.4
56-
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
57-
{-# UNPACK #-} !(TVar [a])
59+
data TQueue a = TQueue {-# UNPACK #-} !(TVar Int)
60+
{-# UNPACK #-} !(TVar (End a))
61+
{-# UNPACK #-} !(TVar (End a))
5862
deriving Typeable
5963

6064
instance Eq (TQueue a) where
61-
TQueue a _ == TQueue b _ = a == b
65+
TQueue a _ _ == TQueue b _ _ = a == b
6266

6367
-- |Build and returns a new instance of 'TQueue'
6468
newTQueue :: STM (TQueue a)
6569
newTQueue = do
66-
read <- newTVar []
67-
write <- newTVar []
68-
return (TQueue read write)
70+
old_len <- newTVar 0
71+
read <- newTVar (End 0 [])
72+
write <- newTVar (End 0 [])
73+
return (TQueue old_len read write)
6974

7075
-- |@IO@ version of 'newTQueue'. This is useful for creating top-level
7176
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
7277
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
7378
-- possible.
7479
newTQueueIO :: IO (TQueue a)
7580
newTQueueIO = do
76-
read <- newTVarIO []
77-
write <- newTVarIO []
78-
return (TQueue read write)
81+
old_len <- newTVarIO 0
82+
read <- newTVarIO (End 0 [])
83+
write <- newTVarIO (End 0 [])
84+
return (TQueue old_len read write)
7985

8086
-- |Write a value to a 'TQueue'.
8187
writeTQueue :: TQueue a -> a -> STM ()
82-
writeTQueue (TQueue _read write) a = do
83-
listend <- readTVar write
84-
writeTVar write (a:listend)
88+
writeTQueue (TQueue old_len read write) a = do
89+
ol <- readTVar old_len
90+
End write_count listend <- readTVar write
91+
let write_count' = write_count + 1
92+
if 2 * write_count' >= ol
93+
then do
94+
End read_count front <- readTVar read
95+
let !len = ol + write_count' - read_count
96+
writeTVar old_len len
97+
writeTVar read (End 0 (front ++ reverse listend ++ [a]))
98+
writeTVar write (End 0 [])
99+
else writeTVar write (End write_count' (a:listend))
85100

86101
-- |Read the next value from the 'TQueue'.
87102
readTQueue :: TQueue a -> STM a
88-
readTQueue (TQueue read write) = do
89-
xs <- readTVar read
90-
case xs of
91-
(x:xs') -> do
92-
writeTVar read xs'
93-
return x
94-
[] -> do
95-
ys <- readTVar write
96-
case ys of
97-
[] -> retry
98-
_ -> do
99-
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
100-
-- short, otherwise it will conflict
101-
writeTVar write []
102-
writeTVar read zs
103-
return z
103+
readTQueue (TQueue old_len read write) = do
104+
ol <- readTVar old_len
105+
End read_count front <- readTVar read
106+
case front of
107+
[] -> retry
108+
(a:as) -> do
109+
let read_count' = read_count + 1
110+
if 2 * read_count' >= ol
111+
then do
112+
End write_count listend <- readTVar write
113+
let !len = ol + write_count - read_count'
114+
writeTVar old_len len
115+
writeTVar read (End 0 (as ++ reverse listend))
116+
writeTVar write (End 0 [])
117+
else do
118+
writeTVar read (End read_count' as)
119+
return a
104120

105121
-- | A version of 'readTQueue' which does not retry. Instead it
106122
-- returns @Nothing@ if no value is available.
@@ -112,45 +128,42 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
112128
--
113129
-- @since 2.4.5
114130
flushTQueue :: TQueue a -> STM [a]
115-
flushTQueue (TQueue read write) = do
116-
xs <- readTVar read
117-
ys <- readTVar write
118-
unless (null xs) $ writeTVar read []
119-
unless (null ys) $ writeTVar write []
131+
flushTQueue (TQueue old_len read write) = do
132+
End read_count xs <- readTVar read
133+
End write_count ys <- readTVar write
134+
unless (read_count == 0 && null xs) $ writeTVar read (End 0 [])
135+
unless (write_count == 0 && null ys) $ writeTVar write (End 0 [])
136+
writeTVar old_len 0
120137
return (xs ++ reverse ys)
121138

122139
-- | Get the next value from the @TQueue@ without removing it,
123140
-- retrying if the channel is empty.
124141
peekTQueue :: TQueue a -> STM a
125-
peekTQueue c = do
126-
x <- readTQueue c
127-
unGetTQueue c x
128-
return x
142+
peekTQueue (TQueue _old_len read _write) = do
143+
End _ xs <- readTVar read
144+
case xs of
145+
x:_ -> return x
146+
[] -> retry
129147

130148
-- | A version of 'peekTQueue' which does not retry. Instead it
131149
-- returns @Nothing@ if no value is available.
132150
tryPeekTQueue :: TQueue a -> STM (Maybe a)
133-
tryPeekTQueue c = do
134-
m <- tryReadTQueue c
135-
case m of
136-
Nothing -> return Nothing
137-
Just x -> do
138-
unGetTQueue c x
139-
return m
151+
tryPeekTQueue (TQueue _old_len read _write) = do
152+
End _ xs <- readTVar read
153+
case xs of
154+
x:_ -> return (Just x)
155+
[] -> return Nothing
140156

141157
-- |Put a data item back onto a channel, where it will be the next item read.
142158
unGetTQueue :: TQueue a -> a -> STM ()
143-
unGetTQueue (TQueue read _write) a = do
144-
xs <- readTVar read
145-
writeTVar read (a:xs)
159+
unGetTQueue (TQueue _old_len read _write) a = do
160+
End read_count xs <- readTVar read
161+
writeTVar read (End (read_count - 1) (a:xs))
146162

147163
-- |Returns 'True' if the supplied 'TQueue' is empty.
148164
isEmptyTQueue :: TQueue a -> STM Bool
149-
isEmptyTQueue (TQueue read write) = do
150-
xs <- readTVar read
165+
isEmptyTQueue (TQueue _old_len read _write) = do
166+
End _ xs <- readTVar read
151167
case xs of
152168
(_:_) -> return False
153-
[] -> do ys <- readTVar write
154-
case ys of
155-
[] -> return True
156-
_ -> return False
169+
[] -> return True

0 commit comments

Comments
 (0)