Skip to content

Make TQueue persist work across transactions #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 66 additions & 55 deletions Control/Concurrent/STM/TQueue.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{-# OPTIONS_GHC -fno-warn-name-shadowing #-}
{-# LANGUAGE CPP, DeriveDataTypeable #-}
{-# LANGUAGE BangPatterns #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
Expand Down Expand Up @@ -50,57 +51,72 @@ import GHC.Conc
import Control.Monad (unless)
import Data.Typeable (Typeable)

data End a = End !Int [a]

-- | 'TQueue' is an abstract type representing an unbounded FIFO channel.
--
-- @since 2.4
data TQueue a = TQueue {-# UNPACK #-} !(TVar [a])
{-# UNPACK #-} !(TVar [a])
data TQueue a = TQueue {-# UNPACK #-} !(TVar Int)
{-# UNPACK #-} !(TVar (End a))
{-# UNPACK #-} !(TVar (End a))
deriving Typeable

instance Eq (TQueue a) where
TQueue a _ == TQueue b _ = a == b
TQueue a _ _ == TQueue b _ _ = a == b

-- |Build and returns a new instance of 'TQueue'
newTQueue :: STM (TQueue a)
newTQueue = do
read <- newTVar []
write <- newTVar []
return (TQueue read write)
old_len <- newTVar 0
read <- newTVar (End 0 [])
write <- newTVar (End 0 [])
return (TQueue old_len read write)

-- |@IO@ version of 'newTQueue'. This is useful for creating top-level
-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using
-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't
-- possible.
newTQueueIO :: IO (TQueue a)
newTQueueIO = do
read <- newTVarIO []
write <- newTVarIO []
return (TQueue read write)
old_len <- newTVarIO 0
read <- newTVarIO (End 0 [])
write <- newTVarIO (End 0 [])
return (TQueue old_len read write)

-- |Write a value to a 'TQueue'.
writeTQueue :: TQueue a -> a -> STM ()
writeTQueue (TQueue _read write) a = do
listend <- readTVar write
writeTVar write (a:listend)
writeTQueue (TQueue old_len read write) a = do
ol <- readTVar old_len
End write_count listend <- readTVar write
let write_count' = write_count + 1
if 2 * write_count' >= ol
then do
End read_count front <- readTVar read
let !len = ol + write_count' - read_count
writeTVar old_len len
writeTVar read (End 0 (front ++ reverse listend ++ [a]))
writeTVar write (End 0 [])
else writeTVar write (End write_count' (a:listend))

-- |Read the next value from the 'TQueue'.
readTQueue :: TQueue a -> STM a
readTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(x:xs') -> do
writeTVar read xs'
return x
[] -> do
ys <- readTVar write
case ys of
[] -> retry
_ -> do
let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be
-- short, otherwise it will conflict
writeTVar write []
writeTVar read zs
return z
readTQueue (TQueue old_len read write) = do
ol <- readTVar old_len
End read_count front <- readTVar read
case front of
[] -> retry
(a:as) -> do
let read_count' = read_count + 1
if 2 * read_count' >= ol
then do
End write_count listend <- readTVar write
let !len = ol + write_count - read_count'
writeTVar old_len len
writeTVar read (End 0 (as ++ reverse listend))
writeTVar write (End 0 [])
else do
writeTVar read (End read_count' as)
return a

-- | A version of 'readTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
Expand All @@ -112,45 +128,40 @@ tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing
--
-- @since 2.4.5
flushTQueue :: TQueue a -> STM [a]
flushTQueue (TQueue read write) = do
xs <- readTVar read
ys <- readTVar write
unless (null xs) $ writeTVar read []
unless (null ys) $ writeTVar write []
flushTQueue (TQueue old_len read write) = do
End read_count xs <- readTVar read
End write_count ys <- readTVar write
unless (read_count == 0 && null xs) $ writeTVar read (End 0 [])
unless (write_count == 0 && null ys) $ writeTVar write (End 0 [])
writeTVar old_len 0
return (xs ++ reverse ys)

-- | Get the next value from the @TQueue@ without removing it,
-- retrying if the channel is empty.
peekTQueue :: TQueue a -> STM a
peekTQueue c = do
x <- readTQueue c
unGetTQueue c x
return x
peekTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
case xs of
x:_ -> return x
[] -> retry

-- | A version of 'peekTQueue' which does not retry. Instead it
-- returns @Nothing@ if no value is available.
tryPeekTQueue :: TQueue a -> STM (Maybe a)
tryPeekTQueue c = do
m <- tryReadTQueue c
case m of
Nothing -> return Nothing
Just x -> do
unGetTQueue c x
return m
tryPeekTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
case xs of
x:_ -> return (Just x)
[] -> return Nothing

-- |Put a data item back onto a channel, where it will be the next item read.
unGetTQueue :: TQueue a -> a -> STM ()
unGetTQueue (TQueue read _write) a = do
xs <- readTVar read
writeTVar read (a:xs)
unGetTQueue (TQueue _old_len read _write) a = do
End read_count xs <- readTVar read
writeTVar read (End (read_count - 1) (a:xs))

-- |Returns 'True' if the supplied 'TQueue' is empty.
isEmptyTQueue :: TQueue a -> STM Bool
isEmptyTQueue (TQueue read write) = do
xs <- readTVar read
case xs of
(_:_) -> return False
[] -> do ys <- readTVar write
case ys of
[] -> return True
_ -> return False
isEmptyTQueue (TQueue _old_len read _write) = do
End _ xs <- readTVar read
return $! null xs