Safe Haskell | None |
---|---|
Language | Haskell2010 |
Kafka.Producer.Sync
Contents
Description
This module provides a synchronous interface on top of the hw-kafka-client
It works by using MVars managed in two different queues. Each request is sent as soon as there are no other effectively equal Kafka records in-flight. This is done in order to make sure that there is no ambiguity as to which MVar to resolve.
Currently, this implements fair sending. For all requests, the oldest pending request should be sent first.
Synopsis
- data SyncKafkaProducer
- newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer)
- produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ())
- data ProducerRecord = ProducerRecord {}
- newtype TopicName = TopicName {
- unTopicName :: Text
- data ProducePartition
- data KafkaError
- data ProducerProperties = ProducerProperties {
- ppKafkaProps :: Map Text Text
- ppTopicProps :: Map Text Text
- ppLogLevel :: Maybe KafkaLogLevel
- ppCallbacks :: [KafkaConf -> IO ()]
- brokersList :: [BrokerAddress] -> ProducerProperties
- logLevel :: KafkaLogLevel -> ProducerProperties
- compression :: KafkaCompressionCodec -> ProducerProperties
- topicCompression :: KafkaCompressionCodec -> ProducerProperties
- sendTimeout :: Timeout -> ProducerProperties
- extraProps :: Map Text Text -> ProducerProperties
- suppressDisconnectLogs :: ProducerProperties
- extraTopicProps :: Map Text Text -> ProducerProperties
- debugOptions :: [KafkaDebug] -> ProducerProperties
- newtype BrokerAddress = BrokerAddress {}
- data KafkaCompressionCodec
- = NoCompression
- | Gzip
- | Snappy
- | Lz4
- data KafkaDebug
- data KafkaLogLevel
- newtype Timeout = Timeout {}
Sync producer
data SyncKafkaProducer Source #
A producer for sending messages to Kafka and waiting for the DeliveryReport
newSyncProducer :: MonadIO m => ProducerProperties -> m (Either KafkaError SyncKafkaProducer) Source #
Create a new SyncKafkaProducer
Note: since this library wraps the regular hw-kafka-client, please be aware that you should not set the delivery report callback. As it is set internally.
produceRecord :: MonadIO m => SyncKafkaProducer -> ProducerRecord -> m (Either KafkaError ()) Source #
Re-exports
Record datatypes
data ProducerRecord #
Constructors
ProducerRecord | |
Fields
|
Instances
Constructors
TopicName | |
Fields
|
Instances
Eq TopicName | |
Ord TopicName | |
Read TopicName | |
Show TopicName | |
Generic TopicName | |
type Rep TopicName | |
Defined in Kafka.Types |
data ProducePartition #
Constructors
SpecifiedPartition !Int | |
UnassignedPartition |
Instances
Errors
data KafkaError #
Constructors
KafkaError Text | |
KafkaInvalidReturnValue | |
KafkaBadSpecification Text | |
KafkaResponseError RdKafkaRespErrT | |
KafkaInvalidConfigurationValue Text | |
KafkaUnknownConfigurationKey Text | |
KafkaBadConfiguration |
Instances
Producer configuration
data ProducerProperties #
Constructors
ProducerProperties | |
Fields
|
Instances
Semigroup ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods (<>) :: ProducerProperties -> ProducerProperties -> ProducerProperties # sconcat :: NonEmpty ProducerProperties -> ProducerProperties # stimes :: Integral b => b -> ProducerProperties -> ProducerProperties # | |
Monoid ProducerProperties | |
Defined in Kafka.Producer.ProducerProperties Methods mempty :: ProducerProperties # mappend :: ProducerProperties -> ProducerProperties -> ProducerProperties # mconcat :: [ProducerProperties] -> ProducerProperties # |
Configuration helpers
brokersList :: [BrokerAddress] -> ProducerProperties #
Set brokers for producer
Set log-level for producer
Set compression level for producer
Set topic compression for producer
Set send timeout for producer
extraProps :: Map Text Text -> ProducerProperties #
Set extra properties for producer
Suppress disconnect log lines
Configure extra topic properties
debugOptions :: [KafkaDebug] -> ProducerProperties #
Add KafkaDebug
options
Other datatypes
newtype BrokerAddress #
Constructors
BrokerAddress | |
Fields |
Instances
Eq BrokerAddress | |
Defined in Kafka.Types Methods (==) :: BrokerAddress -> BrokerAddress -> Bool # (/=) :: BrokerAddress -> BrokerAddress -> Bool # | |
Show BrokerAddress | |
Defined in Kafka.Types Methods showsPrec :: Int -> BrokerAddress -> ShowS # show :: BrokerAddress -> String # showList :: [BrokerAddress] -> ShowS # | |
Generic BrokerAddress | |
Defined in Kafka.Types Associated Types type Rep BrokerAddress :: Type -> Type # | |
type Rep BrokerAddress | |
Defined in Kafka.Types type Rep BrokerAddress = D1 (MetaData "BrokerAddress" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" True) (C1 (MetaCons "BrokerAddress" PrefixI True) (S1 (MetaSel (Just "unBrokerAddress") NoSourceUnpackedness NoSourceStrictness DecidedLazy) (Rec0 Text))) |
data KafkaCompressionCodec #
Constructors
NoCompression | |
Gzip | |
Snappy | |
Lz4 |
Instances
data KafkaDebug #
Constructors
DebugGeneric | |
DebugBroker | |
DebugTopic | |
DebugMetadata | |
DebugQueue | |
DebugMsg | |
DebugProtocol | |
DebugCgrp | |
DebugSecurity | |
DebugFetch | |
DebugFeature | |
DebugAll |
Instances
Eq KafkaDebug | |
Defined in Kafka.Types | |
Show KafkaDebug | |
Defined in Kafka.Types Methods showsPrec :: Int -> KafkaDebug -> ShowS # show :: KafkaDebug -> String # showList :: [KafkaDebug] -> ShowS # | |
Generic KafkaDebug | |
Defined in Kafka.Types Associated Types type Rep KafkaDebug :: Type -> Type # | |
type Rep KafkaDebug | |
Defined in Kafka.Types type Rep KafkaDebug = D1 (MetaData "KafkaDebug" "Kafka.Types" "hw-kafka-client-3.0.0-f3f0ff4ee651ea9a164f922cc2bc7402c8529ca1a9367543bbc14e3ba60051d1" False) (((C1 (MetaCons "DebugGeneric" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugBroker" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugTopic" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugMetadata" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugQueue" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugMsg" PrefixI False) (U1 :: Type -> Type)))) :+: ((C1 (MetaCons "DebugProtocol" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugCgrp" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugSecurity" PrefixI False) (U1 :: Type -> Type))) :+: (C1 (MetaCons "DebugFetch" PrefixI False) (U1 :: Type -> Type) :+: (C1 (MetaCons "DebugFeature" PrefixI False) (U1 :: Type -> Type) :+: C1 (MetaCons "DebugAll" PrefixI False) (U1 :: Type -> Type))))) |
data KafkaLogLevel #
Constructors
KafkaLogEmerg | |
KafkaLogAlert | |
KafkaLogCrit | |
KafkaLogErr | |
KafkaLogWarning | |
KafkaLogNotice | |
KafkaLogInfo | |
KafkaLogDebug |
Instances
Enum KafkaLogLevel | |
Defined in Kafka.Types Methods succ :: KafkaLogLevel -> KafkaLogLevel # pred :: KafkaLogLevel -> KafkaLogLevel # toEnum :: Int -> KafkaLogLevel # fromEnum :: KafkaLogLevel -> Int # enumFrom :: KafkaLogLevel -> [KafkaLogLevel] # enumFromThen :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromTo :: KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # enumFromThenTo :: KafkaLogLevel -> KafkaLogLevel -> KafkaLogLevel -> [KafkaLogLevel] # | |
Eq KafkaLogLevel | |
Defined in Kafka.Types Methods (==) :: KafkaLogLevel -> KafkaLogLevel -> Bool # (/=) :: KafkaLogLevel -> KafkaLogLevel -> Bool # | |
Show KafkaLogLevel | |
Defined in Kafka.Types Methods showsPrec :: Int -> KafkaLogLevel -> ShowS # show :: KafkaLogLevel -> String # showList :: [KafkaLogLevel] -> ShowS # |