Kafka

This consumer reads data from a Kafka topic. It is based on the sarama library; most settings map directly to the settings of that library.

Metadata

NOTE: The metadata will only be set if the parameter `SetMetadata` is active.

topic

Contains the name of the kafka topic

key

Contains the key of the kafka message

Parameters

Enable (default: true)

Switches this plugin on or off.

Servers

Defines the list of all Kafka brokers to initially connect to when querying topic metadata. This list requires at least one broker to work and ideally contains all the brokers in the cluster. By default this parameter is set to [“localhost:9092”].

Topic (default: default)

Defines the kafka topic to read from. By default this parameter is set to “default”.

ClientId

Sets the client id used in requests by this consumer. By default this parameter is set to “gollum”.

GroupId

Sets the consumer group of this consumer. If empty, consumer groups are not used. This setting requires Kafka version >= 0.9. By default this parameter is set to “”.

Version

Defines the Kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the given version is not known, the closest possible version is chosen. If GroupId is set and Version is lower than “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.
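
As a minimal sketch of how GroupId and Version interact, the following config joins a consumer group. The plugin id “kafkaGroupIn”, the group name and the broker address are hypothetical placeholders, not values taken from a real setup.

kafkaGroupIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Hypothetical group name; an empty value disables consumer groups.
  GroupId: "log-readers"
  # Consumer groups require Kafka >= 0.9, so the version is raised explicitly.
  Version: "0.10.0"
  Servers:
    - "kafka0:9092"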

SetMetadata (default: false)

When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
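
A short sketch of enabling metadata, assuming a hypothetical plugin id “kafkaMetaIn” and a hypothetical broker address:

kafkaMetaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Adds the "topic" and "key" fields from the metadata section to each message.
  # Leave this at false on high-throughput systems if the fields are not needed.
  SetMetadata: true
  Servers:
    - "kafka0:9092"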

DefaultOffset

Defines the initial offset when starting to read the topic. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. If GroupId is defined, this setting will only be used for the first request. By default this parameter is set to “newest”.

OffsetFile

Defines the path to a file that holds the current offset of a given partition. If the consumer is restarted, reading continues from that offset. To disable this setting, set it to “”. Please note that offsets stored in the file might be outdated. In that case DefaultOffset “oldest” will be used. By default this parameter is set to “”.

FolderPermissions (default: 0755)

Used to create the path to the offset file if necessary. By default this parameter is set to “0755”.

Ordered

Forces partitions to be read one-by-one in a round robin fashion instead of reading them all in parallel. Please note that this may restore the original ordering but does not necessarily do so. The term “ordered” refers to an ordered reading of all partitions, as opposed to reading them randomly. By default this parameter is set to false.

MaxOpenRequests

Defines the maximum number of simultaneous connections to a single broker. By default this parameter is set to 5.

ServerTimeoutSec

Defines the time after which a connection will time out. By default this parameter is set to 30.

MaxFetchSizeByte

Sets the maximum size of a message to fetch. Larger messages will be ignored. When set to 0, the size of messages is not checked. By default this parameter is set to 0.

MinFetchSizeByte

Defines the minimum amount of data to fetch from Kafka per request. If less data is available, the broker will wait. By default this parameter is set to 1.

DefaultFetchSizeByte

Defines the average amount of data to fetch per request. This value must be greater than 0. By default this parameter is set to 32768.

FetchTimeoutMs

Defines the time in milliseconds to wait on reaching MinFetchSizeByte before fetching new data regardless of size. By default this parameter is set to 250.
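
The four fetch parameters above work together. The following sketch shows one possible combination; the numbers are illustrative assumptions, not tuning recommendations, and the plugin id “kafkaBatchIn” is hypothetical.

kafkaBatchIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # The broker waits until at least 1 KiB is available ...
  MinFetchSizeByte: 1024
  # ... but no longer than 500 ms before answering regardless of size.
  FetchTimeoutMs: 500
  # Average amount of data requested per fetch (must be > 0).
  DefaultFetchSizeByte: 65536
  # Messages larger than 1 MiB are ignored; 0 would disable this check.
  MaxFetchSizeByte: 1048576
  Servers:
    - "kafka0:9092"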

MessageBufferCount

Sets the internal channel size for the kafka client. By default this parameter is set to 8192.

PresistTimoutMs (default: 5000, unit: ms)

Defines the interval in milliseconds in which data is written to the OffsetFile. A short duration reduces the amount of duplicate messages after a crash but increases I/O. When using GroupId this setting controls the pause time after receiving errors. By default this parameter is set to 5000.
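
A sketch of file-based offset tracking, combining OffsetFile, DefaultOffset and PresistTimoutMs. The plugin id “kafkaOffsetIn”, the file path and the interval are assumptions chosen for illustration.

kafkaOffsetIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Only used while the offset file does not exist yet.
  DefaultOffset: oldest
  # Hypothetical path; missing folders are created using FolderPermissions.
  OffsetFile: /var/gollum/logs.offset
  # Write offsets every second: fewer duplicates after a crash, more I/O.
  PresistTimoutMs: 1000
  Servers:
    - "kafka0:9092"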

ElectRetries

Defines how many times to retry fetching the new master partition during a leader election. By default this parameter is set to 3.

ElectTimeoutMs

Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.

MetadataRefreshMs

Defines the interval in milliseconds used for fetching Kafka metadata from the cluster (e.g. number of partitions). By default this parameter is set to 10000.
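
The leader election and metadata parameters above could be relaxed for a cluster that is slow to elect leaders. This is only a sketch; the values and the plugin id “kafkaPatientIn” are hypothetical.

kafkaPatientIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Retry up to 5 times, waiting 500 ms for each leader election.
  ElectRetries: 5
  ElectTimeoutMs: 500
  # Refresh cluster metadata (e.g. partition count) every 30 seconds.
  MetadataRefreshMs: 30000
  Servers:
    - "kafka0:9092"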

TlsEnable

Defines whether to use TLS based authentication when communicating with brokers. By default this parameter is set to false.

TlsKeyLocation

Defines the path to the client’s PEM-formatted private key used for TLS based authentication. By default this parameter is set to “”.

TlsCertificateLocation

Defines the path to the client’s PEM-formatted public key used for TLS based authentication. By default this parameter is set to “”.

TlsCaLocation

Defines the path to the CA certificate(s) for verifying a broker’s key when using TLS based authentication. By default this parameter is set to “”.

TlsServerName

Defines the expected hostname used by hostname verification. This value is only checked when the server’s certificate is verified, i.e. when TlsInsecureSkipVerify is not enabled. By default this parameter is set to “”.

TlsInsecureSkipVerify

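Controls verification of the server’s certificate chain and host name. When set to true, verification is skipped and any certificate presented by the broker is accepted. By default this parameter is set to false.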
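
The TLS parameters above might be combined as in the following sketch. All file paths, the host name, the port and the plugin id “kafkaTlsIn” are hypothetical placeholders.

kafkaTlsIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  TlsEnable: true
  # Hypothetical locations of the PEM-formatted client key and certificate.
  TlsKeyLocation: /etc/gollum/tls/client.key
  TlsCertificateLocation: /etc/gollum/tls/client.pem
  # Hypothetical CA bundle used to verify the broker.
  TlsCaLocation: /etc/gollum/tls/ca.pem
  # Hypothetical host name expected in the broker's certificate.
  TlsServerName: "kafka0.example.com"
  # Left at its default value.
  TlsInsecureSkipVerify: false
  Servers:
    - "kafka0.example.com:9093"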

SaslEnable

Defines whether to use SASL based authentication when communicating with brokers. By default this parameter is set to false.

SaslUsername

Defines the username for SASL/PLAIN authentication. By default this parameter is set to “gollum”.

SaslPassword

Defines the password for SASL/PLAIN authentication. By default this parameter is set to “”.
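
A minimal sketch of SASL/PLAIN authentication. The credentials and the plugin id “kafkaSaslIn” are placeholders. Since SASL/PLAIN transmits the password unencrypted, this sketch assumes it is combined with TlsEnable.

kafkaSaslIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  SaslEnable: true
  # Placeholder credentials; replace with real values.
  SaslUsername: "logreader"
  SaslPassword: "changeme"
  # Assumption: TLS is enabled so the credentials are not sent in clear text.
  TlsEnable: true
  Servers:
    - "kafka0:9093"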

Parameters (from core.SimpleConsumer)

Streams

Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.

Modulators

Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.

ModulatorRoutines

Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.

ModulatorQueueSize

Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
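
As a sketch of the two modulator settings above, the following config uses a fixed pool of go routines with a larger buffer. The numbers and the plugin id “kafkaPooledIn” are illustrative assumptions.

kafkaPooledIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  # Use exactly 4 go routines for modulation instead of one per fetcher.
  ModulatorRoutines: 4
  # Buffer size is only honored because ModulatorRoutines is not 0.
  ModulatorQueueSize: 2048
  Servers:
    - "kafka0:9092"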

Examples

This config reads the topic “logs” from a cluster with 4 brokers.

kafkaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  DefaultOffset: newest
  OffsetFile: /var/gollum/logs.offset
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"