Kelson Martins

Working with the Logstash translate plugin

Introduction


When working with Logstash, chances are you will someday need to transform or replace data before sending it to its final destination. With that in mind, this post presents the translate plugin, a general search-and-replace tool that uses a configured hash and/or file to determine replacement values.

Scenario – Understanding the translate plugin

To understand the translate plugin, consider a scenario where your Logstash pipeline is receiving logs over TCP in the following format:

{Datestamp} {ServiceCode} {Message}

where the {ServiceCode} field contains a unique identifier (UID) for a particular service such as “nginx” or “kafka”. A log sample is shown next:
 
03/07/2017 17:11:40 EZfAH1yD service stopped
03/08/2017 13:00:02 EZfAH1yD service started
03/08/2017 13:08:20 ddvgZOQL service restarted

Ideally, instead of receiving the UID, we would like to receive the actual service name from the sender, but for the sake of this example, the sender can only ship the log with its UID.
 
The sender then provided us with a dictionary mapping each {ServiceCode} to its {ServiceName} representation, hoping we could perform some sort of real-time processing to add or replace the {ServiceName} field in the stream.
 
"EZfAH1yD" => "nginx"
"ddvgZOQL" => "rsyslog"
"InCCvabi" => "firewalld"
"9PAMjJBx" => "zookeeper"
"wGbeORZN" => "kafka"
"SujIotpo" => "mysql"

Solution – Using logstash-translate-plugin

Given the scenario presented above, our job is to come up with a solution that makes use of the dictionary and injects the service name into the log.
 
For this requirement, we will make use of the logstash-filter-translate plugin.
 
So, let’s get our hands dirty. The following is a Logstash configuration that receives and parses the original log format from the example above:
input {
    tcp {
        port => 9000
    }
}
filter {
    grok {
        match => {
            "message" => "%{WORD:service} %{GREEDYDATA}"
        }
    }
}
output {
    elasticsearch {
        index => "logstash-services-%{+YYYY-MM-dd}"
    }
}
In this example, we receive logs over a TCP connection on port 9000 and use a simple grok filter that captures the first word of the message as the service code (for simplicity, the test events below omit the {Datestamp} prefix).
 
Logs are forwarded to a daily logstash-services-* index on Elasticsearch. Let’s test our configuration:

1) Start logstash.

$ service logstash start

2) In another terminal, open a connection to logstash with netcat and send an event.

$ netcat localhost 9000
EZfAH1yD service stopped
3) Now let’s query Elasticsearch to view the last event (assuming a local Elasticsearch instance listening on the default port 9200):
 
$ curl -H 'Content-Type: application/json' localhost:9200/logstash-services-*/_search?pretty -d '{
    "size": 1,
    "sort": {"@timestamp": "desc"}
}'
The expected result is:
 
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 25,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JEuEVyTgmQZUBH-T",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:14:30.685Z",
          "port" : 44150,
          "service" : "EZfAH1yD",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service stopped"
        },
        "sort" : [
          1489148070685
        ]
      }
    ]
  }
}
Now, to achieve our goal of translating the {ServiceCode} into a {ServiceName} as soon as the event reaches our Logstash input, let’s install the logstash-filter-translate plugin.
 
This is required because the translate plugin does not ship with Logstash by default, so install it with:
 
$ bin/logstash-plugin install logstash-filter-translate
The logstash-plugin utility will download logstash-filter-translate from the plugin repository and install it on your Logstash instance.
 
To confirm the installation, list your plugins and grep for the translate string:
 
$ bin/logstash-plugin list | grep translate
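If the plugin was installed successfully, the command should print its name, along these lines (the exact listing may vary by version):

logstash-filter-translate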
Now, let’s alter our Logstash configuration to make use of the translate filter.
 
input {
    tcp {
        port => 9000
    }
}
filter {
    grok {
        match => {
            "message" => "%{WORD:service} %{GREEDYDATA}"
        }
    }
    translate {
        field => "service"
        destination => "service_name"
        override => false
        dictionary_path => "/etc/logstash/translate_dictionary.yml"
    }
}
output {
    elasticsearch {
        index => "logstash-services-%{+YYYY-MM-dd}"
    }
}

The difference here is the translate block:
translate {
    field => "service"
    destination => "service_name"
    override => false
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
}
Where:

field: the source field whose value we want to translate.

destination: the field in which to store the translated value.

override: whether to overwrite the destination field if it already exists. In our case, we want the translation added to a new field, so we keep the default false.

dictionary_path: the location of our dictionary file on the filesystem. (The plugin also accepts an inline dictionary; see the sketch below.)
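For completeness, here is a minimal sketch of the inline alternative, which trades the external file for a dictionary embedded directly in the configuration (the hash syntax assumes a reasonably recent version of the plugin):

translate {
    field => "service"
    destination => "service_name"
    dictionary => {
        "EZfAH1yD" => "nginx"
        "ddvgZOQL" => "rsyslog"
    }
}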
 
Before testing our new configuration, the last piece of the puzzle is missing: the dictionary file.
 
In this example we will use a YAML file, so create /etc/logstash/translate_dictionary.yml (the path we defined in our translate block) with the following dictionary based on our data:
 
"EZfAH1yD": "nginx"
"ddvgZOQL": "rsyslog"
"InCCvabi": "firewalld"
"9PAMjJBx": "zookeeper"
"wGbeORZN": "kafka"
"SujIotpo": "mysql"
Now let’s run our last test. After restarting Logstash, connect with netcat and send a new event:

$ netcat localhost 9000
EZfAH1yD service started
Now, let’s query Elasticsearch to check the results:

$ curl -H 'Content-Type: application/json' localhost:9200/logstash-services-*/_search?pretty -d '{
    "size": 1,
    "sort": {"@timestamp": "desc"}
}'
The expected result is:
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 26,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JXKEVyTgmQZUBH-V",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:15:46.193Z",
          "port" : 44156,
          "service" : "EZfAH1yD",
          "service_name" : "nginx",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service started"
        },
        "sort" : [
          1489148146193
        ]
      }
    ]
  }
}
Great! As you can see, we now have a new field called service_name, which holds the translation for our service.
Based on this, you can play around and extend the functionality; the plugin supports a considerable number of configuration options, detailed in the plugin’s official documentation. One option that is often handy is fallback, which assigns a default value when the dictionary has no match; see the sketch below.
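A minimal sketch of the fallback option (the "unknown" value here is just an illustration):

translate {
    field => "service"
    destination => "service_name"
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
    fallback => "unknown"
}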
 

Final considerations

As a general recommendation, the translate plugin is best suited to an extensive list of dictionary values. For a small list of replacements, you might instead use the gsub option of the mutate filter, as sketched below.
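A minimal sketch of the gsub alternative, rewriting the service field in place and using two of our service codes as an illustration:

mutate {
    gsub => [
        "service", "EZfAH1yD", "nginx",
        "service", "ddvgZOQL", "rsyslog"
    ]
}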
In this example we used a YAML file; alternatively, the translate filter also supports JSON and CSV file formats.
 
Finally, if your dictionary grows dynamically over time, you can use the refresh_interval option to make Logstash reload the file more often, as sketched below. By default, the interval is 300 seconds.
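A minimal sketch, lowering the refresh interval to 60 seconds:

translate {
    field => "service"
    destination => "service_name"
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
    refresh_interval => 60
}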
 
I hope this article was useful; leave a comment if you have any thoughts on the subject =)