Kafka¶
This producer writes messages to a kafka cluster. This producer is backed by the sarama library (https://github.com/Shopify/sarama) so most settings directly relate to the settings of that library.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Servers
Defines a list of ideally all brokers in the cluster. At least one broker is required. By default this parameter is set to an empty list.
Version
Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.
Topics
Defines a stream to topic mapping. If a stream is not mapped the stream name is used as topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). By default this parameter is set to an empty list.
ClientId (default: gollum)
Sets the kafka client id used by this producer. By default this parameter is set to “gollum”.
Partitioner
Defines the distribution algorithm to use. Valid values are: Random, Roundrobin and Hash. By default this parameter is set to “Roundrobin”.
PartitionHasher
Defines the hash algorithm to use when Partitioner is set to “Hash”. Accepted values are “fnv1-a” and “murmur2”.
KeyFrom
Defines the metadata field that contains the string to be used as the key passed to kafka. When set to an empty string no key is used. By default this parameter is set to “”.
Compression
Defines the compression algorithm to use. Possible values are “none”, “zip” and “snappy”. By default this parameter is set to “none”.
RequiredAcks
Defines the numbers of acknowledgements required until a message is marked as “sent”. When set to -1 all replicas must acknowledge a message. By default this parameter is set to 1.
TimeoutMs
Denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this parameter is set to 10000.
GracePeriodMs (default: 100, unit: ms)
Defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is sent to the fallback. This setting mitigates a conceptual problem in the saram API which can lead to long blocking times during startup. By default this parameter is set to 100.
MaxOpenRequests
Defines the maximum number of simultaneous connections opened to a single broker at a time. By default this parameter is set to 5.
ServerTimeoutSec
Defines the time after which a connection is set to timed out. By default this parameter is set to 30.
SendTimeoutMs
Defines the number of milliseconds to wait for a broker to before marking a message as timed out. By default this parameter is set to 250.
SendRetries
Defines how many times a message should be send again before a broker is marked as not reachable. Please note that this setting should never be 0. See https://github.com/Shopify/sarama/issues/294. By default this parameter is set to 1.
AllowNilValue (default: false)
When enabled messages containing an empty or nil payload will not be rejected. By default this parameter is set to false.
Batch/MinCount
Sets the minimum number of messages required to send a request. By default this parameter is set to 1.
Batch/MaxCount
Defines the maximum number of messages bufferd before a request is sent. A value of 0 will remove this limit. By default this parameter is set to 0.
Batch/MinSizeByte
Defines the minimum number of bytes to buffer before sending a request. By default this parameter is set to 8192.
Batch/SizeMaxKB
Defines the maximum allowed message size in KB. Messages bigger than this limit will be rejected. By default this parameter is set to 1024.
Batch/TimeoutMs
Defines the maximum time in milliseconds after which a new request will be sent, ignoring of Batch/MinCount and Batch/MinSizeByte By default this parameter is set to 3.
ElectRetries
Defines how many times a metadata request is to be retried during a leader election phase. By default this parameter is set to 3.
ElectTimeoutMs
Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.
MetadataRefreshMs
Defines the interval in milliseconds for refetching cluster metadata. By default this parameter is set to 600000.
TlsEnable
Enables TLS communication with brokers. By default this parameter is set to false.
TlsKeyLocation
Path to the client’s private key (PEM) used for TLS based authentication. By default this parameter is set to “”.
TlsCertificateLocation
Path to the client’s public key (PEM) used for TLS based authentication. By default this parameter is set to “”.
TlsCaLocation
Path to the CA certificate(s) used for verifying the broker’s key. By default this parameter is set to “”.
TlsServerName
Used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. By default this parameter is set to “”.
TlsInsecureSkipVerify
Enables server certificate chain and host name verification. By default this parameter is set to false.
SaslEnable
Enables SASL based authentication. By default this parameter is set to false.
SaslUsername
Sets the user name used for SASL/PLAIN authentication. By default this parameter is set to “”.
SaslPassword
Sets the password used for SASL/PLAIN authentication. By default this parameter is set to “”. MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this paramater to “” will cause messages to be discared when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
kafkaWriter:
Type: producer.Kafka
Streams: logs
Compression: zip
Servers:
- "kafka01:9092"
- "kafka02:9092"
- "kafka03:9092"
- "kafka04:9092"