In the previous part, we set up the first component in the process: Filebeat. This component sends all the log lines to Logstash. We will now configure Logstash to interpret the log lines and map them to useful, searchable, filterable and groupable data.
Log lines are just... strings. Strings of concatenated information.
Let’s say, we have these log lines:
2019-02-21T10:00:15.565 (STIHO) Action X took: 2403 ms
2019-02-21T10:00:17.105 (STIHO) Action Y took: 1209 ms
2019-02-21T10:00:19.643 (GERGE) Action X took: 2221 ms
We can deduce a lot of information here. The first part is when an action happened. Then, between the parentheses, we see which user has triggered this action. We also see the name of the action and how long it took.
What if we could split each log line into the columns of a database table?
Log date | User | Action name | Duration |
---|---|---|---|
2019-02-21T10:00:15.565 | STIHO | X | 2403 |
2019-02-21T10:00:17.105 | STIHO | Y | 1209 |
2019-02-21T10:00:19.643 | GERGE | X | 2221 |
We could then execute queries on this database, like: give me all actions triggered by user STIHO, or: what is the average duration of action X?
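To make the analogy concrete, queries on such a hypothetical table could look like this (table and column names are illustrative):

```sql
-- All actions triggered by user STIHO:
SELECT * FROM log WHERE user = 'STIHO';

-- Average duration (in ms) of action X:
SELECT AVG(duration) FROM log WHERE action_name = 'X';
```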
With Logstash, we map incoming log lines into pieces of useful information (log date, user, action name, duration, …) and persist this information in an Elasticsearch database.
Such mapping can be done with a Grok pattern.
In order to configure Logstash, at least two types of config are needed:
logstash/config/logstash.yml
logstash/pipeline/logstash.conf
In this general config file, you can configure things like the node name, pipeline settings, paths and logging.
In our example (which can be found in my example Git repo as logstash/config/logstash.yml), we configure the bare minimum to make Logstash work in Docker.
For more information on what can be configured, check out the documentation.
http.host: "0.0.0.0"
path.config: /usr/share/logstash/pipeline
In a pipeline, you define an input (where the data comes from), filters (how to interpret the data) and an output (where to send the result):
input {
beats {
port => 5044 # Open up a port (5044) which Filebeat can talk to.
}
}
filter {
grok {
match => {
"message" => [
# my grok patterns (as an array of strings)
]
}
}
date {
match => ["tstamp", "yyyy-MM-dd HH:mm:ss"]
timezone => "UTC"
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200" # Since I'm running the Elastic Stack in a Docker environment, the host name is elasticsearch. If you don't use Docker and run the stack on your local computer, you'll have to replace this with localhost.
}
}
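For reference, the Filebeat side of this connection (set up in the previous part) has to point at the same host and port. A minimal sketch, assuming the Logstash container is reachable under the host name logstash:

```yaml
output.logstash:
  hosts: ["logstash:5044"]
```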
It's possible to have multiple pipelines, if you have multiple inputs. For more information, check out the documentation.
Let’s take the following log line as an example:
2019-02-21T10:00:15.565 (STIHO) Something happens in my application
Which useful parts exist in this log line?
In vague words: when something happened, who triggered it, and what happened.
To convert these vague words into a grok pattern, you need to define a data type and a name for each part.
In more concrete words: a timestamp we'll call logDate, a word without spaces we'll call username, and the rest of the line we'll call action.
A Grok tag looks like this: %{data-type:name-of-your-tag}.
On GitHub, you’ll find a list of the existing standard types in Grok.
This leads us to our Grok pattern:
^%{TIMESTAMP_ISO8601:logDate}%{SPACE}\(%{NOTSPACE:username}\)%{SPACE}%{GREEDYDATA:action}
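Applied to a log line such as 2019-02-21T10:00:15.565 (STIHO) Action X took: 2403 ms, this pattern would roughly yield the following fields:

```
logDate  => "2019-02-21T10:00:15.565"
username => "STIHO"
action   => "Action X took: 2403 ms"
```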
To test and tweak your grok pattern, you can use https://grokdebug.herokuapp.com/.
Aaah... localization. Everywhere in the world, people write dates and numbers differently.
What is a duration of 2,342 seconds? Does this action take 2 seconds or forever?
And is 01/02 in February or in January?
The artistic freedom, what a gift!
Now, on a serious note: look out for errors caused by data being interpreted differently by your US-based Elasticsearch software.
I have a log with dates formatted like this:
2019/01/30 09:02:37.482
To a human eye, this resembles the standard date-time pattern. However, it isn’t. The ISO 8601 standard looks like this: 2019-01-30T09:02:37.482Z
When I have a look at the date patterns that are available by default, I don't find this specific format.
Time to get creative!
filter {
grok {
match => {
"message" => [
"%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day}[T ]%{HOUR:hour}:?%{MINUTE:minute}(?::?%{SECOND:second})............"
]
}
overwrite => [ "message" ]
}
mutate {
# create a date field that can be used by Elasticsearch
add_field => { "logDate" => "%{year}-%{month}-%{day}T%{hour}:%{minute}:%{second}Z" }
remove_field => [ "year", "month", "day", "hour", "minute", "second" ]
}
date {
match => [ "logDate", "ISO8601" ]
timezone => "UTC"
}
}
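The net effect of this filter: the source timestamp is captured piece by piece and reassembled into an ISO 8601 string that the date filter understands. Roughly:

```
# incoming message:   2019/01/30 09:02:37.482
# logDate (rebuilt):  2019-01-30T09:02:37.482Z
```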
In Belgium, we write "2 and a half seconds" as 2,5. Elasticsearch expects a dot instead of a comma: 2.5.
Luckily, it isn't too hard to do conversions in Logstash.
mutate {
# Duration has a comma as decimal separator. Elasticsearch needs a period.
gsub => [
"duration", ",", "."
]
}
mutate {
# Now that the comma has been replaced by a period, duration can be converted to a float
convert => [
"duration", "float"
]
}
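In event terms, the two mutate filters transform the duration field like this:

```
# before gsub:    "duration" => "2,5"   (string, comma separator)
# after gsub:     "duration" => "2.5"   (string)
# after convert:  "duration" => 2.5     (float)
```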
When I first unleashed my Grok patterns on a big set of logs, I quickly encountered the following error in Logstash:
Timeout executing grok
Message too large to output
What's the cause? It turns out that my Grok patterns were too expensive to execute.
If you want the Logstash status to stay green, keep your Grok patterns lean and mean!
(That was lame. Sorry.)
We have configured Filebeat and Logstash, so if we boot up our Elastic Stack, logs should be pumped into the Elasticsearch database. Let's see if everything works correctly!
In our example, we can do that by executing docker-compose up -d in a terminal.
In our example, we can inspect the Filebeat logs with docker logs -f elasticsearchforunionvms_filebeat_1.
You should see non-zero metrics being logged.
Non-zero metrics in the last 30s {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":2928560,"time":{"ms":44}},"total":{"ticks":6224980,"time":{"ms":116},"value":6224980},"user":{"ticks":3296420,"time":{"ms":72}}},"handles":{"limit":{"hard":1048576,"soft":1048576},"open":6},"info":{"ephemeral_id":"956eb026-a4f1-41ea-81e8-44f29297b836","uptime":{"ms":1660200077}},"memstats":{"gc_next":10799648,"memory_alloc":5627064,"memory_total":261400300056}},"filebeat":{"harvester":{"open_files":0,"running":0}},"libbeat":{"config":{"module":{"running":0}},"pipeline":{"clients":4,"events":{"active":0}}},"registrar":{"states":{"current":797}},"system":{"load":{"1":0,"15":0,"5":0,"norm":{"1":0,"15":0,"5":0}}}}}}
It also doesn't hurt to look out for:
Configured paths: [/your-log-path/*.log]
Loading and starting Inputs completed. Enabled inputs: 4
Probably, you'll see something like this in your logs:
2019-03-20T18:14:21.348Z ERROR pipeline/output.go:100 Failed to connect to backoff(async(tcp://logstash:5044)): dial tcp 172.19.0.5:5044: connect: connection refused
2019-03-20T18:14:21.350Z INFO pipeline/output.go:93 Attempting to reconnect to backoff(async(tcp://logstash:5044)) with 1 reconnect attempt(s)
Logstash needs more time to boot than Filebeat does. It's possible that Filebeat tries to push information to a Logstash that's still booting. No worries, Filebeat will retry later.
If the problem still occurs, even after Logstash has fully started up, something's up. Then it's time to do a quick search on the Internet with the error message.
When using our example, you can fetch the logs with docker logs -f elasticsearchforunionvms_logstash_1.
When everything boots up correctly, you should see
Successfully started Logstash API endpoint {:port=>9600}
and
Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xe378ce2 run>"}
Here, the same remark applies as in "Filebeat cannot connect to Logstash". Make sure that Elasticsearch has fully booted up.
If all logs look OK, let's see if the Elasticsearch database contains our mapped logs.
You can manage the database with HTTP requests (with a tool like Postman).
Elasticsearch keeps data in indexes. When Logstash wants to save data into Elasticsearch, it will need to create indexes. These indexes are created with the name log-[date].
Knowing this, we can get all indexes made by Logstash by doing the following HTTP GET request:
GET http://localhost:9200/log*/_search?pretty
Applying the same logic, we can delete all indexes, made by Logstash, by shooting a DELETE request:
DELETE http://localhost:9200/log*
Second, do you remember that Filebeat remembers which log lines it has sent to Logstash, via its registry file? You'll need to remove that registry file too, if you want to start over again.
Using the REST API works, but isn't a great user experience. Let's set up Kibana in the final part of this series.