There are many situations where we need to analyze large volumes of logs, often in CSV format. Plenty of tools make this task easy, but the amount of time they take to process the data can be frustrating.

To overcome this, I used the ELK stack in Docker on a Windows machine with WSL.

Requirements

  • Docker on Windows
  • WSL 2 (Ubuntu/Kali)

Installation

ELK Docker installation

sudo docker pull sebp/elk

Starting the container

docker run -p 5601:5601 -p 9200:9200 -p 5044:5044 -it -e LOGSTASH_START=0 -v {CSV file location in host}:{Mounting location} --name Container_Name sebp/elk
  • -e LOGSTASH_START=0: start ELK without Logstash; we will run Logstash manually later
  • -v {CSV file location in host}:{Mounting location}: mount the host drive into the container

Opening a bash terminal in the container

Open a new terminal and enter the command below:

docker exec -it Container_Name /bin/bash

Then ingest the CSV logs using the command below:

/opt/logstash/bin/logstash -f logstash.conf

logstash.conf Example

input {
    file {
        path => "/home/user/log.csv"
        start_position => "beginning"
       # sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        columns => ["Sr","Day","Weekday","District","DistrictNumber","Mem","Abs","AttendancePercent","AbsentPercent","County"]
    }
}

output {
    elasticsearch {
        hosts => "http://localhost:9200"
        index => "elk_test"
    }
    stdout {}
}
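Before running Logstash, it can save time to check that every CSV row has the number of fields the `columns` list expects; rows with a different field count will not map cleanly to the column names. A minimal pre-check sketch in Python (the sample rows below are hypothetical):

```python
import csv

# Column list from logstash.conf above (10 fields expected per row)
EXPECTED_COLUMNS = ["Sr", "Day", "Weekday", "District", "DistrictNumber",
                    "Mem", "Abs", "AttendancePercent", "AbsentPercent", "County"]

def bad_rows(lines):
    """Return 1-based indices of rows whose field count doesn't match."""
    return [i for i, row in enumerate(csv.reader(lines), start=1)
            if len(row) != len(EXPECTED_COLUMNS)]

# Hypothetical sample data: one complete row, one truncated row
sample = [
    "1,2022/07/10,Sunday,North,7,1200,60,95.0,5.0,Kings",
    "2,2022/07/11,Monday,North",
]
print(bad_rows(sample))  # → [2]
```

Running this over the real file before ingestion flags malformed lines up front instead of after a long Logstash run.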

When we add logs this way, the @timestamp field that Logstash creates records the time the log was uploaded to Elasticsearch, not the time of the original event.

To fix this, we remove the @timestamp field created by Logstash and use a date filter to map the time column we actually want, as below:

input {
    file {
        path => "C:/log_files/*.csv"
        sincedb_path => "NULL"
        start_position => "beginning"
    }
}
filter {
    csv {
        separator => ","
        columns => ["Event Time","Receipt Time","Device","startTime"]
        remove_field => ["@timestamp"]
    }
    date {
        match => ["startTime","yyyy/MM/dd HH:mm:ss ZZZ"]
        target => "startTime"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "fortigate_201_07_1022"
        user => "elastic"
        password => "e3yC******"
    }
    stdout {
        codec => rubydebug
    }
}
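The date filter's pattern is Joda-time syntax, which differs from Python's strptime directives, but you can still pre-validate a sample value offline before a long ingestion run. A rough Python approximation of "yyyy/MM/dd HH:mm:ss ZZZ" for numeric UTC offsets (the sample timestamp is hypothetical):

```python
from datetime import datetime

# Approximate strptime equivalent of the Joda pattern "yyyy/MM/dd HH:mm:ss ZZZ"
# (assumes a numeric offset like "+0530"; Joda ZZZ can also be a zone name)
PATTERN = "%Y/%m/%d %H:%M:%S %z"

def parse_start_time(value):
    """Parse a startTime string; raises ValueError if the format doesn't match."""
    return datetime.strptime(value, PATTERN)

# Hypothetical sample value taken from one CSV row
dt = parse_start_time("2022/07/10 13:45:00 +0530")
print(dt.isoformat())  # → 2022-07-10T13:45:00+05:30
```

If this raises ValueError on real data, the match pattern in the date filter likely needs adjusting before Logstash will parse startTime correctly.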