Welcome to Gollum’s documentation!¶

What is Gollum?¶
Gollum is an n:m multiplexer that gathers messages from different sources and broadcasts them to a set of destinations.
Gollum originally started as a tool to MUL-tiplex LOG-files (read it backwards to get the name). It quickly evolved to a one-way router for all kinds of messages, not limited to just logs. Gollum is written in Go to make it scalable and easy to extend without the need to use a scripting language.
Terminology¶
The main components of Gollum are consumers, streams and producers. To explain these it helps to imagine looking at Gollum “from the outside”.
Message: | A single set of data passing over a stream is called a message. |
---|---|
Metadata: | An optional part of a message. It can contain key/value pairs with additional information or content. |
Stream: | A stream defines a path between one or more consumers, routers and producers. |
Consumer: | A consumer creates messages by “consuming” a specific data source. This can be anything: files, ports, external services and so on. |
Producer: | A producer processes received messages and “produces” something with them. This can be writing to files or ports, sending to external services and so on. |
Router: | A router receives messages from specific source streams and forwards them to one or more target streams. |
Modulator: | A modulator can be a Filter or Formatter which “modulates” a message. |
Formatter: | A formatter can modulate the payload of a message like convert a plain-text to JSON. |
Filter: | A filter can inspect a message to decide whether to drop the message or to let it pass. |

These main components, consumers, routers, producers, filters and formatters are built upon a plugin architecture. This allows each component to be exchanged and configured individually with a different set of options.
Configuration¶
A Gollum configuration file is written in YAML and may contain any number of plugins. Multiple plugins of the same type are possible, too. The Gollum core does not make any assumptions about the type of data you are processing; plugins, however, may do so. It is therefore up to the person configuring Gollum to ensure that valid data is passed from consumers to producers. Formatters can help to achieve this.
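As a minimal illustration of these concepts, the following sketch connects a console consumer to a console producer over a stream named “console” and attaches a formatter as a modulator (format.Forward simply passes the payload through unchanged; adjust plugin names to the plugins available in your Gollum version):

```yaml
StdIn:
  Type: consumer.Console
  Streams: console

StdOut:
  Type: producer.Console
  Streams: console
  Modulators:
    - format.Forward
```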
Table of contents¶
Instructions¶
Installation¶
Latest Release¶
You can download a compressed pre-compiled binary from github releases:
# linux based example
curl -L https://github.com/trivago/gollum/releases/download/v0.4.5/gollum-0.4.5-Linux_x64.zip -o gollum.zip
unzip -o gollum.zip
chmod 0755 gollum
./gollum --help
From source¶
Installation from source requires the installation of the go toolchain.
Gollum requires at least Go version 1.7 and supports the Go 1.5 vendor experiment, which is automatically enabled when using the provided makefile. With Go 1.7 and later you can also use go build directly without additional modifications. Builds with Go 1.6 or earlier versions are not officially supported and might require additional steps and modifications.
# checkout
mkdir -p $GOPATH/src/github.com/trivago
cd $GOPATH/src/github.com/trivago
git clone git@github.com:trivago/gollum.git
cd gollum
# run tests and compile
make test
./gollum --help
You can use the makefile coming with Gollum to trigger cross platform builds. Make will produce ready-to-deploy .zip files with the corresponding platform builds inside the dist folder.
Build¶
Building gollum is as easy as make or go build. If you want to do cross platform builds use make all or specify one of the following platforms instead of “all”:
current: | build for current OS (default) |
---|---|
freebsd: | build for FreeBSD |
linux: | build for Linux x64 |
mac: | build for MacOS X |
pi: | build for Linux ARM |
win: | build for Windows |
debug: | build for current OS with debug compiler flags |
clean: | clean all artifacts created by the build process |
Docker¶
The repository contains a Dockerfile which enables you to build and run gollum inside a Docker container.
docker build -t trivago/gollum .
docker run -it --rm trivago/gollum -c config/profile.conf -ps -ll 3
To use your own configuration you could run:
docker run -it --rm -v /path/to/config.conf:/etc/gollum/gollum.conf:ro trivago/gollum -c /etc/gollum/gollum.conf
Usage¶
Commandline¶
Gollum goes into an infinite loop once started. You can shut down Gollum by sending a SIG_INT (i.e. Ctrl+C), SIG_TERM or SIG_KILL.
Gollum has several commandline options that can be accessed by starting Gollum without any parameters:
-h, -help | Print this help message. |
-v, -version | Print version information and quit. |
-r, -runtime | Print runtime information and quit. |
-l, -list | Print plugin information and quit. |
-c, -config | Use a given configuration file. |
-tc, -testconfig | Test the given configuration file and exit. |
-ll, -loglevel | Set the loglevel [0-3] as in {0=Error, 1=+Warning, 2=+Info, 3=+Debug}. |
-lc, -log-colors | Use Logrus’s “colored” log format. One of “never”, “auto” (default), “always”. |
-n, -numcpu | Number of CPUs to use. Set 0 for all CPUs (respects cgroup limits). |
-p, -pidfile | Write the process id into a given file. |
-m, -metrics | Address to use for metric queries. Disabled by default. |
-hc, -healthcheck | Listening address ([IP]:PORT) to use for the healthcheck HTTP endpoint. Disabled by default. |
-pc, -profilecpu | Write CPU profiler results to a given file. |
-pm, -profilemem | Write heap profile results to a given file. |
-ps, -profilespeed | Write msg/sec measurements to log. |
-pt, -profiletrace | Write profile trace results to a given file. |
-t, -trace | Write message trace results to the _TRACE_ stream. |
Running Gollum¶
You start Gollum with the config file of your defined pipeline.
Configuration files are written in the YAML format and have to be loaded via command line switch. Each plugin has a different set of configuration options which are currently described in the plugin itself, i.e. you can find examples in the github wiki.
# starts a gollum process
gollum -c path/to/your/config.yaml
Here is a minimal console example to run Gollum:
# create a minimal config
echo \
  '{StdIn: {Type: consumer.Console, Streams: console}, StdOut: {Type: producer.Console, Streams: console}}' \
  > example_conf.yaml
# starts a gollum process
gollum -c example_conf.yaml -ll 3
Metrics¶
Gollum provides various metrics which can be used for monitoring or alerting.
Collecting metrics¶
To collect metrics you need to start the gollum process with the “-m <address:port>” option. If gollum is running with the “-m” option you can fetch all collected metrics with a TCP request; the response is in JSON format.
Example request:
# start gollum on host
gollum -m 8080 -c /my/config/file.conf
# get metrics by curl and prettify response by python
curl 127.0.0.1:8080 | python -m json.tool
# alternative by netcat
nc -d 127.0.0.1 8080 | python -m json.tool
Example response:
{
"Consumers": 3,
"GoMemoryAllocated": 21850200,
"GoMemoryGCEnabled": 1,
"GoMemoryNumObjects": 37238,
"GoRoutines": 20,
"GoVersion": 10803,
"Messages:Discarded": 0,
"Messages:Discarded:AvgPerSec": 0,
"Messages:Enqueued": 13972236,
"Messages:Enqueued:AvgPerSec": 1764931,
"Messages:Routed": 13972233,
"Messages:Routed:AvgPerSec": 1764930,
"Plugins:ActiveWorkers": 3,
"Plugins:State:Active": 4,
"Plugins:State:Dead": 0,
"Plugins:State:Initializing": 0,
"Plugins:State:PrepareStop": 0,
"Plugins:State:Stopping": 0,
"Plugins:State:Waiting": 0,
"ProcessStart": 1501855102,
"Producers": 1,
"Routers": 2,
"Routers:Fallback": 2,
"Stream:profile:Messages:Discarded": 0,
"Stream:profile:Messages:Discarded:AvgPerSec": 0,
"Stream:profile:Messages:Routed": 13972239,
"Stream:profile:Messages:Routed:AvgPerSec": 1768878,
"Version": 500
}
Metrics overview¶
Global metrics¶
Consumers
Number of current active consumers.
GoMemoryAllocated
Current allocated memory in bytes.
GoMemoryGCEnabled
Indicates that GC is enabled.
GoMemoryNumObjects
The number of allocated heap objects.
GoRoutines
The number of active go routines.
GoVersion
The golang version in number format.
Messages:Discarded
The overall count of discarded messages.
Messages:Discarded:AvgPerSec
The average number of discarded messages per second over the last few seconds.
Messages:Enqueued
The overall count of enqueued messages.
Messages:Enqueued:AvgPerSec
The average number of enqueued messages per second over the last few seconds.
Messages:Routed
The overall count of routed messages.
Messages:Routed:AvgPerSec
The average number of routed messages per second over the last few seconds.
Plugins:ActiveWorkers
Number of active worker (plugin) processes.
Plugins:State:<STATE>
Number of plugins in specific states. The following states are possible for plugins:
- Active
- Dead
- Initializing
- PrepareStop
- Stopping
- Waiting
ProcessStart
Timestamp of the process start time.
Producers
Number of current active producers.
Routers
Number of current active routers.
Routers:Fallback
Number of current active “fallback” (auto created) routers.
Version
Gollum version as numeric value.
Stream based metrics¶
Stream:<STREAM_NAME>:Messages:Discarded
The count of discarded messages for a specific stream.
Stream:<STREAM_NAME>:Messages:Discarded:AvgPerSec
The average number of discarded messages per second over the last few seconds for a specific stream.
Stream:<STREAM_NAME>:Messages:Routed
The count of routed messages for a specific stream.
Stream:<STREAM_NAME>:Messages:Routed:AvgPerSec
The average number of routed messages per second over the last few seconds for a specific stream.
Health checks¶
Gollum provides optional HTTP endpoints for health checks.
To activate the health check endpoints you need to start the gollum process with the “-hc <address:port>” option. If gollum is running with the “-hc” option you can request different HTTP endpoints to get the global and per-plugin health status.
# start gollum on host with health check endpoints
gollum -hc 8080 -c /my/config/file.conf
Endpoints¶
/_ALL_
Request:
curl -i 127.0.0.1:8080/_ALL_
Response:
HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 16:03:22 GMT
Content-Length: 191
Content-Type: text/plain; charset=utf-8
/pluginID-A/pluginState 200 ACTIVE: Active
/pluginID-B/pluginState 200 ACTIVE: Active
/pluginID-C/pluginState 200 ACTIVE: Active
/pluginID-D/pluginState 200 ACTIVE: Active
/_PING_ 200 PONG
/_PING_
Request:
curl -i 127.0.0.1:8080/_PING_
Response:
HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 15:46:34 GMT
Content-Length: 5
Content-Type: text/plain; charset=utf-8
PONG
/<PLUGIN_ID>/pluginState
Request:
# example request with active `producer.benchmark`
curl -i 127.0.0.1:8080/pluginID-A/pluginState
Response:
HTTP/1.1 200 OK
Date: Fri, 04 Aug 2017 15:47:45 GMT
Content-Length: 15
Content-Type: text/plain; charset=utf-8
ACTIVE: Active
Best practice¶
Managing your own plugins in a separate git repository¶
You can add your own plugin module by simply using git submodule:
git submodule add -f https://github.com/YOUR_NAMESPACE/YOUR_REPO.git contrib/YOUR_NAMESPACE
The .gitmodules file created by git will be ignored by the gollum repository.
To activate your plugin you need to create a contrib_loader.go to be able to compile gollum with your own provided plugins:
package main

// This is a stub file to enable registration of vendor specific plugins that
// are placed in sub folders of this folder.
import (
    _ "github.com/trivago/gollum/contrib/myPackage"
)

func init() {
}
You can also copy the existing contrib_loader.go.dist to contrib_loader.go and update the import path to your package:
cp contrib_loader.go.dist contrib_loader.go
# open contrib_loader.go with an editor
# update package path with your plugin's path
make build
You can also change the version string of your Gollum builds to include the version of your plugin. Set the GOLLUM_RELEASE_SUFFIX variable either in the environment or as an argument to make:
# build Gollum with myPackage version suffixed to the Gollum version
# e.g.: 0.5.3-pkg0a01d7b6
make all GOLLUM_RELEASE_SUFFIX=pkg$(git -C contrib/myPackage describe --tags --always)
Use more Gollum processes for complex pipelines¶
If your pipeline contains many steps, also consider the separation of concerns (SoC) principle in your setup. Split your configuration into smaller parts and start multiple Gollum processes to handle the pipeline steps.
Developing¶
Testing¶
Gollum provides unit tests, integration tests and a couple of linter checks, which also run regularly on travis-ci.
You can run the tests with:
# run tests
make test
# run unit-test only
make unit
# run integration-test only
make integration
Here is an overview of all tests provided by the Makefile:
make test: | Run go vet, golint, gofmt and go test |
---|---|
make unit: | Run go test -tags unit |
make integration: | Run go test -tags integration |
make vet: | Run go vet |
make lint: | Run golint |
make fmt-check: | Run gofmt -l |
make ineffassign: | Install and run ineffassign |
Debugging¶
If you want to use Delve for debugging you need to build gollum with some additional flags. You can use the predefined make command make debug:
# build for current OS with debug compiler flags
make debug
# or go build
# go build -ldflags='-s -linkmode=internal' -gcflags='-N -l'
With this debug build you are able to start a Delve remote debugger:
# for the gollum arguments please use this format: ./gollum -- -c my/config.conf
dlv --listen=:2345 --headless=true --api-version=2 --log exec ./gollum -- -c testing/configs/test_router.conf -ll 3
Profiling¶
To profile Gollum you can use the internal profiler consumer and the benchmark producer.
- The profiler consumer allows you to automatically create messages with random payloads.
- The benchmark producer is able to measure the throughput of processed messages.
With some optional parameters you can obtain further information:
-ps: | Profile the processed message per second. |
---|---|
-ll 3: | Set the log level to debug |
-m 8080: | Activate metrics endpoint on port 8080 |
Here is a simple example config showing how to set up a profiler consumer with a benchmark producer. By default this test profiles the theoretical maximum throughput of 256 byte messages:
Profiler:
  Type: consumer.Profiler
  Runs: 100000
  Batches: 100
  Message: "%256s"
  Streams: profile
  KeepRunning: true

Benchmark:
  Type: producer.Benchmark
  Streams: profile
# start Gollum for profiling
gollum -ps -ll 3 -m 8080 -c config/profile.conf
# get metrics
nc -d 127.0.0.1 8080 | python -m json.tool
You can enable different producers in that config to test the write performance of these producers, too.
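For example, to measure write performance instead of the theoretical maximum, you could swap the benchmark producer for a file producer. This is a sketch; producer.File and its File parameter are assumed to match your Gollum version:

```yaml
Profiler:
  Type: consumer.Profiler
  Runs: 100000
  Batches: 100
  Message: "%256s"
  Streams: profile

FileOut:
  Type: producer.File
  Streams: profile
  File: /tmp/gollum_profile.log
```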
Dependencies¶
To handle external go packages and libraries Gollum uses dep. As in other go projects the vendor folder is also checked in on github.com. All dependencies can be found in the Gopkg.toml file.
To update the external dependencies we also provide a make command:
# update external dependencies
make update-vendor
Writing plugins¶
When starting to write a plugin it’s probably a good idea to have a look at already existing plugins. A good starting point is the console plugin as it is very lightweight. If you plan to write a special purpose plugin you should place it into “contrib/yourCompanyName”. Plugins that can be used for general purposes should be placed into the main package folders like “consumer” or “producer”.
To enable a contrib plugin you will need to extend the file “contrib/loader.go”. Add an anonymous import to the list of imports like this:
import (
    _ "./yourCompanyName"                                 // this is ok for local extensions
    _ "github.com/trivago/gollum/contrib/yourCompanyName" // if you plan to contribute
)
Configuration¶
All plugins have to implement the “core/Plugin” interface. This interface requires a type to implement the Configure method which can be used to read data from the config file passed to Gollum. To make it possible for Gollum to instantiate an instance of your plugin by name it has to be registered. This should be done by adding a line to the init() method of the file.
import (
    "github.com/trivago/gollum/core"
    "github.com/trivago/tgo"
)

type MyPlugin struct {
}

func init() {
    core.TypeRegistry.Register(MyPlugin{}) // Register the new plugin type
}

func (cons *MyPlugin) Configure(conf core.PluginConfig) error {
    // ... read custom options ...
    return nil
}
The Configure method is also called when just testing the configuration via gollum -tc. Because of this, it should never open any sockets or other kinds of resources. That should be done when a plugin is explicitly started, so that proper closing of resources is assured, too.
If your plugin derives from another plugin it is advisable to call Configure() of the base type before checking your own configuration options. There are several convenience functions in the PluginConfig type that make it easy to obtain configuration values and set default values. Please refer to Gollum’s GoDoc API documentation for more details on this.
func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
    err := plugin.MyPluginBase.Configure(conf)
    if err != nil {
        return err
    }
    // ... read custom options ...
    return nil
}
Configuring nested plugins¶
Some plugins may want to configure “nested” plugins such as a formatter or filter. The plugins can be instantiated by using the type registry and passing the config passed to the Configure method.
func (plugin *MyPlugin) Configure(conf core.PluginConfig) error {
    formatter, err := core.NewPluginWithType(conf.GetString("Formatter", "format.Forward"), conf)
    if err != nil {
        return err // ### return, plugin load error ###
    }
    // ... do something with your formatter ...
    return nil
}
How to document plugins¶
Documentation for each plugin is sourced from a single location - the plugin’s source code - and presented in two locations.
- Standard godocs based documentation is targeted at developers interested in writing Gollum plugins or developing Gollum itself and describes implementation-level details of Gollum’s plugin API and internals.
- The Plugins chapter of Gollum’s documentation on readthedocs.org describes the built-in plugins’ features and configuration from the viewpoint of an end user or operator.
The readthedocs.org documentation is generated by a custom tool (./docs/generator) that converts the godoc formatted comments into RST. Consequently, plugin source code comments must satisfy the syntax requirements of both godoc and Gollum’s rst_generator.
Each plugin’s documentation is contained in the comment block immediately preceding the plugin’s Go struct definition, according to godoc rules. Other comments in the plugin source are ignored by the RST generator.
Single-line comments (“// ….”) are used for inline documentation. For the purpose of this document, the initial “// ” sequences on each line are not part of the comment block; an “empty line” means a line solely consisting of the comment sequence and optional trailing whitespace.
The following types of elements are recognized by the RST generator:
- Heading A heading is defined by text that is surrounded by blank lines. If a heading is starting a documentation block, the preceding blank line is omitted. Please note that a heading is only detected as heading by godoc if there is regular text following the blank line after the heading.
- Enumeration A “- ” as the first non-space sequence on a line begins an enumeration list item, which continues over zero or more further lines, terminated by a Heading or another enumeration list item. Continuation lines must match the indentation of the beginning line.
- Keyed enumeration A colon (“:”) on the first line of an enumeration specifies a keyed enumeration list item. The text between “- ” and “:” defines the item’s key; the rest its contents.
- Nested enumerations Enumerations may be nested; each level is indented by a single space. Indentation must begin at 0.
The contents of a documentation block consists of 4 parts:
General description
The heading of this section starts with the name of the plugin followed by its type e.g. “Console consumer”.
The contents of this section should describe what the plugin does, including any special considerations.
Metadata fields
The heading of this section is “Metadata”.
The contents of this section is an enumeration (of keyed and/or unkeyed elements) of all metadata fields consumed or generated by the plugin.
The RST generator will automatically inherit the Metadata sections from plugins embedded by this type, so there is no need to duplicate their documentation in your own plugin. You can, however, override inherited fields’ documentation if needed.
Fields that have specific keys should be documented using keyed enumerations, e.g. “- key: stores the key of the request”.
Configuration parameters
The heading of this section is “Parameters”.
The contents of this section is a keyed enumeration of configuration parameters recognized by the plugin.
The RST generator will automatically inherit the Parameters sections from plugins embedded by this type, so there is no need to duplicate their documentation in your own plugin. You can, however, override inherited parameters’ documentation if needed.
Default values and units are picked up automatically from struct tags for struct properties that have the ‘config’ and one or both of ‘default’ and ‘metric’ tags.
Configuration Examples
The heading of this section is “Examples”.
This section should contain at least one configuration example and a short description for each example provided.
The example is meant to be a working demonstration of the plugin’s usage, not an exhaustive listing of all possible features and parameters. The user should be able to copy-paste it into a config file as-is. It doesn’t need to be self-contained and should not include any boilerplate configuration for other plugins, with the exception of nested plugins (filters and formatters), which should be contained in the following stub:
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
The same Go struct tags used by Gollum to parse and set plugin configuration parameters are supported by the RST generator:
- config:"<ConfigParameter>" maps this struct property to the <ConfigParameter> parameter
- default:"<defval>" specifies <defval> as the parameter’s default value
- metric:"<unit>" specifies <unit> as the parameter’s unit
To inherit an embedded struct type’s Metadata and Parameters documentation in your own plugin’s documentation, add the gollumdoc:"embed_type" tag to the embed instruction.
The general structure of a plugin documentation block looks like this:
// <BLOCK HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// <CHAPTER HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// <CHAPTER HEADING>
//
// <CHAPTER CONTENTS>
// <CHAPTER CONTENTS>
//
// Metadata
//
// - <METADATA KEY>: <DESCRIPTION
// DESCRIPTION CONTINUED ...>
// - <METADATA KEY>: <DESCRIPTION>
// - <VARIABLE-NAMED METADATA FIELD
// DESCRIPTION CONTINUED ...>
//
// Parameters
//
// - <PARAMETER NAME>: <PARAMETER DESCRIPTION>
// - <PARAMETER NAME>: <PARAMETER DESCRIPTION
// DESCRIPTION CONTINUED ...>
// - <NESTED NAME>: <NESTED VALUE OR OPTION>
// DESCRIPTION CONTINUED ...>
// - <NESTED VALUE OR OPTION>
// - <NESTED VALUE OR OPTION>
//
// Examples
//
// <CONFIGURATION EXAMPLE>
// <CONFIGURATION EXAMPLE>
// Console consumer
//
// This consumer reads from stdin or a named pipe. A message is generated after
// each newline character.
//
// Metadata
//
// - pipe: name of the pipe the message was received on
//
// Parameters
//
// - Pipe: Defines the pipe to read from. This can be "stdin" or the path
// of a named pipe. A named pipe is created if it does not exist.
//
// - Permissions: Accepts an octal number string containing the unix file
// permissions used when creating a named pipe.
//
// - ExitOnEOF: Can be set to true to trigger an exit signal if the pipe is closed
// i.e. when EOF is detected.
//
// Examples
//
// This configuration reads data from standard-in.
//
// ConsoleIn:
//   Type: consumer.Console
//   Streams: console
//   Pipe: stdin
type Console struct {
    core.SimpleConsumer `gollumdoc:"embed_type"`
    autoExit            bool   `config:"ExitOnEOF" default:"true"`
    pipeName            string `config:"Pipe" default:"stdin"`
    pipePerm            uint32 `config:"Permissions" default:"0644"`
    pipe                *os.File
}
Plugin types¶
Writing consumers¶
When writing a new consumer it is advisable to have a look at existing consumers. A good starting point are the Console and File consumers.
All consumers have to implement the “core/Consumer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ConsumerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.
Consumers deriving from “core/ConsumerBase” have to implement the “Consume” method from the “core/Consumer” interface. In addition to that most plugins might also want to overload the “Configure” function from the “core/Plugin” interface.
The Consume() function will be called as a separate go routine and should do two things: 1. listen to the control channel, and 2. process incoming data.
As Consume() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Consume() handle everything. ConsumerBase gives you two convenience loop functions to handle control commands:
- ControlLoop
- Will loop until a stop is received and can trigger a callback if a log rotation is requested (SIG_HUP is sent). The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback functions are SetPrepareStopCallback and SetStopCallback.
- TickerControlLoop
- Gives you an additional callback that is triggered in regular intervals.
Both loops only cover control message handling and are blocking calls. Because of their blocking nature you will probably want to spawn a separate go routine to handle incoming messages when using these loops.
It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update.
A typical consume function will look like this:
func (cons *MyConsumer) Configure(conf core.PluginConfig) error {
    cons.SetStopCallback(cons.close) // Register close to the control message handler
    return nil
}

func (cons *MyConsumer) close() {
    cons.WorkerDone()
}

func (cons *MyConsumer) Consume(workers *sync.WaitGroup) {
    cons.AddMainWorker(workers) // New go routine = new worker
    go cons.readData()          // Run until close is called
    cons.ControlLoop()          // Blocks
}
As we want to run a new go routine we also add a new worker. As this is the first worker we use AddMainWorker(). Additional workers can be added by using AddWorker(). This enables the shutdown routine to wait until all consumers have properly stopped. However - to avoid a hang during shutdown, make sure that all workers added are properly closed during the shutdown sequence.
After we made sure all workers are registered, the core function readData() is called as a separate go routine. This is necessary as the ControlLoop will block Consume() until a shutdown is requested. When a stop control message is received, the StopCallback is executed. You can use this callback to signal your readData function to stop or you can check the pluginState inside your readData function. The pluginState will switch to PluginStateStopping after a stop control has been triggered.
If your consumer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.
Messages can be sent by using either the Enqueue() or EnqueueCopy() method. Both functions will make sure that the message is sent to all streams and that the correct stream ID is set. The function Enqueue() will reference the data you pass to it, while EnqueueCopy() will copy the data into the new message. The latter allows you to e.g. safely recycle internal buffers without changing messages that have not yet been processed by all producers.
Both methods expect a sequence number to be passed. This sequence number is meant to be a runtime unique ID that may allow future checks on duplicate messages. The most common sequence number is an incrementing 64-bit integer.
func (cons *MyConsumer) readData() {
    var data []byte
    for cons.IsActive() {
        // ... read data
        cons.Enqueue(data, cons.sequence) // This call may block
        cons.sequence++                   // Increment your sequence number
    }
}
Sometimes it might be useful not to derive from ConsumerBase. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ConsumerBase.
Writing producers¶
When writing a new producer it is advisable to have a look at existing producers. A good starting point are the Console and File producers.
All producers have to implement the “core/Producer” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/ProducerBase” type as it will provide implementations of the most common methods required. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.
Producers deriving from core/ProducerBase have to implement the “Produce” method from the “core/Producer” interface. In addition to that most plugins might also want to overload the “Configure” function from the “core/Plugin” interface.
The Produce() function will be called as a separate go routine and should do two things: 1. listen to the control channel, and 2. listen to incoming messages.
As Produce() is called as a separate go routine you can decide whether to spawn additional go routines to handle both tasks or to let Produce() handle everything. ProducerBase gives you three convenience loop functions to handle control commands:
- ControlLoop
- Will only listen to control messages and trigger the corresponding callbacks that can be registered during Configure. Stop control messages will cause this loop to end.
- MessageControlLoop
- In addition to the functionality of ControlLoop this will also check for incoming messages. Messages from the internal message channel are passed to the given message handler. The log rotation callback can be set e.g. in the Configure method by using the SetRollCallback function. Other possible callback functions are SetPrepareStopCallback and SetStopCallback.
- TickerMessageControlLoop
- Gives you an additional callback that is triggered in regular intervals.
It is highly recommended to use at least one of these functions in your plugin implementation. By doing this you can be sure that changes to message streaming and control handling are automatically used by your plugin after a Gollum update. A typical produce function will look like this:
func (prod *MyProducer) close() {
    prod.CloseMessageChannel(prod.processData) // Close the internal channel and flush remaining messages
    prod.WorkerDone()                          // Signal that we're done now
}

func (prod *MyProducer) Configure(conf core.PluginConfig) error {
    prod.SetStopCallback(prod.close)  // Call close upon shutdown
    prod.SetRollCallback(prod.onRoll) // Call onRoll when SIG_HUP is sent to the process
    return nil
}

func (prod *MyProducer) processData(msg core.Message) {
    // Do something with the message
}

func (prod *MyProducer) Produce(workers *sync.WaitGroup) {
    prod.AddMainWorker(workers)
    prod.MessageControlLoop(prod.processData)
}
The framework will call the registered StopCallback function when the control loop receives a stop. As the shutdown procedure needs to wait until all messages from this producer have been sent (to avoid data loss), at least one worker should always be registered. The shutdown procedure will wait until all producer workers have finished before exiting. Because of this you have to make sure that every AddWorker call is followed by a WorkerDone() call during shutdown; if this does not happen, the shutdown procedure will block. If your producer sends messages to other producers you can manually set dependencies between receiving producers and this producer by using StreamRegistry.LinkDependencies. DropStream dependencies are automatically added during startup.
If your producer requires additional configuration options you should implement the Configure method. Please refer to the Plugin documentation for further details.
Messages are passed to the producer one-by-one. Certain services however might perform better when messages are not sent one-by-one but as a batch of messages. Gollum gives you several tools to handle this kind of message batching. A good example for this is the socket producer. This producer takes advantage of the “core/MessageBatch” type. This allows storing messages in a double-buffered queue and provides callback based methods to flush the queue asynchronously. The following code illustrates a best practice approach on how to use the MessageBatch. You may of course change details if required.
buffer := NewMessageBatch(8192) // Holds up to 8192*2 messages (front and back buffer)

for {
    // Append the given message.
    // - If the buffer is full, call the sendBatch method and wait for the flush.
    // - If the producer is not active or is shutting down, pass the message to prod.Drop.
    buffer.AppendOrFlush(message, prod.sendBatch, prod.IsActiveOrStopping, prod.Drop)
    // ...
    if buffer.ReachedSizeThreshold(2048) { // Check if at least 2 KB have been written
        buffer.Flush(prod.sendBatch)       // Send all buffered messages via sendBatch
        buffer.WaitForFlush()              // Wait until done
    }
}
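The double buffering behind MessageBatch can be illustrated with a minimal sketch. The miniBatch type below is a hypothetical simplification, not the actual core/MessageBatch implementation: appends go to a front buffer while a flush hands the swapped-out back buffer to a send callback, so producing and sending can overlap.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBatch is a simplified, illustrative version of the double
// buffering idea behind core/MessageBatch.
type miniBatch struct {
	mutex sync.Mutex
	front []string // buffer currently being appended to
	back  []string // buffer handed out on the last flush
}

// Append adds a message to the front buffer.
func (b *miniBatch) Append(msg string) {
	b.mutex.Lock()
	defer b.mutex.Unlock()
	b.front = append(b.front, msg)
}

// Flush swaps front and back buffer and passes the filled buffer to
// the given send function. New messages can be appended while send
// is still working on the previous batch.
func (b *miniBatch) Flush(send func([]string)) {
	b.mutex.Lock()
	b.front, b.back = b.back[:0], b.front
	batch := b.back
	b.mutex.Unlock()
	send(batch)
}

func main() {
	batch := &miniBatch{}
	batch.Append("a")
	batch.Append("b")
	batch.Flush(func(msgs []string) { fmt.Println(len(msgs), "messages sent") })
}
```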
Producers are able to filter messages just like streams do. In contrast to streams, messages are filtered before they are sent to the internal message channel, i.e. before formatting. As formatting is an implementation detail (and may also not happen), a plugin that needs filtering after formatting has to implement it itself.
Messages are not automatically formatted when passed to the producer. If you wish to enable producer based formatting you need to call ProducerBase.Format() at an appropriate point inside your plugin. All producers deriving from ProducerBase - and that have called ProducerBase.Configure() - may have a formatter set and should thus provide this possibility.
Sometimes it might be useful not to derive from ProducerBase. An example for this is the Null producer which is extremely lightweight. If you decide to go this way please have a look at Gollum’s GoDoc API documentation as well as the source of ProducerBase.
Writing filters¶
All filters have to implement the “core/Filter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.
Filters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.
The Accepts method is fairly simple to implement. If the method returns true the message is passed. If the method returns false the message is rejected. You can inspect the message in question via the parameter passed to the Accepts method. The following example filter will reject all messages that have no content:
func (filter *MyFilter) Accepts(msg core.Message) bool {
    return len(msg.Data) > 0
}
Writing formatters¶
All formatters have to implement the “core/Formatter” as well as the “core/Plugin” interface. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.
Formatters are called in a multithreaded context, so you have to make sure that any internal state is secured by either a mutex or by using atomic functions.
The Format method is fairly simple to implement. It accepts the message to modify and returns the new content plus the stream the message should be sent to. The message itself cannot be changed directly. The following example adds a newline to each message:
func (format *MyFormatter) Format(msg core.Message) ([]byte, core.MessageStreamID) {
    return append(msg.Data, '\n'), msg.StreamID
}
Writing routers¶
When writing a new router it is advisable to have a look at existing routers. A good starting point is the Random router.
All routers have to implement the “core/Router” as well as the “core/Plugin” interface. The most convenient way to do this is to derive from the “core/RouterBase” type as it will provide implementations of the most common methods required as well as message metrics. In addition to this, every plugin has to register at the plugin registry to be available as a config option. This is explained in the general plugin section.
Routers deriving from “core/RouterBase” have to implement a custom method that has to be hooked to the “Distribute” callback during Configure(). This allows RouterBase to check and format the message before actually distributing it. In addition to that a message count metric is updated. The following example implements a router that sends messages only to the first producer in the list.
func (router *MyRouter) myDistribute(msg *core.Message) {
    router.RouterBase.Producers[0].Enqueue(msg)
}

func (router *MyRouter) Configure(conf core.PluginConfig) error {
    if err := router.RouterBase.Configure(conf); err != nil {
        return err
    }
    router.RouterBase.Distribute = router.myDistribute
    return nil
}
Messages are sent directly to a producer by calling the Enqueue method. This call may block, either because the underlying channel is completely filled or because the producer plugin implemented Enqueue as a blocking method.
Routers that derive from RouterBase may also be paused. In that case messages are not passed to the custom distributor function but to a temporary function. These messages will be sent to the custom distributor function after the router is resumed. A Pause() call is normally done from producers that encounter a connection loss or an unavailable resource in general.
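The pause mechanism can be pictured as buffering messages while paused and replaying the buffer on resume. The following is a hypothetical single-goroutine sketch of that idea, not the RouterBase implementation (which also handles synchronization and metrics):

```go
package main

import "fmt"

// miniRouter illustrates the pause behavior described above using
// stand-in types: while paused, messages are buffered; on resume,
// the buffer is replayed through the distribute function.
type miniRouter struct {
	distribute func(string)
	buffer     []string
	paused     bool
}

func (r *miniRouter) Pause() {
	r.paused = true
}

func (r *miniRouter) Resume() {
	r.paused = false
	for _, msg := range r.buffer {
		r.distribute(msg) // replay messages held while paused
	}
	r.buffer = nil
}

func (r *miniRouter) Route(msg string) {
	if r.paused {
		r.buffer = append(r.buffer, msg) // held until Resume
		return
	}
	r.distribute(msg)
}

func main() {
	sent := []string{}
	router := &miniRouter{distribute: func(m string) { sent = append(sent, m) }}
	router.Route("a")
	router.Pause()
	router.Route("b") // buffered, not delivered yet
	router.Resume()   // replays "b"
	fmt.Println(sent)
}
```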
Plugins¶
The main components, consumers, routers, producers, filters and formatters are built upon a plugin architecture. This allows each component to be exchanged and configured individually with different sets of options.

Plugin types:
Consumers¶
Consumers are plugins that read data from external sources. Data is packed into messages and passed to a router.
Example file consumer:

List of available consumers:
AwsKinesis¶
This consumer reads messages from an AWS Kinesis stream.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
KinesisStream (default: default)
This value defines the stream to read from. By default this parameter is set to “default”.
OffsetFile
This value defines a file to store the current offset per shard. To disable this parameter, set it to “”. If the parameter is set and the file is found, consuming will start after the offset stored in the file. By default this parameter is set to “”.
RecordsPerQuery (default: 100)
This value defines the number of records to pull per query. By default this parameter is set to “100”.
RecordMessageDelimiter
This value defines the string to delimit messages within a record. To disable this parameter, set it to “”. By default this parameter is set to “”.
QuerySleepTimeMs (default: 1000, unit: ms)
This value defines the number of milliseconds to sleep before trying to pull new records from a shard that did not return any records. By default this parameter is set to “1000”.
RetrySleepTimeSec
This value defines the number of seconds to wait between retries when reconnecting to a shard. By default this parameter is set to “4”.
CheckNewShardsSec (default: 0, unit: sec)
This value sets a timer to update shards in Kinesis. You can set this parameter to “0” for disabling. By default this parameter is set to “0”.
DefaultOffset
This value defines the message index to start reading from. Valid values are either “newest”, “oldest”, or a number. By default this parameter is set to “newest”.
Parameters (from components.AwsCredentials)¶
Credential/Type (default: none)
This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.
environment
Retrieves credentials from the environment variables of the running process.
static
Uses the credential values given in the individual Credential/* fields.
shared
Retrieves credentials from the current user’s home directory.
none
Uses an anonymous login to aws.
Credential/Id
Used with the “static” credential type; provides the AccessKeyID.
Credential/Token
Used with the “static” credential type; provides the SessionToken.
Credential/Secret
Used with the “static” credential type; provides the SecretAccessKey.
Credential/File
Used with the “shared” credential type; provides the path to your shared credentials file (~/.aws/credentials).
Credential/Profile (default: default)
Used with the “shared” credential type; provides the profile to use.
Credential/AssumeRole
This value defines the IAM role to assume. By default this is set to “”.
Parameters (from components.AwsMultiClient)¶
Region (default: us-east-1)
This value defines the aws region to use. By default this is set to “us-east-1”.
Endpoint
This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This example consumes the kinesis stream “myStream” and creates messages:
KinesisIn:
  Type: consumer.AwsKinesis
  Credential:
    Type: shared
    File: /Users/<USERNAME>/.aws/credentials
    Profile: default
  Region: "eu-west-1"
  KinesisStream: myStream
Console¶
This consumer reads from stdin or a named pipe. A message is generated after each newline character.
Metadata¶
NOTE: The metadata will only be set if the parameter `SetMetadata` is active.
pipe
Name of the pipe the message was received on (set)
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Pipe (default: stdin)
Defines the pipe to read from. This can be “stdin” or the path to a named pipe. If the named pipe doesn’t exist, it will be created. By default this parameter is set to “stdin”.
Permissions (default: 0644)
Defines the UNIX filesystem permissions used when creating the named pipe as an octal number. By default this parameter is set to “0644”.
ExitOnEOF (default: true)
If set to true, the plugin triggers an exit signal if the pipe is closed, i.e. when EOF is detected. By default this parameter is set to “true”.
SetMetadata (default: false)
When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This config reads data from stdin e.g. when starting gollum via unix pipe.
ConsoleIn:
  Type: consumer.Console
  Streams: console
  Pipe: stdin
File¶
The File consumer reads messages from a file, looking for a customizable delimiter sequence that marks the end of a message. If the file is part of e.g. a log rotation, the consumer can be set to read from a symbolic link pointing to the current file and (optionally) be told to reopen the file by sending a SIGHUP. A symlink to a file will automatically be reopened if the underlying file is changed.
Metadata¶
NOTE: The metadata will only be set if the parameter `SetMetadata` is active.
file
The file name of the consumed file (set)
dir
The directory of the consumed file (set)
Parameters¶
Enable (default: true)
Switches this plugin on or off.
File
This value is a mandatory setting and contains the name of the file to read. This field supports glob patterns. If the file pointed to is a symlink, changes to the symlink will be detected. The file will be watched for changes, so active logfiles can be scraped, too.
OffsetFilePath
This value defines a path where the individual, current file offsets are stored. The filename will be the name and extension of the source file plus the extension “.offset”. If the consumer is restarted, these offset files are used to continue reading from the previous position. To disable this setting, set it to “”. By default this parameter is set to “”.
Delimiter (default: \n)
This value defines the delimiter sequence to expect at the end of each message in the file. By default this parameter is set to “\n”.
ObserveMode (default: poll)
This value select how the source file is observed. Available values are poll and watch. NOTE: The watch implementation uses the [fsnotify/fsnotify](https://github.com/fsnotify/fsnotify) package. If your source file is rotated (moved or removed), please verify that your file system and distribution support the RENAME and REMOVE events; the consumer’s stability depends on them. By default this parameter is set to poll.
DefaultOffset (default: newest)
This value defines the default offset from which to start reading within the file. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. By default this parameter is set to “newest”.
PollingDelayMs (default: 100, unit: ms)
This value defines the duration in milliseconds the consumer waits between checking the source file for new content after hitting the end of file (EOF). NOTE: This settings only takes effect if the consumer is running in poll mode! By default this parameter is set to “100”.
RetryDelaySec (default: 3, unit: s)
This value defines the duration in seconds the consumer waits between retries, e.g. after not being able to open a file. By default this parameter is set to “3”.
DirScanIntervalSec (default: 10, unit: s)
Only applies when using globs. This setting defines the interval in seconds in which the glob will be re-evaluated and new files can be scraped. By default this parameter is set to “10”.
SetMetadata (default: false)
When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
BlackList
A regular expression matching file paths to NOT read. When both BlackList and WhiteList are defined, the WhiteList takes precedence. This setting is only used when glob expressions (*, ?) are present in the filename. The path checked is the one before symlink evaluation. By default this parameter is set to “”.
WhiteList
A regular expression matching file paths to read. When both BlackList and WhiteList are defined, the WhiteList takes precedence. This setting is only used when glob expressions (*, ?) are present in the filename. The path checked is the one before symlink evaluation. By default this parameter is set to “”.
Files (default: /var/log/*.log)
(no documentation available)
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This example will read all .log files in /var/log/ into one stream and create a message for each new entry. Files whose names start with “sys” are ignored:
FileIn:
  Type: consumer.File
  File: /var/log/*.log
  BlackList: '^sys.*'
  DefaultOffset: newest
  OffsetFilePath: ""
  Delimiter: "\n"
  ObserveMode: poll
  PollingDelayMs: 100
HTTP¶
This consumer opens up an HTTP 1.1 server and processes the contents of any incoming HTTP request.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address (default: :80)
Defines the TCP port and optional IP address to listen on. Sets http.Server.Addr; for details, see its Go documentation. Syntax: [hostname|address]:<port>
ReadTimeoutSec (default: 3, unit: sec)
Defines the maximum duration in seconds before timing out the HTTP read request. Sets http.Server.ReadTimeout; for details, see its Go documentation.
WithHeaders (default: true)
If true, relays the complete HTTP request to the generated Gollum message. If false, relays only the HTTP request body and ignores headers.
Htpasswd
Path to an htpasswd-formatted password file. If defined, turns on HTTP Basic Authentication in the server.
BasicRealm
Defines the Authentication Realm for HTTP Basic Authentication. Meaningful only in conjunction with Htpasswd.
Certificate
Path to an X509 formatted certificate file. If defined, turns on SSL/TLS support in the HTTP server. Requires PrivateKey to be set.
PrivateKey
Path to an X509 formatted private key file. Meaningful only in conjunction with Certificate.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This example listens on port 9090 and writes to the stream “http_in_00”.
"HttpIn00":
  Type: "consumer.HTTP"
  Streams: "http_in_00"
  Address: "localhost:9090"
  WithHeaders: false
Kafka¶
This consumer reads data from a kafka topic. It is based on the sarama library; most settings are mapped to the settings from this library.
Metadata¶
NOTE: The metadata will only be set if the parameter `SetMetadata` is active.
topic
Contains the name of the kafka topic
key
Contains the key of the kafka message
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Servers
Defines the list of all kafka brokers to initially connect to when querying topic metadata. This list requires at least one broker to work and ideally contains all the brokers in the cluster. By default this parameter is set to [“localhost:9092”].
Topic (default: default)
Defines the kafka topic to read from. By default this parameter is set to “default”.
ClientId
Sets the client id used in requests by this consumer. By default this parameter is set to “gollum”.
GroupId
Sets the consumer group of this consumer. If empty, consumer groups are not used. This setting requires Kafka version >= 0.9. By default this parameter is set to “”.
Version
Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.
SetMetadata (default: false)
When this value is set to “true”, the fields mentioned in the metadata section will be added to each message. Adding metadata will have a performance impact on systems with high throughput. By default this parameter is set to “false”.
DefaultOffset
Defines the initial offset when starting to read the topic. Valid values are “oldest” and “newest”. If OffsetFile is defined and the file exists, the DefaultOffset parameter is ignored. If GroupId is defined, this setting will only be used for the first request. By default this parameter is set to “newest”.
OffsetFile
Defines the path to a file that holds the current offset of a given partition. If the consumer is restarted, reading continues from that offset. To disable this setting, set it to “”. Please note that offsets stored in the file might be outdated. In that case DefaultOffset “oldest” will be used. By default this parameter is set to “”.
FolderPermissions (default: 0755)
Used to create the path to the offset file if necessary. By default this parameter is set to “0755”.
Ordered
Forces partitions to be read one-by-one in a round robin fashion instead of reading them all in parallel. Please note that this may restore the original ordering but does not necessarily do so. The term “ordered” refers to an ordered reading of all partitions, as opposed to reading them randomly. By default this parameter is set to false.
MaxOpenRequests
Defines the number of simultaneous connections to a broker at a time. By default this parameter is set to 5.
ServerTimeoutSec
Defines the time after which a connection will time out. By default this parameter is set to 30.
MaxFetchSizeByte
Sets the maximum size of a message to fetch. Larger messages will be ignored. When set to 0 size of the messages is ignored. By default this parameter is set to 0.
MinFetchSizeByte
Defines the minimum amount of data to fetch from Kafka per request. If less data is available the broker will wait. By default this parameter is set to 1.
DefaultFetchSizeByte
Defines the average amount of data to fetch per request. This value must be greater than 0. By default this parameter is set to 32768.
FetchTimeoutMs
Defines the time in milliseconds to wait on reaching MinFetchSizeByte before fetching new data regardless of size. By default this parameter is set to 250.
MessageBufferCount
Sets the internal channel size for the kafka client. By default this parameter is set to 8192.
PresistTimoutMs (default: 5000, unit: ms)
Defines the interval in milliseconds in which data is written to the OffsetFile. A short duration reduces the amount of duplicate messages after a crash but increases I/O. When using GroupId this setting controls the pause time after receiving errors. By default this parameter is set to 5000.
ElectRetries
Defines how many times to retry fetching the new master partition during a leader election. By default this parameter is set to 3.
ElectTimeoutMs
Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.
MetadataRefreshMs
Defines the interval in milliseconds used for fetching kafka metadata from the cluster (e.g. number of partitions). By default this parameter is set to 10000.
TlsEnable
Defines whether to use TLS based authentication when communicating with brokers. By default this parameter is set to false.
TlsKeyLocation
Defines the path to the client’s PEM-formatted private key used for TLS based authentication. By default this parameter is set to “”.
TlsCertificateLocation
Defines the path to the client’s PEM-formatted public key used for TLS based authentication. By default this parameter is set to “”.
TlsCaLocation
Defines the path to the CA certificate(s) for verifying a broker’s key when using TLS based authentication. By default this parameter is set to “”.
TlsServerName
Defines the expected hostname used by hostname verification when using TlsInsecureSkipVerify. By default this parameter is set to “”.
TlsInsecureSkipVerify
Disables verification of the server’s certificate chain and host name. By default this parameter is set to false.
SaslEnable
Defines whether to use SASL based authentication when communicating with brokers. By default this parameter is set to false.
SaslUsername
Defines the username for SASL/PLAIN authentication. By default this parameter is set to “gollum”.
SaslPassword
Defines the password for SASL/PLAIN authentication. By default this parameter is set to “”.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This config reads the topic “logs” from a cluster with 4 brokers.
kafkaIn:
  Type: consumer.Kafka
  Streams: logs
  Topic: logs
  ClientId: "gollum log reader"
  DefaultOffset: newest
  OffsetFile: /var/gollum/logs.offset
  Servers:
    - "kafka0:9092"
    - "kafka1:9092"
    - "kafka2:9092"
    - "kafka3:9092"
Profiler¶
The “Profiler” consumer plugin autogenerates messages in user-defined quantity, size and density. It can be used to profile producers and configurations and to provide a message source for testing.
Before startup, [TemplateCount] template payloads are generated based on the format specifier [Message], using characters from [Characters]. The length of each template is determined by format size specifier(s) in [Message].
During execution, [Batches] batches of [Runs] messages are generated, with a [DelayMs] ms delay between each message. Each message’s payload is randomly selected from the set of template payloads above.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Runs (default: 10000)
Defines the number of messages per batch.
Batches (default: 10)
Defines the number of batches to generate.
TemplateCount
Defines the number of message templates to generate. Templates are generated in advance and a random message template is chosen from this set every time a message is sent.
Characters (default: abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890)
Defines the set of characters used when generating templates.
Message (default: %256s)
Defines a go format string to use for generating the message templates. The length of the generated values is deduced from the format size parameter: “%200d” will generate a number between 0 and 200, “%10s” will generate a string with 10 characters, etc.
DelayMs (default: 0, unit: ms)
Defines the number of milliseconds to sleep between messages.
KeepRunning
If set to false, Gollum shuts down after Batches * Runs messages have been generated. Set this to true to keep Gollum running, e.g. to read metrics after a profile run.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
Generate a short message every 0.5s, useful for testing and debugging:
JunkGenerator:
Type: "consumer.Profiler"
Message: "%20s"
Streams: "junkstream"
Characters: "abcdefghijklmZ"
KeepRunning: true
Runs: 10000
Batches: 3000000
DelayMs: 500
Proxy¶
This consumer reads messages from a given socket like consumer.Socket but allows reverse communication, too. Producers which require this kind of communication can access message.GetSource to write data back to the client sending the message. See producer.Proxy as an example target producer.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Defines the protocol, host and port or the unix domain socket to listen to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. Only unix and tcp protocols are supported. By default this parameter is set to “:5880”.
Partitioner
Defines the algorithm used to read messages from the router. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to “delimiter”.
delimiter
Separates messages by looking for a delimiter string. The delimiter is removed from the message.
ascii
Reads an ASCII number at a given offset until a given delimiter is found. Everything to the left of and including the delimiter is removed from the message.
binary
Reads a binary number at a given offset and size. The number is removed from the message.
binary_le
An alias for "binary".
binary_be
Acts like "binary_le" but uses big endian encoding.
fixed
Assumes fixed size messages.
Delimiter (default: \n)
Defines the delimiter string used to separate messages if the partitioner is set to "delimiter", or the string used to separate the message length if the partitioner is set to "ascii". By default this parameter is set to "\n".
Offset (default: 0)
Defines an offset in bytes used to read the length provided for partitioner “binary” and “ascii”. By default this parameter is set to 0.
Size (default: 4)
Defines the size of the length prefix used by partitioner “binary” or the message total size when using partitioner “fixed”. When using partitioner “binary” this parameter can be set to 1,2,4 or 8 when using uint8,uint16,uint32 or uint64 length prefixes. By default this parameter is set to 4.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This example accepts 64-bit length-encoded data on TCP port 5880:
proxyReceive:
Type: consumer.Proxy
Streams: proxyData
Address: ":5880"
Partitioner: binary
Size: 8
Socket¶
The socket consumer reads messages as-is from a given network or filesystem socket. Messages are separated from the stream by using a specific partitioner method.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
This value defines the protocol, host and port or socket to bind to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. Valid protocols can be derived from the golang net package documentation. Common values are “udp”, “tcp” and “unix”. By default this parameter is set to “tcp://0.0.0.0:5880”.
Permissions (default: 0770)
This value sets the filesystem permissions for UNIX domain sockets as a four-digit octal number. By default this parameter is set to “0770”.
Acknowledge
This value can be set to a non-empty value to inform the writer that data has been accepted. On success, the given string is sent. Any error will close the connection. Acknowledge does not work with UDP based sockets. By default this parameter is set to “”.
Partitioner
This value defines the algorithm used to read messages from the router. By default this is set to “delimiter”. The following options are available:
“delimiter”
Separates messages by looking for a delimiter string. The delimiter is removed from the message.
"ascii"
Reads an ASCII number at a given offset until a given delimiter is found. Everything to the right of and including the delimiter is removed from the message.
"binary"
Reads a binary number at a given offset and size.
"binary_le"
An alias for "binary".
"binary_be"
The same as "binary" but uses big endian encoding.
"fixed"
Assumes fixed size messages.
Delimiter (default: \n)
This value defines the delimiter used by the text and delimiter partitioners. By default this parameter is set to "\n".
Offset (default: 0)
This value defines the offset used by the binary and text partitioners. This setting is ignored by the fixed partitioner. By default this parameter is set to “0”.
Size
This value defines the size in bytes used by the binary and fixed partitioners. For binary, this can be set to 1, 2, 4 or 8. The default value is 4. For fixed, this defines the size of a message. By default this parameter is set to "1".
ReconnectAfterSec (default: 2, unit: sec)
This value defines the number of seconds to wait before a connection is retried. By default this parameter is set to “2”.
AckTimeoutSec (default: 1, unit: sec)
This value defines the number of seconds to wait for acknowledges to succeed. By default this parameter is set to “1”.
ReadTimeoutSec (default: 2, unit: sec)
This value defines the number of seconds to wait for data to be received. This setting affects the maximum shutdown duration of this consumer. By default this parameter is set to “2”.
RemoveOldSocket (default: true)
If set to true, any existing file with the same name as the socket (unix://<path>) is removed prior to connecting. By default this parameter is set to “true”.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
This example opens a socket and expects messages with a fixed length of 256 bytes:
socketIn:
Type: consumer.Socket
Address: unix:///var/gollum.socket
Partitioner: fixed
Size: 256
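With Partitioner: fixed, every message must be exactly Size bytes long; a client could zero-pad shorter payloads as sketched below (illustrative only, the pad helper is not part of Gollum):

```go
package main

import "fmt"

// pad returns a fixed-size frame as expected by Partitioner: fixed
// with Size: 256. Shorter payloads are zero-padded; longer payloads
// would have to be split or truncated by the client.
func pad(payload []byte, size int) []byte {
	frame := make([]byte, size)
	copy(frame, payload)
	return frame
}

func main() {
	fmt.Println(len(pad([]byte("hello"), 256))) // always 256
}
```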
Syslogd¶
The syslogd consumer creates a syslogd-compatible log server and receives messages on a TCP or UDP port or a UNIX filesystem socket.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Defines the IP address or UNIX socket to listen to. This can take one of the four forms below, to listen on a TCP, UDP or UNIX domain socket. However, see the “Format” option for details on transport support by different formats.
- [hostname|ip]:<tcp-port>
- tcp://<hostname|ip>:<tcp-port>
- udp://<hostname|ip>:<udp-port>
- unix://<filesystem-path>
By default this parameter is set to "udp://0.0.0.0:514".
Format
Defines which syslog standard the server will support. Three standards, listed below, are currently available. All standards support listening to UDP and UNIX domain sockets. RFC6587 additionally supports TCP sockets.
- RFC3164 (https://tools.ietf.org/html/rfc3164) - unix, udp
- RFC5424 (https://tools.ietf.org/html/rfc5424) - unix, udp
- RFC6587 (https://tools.ietf.org/html/rfc6587) - unix, udp, tcp
By default this parameter is set to “RFC6587”.
Permissions (default: 0770)
This value sets the filesystem permissions as a four-digit octal number in case the address is a Unix domain socket (i.e. unix://<filesystem-path>). By default this parameter is set to “0770”.
SetMetadata (default: false)
When set to true, syslog based metadata will be attached to the message. The metadata fields added depend on the protocol version used. RFC3164 supports: tag, timestamp, hostname, priority, facility, severity. RFC5424 and RFC6587 support: app_name, version, proc_id, msg_id, timestamp, hostname, priority, facility, severity. By default this parameter is set to "false".
TimestampFormat (default: 2006-01-02T15:04:05.000 MST)
When using SetMetadata this string denotes the go time format used to convert syslog timestamps into strings. By default this parameter is set to “2006-01-02T15:04:05.000 MST”.
Parameters (from core.SimpleConsumer)¶
Streams
Defines a list of streams a consumer will send to. This parameter is mandatory. When using “*” messages will be sent only to the internal “*” stream. It will NOT send messages to all streams. By default this parameter is set to an empty list.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a consumer is allowed to take to shut down. After this timeout the consumer is always considered to have shut down. By default this parameter is set to 1000.
Modulators
Defines a list of modulators to be applied to a message before it is sent to the list of streams. If a modulator specifies a stream, the message is only sent to that specific stream. A message is saved as original after all modulators have been applied. By default this parameter is set to an empty list.
ModulatorRoutines
Defines the number of go routines reserved for modulating messages. Setting this parameter to 0 will use as many go routines as the specific consumer plugin is using for fetching data. Any other value will force the given number of go routines to be used. By default this parameter is set to 0.
ModulatorQueueSize
Defines the size of the channel used to buffer messages before they are fetched by the next free modulator go routine. If the ModulatorRoutines parameter is set to 0 this parameter is ignored. By default this parameter is set to 1024.
Examples¶
Replace the system's standard syslogd with Gollum:
SyslogdSocketConsumer:
Type: consumer.Syslogd
Streams: "system_syslog"
Address: "unix:///dev/log"
Format: "RFC3164"
Listen on a TCP socket:
SyslogdTCPSocketConsumer:
Type: consumer.Syslogd
Streams: "tcp_syslog"
Address: "tcp://0.0.0.0:5599"
Format: "RFC6587"
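The priority value in the syslog metadata follows the standard RFC3164 encoding, priority = facility * 8 + severity. As a sketch, a client could compute the PRI field of a test message like this (the timestamp, host and tag below are placeholders):

```go
package main

import "fmt"

// pri computes the RFC3164 PRI value: priority = facility*8 + severity.
func pri(facility, severity int) int {
	return facility*8 + severity
}

func main() {
	// A minimal RFC3164-style line as a client might send it to
	// udp://0.0.0.0:514; facility 4 at severity 2 yields <34>.
	fmt.Printf("<%d>Oct 11 22:14:15 myhost app: test message\n", pri(4, 2))
}
```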
Producers¶
Producers are plugins that transfer messages to external services. Data arrives in the form of messages and can be converted by using a formatter.
Example producer setup:

List of available producers:
AwsCloudwatchLogs¶
The AwsCloudwatchLogs producer plugin sends messages to the AWS CloudWatch Logs service. Credentials are obtained by gollum automatically.
Parameters
- LogStream: Stream name in CloudWatch Logs.
- LogGroup: Group name in CloudWatch Logs.
- Region: Amazon region to stream logs to. Defaults to "eu-west-1".
Parameters¶
Enable (default: true)
Switches this plugin on or off.
LogStream
(no documentation available)
LogGroup
(no documentation available)
Parameters (from components.AwsCredentials)¶
Credential/Type (default: none)
This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.
environment
Retrieves credentials from the environment variables of the running process.
static
Retrieves credential values from the individual credential fields.
shared
Retrieves credentials from the current user's home directory.
none
Uses an anonymous login to aws.
Credential/Id
Used with the "static" credential type as the AccessKeyID.
Credential/Token
Used with the "static" credential type as the SessionToken.
Credential/Secret
Used with the "static" credential type as the SecretAccessKey.
Credential/File
Used with the "shared" credential type as the path to your shared credentials file (~/.aws/credentials).
Credential/Profile (default: default)
Used with the "shared" credential type as the profile name.
Credential/AssumeRole
This value defines an IAM role to assume. By default this is set to "".
Parameters (from components.AwsMultiClient)¶
Region (default: us-east-1)
This value defines the used aws region. By default this is set to “us-east-1”
Endpoint
This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This configuration sends messages to stream stream_name and group group_name with shared credentials.
CwLogs:
  Type: producer.AwsCloudwatchLogs
  LogStream: stream_name
  LogGroup: group_name
  Credential:
    Type: shared
AwsFirehose¶
This producer sends data to an AWS Firehose stream.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
StreamMapping
This value defines a translation from gollum stream names to firehose stream names. If no mapping is given, the gollum stream name is used as the firehose stream name. By default this parameter is set to “empty”
RecordMaxMessages (default: 1)
This value defines the number of messages to send in one record to aws firehose. By default this parameter is set to “1”.
RecordMessageDelimiter (default: \n)
This value defines the delimiter string to use between messages within a firehose record. By default this parameter is set to "\n".
SendTimeframeMs (default: 1000, unit: ms)
This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to “1000”.
Parameters (from components.AwsCredentials)¶
Credential/Type (default: none)
This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.
environment
Retrieves credentials from the environment variables of the running process.
static
Retrieves credential values from the individual credential fields.
shared
Retrieves credentials from the current user's home directory.
none
Uses an anonymous login to aws.
Credential/Id
Used with the "static" credential type as the AccessKeyID.
Credential/Token
Used with the "static" credential type as the SessionToken.
Credential/Secret
Used with the "static" credential type as the SecretAccessKey.
Credential/File
Used with the "shared" credential type as the path to your shared credentials file (~/.aws/credentials).
Credential/Profile (default: default)
Used with the "shared" credential type as the profile name.
Credential/AssumeRole
This value defines an IAM role to assume. By default this is set to "".
Parameters (from components.AwsMultiClient)¶
Region (default: us-east-1)
This value defines the used aws region. By default this is set to “us-east-1”
Endpoint
This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example sets up a simple AWS Firehose producer:
firehoseOut:
Type: producer.AwsFirehose
Streams: "*"
StreamMapping:
"*": default
Credential:
Type: shared
File: /Users/<USERNAME>/.aws/credentials
Profile: default
Region: eu-west-1
RecordMaxMessages: 1
RecordMessageDelimiter: "\n"
SendTimeframeMs: 1000
AwsKinesis¶
This producer sends data to an AWS Kinesis stream.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
StreamMapping
This value defines a translation from gollum stream names to kinesis stream names. If no mapping is given the gollum stream name is used as the kinesis stream name. By default this parameter is set to “empty”
RecordMaxMessages (default: 1)
This value defines the maximum number of messages to join into a kinesis record. By default this parameter is set to "1".
RecordMessageDelimiter (default: \n)
This value defines the delimiter string to use between messages within a kinesis record. By default this parameter is set to "\n".
SendTimeframeMs (default: 1000, unit: ms)
This value defines the timeframe in milliseconds in which a second batch send can be triggered. By default this parameter is set to “1000”.
Parameters (from components.AwsCredentials)¶
Credential/Type (default: none)
This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.
environment
Retrieves credentials from the environment variables of the running process.
static
Retrieves credential values from the individual credential fields.
shared
Retrieves credentials from the current user's home directory.
none
Uses an anonymous login to aws.
Credential/Id
Used with the "static" credential type as the AccessKeyID.
Credential/Token
Used with the "static" credential type as the SessionToken.
Credential/Secret
Used with the "static" credential type as the SecretAccessKey.
Credential/File
Used with the "shared" credential type as the path to your shared credentials file (~/.aws/credentials).
Credential/Profile (default: default)
Used with the "shared" credential type as the profile name.
Credential/AssumeRole
This value defines an IAM role to assume. By default this is set to "".
Parameters (from components.AwsMultiClient)¶
Region (default: us-east-1)
This value defines the used aws region. By default this is set to “us-east-1”
Endpoint
This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example sets up a simple AWS Kinesis producer:
KinesisOut:
Type: producer.AwsKinesis
Streams: "*"
StreamMapping:
"*": default
Credential:
Type: shared
File: /Users/<USERNAME>/.aws/credentials
Profile: default
Region: eu-west-1
RecordMaxMessages: 1
RecordMessageDelimiter: "\n"
SendTimeframeMs: 1000
AwsS3¶
This producer sends messages to Amazon S3.
Each "file" uses a configurable batch and uploads its content to S3 via a multipart upload, which avoids temporary storage on disk.
Please keep in mind that Amazon S3 does not support appending to existing objects. Therefore rotation is mandatory in this producer.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Bucket
The S3 bucket to upload to
File (default: gollum_*.log)
This value is used as a template for final file names. The string "*" will be replaced with the active stream name. By default this parameter is set to "gollum_*.log".
Parameters (from components.AwsCredentials)¶
Credential/Type (default: none)
This value defines the credentials that are to be used when connecting to aws. Available values are listed below. See https://docs.aws.amazon.com/sdk-for-go/api/aws/credentials/#Credentials for more information.
environment
Retrieves credentials from the environment variables of the running process.
static
Retrieves credential values from the individual credential fields.
shared
Retrieves credentials from the current user's home directory.
none
Uses an anonymous login to aws.
Credential/Id
Used with the "static" credential type as the AccessKeyID.
Credential/Token
Used with the "static" credential type as the SessionToken.
Credential/Secret
Used with the "static" credential type as the SecretAccessKey.
Credential/File
Used with the "shared" credential type as the path to your shared credentials file (~/.aws/credentials).
Credential/Profile (default: default)
Used with the "shared" credential type as the profile name.
Credential/AssumeRole
This value defines an IAM role to assume. By default this is set to "".
Parameters (from components.AwsMultiClient)¶
Region (default: us-east-1)
This value defines the used aws region. By default this is set to “us-east-1”
Endpoint
This value defines the used aws api endpoint. If no endpoint is set the client needs to set the right endpoint for the used region. By default this is set to “”.
Parameters (from components.BatchedWriterConfig)¶
Batch/TimeoutSec (default: 5, unit: sec)
This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.
Batch/MaxCount (default: 8192)
This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to "8192".
Batch/FlushCount (default: 4096)
This value defines the number of messages to be buffered before they are written to disk. This setting is clamped to “BatchMaxCount”. By default this parameter is set to “BatchMaxCount / 2”.
Batch/FlushTimeoutSec (default: 0, unit: sec)
This value defines the maximum number of seconds to wait before a flush is aborted during shutdown. Setting this parameter to "0" disables aborting the flushing procedure. By default this parameter is set to "0".
Parameters (from components.RotateConfig)¶
Rotation/Enable (default: false)
If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.
Rotation/TimeoutMin (default: 1440, unit: min)
This value defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this parameter is set to “1440”.
Rotation/SizeMB (default: 1024, unit: mb)
This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.
Rotation/Timestamp (default: 2006-01-02_15)
This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go's time.Format function. By default this parameter is set to "2006-01-02_15".
Rotation/ZeroPadding (default: 0)
This value sets the number of leading zeros when rotating files with an existing name. Setting this to 0 won't add zeros; every other number defines the number of leading zeros to be used. By default this parameter is set to "0".
Rotation/Compress (default: false)
This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.
Rotation/At
This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.
Rotation/AtHour (default: -1)
(no documentation available)
Rotation/AtMin (default: -1)
(no documentation available)
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying "*" causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to "" will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example sends all received messages from all streams to S3, creating a separate file for each stream:
S3Out:
Type: producer.AwsS3
Streams: "*"
Credential:
Type: shared
File: /Users/<USERNAME>/.aws/credentials
Profile: default
Region: eu-west-1
Bucket: gollum-s3-test
Batch:
TimeoutSec: 60
MaxCount: 1000
FlushCount: 500
FlushTimeoutSec: 0
Rotation:
Timestamp: 2006-01-02T15:04:05.999999999Z07:00
TimeoutMin: 1
SizeMB: 20
Modulators:
- format.Envelope:
Postfix: "\n"
Benchmark¶
This producer is meant to provide more meaningful results in benchmark situations than producer.Null, as it is based on core.BufferedProducer.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
benchmark:
  Type: producer.Benchmark
  Streams: "*"
Console¶
The console producer writes messages to standard output or standard error.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Console
Chooses the output device; either “stdout” or “stderr”. By default this is set to “stdout”.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
StdErrPrinter:
  Type: producer.Console
  Streams: myerrorstream
  Console: stderr
ElasticSearch¶
The ElasticSearch producer sends messages to Elasticsearch using the bulk HTTP API. The producer expects a JSON payload.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Retry/Count
Sets the number of retries before an Elasticsearch request finally fails. By default this parameter is set to “3”.
Retry/TimeToWaitSec
This value denotes the time in seconds after which a failed dataset will be transmitted again. By default this parameter is set to “3”.
SetGzip
This value enables or disables gzip compression for Elasticsearch requests (disabled by default). This option is passed through unchanged to the underlying library. See http://godoc.org/gopkg.in/olivere/elastic.v5#SetGzip. By default this parameter is set to “false”.
Servers
This value defines a list of servers to connect to.
User
This value is used as the username for the Elasticsearch server. By default this parameter is set to “”.
Password
This value is used as the password for the Elasticsearch server. By default this parameter is set to “”.
StreamProperties
This value defines the mapping and settings for each stream. As index use the stream name here.
StreamProperties/<streamName>/Index
The value defines the Elasticsearch index used for the stream.
StreamProperties/<streamName>/Type
This value defines the document type used for the stream.
StreamProperties/<streamName>/TimeBasedIndex
This value can be set to “true” to append the date of the message to the index as in “<index>_<TimeBasedFormat>”. NOTE: This setting incurs a performance penalty because it is necessary to check if an index exists for each message! By default this parameter is set to “false”.
StreamProperties/<streamName>/TimeBasedFormat
This value can be set to a valid go time format string to be used with DayBasedIndex. By default this parameter is set to “2006-01-02”.
StreamProperties/<streamName>/Mapping
This value is a map which is used for the document field mapping. As document type, the already defined type is reused for the field mapping. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings
StreamProperties/<streamName>/Settings
This value is a map which is used for the index settings. See https://www.elastic.co/guide/en/elasticsearch/reference/5.4/indices-create-index.html#mappings
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example starts a simple Twitter example producer for a locally running Elasticsearch:
producerElasticSearch:
  Type: producer.ElasticSearch
  Streams: tweets_stream
  SetGzip: true
  Servers:
    - http://127.0.0.1:9200
  StreamProperties:
    tweets_stream:
      Index: twitter
      DayBasedIndex: true
      Type: tweet
      Mapping:
        # index mapping for payload
        user: keyword
        message: text
      Settings:
        number_of_shards: 1
        number_of_replicas: 1
File¶
The file producer writes messages to a file. This producer also allows log rotation and compression of the rotated logs. Folders in the file path will be created if necessary.
Each target file is handled with separate batch processing.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
File
This value contains the path to the log file to write. The wildcard character “*” can be used as a placeholder for the stream name. By default this parameter is set to “/var/log/gollum.log”.
FileOverwrite
This value causes the file to be overwritten instead of appending new data to it. By default this parameter is set to “false”.
Permissions (default: 0644)
Defines the UNIX filesystem permissions used when creating the named file as an octal number. By default this parameter is set to “0644”.
FolderPermissions (default: 0755)
Defines the UNIX filesystem permissions used when creating the folders as an octal number. By default this parameter is set to “0755”.
Parameters (from components.BatchedWriterConfig)¶
Batch/TimeoutSec (default: 5, unit: sec)
This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.
Batch/MaxCount (default: 8192)
This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to “8192”.
Batch/FlushCount (default: 4096)
This value defines the number of messages to be buffered before they are written to disk. This setting is clamped to “BatchMaxCount”. By default this parameter is set to “BatchMaxCount / 2”.
Batch/FlushTimeoutSec (default: 0, unit: sec)
This value defines the maximum number of seconds to wait before a flush is aborted during shutdown. Setting this parameter to “0” disables aborting the flush procedure. By default this parameter is set to “0”.
Parameters (from components.RotateConfig)¶
Rotation/Enable (default: false)
If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.
Rotation/TimeoutMin (default: 1440, unit: min)
This value defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this parameter is set to “1440”.
Rotation/SizeMB (default: 1024, unit: mb)
This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.
Rotation/Timestamp (default: 2006-01-02_15)
This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function. By default this parameter is set to “2006-01-02_15”.
Rotation/ZeroPadding (default: 0)
This value sets the number of leading zeros when rotating files with an existing name. Setting this setting to 0 won’t add zeros, every other number defines the number of leading zeros to be used. By default this parameter is set to “0”.
Rotation/Compress (default: false)
This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.
Rotation/At
This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.
Rotation/AtHour (default: -1)
(no documentation available)
Rotation/AtMin (default: -1)
(no documentation available)
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Parameters (from file.Pruner)¶
Prune/Count (default: 0)
This value removes old logfiles upon rotate so that only the given number of logfiles remain. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). Set this value to “0” to disable pruning by count. By default this parameter is set to “0”.
Prune/AfterHours (default: 0)
This value removes old logfiles that are older than a given number of hours. Set this value to “0” to disable pruning by lifetime. By default this parameter is set to “0”.
Prune/TotalSizeMB (default: 0, unit: mb)
This value removes old logfiles upon rotate so that only the given number of MBs are used by logfiles. Logfiles are located by the name defined by “File” and are pruned by date (followed by name). Set this value to “0” to disable pruning by file size. By default this parameter is set to “0”.
Examples¶
This example will write the messages from all streams to /tmp/gollum.log after every 64 messages or after 60 seconds:
fileOut:
  Type: producer.File
  Streams: "*"
  File: /tmp/gollum.log
  Batch:
    MaxCount: 128
    FlushCount: 64
    TimeoutSec: 60
    FlushTimeoutSec: 3
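The rotation and pruning parameters described above can be combined with batching. The following sketch is illustrative only; the producer name, file path, and all threshold values are hypothetical:

```yaml
fileOutRotated:
  Type: producer.File
  Streams: "*"
  File: /tmp/gollum_*.log     # "*" is replaced by the stream name
  Rotation:
    Enable: true
    SizeMB: 50                # rotate once a file exceeds 50 MB...
    TimeoutMin: 60            # ...or after 60 minutes
    Compress: true            # gzip each rotated file
  Prune:
    Count: 10                 # keep only the 10 newest rotated files
```

Note that Rotation/SizeMB and Rotation/TimeoutMin can be active in parallel; whichever threshold is reached first triggers the rotate.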
HTTPRequest¶
The HTTPRequest producer sends messages as HTTP requests to a given webserver.
In RawData mode, incoming messages are expected to contain complete HTTP requests in “wire format”, such as:
POST /foo/bar HTTP/1.0\n
Content-type: text/plain\n
Content-length: 24
\n
Dummy test\n
Request data\n
In this mode, the message’s contents are parsed as an HTTP request and sent to the destination server (virtually) unchanged. If the message cannot be parsed as an HTTP request, an error is logged. Only the scheme, host and port components of the “Address” URL are used; any path and query parameters are ignored. The “Encoding” parameter is ignored.
If RawData mode is off, a POST request is made to the destination server for each incoming message, using the complete URL in “Address”. The incoming message’s contents are delivered in the POST request’s body and Content-type is set to the value of “Encoding”.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Defines the URL to send HTTP requests to. If the value doesn’t contain “://”, it is prepended with “http://”, so short forms like “localhost:8088” are accepted. The default value is “http://localhost:80”.
RawData (default: true)
Turns “RawData” mode on. See the description above.
Encoding (default: text/plain; charset=utf-8)
Defines the payload encoding when RawData is set to false.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
HttpOut01:
  Type: producer.HTTPRequest
  Streams: http_01
  Address: "http://localhost:8099/test"
  RawData: true
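With RawData disabled, each message body is POSTed to the full URL and “Encoding” becomes the Content-type header. A minimal sketch; the producer name, stream name, and endpoint are hypothetical:

```yaml
HttpPostOut:
  Type: producer.HTTPRequest
  Streams: http_02
  Address: "http://localhost:8099/ingest"  # full URL is used in this mode
  RawData: false
  Encoding: "application/json"             # sent as Content-type
```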
InfluxDB¶
This producer writes data to an InfluxDB endpoint. Data is not automatically converted to the correct InfluxDB format; proper formatting might be required.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Version
Defines the InfluxDB protocol version to use. This can either be 80-89 for 0.8.x, 90 for 0.9.0 or 91-100 for 0.9.1 or later. By default this parameter is set to 100.
Host
Defines the host (and port) of the InfluxDB master. By default this parameter is set to “localhost:8086”.
User
Defines the InfluxDB username to use. If this is empty, credentials are not used. By default this parameter is set to “”.
Password
Defines the InfluxDB password to use. By default this parameter is set to “”.
Database
Sets the InfluxDB database to write to. By default this parameter is set to “default”.
TimeBasedName
When set to true, the Database parameter is treated as a template for time.Format and the resulting string is used as the database name. You can e.g. use “default-2006-01-02” to switch databases each day. By default this parameter is set to “true”.
RetentionPolicy
Only available for Version 90. This setting defines the InfluxDB retention policy allowed with this protocol version. By default this parameter is set to “”.
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
metricsToInflux:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics"
  TimeBasedName: false
  Batch:
    MaxCount: 2000
    FlushCount: 100
    TimeoutSec: 5
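When TimeBasedName is enabled, the Database parameter is treated as a Go time.Format template, so the target database can change each day. An illustrative sketch; the producer name and database prefix are hypothetical:

```yaml
dailyMetricsToInflux:
  Type: producer.InfluxDB
  Streams: metrics
  Host: "influx01:8086"
  Database: "metrics-2006-01-02"  # Go reference layout; expands to the current date
  TimeBasedName: true
```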
Kafka¶
This producer writes messages to a kafka cluster. This producer is backed by the sarama library (https://github.com/Shopify/sarama) so most settings directly relate to the settings of that library.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Servers
Defines a list of ideally all brokers in the cluster. At least one broker is required. By default this parameter is set to an empty list.
Version
Defines the kafka protocol version to use. Common values are 0.8.2, 0.9.0 or 0.10.0. Values of the form “A.B” are allowed as well as “A.B.C” and “A.B.C.D”. If the version given is not known, the closest possible version is chosen. If GroupId is set to a value < “0.9”, “0.9.0.1” will be used. By default this parameter is set to “0.8.2”.
Topics
Defines a stream to topic mapping. If a stream is not mapped the stream name is used as topic. You can define the wildcard stream (*) here, too. If defined, all streams that do not have a specific mapping will go to this topic (including _GOLLUM_). By default this parameter is set to an empty list.
ClientId (default: gollum)
Sets the kafka client id used by this producer. By default this parameter is set to “gollum”.
Partitioner
Defines the distribution algorithm to use. Valid values are: Random, Roundrobin and Hash. By default this parameter is set to “Roundrobin”.
PartitionHasher
Defines the hash algorithm to use when Partitioner is set to “Hash”. Accepted values are “fnv1-a” and “murmur2”.
KeyFrom
Defines the metadata field that contains the string to be used as the key passed to kafka. When set to an empty string no key is used. By default this parameter is set to “”.
Compression
Defines the compression algorithm to use. Possible values are “none”, “zip” and “snappy”. By default this parameter is set to “none”.
RequiredAcks
Defines the numbers of acknowledgements required until a message is marked as “sent”. When set to -1 all replicas must acknowledge a message. By default this parameter is set to 1.
TimeoutMs
Denotes the maximum time the broker will wait for acks. This setting becomes active when RequiredAcks is set to wait for multiple commits. By default this parameter is set to 10000.
GracePeriodMs (default: 100, unit: ms)
Defines the number of milliseconds to wait for Sarama to accept a single message. After this period a message is sent to the fallback. This setting mitigates a conceptual problem in the sarama API which can lead to long blocking times during startup. By default this parameter is set to 100.
MaxOpenRequests
Defines the maximum number of simultaneous connections opened to a single broker at a time. By default this parameter is set to 5.
ServerTimeoutSec
Defines the time after which a connection is set to timed out. By default this parameter is set to 30.
SendTimeoutMs
Defines the number of milliseconds to wait for a broker before marking a message as timed out. By default this parameter is set to 250.
SendRetries
Defines how many times a message should be sent again before a broker is marked as not reachable. Please note that this setting should never be 0. See https://github.com/Shopify/sarama/issues/294. By default this parameter is set to 1.
AllowNilValue (default: false)
When enabled messages containing an empty or nil payload will not be rejected. By default this parameter is set to false.
Batch/MinCount
Sets the minimum number of messages required to send a request. By default this parameter is set to 1.
Batch/MaxCount
Defines the maximum number of messages buffered before a request is sent. A value of 0 will remove this limit. By default this parameter is set to 0.
Batch/MinSizeByte
Defines the minimum number of bytes to buffer before sending a request. By default this parameter is set to 8192.
Batch/SizeMaxKB
Defines the maximum allowed message size in KB. Messages bigger than this limit will be rejected. By default this parameter is set to 1024.
Batch/TimeoutMs
Defines the maximum time in milliseconds after which a new request will be sent, ignoring Batch/MinCount and Batch/MinSizeByte. By default this parameter is set to 3.
ElectRetries
Defines how many times a metadata request is to be retried during a leader election phase. By default this parameter is set to 3.
ElectTimeoutMs
Defines the number of milliseconds to wait for the cluster to elect a new leader. By default this parameter is set to 250.
MetadataRefreshMs
Defines the interval in milliseconds for refetching cluster metadata. By default this parameter is set to 600000.
TlsEnable
Enables TLS communication with brokers. By default this parameter is set to false.
TlsKeyLocation
Path to the client’s private key (PEM) used for TLS based authentication. By default this parameter is set to “”.
TlsCertificateLocation
Path to the client’s public key (PEM) used for TLS based authentication. By default this parameter is set to “”.
TlsCaLocation
Path to the CA certificate(s) used for verifying the broker’s key. By default this parameter is set to “”.
TlsServerName
Used to verify the hostname on the server’s certificate unless TlsInsecureSkipVerify is true. By default this parameter is set to “”.
TlsInsecureSkipVerify
Disables server certificate chain and host name verification when set to true. By default this parameter is set to false.
SaslEnable
Enables SASL based authentication. By default this parameter is set to false.
SaslUsername
Sets the user name used for SASL/PLAIN authentication. By default this parameter is set to “”.
SaslPassword
Sets the password used for SASL/PLAIN authentication. By default this parameter is set to “”.
MessageBufferCount
Sets the internal channel size for the kafka client. By default this is set to 8192.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
kafkaWriter:
  Type: producer.Kafka
  Streams: logs
  Compression: zip
  Servers:
    - "kafka01:9092"
    - "kafka02:9092"
    - "kafka03:9092"
    - "kafka04:9092"
Null¶
This producer is meant to be used as a sink for data. It will throw away all messages without notice.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
TrashCan:
  Type: producer.Null
  Streams: trash
Proxy¶
This producer is compatible with the Proxy consumer plugin. Responses to messages sent to the given address are sent back to the original consumer if it is a compatible message source. As with consumer.Proxy, the returned messages are partitioned by common message length algorithms.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
This value stores the identifier to connect to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.Proxy”. By default this parameter is set to “:5880”.
ConnectionBufferSizeKB (default: 1024, unit: kb)
This value sets the connection buffer size in KB. This also defines the size of the buffer used by the message parser. By default this parameter is set to “1024”.
TimeoutSec (default: 1, unit: sec)
This value defines the maximum time in seconds a client is allowed to take for a response. By default this parameter is set to “1”.
Partitioner
This value defines the algorithm used to read messages from the stream. The messages will be sent as a whole, no cropping or removal will take place. By default this parameter is set to “delimiter”.
delimiter
Separates messages by looking for a delimiter string. The delimiter is included in the left-hand message.
ascii
Reads an ASCII encoded number at a given offset until a given delimiter is found.
binary
Reads a binary number at a given offset and size.
binary_le
Is an alias for “binary”.
binary_be
Is the same as “binary” but uses big endian encoding.
fixed
Assumes fixed size messages.
Delimiter
This value defines the delimiter used by the text and delimiter partitioner. By default this parameter is set to “\n”.
Offset
This value defines the offset used by the binary and text partitioner. This setting is ignored by the fixed partitioner. By default this parameter is set to “0”.
Size
This value defines the size in bytes used by the binary or fixed partitioner. For binary this can be set to 1, 2, 4 or 8; for fixed this defines the size of a message. By default this parameter is set to “4” for the binary and “1” for the fixed partitioner.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example will send 64bit length encoded data on TCP port 5880.
proxyOut:
Type: producer.Proxy
Address: ":5880"
Partitioner: binary
Size: 8
Redis¶
This producer sends messages to a redis server. Different redis storage types and database indexes are supported. This producer does not implement support for redis 3.0 cluster.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Stores the identifier to connect to. This can either be an IP address and port like “localhost:6379” or a file like “unix:///var/redis.socket”. By default this is set to “:6379”.
Database (default: 0)
Defines the redis database to connect to.
Key
Defines the redis key to store the values in. This field is ignored when “KeyFormatter” is set. By default this is set to “default”.
Storage
Defines the type of the storage to use. Valid values are: “hash”, “list”, “set”, “sortedset”, “string”. By default this is set to “hash”.
KeyFrom
Defines the name of the metadata field used as a key for messages sent to redis. If the name is an empty string no key is sent. By default this value is set to an empty string.
FieldFrom
Defines the name of the metadata field used as a field for messages sent to redis. If the name is an empty string no field is sent. By default this value is set to an empty string.
Password
(no documentation available)
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
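Examples¶
The following is a possible configuration sketch for this producer; the stream name “redisStream” and the key “logs” are placeholders, not required values:
redisOut:
Type: producer.Redis
Streams: redisStream
Address: "localhost:6379"
Storage: "list"
Key: "logs"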
Scribe¶
This producer allows sending messages to Facebook’s scribe service.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Defines the host and port of a scribe endpoint. By default this parameter is set to “localhost:1463”.
ConnectionBufferSizeKB (default: 1024, unit: kb)
Sets the connection socket buffer size in KB. By default this parameter is set to 1024.
HeartBeatIntervalSec (default: 5, unit: sec)
Defines the interval in seconds used to query scribe for status updates. By default this parameter is set to 5.
WindowSize (default: 2048)
Defines the maximum number of messages sent to scribe in one call. The WindowSize will be reduced when scribe is returning “try later” to reduce load on the scribe server. It will slowly rise again for each successful write until WindowSize is reached again. By default this parameter is set to 2048.
ConnectionTimeoutSec (default: 5, unit: sec)
Defines the time in seconds after which a connection timeout is assumed. This can happen during writes or status reads. By default this parameter is set to 5.
Category
Maps a stream to a scribe category. You can define the wildcard stream (*) here, too. When set, all streams that do not have a specific mapping will go to this category (including reserved streams like _GOLLUM_). If no category mappings are set the stream name is used as category. By default this parameter is set to an empty list.
Parameters (from core.BatchedProducer)¶
Batch/MaxCount (default: 8192)
Defines the maximum number of messages per batch. If this limit is reached a flush is always triggered. By default this parameter is set to 8192.
Batch/FlushCount (default: 4096)
Defines the minimum number of messages required to flush a batch. If this limit is reached a flush might be triggered. By default this parameter is set to 4096.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum time in seconds messages can stay in the internal buffer before being flushed. By default this parameter is set to 5.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
logs:
Type: producer.Scribe
Streams: ["*", "_GOLLUM_"]
Address: "scribe01:1463"
HeartBeatIntervalSec: 10
Category:
"access" : "accesslogs"
"error" : "errorlogs"
"_GOLLUM_" : "gollumlogs"
Socket¶
The socket producer connects to a service over TCP, UDP or a UNIX domain socket.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address
Defines the address to connect to. This can either be any ip address and port like “localhost:5880” or a file like “unix:///var/gollum.socket”. By default this parameter is set to “:5880”.
ConnectionBufferSizeKB (default: 1024, unit: kb)
This value sets the connection buffer size in KB. By default this parameter is set to “1024”.
Batch/MaxCount (default: 8192)
This value defines the maximum number of messages that can be buffered before a flush is mandatory. If the buffer is full and a flush is still underway or cannot be triggered for other reasons, the producer will block. By default this parameter is set to “8192”.
Batch/FlushCount (default: 4096)
This value defines the number of messages to be buffered before they are written to disk. This setting is clamped to Batch/MaxCount. By default this parameter is set to “Batch/MaxCount / 2”.
Batch/TimeoutSec (default: 5, unit: sec)
This value defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to “5”.
Acknowledge
This value can be set to a non-empty value to expect the given string as a response from the server after a batch has been sent. If Acknowledge is enabled and an IP address is given as Address, TCP is used to open the connection, otherwise UDP is used. By default this parameter is set to “”.
AckTimeoutMs (default: 2000, unit: ms)
This value defines the time in milliseconds to wait for a response from the server. After this timeout the send is marked as failed. By default this parameter is set to “2000”.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example starts a socket producer on localhost port 5880:
SocketOut:
Type: producer.Socket
Address: ":5880"
Batch:
MaxCount: 1024
FlushCount: 512
TimeoutSec: 3
AckTimeoutMs: 1000
Spooling¶
This producer is meant to be used as a fallback if another producer fails to send messages, e.g. because a service is down. It does not really produce messages to some other service, it buffers them on disk for a certain time and inserts them back to the system after this period.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Path (default: /var/run/gollum/spooling)
Sets the output directory for spooling files. Spooling files will be stored as “<path>/<stream name>/<number>.spl”. By default this parameter is set to “/var/run/gollum/spooling”.
MaxFileSizeMB (default: 512, unit: mb)
Sets the size limit in MB that causes a spool file rotation. Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 512.
MaxFileAgeMin (default: 1, unit: min)
Defines the duration in minutes after which a spool file rotation is triggered (regardless of MaxFileSizeMB). Reading messages back into the system will start only after a file is rotated. By default this parameter is set to 1.
MaxMessagesSec (default: 100)
Sets the maximum number of messages that will be respooled per second. Setting this value to 0 will cause respooling to send as fast as possible. By default this parameter is set to 100.
RespoolDelaySec (default: 10, unit: sec)
Defines the number of seconds to wait before trying to load existing spool files from disk after a restart. This setting can be used to define a safe timeframe for gollum to set up all required connections and resources before putting additional load on it. By default this parameter is set to 10.
RevertStreamOnFallback (default: false)
This allows the spooling fallback to handle the messages that would have been sent back by the spooler if it would have handled the message. When set to true it will revert the stream of the message to the previous stream ID before sending it to the Fallback stream. By default this parameter is set to false.
BufferSizeByte (default: 8192)
Defines the initial size of the buffer that is used to read messages from a spool file. If a message is larger than this size, the buffer will be resized. By default this parameter is set to 8192.
Batch/MaxCount (default: 100)
Defines the maximum number of messages stored in memory before a write to file is triggered. By default this parameter is set to 100.
Batch/TimeoutSec (default: 5, unit: sec)
Defines the maximum number of seconds to wait after the last message arrived before a batch is flushed automatically. By default this parameter is set to 5.
Parameters (from components.RotateConfig)¶
Rotation/Enable (default: false)
If this value is set to “true” the logs will rotate after reaching certain thresholds. By default this parameter is set to “false”.
Rotation/TimeoutMin (default: 1440, unit: min)
This value defines a timeout in minutes that will cause the logs to rotate. Can be set in parallel with RotateSizeMB. By default this parameter is set to “1440”.
Rotation/SizeMB (default: 1024, unit: mb)
This value defines the maximum file size in MB that triggers a file rotate. Files can get bigger than this size. By default this parameter is set to “1024”.
Rotation/Timestamp (default: 2006-01-02_15)
This value sets the timestamp added to the filename when file rotation is enabled. The format is based on Go’s time.Format function. By default this parameter is set to “2006-01-02_15”.
Rotation/ZeroPadding (default: 0)
This value sets the number of leading zeros when rotating files with an existing name. Setting this setting to 0 won’t add zeros, every other number defines the number of leading zeros to be used. By default this parameter is set to “0”.
Rotation/Compress (default: false)
This value defines if a rotated logfile is to be gzip compressed or not. By default this parameter is set to “false”.
Rotation/At
This value defines a specific time for rotation in hh:mm format. By default this parameter is set to “”.
Rotation/AtHour (default: -1)
(no documentation available)
Rotation/AtMin (default: -1)
(no documentation available)
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example will collect messages from the fallback stream and buffer them for 10 minutes. After 10 minutes the first messages will be written back to the system as fast as possible.
spooling:
Type: producer.Spooling
Streams: fallback
MaxMessagesSec: 0
MaxFileAgeMin: 10
StatsdMetrics¶
This producer samples the messages it receives and sends metrics about them to statsd.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Server
Defines the server and port to send statsd metrics to. By default this parameter is set to “localhost:8125”.
Prefix
Defines a string that is prepended to every statsd metric name. By default this parameter is set to “gollum.”.
StreamMapping
Defines a translation from gollum stream to statsd metric name. If no mapping is given the gollum stream name is used as the metric name. By default this parameter is set to an empty list.
UseMessage (default: false)
Switches between counting all messages arriving at this producer and summing up the message contents. If UseMessage is set to true, the contents will be parsed as an integer, i.e. a string containing a human readable number is expected. By default this parameter is set to false.
UseGauge (default: false)
When set to true the statsd data format will switch from counter to gauge. Every stream that does not receive any message but is listed in StreamMapping will have a gauge value of 0. By default this parameter is set to false.
Batch/MaxMessages (default: 500)
Defines the maximum number of messages to collect per batch. By default this parameter is set to 500.
Batch/TimeoutSec (default: 10, unit: sec)
Defines the number of seconds after which a batch is processed, regardless of MaxMessages being reached or not. By default this parameter is set to 10.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example will collect all messages going through gollum and send metrics about the different data streams to statsd at least every 5 seconds. Metrics will be sent as “logs.streamName”.
metricsCollector:
Type: producer.StatsdMetrics
Streams: "*"
Server: "stats01:8125"
BatchTimeoutSec: 5
Prefix: "logs."
UseGauge: true
Websocket¶
The websocket producer opens up a websocket.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Address (default: :81)
This value defines the host and port to bind to. This can be any IP address/DNS name and port like “localhost:5880”. By default this parameter is set to “:81”.
Path (default: /)
This value defines the URL path to listen for. By default this parameter is set to “/”.
ReadTimeoutSec (default: 3, unit: sec)
This value specifies the maximum duration in seconds before timing out the read of a request. By default this parameter is set to “3” seconds.
IgnoreOrigin (default: false)
When set to true, the origin check of the websocket server is skipped. By default this parameter is set to “false”.
Parameters (from core.BufferedProducer)¶
Channel
This value defines the capacity of the message buffer. By default this parameter is set to “8192”.
ChannelTimeoutMs (default: 0, unit: ms)
This value defines a timeout for each message before the message will be discarded. To disable the timeout, set this parameter to 0. By default this parameter is set to “0”.
Parameters (from core.SimpleProducer)¶
Streams
Defines a list of streams the producer will receive from. This parameter is mandatory. Specifying “*” causes the producer to receive messages from all streams except internal ones (e.g. _GOLLUM_). By default this parameter is set to an empty list.
FallbackStream
Defines a stream to route messages to if delivery fails. The message is reset to its original state before being routed, i.e. all modifications done to the message after leaving the consumer are removed. Setting this parameter to “” will cause messages to be discarded when delivery fails.
ShutdownTimeoutMs (default: 1000, unit: ms)
Defines the maximum time in milliseconds a producer is allowed to take to shut down. After this timeout the producer is always considered to have shut down. Decreasing this value may lead to lost messages during shutdown. Raising it may increase shutdown time.
Modulators
Defines a list of modulators to be applied to a message when it arrives at this producer. If a modulator changes the stream of a message the message is NOT routed to this stream anymore. By default this parameter is set to an empty list.
Examples¶
This example starts a default Websocket producer on port 8080:
WebsocketOut:
Type: producer.Websocket
Address: ":8080"
Routers¶
Routers manage the transfer of messages between consumers and producers by streams. Routers can act as a kind of proxy that may filter and define the distribution algorithm of messages.
Streams are referred to by cleartext names. These names can be freely chosen, but several names are reserved for internal or special purposes:
_GOLLUM_: | is used for internal log messages |
---|---|
*: | is a placeholder for “all routers but the internal routers”. In some cases “*” means “all routers” without exceptions. This is denoted in the corresponding documentations whenever this is the case. |
Basic router setups:

List of available routers:
Broadcast¶
This router implements the default behavior of routing all messages to all producers registered to the configured stream.
Parameters (from core.SimpleRouter)¶
Stream
This value specifies the name of the stream this plugin is supposed to read messages from.
Filters
This value defines an optional list of Filter plugins to connect to this router.
TimeoutMs (default: 0, unit: ms)
This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples¶
rateLimiter:
Type: router.Broadcast
Stream: errorlogs
Filters:
- filter.Rate:
MessagesPerSec: 200
Distribute¶
The “Distribute” plugin provides 1:n stream remapping by duplicating messages.
During startup, it creates a set of streams with names listed in [TargetStreams]. During execution, it consumes messages from the stream [Stream] and enqueues copies of these messages onto each of the streams listed in [TargetStreams].
When routing to multiple routers, the incoming stream has to be listed explicitly to be used.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
TargetStreams
List of streams to route the incoming messages to.
Parameters (from core.SimpleRouter)¶
Stream
This value specifies the name of the stream this plugin is supposed to read messages from.
Filters
This value defines an optional list of Filter plugins to connect to this router.
TimeoutMs (default: 0, unit: ms)
This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples¶
This example routes incoming messages from streamA to streamB and streamC (duplication):
JunkRouterDist:
Type: router.Distribute
Stream: streamA
TargetStreams:
- streamB
- streamC
Metadata¶
This router routes the message to a stream given in a specified metadata field. If the field is not set, the message will be passed along.
Parameters¶
Enable (default: true)
Switches this plugin on or off.
Key (default: Stream)
The metadata field to read from. By default this parameter is set to “Stream”
Parameters (from core.SimpleRouter)¶
Stream
This value specifies the name of the stream this plugin is supposed to read messages from.
Filters
This value defines an optional list of Filter plugins to connect to this router.
TimeoutMs (default: 0, unit: ms)
This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples¶
switchRoute:
Type: router.Metadata
Stream: errorlogs
Key: key
Random¶
The “Random” router relays each message sent to the stream [Stream] to exactly one of the producers connected to [Stream]. The receiving producer is chosen randomly for each message.
Parameters (from core.SimpleRouter)¶
Stream
This value specifies the name of the stream this plugin is supposed to read messages from.
Filters
This value defines an optional list of Filter plugins to connect to this router.
TimeoutMs (default: 0, unit: ms)
This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples¶
This example will randomly send messages to one of the two console producers.
randomRouter:
Type: router.Random
Stream: randomStream
JunkPrinter00:
Type: producer.Console
Streams: randomStream
Modulators:
- format.Envelope:
Prefix: "[junk_00] "
JunkPrinter01:
Type: producer.Console
Streams: randomStream
Modulators:
- format.Envelope:
Prefix: "[junk_01] "
RoundRobin¶
This router implements round robin routing. Messages are routed to exactly one of the producers registered to the given stream. The producer is switched in a round robin fashion after each message. This router can be useful for load balancing, e.g. when the target service does not support sharding by itself.
Parameters (from core.SimpleRouter)¶
Stream
This value specifies the name of the stream this plugin is supposed to read messages from.
Filters
This value defines an optional list of Filter plugins to connect to this router.
TimeoutMs (default: 0, unit: ms)
This value sets a timeout in milliseconds until a message should be handled by the router. You can disable this behavior by setting it to “0”. By default this parameter is set to “0”.
Examples¶
This example will send messages to the two console producers in an alternating fashion.
loadBalancer:
Type: router.RoundRobin
Stream: logs
JunkPrinter00:
Type: producer.Console
Streams: randomStream
Modulators:
- format.Envelope:
Prefix: "[junk_00] "
JunkPrinter01:
Type: producer.Console
Streams: randomStream
Modulators:
- format.Envelope:
Prefix: "[junk_01] "
Filters¶
Filters are plugins that are embedded into router plugins. Filters can analyze messages and decide whether to let them pass to a producer or to block them.
Any¶
This plugin takes a list of filters and applies each of them to incoming messages until an accepting filter is found. If any of the listed filters accepts the message, it is passed through; otherwise the message is dropped.
Parameters¶
AnyFilters
Defines a list of filters that should be checked before filtering a message. Filters are checked in order, and if the message passes then no further filters are checked.
Parameters (from core.SimpleFilter)¶
FilteredStream
This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples¶
This example will accept valid json or messages from “exceptionStream”:
ExampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- filter.Any:
AnyFilters:
- filter.JSON
- filter.Stream:
Only: exceptionStream
None¶
This filter blocks all messages.
Parameters (from core.SimpleFilter)¶
FilteredStream
This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples¶
This example starts a Console consumer and blocks all incoming messages:
exampleConsumer:
Type: consumer.Console
Streams: console
Modulators:
- filter.None
Rate¶
This plugin blocks messages after a certain number of messages per second has been reached.
Parameters¶
MessagesPerSec (default: 100)
This value defines the maximum number of messages per second allowed to pass through this filter. By default this parameter is set to “100”.
Ignore
Defines a list of streams that should not be affected by rate limiting. This is useful for e.g. producers listening to “*”. By default this parameter is set to an empty list.
Parameters (from core.SimpleFilter)¶
FilteredStream
This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples¶
This example accepts ~10 messages per second, except for messages on the “noLimit” stream:
ExampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- filter.Rate:
MessagesPerSec: 10
Ignore:
- noLimit
RegExp¶
This filter rejects or accepts messages based on regular expressions.
Parameters¶
Expression
Messages matching this expression are passed on. This parameter is ignored when set to “”. Expression is checked after ExpressionNot. By default this parameter is set to “”.
ExpressionNot
Messages not matching this expression are passed on. This parameter is ignored when set to “”. ExpressionNot is checked before Expression. By default this parameter is set to “”.
ApplyTo
Defines which part of the message the filter is applied to. When set to “”, this filter is applied to the message’s payload. All other values denote a metadata key. By default this parameter is set to “”.
Parameters (from core.SimpleFilter)¶
FilteredStream
This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples¶
This example accepts only accesslog entries with a return status of 2xx or 3xx that do not originate from staging systems.
ExampleConsumer:
Type: consumer.Console
Streams: console
Modulators:
- filter.RegExp:
ExpressionNot: " stage\\."
Expression: "HTTP/1\\.1\\\" [23]\\d\\d"
Sample¶
This plugin can be used to pass only n out of m messages (downsampling). This allows you to reduce the number of messages; within each group, the plugin starts blocking after the sample rate has been reached.
Parameters¶
SampleRatePerGroup (default: 1)
This value defines how many messages are passed through the filter in each group. By default this parameter is set to “1”.
SampleGroupSize (default: 2)
This value defines how many messages make up a group. Messages over SampleRatePerGroup within a group are filtered. By default this parameter is set to “2”.
SampleRateIgnore
This value defines a list of streams that should not be affected by sampling. This is useful for e.g. producers listening to “*”. By default this parameter is set to an empty list.
Examples¶
This example will block 8 out of every 10 messages:
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- filter.Sample:
SampleRatePerGroup: 2
SampleGroupSize: 10
SampleRateIgnore:
- foo
- bar
Stream¶
The “Stream” filter filters messages by applying black and white lists to the messages’ stream names.
The blacklist is applied first; messages not rejected by the blacklist are checked against the whitelist. An empty white list matches all streams.
Parameters¶
Block
Defines a list of stream names that are blocked. If a message’s stream is not in that list, the “Only” list is tested. By default this parameter is empty.
Only
Defines a list of streams that may pass. Messages from streams that are not in this list are blocked unless the list is empty. By default this parameter is empty.
Parameters (from core.SimpleFilter)¶
FilteredStream
This value defines the stream filtered messages get sent to. You can disable this behavior by setting the value to “”. By default this parameter is set to “”.
Examples¶
This example accepts ALL messages except ones from stream “foo”:
ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Stream:
        Block:
          - foo
This example only accepts messages from stream “foo”:
ExampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Stream:
        Only:
          - foo
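Block, Only and the inherited FilteredStream parameter can also be combined. The following sketch (the stream names and the "rejected" stream are made up for illustration) drops messages from "debug", passes only "access" and "error", and routes everything filtered out to a separate stream:

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - filter.Stream:
        Block:
          - debug
        Only:
          - access
          - error
        FilteredStream: rejected
```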
Formatters¶
Formatters are plugins that are embedded into routers or producers. Formatters can convert messages into another format or append additional information.
Agent¶
This formatter parses a user agent string and outputs it as metadata fields to the set target.
Parameters¶
Fields
An array of the fields to extract from the user agent. Available fields are: “mozilla”, “platform”, “os”, “localization”, “engine”, “engine-version”, “browser”, “browser-version”, “bot”, “mobile”. By default this is set to [“platform”,”os”,”localization”,”browser”].
Prefix
Defines a prefix for each of the keys generated. By default this is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Agent:
        Source: user_agent
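The Fields and Prefix parameters can be combined. This sketch (field names taken from the list above) extracts only browser information and prefixes the generated metadata keys with "ua_":

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Agent:
        Source: user_agent
        Fields:
          - "browser"
          - "browser-version"
        Prefix: "ua_"
```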
Aggregate¶
Aggregate is a formatter that groups further formatters. The Source setting is passed on to all child formatters, overwriting any source value set there. This plugin can be useful to set up complex configurations with metadata handling in a more readable format.
Parameters¶
Source
This value chooses the part of the message that should be formatted. Use “” to use the message payload; other values specify the name of a metadata field to use. This value is forced upon all child modulators. By default this parameter is set to “”.
Modulators
Defines a list of child modulators to be applied to a message when it arrives at this formatter. Please note that everything is still one message. I.e. applying filters twice might not make sense.
Parameters (from core.SimpleFormatter)¶
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example shows a useful case for the format.Aggregate plugin:
exampleConsumerA:
  Type: consumer.Console
  Streams: "foo"
  Modulators:
    - format.Aggregate:
        Target: bar
        Modulators:
          - format.Copy
          - format.Envelope:
              Postfix: "\n"
    - format.Aggregate:
        Target: foo
        Modulators:
          - format.Copy
          - format.Base64Encode
          - format.Double
          - format.Envelope:
              Postfix: "\n"

# same config as:
exampleConsumerB:
  Type: consumer.Console
  Streams: "bar"
  Modulators:
    - format.Copy:
        Target: bar
    - format.Envelope:
        Target: bar
        Postfix: "\n"
    - format.Copy:
        Target: foo
    - format.Base64Encode:
        Target: foo
    - format.Double:
        Target: foo
    - format.Envelope:
        Postfix: "\n"
        Target: foo
Base64Decode¶
Base64Decode is a formatter that decodes base64 encoded messages. If a message is not or only partly base64 encoded, an error will be logged and the decoded part is returned. RFC 4648 encoding is expected.
Parameters¶
Base64Dictionary
This value defines the 64-character base64 lookup dictionary to use. When left empty, a dictionary as defined by RFC4648 is used. By default this parameter is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example expects base64 strings from the console and decodes them before transmitting the message payload.
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Base64Decode
Base64Encode¶
Base64Encode is a formatter that encodes messages as base64. Custom dictionaries are supported; by default the RFC 4648 standard encoding is used.
Parameters¶
Base64Dictionary
Defines the 64-character base64 lookup dictionary to use. When left empty a RFC 4648 standard encoding is used. By default this parameter is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example uses RFC 4648 URL encoding to format incoming data.
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Base64Encode:
        Base64Dictionary: "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"
Cast¶
This formatter casts a given metadata field into another type.
Parameters¶
AsType
The type to cast to. Can be either “string”, “bytes”, “float” or “int”. By default this parameter is set to “string”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example casts the metadata key “bar” to string.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Cast:
        ApplyTo: bar
        AsType: "string"
ConvertTime¶
This formatter converts one time format into another.
Parameters¶
FromFormat
When left empty, a unix timestamp is expected. Otherwise a Go compatible timestamp layout has to be given. See https://golang.org/pkg/time/#pkg-constants. By default this is set to “”.
ToFormat
When left empty, the output will be a unix timestamp. Otherwise a Go compatible timestamp layout has to be given. See https://golang.org/pkg/time/#pkg-constants. By default this is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example reads a unix timestamp from the console and passes it through unchanged, as both formats are left empty.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.ConvertTime:
        FromFormat: ""
        ToFormat: ""
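As a further sketch, a Go reference layout can be given for ToFormat to render a unix timestamp in human readable form (the layout string follows the Go time package conventions linked above):

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.ConvertTime:
        FromFormat: ""
        ToFormat: "2006-01-02 15:04:05 MST"
```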
Copy¶
This formatter sets metadata fields by copying data from the message’s payload or from other metadata fields.
Parameters¶
Source
Defines the key to copy, i.e. the “source” of a copy operation. Target will define the target of the copy, i.e. the “destination”. An empty string will use the message payload as source. By default this parameter is set to an empty string (i.e. payload).
Mode
Defines the copy mode to use. This can be one of “append”, “prepend” or “replace”. By default this parameter is set to “replace”.
Separator
When using mode prepend or append, defines the characters inserted between source and destination. By default this parameter is set to an empty string.
Parameters (from core.SimpleFormatter)¶
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example copies the payload to the metadata field “key” and then hashes that field, so that “key” contains a hash over the complete payload.
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Copy:
        Target: key
    - format.Identifier:
        Generator: hash
        Target: key
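The Mode and Separator parameters allow merging data instead of replacing it. This sketch (the metadata field "host" is assumed to have been set by an earlier modulator) appends that field to the payload, separated by a space:

```yaml
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Copy:
        Source: host
        Mode: append
        Separator: " "
```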
Delete¶
This formatter erases the message payload or deletes a metadata key.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example removes the “pipe” key from the metadata produced by consumer.Console.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Delete:
        Target: pipe
Double¶
Double is a formatter that duplicates a message and applies two different sets of formatters to both sides. After both messages have been processed, the value of the field defined as “source” by the double formatter will be copied from both copies and merged into the “target” field of the original message using a given separator.
Parameters¶
Separator (default: :)
This value sets the separator string placed between both parts. This parameter is set to “:” by default.
UseLeftStreamID (default: false)
When set to “true”, use the stream id of the left side (after formatting) as the streamID for the resulting message. This parameter is set to “false” by default.
Left
An optional list of formatters. The first copy of the message (left of the delimiter) is passed through these formatters. This parameter is set to an empty list by default.
Right
An optional list of formatters. The second copy of the message (right of the delimiter) is passed through these formatters. This parameter is set to an empty list by default.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example creates a message of the form “<orig>|<hash>”, where <orig> is the original console input and <hash> its hash.
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Double:
        Separator: "|"
        Right:
          - format.Identifier:
              Generator: hash
Envelope¶
This formatter adds content to the beginning and/or end of a message.
Parameters¶
Prefix
Defines a string that is added to the front of the message. Special characters like \n, \r or \t can be used without additional escaping. By default this parameter is set to “”.
Postfix (default: \n)
Defines a string that is added to the end of the message. Special characters like \n, \r or \t can be used without additional escaping. By default this parameter is set to “\n”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example adds a line number and a newline character to each message printed to the console.
exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Sequence
    - format.Envelope
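Prefix and Postfix can also be set explicitly. This sketch wraps each message in square brackets and appends a newline:

```yaml
exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Envelope:
        Prefix: "["
        Postfix: "]\n"
```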
Flatten¶
This formatter takes a metadata tree and moves all subkeys up to the same level as the root of the tree. Fields are named according to their hierarchy, joining all keys in the path with a given separator.
Parameters¶
Separator (default: .)
Defines the separator used when joining keys. By default this parameter is set to “.”
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This will flatten all elements below the key “tree” onto the root level. A key /tree/a/b will become /tree.a.b.
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Flatten:
        Source: tree
GeoIP¶
This formatter parses an IP address and outputs its geo information as metadata fields to the set target.
Parameters¶
GeoIPFile
Defines the GeoIP file to load. This setting is mandatory. Files can be found e.g. at http://dev.maxmind.com/geoip/geoip2/geolite2/. By default this parameter is set to “”.
Fields
An array of the fields to extract from the GeoIP. Available fields are: “city”, “country-code”, “country”, “continent-code”, “continent”, “timezone”, “proxy”, “satellite”, “location”, “location-hash” By default this is set to [“city”,”country”,”continent”,”location-hash”].
Prefix
Defines a prefix for each of the keys generated. By default this is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.GeoIP:
        Source: client-ip
Grok¶
Grok is a formatter that applies regex filters to messages and stores the result as metadata fields. If the target key does not exist it will be created. If the target key exists but is not a map, it will be replaced. It works by combining text patterns into something that matches your logs. See https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html#_grok_basics for more information about Grok.
Parameters¶
RemoveEmptyValues
When set to true, empty captures will not be returned. By default this parameter is set to “true”.
NamedCapturesOnly
When set to true, only named captures will be returned. By default this parameter is set to “true”.
SkipDefaultPatterns
When set to true, standard grok patterns will not be included in the list of patterns. By default this parameter is set to “true”.
Patterns
A list of grok patterns that will be applied to messages. The first matching pattern will be used to parse the message.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example transforms unstructured input into a structured json output. Input:
us-west.servicename.webserver0.this.is.the.measurement 12.0 1497003802
Output:
{
"datacenter": "us-west",
"service": "servicename",
"host": "webserver0",
"measurement": "this.is.the.measurement",
"value": "12.0",
"time": "1497003802"
}
Config:
exampleConsumer:
  Type: consumer.Console
  Streams: "*"
  Modulators:
    - format.Grok:
        Patterns:
          - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.gauge-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_gauge:float}\s*%{INT:time}
          - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.latency-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_latency:float}\s*%{INT:time}
          - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.statsd\.derive-(?P<application>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value_derive:float}\s*%{INT:time}
          - ^(?P<datacenter>[^\.]+?)\.(?P<service>[^\.]+?)\.(?P<host>[^\.]+?)\.(?P<measurement>[^\s]+?)\s%{NUMBER:value:float}\s*%{INT:time}
    - format.ToJSON: {}
Hostname¶
This formatter prefixes the message or metadata with the hostname of the machine gollum is running on.
Parameters¶
Separator (default: :)
Defines the separator string placed between hostname and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example inserts the hostname into an existing JSON payload.
exampleProducer:
  Type: producer.Console
  Streams: "*"
  Modulators:
    - format.Trim:
        LeftSeparator: "{"
        RightSeparator: "}"
    - format.Hostname:
        Separator: ","
    - format.Envelope:
        Prefix: "{\"host\":"
        Postfix: "}"
Identifier¶
This formatter generates a (mostly) unique 64 bit identifier number from the message payload, timestamp and/or sequence number. The number is converted to a human readable form.
Parameters¶
Generator
Defines which algorithm to use when generating the identifier. This may be one of the following values. By default this parameter is set to “time”.
hash
The message payload will be hashed using fnv1a and returned as hex.
time
The id will be formatted YYMMDDHHmmSSxxxxxxx where x denotes the current sequence number modulo 10000000, i.e. 10,000,000 messages per second are possible before a collision occurs.
seq
The sequence number will be used.
seqhex
The hex encoded sequence number will be used.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example will generate a payload checksum and store it to a metadata field called “checksum”.
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Identifier:
        Generator: hash
        Target: checksum
JSON¶
This formatter parses JSON data into metadata.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example parses the payload as JSON and stores it below the key “data”.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.JSON:
        Target: data
Move¶
This formatter moves data from one location to another. When targeting a metadata key, the target key will be created or overwritten. When the source is the payload, it will be cleared.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example moves the payload produced by consumer.Console to the metadata key data.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Move:
        Target: data
Override¶
This formatter sets a given value to a metadata field or payload.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example sets the value “foo” on the key “bar”.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.Override:
        Target: bar
        Value: "foo"
RegExp¶
This formatter parses a message using a regular expression, performs string (template) replacement and returns the result.
Parameters¶
Posix
Set to true to compile the regular expression using POSIX semantics. By default this parameter is set to true.
Expression
Defines the regular expression used for parsing. For details on the regexp syntax see https://golang.org/pkg/regexp/syntax. By default this parameter is set to “(.*)”
Template (default: ${1})
Defines the result string. Regexp matching groups can be referred to using “${n}”, with n being the group’s index. For other possible reference semantics, see https://golang.org/pkg/regexp/#Regexp.Expand. By default this parameter is set to “${1}”
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
This example extracts time and host from an imaginary log message format.
exampleConsumer:
  Type: consumer.Console
  Streams: stdin
  Modulators:
    - format.RegExp:
        Expression: "^(\\d+) (\\w+): "
        Template: "time: ${1}, host: ${2}"
Replace¶
This formatter replaces all occurrences of a given string with another.
Parameters¶
Search
Defines the string to search for. When left empty, the target will be completely replaced by ReplaceWith. By default this is set to “”.
ReplaceWith
Defines the string to replace all occurrences of “Search” with. By default this is set to “”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to something else but “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - not existing. By default this parameter is set to false
Examples¶
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Replace:
        Search: "foo"
        ReplaceWith: "bar"
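When Search is left empty the whole target is replaced by ReplaceWith. This sketch (the field name "level" is made up for illustration) uses that behavior to overwrite a metadata field with a fixed value:

```yaml
ExampleConsumer:
  Type: consumer.Console
  Streams: console
  Modulators:
    - format.Replace:
        ApplyTo: level
        ReplaceWith: "unknown"
```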
Runlength¶
Runlength is a formatter that prepends the length of the message, followed by a “:”. The actual message is formatted by a nested formatter.
Parameters¶
Separator (default: :)
This value is used as separator. By default this parameter is set to “:”.
StoreRunlengthOnly (default: false)
If this value is set to “true” only the runlength will be stored. This option is useful to e.g. create metadata fields that only contain the length of the payload. When set to “true” the Separator parameter will be ignored. By default this parameter is set to false.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example will store the length of the payload in a separate metadata field.
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- format.MetadataCopy:
CopyToKeys: ["length"]
- format.Runlength:
Target: length
StoreRunlengthOnly: true
Sequence¶
This formatter prefixes data with a sequence number managed by the formatter. All messages passing through an instance of the formatter will get a unique number. The number is not persisted, i.e. it restarts at 0 after each restart of gollum.
Parameters¶
Separator (default: :)
Defines the separator string placed between number and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example will insert the sequence number into an existing JSON payload.
exampleProducer:
Type: producer.Console
Streams: "*"
Modulators:
- format.Trim:
LeftSeparator: "{"
RightSeparator: "}"
- format.Sequence:
Separator: ","
- format.Envelope:
Prefix: "{\"seq\":"
Postfix: "}"
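The effect of the modulator chain above can be traced step by step; the Python sketch below mimics it for an example payload, where the sequence number 5 is an arbitrary stand-in for the value Gollum assigns internally:

```python
# Step-by-step effect of the Trim -> Sequence -> Envelope chain (illustration)
payload = '{"foo":1}'
trimmed = payload.strip("{}")            # format.Trim: drop the outer braces
sequenced = "5," + trimmed               # format.Sequence: prefix "<number><separator>"
enveloped = '{"seq":' + sequenced + "}"  # format.Envelope: re-wrap as JSON
print(enveloped)  # {"seq":5,"foo":1}
```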
Split¶
This formatter splits data into an array by using the given delimiter and stores it at the metadata key denoted by target. Targeting the payload (by not giving a target or by passing an empty string) will result in an error.
Parameters¶
Delimiter (default: ,)
Defines the delimiter to use when splitting the data. By default this parameter is set to “,”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
ExampleConsumer:
Type: consumer.Console
Streams: console
Modulators:
- format.Split:
Target: values
Delimiter: ":"
SplitPick¶
This formatter splits data into an array by using the given delimiter and extracts the given index from that array. The value of that index will be written back.
Parameters¶
Delimiter (default: ,)
Defines the delimiter to use when splitting the data. By default this parameter is set to “,”.
Index (default: 0)
Defines the index to pick. By default this parameter is set to 0.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
ExampleConsumer:
Type: consumer.Console
Streams: console
Modulators:
- format.SplitPick:
Index: 2
Delimiter: ","
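SplitPick behaves like a split followed by a zero-based index lookup; a one-line Python equivalent of the config above, with a made-up example payload:

```python
# Equivalent of format.SplitPick with Delimiter "," and Index 2 (zero-based)
payload = "a,b,c,d"
print(payload.split(",")[2])  # c
```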
SplitToFields¶
This formatter splits data into an array by using the given delimiter and stores the resulting values at the metadata keys defined by Fields.
Parameters¶
Delimiter (default: ,)
Defines the delimiter to use when splitting the data. By default this parameter is set to “,”.
Fields
Defines an index-to-key mapping used to store the resulting list in metadata. If there are fewer entries in the resulting array than fields, the remaining fields will not be set. If there are more entries, the additional indexes will not be handled. By default this parameter is set to an empty list.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example splits the payload by “:” and writes up to three elements as the keys “first”, “second” and “third” below the metadata field “values”.
ExampleProducer:
Type: producer.Console
Streams: console
Modulators:
- format.SplitToFields:
Target: values
Delimiter: ":"
Fields: [first,second,third]
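Assuming the config above and a payload like “alice:bob:carol” (hypothetical values), the resulting metadata would look roughly like this:

```yaml
# Resulting metadata for payload "alice:bob:carol" (illustration)
values:
  first: alice
  second: bob
  third: carol
```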
StreamName¶
This formatter prefixes data with the name of the current or previous stream.
Parameters¶
UsePrevious
Set to true to use the name of the previous stream. By default this parameter is set to false.
Separator (default: :)
Defines the separator string used between stream name and data. By default this parameter is set to “:”.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example prefixes the message with the most recent routing history.
exampleProducer:
Type: producer.Console
Streams: "*"
Modulators:
- format.StreamName:
Separator: ", "
UsePrevious: true
- format.StreamName:
Separator: ": "
StreamRevert¶
This formatter gets the previously used stream from a message and sets it as the new target stream.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
ExampleConsumer:
Type: consumer.Console
Streams: console
Modulators:
- format.StreamRevert
StreamRoute¶
StreamRoute is a formatter that modifies a message’s stream by reading a prefix from the message’s data (and discarding it). The prefix is defined as everything before a given delimiter in the message. If no delimiter is found or the prefix is empty the message stream is not changed.
Parameters¶
Delimiter (default: :)
This value defines the delimiter to search when extracting the stream name. By default this parameter is set to “:”.
StreamModulator
A list of further modulators to format and filter the extracted stream name. By default this parameter is set to an empty list.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example sets the stream of messages like “<error>:a message string” to “error”, leaving “a message string” as the payload:
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- format.StreamRoute:
Delimiter: ":"
StreamModulator:
- format.Trim:
LeftSeparator: <
RightSeparator: >
Template¶
This formatter allows applying Go templates to a message based on the currently set metadata. The template language is described in the Go documentation: https://golang.org/pkg/text/template/#hdr-Actions
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example writes the fields “Name” and “Surname” from metadata as the new payload.
exampleProducer:
Type: producer.Console
Streams: "*"
Modulators:
- format.Template:
Template: "{{.Name}} {{.Surname}}"
Timestamp¶
Timestamp is a formatter that allows prefixing messages with a timestamp (time of arrival at gollum). The timestamp format is freely configurable and can e.g. contain a delimiter sequence at the end.
Parameters¶
Timestamp (default: 2006-01-02 15:04:05 MST | )
This value defines a Go time format string that is used to format the timestamp. By default this parameter is set to “2006-01-02 15:04:05 MST | “.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example will write a time string to the metadata field time:
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- format.Timestamp:
Timestamp: "2006-01-02T15:04:05.000 MST"
Target: time
ToCSV¶
ToCSV converts a set of metadata keys to CSV and applies it to Target.
Parameters¶
Keys
List of strings specifying the keys to write as CSV. Note that these keys can be paths. By default this parameter is set to an empty list.
Separator (default: ,)
The delimiter string to insert between each value in the generated string. By default this parameter is set to “,”.
KeepLastSeparator
When set to true, the last separator will not be removed. By default this parameter is set to false.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example gets the foo and bar keys from the metadata of a message and sets the result as the new payload.
exampleProducer:
Type: producer.Console
Streams: "*"
Modulators:
- format.ToCSV:
Separator: ';'
Keys:
- 'foo'
- 'bar'
ToJSON¶
This formatter converts metadata to JSON and stores it where applied.
Parameters¶
Root
The metadata key to transform to JSON. When left empty, all metadata is used. By default this is set to “”.
Ignore
A list of keys or paths to exclude from marshalling. Please note that this is currently a quite expensive operation, as all metadata below Root is cloned during the process. By default this is set to an empty list.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example transforms all metadata below the “foo” key to JSON and stores the result as the new payload.
exampleProducer:
Type: producer.Console
Streams: stdin
Modulators:
- format.ToJSON:
Root: "foo"
Trim¶
Trim removes a set of characters from the beginning and end of a metadata value or the payload.
Parameters¶
Characters (default: " \t\r\n\v\f")
This value defines which characters should be removed from both ends of the data. The data to operate on is expected to be a string. By default this is set to “ \t\r\n\v\f”, i.e. space and the common whitespace characters.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example will trim spaces from the message payload:
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- format.Trim: {}
TrimToBounds¶
This formatter searches for boundary strings and removes all data to the left of the left boundary and to the right of the right boundary.
Parameters¶
LeftBounds
The string to search for. Searching starts from the left side of the data. If an empty string is given this parameter is ignored. By default this parameter is set to “”.
RightBounds
The string to search for. Searching starts from the right side of the data. If an empty string is given this parameter is ignored. By default this parameter is set to “”.
LeftOffset (default: 0)
Defines the search start index when using LeftBounds. By default this parameter is set to 0.
RightOffset (default: 0)
Defines the search start index when using RightBounds. Counting starts from the right side of the message. By default this parameter is set to 0.
Parameters (from core.SimpleFormatter)¶
Source
This value chooses the part of the message the data to be formatted should be read from. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
Target
This value chooses the part of the message the formatted data should be stored to. Use “” to target the message payload; other values specify the name of a metadata field to target. By default this parameter is set to “”.
ApplyTo
Use this to set Source and Target to the same value. This setting will be ignored if either Source or Target is set to anything other than “”. By default this parameter is set to “”.
SkipIfEmpty
When set to true, this formatter will not be applied to data that is empty or - in case of metadata - nonexistent. By default this parameter is set to false.
Examples¶
This example will reduce data like “foo[bar[foo]bar]foo” to “bar[foo]bar”.
exampleConsumer:
Type: consumer.Console
Streams: "*"
Modulators:
- format.TrimToBounds:
LeftBounds: "["
RightBounds: "]"
Aggregate plugins¶
To simplify complex pipeline configs you can aggregate plugin configurations. All settings defined in the aggregation scope will be injected into each defined “sub-plugin”.
To define an aggregation use the keyword Aggregate as plugin type.
Examples¶
In this example both consumers get the streams and modulator injected from the aggregation settings:
AggregatePipeline:
Type: Aggregate
Streams: console
Modulators:
- format.Envelope:
Postfix: "\n"
Plugins:
consumerFoo:
Type: consumer.File
File: /tmp/foo.log
consumerBar:
Type: consumer.File
File: /tmp/bar.log
This example shows a second use case: reusing server settings more easily:
consumerConsole:
Type: consumer.Console
Streams: write
kafka:
Type: Aggregate
Servers:
- kafka0:9092
- kafka1:9093
- kafka2:9094
Plugins:
producer:
Type: producer.Kafka
Streams: write
Compression: zip
Topics:
write: test
consumer:
Type: consumer.Kafka
Streams: read
Topic: test
DefaultOffset: Oldest
producerConsole:
Type: producer.Console
Streams: read
Examples and Cookbooks¶
Here you can find some examples and cookbooks showing how to run Gollum.
Examples¶
Hello World Examples¶
Hello World¶
This example sets up a simple console consumer and producer that will simply echo everything you type back to the console. As messages have no new line appended by default an envelope formatter is used to add one before writing to console. Make sure to start Gollum with gollum -ll 3 to see all log messages.
'StdIn':
Type: 'consumer.Console'
Streams: 'console'
'StdOut':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope': {}
Loadbalancer¶
This example extends the Hello World example by introducing a route configuration. All messages from the console consumer will be sent to a round robin loadbalancer that will forward messages to one of the two attached producers. Make sure to start Gollum with gollum -ll 3 to see all log messages.
'StdIn':
Type: 'consumer.Console'
Streams: 'console'
'loadbalancer':
Type: 'router.RoundRobin'
Stream: 'console'
'StdOut1':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '1: '
'StdOut2':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '2: '
When you remove the router from the config you will see each message reach both producers.
Hello World filtered¶
This example extends the previous example by setting up a filter to only echo sentences that end with the word “gollum”. A regular expression filter is used to achieve this. Note that this filter does not apply to standard log messages. Make sure to start Gollum with gollum -ll 3 to see all log messages.
'StdIn':
Type: 'consumer.Console'
Streams: 'console'
'loadbalancer':
Type: 'router.RoundRobin'
Stream: 'console'
Filters:
- 'filter.RegExp':
FilterExpression: ".*gollum$"
'StdOut1':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '1: '
'StdOut2':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '2: '
You can also attach filters to the modulators section of a consumer or a producer. Please note that routers can filter but not modify messages.
Hello World splitter¶
This example extends the first example by introducing a stream split. This time we will print the console output twice, encoded as XML and as JSON. Make sure to start Gollum with gollum -ll 3 to see all log messages.
'StdIn':
Type: 'consumer.Console'
Streams: 'console'
'StdOutXML':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '<msg>'
Postfix: '</msg>\n'
'StdOutJSON':
Type: 'producer.Console'
Streams: 'console'
Modulators:
- 'format.Envelope':
Prefix: '{"msg":"'
Postfix: '"}\n'
You can also do this in a slightly different way by utilizing two streams. When doing this you can filter or route each stream differently. In this extended example, every second message will be output only as JSON.
'StdIn':
Type: 'consumer.Console'
Streams:
- 'consoleJSON'
- 'consoleXML'
'xmlFilter':
Type: 'router.Broadcast'
Stream: 'consoleXML'
Filters:
- 'filter.Sample': {}
'StdOutXML':
Type: 'producer.Console'
Streams: 'consoleXML'
Modulators:
- 'format.Envelope':
Prefix: '<msg>'
Postfix: '</msg>\n'
'StdOutJSON':
Type: 'producer.Console'
Streams: 'consoleJSON'
Modulators:
- 'format.Envelope':
Prefix: '{"msg":"'
Postfix: '"}\n'
Chat server¶
This example requires two Gollum instances to run. The first one acts as the “chat client” while the second one acts as the “chat server”. Messages entered on the client will be sent to the server using runlength encoding. When the message reaches the server, it will be decoded and written to the console. If the server does not respond, the message will be sent to the fallback and displayed as an error. Make sure to start Gollum with gollum -ll 3 to see all log messages.
Client
'StdIn':
Type: 'consumer.Console'
Streams: 'console'
'SocketOut':
Type: 'producer.Socket'
Streams: 'console'
Address: ':5880'
Acknowledge: 'OK'
FallbackStream: 'failed'
Modulators:
- 'format.Runlength': {}
'Failed':
Type: 'producer.Console'
Streams: 'failed'
Modulators:
- 'format.Envelope':
Prefix: 'Failed to send: '
Server
'SocketIn':
Type: 'consumer.Socket'
Streams: 'socket'
Address: ":5880"
Acknowledge: 'OK'
Partitioner: 'ascii'
Delimiter: ':'
'StdOut':
Type: 'producer.Console'
Streams: 'socket'
Modulators:
- 'format.Envelope': {}
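On the wire, the client's format.Runlength and the server's ascii partitioner agree on a simple frame: the payload length, the “:” delimiter, then the payload. A small Python sketch of that framing (illustration with a made-up message):

```python
# Run-length framing used between client and server: "<length>:<payload>".
# The client's format.Runlength prepends "<length>:", which the server's
# ascii partitioner with Delimiter ":" uses to split incoming frames.
msg = "hello gollum"
frame = f"{len(msg)}:{msg}"
print(frame)  # 12:hello gollum
```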
Profiling¶
This configuration will test Gollum for its theoretic maximum message throughput. You can of course modify this example to test e.g. file producer performance. Make sure to start Gollum with gollum -ll 3 -ps to see all log messages as well as intermediate profiling results.
'Profiler':
    Type: 'consumer.Profiler'
    Streams: 'profile'
    Runs: 100000
    Batches: 100
    Characters: 'abcdefghijklmnopqrstuvwxyz .,!;:-_'
    Message: '%256s'
    KeepRunning: false
    ModulatorRoutines: 0
'Benchmark':
    Type: 'producer.Benchmark'
    Streams: 'profile'
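The profiler generates Runs × Batches messages in total, so the msg/sec figures follow directly from the measured wall time. A quick sketch of that arithmetic, using the 0.5.0 raw-pipeline total time listed in the release notes:

```python
# Total messages produced by consumer.Profiler: Runs * Batches
runs, batches = 100000, 100
total_msgs = runs * batches          # 10,000,000 messages
wall_seconds = 8.960                 # total time of the 0.5.0 raw-pipeline run
print(total_msgs, round(total_msgs / wall_seconds))  # 10000000 1116071
```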
Cookbooks¶
Socket to Kafka¶
This example creates a unix domain socket /tmp/kafka.socket that accepts the following protocol:
<topic>:<message_base64>\n
The message will be base64 decoded and written to the topic named at the start of the frame. This example also shows how to apply rate limiting per topic.
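For instance, a client could build such a frame like this (the topic name “test” is an arbitrary example; the commented-out nc invocation assumes a netcat build with unix-socket support):

```shell
# Build one protocol frame: <topic>:<message_base64>
topic="test"                          # example topic name
payload=$(printf 'hello' | base64)    # base64-encode the message body
printf '%s:%s\n' "$topic" "$payload"  # prints: test:aGVsbG8=
# To actually publish, pipe the frame to the socket, e.g.:
#   printf '%s:%s\n' "$topic" "$payload" | nc -U /tmp/kafka.socket
```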
Configuration (v0.4.x):
# Socket accepts <topic>:<message_base64>
- "consumer.Socket":
Stream: "raw"
Address: "unix:///tmp/kafka.socket"
Permissions: "0777"
# Stream "raw" to stream "<topic>" conversion
# Decoding of <message_base64> to <message>
- "stream.Broadcast":
Stream: "raw"
Formatter: "format.StreamRoute"
StreamRouteDelimiter: ":"
StreamRouteFormatter: "format.Base64Decode"
# Listening to all streams as streams are generated at runtime
# Use ChannelTimeoutMs to be non-blocking
- "producer.Kafka":
Stream: "*"
Filter: "filter.Rate"
RateLimitPerSec: 100
ChannelTimeoutMs: 10
Servers:
- "kafka1:9092"
- "kafka2:9092"
- "kafka3:9092"
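The configuration above uses the v0.4.x syntax. A rough v0.5-style equivalent, based on the format.StreamRoute parameters documented earlier, might look like the sketch below; the plugin and parameter names here are untested assumptions and may differ in your Gollum version, and the per-topic rate limiting is omitted:

```yaml
# Untested v0.5-style sketch; verify plugin/parameter names for your version
socketIn:
  Type: consumer.Socket
  Streams: raw
  Address: unix:///tmp/kafka.socket
  Permissions: "0777"
  Modulators:
    # Route to the stream named before ":" and base64-decode the rest
    - format.StreamRoute:
        Delimiter: ":"
    - format.Base64Decode: {}
kafkaOut:
  Type: producer.Kafka
  Streams: "*"          # streams are generated at runtime from topic names
  ChannelTimeoutMs: 10  # non-blocking writes
  Servers:
    - kafka1:9092
    - kafka2:9092
    - kafka3:9092
```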
Kafka roundtrip¶
This example can be used for developing or testing kafka consumers and producers.
gollum config¶
With the following config Gollum will create a consumer.Console paired with a producer.Kafka, and a consumer.Kafka paired with a producer.Console. All data written to the console will be sent to Kafka. The consumer.Kafka will read all data back from Kafka and send it to your console via the producer.Console:
consumerConsole:
type: consumer.Console
Streams: "write"
producerKafka:
type: producer.Kafka
Streams: "write"
Compression: "zip"
Topics:
"write" : "test"
Servers:
- kafka0:9092
- kafka1:9093
- kafka2:9094
consumerKafka:
type: consumer.Kafka
Streams: "read"
Topic: "test"
DefaultOffset: "Oldest"
MaxFetchSizeByte: 100
Servers:
- kafka0:9092
- kafka1:9093
- kafka2:9094
producerConsole:
type: producer.Console
Streams: "read"
Modulators:
- format.Envelope:
Postfix: "\n"
This config example can also be found in the Gollum repository.
kafka setup for docker¶
Here you find a docker-compose setup which works with the Gollum config example. You need a valid /etc/hosts entry to be able to use the configured hostnames:
# you can not use 127.0.0.1 or localhost here
<YOUR PUBLIC IP> kafka0 kafka1 kafka2
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
- "2888:2888"
- "3888:3888"
kafkaone:
image: wurstmeister/kafka:0.10.0.0
ports:
- "9092:9092"
links:
- zookeeper:zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka0
KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
KAFKA_BROKER_ID: "21"
KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"
kafkatwo:
image: wurstmeister/kafka:0.10.0.0
ports:
- "9093:9092"
links:
- zookeeper:zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka1
KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
KAFKA_BROKER_ID: "22"
KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"
kafkathree:
image: wurstmeister/kafka:0.10.0.0
ports:
- "9094:9092"
links:
- zookeeper:zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka2
KAFKA_ZOOKEEPER_CONNECT: "zookeeper"
KAFKA_BROKER_ID: "23"
KAFKA_CREATE_TOPICS: "test:1:3,Topic2:1:1:compact"
This docker-compose file can be run by:
docker-compose -f docker-compose-kafka.yml -p kafka010 up
Write to Elasticsearch (ElasticSearch producer)¶
Description¶
This example can be used for developing or testing the ElasticSearch producer.
gollum config¶
With the following config Gollum will create a consumer.Console paired with a producer.ElasticSearch. All data written to the console will be sent to ElasticSearch.
This payload can be used for the configured setup:
{"user" : "olivere", "message" : "It's a Raggy Waltz"}
consumerConsole:
type: consumer.Console
Streams: "write"
producerElastic:
Type: producer.ElasticSearch
Streams: write
User: elastic
Password: changeme
Servers:
- http://127.0.0.1:9200
Retry:
Count: 3
TimeToWaitSec: 5
SetGzip: true
StreamProperties:
write:
Index: twitter
DayBasedIndex: true
Type: tweet
Mapping:
user: keyword
message: text
Settings:
number_of_shards: 1
number_of_replicas: 1
This config example can also be found in the Gollum repository.
ElasticSearch setup for docker¶
Here you find a docker-compose setup which works for the config example:
version: '2'
services:
elasticsearch1:
image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
container_name: elasticsearch1
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
mem_limit: 1g
volumes:
- esdata1:/usr/share/elasticsearch/data
ports:
- 9200:9200
networks:
- esnet
elasticsearch2:
image: docker.elastic.co/elasticsearch/elasticsearch:5.4.1
environment:
- cluster.name=docker-cluster
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
- "discovery.zen.ping.unicast.hosts=elasticsearch1"
ulimits:
memlock:
soft: -1
hard: -1
mem_limit: 1g
volumes:
- esdata2:/usr/share/elasticsearch/data
networks:
- esnet
volumes:
esdata1:
driver: local
esdata2:
driver: local
networks:
esnet:
This docker-compose file can be run by:
docker-compose -f docker-compose-elastic.yml up
Release Notes¶
Performance tests¶
History¶
All tests were executed by calling time gollum -c profile.conf -ll 1.
test             | ver       | user    | sys    | cpu  | msg/sec
-----------------|-----------|---------|--------|------|----------
Raw pipeline     | 0.6.0     | 11,45s  | 3,13s  | 200% | 1.316.153
                 | 0.5.0     | 11,85s  | 3,69s  | 173% | 1.116.071
                 | 0.4.6     | 10,48s  | 3,01s  | 178% | 1.320.132
Basic formatting | 0.6.0     | 37,63s  | 4,01s  | 520% | 1.173.945
                 | 0.5.0     | 39,70s  | 6,09s  | 532% | 1.163.602
                 | 0.4.6 [1] | 21,84s  | 5,78s  | 206% | 746.881
8 consumers      | 0.6.0     | 325,18s | 18,32s | 511% | 1.137.784
                 | 0.5.0     | 344,33s | 28,24s | 673% | 1.446.157
                 | 0.4.6     | 319,44s | 72,22s | 574% | 1.173.536
JSON pipeline    | 0.6.0     | 14,98s  | 4,77s  | 173% | 78.377
                 | 0.5.0     | 28,23s  | 6,33s  | 138% | 40.033
                 | 0.4.6     | 28,30s  | 6,30s  | 150% | 43.400
[1] | this version does not use paralell formatting |
v0.6.0¶
JSON pipeline¶
- 14,98s user
- 4,77s system
- 173% cpu
- 11,388s total
- 73.846 msg/sec
"Profiler":
  Type: consumer.Profiler
  Runs: 10000
  Batches: 100
  Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
  Message: "{\"test\":\"%64s\",\"foo\":\"%32s|%32s\",\"bar\":\"%64s\",\"thisisquitealongstring\":\"%64s\"}"
  Streams: "profile"
  KeepRunning: false
  ModulatorRoutines: 0
  Modulators:
    - format.JSON: {}
    - format.Move:
        Source: "test"
        Target: "foobar"
    - format.Delete:
        Target: "bar"
    - format.SplitToFields:
        Source: "foo"
        Delimiter: "|"
        Fields: ["foo1","foo2"]
    - format.Copy:
        Source: "thisisquitealongstring"

"Benchmark":
  Type: "producer.Benchmark"
  Streams: "profile"
v0.5.0¶
Raw pipeline¶
- 11,85s user
- 3,69s system
- 173% cpu
- 8,960s total
- 1.116.071 msg/sec
"Profiler":
  Type: "consumer.Profiler"
  Runs: 100000
  Batches: 100
  Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
  Message: "%256s"
  Streams: "profile"
  KeepRunning: false
  ModulatorRoutines: 0

"Benchmark":
  Type: "producer.Benchmark"
  Streams: "profile"
Basic formatting¶
- 39,70s user
- 6,09s system
- 532% cpu
- 8,594s total
- 1.163.602 msg/sec
"Profiler":
  Type: "consumer.Profiler"
  Runs: 100000
  Batches: 100
  Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
  Message: "%256s"
  Streams: "profile"
  KeepRunning: false
  ModulatorRoutines: 4
  Modulators:
    - format.Envelope
    - format.Timestamp

"Benchmark":
  Type: "producer.Benchmark"
  Streams: "profile"
8 consumers with formatting¶
- 344,33s user
- 28,24s system
- 673% cpu
- 55,319s total
- 1.446.157 msg/sec
"Profiler":
  Type: Aggregate
  Runs: 100000
  Batches: 100
  Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
  Message: "%256s"
  Streams: "profile"
  KeepRunning: false
  ModulatorRoutines: 0
  Modulators:
    - format.Envelope
    - format.Timestamp
  Plugins:
    P01:
      Type: "consumer.Profiler"
    P02:
      Type: "consumer.Profiler"
    P03:
      Type: "consumer.Profiler"
    P04:
      Type: "consumer.Profiler"
    P05:
      Type: "consumer.Profiler"
    P06:
      Type: "consumer.Profiler"
    P07:
      Type: "consumer.Profiler"
    P08:
      Type: "consumer.Profiler"

"Benchmark":
  Type: "producer.Benchmark"
  Streams: "profile"
JSON pipeline¶
- 28,23s user
- 6,33s system
- 138% cpu
- 24,979s total
- 40.033 msg/sec
"Profiler":
  Type: consumer.Profiler
  Runs: 10000
  Batches: 100
  Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
  Message: "{\"test\":\"%64s\",\"foo\":\"%32s|%32s\",\"bar\":\"%64s\",\"thisisquitealongstring\":\"%64s\"}"
  Streams: "profile"
  KeepRunning: false
  ModulatorRoutines: 0
  Modulators:
    - format.ProcessJSON:
        Directives:
          - "test:rename:foobar"
          - "bar:remove"
          - "foo:split:|:foo1:foo2"
    - format.ExtractJSON:
        Field: thisisquitealongstring

"Benchmark":
  Type: "producer.Benchmark"
  Streams: "profile"
v0.4.6¶
Raw pipeline¶
- 10,48s user
- 3,01s system
- 178% cpu
- 7,575s total
- 1.320.132 msg/sec
- "consumer.Profiler":
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "{\"test\":\"%64s\",\"foo\":\"%32s|%32s\",\"bar\":\"%64s\",\"thisisquitealongstring\":\"%64s\"}"
    Stream: "profile"
    KeepRunning: false

- "producer.Benchmark":
    Stream: "profile"
Basic formatting¶
- 21,84s user
- 5,78s system
- 206% cpu
- 13,389s total
- 746.881 msg/sec
- "consumer.Profiler":
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.Timestamp
    TimestampFormatter: format.Envelope

- "producer.Benchmark":
    Stream: "profile"
8 consumers with formatting¶
- 319,44s user
- 72,22s system
- 574% cpu
- 68,17s total
- 1.173.536 msg/sec
- "consumer.Profiler":
    Instances: 8
    Runs: 100000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.Timestamp
    TimestampFormatter: format.Envelope

- "producer.Benchmark":
    Stream: "profile"
JSON pipeline¶
- 28,30s user
- 6,30s system
- 150% cpu
- 23,041s total
- 43.400 msg/sec
- "consumer.Profiler":
    Runs: 10000
    Batches: 100
    Characters: "abcdefghijklmnopqrstuvwxyz .,!;:-_"
    Message: "%256s"
    Stream: "profile"
    KeepRunning: false

- "stream.Broadcast":
    Stream: "profile"
    Formatter: format.ExtractJSON
    ExtractJSONdataFormatter: format.ProcessJSON
    ProcessJSONDirectives:
      - "test:rename:foobar"
      - "bar:remove"
      - "foo:split:|:foo1:foo2"
    ExtractJSONField: thisisquitealongstring

- "producer.Benchmark":
    Stream: "profile"
v0.5.0¶
Breaking changes 0.4.x to 0.5.0¶
Configuration¶
The goal of this breaking change was to make Gollum configuration files easier to maintain and easier to merge. In addition to that several quirks and inconsistencies have been resolved.
From
- "plugin.Type":
    ID: "pluginId"
To
"pluginId":
  Type: "plugin.Type"
In previous versions fields did not follow a rule when to use plural or singular. In 0.5.0 plural means “one or more values” while singular means “only one value”.
From
- "plugin.Type":
    ID: "pluginId"
    Category:
      - "Foo"
      - "Bar"
    Streams:
      - "foo"
      - "bar"
To
"pluginId":
  type: "plugin.Type"
  categories:
    - "Foo"
    - "Bar"
  streams:
    - "foo"
    - "bar"
In earlier versions chaining formatters was done by nesting them via options. This was confusing, as the order was "upside down". In addition, each formatter could only be used once. The new modulator concept introduces a more natural order and allows formatters to be reused as often as necessary. Furthermore, filters and formatters have been merged into the same list. This solves the problem of applying filters before or after formatters, which previously required extra fields such as "FilterAfterFormat".
From
- "plugin.Type":
    ID: "pluginId"
    Filter: "filter.Before"
    FilterAfterFormat: "filter.After"
    Formatter: "format.SECOND"
    SECONDOption: "foobar"
    SECONDFormatter: "format.FIRST"
To
"pluginId":
  Type: "plugin.Type"
  Modulators:
    - "filter.Before"
    - "format.FIRST"
    - "format.SECOND":
        Option: "foobar"
    - "filter.After"
Some plugins had a set of options starting with the same prefix (e.g. the Rotate options of the file producer). These options have now been grouped.
From
- "plugin.Type":
    ID: "pluginId"
    RotateAfterHours: 10
    RotateSizeMB: 1024
    RotateAt: "00:00"
To
"pluginId":
  Type: "plugin.Type"
  Rotate:
    AfterHours: 10
    SizeMB: 1024
    At: "00:00"
Plugins¶
Renaming of streams to routers¶
From
- "stream.Broadcast":
    ID: "Splitter"
    Stream: "foo"
To
"Splitter":
  Type: "router.Broadcast"
  Stream: "foo"
Base classes¶
In version 0.4.x and earlier not all plugins had a base class. In 0.5.0 all plugins have base classes and existing base classes have been renamed.
Renamed:
- core.ConsumerBase -> core.SimpleConsumer
- core.ProducerBase -> core.BufferedProducer
- core.StreamBase -> core.SimpleRouter
New:
- core.SimpleConsumer - Consumer base class
- core.SimpleFilter - Filter base class
- core.SimpleFormatter - Formatter base class
- core.SimpleProducer - Producer base class
- core.SimpleRouter - Router base class
- core.DirectProducer - A producer that directly accepts messages without buffering
- core.BufferedProducer - A producer that reads messages from a channel
- core.BatchedProducer - A producer that collects messages and processes them in a batch
Metrics¶
shared.Metric.* has to be replaced by tgo.Metric.*, and the package "github.com/trivago/tgo" has to be imported instead of "github.com/trivago/gollum/shared".
A new function to create rate metrics has been added:
tgo.Metric.NewRate(metricName, rateMetricName, time.Second, 10, 3, true)
All custom "per second" metrics should be replaced with this function.
Logging¶
Version 0.5.0 introduces logrus-based scoped logging to give error messages a clearer context. Because of this, every plugin now has a "Logger" member in its base class.
From
Log.Error.Print("MyPlugin: Something's wrong", err)
To
plugin.Logger.WithError(err).Error("Something's wrong")
Configure¶
Error handling has been improved so that a plugin automatically reacts to missing or invalid values. Errors are now collected in a stack attached to the config reader and processed as a batch after configure returns. In addition, simple types can now be configured using struct tags.
From
type Console struct {
	core.ConsumerBase
	autoExit bool
	pipeName string
	pipePerm uint32
	pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfig) error {
	cons.autoExit = conf.GetBool("ExitOnEOF", true)
	inputConsole := conf.GetString("Console", "stdin")

	switch strings.ToLower(inputConsole) {
	case "stdin":
		cons.pipe = os.Stdin
		cons.pipeName = "stdin"

	case "stderr":
		return fmt.Errorf("Cannot read from stderr")

	default:
		cons.pipe = nil
		cons.pipeName = inputConsole
		if perm, err := strconv.ParseInt(conf.GetString("Permissions", "0664"), 8, 32); err != nil {
			Log.Error.Printf("Error parsing named pipe permissions: %s", err)
		} else {
			cons.pipePerm = uint32(perm)
		}
	}

	return cons.ConsumerBase.Configure(conf)
}
To
type Console struct {
	core.SimpleConsumer
	autoExit bool   `config:"ExitOnEOF" default:"true"`
	pipeName string `config:"Pipe" default:"stdin"`
	pipePerm uint32 `config:"Permissions" default:"0644"`
	pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfigReader) {
	switch strings.ToLower(cons.pipeName) {
	case "stdin":
		cons.pipe = os.Stdin
		cons.pipeName = "stdin"

	case "stderr":
		conf.Errors.Pushf("Cannot read from stderr")

	default:
		cons.pipe = nil
	}
}
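With the struct tags above, the corresponding configuration keys are read automatically. A matching config fragment could look like this (the plugin id "myConsole" is just an example name; the values shown are the tag defaults):

```yaml
"myConsole":
  Type: "consumer.Console"
  ExitOnEOF: true      # bound to autoExit via its config struct tag
  Pipe: "stdin"        # bound to pipeName
  Permissions: "0644"  # bound to pipePerm
```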
Message handling¶
Message handling has changed from the way 0.4.x does it. Messages now support metadata and contain a copy of the "original" data next to the actual payload. In addition, messages are now backed by a memory pool and are passed around using pointers. All of this is reflected in new function signatures and new message member functions.
From
func (format *Sequence) Format(msg core.Message) ([]byte, core.MessageStreamID) {
	basePayload, stream := format.base.Format(msg)
	baseLength := len(basePayload)
	sequenceStr := strconv.FormatUint(msg.Sequence, 10) + format.separator

	payload := make([]byte, len(sequenceStr)+baseLength)
	len := copy(payload, []byte(sequenceStr))
	copy(payload[len:], basePayload)

	return payload, stream
}
To
func (format *Sequence) ApplyFormatter(msg *core.Message) error {
	seq := atomic.AddInt64(format.seq, 1)
	sequenceStr := strconv.FormatInt(seq, 10)
	content := format.GetAppliedContent(msg)

	dataSize := len(sequenceStr) + len(format.separator) + len(content)
	payload := core.MessageDataPool.Get(dataSize)

	offset := copy(payload, []byte(sequenceStr))
	offset += copy(payload[offset:], format.separator)
	copy(payload[offset:], content)

	format.SetAppliedContent(msg, payload)
	return nil
}
This example shows most of the changes related to the new message structure.
- As the sequence number has been removed from the message struct, plugins relying on it need to implement it themselves.
- As messages now support metadata, you need to specify whether you want to affect metadata or the payload. In formatter plugins this is reflected by the GetAppliedContent method, which is backed by the “ApplyTo” config parameter.
- If you require a new payload buffer you should now utilize core.MessageDataPool.
Things that you don’t see in this example are the following:
- Buffers returned by core.MessageDataPool tend to be overallocated, i.e. they can be resized without reallocation in most cases. Because of this, methods to resize the payload have been added.
- If you need to create a copy of the complete message, use the Clone() method.
Formatting pipeline¶
In version 0.4.x you had to take care of message changes yourself on many different occasions. With 0.5.0 the message flow has been moved completely to the core framework. Because of this you don't need to worry about routing or resetting data to its original state. The framework will do this for you.
From
func (prod *Redis) getValueAndKey(msg core.Message) (v []byte, k string) {
	value, _ := prod.Format(msg) // Creates a copy and we must not forget this step

	if prod.keyFormat == nil {
		return value, prod.key
	}

	if prod.keyFromParsed { // Ordering is crucial here
		keyMsg := msg
		keyMsg.Data = value
		key, _ := prod.keyFormat.Format(keyMsg)
		return value, string(key)
	}

	key, _ := prod.keyFormat.Format(msg)
	return value, string(key)
}

func (prod *Redis) storeString(msg core.Message) {
	value, key := prod.getValueAndKey(msg)
	result := prod.client.Set(key, string(value), 0)
	if result.Err() != nil {
		Log.Error.Print("Redis: ", result.Err())
		prod.Drop(msg) // Good thing we stored a copy of the message ...
	}
}
To
func (prod *Redis) getValueFieldAndKey(msg *core.Message) (v, f, k []byte) {
	meta := msg.GetMetadata()
	key := meta.GetValue(prod.key)     // Due to metadata fields...
	field := meta.GetValue(prod.field) // ... this is now a lot easier
	return msg.GetPayload(), field, key
}

func (prod *Redis) storeString(msg *core.Message) {
	// The message arrives here after formatting
	value, _, key := prod.getValueFieldAndKey(msg)
	result := prod.client.Set(string(key), string(value), time.Duration(0))
	if result.Err() != nil {
		prod.Logger.WithError(result.Err()).Error("Failed to set value")
		prod.TryFallback(msg) // Will send the original (unformatted) message. Always.
	}
}
New features¶
- Filters and Formatters have been merged into one list
- You can now use a filter or formatter more than once in the same plugin
- Consumers can now do filtering and formatting, too
- Messages can now store metadata. Formatters can affect the payload or a metadata field
- All plugins now have an automatic log scope
- Message payloads are now backed by a memory pool
- Messages now store the original message, i.e. a backup of the payload state after consumer processing
- Gollum now provides per-stream metrics
- Plugins are now able to implement health checks that can be queried via http
- There is a new pseudo plugin type “Aggregate” that can be used to share configuration between multiple plugins
- New base types for producers: Direct, Buffered, Batched
- Plugin configurations now support nested structures
- The configuration process has been simplified a lot by adding automatic error handling and struct tags
- Added a new formatter format.GrokToJSON
- Added a new formatter format.JSONToInflux10
- Added a new formatter format.Double
- Added a new formatter format.MetadataCopy
- Added a new formatter format.Trim
- Consumer.File now supports filesystem events
- Consumers can now define the number of goroutines used for formatting/filtering
- All AWS plugins now support role switching
- All AWS plugins are now based on the same credentials code
Bugfixes¶
- The plugin lifecycle has been reimplemented to avoid gollum being stuck waiting for plugins to change state
- Any errors during the configuration phase will cause gollum to exit
- Integration test suite added
- Producer.HTTPRequest port handling fixed
- The test-config command will now produce more meaningful results
- Duplicating messages now properly duplicates the whole message and not just the struct
- Several race conditions have been fixed
- Producer.ElasticSearch is now based on a more up-to-date library
- Producer.AwsS3 is now behaving more like producer.File
- Gollum metrics can now bind to a specific address instead of just a port
Breaking changes¶
- The config format has changed to improve automatic processing
- A lot of plugins have been renamed to avoid confusion and to better reflect their behavior
- A lot of plugin parameters have been renamed
- The instances plugin parameter has been removed
- Most of gollum’s metrics have been renamed
- Plugin base types have been renamed
- All message handling function signatures have changed to use pointers
- Formatters no longer daisy-chain, as they can now be listed in proper order
- Stream plugins have been renamed to Router plugins
- Routers are not allowed to modify message content anymore
- filter.All and format.Forward have been removed as they are not required anymore
- Producer formatter lists dedicated to formatting a key or similar constructs have been removed
- Logging framework switched to logrus
- The package gollum.shared has been removed in favor of trivago.tgo
- Fuses have been removed from all plugins
- The general message sequence number has been removed
- The term "drop" has been replaced by the term "fallback" to emphasize its use
- The _DROPPED_ stream has been removed. Messages are discarded if no fallback is set
- Formatters can still change the stream of a message but cannot trigger routing by themselves
- Compiling contrib plugins now requires a specific loader.go to be added
- The Docker image on Docker Hub is now a lot smaller and only contains the gollum binary
License¶
This project is released under the terms of the Apache 2.0 license.
Copyright 2015-2018 trivago N.V.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.