Kafka
This consumer reads data from a Kafka topic. It is based on the sarama library; most settings map directly to the settings of that library.
Metadata
NOTE: The metadata will only be set if the parameter `SetMetadata` is active.
topic
Contains the name of the kafka topic
key
Contains the key of the kafka message
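As a minimal sketch of how these fields are switched on, the fragment below enables `SetMetadata` for a consumer. The plugin ID `kafkaWithMetadata` and the stream and topic name `logs` are placeholders chosen for illustration, not values required by the plugin.

```yaml
# Sketch only: "kafkaWithMetadata" and "logs" are hypothetical names.
kafkaWithMetadata:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Adds the "topic" and "key" metadata fields to each message.
  # Off by default because it costs throughput on busy systems.
  SetMetadata: true
```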
Parameters
Enable (default: true)
Switches this plugin on or off.
Servers
Defines the list of all Kafka brokers to initially connect to when querying topic metadata. This list requires at least one broker to work and ideally contains all the brokers in the cluster. By default this parameter is set to [“localhost:9092”].
Topic (default: default)
Defines the kafka topic to read from. By default this parameter is set to “default”.
ClientId
Sets the client id used in requests by this consumer. By default this parameter is set to “gollum”.
GroupId
Sets the consumer group of this consumer. If empty, consumer groups are not used. This setting requires Kafka version >= 0.9. By default this parameter is set to “”.
Version
Defines the Kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set and the version is < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.
SetMetadata (default: false)
When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
DefaultOffset
Defines the initial offset when starting to read the topic. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. If GroupId is defined, this setting will only be used for the first request. By default this parameter is set to “newest”.
OffsetFile
Defines the path to a file that holds the current offset of a given partition. If the consumer is restarted, reading continues from that offset. To disable this setting, set it to “”. Please note that offsets stored in the file might be outdated. In that case DefaultOffset “oldest” will be used. By default this parameter is set to “”.
FolderPermissions (default: 0755)
Used to create the path to the offset file if necessary. By default this parameter is set to “0755”.
Ordered
Forces partitions to be read one-by-one in a round robin fashion instead of reading them all in parallel. Please note that this may restore the original ordering but does not necessarily do so. The term “ordered” refers to an ordered reading of all partitions, as opposed to reading them randomly. By default this parameter is set to false.
MaxOpenRequests
Defines the number of simultaneous connections allowed per broker. By default this parameter is set to 5.
ServerTimeoutSec
Defines the time after which a connection will time out. By default this parameter is set to 30.
MaxFetchSizeByte
Sets the maximum size of a message to fetch. Larger messages will be ignored. When set to 0, the size of messages is not checked. By default this parameter is set to 0.
MinFetchSizeByte
Defines the minimum amount of data to fetch from Kafka per request. If less data is available, the broker will wait. By default this parameter is set to 1.
DefaultFetchSizeByte
Defines the average amount of data to fetch per request. This value must be greater than 0. By default this parameter is set to 32768.
FetchTimeoutMs
Defines the time in milliseconds to wait on reaching MinFetchSizeByte before fetching new data regardless of size. By default this parameter is set to 250.
MessageBufferCount
Sets the internal channel size for the kafka client. By default this parameter is set to 8192.
PresistTimoutMs (default: 5000, unit: ms)
Defines the interval in milliseconds in which data is written to the OffsetFile. A short duration reduces the amount of duplicate messages after a crash but increases I/O. When using GroupId this setting controls the pause time after receiving errors. By default this parameter is set to 5000.
ElectRetries
Defines how many times to retry fetching the new master partition during a leader election. By default this parameter is set to 3.
ElectTimeoutMs
Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.
MetadataRefreshMs
Defines the interval in milliseconds used for fetching Kafka metadata from the cluster (e.g. number of partitions). By default this parameter is set to 10000.
TlsEnable
Defines whether to use TLS based authentication when communicating with brokers. By default this parameter is set to false.
TlsKeyLocation
Defines the path to the client’s PEM-formatted private key used for TLS based authentication. By default this parameter is set to “”.
TlsCertificateLocation
Defines the path to the client’s PEM-formatted public key used for TLS based authentication. By default this parameter is set to “”.
TlsCaLocation
Defines the path to the CA certificate(s) for verifying a broker’s key when using TLS based authentication. By default this parameter is set to “”.
TlsServerName
Defines the server name expected during hostname verification. It is only relevant when TlsInsecureSkipVerify is false. By default this parameter is set to “”.
TlsInsecureSkipVerify
Disables verification of the server’s certificate chain and host name when set to true. By default this parameter is set to false.
SaslEnable
Defines whether to use SASL based authentication when communicating with brokers. By default this parameter is set to false.
SaslUsername
Defines the username for SASL/PLAIN authentication. By default this parameter is set to “gollum”.
SaslPassword
Defines the password for SASL/PLAIN authentication. By default this parameter is set to “”.
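The TLS and SASL parameters are typically set together. The fragment below is a sketch of a consumer that combines them; the plugin ID, broker host, port, file paths, and password are placeholders invented for illustration and need to be replaced with values from the actual cluster.

```yaml
# Sketch only: plugin ID, host name, port, paths, and credentials are hypothetical.
kafkaSecureIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  Servers:
    - "kafka0:9093"
  # TLS: client key and certificate plus the CA used to verify the broker.
  TlsEnable: true
  TlsKeyLocation: /etc/gollum/tls/client.key
  TlsCertificateLocation: /etc/gollum/tls/client.pem
  TlsCaLocation: /etc/gollum/tls/ca.pem
  TlsServerName: kafka0
  # SASL/PLAIN credentials.
  SaslEnable: true
  SaslUsername: gollum
  SaslPassword: "change-me"
```

All three file parameters expect PEM-formatted files, as described above.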
Parameters (from core.SimpleConsumer)
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
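The following sketch shows how the parameters inherited from core.SimpleConsumer might be combined with a Kafka consumer. The plugin ID and the stream names `logs` and `audit` are hypothetical, and the numeric values are arbitrary illustrations rather than tuning recommendations.

```yaml
# Sketch only: plugin ID, stream names, and numbers are illustrative.
kafkaTunedIn:
  Type: consumer.Kafka
  Topic: logs
  # Streams is mandatory and may list more than one target stream.
  Streams:
    - logs
    - audit
  # Allow up to 2 seconds for a clean shutdown instead of the default 1000 ms.
  ShutdownTimeoutMs: 2000
  # Use a fixed pool of 4 go routines for modulation.
  ModulatorRoutines: 4
  # Only takes effect because ModulatorRoutines is not 0.
  ModulatorQueueSize: 2048
```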
Examples
This config reads the topic “logs” from a cluster with 4 brokers.
kafkaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  DefaultOffset: newest
  OffsetFile: /var/gollum/logs.offset
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"
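A variant of the config above that reads through a consumer group could look like the sketch below. The plugin ID `kafkaGroupIn` and the group name `gollum-logs` are hypothetical. The version is quoted so that YAML treats it as a string.

```yaml
# Sketch only: plugin ID and group name are hypothetical.
kafkaGroupIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  # Consumer groups require Kafka >= 0.9, so the protocol version is raised.
  GroupId: "gollum-logs"
  Version: "0.10.0"
  # With GroupId set, DefaultOffset is only used for the first request.
  DefaultOffset: oldest
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"
```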