ElasticSearch

The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API. The producer expects a JSON payload.
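
The bulk HTTP API takes newline-delimited JSON in which every document is preceded by an action line. The Go sketch below builds such a body from JSON payloads. It assumes the `index` action with the 5.x-style `_type` field, and `buildBulkBody` is a hypothetical helper for illustration, not the producer's own code.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"strings"
)

// bulkAction is the action/metadata line written before each document.
type bulkAction struct {
	Index struct {
		Index string `json:"_index"`
		Type  string `json:"_type"`
	} `json:"index"`
}

// buildBulkBody (hypothetical) renders payloads as a bulk request body:
// one "index" action line and one compacted source line per document,
// each terminated by "\n" as the bulk API requires.
func buildBulkBody(index, docType string, payloads []string) (string, error) {
	var action bulkAction
	action.Index.Index = index
	action.Index.Type = docType
	meta, err := json.Marshal(action)
	if err != nil {
		return "", err
	}

	var sb strings.Builder
	for _, p := range payloads {
		// Compacting validates the payload and strips newlines that would
		// otherwise break the line-oriented bulk format.
		var doc bytes.Buffer
		if err := json.Compact(&doc, []byte(p)); err != nil {
			return "", fmt.Errorf("payload is not valid JSON: %w", err)
		}
		sb.Write(meta)
		sb.WriteByte('\n')
		sb.Write(doc.Bytes())
		sb.WriteByte('\n')
	}
	return sb.String(), nil
}

func main() {
	body, err := buildBulkBody("twitter", "tweet", []string{`{ "user": "alice", "message": "hi" }`})
	if err != nil {
		panic(err)
	}
	fmt.Print(body)
}
```

A payload that is not valid JSON is rejected here before anything is sent, which matches the requirement that the producer receives JSON payloads.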

Parameters

Enable (default: true)

Switches this plugin on or off.

Retry/Count

Sets the number of retries before an Elasticsearch request finally fails. By default this parameter is set to “3”.

Retry/TimeToWaitSec

This value denotes the time in seconds after which a failed dataset will be transmitted again. By default this parameter is set to “3”.

SetGzip

This value enables or disables gzip compression for Elasticsearch requests. The option is passed as-is to the underlying client library; see http://godoc.org/gopkg.in/olivere/elastic.v5#SetGzip. By default this parameter is set to “false”.

Servers

This value defines a list of servers to connect to.

User

This value is used as the username for the Elasticsearch server. By default this parameter is set to “”.

Password

This value is used as the password for the Elasticsearch server. By default this parameter is set to “”.
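
A sketch of how User and Password could be attached to a bulk request, assuming they are sent as HTTP basic authentication and that an empty user name means no credentials are sent. `newBulkRequest` is hypothetical and uses only the Go standard library.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// newBulkRequest (hypothetical) prepares a POST to <server>/_bulk and attaches
// HTTP basic credentials only when a user name is configured.
func newBulkRequest(server, user, password string, body io.Reader) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, server+"/_bulk", body)
	if err != nil {
		return nil, err
	}
	// The bulk API takes newline-delimited JSON.
	req.Header.Set("Content-Type", "application/x-ndjson")
	if user != "" {
		req.SetBasicAuth(user, password)
	}
	return req, nil
}

func main() {
	req, err := newBulkRequest("http://127.0.0.1:9200", "elastic", "changeme", strings.NewReader("{}\n"))
	if err != nil {
		panic(err)
	}
	user, _, ok := req.BasicAuth()
	fmt.Println(req.Method, req.URL, user, ok) // POST http://127.0.0.1:9200/_bulk elastic true
}
```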

StreamProperties

This value defines the mapping and settings for each stream. Use the stream name as the key for each entry.

StreamProperties/<streamName>/Index

This value defines the Elasticsearch index used for the stream.

StreamProperties/<streamName>/Type

This value defines the document type used for the stream.

StreamProperties/<streamName>/TimeBasedIndex

This value can be set to “true” to append the date of the message to the index as in “<index>_<TimeBasedFormat>”. NOTE: This setting incurs a performance penalty because it is necessary to check if an index exists for each message! By default this parameter is set to “false”.

StreamProperties/<streamName>/TimeBasedFormat

This value can be set to a valid Go time format string to be used with TimeBasedIndex. By default this parameter is set to “2006-01-02”.

StreamProperties/<streamName>/Mapping

This value is a map which is used for the document field mapping. The document type defined in Type is reused for the field mapping. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings

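StreamProperties/<streamName>/Settings

This value is a map which is used for the index settings, such as number_of_shards and number_of_replicas.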
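
The Mapping and Settings maps correspond to the body of an Elasticsearch 5.x create-index request. The Go sketch below assumes each Mapping entry maps a field name to a field type and that the properties are nested under the stream's document type; `createIndexBody` is hypothetical and not necessarily how the producer builds its request.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// createIndexBody (hypothetical) builds a create-index request body from a
// simplified field->type map and an optional map of index settings.
func createIndexBody(docType string, fields map[string]string, settings map[string]interface{}) ([]byte, error) {
	properties := make(map[string]interface{}, len(fields))
	for field, fieldType := range fields {
		properties[field] = map[string]string{"type": fieldType}
	}
	body := map[string]interface{}{
		// The field mapping is nested under the document type.
		"mappings": map[string]interface{}{
			docType: map[string]interface{}{"properties": properties},
		},
	}
	// Only send settings when some are configured.
	if len(settings) > 0 {
		body["settings"] = settings
	}
	// json.Marshal sorts map keys, so the output is deterministic.
	return json.Marshal(body)
}

func main() {
	b, err := createIndexBody("tweet",
		map[string]string{"user": "keyword", "message": "text"},
		map[string]interface{}{"number_of_shards": 1, "number_of_replicas": 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```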

Parameters (from core.BatchedProducer)

Batch/MaxCount (default: 8192)

Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.

Batch/FlushCount (default: 4096)

Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.

Batch/TimeoutSec (default: 5, unit: sec)

Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.

Parameters (from core.SimpleProducer)

Streams

Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
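
A sketch of the “*” rule, assuming internal streams can be recognized by a leading underscore as in _GOLLUM_. This recognition rule and the `receivesFrom` helper are hypothetical, not Gollum's own routing code.

```go
package main

import (
	"fmt"
	"strings"
)

// receivesFrom (hypothetical) reports whether a producer configured with
// streams gets messages from stream. "*" matches every stream except internal
// ones, assumed here to start with "_"; explicitly listed streams always match.
func receivesFrom(streams []string, stream string) bool {
	for _, s := range streams {
		if s == stream {
			return true
		}
		if s == "*" && !strings.HasPrefix(stream, "_") {
			return true
		}
	}
	return false
}

func main() {
	cfg := []string{"*"}
	fmt.Println(receivesFrom(cfg, "tweets_stream"), receivesFrom(cfg, "_GOLLUM_")) // true false
}
```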

FallbackStream

Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
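
The fallback behaviour can be sketched with a simplified message type that keeps a snapshot of its state at the time it left the consumer. Both `message` and `routeOnFailure` are hypothetical stand-ins, not Gollum's message API.

```go
package main

import "fmt"

// message is a hypothetical, simplified stand-in for a Gollum message.
type message struct {
	Stream  string
	Payload string
	// original holds the state the message had when it left the consumer.
	original *message
}

// routeOnFailure returns a copy of the message reset to its original state and
// addressed to the fallback stream, or ok == false when fallback is "" and the
// message is discarded.
func routeOnFailure(msg *message, fallback string) (routed message, ok bool) {
	if fallback == "" {
		return message{}, false
	}
	routed = *msg
	if msg.original != nil {
		routed = *msg.original
	}
	routed.Stream = fallback
	return routed, true
}

func main() {
	m := &message{Stream: "tweets_stream", Payload: "modified",
		original: &message{Stream: "tweets_stream", Payload: "raw"}}
	r, ok := routeOnFailure(m, "failed_tweets")
	fmt.Println(r.Stream, r.Payload, ok) // failed_tweets raw true
}
```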

ShutdownTimeoutMs (default: 1000, unit: ms)

Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.

Modulators

Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.

Examples

This example configures a simple Twitter producer for a locally running Elasticsearch instance:

producerElasticSearch:
  Type: producer.ElasticSearch
  Streams: tweets_stream
  SetGzip: true
  Servers:
    - http://127.0.0.1:9200
  StreamProperties:
    tweets_stream:
      Index: twitter
      TimeBasedIndex: true
      Type: tweet
      Mapping:
        # index mapping for payload
        user: keyword
        message: text
      Settings:
        number_of_shards: 1
        number_of_replicas: 1