Apache NiFi - 快速指南


Apache NiFi - 简介

Apache NiFi 是一个功能强大、易于使用且可靠的系统,用于在不同系统之间处理和分发数据。它基于 NSA 开发的 Niagara Files 技术,并在 8 年后捐赠给 Apache Software 基金会。它根据 2004 年 1 月的 Apache 许可证版本 2.0 分发。Apache NiFi 的最新版本是 1.7.1。

Apache NiFi 是一个实时数据摄取平台,可以传输和管理不同源和目标系统之间的数据传输。它支持多种数据格式,如日志、地理位置数据、社交源等。它还支持许多协议,如 SFTP、HDFS 和 KAFKA 等。这种对多种数据源和协议的支持使得该平台在许多 IT 组织。

Apache NiFi - 一般特性

Apache NiFi 的一般特征如下:

  • Apache NiFi 提供基于 Web 的用户界面,提供设计、控制、反馈和监控之间的无缝体验。

  • 它是高度可配置的。这有助于用户在运行时获得有保证的交付、低延迟、高吞吐量、动态优先级、背压和修改流。

  • 它还提供数据来源模块来跟踪和监控从流程开始到结束的数据。

  • 开发人员可以根据自己的需要创建自己的自定义处理器和报告任务。

  • NiFi 还提供对 SSL、HTTPS、SSH 和其他加密等安全协议的支持。

  • 它还支持用户和角色管理,还可以配置 LDAP 进行授权。

Apache NiFi - 关键概念

Apache NiFi 的关键概念如下:

  • 进程组- 它是一组 NiFi 流,可帮助用户以分层方式管理和保留流。

  • - 创建连接不同的处理器,以便在需要时从一个或多个数据源传输和修改数据到另一个目标数据源。

  • 处理器- 处理器是一个 java 模块,负责从源系统获取数据或将其存储在目标系统中。其他处理器也用于添加属性或更改流文件中的内容。

  • Flowfile - 这是 NiFi 的基本用法,它代表从 NiFi 源系统中选取的数据的单个对象。NiFi 处理器在从源处理器移动到目标时对流文件进行更改。流中的不同处理器对流文件执行不同的事件(例如 CREATE、CLONE、RECEIVE 等)。

  • 事件- 事件表示遍历 NiFi 流时流文件的变化。这些事件在数据来源中进行跟踪。

  • 数据来源- 它是一个存储库。它还有一个 UI,使用户能够检查有关流文件的信息,并帮助排除在处理流文件期间出现的任何问题。

Apache NiFi 的优点

  • Apache NiFi 支持使用 SFTP 从远程计算机获取数据并保证数据沿袭。

  • Apache NiFi支持集群,因此可以在多个节点上以相同的流程处理不同的数据,从而提高数据处理的性能。

  • 它还提供了用户级别、进程组级别和其他模块的安全策略。

  • 它的UI还可以在HTTPS上运行,这使得用户与NiFi的交互变得安全。

  • NiFi 支持大约 188 个处理器,用户还可以创建自定义插件来支持各种数据系统。

Apache NiFi 的缺点

  • 当用户在其中进行任何更改时,节点与 NiFi 集群断开连接时,flow.xml 将变为无效。除非管理员手动从连接的节点复制 flow.xml,否则节点无法连接回集群。

  • Apache NiFi 在主节点切换时存在状态持久性问题,这有时会导致处理器无法从源系统获取数据。

Apache NiFi - 基本概念

Apache NiFi 由 Web 服务器、流量控制器和处理器组成,运行在 Java 虚拟机上。它还具有 3 个存储库 Flowfile Repository、Content Repository 和 Provenance Repository,如下图所示。

阿帕奇网络服务器

流程文件存储库

该存储库存储通过 apache NiFi 数据流的每个流文件的当前状态和属性。该存储库的默认位置位于 apache NiFi 的根目录中。可以通过更改名为“nifi.flowfile.repository.directory”的属性来更改此存储库的位置。

内容存储库

该存储库包含 NiFi 所有流文件中存在的所有内容。它的默认目录也在NiFi的根目录中,可以使用“org.apache.nifi.controller.repository.FileSystemRepository”属性进行更改。该目录占用磁盘空间较大,建议安装盘有足够的空间。

出处存储库

存储库跟踪并存储在 NiFi 中流动的所有流文件的所有事件。有两个来源存储库 -易失性来源存储库(在此存储库中,所有来源数据在重新启动后都会丢失)和持久来源存储库。它的默认目录也在NiFi的根目录中,可以使用相应存储库的“org.apache.nifi.provenance.PersistentProvenanceRepository”和“org.apache.nifi.provenance.VolatileProvenanceRepositor”属性来更改它。

出处存储库

Apache NiFi - 环境设置

本章我们将学习Apache NiFi的环境搭建。Apache NiFi的安装步骤如下:

步骤 1 - 在您的计算机中安装当前版本的 Java。请在您的机器中设置JAVA_HOME。您可以检查版本,如下图所示:

在 Windows 操作系统 (OS) 中(使用命令提示符) -

> java -version

在 UNIX 操作系统中(使用终端):

$ echo $JAVA_HOME


步骤 2 - 从https://nifi.apache.org/download.html下载 Apache NiFi

步骤 3 - Apache NiFi 的安装过程非常简单。该过程因操作系统而异 -

  • Windows 操作系统- 解压 zip 包并安装 Apache NiFi。

  • UNIX 操作系统- 在任何位置提取 tar 文件并安装 Logstash。

$tar -xvf nifi-1.6.0-bin.tar.gz

步骤 4 - 打开命令提示符,进入 NiFi 的 bin 目录。例如C:\nifi-1.7.1\bin,执行run-nifi.bat文件。

C:\nifi-1.7.1\bin>run-nifi.bat

步骤 5 - 启动 NiFi UI 需要几分钟时间。用户可以检查nifi-app.log,一旦NiFi UI启动,用户可以输入http://localhost:8080/nifi/来访问UI。

Apache NiFi - 用户界面

Apache 是一个基于 Web 的平台,用户可以使用 Web UI 进行访问。NiFi UI 具有很强的交互性,并提供有关 NiFi 的各种信息。如下图所示,用户可以访问有关以下属性的信息 -

  • 活动线程
  • 排队数据总数
  • 传输远程进程组
  • 不传输远程进程组
  • 运行组件
  • 停止的组件
  • 无效组件
  • 禁用组件
  • 最新的版本化流程组
  • 本地修改的版本化进程组
  • 过时的版本化进程组
  • 本地修改和过时版本的进程组
  • 同步失败版本化进程组
用户界面

Apache NiFi 的组件

Apache NiFi UI 具有以下组件 -

处理器

用户可以拖动画布上的进程图标,并为 NiFi 中的数据流选择所需的处理器。

处理器图标 添加处理器

输入端口

将下面的图标拖到画布上以将输入端口添加到任何数据流中。

输入端口用于从处理器获取数据,该数据不存在于该进程组中。

输入端口

拖动此图标后,NiFi 会要求输入输入端口的名称,然后将其添加到 NiFi 画布中。

添加端口

输出端口

将下面的图标拖到画布上以将输出端口添加到任何数据流中。

输出端口用于将数据传输到该进程组中不存在的处理器。

输出端口

拖动此图标后,NiFi 会要求输入输出端口的名称,然后将其添加到 NiFi 画布中。

输出添加端口

流程组

用户使用下面的图标在 NiFi 画布中添加进程组。

组图标

拖动此图标后,NiFi 会要求输入进程组的名称,然后将其添加到 NiFi 画布中。

添加进程组

远程进程组

这用于在 NiFi 画布中添加远程进程组。

远程进程组

漏斗

漏斗用于将一个处理器的输出传输到多个处理器。用户可以使用下面的图标将漏斗添加到 NiFi 数据流中。

漏斗图标

模板

该图标用于将数据流模板添加到 NiFi 画布。这有助于在相同或不同的 NiFi 实例中重用数据流。

模板图标

拖动后,用户可以选择NiFi中已添加的模板。

标签

它们用于在 NiFi 画布上添加有关 NiFi 中存在的任何组件的文本。它提供了用户使用的一系列颜色来增加美感。

标签图标

Apache NiFi - 处理器

Apache NiFi 处理器是创建数据流的基本块。每个处理器都有不同的功能,这有助于创建输出流文件。下图所示的数据流使用 GetFile 处理器从一个目录中获取文件,并使用 PutFile 处理器将其存储在另一目录中。

放置文件处理器

获取文件

GetFile进程用于从特定目录中获取特定格式的文件。它还为用户提供了其他选项,以更好地控制获取。我们将在下面的属性部分讨论它。

获取文件

获取文件设置

以下是 GetFile 处理器的不同设置 -

姓名

在“名称”设置中,用户可以根据项目或项目为处理器定义任何名称,这使得名称更有意义。

使能够

用户可以使用此设置启用或禁用处理器。

处罚期限

此设置允许用户在流程文件失败时添加惩罚持续时间。

产量持续时间

该设置用于指定处理器的屈服时间。在此期间,不会再次安排该进程。

公告级别

此设置用于指定该处理器的日志级别。

自动终止关系

其中包含该特定进程的所有可用关系的检查列表。通过选中这些框,用户可以对处理器进行编程,以在该事件上终止流文件,并且不在流中进一步发送它。

自动终止关系

获取文件调度

这些是 GetFile 处理器提供的以下调度选项 -

日程策略

您可以通过选择时间驱动来按时间安排进程,也可以通过选择 CRON 驱动程序选项来安排指定的 CRON 字符串。

并发任务

该选项用于定义该处理器的并发任务调度。

执行

用户可以使用此选项定义是在所有节点中运行处理器还是仅在主节点中运行处理器。

运行时间表

它用于定义时间驱动策略的时间或CRON驱动策略的CRON表达式。

运行时间表

获取文件属性

GetFile 提供了多个属性,如下图所示,从强制属性(如输入目录和文件过滤器)到可选属性(如路径过滤器和最大文件大小)。用户可以使用这些属性来管理文件获取过程。

获取文件属性

获取文件评论

本节用于指定有关处理器的任何信息。

获取文件评论

放置文件

PutFile 处理器用于将文件从数据流存储到特定位置。

放置文件

放置文件设置

PutFile 处理器具有以下设置 -

姓名

在名称设置中,用户可以根据项目或使名称更有意义的名称为处理器定义任何名称。

使能够

用户可以使用此设置启用或禁用处理器。

处罚期限

此设置允许用户在流程文件失败时添加惩罚持续时间。

产量持续时间

该设置用于指定处理器的屈服时间。在此期间,不会再次安排该进程。

公告级别

此设置用于指定该处理器的日志级别。

自动终止关系

此设置具有该特定进程的所有可用关系的检查列表。通过选中这些框,用户可以对处理器进行编程,以在该事件上终止流文件,并且不再在流中进一步发送它。

自动终止

PutFile调度

这些是 PutFile 处理器提供的以下调度选项 -

日程策略

您可以通过选择计时器驱动或通过选择 CRON 驱动程序选项指定的 CRON 字符串来按时间安排进程。还有一种实验策略“事件驱动”,它将在特定事件上触发处理器。

并发任务

该选项用于定义该处理器的并发任务调度。

执行

用户可以使用此选项定义是在所有节点中运行处理器还是仅在主节点中运行处理器。

运行时间表

它用于定义定时器驱动策略的时间或CRON驱动策略的CRON表达式。

Putfile 运行计划

放置文件属性

PutFile 处理器提供诸如 Directory 之类的属性来指定用于文件传输的输出目录,以及其他属性来管理传输,如下图所示。

放置文件属性

PutFile 注释

本节用于指定有关处理器的任何信息。

Putfile 评论

Apache NiFi - 处理器分类

在本章中,我们将讨论 Apache NiFi 中的进程分类。

数据摄取处理器

数据摄取类别下的处理器用于将数据摄取到 NiFi 数据流中。这些主要是apache NiFi中任何数据流的起点。属于这些类别的一些处理器有 GetFile、GetHTTP、GetFTP、GetKAFKA 等。

路由和中介处理器

路由和中介处理器用于根据流文件的属性或内容中的信息将流文件路由到不同的处理器或数据流。这些处理器还负责控制 NiFi 数据流。属于此类别的一些处理器有 RouteOnAttribute、RouteOnContent、ControlRate、RouteText 等。

数据库访问处理器

此数据库访问类别的处理器能够从数据库选择或插入数据或执行和准备其他 SQL 语句。这些处理器主要使用Apache NiFi的数据连接池控制器设置。属于此类别的一些处理器包括 ExecuteSQL、PutSQL、PutDatabaseRecord、ListDatabaseTables 等。

属性提取处理器

属性提取处理器负责提取、分析、更改NiFi数据流中的流文件属性处理。属于此类别的一些处理器包括 UpdateAttribute、EvaluateJSONPath、ExtractText、AttributesToJSON 等。

系统交互处理器

系统交互处理器用于在任何操作系统中运行进程或命令。这些处理器还运行多种语言的脚本以与各种系统交互。属于此类别的一些处理器有 ExecuteScript、ExecuteProcess、ExecuteGroovyScript、ExecuteStreamCommand 等。

数据转换处理器

属于数据转换的处理器能够更改流文件的内容。当用户必须将流文件作为 HTTP 主体发送以调用 HTTP 处理器时,这些可用于完全替换通常使用的流文件的数据。属于此类别的一些处理器有 ReplaceText、JoltTransformJSON 等。

发送数据处理器

发送数据处理器通常是数据流中的最终处理器。这些处理器负责存储数据或将数据发送到目标服务器。成功存储或发送数据后,这些处理器会删除具有成功关系的流文件。属于此类别的处理器有 PutEmail、PutKafka、PutSFTP、PutFile、PutFTP 等。

拆分和聚合处理器

这些处理器用于分割和合并流文件中存在的内容。属于此类别的一些处理器包括 SplitText、SplitJson、SplitXml、MergeContent、SplitContent 等。

HTTP 处理器

这些处理器处理 HTTP 和 HTTPS 调用。属于此类别的一些处理器有 InvokeHTTP、PostHTTP、ListenHTTP 等。

AWS 处理器

AWS 处理器负责与 Amazon Web 服务系统交互。属于此类别的一些处理器包括 GetSQS、PutSNS、PutS3Object、FetchS3Object 等。

Apache NiFi - 处理器关系

在 Apache NiFi 数据流中,流文件通过连接从一个处理器移动到另一个处理器,该连接使用处理器之间的关系进行验证。每当创建连接时,开发人员都会选择这些处理器之间的一个或多个关系。

配置处理器

正如您在上图中看到的,黑色矩形中的复选框是关系。如果开发人员选择这些复选框,则当关系成功或失败或两者兼而有之时,流文件将在该特定处理器中终止。

成功

当处理器成功处理流文件(例如从任何数据源存储或获取数据)而没有获得任何连接、身份验证或任何其他错误时,流文件将进入成功关系。

失败

当处理器无法在没有错误(如身份验证错误或连接问题等)的情况下处理流文件时,流文件将进入失败关系。

开发人员还可以使用连接将流文件传输到其他处理器。开发者可以选择并对其进行负载均衡,但负载均衡在1.8版本才发布,本教程不会涉及。

失败

正如您在上图中看到的,标记为红色的连接具有故障关系,这意味着所有有错误的流文件将转到左侧的处理器,而所有没有错误的流文件将分别传输到标记为绿色的连接。

现在让我们继续处理其他关系。

通讯故障

当由于通信故障而无法从远程服务器获取流文件时,满足此关系。

未找到

我们从远程服务器收到“未找到”消息的任何流文件都将转移到not.found关系。

没有权限

当 NiFi 由于权限不足而无法从远程服务器获取流文件时,它将通过此​​关系移动。

Apache NiFi - FlowFile

流文件是 Apache NiFi 中的基本处理实体。它包含数据内容和属性,NiFi处理器使用这些内容和属性来处理数据。文件内容通常包含从源系统获取的数据。Apache NiFi FlowFile 最常见的属性是 -

流文件

通用唯一标识符

这代表通用唯一标识符,它是 NiFi 生成的流文件的唯一标识。

文件名

该属性包含该流文件的文件名,并且不应包含任何目录结构。

文件大小

它包含 Apache NiFi FlowFile 的大小。

哑剧类型

它指定此 FlowFile 的 MIME 类型。

小路

该属性包含流程文件所属文件的相对路径,不包含文件名。

Apache NiFi - 队列

Apache NiFi数据流连接有一个排队系统来处理大量的数据流入。这些队列可以处理大量的 FlowFile,以便处理器串行处理它们。

排队系统

上图中的队列有 1 个通过成功关系传输的流文件。用户可以通过选择下拉列表中的“列出队列”选项来检查流文件。如果出现任何过载或错误,用户还可以通过选择空队列选项来清除队列,然后用户可以重新启动流程以再次在数据流中获取这些文件。

列表队列

队列中的流文件列表,由位置、UUID、文件名、文件大小、队列持续时间和沿袭持续时间组成。用户可以通过单击流文件列表第一列中的信息图标来查看流文件的所有属性和内容。

流程文件详细信息

Apache NiFi - 进程组

在Apache NiFi中,用户可以在不同的进程组中维护不同的数据流。这些组可以基于 Apache NiFi 实例支持的不同项目或组织。

流程组

如上图所示,NiFi UI 顶部菜单中的第四个符号用于在 NiFi 画布中添加进程组。名为“Tutorialspoint.com_ProcessGroup”的进程组包含一个数据流,其中有四个处理器当前处于停止阶段,如上图所示。可以通过分层的方式创建流程组,以更好的结构管理数据流,易于理解。

数据流

在 NiFi UI 的页脚中,您可以看到进程组,并可以返回到用户当前所在进程组的顶部。

要查看 NiFi 中存在的进程组的完整列表,用户可以使用 NiFi UI 左上角的菜单转到摘要。总之,有一个进程组选项卡,其中列出了所有进程组,其中包含版本状态、传输/大小、输入/大小、读/写、输出/大小等参数,如下图所示。

NiFi总结

Apache NiFi - 标签

Apache NiFi 提供标签,使开发人员能够编写有关 NiFI 画布中存在的组件的信息。NiFi UI 顶部菜单中最左边的图标用于在 NiFi 画布中添加标签。

NiFI画布

开发人员可以通过右键单击标签并从菜单中选择适当的选项来更改标签的颜色和文本大小。

选项菜单

Apache NiFi - 配置

Apache NiFi 是高度可配置的平台。conf目录下的nifi.properties文件

包含大部分配置。

Apache NiFi 的常用属性如下 -

核心属性

本节包含运行 NiFi 实例所必需的属性。

编号 物业名称 默认值 描述
1 nifi.flow.configuration.file ./conf/flow.xml.gz 此属性包含 flow.xml 文件的路径。该文件包含 NiFi 中创建的所有数据流。
2 nifi.flow.configuration.archive.enabled 真的 该属性用于启用或禁用 NiFi 中的存档。
3 nifi.flow.configuration.archive.dir ./conf/存档/ 该属性用于指定存档目录。
4 nifi.flow.configuration.archive.max.time 30天 这用于指定归档内容的保留时间。
5 nifi.flow.configuration.archive.max.storage 500MB 它包含归档目录可以增长的最大大小。
6 nifi.authorizer.configuration.file ./conf/authorizers.xml 指定授权者配置文件,用于用户授权。
7 nifi.login.identity.provider.configuration.file ./conf/login-identity-providers.xml 该属性包含登录身份提供者的配置,
8 nifi.templates.directory ./conf/模板 该属性用于指定存储 NiFi 模板的目录。
9 nifi.nar.library.directory ./lib 此属性包含库的路径,NiFi 将使用该路径使用此 lib 文件夹中存在的 NAR 文件加载所有组件。
10 nifi.nar.工作目录 ./工作/nar/ 一旦 NiFi 处理完解压后的 nar 文件,该目录将存储它们。
11 nifi.documentation.working.directory ./工作/文档/组件 该目录包含所有组件的文档。

状态管理

这些属性用于存储有助于启动处理的组件的状态,其中组件在重新启动后离开并在下一个计划运行中。

编号 物业名称 默认值 描述
1 nifi.state.management.configuration.file ./conf/状态管理.xml 此属性包含 state-management.xml 文件的路径。该文件包含该 NiFi 实例的数据流中存在的所有组件状态。
2 nifi.state.management.provider.local 本地提供商 它包含本地状态提供商的 ID。
3 nifi.state.management.provider.cluster zk-提供者 此属性包含集群范围内的状态提供程序的 ID。如果 NiFi 未集群,则该值将被忽略,但如果在集群中运行则必须填充。
4 nifi.状态管理。嵌入的。动物园管理员。开始 错误的 此属性指定 NiFi 的此实例是否应运行嵌入式 ZooKeeper 服务器。
5 nifi.状态管理。嵌入的。动物园管理员属性 ./conf/zookeeper.properties 此属性包含提供在 <nifi.state.management 时使用的 ZooKeeper 属性的属性文件的路径。嵌入的。动物园管理员。start> 设置为 true。

流文件存储库

现在让我们看看 FlowFile 存储库的重要细节 -

编号 物业名称 默认值 描述
1 nifi.flowfile.存储库。执行 org.apache.nifi。控制器。存储库。预写流文件存储库 此属性用于指定将流文件存储在内存还是磁盘中。如果用户想要将流文件存储在内存中,则更改为“org.apache.nifi.controller.repository.VolatileFlowFileRepository”。
2 nifi.flowfile.repository.directory ./flowfile_repository 指定流程文件存储库的目录。

Apache NiFi - 管理

Apache NiFi 为管理目的提供对 ambari、zookeeper 等多种工具的支持。NiFi 还在 nifi.properties 文件中提供配置,以便为管理员设置 HTTPS 和其他内容。

动物园管理员

NiFi 本身不处理集群中的投票过程。这意味着当创建集群时,所有节点都是主节点和协调节点。因此,zookeeper被配置为管理主节点和协调器的投票。nifi.properties 文件包含一些用于设置 Zookeeper 的属性。

编号 物业名称 默认值 描述
1 nifi.state.management.embedded.zookeeper。特性 ./conf/zookeeper.properties 指定zookeeper属性文件的路径和名称。
2 nifi.zookeeper.connect.string 空的 指定zookeeper的连接字符串。
3 nifi.zookeeper.connect.timeout 3秒 指定zookeeper与NiFi的连接超时时间。
4 nifi.zookeeper.session.timeout 3秒 指定zookeeper与NiFi的会话超时。
5 nifi.zookeeper.root.node /尼菲 为zookeeper指定根节点。
6 nifi.zookeeper.auth.type 空的 指定zookeeper的身份验证类型。

启用 HTTPS

要通过 HTTPS 使用 NiFi,管理员必须生成密钥库和信任库,并在 nifi.properties 文件中设置一些属性。TLS 工具包可用于生成在 apache NiFi 中启用 HTTPS 所需的所有密钥。

编号 物业名称 默认值 描述
1 nifi.web.https.port 空的 指定 https 端口号。
2 nifi.web.https.network.interface.default 空的 NiFi 中 https 的默认接口。
3 nifi.security.keystore 空的 指定密钥库的路径和文件名。
4 nifi.security.keystoreType 空的 指定密钥库类型,如 JKS。
5 nifi.security.keystorePasswd 空的 指定密钥库密码。
6 nifi.security.truststore 空的 指定信任库的路径和文件名。
7 nifi.security.truststoreType 空的 指定信任库类型,如 JKS。
8 nifi.security.truststorePasswd 空的 指定信任库密码。

其他用于管理的属性

还有一些其他属性,管理员使用它们来管理 NiFi 及其服务连续性。

编号 物业名称 默认值 描述
1 nifi.flowcontroller.graceful.shutdown.period 10秒 指定正常关闭 NiFi flowcontroller 的时间。
2 nifi. 行政. 产量. 持续时间 30秒 指定 NiFi 的管理屈服持续时间。
3 nifi.authorizer.configuration.file ./conf/authorizers.xml 指定授权者配置文件的路径和文件名。
4 nifi.login.identity.provider.configuration.file ./conf/login-identity-providers.xml 指定登录身份提供者配置文件的路径和文件名。

Apache NiFi - 创建流程

Apache NiFi 提供了大量组件来帮助开发人员为任何类型的协议或数据源创建数据流。要创建流程,开发人员将组件从菜单栏拖动到画布,然后通过单击鼠标并将鼠标从一个组件拖动到另一个组件来连接它们。

一般来说,NiFi 在流程开始时有一个监听器组件,如 getfile,它从源系统获取数据。另一端有一个像 putfile 这样的发送器组件,中间有一些处理数据的组件。

例如,让我们创建一个流程,该流程从一个目录获取一个空文件,并在该文件中添加一些文本,然后将其放入另一个目录中。

创造流程
  • 首先,将处理器图标拖到 NiFi 画布上,然后从列表中选择 GetFile 处理器。

  • 创建一个输入目录,例如c:\inputdir。

  • 右键单击处理器并选择配置,然后在属性选项卡中添加输入目录 (c:\inputdir),然后单击应用并返回画布。

  • 将处理器图标拖到画布上,然后从列表中选择 ReplaceText 处理器。

  • 右键单击处理器并选择配置。在属性选项卡中,在替换值文本框中添加一些文本,例如“Hellotutorialspoint.com” ,然后单击应用。

  • 转到设置选项卡,选中右侧的失败复选框,然后返回画布。

  • 根据成功关系将 GetFILE 处理器连接到 ReplaceText。

  • 将处理器图标拖到画布上,然后从列表中选择 PutFile 处理器。

  • 创建一个输出目录,例如c:\outputdir

  • 右键单击处理器并选择配置。在属性选项卡中,添加目录 (c:\outputdir)并单击应用并返回画布。

  • 转到设置选项卡并选中右侧的失败和成功复选框,然后返回画布。

  • 根据成功关系将 ReplaceText 处理器连接到 PutFile。

  • 现在启动流程并在输入目录中添加一个空文件,您将看到它将移动到输出目录并且文本将添加到文件中。

通过执行上述步骤,开发人员可以选择任何处理器和其他 NiFi 组件来为其组织或客户创建合适的流程。

Apache NiFi - 模板

Apache NiFi 提供了模板的概念,这使得重用和分发 NiFi 流变得更加容易。这些流程可供其他开发人员或其他 NiFi 集群使用。它还帮助 NiFi 开发人员在 GitHub 等存储库中分享他们的工作。

创建模板

让我们为流程创建一个模板,该模板是我们在第 15 章“Apache NiFi - 创建流程”中创建的。

创建模板

使用 Shift 键选择流程的所有组件,然后单击 NiFi 画布左侧的创建模板图标。您还可以看到一个工具箱,如上图所示。单击蓝色标记的图标创建模板,如上图所示。输入模板的名称。开发人员还可以添加描述,这是可选的。

下载模板

然后转到 NiFi UI 右上角菜单中的 NiFi 模板选项,如下图所示。

下载模板

现在,单击要下载的模板的下载图标(位于列表右侧)。将下载带有模板名称的 XML 文件。

上传模板

要在 NiFi 中使用模板,开发人员必须使用 UI 将其 xml 文件上传到 NiFi。“创建模板”图标旁边有一个“上传模板”图标(下图中用蓝色标记),单击该图标并浏览 xml。

上传模板

添加模板

在 NiFi UI 的顶部工具栏中,模板图标位于标签图标之前。该图标标记为蓝色,如下图所示。

添加模板

拖动模板图标并从下拉列表中选择模板,然后单击添加。它将把模板添加到 NiFi 画布中。

Apache NiFi - API

NiFi提供了大量的API,可以帮助开发人员进行更改并从任何其他工具或定制开发的应用程序中获取NiFi的信息。在本教程中,我们将使用 google chrome 中的 postman 应用程序来解释一些示例。

要将 postman 添加到您的 Google Chrome,请转到下面提到的 URL,然后单击“添加到 Chrome”按钮。您现在将看到一个新应用程序添加到您的 Google Chrome 中。

Chrome 网上应用店

NiFi Rest API 的当前版本是 1.8.0,文档位于下面提到的 URL 中。

https://nifi.apache.org/docs/nifi-docs/rest-api/index.html

以下是最常用的 NiFi REST API 模块 -

  • http://<nifi url>:<nifi 端口>/nifi-api/< api-path >

  • 如果启用了 HTTPS https://<nifi url>:<nifi port>/nifi-api/< api-path >

编号 API模块名称 api 路径 描述
1 使用权 /使用权 对用户进行身份验证并从 NiFi 获取访问令牌。
2 控制器 /控制器 管理集群并创建报告任务。
3 控制器服务 /控制器服务 它用于管理控制器服务和更新控制器服务引用。
4 报告任务 /报告任务 管理报告任务。
5 流动 /流动 获取数据流元数据和组件状态以及查询历史记录
6 进程组 /进程组 上传并实例化模板并创建组件。
7 处理器 /处理器 创建和调度处理器并设置其属性。
8 连接 /连接 要创建连接,请设置队列优先级并更新连接目标
9 流文件队列 /流文件队列 查看队列内容、下载流文件内容和清空队列。
10 远程进程组 /远程进程组 创建远程组并启用传输。
11 出处 /出处 查询出处,并搜索事件沿袭。

现在让我们考虑一个示例并在 postman 上运行以获取有关正在运行的 NiFi 实例的详细信息。

要求

GET http://localhost:8080/nifi-api/flow/about

回复

{
   "about": {
      "title": "NiFi",
      "version": "1.7.1",
      "uri": "http://localhost:8080/nifi-api/",
      "contentViewerUrl": "../nifi-content-viewer/",
      "timezone": "SGT",
      "buildTag": "nifi-1.7.1-RC1",
      "buildTimestamp": "07/12/2018 12:54:43 SGT"
   }
}

Apache NiFi - 数据来源

Apache NiFi 记录并存储有关流中摄取数据上发生的事件的所有信息。数据来源存储库存储此信息并提供 UI 来搜索此事件信息。还可以访问完整 NiFi 级别和处理器级别的数据来源。

数据来源

下表列出了 NiFi 数据来源事件列表中的不同字段,其中包含以下字段 -

编号 字段名称 描述
1 约会时间 事件的日期和时间。
2 类型 事件类型,如“创建”。
3 流文件Uuid 执行事件的流文件的 UUID。
4 尺寸 流文件的大小。
5 组件名称 执行事件的组件的名称。
6 元件类型 组件的类型。
7 显示血统 最后一列有“显示沿袭”图标,用于查看流文件沿袭,如下图所示。
血统图标

要获取有关该事件的更多信息,用户可以单击 NiFi Data Provenance UI 第一列中的信息图标。

nifi.properties 文件中有一些属性,用于管理 NiFi Data Provenance 存储库。

编号 物业名称 默认值 描述
1 nifi.provenance.repository.directory.default ./provenance_repository 指定NiFi数据来源的默认路径。
2 nifi.provenance.repository.max.storage.time 24小时 指定 NiFi 数据来源的最长保留时间。
3 nifi.provenance.repository.max.storage.size 1GB 指定 NiFi 数据来源的最大存储量。
4 nifi.provenance.repository.rollover.time 30秒 指定 NiFi 数据来源的滚动时间。
5 nifi.provenance.repository.rollover.size 100MB 指定 NiFi 数据来源的翻转大小。
6 nifi.provenance.repository.indexed.fields 事件类型、FlowFileUUID、文件名、处理器ID、关系 指定用于搜索和索引 NiFi 数据来源的字段。

Apache NiFi - 监控

在 Apache NiFi 中,有多种方法可以监控系统的不同统计信息,例如错误、内存使用情况、CPU 使用情况、数据流统计信息等。我们将在本教程中讨论最流行的方法。

内置监控

在本节中,我们将详细了解 Apache NiFi 中的内置监控。

布告栏

公告板实时显示 NiFi 处理器生成的最新 ERROR 和 WARNING。要访问公告板,用户必须进入右侧下拉菜单并选择公告板选项。它会自动刷新,用户也可以禁用它。用户还可以通过双击错误导航到实际的处理器。用户还可以通过执行以下操作来过滤公告 -

  • 通过留言
  • 按名字
  • 按 ID
  • 按组 ID

数据来源 UI

要监控任何特定处理器或整个 NiFi 中发生的事件,用户可以从与公告板相同的菜单访问数据来源。用户还可以通过使用以下字段来过滤数据来源存储库中的事件 -

  • 按组件名称
  • 按组件类型
  • 按类型

NiFi 摘要 UI

Apache NiFi 摘要也可以从与公告板相同的菜单访问。此 UI 包含有关特定 NiFi 实例或集群的所有组件的信息。可以按名称、类型或 URI 过滤它们。不同的组件类型有不同的选项卡。以下是可以在 NiFi 摘要 UI 中进行监控的组件 -

  • 处理器
  • 输入端口
  • 输出端口
  • 远程进程组
  • 连接
  • 进程组

在此 UI 中,右下角有一个名为系统诊断的链接,用于检查 JVM 统计信息。

报告任务

Apache NiFi 提供多种报告任务来支持 Ambari、Grafana 等外部监控系统。开发人员可以创建自定义报告任务,也可以配置内置报告任务,将 NiFi 的指标发送到外部监控系统。下表列出了 NiFi 1.7.1 提供的报告任务。

编号 报告任务名称 描述
1 Ambari报告任务 为 NiFi 设置 Ambari Metrics Service。
2 控制器状态报告任务 报告 NiFi 摘要 UI 中最近 5 分钟的信息。
3 监控磁盘使用情况 报告并警告特定目录的磁盘使用情况。
4 监控内存 监控 JVM 的 Java 内存池中使用的 Java 堆的数量。
5 站点到站点公告报告任务 使用站点到站点协议在公告中报告错误和警告。
6 站点到站点来源报告任务 使用站点到站点协议报告 NiFi 数据来源事件。

NIFI API

有一个名为系统诊断的 API,可用于监控任何自定义开发的应用程序中的 NiFI 统计信息。让我们检查一下 postman 中的 API。

要求

http://localhost:8080/nifi-api/system-diagnostics

回复

{
   "systemDiagnostics": {
      "aggregateSnapshot": {
         "totalNonHeap": "183.89 MB",
         "totalNonHeapBytes": 192819200,
         "usedNonHeap": "173.47 MB",
         "usedNonHeapBytes": 181894560,
         "freeNonHeap": "10.42 MB",
         "freeNonHeapBytes": 10924640,
         "maxNonHeap": "-1 bytes",
         "maxNonHeapBytes": -1,
         "totalHeap": "512 MB",
         "totalHeapBytes": 536870912,
         "usedHeap": "273.37 MB",
         "usedHeapBytes": 286652264,
         "freeHeap": "238.63 MB",
         "freeHeapBytes": 250218648,
         "maxHeap": "512 MB",
         "maxHeapBytes": 536870912,
         "heapUtilization": "53.0%",
         "availableProcessors": 4,
         "processorLoadAverage": -1,
         "totalThreads": 71,
         "daemonThreads": 31,
         "uptime": "17:30:35.277",
         "flowFileRepositoryStorageUsage": {
            "freeSpace": "286.93 GB",
            "totalSpace": "464.78 GB",
            "usedSpace": "177.85 GB",
            "freeSpaceBytes": 308090789888,
            "totalSpaceBytes": 499057160192,
            "usedSpaceBytes": 190966370304,
            "utilization": "38.0%"
         },
         "contentRepositoryStorageUsage": [
            {
               "identifier": "default",
               "freeSpace": "286.93 GB",
               "totalSpace": "464.78 GB",
               "usedSpace": "177.85 GB",
               "freeSpaceBytes": 308090789888,
               "totalSpaceBytes": 499057160192,
               "usedSpaceBytes": 190966370304,
               "utilization": "38.0%"
            }
         ],
         "provenanceRepositoryStorageUsage": [
            {
               "identifier": "default",
               "freeSpace": "286.93 GB",
               "totalSpace": "464.78 GB",
               "usedSpace": "177.85 GB",
               "freeSpaceBytes": 308090789888,
               "totalSpaceBytes": 499057160192,
               "usedSpaceBytes": 190966370304,
               "utilization": "38.0%"
            }
         ],
         "garbageCollection": [
            {
               "name": "G1 Young Generation",
               "collectionCount": 344,
               "collectionTime": "00:00:06.239",
               "collectionMillis": 6239
            },
            {
               "name": "G1 Old Generation",
               "collectionCount": 0,
               "collectionTime": "00:00:00.000",
               "collectionMillis": 0
            }
         ],
         "statsLastRefreshed": "09:30:20 SGT",
         "versionInfo": {
            "niFiVersion": "1.7.1",
            "javaVendor": "Oracle Corporation",
            "javaVersion": "1.8.0_151",
            "osName": "Windows 7",
            "osVersion": "6.1",
            "osArchitecture": "amd64",
            "buildTag": "nifi-1.7.1-RC1",
            "buildTimestamp": "07/12/2018 12:54:43 SGT"
         }
      }
   }
}

Apache NiFi - 升级

在开始升级 Apache NiFi 之前,请阅读发行说明以了解更改和添加内容。用户需要评估这些添加和更改对其当前 NiFi 安装的影响。以下是获取 Apache NiFi 新版本的发行说明的链接。

https://cwiki.apache.org/confluence/display/NIFI/Release+Notes

在集群设置中,用户需要升级集群中每个节点的 NiFi 安装。请按照以下步骤升级 Apache NiFi。

  • 备份当前 NiFi 或 lib 或任何其他文件夹中存在的所有自定义 NAR。

  • 下载新版本的 Apache NiFi。以下是下载最新 NiFi 版本的源代码和二进制文件的链接。

    https://nifi.apache.org/download.html

  • 在当前NiFi的安装目录下新建一个目录,并解压新版本的Apache NiFi。

  • 优雅地停止 NiFi。首先停止所有处理器,并处理流程中存在的所有流程文件。一旦不再存在流文件,请停止 NiFi。

  • 将authorizers.xml的配置从当前NiFi安装复制到新版本。

  • 从当前版本更新新 NiFi 版本的 bootstrap-notification-services.xml 和 bootstrap.conf 中的值。

  • 将 logback.xml 中的自定义日志记录添加到新的 NiFi 安装中。

  • 在当前版本的login-identity-providers.xml 中配置登录身份提供程序。

  • 从当前版本更新新 NiFi 安装的 nifi.properties 中的所有属性。

  • 请确保新版本的组和用户与当前版本相同,以避免任何权限被拒绝的错误。

  • 将当前版本的state-management.xml中的配置复制到新版本。

  • 将当前版本 NiFi 安装中以下目录的内容复制到新版本中的相同目录。

    • ./conf/flow.xml.gz

    • 还有归档目录中的 flow.xml.gz。

    • 对于出处和内容存储库,更改 nifi 中的值。属性文件到当前存储库。

    • 如果指定了任何其他外部目录,则从 ./state/local 复制状态或更改 nifi.properties 。

  • 重新检查所有已执行的更改,并检查它们是否对新 NiFi 版本中添加的任何新更改产生影响。如果有影响,请检查解决方案。

  • 启动所有 NiFi 节点并验证所有流程是否正常工作以及存储库是否正在存储数据以及 Ui 是否正在检索数据但有任何错误。

  • 监视公告一段时间以检查是否有任何新错误。

  • 如果新版本工作正常,则可以将当前版本存档并从目录中删除。

Apache NiFi - 远程进程组

Apache NiFi 远程进程组或 RPG 使流程能够使用站点到站点协议将流程中的 FlowFiles 定向到不同的 NiFi 实例。从版本 1.7.1 开始,NiFi 不提供平衡关系,因此 RPG 用于 NiFi 数据流中的负载平衡。

远程进程

开发人员可以通过将上图所示的图标拖到画布上,从 NiFi UI 的顶部工具栏中添加 RPG。要配置 RPG,开发人员必须添加以下字段 -

编号 字段名称 描述
1 网址 指定以逗号分隔的远程目标 NiFi URL。
2 传输协议 指定远程 NiFi 实例的传输协议。它可以是 RAW 或 HTTP。
3 本地网络接口 指定发送/接收数据的本地网络接口。
4 HTTP 代理服务器主机名 指定代理服务器的主机名,以便在 RPG 中进行传输。
5 HTTP 代理服务器端口 指定代理服务器的端口以便在 RPG 中进行传输。
6 HTTP 代理用户 它是一个可选字段,用于指定 HTTP 代理的用户名。
7 HTTP 代理密码 这是一个可选字段,用于指定上述用户名的密码。

开发人员需要在使用它之前启用它,就像我们在使用处理器之前启动它们一样。

NiFi流

Apache NiFi - 控制器设置

Apache NiFi 提供共享服务,可以由处理器共享,报告任务称为控制器设置。它们就像数据库连接池,可供访问同一数据库的处理器使用。

要访问控制器设置,请使用 NiFi UI 右上角的下拉菜单,如下图所示。

控制器设置

Apache NiFi 提供了许多控制器设置,我们将讨论一种常用的控制器设置以及如何在 NiFi 中设置它。

DBCP连接池

单击控制器设置选项后,在 Nifi 设置页面中添加加号。然后从控制器设置列表中选择 DBCPConnectionPool。DBCPConnectionPool 将添加到 NiFi 主设置页面中,如下图所示。

连接池

它包含有关控制器设置的以下信息:名称

  • 类型
  • 状态
  • 范围
  • 配置和删除图标

单击配置图标并填写必填字段。下表列出了这些字段 -

编号 字段名称 默认值 描述
1 数据库连接 URL 空的 指定数据库的连接 URL。
2 数据库驱动程序类名 空的 指定数据库的驱动程序类名,例如 mysql 的 com.mysql.jdbc.Driver。
3 最长等待时间 500 毫厘 指定等待数据库连接数据的时间。
4 最大总连接数 8 指定数据库连接池中分配的最大连接数。

要停止或配置控制器设置,首先应停止所有连接的 NiFi 组件。NiFi 还在控制器设置中添加了范围来管理其配置。因此,只有共享相同设置的控制器才不会受到影响,并且会使用相同的控制器设置。

Apache NiFi - 报告任务

Apache NiFi 报告任务类似于控制器服务,它在后台运行并发送或记录 NiFi 实例的统计信息。NiFi 报告任务也可以从与控制器设置相同的页面访问,但在不同的选项卡中。

报告任务

要添加报告任务,开发人员需要单击报告任务页面右上角的加号按钮。这些报告任务主要用于监控 NiFi 实例的活动,无论是在公告中还是在来源中。这些报告任务主要使用Site-to-Site将NiFi统计数据传输到其他节点或外部系统。

现在让我们添加一个配置的报告任务以便更好地理解。

监控内存

此报告任务用于在内存池超过指定百分比时生成公告。按照以下步骤配置 MonitorMemory 报告任务 -

  • 添加加号并在列表中搜索 MonitorMemory。

  • 选择MonitorMemory并点击AD