KafkaProducer

NOTICE: This producer is not included in standard builds. To enable it you need to trigger a custom build with native plugins enabled. The kafka producer writes messages to a kafka cluster. This producer is backed by the native librdkafka (0.8.6) library so most settings relate to that library. This producer does not implement a fuse breaker.

Parameters

Enable
Enable switches the consumer on or off. By default this value is set to true.
ID
ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
Channel
Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
ChannelTimeoutMs
ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be send to the retry channel.
ShutdownTimeoutMs
ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer to process than this duration, messages will be dropped during shutdown.
Stream
Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
DropToStream
DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
Formatter
Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
Filter
Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
Fuse
Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applyable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
FuseTimeoutSec
FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
SendRetries
SendRetries is mapped to message.send.max.retries. This defines the number of times librdkafka will try to re-send a message if it did not succeed. Set to 0 by default (don’t retry).
Compression
Compression is mapped to compression.codec. Please note that “zip” has to be used instead of “gzip”. Possible values are “none”, “zip” and “snappy”. By default this is set to “none”.
TimeoutMs
TimeoutMs is mapped to request.timeout.ms. This defines the number of milliseconds to wait until a request is marked as failed. By default this is set to 1.5sec.
BatchSizeMaxKB
BatchSizeMaxKB is mapped to message.max.bytes (x1024). This defines the maximum message size in KB. By default this is set to 1 MB. Messages above this size are rejected.
BatchMaxMessages
BatchMaxMessages is mapped to queue.buffering.max.messages. This defines the maximum number of messages that can be pending at any given moment in time. If this limit is hit additional messages will be rejected. This value is set to 100.000 by default and should be adjusted according to your average message throughput.
BatchMinMessages
BatchMinMessages is mapped to batch.num.messages. This defines the minimum number of messages required for a batch to be sent. This is set to 1000 by default and should be significantly lower than BatchMaxMessages to avoid messages to be rejected.
BatchTimeoutMs
BatchTimeoutMs is mapped to queue.buffering.max.ms. This defines the number of milliseconds to wait until a batch is flushed to kafka. Set to 1sec by default.
ServerTimeoutSec
ServerTimeoutSec is mapped to socket.timeout.ms. Defines the time in seconds after a server is defined as “not reachable”. Set to 1 minute by default.
ServerMaxFails
ServerMaxFails is mapped to socket.max.fails. Number of retries after a server is marked as “failing”.
MetadataTimeoutMs
MetadataTimeoutMs is mapped to metadata.request.timeout.ms. Number of milliseconds a metadata request may take until considered as failed. Set to 1.5 seconds by default.
MetadataRefreshMs
MetadataRefreshMs is mapped to topic.metadata.refresh.interval.ms. Interval in milliseconds for querying metadata. Set to 5 minutes by default.
SecurityProtocol
SecurityProtocol is mapped to security.protocol. Protocol used to communicate with brokers. Set to plaintext by default.
SslCipherSuites
SslCipherSuites is mapped to ssl.cipher.suites. Cipher Suites to use when connection via TLS/SSL. Not set by default.
SslKeyLocation
SslKeyLocation is mapped to ssl.key.location. Path to client’s private key (PEM) for used for authentication. Not set by default.
SslKeyPassword
SslKeyPassword is mapped to ssl.key.password. Private key passphrase. Not set by default.
SslCertificateLocation
SslCertificateLocation is mapped to ssl.certificate.location. Path to client’s public key (PEM) used for authentication. Not set by default.
SslCaLocation
SslCaLocation is mapped to ssl.ca.location. File or directory path to CA certificate(s) for verifying the broker’s key. Not set by default.
SslCrlLocation
SslCrlLocation is mapped to ssl.crl.location. Path to CRL for verifying broker’s certificate validity. Not set by default.
SaslMechanism
SaslMechanism is mapped to sasl.mechanisms. SASL mechanism to use for authentication. Not set by default.
SaslUsername
SaslUsername is mapped to sasl.username. SASL username for use with the PLAIN mechanism. Not set by default.
SaslPassword
SaslPassword is mapped to sasl.password. SASL password for use with the PLAIN mechanism. Not set by default.
Servers
Servers defines the list of brokers to produce messages to.
Topic
Topic defines a stream to topic mapping. If a stream is not mapped a topic named like the stream is assumed.
KeyFormatter
KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this can be format.Identifier.
KeyFormatterFirst
KeyFormatterFirst can be set to true to apply the key formatter to the unformatted message. By default this is set to false, so that key formatter uses the message after Formatter has been applied. KeyFormatter does never affect the payload of the message sent to kafka.
FilterAfterFormat
FilterAfterFormat behaves like Filter but allows filters to be executed after the formatter has run. By default no such filter is set.

Example

- "native.KafkaProducer":
    Enable: true
    ID: ""
    Channel: 8192
    ChannelTimeoutMs: 0
    ShutdownTimeoutMs: 3000
    Formatter: "format.Forward"
    Filter: "filter.All"
    DropToStream: "_DROPPED_"
    Fuse: ""
    FuseTimeoutSec: 5
    Stream:
        - "foo"
        - "bar"
    ClientId: "weblog"
    RequiredAcks: 1
    TimeoutMs: 1500
    SendRetries: 0
    Compression: "none"
    BatchSizeMaxKB: 1024
    BatchMaxMessages: 100000
    BatchMinMessages: 1000
    BatchTimeoutMs: 1000
    ServerTimeoutSec: 60
    ServerMaxFails: 3
    MetadataTimeoutMs: 1500
    MetadataRefreshMs: 300000
    SecurityProtocol: "plaintext"
    SslCipherSuites: ""
    SslKeyLocation: ""
    SslKeyPassword: ""
    SslCertificateLocation: ""
    SslCaLocation: ""
    SslCrlLocation: ""
    SaslMechanism: ""
    SaslUsername: ""
    SaslPassword: ""
    KeyFormatter: ""
    Servers:
        - "localhost:9092"
    Topic:
        "console" : "console"