From 91576a975835bb840508c64c34fcea1ed42f4266 Mon Sep 17 00:00:00 2001 From: David Feuer Date: Wed, 23 May 2018 23:27:57 -0400 Subject: [PATCH] Make TQueue persist work across transactions Previously, `TQueue` could build up a large write list, leading to the reader having to do too much work reversing it and aborting. Rotate the queue more frequently so the reversal work will effectively be saved even when a transaction aborts. --- Control/Concurrent/STM/TQueue.hs | 121 +++++++++++++++++-------------- 1 file changed, 66 insertions(+), 55 deletions(-) diff --git a/Control/Concurrent/STM/TQueue.hs b/Control/Concurrent/STM/TQueue.hs index 483db15..a1153b3 100644 --- a/Control/Concurrent/STM/TQueue.hs +++ b/Control/Concurrent/STM/TQueue.hs @@ -1,5 +1,6 @@ {-# OPTIONS_GHC -fno-warn-name-shadowing #-} {-# LANGUAGE CPP, DeriveDataTypeable #-} +{-# LANGUAGE BangPatterns #-} #if __GLASGOW_HASKELL__ >= 701 {-# LANGUAGE Trustworthy #-} @@ -50,22 +51,26 @@ import GHC.Conc import Control.Monad (unless) import Data.Typeable (Typeable) +data End a = End !Int [a] + -- | 'TQueue' is an abstract type representing an unbounded FIFO channel. -- -- @since 2.4 -data TQueue a = TQueue {-# UNPACK #-} !(TVar [a]) - {-# UNPACK #-} !(TVar [a]) +data TQueue a = TQueue {-# UNPACK #-} !(TVar Int) + {-# UNPACK #-} !(TVar (End a)) + {-# UNPACK #-} !(TVar (End a)) deriving Typeable instance Eq (TQueue a) where - TQueue a _ == TQueue b _ = a == b + TQueue a _ _ == TQueue b _ _ = a == b -- |Build and returns a new instance of 'TQueue' newTQueue :: STM (TQueue a) newTQueue = do - read <- newTVar [] - write <- newTVar [] - return (TQueue read write) + old_len <- newTVar 0 + read <- newTVar (End 0 []) + write <- newTVar (End 0 []) + return (TQueue old_len read write) -- |@IO@ version of 'newTQueue'. This is useful for creating top-level -- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using @@ -73,34 +78,45 @@ newTQueue = do -- possible. newTQueueIO :: IO (TQueue a) newTQueueIO = do - read <- newTVarIO [] - write <- newTVarIO [] - return (TQueue read write) + old_len <- newTVarIO 0 + read <- newTVarIO (End 0 []) + write <- newTVarIO (End 0 []) + return (TQueue old_len read write) -- |Write a value to a 'TQueue'. writeTQueue :: TQueue a -> a -> STM () -writeTQueue (TQueue _read write) a = do - listend <- readTVar write - writeTVar write (a:listend) +writeTQueue (TQueue old_len read write) a = do + ol <- readTVar old_len + End write_count listend <- readTVar write + let write_count' = write_count + 1 + if 2 * write_count' >= ol + then do + End read_count front <- readTVar read + let !len = ol + write_count' - read_count + writeTVar old_len len + writeTVar read (End 0 (front ++ reverse listend ++ [a])) + writeTVar write (End 0 []) + else writeTVar write (End write_count' (a:listend)) -- |Read the next value from the 'TQueue'. readTQueue :: TQueue a -> STM a -readTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (x:xs') -> do - writeTVar read xs' - return x - [] -> do - ys <- readTVar write - case ys of - [] -> retry - _ -> do - let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be - -- short, otherwise it will conflict - writeTVar write [] - writeTVar read zs - return z +readTQueue (TQueue old_len read write) = do + ol <- readTVar old_len + End read_count front <- readTVar read + case front of + [] -> retry + (a:as) -> do + let read_count' = read_count + 1 + if 2 * read_count' >= ol + then do + End write_count listend <- readTVar write + let !len = ol + write_count - read_count' + writeTVar old_len len + writeTVar read (End 0 (as ++ reverse listend)) + writeTVar write (End 0 []) + else do + writeTVar read (End read_count' as) + return a -- | A version of 'readTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. @@ -112,45 +128,40 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing -- -- @since 2.4.5 flushTQueue :: TQueue a -> STM [a] -flushTQueue (TQueue read write) = do - xs <- readTVar read - ys <- readTVar write - unless (null xs) $ writeTVar read [] - unless (null ys) $ writeTVar write [] +flushTQueue (TQueue old_len read write) = do + End read_count xs <- readTVar read + End write_count ys <- readTVar write + unless (read_count == 0 && null xs) $ writeTVar read (End 0 []) + unless (write_count == 0 && null ys) $ writeTVar write (End 0 []) + writeTVar old_len 0 return (xs ++ reverse ys) -- | Get the next value from the @TQueue@ without removing it, -- retrying if the channel is empty. peekTQueue :: TQueue a -> STM a -peekTQueue c = do - x <- readTQueue c - unGetTQueue c x - return x +peekTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + case xs of + x:_ -> return x + [] -> retry -- | A version of 'peekTQueue' which does not retry. Instead it -- returns @Nothing@ if no value is available. tryPeekTQueue :: TQueue a -> STM (Maybe a) -tryPeekTQueue c = do - m <- tryReadTQueue c - case m of - Nothing -> return Nothing - Just x -> do - unGetTQueue c x - return m +tryPeekTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + case xs of + x:_ -> return (Just x) + [] -> return Nothing -- |Put a data item back onto a channel, where it will be the next item read. unGetTQueue :: TQueue a -> a -> STM () -unGetTQueue (TQueue read _write) a = do - xs <- readTVar read - writeTVar read (a:xs) +unGetTQueue (TQueue _old_len read _write) a = do + End read_count xs <- readTVar read + writeTVar read (End (read_count - 1) (a:xs)) -- |Returns 'True' if the supplied 'TQueue' is empty. isEmptyTQueue :: TQueue a -> STM Bool -isEmptyTQueue (TQueue read write) = do - xs <- readTVar read - case xs of - (_:_) -> return False - [] -> do ys <- readTVar write - case ys of - [] -> return True - _ -> return False +isEmptyTQueue (TQueue _old_len read _write) = do + End _ xs <- readTVar read + return $! null xs