v0.5.0

Breaking changes 0.4.x to 0.5.0

Configuration

The goal of this breaking change was to make Gollum configuration files easier to maintain and easier to merge. In addition, several quirks and inconsistencies have been resolved.

Plugin header

This change makes configs easier to merge, which is a requirement for future features.
As part of this change, a new mandatory field “Type” has been added.

From

- "plugin.Type":
    ID: "pluginId"

To

"pluginId":
    Type: "plugin.Type"
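
Because the change is purely structural, it can be applied mechanically. The following Go sketch illustrates the transformation on an already parsed config. The oldPlugin type and the migrateHeader helper are hypothetical and not part of Gollum, and the sketch assumes that every 0.4.x entry carries an ID.

```go
package main

import "fmt"

// oldPlugin models one 0.4.x list entry: a single key (the plugin type)
// that maps to the options of that plugin, including "ID".
// Hypothetical type for illustration only.
type oldPlugin map[string]map[string]interface{}

// migrateHeader converts 0.4.x entries into the 0.5.0 layout, where the
// plugin ID is the key and the type is stored in the mandatory "Type" field.
func migrateHeader(old []oldPlugin) (map[string]map[string]interface{}, error) {
    out := make(map[string]map[string]interface{})
    for _, entry := range old {
        for typeName, opts := range entry {
            id, ok := opts["ID"].(string)
            if !ok || id == "" {
                return nil, fmt.Errorf("plugin %q has no ID", typeName)
            }
            if _, exists := out[id]; exists {
                return nil, fmt.Errorf("duplicate plugin ID %q", id)
            }
            migrated := map[string]interface{}{"Type": typeName}
            for key, value := range opts {
                if key != "ID" {
                    migrated[key] = value
                }
            }
            out[id] = migrated
        }
    }
    return out, nil
}

func main() {
    old := []oldPlugin{
        {"plugin.Type": {"ID": "pluginId", "Stream": "foo"}},
    }
    cfg, err := migrateHeader(old)
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(cfg["pluginId"]["Type"], cfg["pluginId"]["Stream"])
}
```

Keying the result by ID is what makes merging simple: two config files can be combined with a plain map merge, and duplicate IDs become visible immediately.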

Plural form

In previous versions, field names followed no consistent rule for plural or singular form. In 0.5.0, plural means “one or more values” while singular means “only one value”.

From

- "plugin.Type":
    ID: "pluginId"
    Category:
        - "Foo"
        - "Bar"
    Streams:
        - "foo"
        - "bar"

To

"pluginId":
    Type: "plugin.Type"
    Categories:
        - "Foo"
        - "Bar"
    Streams:
        - "foo"
        - "bar"

Formatters and filters are now modulators

In earlier versions, formatters were chained by nesting them via options. This was confusing because the order was “upside down”, and every formatter could be used only once. The new modulator concept introduces a more natural order and allows formatters to be reused as often as necessary. In addition, filters and formatters have been merged into the same list. This solves the problem of applying filters before or after formatters, which previously required workarounds such as the “FilterAfterFormat” field.

From

- "plugin.Type":
    ID: "pluginId"
    Filter: "filter.Before"
    FilterAfterFormat: "filter.After"
    Formatter: "format.SECOND"
    SECONDOption: "foobar"
    SECONDFormatter: "format.FIRST"

To

"pluginId":
    Type: "plugin.Type"
    Modulators:
        - "filter.Before"
        - "format.FIRST"
        - "format.SECOND":
            Option: "foobar"
        - "filter.After"
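
The effect of a single ordered list can be illustrated with a small Go sketch. The Modulator type and all functions below are hypothetical simplifications and do not reflect Gollum's actual interfaces. They only show that list order equals execution order and that the same formatter can appear more than once.

```go
package main

import (
    "fmt"
    "strings"
)

// Modulator is a hypothetical stand-in for a filter or a formatter. It
// returns the (possibly changed) payload and false if the message is rejected.
type Modulator func(payload string) (string, bool)

// applyModulators runs all modulators in list order and stops at the first
// one that rejects the message.
func applyModulators(mods []Modulator, payload string) (string, bool) {
    for _, mod := range mods {
        next, keep := mod(payload)
        if !keep {
            return "", false
        }
        payload = next
    }
    return payload, true
}

// rejectEmpty acts as a filter: it never changes the payload.
func rejectEmpty(payload string) (string, bool) {
    return payload, strings.TrimSpace(payload) != ""
}

// envelope returns a formatter that wraps the payload.
func envelope(prefix, postfix string) Modulator {
    return func(payload string) (string, bool) {
        return prefix + payload + postfix, true
    }
}

func main() {
    mods := []Modulator{
        rejectEmpty,        // filter applied before formatting
        envelope("[", "]"), // first formatter
        envelope("<", ">"), // same formatter type, used a second time
    }
    out, ok := applyModulators(mods, "foo")
    fmt.Println(out, ok) // prints "<[foo]> true"
}
```

Moving rejectEmpty to the end of the list would turn it into a filter that runs after formatting, which is the case that required a dedicated “FilterAfterFormat” field in 0.4.x.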

Nested options

Some plugins had a set of options starting with the same prefix (e.g. producer.File). These options have now been grouped.

From

- "plugin.Type":
    ID: "pluginId"
    RotateAfterHours: 10
    RotateSizeMB: 1024
    RotateAt: "00:00"

To

"pluginId":
    Type: "plugin.Type"
    Rotate:
        AfterHours: 10
        SizeMB: 1024
        At: "00:00"
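
The grouping follows a simple pattern: the shared prefix becomes the name of the nested block and is stripped from each option. The Go sketch below shows this on a parsed option map. The groupByPrefix helper is hypothetical and uses naive prefix matching, so the result should be reviewed against the documentation of the plugin in question.

```go
package main

import (
    "fmt"
    "strings"
)

// groupByPrefix moves every option that starts with prefix into a nested map
// stored under that prefix, e.g. "RotateSizeMB" becomes Rotate -> SizeMB.
// Hypothetical migration helper, not part of Gollum.
func groupByPrefix(opts map[string]interface{}, prefix string) (map[string]interface{}, error) {
    // An option named exactly like the prefix cannot be grouped automatically.
    if _, clash := opts[prefix]; clash {
        return nil, fmt.Errorf("option %q collides with the new group name", prefix)
    }
    out := make(map[string]interface{}, len(opts))
    nested := make(map[string]interface{})
    for key, value := range opts {
        if strings.HasPrefix(key, prefix) {
            nested[strings.TrimPrefix(key, prefix)] = value
        } else {
            out[key] = value
        }
    }
    if len(nested) > 0 {
        out[prefix] = nested
    }
    return out, nil
}

func main() {
    old := map[string]interface{}{
        "RotateAfterHours": 10,
        "RotateSizeMB":     1024,
        "RotateAt":         "00:00",
        "Stream":           "foo", // unrelated option, stays at the top level
    }
    grouped, err := groupByPrefix(old, "Rotate")
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println(grouped["Rotate"], grouped["Stream"])
}
```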

Plugins

The plugin system has been refactored to make plugins more consistent and to reduce the amount of work required to write a new plugin. This change introduced new subclasses and changed some of the basic interfaces.
The shutdown process has been revamped to give plugins a better chance to cleanly shut down and to get rid of all their messages without the system having to care about stream loops.

Renaming of streams to routers

A “stream” in 0.4.x has a double meaning. It denotes a stream of data, as well as a type of plugin that is used to route messages from one stream to another or simply to configure a certain stream of data in terms of formatting.
To make it easier to talk about these two things, the routing/configuring part (the plugins) has been renamed to “router”.

From

- "stream.Broadcast":
    ID: "Splitter"
    Stream: "foo"

To

"Splitter":
    Type: "router.Broadcast"
    Stream: "foo"

Removal of gollum/shared

All types from the github.com/trivago/gollum/shared package have been moved to the new github.com/trivago/tgo package and subpackages. This allows us to re-use these types in other projects more easily and introduces a better structure. This package is meant to be an extension to the Golang standard library and follows a “t-prefix” naming convention. Everything that you would expect in e.g. the sync package will be placed in tgo/tsync.

From

c := shared.MaxI(a, b)
spin := shared.NewSpinner(shared.SpinPriorityLow)

To

c := tmath.MaxI(a, b)
spin := tsync.NewSpinner(tsync.SpinPriorityLow)

Base classes

In version 0.4.x and earlier not all plugins had a base class. In 0.5.0 all plugins have base classes and existing base classes have been renamed.

renamed

core.ConsumerBase -> core.SimpleConsumer
core.ProducerBase -> core.BufferedProducer
core.StreamBase   -> core.SimpleRouter

new

core.SimpleConsumer     Consumer base class
core.SimpleFilter       Filter base class
core.SimpleFormatter    Formatter base class
core.SimpleProducer     Producer base class
core.SimpleRouter       Router base class
core.DirectProducer     A producer that directly accepts messages without buffering
core.BufferedProducer   A producer that reads messages from a channel
core.BatchedProducer    A producer that collects messages and processes them in a batch
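
Go has no classes, so a “base class” here means a struct that is embedded into the plugin struct. The following sketch illustrates the mechanism with hypothetical, heavily reduced types. simpleConsumer and its GetID method are placeholders and do not mirror the real core.SimpleConsumer.

```go
package main

import "fmt"

// simpleConsumer is a hypothetical stand-in for a base type such as
// core.SimpleConsumer.
type simpleConsumer struct {
    id string
}

// GetID is implemented once in the base type.
func (cons *simpleConsumer) GetID() string {
    return cons.id
}

// console embeds the base type. The methods of simpleConsumer are promoted,
// so console does not need to implement GetID itself.
type console struct {
    simpleConsumer
    pipeName string
}

func main() {
    cons := console{
        simpleConsumer: simpleConsumer{id: "pluginId"},
        pipeName:       "stdin",
    }
    fmt.Println(cons.GetID(), cons.pipeName) // prints "pluginId stdin"
}
```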

Metrics

Metrics have been moved from gollum/shared to the tgo package. As a result, shared.Metric.* has to be replaced by tgo.Metric.* and the package “github.com/trivago/tgo” has to be imported instead of “github.com/trivago/gollum/shared”.
Please note that “per second” metrics can now be added without additional overhead by using tgo.Metric.NewRate(metricName, rateMetricName, time.Second, 10, 3, true). All custom “per second” metrics should be replaced with this function.

Logging

Version 0.5.0 introduces logrus-based scoped logging to give error messages a clearer context. For this purpose, every plugin has a “Logger” member in its base class.

From

Log.Error.Print("MyPlugin: Something's wrong", err)

To

plugin.Logger.WithError(err).Error("Something's wrong")

Configure

Error handling has been improved so that a plugin automatically reacts to missing or invalid values. Errors are now collected in a stack attached to the config reader and processed as a batch after Configure returns. In addition, simple types can now be configured using struct tags.

From

type Console struct {
    core.ConsumerBase
    autoExit bool
    pipeName string
    pipePerm uint32
    pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfig) error {
    cons.autoExit = conf.GetBool("ExitOnEOF", true)
    inputConsole := conf.GetString("Console", "stdin")

    switch strings.ToLower(inputConsole) {
    case "stdin":
        cons.pipe = os.Stdin
        cons.pipeName = "stdin"
    case "stderr":
        return fmt.Errorf("Cannot read from stderr")
    default:
        cons.pipe = nil
        cons.pipeName = inputConsole

        if perm, err := strconv.ParseInt(conf.GetString("Permissions", "0664"), 8, 32); err != nil {
            Log.Error.Printf("Error parsing named pipe permissions: %s", err)
        } else {
            cons.pipePerm = uint32(perm)
        }
    }

    return cons.ConsumerBase.Configure(conf)
}

To

type Console struct {
    core.SimpleConsumer
    autoExit            bool   `config:"ExitOnEOF" default:"true"`
    pipeName            string `config:"Pipe" default:"stdin"`
    pipePerm            uint32 `config:"Permissions" default:"0644"`
    pipe                *os.File
}

func (cons *Console) Configure(conf core.PluginConfigReader) {
    switch strings.ToLower(cons.pipeName) {
    case "stdin":
        cons.pipe = os.Stdin
        cons.pipeName = "stdin"
    case "stderr":
        conf.Errors.Pushf("Cannot read from stderr")
    default:
        cons.pipe = nil
    }
}
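
To give an idea of how tag-based configuration and batched error handling can work together, here is a minimal sketch built on the reflect package. It is not Gollum's implementation: the ErrorStack type, the configure function and the exported field names are assumptions made for this example, and only string and bool fields are supported.

```go
package main

import (
    "fmt"
    "reflect"
    "strconv"
)

// ErrorStack collects configuration errors so they can be reported in one
// batch. Simplified stand-in for the error stack of the config reader.
type ErrorStack []error

// Pushf adds a formatted error to the stack.
func (stack *ErrorStack) Pushf(format string, args ...interface{}) {
    *stack = append(*stack, fmt.Errorf(format, args...))
}

// Console uses exported fields because this sketch sets them through the
// reflect package, which cannot write to unexported fields.
type Console struct {
    AutoExit bool   `config:"ExitOnEOF" default:"true"`
    PipeName string `config:"Pipe" default:"stdin"`
}

// configure fills every tagged field of target (a pointer to a struct) from
// values and falls back to the "default" tag if a key is missing.
func configure(target interface{}, values map[string]string, errs *ErrorStack) {
    structVal := reflect.ValueOf(target).Elem()
    structType := structVal.Type()
    for i := 0; i < structType.NumField(); i++ {
        field := structType.Field(i)
        key, tagged := field.Tag.Lookup("config")
        if !tagged {
            continue
        }
        raw, found := values[key]
        if !found {
            raw = field.Tag.Get("default")
        }
        switch field.Type.Kind() {
        case reflect.String:
            structVal.Field(i).SetString(raw)
        case reflect.Bool:
            parsed, err := strconv.ParseBool(raw)
            if err != nil {
                errs.Pushf("%s: %q is not a boolean", key, raw)
                continue
            }
            structVal.Field(i).SetBool(parsed)
        default:
            errs.Pushf("%s: unsupported type %s", key, field.Type)
        }
    }
}

func main() {
    var errs ErrorStack
    cons := Console{}
    configure(&cons, map[string]string{"ExitOnEOF": "maybe"}, &errs)

    // All errors are handled together after configuration has finished.
    for _, err := range errs {
        fmt.Println("config error:", err)
    }
    fmt.Println("pipe:", cons.PipeName)
}
```

Collecting errors instead of returning the first one lets a user fix all invalid values of a plugin in one pass.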

Message handling

Message handling has changed compared to 0.4.x. Messages now support metadata and contain a copy of the “original” data next to the actual payload. In addition, messages are now backed by a memory pool and are passed around using pointers. All this is reflected in new function signatures and new message member functions.

From

func (format *Sequence) Format(msg core.Message) ([]byte, core.MessageStreamID) {
    basePayload, stream := format.base.Format(msg)
    baseLength := len(basePayload)
    sequenceStr := strconv.FormatUint(msg.Sequence, 10) + format.separator

    payload := make([]byte, len(sequenceStr)+baseLength)
    len := copy(payload, []byte(sequenceStr))
    copy(payload[len:], basePayload)

    return payload, stream
}

To

func (format *Sequence) ApplyFormatter(msg *core.Message) error {
    seq := atomic.AddInt64(format.seq, 1)
    sequenceStr := strconv.FormatInt(seq, 10)
    content := format.GetAppliedContent(msg)

    dataSize := len(sequenceStr) + len(format.separator) + len(content)
    payload := core.MessageDataPool.Get(dataSize)

    offset := copy(payload, []byte(sequenceStr))
    offset += copy(payload[offset:], format.separator)
    copy(payload[offset:], content)

    format.SetAppliedContent(msg, payload)
    return nil
}

This example shows most of the changes related to the new message structure.

  1. As the sequence number has been removed from the message struct, plugins relying on it need to implement it themselves.
  2. As messages now support metadata, you need to specify whether you want to affect metadata or the payload. In formatter plugins this is reflected by the GetAppliedContent method, which is backed by the “ApplyTo” config parameter.
  3. If you require a new payload buffer you should now utilize core.MessageDataPool.

Things that you don’t see in this example are the following:

  1. Buffers returned by core.MessageDataPool tend to be overallocated, i.e. they can be resized without reallocation in most cases. For this reason, methods to resize the payload have been added.
  2. If you need to create a copy of the complete message, use the Clone() method.
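
The overallocation mentioned above can be pictured with the following sketch. It is not the code behind core.MessageDataPool: the block size of 64 bytes and the getBuffer and resize helpers are assumptions chosen for illustration, and the actual pooling (reuse of returned buffers) is left out.

```go
package main

import "fmt"

// blockSize is an assumed allocation granularity for this sketch.
const blockSize = 64

// getBuffer returns a slice of the requested length whose capacity is
// rounded up to the next multiple of blockSize.
func getBuffer(size int) []byte {
    capacity := ((size + blockSize - 1) / blockSize) * blockSize
    return make([]byte, size, capacity)
}

// resize changes the length of buf. The backing array is reused if its
// capacity is sufficient, otherwise a larger buffer is allocated and the
// existing content is copied over.
func resize(buf []byte, size int) []byte {
    if size <= cap(buf) {
        return buf[:size]
    }
    grown := getBuffer(size)
    copy(grown, buf)
    return grown
}

func main() {
    buf := getBuffer(10)
    copy(buf, "0123456789")

    buf = resize(buf, 20)           // still fits into the first 64 byte block
    fmt.Println(len(buf), cap(buf)) // prints "20 64"

    buf = resize(buf, 100)          // exceeds the block, triggers a reallocation
    fmt.Println(len(buf), cap(buf)) // prints "100 128"
}
```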

Formatting pipeline

In version 0.4.x you had to take care of message changes yourself on many different occasions. With 0.5.0 the message flow has been moved completely to the core framework. As a result, you don’t need to worry about routing or about resetting data to its original state. The framework will do this for you.

From

func (prod *Redis) getValueAndKey(msg core.Message) (v []byte, k string) {
    value, _ := prod.Format(msg) // Creates a copy and we must not forget this step

    if prod.keyFormat == nil {
        return value, prod.key
    }

    if prod.keyFromParsed {     // Ordering is crucial here
        keyMsg := msg
        keyMsg.Data = value
        key, _ := prod.keyFormat.Format(keyMsg)
        return value, string(key)
    }

    key, _ := prod.keyFormat.Format(msg)
    return value, string(key)
}


func (prod *Redis) storeString(msg core.Message) {
    value, key := prod.getValueAndKey(msg)

    result := prod.client.Set(key, string(value), 0)
    if result.Err() != nil {
        Log.Error.Print("Redis: ", result.Err())
        prod.Drop(msg) // Good thing we stored a copy of the message ...
    }
}

To

func (prod *Redis) getValueFieldAndKey(msg *core.Message) (v, f, k []byte) {
    meta := msg.GetMetadata()
    key := meta.GetValue(prod.key)     // Due to metadata fields...
    field := meta.GetValue(prod.field) // ... this is now a lot easier

    return msg.GetPayload(), field, key
}

func (prod *Redis) storeString(msg *core.Message) {
    // The message arrives here after formatting
    value, _, key := prod.getValueFieldAndKey(msg)

    result := prod.client.Set(string(key), string(value), time.Duration(0))
    if result.Err() != nil {
        prod.Logger.WithError(result.Err()).Error("Failed to set value")
        prod.TryFallback(msg)          // Will send the original (unformatted) message. Always.
    }
}
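
Why the fallback always receives unformatted data can be illustrated with a reduced message model. The message type and its methods below are hypothetical and much simpler than core.Message. Metadata is left out, and the sketch only assumes that the original data is stored once and never touched by formatters.

```go
package main

import "fmt"

// message is a hypothetical model of a 0.5.0 message: formatters change
// payload, while orig keeps the state after consumer processing.
type message struct {
    payload []byte
    orig    []byte
}

// newMessage stores two independent copies of data.
func newMessage(data []byte) *message {
    return &message{
        payload: append([]byte(nil), data...),
        orig:    append([]byte(nil), data...),
    }
}

// fallbackCopy returns what a fallback target would receive in this model:
// a message that has been reset to the original data.
func (msg *message) fallbackCopy() *message {
    return newMessage(msg.orig)
}

func main() {
    msg := newMessage([]byte("foo"))

    // Formatting only affects the payload.
    msg.payload = []byte(`{"value":"foo"}`)

    fallback := msg.fallbackCopy()
    fmt.Printf("%s %s\n", msg.payload, fallback.payload)
}
```

Since the framework keeps the original, a producer no longer has to hold on to its own copy of the message before formatting it.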

New features

  • Filters and Formatters have been merged into one list
  • You can now use a filter or formatter more than once in the same plugin
  • Consumers can now do filtering and formatting, too
  • Messages can now store metadata. Formatters can affect the payload or a metadata field
  • All plugins now have an automatic log scope
  • Message payloads are now backed by a memory pool
  • Messages now store the original message, i.e. a backup of the payload state after consumer processing
  • Gollum now provides per-stream metrics
  • Plugins are now able to implement health checks that can be queried via HTTP
  • There is a new pseudo plugin type “Aggregate” that can be used to share configuration between multiple plugins
  • New base types for producers: Direct, Buffered, Batched
  • Plugin configurations now support nested structures
  • The configuration process has been simplified a lot by adding automatic error handling and struct tags
  • Added a new formatter format.GrokToJSON
  • Added a new formatter format.JSONToInflux10
  • Added a new formatter format.Double
  • Added a new formatter format.MetadataCopy
  • Added a new formatter format.Trim
  • Consumer.File now supports filesystem events
  • Consumers can now define the number of goroutines used for formatting/filtering
  • All AWS plugins now support role switching
  • All AWS plugins are now based on the same credentials code

Bugfixes

  • The plugin lifecycle has been reimplemented to avoid gollum being stuck waiting for plugins to change state
  • Any errors during the configuration phase will cause gollum to exit
  • Integration test suite added
  • Producer.HTTPRequest port handling fixed
  • The test-config command will now produce more meaningful results
  • Duplicating messages now properly duplicates the whole message and not just the struct
  • Several race conditions have been fixed
  • Producer.ElasticSearch is now based on a more up-to-date library
  • Producer.AwsS3 is now behaving more like producer.File
  • Gollum metrics can now bind to a specific address instead of just a port

Breaking changes

  • The config format has changed to improve automatic processing
  • A lot of plugins have been renamed to avoid confusion and to better reflect their behavior
  • A lot of plugin parameters have been renamed
  • The instances plugin parameter has been removed
  • Most of gollum’s metrics have been renamed
  • Plugin base types have been renamed
  • All message handling function signatures have changed to use pointers
  • Formatters no longer daisy chain, as they can now be listed in proper order
  • Stream plugins have been renamed to Router plugins
  • Routers are not allowed to modify message content anymore
  • filter.All and format.Forward have been removed as they are not required anymore
  • Producer formatter lists dedicated to formatting a key or similar constructs have been removed
  • Logging framework switched to logrus
  • The package gollum.shared has been removed in favor of trivago.tgo
  • Fuses have been removed from all plugins
  • The general message sequence number has been removed
  • The term “drop” has been replaced by the term “fallback” to emphasise its use
  • The _DROPPED_ stream has been removed. Messages are discarded if no fallback is set
  • Formatters can still change the stream of a message but cannot trigger routing by themselves
  • Compiling contrib plugins now requires a specific loader.go to be added
  • The docker file on docker hub is now a lot smaller and only contains the gollum binary