Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions beam-postgres/Database/Beam/Postgres/Connection.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ import Foreign.C.Types
import System.Clock

import Network.URI (uriToString)
import Control.Exception (try)
import Control.Exception (try,toException)

data PgStream a = PgStreamDone (Either BeamRowReadError a)
data PgStream a = PgStreamDone (Either SomeException a)
| PgStreamContinue (Maybe PgI.Row -> IO (PgStream a))

-- | 'BeamURIOpeners' for the standard @postgresql:@ URI scheme. See the
Expand Down Expand Up @@ -134,17 +134,17 @@ getFields res = do
mapM getField [0..colCount - 1]

runPgRowReader ::
Pg.Connection -> Pg.Row -> Pg.Result -> [Pg.Field] -> FromBackendRowM Postgres a -> IO (Either BeamRowReadError a)
Pg.Connection -> Pg.Row -> Pg.Result -> [Pg.Field] -> FromBackendRowM Postgres a -> IO (Either SomeException a)
runPgRowReader conn rowIdx res fields (FromBackendRowM readRow) =
Pg.nfields res >>= \(Pg.Col colCount) ->
runF readRow finish step 0 colCount fields
where

step :: forall x. FromBackendRowF Postgres (CInt -> CInt -> [PgI.Field] -> IO (Either BeamRowReadError x))
-> CInt -> CInt -> [PgI.Field] -> IO (Either BeamRowReadError x)
step (ParseOneField _) curCol colCount [] = pure (Left (BeamRowReadError (Just (fromIntegral curCol)) (ColumnNotEnoughColumns (fromIntegral colCount))))
step :: forall x. FromBackendRowF Postgres (CInt -> CInt -> [PgI.Field] -> IO (Either SomeException x))
-> CInt -> CInt -> [PgI.Field] -> IO (Either SomeException x)
step (ParseOneField _) curCol colCount [] = pure (Left $ toException (BeamRowReadError (Just (fromIntegral curCol)) (ColumnNotEnoughColumns (fromIntegral colCount))))
step (ParseOneField _) curCol colCount _
| curCol >= colCount = pure (Left (BeamRowReadError (Just (fromIntegral curCol)) (ColumnNotEnoughColumns (fromIntegral colCount))))
| curCol >= colCount = pure (Left $ toException (BeamRowReadError (Just (fromIntegral curCol)) (ColumnNotEnoughColumns (fromIntegral colCount))))
step (ParseOneField (next' :: next -> _)) curCol colCount (field:remainingFields) =
do fieldValue <- Pg.getvalue res rowIdx (Pg.Col curCol)
res' <- Pg.runConversion (Pg.fromField field fieldValue) conn
Expand All @@ -165,7 +165,7 @@ runPgRowReader conn rowIdx res fields (FromBackendRowM readRow) =
pure (ColumnTypeMismatch hs sql msg)
Pg.UnexpectedNull {} ->
pure ColumnUnexpectedNull
in pure (Left (BeamRowReadError (Just (fromIntegral curCol)) err))
in pure (Left $ toException (BeamRowReadError (Just (fromIntegral curCol)) err))
Pg.Ok x -> next' x (curCol + 1) colCount remainingFields

step (Alt (FromBackendRowM a) (FromBackendRowM b) next) curCol colCount cols =
Expand All @@ -179,11 +179,11 @@ runPgRowReader conn rowIdx res fields (FromBackendRowM readRow) =
Left {} -> pure (Left aErr)

step (FailParseWith err) _ _ _ =
pure (Left err)
pure (Left $ toException err)

finish x _ _ _ = pure (Right x)

withPgDebug :: (Text -> IO ()) -> Pg.Connection -> Pg a -> IO (Either BeamRowReadError a)
withPgDebug :: (Text -> IO ()) -> Pg.Connection -> Pg a -> IO (Either SomeException a)
withPgDebug dbg conn (Pg action) =
let finish x = pure (Right x)
step (PgLiftIO io next) = io >>= next
Expand All @@ -203,7 +203,7 @@ withPgDebug dbg conn (Pg action) =
case respWithException of
Left err -> do
dbg (decodeUtf8 query)
return (Left $ BeamRowReadError Nothing $ ColumnErrorInternal (show err ) , Nothing)
return (Left err, Nothing)
Right _ -> do
end <- getTime Monotonic
(, Just (end - start)) <$> next x
Expand All @@ -230,7 +230,7 @@ withPgDebug dbg conn (Pg action) =
case respWithException of
Left err -> do
dbg (decodeUtf8 query)
return $ Left $ BeamRowReadError Nothing $ ColumnErrorInternal (show err )
return $ Left err
Right res -> do
end <- getTime Monotonic
let extime = end - start
Expand All @@ -249,21 +249,21 @@ withPgDebug dbg conn (Pg action) =
case respWithException of
Left err -> do
dbg (decodeUtf8 query)
return $ Left $ BeamRowReadError Nothing $ ColumnErrorInternal (show err )
return $ Left err
Right _ -> do
end <- getTime Monotonic
let extime = end - start
dbg (decodeUtf8 query <> " Executed in: " <> T.pack (show (((sec extime) * 1000) + ((nsec extime) `div` 1000000)) <> " ms "))
let Pg process = mkProcess (Pg (liftF (PgFetchNext id)))
runF process next stepReturningNone

stepReturningNone :: forall a. PgF (IO (Either BeamRowReadError a)) -> IO (Either BeamRowReadError a)
stepReturningNone :: forall a. PgF (IO (Either SomeException a)) -> IO (Either SomeException a)
stepReturningNone (PgLiftIO action' next) = action' >>= next
stepReturningNone (PgLiftWithHandle withConn next) = withConn conn >>= next
stepReturningNone (PgFetchNext next) = next Nothing
stepReturningNone (PgRunReturning _ _ _) = pure (Left (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))
stepReturningNone (PgRunReturning _ _ _) = pure (Left $ toException (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))

stepReturningList :: forall a. Pg.Result -> PgF (CInt -> IO (Either BeamRowReadError a)) -> CInt -> IO (Either BeamRowReadError a)
stepReturningList :: forall a. Pg.Result -> PgF (CInt -> IO (Either SomeException a)) -> CInt -> IO (Either SomeException a)
stepReturningList _ (PgLiftIO action' next) rowIdx = action' >>= \x -> next x rowIdx
stepReturningList res (PgFetchNext next) rowIdx =
do fields <- getFields res
Expand All @@ -273,8 +273,8 @@ withPgDebug dbg conn (Pg action) =
else runPgRowReader conn (Pg.Row rowIdx) res fields fromBackendRow >>= \case
Left err -> pure (Left err)
Right r -> next (Just r) (rowIdx + 1)
stepReturningList _ (PgRunReturning _ _ _) _ = pure (Left (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))
stepReturningList _ (PgLiftWithHandle {}) _ = pure (Left (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))
stepReturningList _ (PgRunReturning _ _ _) _ = pure (Left $ toException (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))
stepReturningList _ (PgLiftWithHandle {}) _ = pure (Left $ toException (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed")))

finishProcess :: forall a. a -> Maybe PgI.Row -> IO (PgStream a)
finishProcess x _ = pure (PgStreamDone (Right x))
Expand All @@ -295,8 +295,8 @@ withPgDebug dbg conn (Pg action) =
runPgRowReader conn rowIdx res fields fromBackendRow >>= \case
Left err -> pure (PgStreamDone (Left err))
Right r -> pure (PgStreamContinue (next (Just r)))
stepProcess (PgRunReturning _ _ _) _ = pure (PgStreamDone (Left (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed"))))
stepProcess (PgLiftWithHandle _ _) _ = pure (PgStreamDone (Left (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed"))))
stepProcess (PgRunReturning _ _ _) _ = pure (PgStreamDone (Left $ toException (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed"))))
stepProcess (PgLiftWithHandle _ _) _ = pure (PgStreamDone (Left $ toException (BeamRowReadError Nothing (ColumnErrorInternal "Nested queries not allowed"))))

runConsumer :: forall a. PgStream a -> PgI.Row -> IO (PgStream a)
runConsumer s@(PgStreamDone {}) _ = pure s
Expand Down