您好,欢迎来到暴趣科技网。
搜索
您的当前位置:首页ElasticSearch | Logstash | 入门 | 架构介绍

ElasticSearch | Logstash | 入门 | 架构介绍

来源:暴趣科技网

Logstash

数据搜集处理引擎,支持 200 多个插件。


Logstash.png

Logstash | 架构简介

Codec(Code / Decode):将原始数据 decode 成 Event,再将 Event encode 成目标数据。


Logstash 架构.png

Logstash | 相关概念

Pipeline
  • 包含了 input-filter-output 三个阶段的处理流程;
  • Logstash 支持 200 多个插件,并支持插件的生命周期管理;
  • Logstash 有自己的队列;
Event
  • Event 是数据在内部流转时的具体表现形式,数据在 input 阶段被转换为 Event,在 output 被转换成目标格式数据;
  • Event 其实是一个 Java Object,在 filter 阶段可以对 Event 的属性进行增删改查;

Input Plugins

一个 Pipeline 可以有多个 input 插件:

  • Stdin / File
  • Beats / Log4J / Elasticsearch / JDBC / Kafka / Rabbitmq / Redis
  • JMX / HTTP / Websocket / UDP / TCP
  • Google Cloud Storage / S3
  • Github / Twitter
Input Plugin | File
  • 支持从文件中读取数据;
  • 文件读取需要解决的问题:文件只能被读取一次,重启后,需要从上次读取的位置继续,通过 sincedb 实现;
  • 读取到文件新内容,发现新文件;
  • 文件发生归档操作(文档位置发生变化,日志 Rotation),不能影响当前的内容读取;

Output Plugin

将 Event 发送到特定的目的地,是 Pipeline 的最后一个阶段:

  • ElasticSearch
  • Email / Pageduty
  • Influxdb / Kafka / MongoDB / Opensdb / Zabbix
  • HTTP / TCP / Websocket

Codec Plugin

将原始数据 decode 成 Event;将 Event encode 成目标数据:

  • Line / Multiline
  • JSON / Avro / Cef(ArcSight Common Event Format)
  • Dots / Rubydebug

Filter Plugin

Filter Plugin 可以对 Logstash Event 进行各种处理,例如解析字段、删除字段、类型转换:

  • Date - 日期解析;
  • Dissect - 分隔符解析;
  • Grok - 正则匹配解析;
  • Mutate - 处理字段、重命名、删除、替换;
  • Metircs - 聚合 Metrics;
  • Ruby - 利用 Ruby 代码来动态修改 Event;
Filter Plugin | Mutate

对字段做各种操作:

  • Convert - 类型转换;
  • Gsub - 字符串替换;
  • Split / Join / Merge - 切割 / 数组合并成字符串 / 数组合并;
  • Rename - 字段重命名;
  • Update / Replace - 字段内容更新替换;
  • Remove_field - 字段删除;

Queue

Queue.png
In Memory Queue

进程 Crash,机器宕机都会引起数据的丢失。

Persistent Queue

进程 Crash,机器宕机也不会丢失数据;数据保证会被消费,可以替代 Kafka 等消息队列缓冲区的作用。


Codec Plugins - Single Line | 举几个栗子

  • sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> rubydebug}}"
  • sudo bin/logstash -e "input{stdin{codec=>json}}output{stdout{codec=> rubydebug}}"
  • sudo bin/logstash -e "input{stdin{codec=>line}}output{stdout{codec=> dots}}"

Codec Plugin - Multiline | 举个栗子

运行 multiline-exception.conf

sudo bin/logstash -f multiline-exception.conf

multiline-exception.conf 中的内容
  • 以字母开头的输入会被匹配到;
  • 匹配到的内容属于上一个 Event,需要再输入一个以字母开头的文本,Logstash 才会有输出;
input {
  stdin {
    codec => multiline {
      pattern => "^\s"
      what => "previous"
    }
  }
}


filter {}

output {
  stdout { codec => rubydebug }
}
输入一段 Java 的堆栈异常作为多行数据
Exception in thread "main" java.lang.NullPointerException
        at com.example.myproject.Book.getTitle(Book.java:16)
        at com.example.myproject.Author.getBookTitles(Author.java:25)
        at com.example.myproject.Bootstrap.main(Bootstrap.java:14)
再输入以字母开头的文本

hello word

输出上一段匹配到的 Java 的堆栈信息;


拿个 logstash.conf 出来分析一下

  • 没有用到 sincedb,所以 sincedb_path => "/dev/null"
input {
  file {
    path => "/home/lixinlei/data/movielens/ml-latest-small/movies.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
  • movies.csv 大概长这样:
movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
...
  • 每行的数据用 , 分割,分割出来的内容,分别归属到列 id,content,genre 下;
  • 列 genre 下的数据用 | 分割;
  • 列 content 下的数据用 ( 分割,分割出来的第 1 个字符串存到新的列下,新的列叫 title;分割出来的第 2 个字符串存到新的列下,新的列叫 year;
  • 把列 year 的数据类型转为 integer;
  • 去掉列 title 中的空格;
filter {
  csv {
    separator => ","
    columns => ["id","content","genre"]
  }

  mutate {
    split => { "genre" => "|" }
    remove_field => ["path", "host","@timestamp","message"]
  }

  mutate {
    split => ["content", "("]
    add_field => { "title" => "%{[content][0]}"}
    add_field => { "year" => "%{[content][1]}"}
  }

  mutate {
    convert => {
      "year" => "integer"
    }
    strip => ["title"]
    remove_field => ["path", "host","@timestamp","message","content"]
  }

}
  • 把读取到的,转换过的信息写入 ElasticSearch 中;
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "movies"
     document_id => "%{id}"
   }
  stdout {}
}

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- baoquwan.com 版权所有 湘ICP备2024080961号-7

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务