Kafka¶
The kafka producer writes messages to a kafka cluster. This producer is backed by the sarama library so most settings relate to that library. This producer uses a fuse breaker if any connection reports an error.
Parameters¶
- Enable
- Enable switches the consumer on or off. By default this value is set to true.
- ID
- ID allows this producer to be found by other plugins by name. By default this is set to “” which does not register this producer.
- Channel
- Channel sets the size of the channel used to communicate messages. By default this value is set to 8192.
- ChannelTimeoutMs
- ChannelTimeoutMs sets a timeout in milliseconds for messages to wait if this producer’s queue is full. A timeout of -1 or lower will drop the message without notice. A timeout of 0 will block until the queue is free. This is the default. A timeout of 1 or higher will wait x milliseconds for the queues to become available again. If this does not happen, the message will be send to the retry channel.
- ShutdownTimeoutMs
- ShutdownTimeoutMs sets a timeout in milliseconds that will be used to detect a blocking producer during shutdown. By default this is set to 3 seconds. If processing a message takes longer to process than this duration, messages will be dropped during shutdown.
- Stream
- Stream contains either a single string or a list of strings defining the message channels this producer will consume. By default this is set to “*” which means “listen to all streams but the internal”.
- DropToStream
- DropToStream defines the stream used for messages that are dropped after a timeout (see ChannelTimeoutMs). By default this is _DROPPED_.
- Formatter
- Formatter sets a formatter to use. Each formatter has its own set of options which can be set here, too. By default this is set to format.Forward. Each producer decides if and when to use a Formatter.
- Filter
- Filter sets a filter that is applied before formatting, i.e. before a message is send to the message queue. If a producer requires filtering after formatting it has to define a separate filter as the producer decides if and where to format.
- Fuse
- Fuse defines the name of a fuse to burn if e.g. the producer encounters a lost connection. Each producer defines its own fuse breaking logic if necessary / applyable. Disable fuse behavior for a producer by setting an empty name or a FuseTimeoutSec <= 0. By default this is set to “”.
- FuseTimeoutSec
- FuseTimeoutSec defines the interval in seconds used to check if the fuse can be recovered. Note that automatic fuse recovery logic depends on each producer’s implementation. By default this setting is set to 10.
- ClientId
- ClientId sets the client id of this producer. By default this is “gollum”.
- Version
- Version defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. Defaults to “0.8.2”. If the version given is not known, the closest possible version is chosen.
- Partitioner
- Partitioner sets the distribution algorithm to use. Valid values are: “Random”,”Roundrobin” and “Hash”. By default “Roundrobin” is set.
- KeyFormatter
- KeyFormatter can define a formatter that extracts the key for a kafka message from the message payload. By default this is an empty string, which disables this feature. A good formatter for this can be format.Identifier.
- KeyFormatterFirst
- KeyFormatterFirst can be set to true to apply the key formatter to the unformatted message. By default this is set to false, so that key formatter uses the message after Formatter has been applied. KeyFormatter does never affect the payload of the message sent to kafka.
- FilterAfterFormat
- FilterAfterFormat behaves like Filter but allows filters to be executed after the formatter has run. By default no such filter is set.
- RequiredAcks
- RequiredAcks defines the acknowledgment level required by the broker. 0 = No responses required. 1 = wait for the local commit. -1 = wait for all replicas to commit. >1 = wait for a specific number of commits. By default this is set to 1.
- TimeoutMs
- TimeoutMs denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this is set to 10 seconds.
- SendRetries
- SendRetries defines how many times to retry sending data before marking a server as not reachable. By default this is set to 1.
- Compression
- Compression sets the method of compression to use. Valid values are: “None”,”Zip” and “Snappy”. By default “None” is set.
- MaxOpenRequests
- MaxOpenRequests defines the number of simultaneous connections are allowed. By default this is set to 5.
- BatchMinCount
- BatchMinCount sets the minimum number of messages required to trigger a flush. By default this is set to 1.
- BatchMaxCount
- BatchMaxCount defines the maximum number of messages processed per request. By default this is set to 0 for “unlimited”.
- BatchSizeByte
- BatchSizeByte sets the minimum number of bytes to collect before a new flush is triggered. By default this is set to 8192.
- BatchSizeMaxKB
- BatchSizeMaxKB defines the maximum allowed message size. By default this is set to 1024.
- BatchTimeoutMs
- BatchTimeoutMs sets the minimum time in milliseconds to pass after which a new flush will be triggered. By default this is set to 3.
- MessageBufferCount
- MessageBufferCount sets the internal channel size for the kafka client. By default this is set to 8192.
- ServerTimeoutSec
- ServerTimeoutSec defines the time after which a connection is set to timed out. By default this is set to 30 seconds.
- SendTimeoutMs
- SendTimeoutMs defines the number of milliseconds to wait for a server to resond before triggering a timeout. Defaults to 250.
- ElectRetries
- ElectRetries defines how many times to retry during a leader election. By default this is set to 3.
- ElectTimeoutMs
- ElectTimeoutMs defines the number of milliseconds to wait for the cluster to elect a new leader. Defaults to 250.
- GracePeriodMs
- GracePeriodMs defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is dropped. By default this is set to 100ms.
- MetadataRefreshMs
- MetadataRefreshMs set the interval in seconds for fetching cluster metadata. By default this is set to 600000 (10 minutes). This corresponds to the JVM setting topic.metadata.refresh.interval.ms.
- TlsEnable
- TlsEnable defines whether to use TLS to communicate with brokers. Defaults to false.
- TlsKeyLocation
- TlsKeyLocation defines the path to the client’s private key (PEM) for used for authentication. Defaults to “”.
- TlsCertificateLocation
- TlsCertificateLocation defines the path to the client’s public key (PEM) used for authentication. Defaults to “”.
- TlsCaLocation
- TlsCaLocation defines the path to CA certificate(s) for verifying the broker’s key. Defaults to “”.
- TlsServerName
- TlsServerName is used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. Defaults to “”.
- TlsInsecureSkipVerify
- TlsInsecureSkipVerify controls whether to verify the server’s certificate chain and host name. Defaults to false.
- SaslEnable
- SaslEnable is whether to use SASL for authentication. Defaults to false.
- SaslUsername
- SaslUsername is the user for SASL/PLAIN authentication. Defaults to “gollum”.
- SaslPassword
- SaslPassword is the password for SASL/PLAIN authentication. Defaults to “”.
- Servers
- Servers contains the list of all kafka servers to connect to.
- By default this is set to contain only “localhost:9092”.
- Topic
- Topic maps a stream to a specific kafka topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). If no topic mappings are set the stream names will be used as topic.
Example¶
- "producer.Kafka":
Enable: true
ID: ""
Channel: 8192
ChannelTimeoutMs: 0
ShutdownTimeoutMs: 3000
Formatter: "format.Forward"
Filter: "filter.All"
DropToStream: "_DROPPED_"
Fuse: ""
FuseTimeoutSec: 5
Stream:
- "foo"
- "bar"
ClientId: "gollum"
Version: "0.8.2"
Partitioner: "Roundrobin"
RequiredAcks: 1
TimeoutMs: 1500
GracePeriodMs: 10
SendRetries: 0
Compression: "None"
MaxOpenRequests: 5
MessageBufferCount: 256
BatchMinCount: 1
BatchMaxCount: 0
BatchSizeByte: 8192
BatchSizeMaxKB: 1024
BatchTimeoutMs: 3000
ServerTimeoutSec: 30
SendTimeoutMs: 250
ElectRetries: 3
ElectTimeoutMs: 250
MetadataRefreshMs: 10000
TlsEnabled: true
TlsKeyLocation: ""
TlsCertificateLocation: ""
TlsCaLocation: ""
TlsServerName: ""
TlsInsecureSkipVerify: false
SaslEnabled: false
SaslUsername: "gollum"
SaslPassword: ""
KeyFormatter: ""
KeyFormatterFirst: false
Servers:
- "localhost:9092"
Topic:
"console" : "console"