Working with the Logstash translate plugin
When working with Logstash, chances are you will someday need to transform or replace data before sending it to its final destination. This post presents the translate plugin, a general search-and-replace tool that uses a configured hash and/or file to determine replacement values.
Scenario – Understanding the logstash-filter-translate plugin
To understand the translate plugin, consider a scenario where your Logstash pipeline receives logs over TCP in the following format:
{Datestamp} {ServiceCode} {Message}
The {ServiceCode} field contains a unique identifier (UID) for a particular service, such as "nginx" or "kafka". A log sample is shown next:

03/07/2017 17:11:40 EZfAH1yD service stopped
03/08/2017 13:00:02 EZfAH1yD service started
03/08/2017 13:08:20 ddvgZOQL service restarted
Suppose we also have a mapping between each {ServiceCode} and its {ServiceName} representation, and we would like to perform some sort of real-time processing to add or replace the {ServiceName} field in the stream:

"EZfAH1yD" => "nginx"
"ddvgZOQL" => "rsyslog"
"InCCvabi" => "firewalld"
"9PAMjJBx" => "zookeeper"
"wGbeORZN" => "kafka"
"SujIotpo" => "MySQL"
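Conceptually, the translation we are after is nothing more than a hash lookup. A minimal sketch in Python (the codes and names mirror the sample data above; this is an illustration, not the plugin itself):

```python
# The mapping from the sample data, as a plain hash: service code -> name.
SERVICE_NAMES = {
    "EZfAH1yD": "nginx",
    "ddvgZOQL": "rsyslog",
    "InCCvabi": "firewalld",
    "9PAMjJBx": "zookeeper",
    "wGbeORZN": "kafka",
    "SujIotpo": "MySQL",
}

print(SERVICE_NAMES["EZfAH1yD"])  # nginx
# For an unknown code, fall back to the raw value instead of failing:
print(SERVICE_NAMES.get("xxxxxxxx", "xxxxxxxx"))  # xxxxxxxx
```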
Solution – Using the logstash-filter-translate plugin

First, consider a baseline pipeline that receives events over TCP, extracts the service code with grok, and indexes the result into Elasticsearch:
input {
  tcp {
    port => 9000
  }
}

filter {
  grok {
    match => {
      "message" => "%{WORD:service} %{GREEDYDATA}"
    }
  }
}

output {
  elasticsearch {
    index => "logstash-services-%{+YYYY-MM-dd}"
  }
}
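The `%{WORD:service} %{GREEDYDATA}` grok pattern is essentially a named regular expression that captures the first word of the message into a `service` field. A rough Python equivalent (illustrative only; real grok patterns compile to more elaborate regexes):

```python
import re

# Approximation of grok's "%{WORD:service} %{GREEDYDATA}":
# capture the first word into "service", ignore the rest of the line.
PATTERN = re.compile(r"^(?P<service>\w+) .*$")

match = PATTERN.match("EZfAH1yD service stopped")
if match:
    print(match.group("service"))  # EZfAH1yD
```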
1) Start Logstash:

$ service logstash start

2) In another terminal, open a connection to Logstash with netcat and send an event:
$ netcat localhost 9000
EZfAH1yD service stopped
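netcat is simply opening a TCP connection and writing a line. If you prefer to script the same step, a minimal sketch (the host and port match the tcp input above; the helper name is ours):

```python
import socket

def send_event(line: str, host: str = "localhost", port: int = 9000) -> None:
    """Send one newline-terminated log event to Logstash's tcp input."""
    with socket.create_connection((host, port)) as sock:
        sock.sendall((line + "\n").encode("utf-8"))

# Example (requires Logstash listening on port 9000):
# send_event("EZfAH1yD service stopped")
```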
3) Query Elasticsearch for the most recent event:

$ curl 'localhost:9200/logstash-services-*/_search?pretty' -d '{
"size": 1,
"sort": {"@timestamp": "desc"}
}
'
{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 25,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JEuEVyTgmQZUBH-T",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:14:30.685Z",
          "port" : 44150,
          "service" : "EZfAH1yD",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service stopped"
        },
        "sort" : [
          1489148070685
        ]
      }
    ]
  }
}
Notice that the event was indexed, but the service field still carries only the raw {ServiceCode}. To translate {ServiceCode} to {ServiceName} as soon as the event reaches our Logstash input, let's install the logstash-filter-translate plugin:

$ bin/logstash-plugin install logstash-filter-translate
$ bin/logstash-plugin list | grep translate
Next, add the translate filter to the pipeline configuration:

input {
  tcp {
    port => 9000
  }
}

filter {
  grok {
    match => {
      "message" => "%{WORD:service} %{GREEDYDATA}"
    }
  }
  translate {
    field => "service"
    destination => "service_name"
    override => false
    dictionary_path => "/etc/logstash/translate_dictionary.yml"
  }
}

output {
  elasticsearch {
    index => "logstash-services-%{+YYYY-MM-dd}"
  }
}
The new part is the translate filter, which looks up the value of the service field in the dictionary and, because override is false, writes the translation to service_name only when that field does not already exist:

translate {
  field => "service"
  destination => "service_name"
  override => false
  dictionary_path => "/etc/logstash/translate_dictionary.yml"
}
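As a sketch of the field/destination/override semantics (our illustration, not the plugin's code): with override => false, an existing destination field is left untouched.

```python
def translate_field(event: dict, mapping: dict,
                    field: str = "service",
                    destination: str = "service_name",
                    override: bool = False) -> dict:
    """Mimic the translate filter's field/destination/override options."""
    value = event.get(field)
    if value in mapping:
        # Only write the destination if it is absent, unless override is set.
        if override or destination not in event:
            event[destination] = mapping[value]
    return event

mapping = {"EZfAH1yD": "nginx"}
print(translate_field({"service": "EZfAH1yD"}, mapping))
# {'service': 'EZfAH1yD', 'service_name': 'nginx'}
print(translate_field({"service": "EZfAH1yD", "service_name": "custom"}, mapping))
# {'service': 'EZfAH1yD', 'service_name': 'custom'}
```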
Since dictionary_path points to /etc/logstash/translate_dictionary.yml, let's create the following dictionary file based on our data:

"EZfAH1yD": "nginx"
"ddvgZOQL": "rsyslog"
"InCCvabi": "firewalld"
"9PAMjJBx": "zookeeper"
"wGbeORZN": "kafka"
"SujIotpo": "mysql"
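The dictionary file is flat YAML: one quoted "key": "value" pair per line. As a quick sanity check without extra dependencies, here is a sketch that parses this flat format (an illustration only; Logstash uses a full YAML parser):

```python
def load_flat_dictionary(text: str) -> dict:
    """Parse flat '"key": "value"' YAML lines into a dict."""
    mapping = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first colon; strip surrounding quotes and whitespace.
        key, _, value = line.partition(":")
        mapping[key.strip().strip('"')] = value.strip().strip('"')
    return mapping

sample = '"EZfAH1yD": "nginx"\n"ddvgZOQL": "rsyslog"\n'
print(load_flat_dictionary(sample))
# {'EZfAH1yD': 'nginx', 'ddvgZOQL': 'rsyslog'}
```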
Finally, restart Logstash, open a new connection with netcat, and send a new event:

$ netcat localhost 9000
EZfAH1yD service started
$ curl 'localhost:9200/logstash-services-*/_search?pretty' -d '{
"size": 1,
"sort": {"@timestamp": "desc"}
}
'
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 20,
    "successful" : 20,
    "failed" : 0
  },
  "hits" : {
    "total" : 26,
    "max_score" : null,
    "hits" : [
      {
        "_index" : "logstash-services-2017-03-10",
        "_type" : "logs",
        "_id" : "AVq4JXKEVyTgmQZUBH-V",
        "_score" : null,
        "_source" : {
          "@timestamp" : "2017-03-10T12:15:46.193Z",
          "port" : 44156,
          "service" : "EZfAH1yD",
          "service_name" : "nginx",
          "@version" : "1",
          "host" : "127.0.0.1",
          "message" : "EZfAH1yD service started"
        },
        "sort" : [
          1489148146193
        ]
      }
    ]
  }
}
Notice the new field service_name, which contains the translation for our service code.

Final considerations
For simple in-place substitutions, a similar result can be achieved with the gsub function of the mutate filter. Also, when using a dictionary file, you can tune the refresh_interval attribute to make Logstash check for an updated file more often; by default, the interval is 300 seconds.

Kelson
iamkel.dev
Software engineer. Geek. Traveller. Wannabe athlete. Lifelong student. Works at IBM and hosts the @HardcodeCast.