Kafka

This producer writes messages to a kafka cluster. This producer is backed by the sarama library (https://github.com/Shopify/sarama) so most settings directly relate to the settings of that library.

Parameters

Enable (default: true)

Switches this plugin on or off.

Servers

Defines a list of ideally all brokers in the cluster. At least one broker is required. By default this parameter is set to an empty list.

Version

Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.

Topics

Defines a stream to topic mapping. If a stream is not mapped the stream name is used as topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). By default this parameter is set to an empty list.

ClientId (default: gollum)

Sets the kafka client id used by this producer. By default this parameter is set to “gollum”.

Partitioner

Defines the distribution algorithm to use. Valid values are: Random, Roundrobin and Hash. By default this parameter is set to “Roundrobin”.

PartitionHasher

Defines the hash algorithm to use when Partitioner is set to “Hash”. Accepted values are “fnv1-a” and “murmur2”.

KeyFrom

Defines the metadata field that contains the string to be used as the key passed to kafka. When set to an empty string no key is used. By default this parameter is set to “”.

Compression

Defines the compression algorithm to use. Possible values are “none”, “zip” and “snappy”. By default this parameter is set to “none”.

RequiredAcks

Defines the numbers of acknowledgements required until a message is marked as “sent”. When set to -1 all replicas must acknowledge a message. By default this parameter is set to 1.

TimeoutMs

Denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this parameter is set to 10000.

GracePeriodMs (default: 100, unit: ms)

Defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is sent to the fallback. This setting mitigates a conceptual problem in the saram API which can lead to long blocking times during startup. By default this parameter is set to 100.

MaxOpenRequests

Defines the maximum number of simultaneous connections opened to a single broker at a time. By default this parameter is set to 5.

ServerTimeoutSec

Defines the time after which a connection is set to timed out. By default this parameter is set to 30.

SendTimeoutMs

Defines the number of milliseconds to wait for a broker to before marking a message as timed out. By default this parameter is set to 250.

SendRetries

Defines how many times a message should be send again before a broker is marked as not reachable. Please note that this setting should never be 0. See https://github.com/Shopify/sarama/issues/294. By default this parameter is set to 1.

AllowNilValue (default: false)

When enabled messages containing an empty or nil payload will not be rejected. By default this parameter is set to false.

Batch/MinCount

Sets the minimum number of messages required to send a request. By default this parameter is set to 1.

Batch/MaxCount

Defines the maximum number of messages bufferd before a request is sent. A value of 0 will remove this limit. By default this parameter is set to 0.

Batch/MinSizeByte

Defines the minimum number of bytes to buffer before sending a request. By default this parameter is set to 8192.

Batch/SizeMaxKB

Defines the maximum allowed message size in KB. Messages bigger than this limit will be rejected. By default this parameter is set to 1024.

Batch/TimeoutMs

Defines the maximum time in milliseconds after which a new request will be sent, ignoring of Batch/MinCount and Batch/MinSizeByte By default this parameter is set to 3.

ElectRetries

Defines how many times a metadata request is to be retried during a leader election phase. By default this parameter is set to 3.

ElectTimeoutMs

Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

MetadataRefreshMs

Defines the interval in milliseconds for refetching cluster metadata. By default this parameter is set to 600000.

TlsEnable

Enables TLS communication with brokers. By default this parameter is set to false.

TlsKeyLocation

Path to the client’s private key (PEM) used for TLS based authentication. By default this parameter is set to “”.

TlsCertificateLocation

Path to the client’s public key (PEM) used for TLS based authentication. By default this parameter is set to “”.

TlsCaLocation

Path to the CA certificate(s) used for verifying the broker’s key. By default this parameter is set to “”.

TlsServerName

Used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. By default this parameter is set to “”.

TlsInsecureSkipVerify

Enables server certificate chain and host name verification. By default this parameter is set to false.

SaslEnable

Enables SASL based authentication. By default this parameter is set to false.

SaslUsername

Sets the user name used for SASL/PLAIN authentication. By default this parameter is set to “”.

SaslPassword

Sets the password used for SASL/PLAIN authentication. By default this parameter is set to “”. MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.

Parameters (from core.BufferedProducer)

Channel

This value defines the capacity of the message buffer. By default this parameter is set to “8192”.

ChannelTimeoutMs (default: 0, unit: ms)

This value defines a timeout for each message before the message will discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.

Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this paramater to “” will cause messages to be discared when delivery fails.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.

Examples

kafkaWriter:
  Type: producer.Kafka
  Streams: logs
  Compression: zip
  Servers:
    - "kafka01:9092"
    - "kafka02:9092"
    - "kafka03:9092"
    - "kafka04:9092"