Safe Haskell | None |
---|---|
Language | Haskell2010 |
Database.PostgreSQL.Simple.Queue
Description
This module utilizes PostgreSQL to implement a durable queue for efficiently processing arbitrary payloads which can be represented as JSON.
Typically a producer would enqueue a new payload as part of a larger database transaction:
    createAccount userRecord = do
      runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord
In another thread or process, the consumer would drain the queue.
    forever $ do
      -- Attempt to get a payload or block until one is available
      payload <- lock conn

      -- Perform application specific parsing of the payload value
      case fromJSON $ pValue payload of
        Success x -> sendEmail x -- Perform application specific processing
        Error err -> logErr err

      -- Remove the payload from future processing
      dequeue conn $ pId payload
For a more complete example of a consumer, utilizing the provided defaultMain, see EmailQueue.
This module provides two flavors of functions, a DB API and an IO API.
Most operations are provided in both flavors, with the exception of lock.
lock blocks and would not be that useful as part of a larger transaction
since it would keep the transaction open for a potentially long time. Although
both flavors are provided, in general one version is more useful for typical
use cases.
- newtype PayloadId = PayloadId { unPayloadId :: Int64 }
- data State
- data Payload = Payload {}
- enqueueDB :: String -> Value -> DB PayloadId
- dequeueDB :: String -> DB (Maybe Payload)
- withPayloadDB :: String -> Int -> (Payload -> IO a) -> DB (Either SomeException (Maybe a))
- getCountDB :: String -> DB Int64
- enqueue :: String -> Connection -> Value -> IO PayloadId
- tryDequeue :: String -> Connection -> IO (Maybe Payload)
- dequeue :: String -> Connection -> IO Payload
- withPayload :: String -> Connection -> Int -> (Payload -> IO a) -> IO (Either SomeException a)
- getCount :: String -> Connection -> IO Int64
Types
newtype PayloadId
Constructors
PayloadId | |
Fields
- unPayloadId :: Int64
data Payload
Constructors
Payload | |
DB API
enqueueDB :: String -> Value -> DB PayloadId
Enqueue a new JSON value into the queue. This particular function can be composed as part of a larger database transaction. For instance, a single transaction could create a user and enqueue an email message.

    createAccount userRecord = do
      runDBTSerializable $ do
        createUserDB userRecord
        enqueueDB $ makeVerificationEmail userRecord
IO API
enqueue :: String -> Connection -> Value -> IO PayloadId
Enqueue a new JSON value into the queue. See enqueueDB for a version
which can be composed with other queries in a single transaction.
tryDequeue :: String -> Connection -> IO (Maybe Payload)
Return the oldest Payload in the Enqueued state, or Nothing if there are no payloads. For a blocking version utilizing PostgreSQL's
NOTIFY and LISTEN, see dequeue. This function runs dequeueDB as a ReadCommitted transaction.
See withPayload for an alternative interface that will automatically return
the payload to the Enqueued state if an exception occurs.
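Because tryDequeue returns immediately, a caller that cannot use the blocking dequeue may want to poll a bounded number of times. The following is a minimal sketch of such a loop, not part of this module: pollFor and its parameters are hypothetical names, and the fetch argument stands in for tryDequeue with its String and Connection arguments already applied.

```haskell
import Control.Concurrent (threadDelay)

-- | Hypothetical helper (not provided by this module): run a non-blocking
-- fetch action up to 'attempts' times, sleeping 'delayMicros' microseconds
-- between attempts, and return the first value obtained.
pollFor :: Int -> Int -> IO (Maybe a) -> IO (Maybe a)
pollFor attempts delayMicros fetch
  | attempts <= 0 = pure Nothing
  | otherwise = do
      result <- fetch
      case result of
        Just x -> pure (Just x)
        Nothing
          -- Last attempt: give up without sleeping again.
          | attempts == 1 -> pure Nothing
          | otherwise -> do
              threadDelay delayMicros
              pollFor (attempts - 1) delayMicros fetch
```

Keeping the helper generic over the fetch action means it can be exercised without a database. For long-lived consumers, the blocking dequeue is likely the better fit, since it relies on NOTIFY and LISTEN rather than repeated queries.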
dequeue :: String -> Connection -> IO Payload
Transition a Payload to the Dequeued state. This function runs
dequeueDB as a Serializable transaction.
withPayload
Arguments
:: String | |
-> Connection | |
-> Int | retry count |
-> (Payload -> IO a) | |
-> IO (Either SomeException a) | |
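Since withPayload reports failure as a Left SomeException rather than rethrowing, the caller has to inspect the result. The sketch below shows one way to do that; describeOutcome and the two example values are hypothetical names introduced for illustration and only mirror the result type of withPayload, they do not call it.

```haskell
import Control.Exception (SomeException, displayException, toException)

-- | Hypothetical helper: turn a result of the shape returned by
-- 'withPayload' into a log line.
describeOutcome :: Show a => Either SomeException a -> String
describeOutcome (Left err) = "processing failed: " ++ displayException err
describeOutcome (Right x)  = "processed: " ++ show x

-- | Example value standing in for a successful callback result.
exampleSuccess :: Either SomeException Int
exampleSuccess = Right 42

-- | Example value standing in for a callback that threw an exception.
exampleFailure :: Either SomeException Int
exampleFailure = Left (toException (userError "boom"))
```

In real use the argument would be the value produced by a withPayload call, and the Left branch is where an application would decide whether to alert or simply log.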
getCount :: String -> Connection -> IO Int64
Get the number of rows in the Enqueued state. This function runs
getCountDB in a ReadCommitted transaction.