Generating Alerts from Logstash

Note

If you’d like to run through this tutorial more quickly, be sure you’ve loaded Cyphon’s starter settings before you begin. These settings provide many of the configurations used in this tutorial.

Introduction

Suppose we want to surface a Snort alert in Cyphon:

May 16 13:00:15 whimsy snort[3115]: [1:469:1] ICMP PING NMAP
[Classification: Attempted Information Leak] [Priority: 2]:
{ICMP} 10.120.66.1:21 -> 10.22.33.106:21

We want the parsed message to look like this:

{
    "sys_timestamp": "2017-05-16T13:00:15.000Z",
    "host": "whimsy",
    "generator_id": 1,
    "signature_id": 469,
    "signature": "ICMP PING NMAP",
    "classification": "Attempted Information Leak",
    "priority": "2",
    "protocol": "ICMP",
    "source_ip": 10.120.66.1,
    "source_port": "21",
    "destination_ip": "10.22.33.106",
    "destination_port": "21"
}

We could either use Cyphon to parse and save the message through a LogChute, or we could use Logstash to parse the message and send it directly to Cyphon’s Watchdogs. In this tutorial, we’ll use the Logstash shortcut. Although we’ll only cover the main aspects of the Logstash configuration here, you can see a full example on Cyphondock.

Step 1: Filebeat

We’ll use Filebeat to send our Snort logs to Logstash. Consult Filebeat’s official documentation for full details.

Prospector

Within the filebeat.yml configuration file, set up a Filebeat prospector to label the Snort log messages as “snort,” so we can easily identify them:

filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/snort/*.log
  document_type: snort

Be sure to change the path to the location where you’re actually sending Snort alerts.

Output

Next, specify an output to send the logs to Logstash, for example:

output.logstash:
  hosts: ["localhost:5044"]

If Filebeat and Logstash are on different machines, be sure to change the hosts setting to reflect the address of your Logstash server.

Note

If you’re using a Logstash Docker container for local testing of your Filebeat configuration, the hosts might instead be ["logstash:5044"].
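
If you’d like to check your Filebeat setup before moving on, recent Filebeat releases (6.x and later) can test both the configuration file and the connection to Logstash. This is a quick sanity check, assuming your configuration lives at /etc/filebeat/filebeat.yml:

filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml

Older 5.x releases use the -configtest flag instead.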

Step 2: Logstash

Once Filebeat is set up, we can configure Logstash to receive the logs. Consult Logstash’s official documentation for full details. You can also find an example of this Logstash pipeline on Cyphondock.

Input

First, we’ll create a Logstash input for Filebeat:

input {
    beats {
        port => 5044
    }
}

This port matches the port we specified in the Filebeat configuration for Logstash output.

Filters

Next, we’ll construct a couple of Logstash filters to process the message.

The first filter will:

  1. identify messages from Snort
  2. parse the messages into individual fields
  3. translate the timestamp field into a datetime that can be indexed

filter {

    if [type] == "snort" {

        # parse the message into individual fields
        grok {
            match => { "message" => "(?<ts>.*\d{2}:\d{2}:\d{2})\s(?<host>.*?)\s.*?\s\[(?<generator_id>.*?):(?<signature_id>.*?):.*?\]\s(?<signature>.*?)\s\[Classification:\s(?<classification>.*?)\]\s\[Priority:\s(?<priority>.*?)\].*?\{(?<protocol>.*?)\}\s(?<source_ip>.*?):(?<source_port>.*?)\s-\>\s(?<destination_ip>.*?):(?<destination_port>.*)" }
        }

        # remove the original message if parsing was successful
        if !("_grokparsefailure" in [tags]) {
            mutate {
                remove_field => [ "message" ]
            }
        }

        # parse the timestamp and save in a new datetime field
        if [ts] {
            date {
                match => [ "ts", "MMM dd HH:mm:ss", "MMM  d HH:mm:ss" ]
                target => "sys_timestamp"
            }

            # remove the original timestamp if date parsing was successful
            if !("_dateparsefailure" in [tags]) {
                mutate {
                    remove_field => [ "ts" ]
                }
            }
        }
    }
}
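
If you want to sanity-check the grok pattern before wiring up the full pipeline, you can run it in a small throwaway config that reads from stdin and prints the parsed event to the console. This is just a sketch (the file name test-snort.conf is arbitrary); paste the “snort” filter shown above in place of the comment:

# test-snort.conf -- throwaway pipeline for testing the grok pattern
input {
    # tag pasted lines as "snort" so the filter conditional matches
    stdin { type => "snort" }
}

# ... the "snort" filter block from above goes here ...

output {
    # print every parsed event to the console
    stdout { codec => rubydebug }
}

Run it with bin/logstash -f test-snort.conf and paste the sample Snort message at the prompt; you should see the individual fields in the output.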

The second filter will add the following fields to help Cyphon locate the data:

  1. a universally unique identifier (uuid) for the document
  2. a reference to the index where the document will be saved

filter {

    uuid {
        target => "@uuid"
    }

    mutate {
        add_field => {
            "collection" => "elasticsearch.cyphon.%{type}"
        }
    }
}
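
Note that the uuid filter is distributed as a separate plugin and isn’t bundled with every Logstash release. If Logstash reports that it can’t find the filter, you can install it from the Logstash home directory:

bin/logstash-plugin install logstash-filter-uuid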

When the document is saved, %{type} will be replaced by the document_type we specified in the Filebeat prospector, so the collection will be “elasticsearch.cyphon.snort”.
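
For example, after both filters have run, our sample Snort event will carry these routing fields alongside the parsed ones (the uuid value below is illustrative):

{
    "@uuid": "2e5f3a9c-1a7b-4f0e-9c3d-8b1d2f4a6e70",
    "collection": "elasticsearch.cyphon.snort",
    "type": "snort",
    "signature": "ICMP PING NMAP",
    ...
}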

Output

Finally, we need to tell Logstash where to send the parsed messages. Our Logstash output configuration will:

  1. save the message in a time-stamped Elasticsearch index
  2. use the @uuid field as the Elasticsearch document id
  3. send a copy of the message to Cyphon for inspection by Watchdogs

output {

    # save events to Elasticsearch with the uuid as the document id
    elasticsearch {
        hosts => ["elasticsearch"]
        index => "cyphon-%{+YYYY-MM-dd}"
        document_id => "%{@uuid}"
    }

    # send Snort messages to Cyphon Watchdogs via RabbitMQ
    if [type] == "snort" {
        rabbitmq {
            host => "rabbit"
            port => 5672
            exchange => "cyphon"
            exchange_type => "direct"
            key => "watchdogs"
            user => "guest"
            password => "guest"
            vhost => "cyphon"
            durable => true
            persistent => true
        }
    }
}

Step 3: Cyphon

Although we’re using Logstash to parse and store our logs, we need to let Cyphon know a bit about what’s happening.

Container

[Figure: Container]

We can tell Cyphon what fields of the document are important to us by creating a Container for those fields. We’re not performing any extra analyses on the document, so we don’t need a Label for the Container, just a Bottle.

Let’s revisit our parsed document:

{
    "sys_timestamp": "2017-05-16T13:00:15.000Z",
    "host": "whimsy",
    "generator_id": 1,
    "signature_id": 469,
    "signature": "ICMP PING NMAP",
    "classification": "Attempted Information Leak",
    "priority": "2",
    "protocol": "ICMP",
    "source_ip": 10.120.66.1,
    "source_port": "0",
    "destination_ip": "10.22.33.106",
    "destination_port": "0"
}

To make a Container for this document, open the “Shaping Data” panel on Cyphon’s main admin page. Create the following BottleFields:

Field Name         Field Type              Target Type
sys_timestamp      DateTimeField           DateTime
host               CharField
generator_id       CharField
signature_id       CharField
signature          CharField
classification     CharField               Keyword
priority           CharField
protocol           CharField
source_ip          GenericIPAddressField   IPAddress
source_port        CharField
destination_ip     GenericIPAddressField   IPAddress
destination_port   CharField

Add these BottleFields to a Bottle called “snort”.

Next, create a Container using the “snort” Bottle. Define a Taste for the Container with the following properties:

Taste.author     host
Taste.title      classification
Taste.content    signature
Taste.datetime   sys_timestamp

Note

You could also add a “location” to this Taste by adding a geoip filter to your Logstash configuration and then including the geoip.location field in the “snort” Container. Refer to Cyphondock’s Snort filter and starter fixtures for an example.
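
A minimal version of that geoip filter might look like the following; it assumes the source_ip field extracted by our grok pattern holds a public, routable address:

filter {
    if [type] == "snort" {
        geoip {
            source => "source_ip"  # field holding the IP to look up
            target => "geoip"      # results include geoip.location
        }
    }
}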

Distillery

[Figure: Distillery]

Next, we need to specify where the data is located. To do this, we’ll create a Distillery, which defines the data store and the data schema for our document. The data schema is represented by the Distillery’s Container (which we just created). The data store is represented by the Distillery’s Collection.

Before we create the Collection, we first need to establish a Warehouse for it. You can create the Warehouse by opening the “Storing Data” panel on Cyphon’s main admin page.

The name of the Warehouse will correspond to the name of the index we specified in our Logstash output configuration, which was “cyphon-%{+YYYY-MM-dd}”. However, we won’t include the date in the Warehouse name; instead, we’ll simply name it “cyphon” and set it as a “time series” index. This setting will create a new Elasticsearch index every day, with the current date appended to the Warehouse name.

Note

Time-series indexes make it easier to archive or delete old data. See Elasticsearch’s documentation on retiring data for more info.

The Warehouse will thus have the following properties:

Warehouse.backend       elasticsearch
Warehouse.name          cyphon
Warehouse.time_series   True
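
Once events start flowing, you can confirm that a new dated index is created each day. Assuming Elasticsearch is reachable at localhost:9200:

curl -s 'http://localhost:9200/_cat/indices/cyphon-*?v'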

The name of the Collection in this Warehouse will correspond to the document_type we specified in our Filebeat prospector:

Collection.warehouse   cyphon
Collection.name        snort

Under the “Distilling Data” panel on the main admin page, create a Distillery that uses this Collection and the Container we previously created:

Distillery.collection   elasticsearch.cyphon.snort
Distillery.container    snort
Distillery.is_shell     True

Watchdog

[Figure: Watchdog]

Now that Cyphon knows where the document is stored and what it looks like, we can create a Watchdog to inspect it. We’ll create some simple rulesets that will generate Alerts according to the priority of the Snort alert.

Note

Watchdogs are able to inspect fields in a document even if we didn’t include them in our Container. In this case, we’ll make use of the type field, which is the field Elasticsearch uses to store the document_type (which we originally defined in our Filebeat prospector).

Let’s start by making some DataRules to identify Snort alerts of different priorities. Under the “Sifting Data” panel on the main admin page, click on “JSON Data” and create the following DataRules:

DataRule Name       Field Name   Operator   Value
type_equals_snort   type         equals     snort
priority_equals_1   priority     equals     1
priority_equals_2   priority     equals     2
priority_equals_3   priority     equals     3

We can then use these DataRules to create DataSieves that a Watchdog will use to inspect the document:

DataSieve Name                   Logic   Node 1 (data rule)   Node 2 (data rule)
Snort alert - high priority      AND     type_equals_snort    priority_equals_1
Snort alert - medium priority    AND     type_equals_snort    priority_equals_2
Snort alert - low priority       AND     type_equals_snort    priority_equals_3
Snort alert - unknown priority   AND     type_equals_snort

Now go to the “Configuring Alerts” panel and create the Watchdog:

Watchdog.name      Snort
Watchdog.enabled   True

Add the following Triggers to the Watchdog:

DataSieve                        Alert Level   Rank
Snort alert - high priority      High          0
Snort alert - medium priority    Medium        10
Snort alert - low priority       Low           20
Snort alert - unknown priority   Medium        30

It’s important that the “Snort alert - unknown priority” Trigger is ranked last (i.e., given the highest rank number); we only want it to come into play if none of the other Triggers has matched the document.

Note

Although we could assign ranks of 0, 1, 2, and 3 to the Triggers, we’re using 0, 10, 20, and 30 to make it easier to insert additional Triggers at a later date.

Step 4: Receiver

Now all we have to do is start some queue consumers for Logstash’s RabbitMQ output.

If you’re using Cyphondock or installed Cyphon from our ISO (which uses Cyphondock), you don’t need to do anything more; you should already have watchdog containers running, which will act as queue consumers.

If you’re running a manual install, though, you’ll need to create some watchdog queue consumers using the command line. From the Cyphon project directory, you can enter this command:

python receiver/receiver.py watchdogs 4

This will create four queue consumers for a RabbitMQ queue called “watchdogs” that’s associated with the “watchdogs” routing key. This is the same routing key we specified in our Logstash output configuration for RabbitMQ.
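
To verify that the consumers are attached to the queue, you can inspect the “cyphon” vhost on the RabbitMQ host (assuming you have access to rabbitmqctl there):

rabbitmqctl list_queues -p cyphon name consumers messages

A non-zero consumer count and a draining message count indicate the receivers are doing their job.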

You should now start to see Cyphon Alerts from Snort!

Conclusion

In this tutorial, we’ve shown how you can use Logstash to quickly deliver data to Cyphon’s Watchdogs. Although we only used a log message in our example, Logstash has a number of input plugins that handle a variety of other forms of data, which you can send to Cyphon as well!