v0.5.0
Breaking changes 0.4.x to 0.5.0
Configuration
The goal of this breaking change was to make Gollum configuration files easier to maintain and easier to merge. In addition, several quirks and inconsistencies have been resolved.
Plugin header
From
- "plugin.Type":
    ID: "pluginId"
To
"pluginId":
    Type: "plugin.Type"
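To see why keying plugins by their ID makes files easier to merge, here is a minimal Go sketch. It assumes each configuration file has already been decoded into a map from plugin ID to that plugin's options. PluginOptions and mergeConfigs are hypothetical names, and replacing a whole entry when two files define the same ID is a simplifying choice for illustration, not necessarily what Gollum itself does.

```go
package main

// PluginOptions is a hypothetical, simplified view of one plugin entry:
// option names mapped to their values.
type PluginOptions map[string]string

// mergeConfigs combines several decoded configuration files. Because the
// 0.5.0 format uses the plugin ID as the top-level key, a later file can
// redefine a plugin just by reusing its ID; no list positions need matching.
func mergeConfigs(files ...map[string]PluginOptions) map[string]PluginOptions {
	merged := make(map[string]PluginOptions)
	for _, file := range files {
		for id, plugin := range file {
			merged[id] = plugin // same ID: the later definition wins
		}
	}
	return merged
}
```

With the old list-based header, the same merge would first have to search every list entry for a matching ID field.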
Plural form
In previous versions, there was no rule for when a field name used the plural or the singular form. In 0.5.0, a plural name means “one or more values”, while a singular name means “exactly one value”.
From
- "plugin.Type":
    ID: "pluginId"
    Category:
        - "Foo"
        - "Bar"
    Streams:
        - "foo"
        - "bar"
To
"pluginId":
    type: "plugin.Type"
    categories:
        - "Foo"
        - "Bar"
    streams:
        - "foo"
        - "bar"
Formatters and filters are now modulators
In earlier versions, formatters were chained by nesting them via options. This was confusing, as the order was “upside down”. In addition, each formatter could be used only once. The new modulator concept introduces a more natural order and allows formatters to be reused as often as necessary. Filters and formatters have also been merged into the same list. This solves the problem of applying filters before or after formatters, which was previously worked around with fields such as “FilterAfterFormat”.
From
- "plugin.Type":
    ID: "pluginId"
    Filter: "filter.Before"
    FilterAfterFormat: "filter.After"
    Formatter: "format.SECOND"
    SECONDOption: "foobar"
    SECONDFormatter: "format.FIRST"
To
"pluginId":
    Type: "plugin.Type"
    Modulators:
        - "filter.Before"
        - "format.FIRST"
        - "format.SECOND":
            Option: "foobar"
        - "filter.After"
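The ordering rule can be illustrated with a small Go sketch in which filters and formatters share one signature and run strictly from top to bottom. Message, Result, Modulator, and the example modulators are hypothetical; this is not Gollum's actual modulator interface, only a model of the behaviour described above.

```go
package main

import "strings"

// Message is a hypothetical, minimal message carrying only a payload.
type Message struct {
	Payload []byte
}

// Result tells the chain whether to keep processing a message.
type Result int

const (
	Continue Result = iota // pass the message to the next modulator
	Discard                // stop processing; the message is filtered out
)

// Modulator is a hypothetical signature shared by filters and formatters,
// so both kinds can live in the same ordered list.
type Modulator func(msg *Message) Result

// applyModulators runs the modulators in list order, top to bottom.
func applyModulators(msg *Message, modulators []Modulator) Result {
	for _, modulate := range modulators {
		if modulate(msg) == Discard {
			return Discard
		}
	}
	return Continue
}

// dropEmpty is an example filter: it discards messages without payload.
func dropEmpty(msg *Message) Result {
	if len(msg.Payload) == 0 {
		return Discard
	}
	return Continue
}

// upper is an example formatter: it rewrites the payload in place.
func upper(msg *Message) Result {
	msg.Payload = []byte(strings.ToUpper(string(msg.Payload)))
	return Continue
}

// prefix builds a formatter; using it twice in one list shows that the
// same kind of formatter can appear more than once.
func prefix(p string) Modulator {
	return func(msg *Message) Result {
		msg.Payload = append([]byte(p), msg.Payload...)
		return Continue
	}
}
```

With the chain `dropEmpty, upper, prefix("b:"), prefix("a:")`, the payload "hello" becomes "a:b:HELLO", while an empty message is discarded by the first entry before any formatter runs.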
Nested options
Some plugins (e.g. producer.File) had a set of options sharing the same prefix. These options have now been grouped under a common key.
From
- "plugin.Type":
    ID: "pluginId"
    RotateAfterHours: 10
    RotateSizeMB: 1024
    RotateAt: "00:00"
To
"pluginId":
    Type: "plugin.Type"
    Rotate:
        AfterHours: 10
        SizeMB: 1024
        At: "00:00"
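A grouped option is addressed by a path of keys rather than by one long prefixed name. The following minimal sketch assumes the plugin's options were decoded into nested map[string]interface{} values. lookup is a hypothetical helper, not part of Gollum's config reader.

```go
package main

// lookup walks nested option groups along a path such as "Rotate", "SizeMB"
// and reports whether every key on the path was present.
func lookup(options map[string]interface{}, path ...string) (interface{}, bool) {
	var current interface{} = options
	for _, key := range path {
		group, isGroup := current.(map[string]interface{})
		if !isGroup {
			return nil, false // the path continues below a plain value
		}
		value, found := group[key]
		if !found {
			return nil, false
		}
		current = value
	}
	return current, true
}
```

For the example above, `lookup(options, "Rotate", "SizeMB")` yields 1024, which in 0.4.x would have been the flat key "RotateSizeMB".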
Plugins
Renaming of streams to routers
From
- "stream.Broadcast":
    ID: "Splitter"
    Stream: "foo"
To
"Splitter":
    Type: "router.Broadcast"
    Stream: "foo"
Base classes
In version 0.4.x and earlier, not all plugins had a base class. In 0.5.0, all plugins have base classes, and the existing base classes have been renamed.
Renamed:
- core.ConsumerBase -> core.SimpleConsumer
- core.ProducerBase -> core.BufferedProducer
- core.StreamBase -> core.SimpleRouter
New:
- core.SimpleConsumer: Consumer base class
- core.SimpleFilter: Filter base class
- core.SimpleFormatter: Formatter base class
- core.SimpleProducer: Producer base class
- core.SimpleRouter: Router base class
- core.DirectProducer: A producer that directly accepts messages without buffering
- core.BufferedProducer: A producer that reads messages from a channel
- core.BatchedProducer: A producer that collects messages and processes them in a batch
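Go has no inheritance, so a “base class” is a struct embedded in the plugin struct, whose methods are promoted to the plugin. Porting a plugin therefore mostly means swapping the embedded type, as the Console example in the Configure section does with core.ConsumerBase and core.SimpleConsumer. The sketch below uses hypothetical stand-ins (SimpleProducer, Stdout), not Gollum's real types.

```go
package main

import "fmt"

// SimpleProducer is a hypothetical stand-in for a producer base class that
// holds state and behaviour shared by all producers.
type SimpleProducer struct {
	id string
}

// ID is promoted to every type that embeds SimpleProducer.
func (prod *SimpleProducer) ID() string {
	return prod.id
}

// Describe is shared behaviour built on the base state.
func (prod *SimpleProducer) Describe() string {
	return fmt.Sprintf("producer %s", prod.id)
}

// Stdout is a hypothetical plugin that "inherits" from SimpleProducer by
// embedding it.
type Stdout struct {
	SimpleProducer
	target string
}

// Describe shadows the base method while still reusing it explicitly.
func (prod *Stdout) Describe() string {
	return prod.SimpleProducer.Describe() + " writing to " + prod.target
}
```

Calling `ID()` on a `*Stdout` uses the promoted base method, while `Describe()` resolves to the plugin's own version.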
Metrics
shared.Metric.* has to be replaced by tgo.Metric.*, and the package “github.com/trivago/tgo” has to be imported instead of “github.com/trivago/gollum/shared”.
All custom “per second” metrics should be replaced with tgo.Metric.NewRate, e.g.:
tgo.Metric.NewRate(metricName, rateMetricName, time.Second, 10, 3, true)
Logging
Version 0.5.0 introduces logrus-based scoped logging to give error messages clearer context. As a result, every plugin has a “Logger” member in its base class.
From
Log.Error.Print("MyPlugin: Something's wrong", err)
To
plugin.Logger.WithError(err).Error("Something's wrong")
Configure
Error handling has been improved so that a plugin automatically reacts to missing or invalid values. Errors are now collected in a stack attached to the config reader and processed as a batch after Configure returns. In addition, simple types can now be configured using struct tags.
From
type Console struct {
	core.ConsumerBase
	autoExit bool
	pipeName string
	pipePerm uint32
	pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfig) error {
	cons.autoExit = conf.GetBool("ExitOnEOF", true)
	inputConsole := conf.GetString("Console", "stdin")

	switch strings.ToLower(inputConsole) {
	case "stdin":
		cons.pipe = os.Stdin
		cons.pipeName = "stdin"
	case "stderr":
		return fmt.Errorf("Cannot read from stderr")
	default:
		cons.pipe = nil
		cons.pipeName = inputConsole

		if perm, err := strconv.ParseInt(conf.GetString("Permissions", "0664"), 8, 32); err != nil {
			Log.Error.Printf("Error parsing named pipe permissions: %s", err)
		} else {
			cons.pipePerm = uint32(perm)
		}
	}

	return cons.ConsumerBase.Configure(conf)
}
To
type Console struct {
	core.SimpleConsumer
	autoExit bool   `config:"ExitOnEOF" default:"true"`
	pipeName string `config:"Pipe" default:"stdin"`
	pipePerm uint32 `config:"Permissions" default:"0644"`
	pipe     *os.File
}

func (cons *Console) Configure(conf core.PluginConfigReader) {
	switch strings.ToLower(cons.pipeName) {
	case "stdin":
		cons.pipe = os.Stdin
		cons.pipeName = "stdin"
	case "stderr":
		conf.Errors.Pushf("Cannot read from stderr")
	default:
		cons.pipe = nil
	}
}
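A rough sketch of how tag-driven configuration with collected errors can work, using the standard reflect package. ErrorStack, applyConfig, and ConsoleSettings are hypothetical and not Gollum's implementation. The sketch handles only bool, string, and uint32 fields, and parsing uint32 values with base 0 (so “0644” is read as octal) is an assumption. Unlike the Console example, it uses exported fields, because plain reflection cannot set unexported ones.

```go
package main

import (
	"fmt"
	"reflect"
	"strconv"
)

// ErrorStack is a hypothetical stand-in for the config reader's error
// stack: problems are collected instead of aborting configuration.
type ErrorStack struct {
	errors []error
}

// Pushf records a formatted error and lets configuration continue.
func (stack *ErrorStack) Pushf(format string, args ...interface{}) {
	stack.errors = append(stack.errors, fmt.Errorf(format, args...))
}

// Len returns the number of collected errors.
func (stack *ErrorStack) Len() int {
	return len(stack.errors)
}

// applyConfig fills the exported, tagged fields of the struct target points
// to. The `config` tag names the setting; `default` is used when the setting
// is absent. Parse errors are pushed onto errs for batch handling.
func applyConfig(target interface{}, settings map[string]string, errs *ErrorStack) {
	structValue := reflect.ValueOf(target).Elem()
	structType := structValue.Type()

	for i := 0; i < structType.NumField(); i++ {
		field := structType.Field(i)
		key, tagged := field.Tag.Lookup("config")
		if !tagged {
			continue // untagged fields are left for Configure to handle
		}
		raw, found := settings[key]
		if !found {
			raw = field.Tag.Get("default")
		}

		fieldValue := structValue.Field(i)
		switch fieldValue.Kind() {
		case reflect.Bool:
			parsed, err := strconv.ParseBool(raw)
			if err != nil {
				errs.Pushf("%s: %v", key, err)
				continue
			}
			fieldValue.SetBool(parsed)
		case reflect.String:
			fieldValue.SetString(raw)
		case reflect.Uint32:
			// Base 0 treats a leading "0" as octal, so "0644" becomes 420.
			parsed, err := strconv.ParseUint(raw, 0, 32)
			if err != nil {
				errs.Pushf("%s: %v", key, err)
				continue
			}
			fieldValue.SetUint(parsed)
		default:
			errs.Pushf("%s: unsupported field type %s", key, fieldValue.Kind())
		}
	}
}

// ConsoleSettings mirrors the tagged Console fields with exported names.
type ConsoleSettings struct {
	AutoExit bool   `config:"ExitOnEOF" default:"true"`
	PipeName string `config:"Pipe" default:"stdin"`
	PipePerm uint32 `config:"Permissions" default:"0644"`
}
```

Because errors are pushed rather than returned, a single run reports every bad setting (for example both an invalid “ExitOnEOF” and an invalid “Permissions”) instead of stopping at the first one.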
Message handling
Message handling has changed compared to 0.4.x. Messages now support metadata and contain a copy of the “original” data next to the actual payload. In addition, messages are now backed by a memory pool and are passed around as pointers. All of this is reflected in new function signatures and new message methods.
From
func (format *Sequence) Format(msg core.Message) ([]byte, core.MessageStreamID) {
	basePayload, stream := format.base.Format(msg)
	baseLength := len(basePayload)
	sequenceStr := strconv.FormatUint(msg.Sequence, 10) + format.separator

	payload := make([]byte, len(sequenceStr)+baseLength)
	offset := copy(payload, []byte(sequenceStr)) // avoids shadowing the builtin len
	copy(payload[offset:], basePayload)

	return payload, stream
}
To
func (format *Sequence) ApplyFormatter(msg *core.Message) error {
	seq := atomic.AddInt64(format.seq, 1)
	sequenceStr := strconv.FormatInt(seq, 10)
	content := format.GetAppliedContent(msg)

	dataSize := len(sequenceStr) + len(format.separator) + len(content)
	payload := core.MessageDataPool.Get(dataSize)

	offset := copy(payload, []byte(sequenceStr))
	offset += copy(payload[offset:], format.separator)
	copy(payload[offset:], content)

	format.SetAppliedContent(msg, payload)
	return nil
}
This example shows most of the changes related to the new message structure:
- As the sequence number has been removed from the message struct, plugins relying on it need to implement it themselves.
- As messages now support metadata, you need to specify whether you want to affect the metadata or the payload. In formatter plugins, this is reflected by the GetAppliedContent method, which is backed by the “ApplyTo” config parameter.
- If you need a new payload buffer, you should now use core.MessageDataPool.
Not shown in this example:
- Buffers returned by core.MessageDataPool tend to be overallocated, i.e. in most cases they can be resized without reallocation. For this reason, methods to resize the payload have been added.
- If you need to create a copy of the complete message, use the Clone() method.
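Why pooled buffers can usually be resized in place can be sketched with the standard library's sync.Pool. This is not core.MessageDataPool: bytePool, classFor, and resize are hypothetical, rounding capacities up to powers of two is an assumed strategy, and the sketch assumes requested sizes of at most 2 GiB (32 size classes).

```go
package main

import "sync"

// bytePool is a hypothetical pooled allocator for payload buffers. Buffers
// are grouped into power-of-two size classes, so a buffer handed out for a
// request usually has spare capacity beyond the requested length.
type bytePool struct {
	classes [32]sync.Pool // class n holds buffers with capacity 1<<n
}

// classFor returns the smallest n with 1<<n >= size.
func classFor(size int) int {
	class := 0
	for (1 << class) < size {
		class++
	}
	return class
}

// Get returns a slice of length size, reusing a pooled buffer when one is
// available. Reused buffers are not zeroed; callers must overwrite them.
func (pool *bytePool) Get(size int) []byte {
	class := classFor(size)
	if pooled, ok := pool.classes[class].Get().(*[]byte); ok {
		return (*pooled)[:size]
	}
	return make([]byte, size, 1<<class)
}

// Put returns a buffer to its size class so later Get calls can reuse it.
// Buffers whose capacity is not an exact class size are left to the GC.
func (pool *bytePool) Put(buf []byte) {
	class := classFor(cap(buf))
	if cap(buf) != 1<<class {
		return
	}
	emptied := buf[:0]
	pool.classes[class].Put(&emptied)
}

// resize changes a payload's length, copying only when the spare capacity
// is exhausted.
func resize(buf []byte, size int) []byte {
	if size <= cap(buf) {
		return buf[:size]
	}
	grown := make([]byte, size)
	copy(grown, buf)
	return grown
}
```

A request for 100 bytes returns a buffer with capacity 128, so growing the payload to 120 bytes needs no new allocation.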
Formatting pipeline
In version 0.4.x, you had to handle message changes yourself in many different places. With 0.5.0, the message flow has moved completely into the core framework. As a result, you don’t need to worry about routing or about resetting data to its original state; the framework does this for you.
From
func (prod *Redis) getValueAndKey(msg core.Message) (v []byte, k string) {
	value, _ := prod.Format(msg) // Creates a copy and we must not forget this step
	if prod.keyFormat == nil {
		return value, prod.key
	}

	if prod.keyFromParsed { // Ordering is crucial here
		keyMsg := msg
		keyMsg.Data = value
		key, _ := prod.keyFormat.Format(keyMsg)
		return value, string(key)
	}

	key, _ := prod.keyFormat.Format(msg)
	return value, string(key)
}

func (prod *Redis) storeString(msg core.Message) {
	value, key := prod.getValueAndKey(msg)
	result := prod.client.Set(key, string(value), 0)
	if result.Err() != nil {
		Log.Error.Print("Redis: ", result.Err())
		prod.Drop(msg) // Good thing we stored a copy of the message ...
	}
}
To
func (prod *Redis) getValueFieldAndKey(msg *core.Message) (v, f, k []byte) {
	meta := msg.GetMetadata()
	key := meta.GetValue(prod.key)     // Due to metadata fields...
	field := meta.GetValue(prod.field) // ... this is now a lot easier
	return msg.GetPayload(), field, key
}

func (prod *Redis) storeString(msg *core.Message) {
	// The message arrives here after formatting
	value, _, key := prod.getValueFieldAndKey(msg) // plain strings don't need the field
	result := prod.client.Set(string(key), string(value), time.Duration(0))
	if result.Err() != nil {
		prod.Logger.WithError(result.Err()).Error("Failed to set value")
		prod.TryFallback(msg) // Will send the original (unformatted) message. Always.
	}
}
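The fallback behaviour relies on each message keeping a backup of its payload as it looked after consumer processing, and on copies being deep rather than shallow. The following sketch models that idea. Message and its methods are hypothetical names, not Gollum's core.Message API.

```go
package main

// Message is a hypothetical, simplified message: the current payload plus
// a backup of the payload as it looked after consumer processing.
type Message struct {
	payload  []byte
	original []byte
}

// NewMessage stores the payload and a separate backup copy of it.
func NewMessage(payload []byte) *Message {
	return &Message{
		payload:  payload,
		original: append([]byte(nil), payload...),
	}
}

// SetPayload is what a formatter would call; the backup stays untouched.
func (msg *Message) SetPayload(payload []byte) {
	msg.payload = payload
}

// Payload returns the current, possibly formatted, payload.
func (msg *Message) Payload() []byte {
	return msg.payload
}

// Clone copies the whole message including both byte slices. A plain
// struct copy (*msg) would still share the underlying byte arrays.
func (msg *Message) Clone() *Message {
	return &Message{
		payload:  append([]byte(nil), msg.payload...),
		original: append([]byte(nil), msg.original...),
	}
}

// OriginalCopy builds a fresh message from the backup, i.e. the
// unformatted state that a fallback would send on.
func (msg *Message) OriginalCopy() *Message {
	return NewMessage(append([]byte(nil), msg.original...))
}
```

After a formatter replaces "raw" with "formatted", OriginalCopy still yields "raw", which is why the producer no longer has to keep its own copy before formatting.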
New features
- Filters and Formatters have been merged into one list
- You can now use a filter or formatter more than once in the same plugin
- Consumers can now do filtering and formatting, too
- Messages can now store metadata. Formatters can affect the payload or a metadata field
- All plugins now have an automatic log scope
- Message payloads are now backed by a memory pool
- Messages now store the original message, i.e. a backup of the payload state after consumer processing
- Gollum now provides per-stream metrics
- Plugins are now able to implement health checks that can be queried via HTTP
- There is a new pseudo plugin type “Aggregate” that can be used to share configuration between multiple plugins
- New base types for producers: Direct, Buffered, Batched
- Plugin configurations now support nested structures
- The configuration process has been simplified a lot by adding automatic error handling and struct tags
- Added a new formatter format.GrokToJSON
- Added a new formatter format.JSONToInflux10
- Added a new formatter format.Double
- Added a new formatter format.MetadataCopy
- Added a new formatter format.Trim
- consumer.File now supports filesystem events
- Consumers can now define the number of goroutines used for formatting/filtering
- All AWS plugins now support role switching
- All AWS plugins are now based on the same credentials code
Bugfixes
- The plugin lifecycle has been reimplemented to prevent Gollum from getting stuck while waiting for plugins to change state
- Any errors during the configuration phase will cause Gollum to exit
- Integration test suite added
- producer.HTTPRequest port handling fixed
- The test-config command will now produce more meaningful results
- Duplicating messages now properly duplicates the whole message and not just the struct
- Several race conditions have been fixed
- producer.ElasticSearch is now based on a more up-to-date library
- producer.AwsS3 now behaves more like producer.File
- Gollum metrics can now bind to a specific address instead of just a port
Breaking changes
- The config format has changed to improve automatic processing
- A lot of plugins have been renamed to avoid confusion and to better reflect their behavior
- A lot of plugin parameters have been renamed
- The instances plugin parameter has been removed
- Most of Gollum’s metrics have been renamed
- Plugin base types have been renamed
- All message handling function signatures have changed to use pointers
- Formatters no longer daisy-chain, as they can now be listed in the proper order
- Stream plugins have been renamed to Router plugins
- Routers are not allowed to modify message content anymore
- filter.All and format.Forward have been removed as they are not required anymore
- Producer formatter lists dedicated to formatting a key or similar constructs have been removed
- Logging framework switched to logrus
- The package gollum.shared has been removed in favor of trivago.tgo
- Fuses have been removed from all plugins
- The general message sequence number has been removed
- The term “drop” has been replaced by the term “fallback” to emphasise its use
- The _DROPPED_ stream has been removed. Messages are discarded if no fallback is set
- Formatters can still change the stream of a message but cannot trigger routing by themselves
- Compiling contrib plugins now requires a specific loader.go to be added
- The Docker image on Docker Hub is now a lot smaller and contains only the Gollum binary