Logstash - 快速指南


Logstash - 简介

Logstash 是一个基于过滤器/管道模式的工具,用于收集、处理和生成日志或事件。它有助于集中和实时分析来自不同来源的日志和事件。

Logstash是用运行在JVM上的JRuby编程语言编写的,因此您可以在不同的平台上运行Logstash。它从几乎所有类型的源收集不同类型的数据,如日志、数据包、事件、事务、时间戳数据等。数据源可以是社交数据、电子商务、新闻文章、CRM、游戏数据、网络趋势、金融数据、物联网、移动设备等。

Logstash 一般功能

Logstash 的一般特征如下:

  • Logstash 可以从不同来源收集数据并将其发送到多个目的地。

  • Logstash 可以处理所有类型的日志数据,例如 Apache 日志、Windows 事件日志、网络协议数据、标准输入数据等等。

  • Logstash 还可以处理 http 请求和响应数据。

  • Logstash提供了多种过滤器,可以帮助用户通过解析和转换数据来找到数据的更多含义。

  • Logstash 还可用于处理物联网中的传感器数据。

  • Logstash 是开源的,可在 Apache 许可证版本 2.0 下使用。

Logstash 关键概念

Logstash 的关键概念如下:

事件对象

它是Logstash中的主要对象,封装了Logstash管道中的数据流。Logstash 使用此对象来存储输入数据并添加在过滤阶段创建的额外字段。

Logstash 为开发人员提供事件 API 来操作事件。在本教程中,此事件有各种名称,例如记录数据事件、日志事件、日志数据、输入日志数据、输出日志数据等。

管道

它由 Logstash 中从输入到输出的数据流阶段组成。输入数据进入管道并以事件的形式进行处理。然后以用户或终端系统所需的格式发送到输出目的地。

输入

这是Logstash管道的第一阶段,用于获取Logstash中的数据以进行进一步处理。Logstash 提供各种插件来从不同平台获取数据。一些最常用的插件是 – File、Syslog、Redis 和 Beats。

筛选

这是 Logstash 的中间阶段,实际处理事件发生在此处。开发人员可以使用 Logstash 预定义的正则表达式模式来创建序列,以区分事件中的字段和接受输入事件的标准。

Logstash 提供各种插件来帮助开发人员解析事件并将其转换为所需的结构。一些最常用的过滤器插件是 – Grok、Mutate、Drop、Clone 和 Geoip。

输出

这是 Logstash 管道的最后一个阶段,可以将输出事件格式化为目标系统所需的结构。最后,它使用插件将处理完成后的输出事件发送到目的地。一些最常用的插件是 – Elasticsearch、File、Graphite、Statsd 等。

Logstash的优势

以下几点解释了Logstash的各种优点。

  • Logstash 提供正则表达式模式序列来识别和解析任何输入事件中的各个字段。

  • Logstash 支持各种 Web 服务器和数据源来提取日志数据。

  • Logstash 提供了多个插件来解析日志数据并将其转换为任何用户所需的格式。

  • Logstash 是集中式的,这使得处理和收集来自不同服务器的数据变得容易。

  • Logstash 支持许多数据库、网络协议和其他服务作为日志记录事件的目标源。

  • Logstash 使用 HTTP 协议,这使得用户能够升级 Elasticsearch 版本,而无需锁定步骤升级 Logstash。

Logstash 的缺点

以下几点解释了Logstash的各种缺点。

  • Logstash 使用 http,这会对日志数据的处理产生负面影响。

  • 使用 Logstash 有时可能有点复杂,因为它需要对输入日志数据有很好的理解和分析。

  • 过滤器插件不是通用的,因此,用户可能需要找到正确的模式序列以避免解析错误。

在下一章中,我们将了解 ELK Stack 是什么以及它如何帮助 Logstash。

Logstash - ELK 堆栈

ELK 代表Elasticsearch、LogstashKibana。在 ELK 堆栈中,Logstash 从不同的输入源提取日志数据或其他事件。它处理事件,然后将其存储在 Elasticsearch 中。Kibana 是一个 Web 界面,它访问 Elasticsearch 的日志数据并将其可视化。

麋鹿

Logstash 和 Elasticsearch

Logstash 提供输入和输出 Elasticsearch 插件,用于将日志事件读取和写入 Elasticsearch。由于 Elasticsearch 与 Kibana 的兼容性,Elasticsearch 公司也推荐将 Elasticsearch 作为输出目的地。Logstash 通过 http 协议将数据发送到 Elasticsearch。

Elasticsearch 提供批量上传功能,有助于将不同来源或 Logstash 实例的数据上传到集中式 Elasticsearch 引擎。与其他 DevOps 解决方案相比,ELK 具有以下优势 -

  • ELK 堆栈更易于管理,并且可以扩展以处理 PB 级的事件。

  • ELK 堆栈架构非常灵活,并且提供与 Hadoop 的集成。Hadoop 主要用于归档目的。Logstash可以通过flume直接连接Hadoop,Elasticsearch提供了一个名为es-hadoop的连接器来连接Hadoop。

  • ELK 的拥有总成本比其替代品低得多。

Logstash 和 Kibana

Kibana 不直接与 Logstash 交互,而是通过数据源(即 ELK 堆栈中的 Elasticsearch)进行交互。Logstash 从各个来源收集数据,Elasticsearch 以非常快的速度对其进行分析,然后 Kibana 提供针对该数据的可行见解。

Kibana 是一个基于 Web 的可视化工具,可帮助开发人员和其他人分析 Logstash 在 Elasticsearch 引擎中收集的大量事件的变化。这种可视化可以轻松预测或查看输入源的错误或其他重要事件的趋势变化。

Logstash - 安装

要在系统上安装 Logstash,我们应该按照以下步骤操作 -

步骤 1 - 检查计算机中安装的 Java 版本;它应该是 Java 8,因为它与 Java 9 不兼容。您可以通过以下方式检查 -

在 Windows 操作系统 (OS) 中(使用命令提示符) -

> java -version 

在 UNIX 操作系统中(使用终端) -

$ echo $JAVA_HOME

步骤 2 - 从以下位置下载 Logstash -

https://www.elastic.co/downloads/logstash

  • 对于 Windows 操作系统,请下载 ZIP 文件。

  • 对于 UNIX 操作系统,请下载 TAR 文件。

  • 对于 Debian 操作系统,请下载 DEB 文件。

  • 对于 Red Hat 和其他 Linux 发行版,请下载 RPN 文件。

  • APT 和 Yum 实用程序也可用于在许多 Linux 发行版中安装 Logstash。

步骤 3 - Logstash 的安装过程非常简单。让我们看看如何在不同平台上安装 Logstash。

注意- 不要在安装文件夹中放置任何空格或冒号。

  • Windows 操作系统- 解压 zip 包并安装 Logstash。

  • UNIX 操作系统- 在任何位置提取 tar 文件并安装 Logstash。

$tar –xvf logstash-5.0.2.tar.gz

使用适用于 Linux 操作系统的 APT 实用程序 -

  • 下载并安装公共签名密钥 -
$ wget -qO - https://artifacts.elastic.co/GPG-KEY-elasticsearch | sudo apt-key add -
  • 保存存储库定义 -
$ echo "deb https://artifacts.elastic.co/packages/5.x/apt stable main" | sudo
   tee -a /etc/apt/sources.list.d/elastic-5.x.list
  • 运行更新 -
$ sudo apt-get update
  • 现在您可以使用以下命令进行安装 -
$ sudo apt-get install logstash

在 Debian Linux 操作系统中使用 YUM 实用程序-

  • 下载并安装公共签名密钥 -
$ rpm --import https://artifacts.elastic.co/GPG-KEY-elasticsearch
  • 在“/etc/yum.repos.d/”目录中的带有 .repo 后缀的文件中添加以下文本。例如,logstash.repo

[logstash-5.x]
name = Elastic repository for 5.x packages
baseurl = https://artifacts.elastic.co/packages/5.x/yum
gpgcheck = 1
gpgkey = https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled = 1
autorefresh = 1
type = rpm-md
  • 您现在可以使用以下命令安装 Logstash -
$ sudo yum install logstash

步骤 4 - 转到 Logstash 主目录。在 bin 文件夹内,运行elasticsearch.bat文件(如果是 Windows),或者您可以使用命令提示符并通过终端执行相同的操作。在 UNIX 中,运行 Logstash 文件。

我们需要指定输入源、输出源和可选的过滤器。为了验证安装,您可以使用标准输入流 (stdin) 作为输入源,使用标准输出流 (stdout) 作为输出源,以基本配置运行它。您也可以使用–e选项在命令行中指定配置。

在 Windows 中 -

> cd logstash-5.0.1/bin
> Logstash -e 'input { stdin { } } output { stdout {} }'

在 Linux 中 -

$ cd logstash-5.0.1/bin
$ ./logstash -e 'input { stdin { } } output { stdout {} }'

注意- 如果是 Windows,您可能会收到一条错误消息,指出未设置 JAVA_HOME。为此,请将环境变量设置为“C:\Program Files\Java\jre1.8.0_111”或安装 java 的位置。

步骤 5 - Logstash Web 界面的默认端口为 9600 到 9700,在logstash-5.0.1\config\logstash.yml中定义为http.port,它将选取给定范围内的第一个可用端口。

我们可以通过浏览http://localhost:9600检查 Logstash 服务器是否已启动并运行,或者端口是否不同,然后请检查命令提示符或终端。我们可以看到分配的端口为“成功启动 Logstash API 端点 {:port ⇒ 9600}”。它将返回一个 JSON 对象,其中包含有关已安装 Logstash 的信息,如下所示 -

{
   "host":"manu-PC", 
   "version":"5.0.1",
   "http_address":"127.0.0.1:9600",
   "build_date":"2016-11-11T22:28:04+00:00",
   "build_sha":"2d8d6263dd09417793f2a0c6d5ee702063b5fada",
   "build_snapshot":false
}

Logstash - 内部架构

在本章中,我们将讨论 Logstash 的内部架构和不同组件。

Logstash服务架构

Logstash 处理来自不同服务器和数据源的日志,它充当托运者。托运人用于收集日志,并将这些日志安装在每个输入源中。像Redis、KafkaRabbitMQ这样的代理是保存索引器数据的缓冲区,可能有多个代理作为故障转移实例。

Lucene等索引器用于索引日志以获得更好的搜索性能,然后将输出存储在 Elasticsearch 或其他输出目的地中。输出存储中的数据可用于 Kibana 和其他可视化软件。

Logstash服务架构

Logstash 内部架构

Logstash 管道由三个组件组成:输入、过滤器输出。输入部分负责指定和访问输入数据源,例如Apache Tomcat服务器的日志文件夹。

Logstash 内部架构

解释 Logstash 管道的示例

Logstash 配置文件包含有关 Logstash 三个组件的详细信息。在本例中,我们创建一个名为Logstash.conf的文件。

以下配置从输入日志“inlog.log”捕获数据并将其写入输出日志“outlog.log”,无需任何过滤器。

Logstash.conf

Logstash 配置文件只是使用输入插件从inlog.log文件复制数据,并使用输出插件将日志数据刷新到outlog.log文件。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog.log"
   }
}

运行 Logstash

Logstash 使用–f选项来指定配置文件。

C:\logstash\bin> logstash –f logstash.conf

输入日志

以下代码块显示输入日志数据。

Hello tutorialspoint.com

输出日志

Logstash 输出包含消息字段中的输入数据。Logstash 还在输出中添加了其他字段,如时间戳、输入源路径、版本、主机和标签。

{
   "path":"C:/tpwork/logstash/bin/log/inlog1.log",
   "@timestamp":"2016-12-13T02:28:38.763Z",
   "@version":"1", "host":"Dell-PC",
   "message":" Hello tutorialspoint.com", "tags":[]
}

正如您所知,Logstash 的输出包含的数据不仅仅是通过输入日志提供的数据。输出包含源路径、时间戳、版本、主机名和标签,用于表示错误等额外消息。

我们可以使用过滤器来处理数据并使其满足我们的需求。在下一个示例中,我们使用过滤器来获取数据,这将输出限制为仅包含带有 GET 或 POST 等动词后跟唯一资源标识符的数据。

Logstash.conf

在此 Logstash 配置中,我们添加一个名为grok的过滤器来过滤输入数据。与模式序列输入日志匹配的输入日志事件仅在出现错误时到达输出目的地。Logstash 在输出事件中添加了一个名为“_grokparsefailure”的标签,该标签与 grok 过滤器模式序列不匹配。

Logstash 提供了许多内置的正则表达式模式来解析 Apache 等流行的服务器日志。这里使用的模式需要一个像 get、post 等动词,后面跟着一个统一的资源标识符。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/inlog2.log"
   }
}
filter {
   grok {
      match => {"message" => "%{WORD:verb} %{URIPATHPARAM:uri}"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/outlog2.log"
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

inlog2.log

我们的输入文件包含两个由默认分隔符(即换行分隔符)分隔的事件。第一个事件与 GROk 中指定的模式匹配,而第二个事件则不匹配。

GET /tutorialspoint/Logstash
Input 1234

输出日志2.log

我们可以看到第二个输出事件包含“_grokparsefailure”标签,因为它与 grok 过滤器模式不匹配。用户还可以通过使用输出插件中的“if”条件来删除输出中的这些不匹配的事件。

{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:47:10.352Z","@version":"1","host":"Dell-PC","verb":"GET",
   "message":"GET /tutorialspoint/logstash", "uri":"/tutorialspoint/logstash", "tags":[]
}
{
   "path":"C:/tpwork/logstash/bin/log/inlog2.log",
   "@timestamp":"2016-12-13T02:48:12.418Z", "@version":"1", "host":"Dell-PC",
   "message":"t 1234\r", "tags":["_grokparsefailure"]
}

Logstash - 收集日志

来自不同服务器或数据源的日志是使用托运人收集的。Shipper 是安装在服务器中的 Logstash 实例,它访问服务器日志并将其发送到特定的输出位置。

它主要将输出发送到Elasticsearch进行存储。Logstash 从以下来源获取输入 -

  • 标准输入
  • 系统日志
  • 文件
  • TCP/UDP
  • Microsoft Windows 事件日志
  • 网络套接字
  • 零点
  • 定制扩展

使用 Apache Tomcat 7 服务器收集日志

在此示例中,我们使用文件输入插件收集安装在 Windows 中的 Apache Tomcat 7 服务器的日志,并将其发送到其他日志。

Logstash.conf

这里,Logstash配置为访问本地安装的Apache Tomcat 7的访问日志。文件插件的路径设置中使用正则表达式模式来从日志文件中获取数据。它的名称中包含“access”,并添加了 apache 类型,这有助于将 apache 事件与集中目标源中的其他事件区分开来。最后,输出事件将显示在output.log中。

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/*access*"
      type => "apache"
   }
} 
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

Apache Tomcat 日志

访问 Apache Tomcat 服务器及其 Web 应用程序 ( http://localhost:8080 ) 以生成日志。日志中的更新数据由 Logstash 实时读取,并按照配置文件中的指定存储在 output.log 中。

Apache Tomcat 根据日期生成一个新的访问日志文件,并在其中记录访问事件。在我们的例子中,它是Apache Tomcat日志目录中的 localhost_access_log.2016-12-24.txt 。

0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:00 +0800] "GET / HTTP/1.1" 200 11418
0:0:0:0:0:0:0:1 - munish [
   25/Dec/2016:18:37:02 +0800] "GET /manager/html HTTP/1.1" 200 17472
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:08 +0800] "GET /docs/ HTTP/1.1" 200 19373
0:0:0:0:0:0:0:1 - - [
   25/Dec/2016:18:37:10 +0800] "GET /docs/introduction.html HTTP/1.1" 200 15399

输出.log

您可以在输出事件中看到,添加了类型字段,并且事件出现在消息字段中。

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt",
   "@timestamp":"2016-12-25T10:37:00.363Z","@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:00 +0800] \"GET /
   HTTP/1.1\" 200 11418\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - munish [25/Dec/2016:18:37:02 +0800] \"GET /
   manager/html HTTP/1.1\" 200 17472\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:10.407Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:08 +0800] \"GET /docs/
   HTTP/1.1\" 200 19373\r","type":"apache","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   localhost_access_log.2016-12-25.txt","@timestamp":"2016-12-25T10:37:20.436Z",
   "@version":"1","host":"Dell-PC",
   "message":"0:0:0:0:0:0:0:1 - - [25/Dec/2016:18:37:10 +0800] \"GET /docs/
   introduction.html HTTP/1.1\" 200 15399\r","type":"apache","tags":[]
}

使用 STDIN 插件收集日志

在本节中,我们将讨论使用STDIN 插件收集日志的另一个示例。

Logstash.conf

这是一个非常简单的示例,Logstash 正在读取用户在标准输入中输入的事件。在我们的例子中,它是命令提示符,它将事件存储在output.log 文件中。

input {
   stdin{}
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

C:\logstash\bin> logstash –f  Logstash.conf

在命令提示符中写入以下文本 -

用户输入以下两行。Logstash 通过分隔符设置分隔事件,默认值为“\n”。用户可以通过更改文件插件中分隔符的值来更改。

Tutorialspoint.com welcomes you
Simply easy learning

输出.log

以下代码块显示了输出日志数据。

{
   "@timestamp":"2016-12-25T11:41:16.518Z","@version":"1","host":"Dell-PC",
   "message":"tutrialspoint.com welcomes you\r","tags":[]
}
{
   "@timestamp":"2016-12-25T11:41:53.396Z","@version":"1","host":"Dell-PC",
   "message":"simply easy learning\r","tags":[]
}

Logstash - 支持的输入

Logstash 支持来自不同来源的大量日志。它正在与著名的消息来源合作,如下所述。

从指标收集日志

系统事件和其他时间活动记录在指标中。Logstash 可以从系统指标访问日志并使用过滤器对其进行处理。这有助于以定制的方式向用户显示事件的实时反馈。默认情况下,根据指标过滤器的flush_interval设置刷新指标;它被设置为 5 秒。

我们通过收集和分析 Logstash 运行的事件并在命令提示符上显示实时信息来跟踪 Logstash 生成的测试指标。

Logstash.conf

此配置包含一个生成器插件,由 Logstash 提供用于测试指标,并将类型设置设置为“生成”以进行解析。在过滤阶段,我们仅使用“if”语句处理具有生成类型的行。然后,指标插件对仪表设置中指定的字段进行计数。指标插件在flush_interval中指定的每5秒后刷新计数。

最后,使用编解码器插件将过滤器事件输出到标准输出(例如命令提示符)进行格式化。Codec 插件使用 [ events ][ rate_1m ] 值来输出 1 分钟滑动窗口中的每秒事件。

input {
   generator {
     	type => "generated"
   }
}
filter {
   if [type] == "generated" {
      metrics {
         meter => "events"
         add_tag => "metric"
      }
   }
}
output {
   # only emit events with the 'metric' tag
   if "metric" in [tags] {
      stdout {
         codec => line { format => "rate: %{[events][rate_1m]}"
      }
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

>logsaths –f logstash.conf

标准输出(命令提示符)

rate: 1308.4
rate: 1308.4
rate: 1368.654529135342
rate: 1416.4796003951449
rate: 1464.974293984808
rate: 1523.3119444107458
rate: 1564.1602979542715
rate: 1610.6496496890895
rate: 1645.2184750334154
rate: 1688.7768007612485
rate: 1714.652283095914
rate: 1752.5150680019278
rate: 1785.9432934744932
rate: 1806.912181962126
rate: 1836.0070454626025
rate: 1849.5669494173826
rate: 1871.3814756851832
rate: 1883.3443123790712
rate: 1906.4879113216743
rate: 1925.9420717997118
rate: 1934.166137658981
rate: 1954.3176526556897
rate: 1957.0107444542625

从Web服务器收集日志

Web 服务器会生成大量有关用户访问和错误的日志。Logstash 有助于使用输入插件从不同服务器提取日志并将其存储在集中位置。

我们从本地 Apache Tomcat 服务器的stderr 日志中提取数据并将其存储在 output.log 中。

Logstash.conf

此 Logstash 配置文件指示 Logstash 读取 apache 错误日志并添加名为“apache-error”的标签。我们可以简单地使用文件输出插件将其发送到output.log。

input {
   file {
      path => "C:/Program Files/Apache Software Foundation/Tomcat 7.0 /logs/*stderr*"
      type => "apache-error"  
   }
} 
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

>Logstash –f Logstash.conf

输入日志示例

这是示例stderr 日志,当 Apache Tomcat 中发生服务器事件时生成。

C:\Program Files\Apache Software Foundation\Tomcat 7.0\logs\ tomcat7-stderr.2016-12-25.log

Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["http-bio-9999"]
Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start
INFO: Starting ProtocolHandler ["ajp-bio-8009"]
Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start
INFO: Server startup in 823 ms

输出.log

{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.coyote.AbstractProtocol start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Starting ProtocolHandler [
      \"ajp-bio-8009\"]\r","type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"Dec 25, 2016 7:05:14 PM org.apache.catalina.startup.Catalina start\r",
   "type":"apache-error","tags":[]
}
{
   "path":"C:/Program Files/Apache Software Foundation/Tomcat 7.0/logs/
   tomcat7-stderr.2016-12-25.log","@timestamp":"2016-12-25T11:05:27.045Z",
   "@version":"1","host":"Dell-PC",
   "message":"INFO: Server startup in 823 ms\r","type":"apache-error","tags":[]
}

从数据源收集日志

首先,让我们了解如何配置 MySQL 进行日志记录。在MySQL 数据库服务器的 [mysqld] 下的my.ini 文件中添加以下行。

在 Windows 中,它存在于 MySQL 的安装目录中,位于 -

C:\wamp\bin\mysql\mysql5.7.11

在 UNIX 中,您可以在 – /etc/mysql/my.cnf 中找到它

general_log_file   = "C:/wamp/logs/queries.log"
general_log = 1

Logstash.conf

在这个配置文件中,文件插件用于读取MySQL日志并将其写入ouput.log。

input {
   file {
      path => "C:/wamp/logs/queries.log"
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

查询日志

这是在 MySQL 数据库中执行查询生成的日志。

2016-12-25T13:05:36.854619Z   2 Query		select * from test1_users
2016-12-25T13:05:51.822475Z    2 Query	select count(*) from users
2016-12-25T13:05:59.998942Z    2 Query         select count(*) from test1_users

输出.log

{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:37.905Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:36.854619Z    2 Query\tselect * from test1_users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:05:51.938Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:51.822475Z    2 Query\tselect count(*) from users",
   "tags":[]
}
{
   "path":"C:/wamp/logs/queries.log","@timestamp":"2016-12-25T13:06:00.950Z",
   "@version":"1","host":"Dell-PC",
   "message":"2016-12-25T13:05:59.998942Z    2 Query\tselect count(*) from test1_users",
   "tags":[]
}

Logstash - 解析日志

Logstash 使用输入插件接收日志,然后使用过滤器插件来解析和转换数据。日志的解析和转换是根据输出目的地中存在的系统来执行的。Logstash 解析日志数据并仅转发必需的字段。随后,这些字段被转换为目标系统兼容且可理解的形式。

如何解析日志?

日志的解析是使用GROK(知识的图形表示)模式执行的,您可以在 Github 中找到它们 -

https://github.com/elastic/logstash/tree/v1.4.2/patterns

Logstash 将日志数据与指定的 GROK 模式或用于解析日志的模式序列进行匹配,例如“%{COMBINEDAPACHELOG}”,这通常用于 apache 日志。

解析后的数据更加结构化,易于搜索和执行查询。Logstash 在输入日志中搜索指定的 GROK 模式并从日志中提取匹配的行。您可以使用 GROK 调试器来测试 GROK 模式。

GROK 模式的语法是 %{SYNTAX:SEMANTIC}。Logstash GROK 过滤器按以下形式编写 -

%{模式:字段名称}

这里,PATTERN 代表 GROK 模式,fieldname 是字段的名称,代表输出中解析的数据。

例如,使用在线 GROK 调试器https://grokdebug.herokuapp.com/

输入

日志中的错误行示例 -

[Wed Dec 07 21:54:54.048805 2016] [:error] [pid 1234:tid 3456829102]
   [client 192.168.1.1:25007] JSP Notice:  Undefined index: abc in
   /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11

GROK 模式序列

此 GROK 模式序列与日志事件匹配,其中包含时间戳,后跟日志级别、进程 ID、事务 ID 和错误消息。

\[(%{DAY:day} %{MONTH:month} %{MONTHDAY} %{TIME} %{YEAR})\] \[.*:%{LOGLEVEL:loglevel}\]
   \[pid %{NUMBER:pid}:tid %{NUMBER:tid}\] \[client %{IP:clientip}:.*\]
   %{GREEDYDATA:errormsg}

输出

输出为 JSON 格式。

{
   "day": [
      "Wed"
   ],
   "month": [
      "Dec"
   ],
   "loglevel": [
      "error"
   ],
   "pid": [
      "1234"
   ],
   "tid": [
      "3456829102"
   ],
   "clientip": [
      "192.168.1.1"
   ],
   "errormsg": [
      "JSP Notice:  Undefined index: abc in
      /home/manu/tpworks/tutorialspoint.com/index.jsp on line 11"
   ]
}

Logstash - 过滤器

Logstash 在输入和输出之间的管道中间使用过滤器。Logstash 度量的过滤器操纵和创建类似Apache-Access的事件。许多过滤器插件用于管理 Logstash 中的事件。在这里,在Logstash 聚合过滤器的示例中,我们过滤数据库中每个 SQL 事务的持续时间并计算总时间。

安装聚合过滤器插件

使用 Logstash-plugin 实用程序安装聚合过滤器插件。Logstash-plugin 是Logstash bin 文件夹中 Windows 的批处理文件。

>logstash-plugin install logstash-filter-aggregate

Logstash.conf

在此配置中,您可以看到三个“if”语句,用于初始化、增量生成事务的总持续时间,即sql_duration。聚合插件用于添加 sql_duration,存在于输入日志的每个事件中。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - 
            %{NOTSPACE:taskid} - %{NOTSPACE:logger} - 
            %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"    
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

>logstash –f logstash.conf 

输入日志

以下代码块显示输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

输出.log

根据配置文件中的指定,记录器所在的最后一个“if”语句是 – TRANSACTION_END,它打印总事务时间或 sql_duration。这已在 output.log 中以黄色突出显示。

{
   "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z",
   "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC",
   "message":"8566 - TRANSACTION_START - start\r","tags":[]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END",
   "@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[]
}

Logstash - 转换日志

Logstash 提供了各种插件来转换解析的日志。这些插件可以添加、删除更新日志中的字段,以便更好地理解和查询输出系统。

我们使用Mutate 插件在输入日志的每一行中添加一个字段名称 user。

安装 Mutate 过滤器插件

安装 mutate 过滤器插件;我们可以使用以下命令。

>Logstash-plugin install Logstash-filter-mutate

Logstash.conf

在此配置文件中,Mutate 插件添加在 Aggregate 插件之后以添加新字段。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
         %{NOTSPACE:taskid} - %{NOTSPACE:logger} -
         %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ; 
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   file {
      path => "C:/tpwork/logstash/bin/log/output.log"
   }
}

运行 Logstash

我们可以使用以下命令来运行 Logstash。

>logstash –f logstash.conf

输入日志

以下代码块显示输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

输出.log

您可以看到输出事件中有一个名为“user”的新字段。

{
   "path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z",
   "@version":"1",
   "host":"wcnlab-PC",
   "message":"NFO - 48566 - TRANSACTION_START - start\r",
   "user":"tutorialspoint.com","tags":["_grokparsefailure"]
}
{
   "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.383Z","loglevel":"INFO","logger":"SQL",
   "@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 320\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"SQL","@version":"1","host":"wcnlab-PC","label":"transaction1",
   "message":" INFO - 48566 - SQL - transaction1 - 200\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}
{
   "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
   "@timestamp":"2016-12-25T19:55:37.399Z","loglevel":"INFO",
   "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC","label":"end",
   "message":" INFO - 48566 - TRANSACTION_END - end\r",
   "user":"tutorialspoint.com","taskid":"48566","tags":[]
}

Logstash - 输出级

输出是Logstash管道的最后一个阶段,它将输入日志中的过滤数据发送到指定的目的地。Logstash 提供多个输出插件,将过滤后的日志事件存储到各种不同的存储和搜索引擎。

存储日志

Logstash 可以将过滤后的日志存储在文件、Elasticsearch Engine、stdout、AWS CloudWatch等中。Logstash 中还可以使用TCP、UDP、Websocket等网络协议将日志事件传输到远程存储系统。

在ELK堆栈中,用户使用Elasticsearch引擎来存储日志事件。在下面的示例中,我们将为本地 Elasticsearch 引擎生成日志事件。

安装Elasticsearch输出插件

我们可以使用以下命令安装 Elasticsearch 输出插件。

>logstash-plugin install Logstash-output-elasticsearch

Logstash.conf

该配置文件包含一个 Elasticsearch 插件,它将输出事件存储在本地安装的 Elasticsearch 中。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [ "message", "%{LOGLEVEL:loglevel} -
      %{NOTSPACE:taskid} - %{NOTSPACE:logger} -  
      %{WORD:label}( - %{INT:duration:int})?" ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
   mutate {
      add_field => {"user" => "tutorialspoint.com"}
   }
}
output {
   elasticsearch {
      hosts => ["127.0.0.1:9200"]
   }
}

输入日志

以下代码块显示输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END - end

在本地主机上启动 Elasticsearch

要在本地主机启动 Elasticsearch,您应该使用以下命令。

C:\elasticsearch\bin> elasticsearch

Elasticsearch 准备就绪后,您可以通过在浏览器中输入以下 URL 来检查它。

http://本地主机:9200/

回复

以下代码块显示了 Elasticsearch 在 localhost 的响应。

{
   "name" : "Doctor Dorcas",
   "cluster_name" : "elasticsearch",
   "version" : {
      "number" : "2.1.1",
      "build_hash" : "40e2c53a6b6c2972b3d13846e450e66f4375bd71",
      "build_timestamp" : "2015-12-15T13:05:55Z",
      "build_snapshot" : false,
      "lucene_version" : "5.3.1"
   },
   "tagline" : "You Know, for Search"
}

注意- 有关 Elasticsearch 的更多信息,您可以单击以下链接。

https://www.tutorialspoint.com/elasticsearch/index.html

现在,使用上述 Logstash.conf 运行 Logstash

>Logstash –f Logstash.conf

将上述文本粘贴到输出日志中后,该文本将由 Logstash 存储在 Elasticsearch 中。您可以通过在浏览器中输入以下 URL 来检查存储的数据。

http://localhost:9200/logstash-2017.01.01/_search?pretty

回复

是索引Logstash-2017.01.01中存储的JSON格式的数据。

{
   "took" : 20,
   "timed_out" : false,
   "_shards" : {
      "total" : 5,
      "successful" : 5,
      "failed" : 0
   },
   "hits" : {
      "total" : 10,
      "max_score" : 1.0,
      "hits" : [ {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOs",
         "_score" : 1.0,
         "_source":{
            "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", 
            "@timestamp":"2017-01-01T12:17:49.140Z","loglevel":"INFO",
            "logger":"SQL","@version":"1","host":"wcnlab-PC",
            "label":"transaction1",
            "message":" INFO - 48566 - SQL - transaction1 - 200\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      },
      {
         "_index" : "logstash-2017.01.01",
         "_type" : "logs",
         "_id" : "AVlZ9vF8hshdrGm02KOt",
         "_score" : 1.0,
         "_source":{
            "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log",
            "@timestamp":"2017-01-01T12:17:49.145Z","loglevel":"INFO",
            "logger":"TRANSACTION_END","@version":"1","host":"wcnlab-PC",
            "label":"end",
            "message":" INFO - 48566 - TRANSACTION_END - end\r",
            "user":"tutorialspoint.com","taskid":"48566","tags":[]
         }
      }
   }
}

Logstash - 支持的输出

Logstash 提供了多个插件来支持各种数据存储或搜索引擎。日志的输出事件可以发送到输出文件、标准输出或Elasticsearch等搜索引擎。Logstash 支持三种类型的输出,它们是 -

  • 标准输出
  • 文件输出
  • 空输出

现在让我们详细讨论其中的每一个。

标准输出(stdout)

它用于将过滤后的日志事件作为数据流生成到命令行界面。以下是生成数据库事务总持续时间到标准输出的示例。

Logstash.conf

此配置文件包含一个 stdout 输出插件,用于将总 sql_duration 写入标准输出。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid}
            - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      stdout {
         codec => line{format => "%{sql_duration}"}
      }
   }
}

注意- 如果尚未安装,请安装聚合过滤器。

>logstash-plugin install Logstash-filter-aggregate

运行 Logstash

我们可以使用以下命令来运行 Logstash。

>logstash –f logsatsh.conf

输入日志

以下代码块显示输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END – end

stdout(Windows 中的命令提示符或 UNIX 中的终端)

这是总 sql_duration 320 + 200 = 520。

520

文件输出

Logstash 还可以将过滤器日志事件存储到输出文件中。我们将使用上述示例并将输出存储在文件中而不是 STDOUT 中。

Logstash.conf

此 Logstash 配置文件指示 Logstash 将总 sql_duration 存储到输出日志文件。

input {
   file {
      path => "C:/tpwork/logstash/bin/log/input1.log"
   }
} 
filter {
   grok {
      match => [
         "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} -
            %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" 
      ]
   }
   if [logger] == "TRANSACTION_START" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] = 0"
         map_action => "create"
      }
   }
   if [logger] == "SQL" {
      aggregate {
         task_id => "%{taskid}"
         code => "map['sql_duration'] ||= 0 ;
            map['sql_duration'] += event.get('duration')"
      }
   }
   if [logger] == "TRANSACTION_END" {
      aggregate {
         task_id => "%{taskid}"
         code => "event.set('sql_duration', map['sql_duration'])"
         end_of_task => true
         timeout => 120
      }
   }
}
output {
   if [logger] == "TRANSACTION_END" {
      file {
         path => "C:/tpwork/logstash/bin/log/output.log"
         codec => line{format => "%{sql_duration}"}
      }
   }
}

运行logstash

我们可以使用以下命令来运行 Logstash。

>logstash –f logsatsh.conf

输入日志

以下代码块显示输入日志数据。

INFO - 48566 - TRANSACTION_START - start
INFO - 48566 - SQL - transaction1 - 320
INFO - 48566 - SQL - transaction1 - 200
INFO - 48566 - TRANSACTION_END – end

输出.log

以下代码块显示了输出日志数据。

520

空输出

这是一个特殊的输出插件,用于分析输入和过滤器插件的性能。

Logstash - 插件

Logstash 为其管道的所有三个阶段(输入、过滤器和输出)提供各种插件。这些插件帮助用户从各种来源捕获日志,例如 Web 服务器、数据库、网络协议等。

捕获后,Logstash可以根据用户的需要解析数据并将其转换为有意义的信息。最后,Logstash 可以将有意义的信息发送或存储到各种目标源,例如 Elasticsearch、AWS Cloudwatch 等。

输入插件

Logstash 中的输入插件可帮助用户从各种来源提取和接收日志。使用输入插件的语法如下 -

Input {
   Plugin name {
      Setting 1……
      Setting 2……..
   }
}

您可以使用以下命令下载输入插件 -

>Logstash-plugin install Logstash-input-<plugin name>

Logstash-plugin 实用程序位于Logstash 安装目录的bin 文件夹中。下表列出了 Logstash 提供的输入插件。

先生。 插件名称和说明
1

节拍

从Elastic Beats框架获取日志数据或事件。

2

云观察

从 CloudWatch(Amazon Web Services 提供的 API)中提取事件。

3

couchdb_changes

来自 couchdb 的 _chages URI 的事件使用此插件发送。

4

drupal_dblog

使用启用的 DBLog 提取 drupal 的看门狗日志记录数据。

5

弹性搜索

检索在 Elasticsearch 集群中执行的查询的结果。

6

事件日志

从 Windows 事件日志中获取事件。

7

执行

获取 shell 命令输出作为 Logstash 中的输入。

8

文件

从输入文件获取事件。当 Logstash 与输入源一起本地安装并且可以访问输入源日志时,这非常有用。

9

发电机

它用于测试目的,这会创建随机事件。

10

github

从 GitHub Webhook 捕获事件。

11

石墨

从石墨监控工具获取指标数据。

12

心跳

它还用于测试并产生类似心跳的事件

13

http

通过 http 和 https 两种网络协议收集日志事件。

14

http_poller

它用于将 HTTP API 输出解码为事件。

15

数据库连接

它将 JDBC 事务转换为 Logstash 中的事件。

16

杰米克斯

使用 JMX 从远程 Java 应用程序中提取指标。

17 号

日志4j

通过 TCP 套接字从 Log4j 的 socketAppender 对象捕获事件。

18

RSS

将命令行工具的输出作为 Logstash 中的输入事件。

19

TCP协议

通过 TCP 套接字捕获事件。

20

推特

从 Twitter Streaming API 收集事件。

21

UNIX

通过 UNIX 套接字收集事件。

22

网络套接字

通过 websocket 协议捕获事件。

23

xmpp

通过 Jabber/xmpp 协议读取事件。

插件设置

所有插件都有其特定的设置,这有助于指定插件中的重要字段,如端口、路径等。我们将讨论一些输入插件的设置。

文件

该输入插件用于直接从输入源中存在的日志或文本文件中提取事件。它的工作方式类似于 UNIX 中的 tail 命令,保存最后读取的光标并仅从输入文件中读取新附加的数据,但可以通过使用 star_position 设置来更改。以下是该输入插件的设置。

设置名称 默认值 描述
添加字段 {} 将新字段附加到输入事件。
较旧的 3600 上次读取时间(以秒为单位)超过此插件中指定的文件将被关闭。
编解码器 “清楚的” 它用于在进入 Logstash 管道之前对数据进行解码。
分隔符 “\n” 它用于指定新的行分隔符。
发现间隔 15 它是在指定路径中发现新文件之间的时间间隔(以秒为单位)。
启用度量 真的 它用于启用或禁用指定插件的报告和指标收集。
排除 它用于指定文件名或模式,应从输入插件中排除。
ID 为该插件实例指定唯一标识。
最大打开文件数 它指定Logstash在任何时候输入文件的最大数量。
小路 指定文件的路径,它可以包含文件名的模式。
起始位置 “结尾” 如果您愿意,可以更改为“开始”;最初,Logstash 应该从头开始读取文件,而不仅仅是新的日志事件。
开始间隔 1 它指定时间间隔(以秒为单位),之后 Logstash 检查修改的文件。
标签 要添加任何附加信息,如 Logstash,当任何日志事件未能符合指定的 grok 过滤器时,它会在标签中添加“_grokparsefailure”。
类型 这是一个特殊字段,您可以将其添加到输入事件中,并且在过滤器和 kibana 中很有用。

弹性搜索

这个特定的插件用于读取 Elasticsearch 集群中的搜索查询结果。以下是该插件中使用的设置 -

设置名称 默认值 描述
添加字段 {} 与文件插件相同,它用于在输入事件中附加一个字段。
ca_文件 用于指定SSL证书颁发机构文件的路径。
编解码器 “清楚的” 它用于在输入 Logstash 管道之前对来自 Elasticsearch 的输入事件进行解码。
文档信息 “错误的” 如果您想从 Elasticsearch 引擎中提取索引、类型和 id 等附加信息,您可以将其更改为 true。
文档信息字段 [“_index”,“_type”,“_id”] 您可以删除 Logstash 输入中不需要的任何字段。
启用度量 真的 它用于启用或禁用该插件实例的报告和指标收集。
主机 它用于指定所有elasticsearch引擎的地址,这将是该Logstash实例的输入源。语法为主机:端口或IP:端口。
ID 它用于为特定输入插件实例提供唯一的标识号。
指数 “logstash-*” 它用于指定索引名称或模式,Logstash 将监视该索引名称或模式以进行输入。
密码 用于身份验证目的。
询问 "{ \"sort\": [ \"_doc\" ] }" 查询执行情况。
安全套接字层 错误的 启用或禁用安全套接字层。
标签 在输入事件中添加任何附加信息。
类型 用于对输入表单进行分类,方便后期查找所有输入事件。
用户 为了真实的目的。

事件日志

该输入插件从 Windows 服务器的 win32 API 读取数据。以下是该插件的设置 -

设置名称 默认值 描述
添加字段 {} 与文件插件相同,用于在输入事件中附加一个字段
编解码器 “清楚的” 用于解码来自windows的输入事件;在进入 Logstash 管道之前
日志档案 [“应用程序”、“安全”、“系统”] 输入日志文件中所需的事件
间隔 1000 它以毫秒为单位,定义两次连续检查新事件日志之间的时间间隔
标签 在输入事件中添加任何附加信息
类型 用于将特定插件的输入分类为给定类型,以便后期轻松搜索所有输入事件

推特

该输入插件用于从 Streaming API 收集 Twitter 的提要。下表描述了该插件的设置。

设置名称 默认值 描述
添加字段 {} 与文件插件相同,用于在输入事件中附加一个字段
编解码器 “清楚的” 用于解码来自windows的输入事件;在进入 Logstash 管道之前
消费者密钥 它包含 Twitter 应用程序的消费者密钥。欲了解更多信息,请访问https://dev.twitter.com/apps/new
消费者秘密 它包含 Twitter 应用程序的消费者密钥。欲了解更多信息,请访问https://dev.twitter.com/apps/new
启用度量 真的 它用于启用或禁用该插件实例的报告和指标收集
如下

它指定以逗号分隔的用户 ID,LogStash 检查这些用户在 Twitter 中的状态。

欲了解更多信息,请访问