Apache Flume 教程:什么是 Flume、架构及 Hadoop 示例

Hadoop 中的 Apache Flume 是什么?

Apache Flume 是一个可靠的分布式系统,用于收集、聚合和移动海量日志数据。它具有基于流数据流的简单而灵活的架构。Apache Flume 用于收集 Web 服务器日志文件中存在的日志数据,并将其聚合到 HDFS 中进行分析。

Hadoop 中的 Flume 支持多种源,例如:

  • “tail”(将数据从本地文件管道传输并通过 Flume 写入 HDFS,类似于 Unix 命令 ‘tail’)
  • 系统日志
  • Apache log4j(启用 Java 应用程序通过 Flume 将事件写入 HDFS 中的文件)。

Flume 架构

一个 Flume 代理是一个 JVM 进程,它包含 3 个组件——Flume Source、Flume Channel Flume Sink——事件在从外部源启动后通过这些组件传播。

Flume Architecture
Flume 架构
  1. 在上图中,外部源(WebServer)生成的事件由 Flume Data Source 消费。外部源以目标源能够识别的格式将事件发送到 Flume 源。
  2. Flume Source 接收事件并将其存储到一个或多个 Channel 中。Channel 作为存储,直到被 Flume Sink 消费。此 Channel 可能使用本地文件系统来存储这些事件。
  3. Flume Sink 从 Channel 中移除事件,并将其存储到外部存储库(例如 HDFS)中。可能存在多个 Flume 代理,在这种情况下,Flume Sink 会将事件转发到流中下一个 Flume 代理的 Flume 源。

FLUME 的一些重要特性

  • Flume 具有基于流数据流的灵活设计。它具有容错性并且健壮,具有多种故障转移和恢复机制。Flume Big Data 提供不同级别的可靠性,包括“尽力而为交付”“端到端交付”尽力而为交付不能容忍任何 Flume 节点故障,而“端到端交付”模式可确保即使在多个节点发生故障时也能交付。
  • Flume 在源和宿之间传输数据。这些数据的收集可以是计划性的,也可以是事件驱动的。Flume 拥有自己的查询处理引擎,可以轻松地在将每批新数据移动到目标宿之前对其进行转换。
  • 可能的 Flume 宿包括 HDFS HBase。Flume Hadoop 还可用于传输事件数据,包括但不限于网络流量数据、社交媒体网站生成的数据以及电子邮件消息。

Flume 库和源代码设置

在开始实际过程之前,请确保已安装 Hadoop。将用户更改为“hduser”(Hadoop 配置期间使用的 ID,您可以切换到 Hadoop 配置期间使用的用户 ID)

Flume, Library and Source Code Setup

步骤 1)创建一个名为“FlumeTutorial”的新目录

sudo mkdir FlumeTutorial
  1. 赋予读、写和执行权限
    sudo chmod -R 777 FlumeTutorial
  2. 将文件 **MyTwitterSource.java** 和 **MyTwitterSourceForFlume.java** 复制到此目录。

从此处下载输入文件

检查所有这些文件的文件权限,如果缺少“读取”权限,则授予该权限 —

Flume, Library and Source Code Setup

步骤 2)从网站下载“Apache Flume” — https://flume.apache.org/download.html

本 Flume 教程中使用了 Apache Flume 1.4.0。

Flume, Library and Source Code Setup

下一步

Flume, Library and Source Code Setup

步骤 3)将下载的 tarball 复制到您选择的目录,并使用以下命令解压内容

sudo tar -xvf apache-flume-1.4.0-bin.tar.gz

Flume, Library and Source Code Setup

此命令将创建一个名为 **apache-flume-1.4.0-bin** 的新目录并将文件解压到其中。在本文的其余部分,此目录将被称为

步骤 4)Flume 库设置

将 **twitter4j-core-4.0.1.jar、flume-ng-configuration-1.4.0.jar、flume-ng-core-1.4.0.jar、flume-ng-sdk-1.4.0.jar** 复制到

<Flume 安装目录>/lib/

复制的 JAR 文件可能需要或不需要执行权限。这可能会导致代码编译出现问题。因此,撤销此类 JAR 的执行权限。

在我的例子中,**twitter4j-core-4.0.1.jar** 具有执行权限。我按如下方式撤销了它 —

sudo chmod -x twitter4j-core-4.0.1.jar

Flume, Library and Source Code Setup

在此命令之后,将“读取”权限授予 **twitter4j-core-4.0.1.jar** 中的所有人。

sudo chmod +rrr /usr/local/apache-flume-1.4.0-bin/lib/twitter4j-core-4.0.1.jar

请注意,我已下载 —

– twitter4j-core-4.0.1.jar 来自 https://mvnrepository.com/artifact/org.twitter4j/twitter4j-core

所有 Flume JAR,即 **flume-ng-*-1.4.0.jar** 来自 http://mvnrepository.com/artifact/org.apache.flume

使用 Flume 从 Twitter 加载数据

步骤 1)转到包含源代码文件的目录。

步骤 2)将 **CLASSPATH** 设置为包含 **<Flume 安装目录>/lib/*** 和 **~/FlumeTutorial/flume/mytwittersource/***

export CLASSPATH="/usr/local/apache-flume-1.4.0-bin/lib/*:~/FlumeTutorial/flume/mytwittersource/*"

Load Data from Twitter using Flume

步骤 3)使用命令编译源代码 —

javac -d . MyTwitterSourceForFlume.java MyTwitterSource.java

Load Data from Twitter using Flume

步骤 4)创建一个 jar

首先,使用您选择的文本编辑器创建一个 **Manifest.txt** 文件,并在其中添加以下行 —

Main-Class: flume.mytwittersource.MyTwitterSourceForFlume

... 这里 **flume.mytwittersource.MyTwitterSourceForFlume** 是主类的名称。请注意,您必须在该行末尾按 Enter 键。

Load Data from Twitter using Flume

现在,按如下方式创建 JAR‘**MyTwitterSourceForFlume.jar**’ —

jar cfm MyTwitterSourceForFlume.jar Manifest.txt flume/mytwittersource/*.class

Load Data from Twitter using Flume

步骤 5)将此 jar 复制到 **<Flume 安装目录>/lib/**

sudo cp MyTwitterSourceForFlume.jar <Flume Installation Directory>/lib/

Load Data from Twitter using Flume

步骤 6)转到 Flume 的配置目录 **<Flume 安装目录>/conf**

如果 flume.conf 不存在,则复制 flume-conf.properties.template 并将其重命名为 flume.conf

sudo cp flume-conf.properties.template flume.conf

Load Data from Twitter using Flume

如果 **flume-env.sh** 不存在,则复制 **flume-env.sh.template** 并将其重命名为 **flume-env.sh**

sudo cp flume-env.sh.template flume-env.sh

Load Data from Twitter using Flume

创建 Twitter 应用程序

步骤 1)通过登录 https://developer.twitter.com/ 创建 Twitter 应用程序

Creating a Twitter Application

Creating a Twitter Application

步骤 2)转到“我的应用程序”(当单击右上角的“Egg”按钮时,此选项会下拉)

Creating a Twitter Application

步骤 3)通过单击“创建新应用程序”来创建新应用程序

步骤 4)通过指定应用程序名称、描述和网站来填写应用程序详细信息。您可以参考每个输入框下方的说明。

Creating a Twitter Application

步骤 5)向下滚动页面,通过勾选“是的,我同意”来接受条款,然后单击“创建您的 Twitter 应用程序”按钮

Creating a Twitter Application

步骤 6)在新创建的应用程序窗口中,转到“API 密钥”选项卡,向下滚动页面,然后单击“创建我的访问令牌”按钮

Creating a Twitter Application

Creating a Twitter Application

步骤 7)刷新页面。

步骤 8)单击“测试 OAuth”。这将显示应用程序的“OAuth”设置。

Creating a Twitter Application

步骤 9)修改“flume.conf”,使用这些 OAuth 设置。修改“flume.conf”的步骤如下。

Creating a Twitter Application

我们需要复制 Consumer key、Consumer secret、Access token 和 Access token secret 来更新 ‘flume.conf’。

注意:这些值属于用户,因此是机密的,不应共享。

修改 ‘flume.conf’ 文件

步骤 1)以写入模式打开‘flume.conf’并为以下参数设置值 —

sudo gedit flume.conf

复制以下内容 —

MyTwitAgent.sources = Twitter
MyTwitAgent.channels = MemChannel
MyTwitAgent.sinks = HDFS
MyTwitAgent.sources.Twitter.type = flume.mytwittersource.MyTwitterSourceForFlume
MyTwitAgent.sources.Twitter.channels = MemChannel 
MyTwitAgent.sources.Twitter.consumerKey = <Copy consumer key value from Twitter App>
MyTwitAgent.sources.Twitter.consumerSecret = <Copy consumer secret value from Twitter App>
MyTwitAgent.sources.Twitter.accessToken = <Copy access token value from Twitter App>
MyTwitAgent.sources.Twitter.accessTokenSecret = <Copy access token secret value from Twitter App>
MyTwitAgent.sources.Twitter.keywords = guru99
MyTwitAgent.sinks.HDFS.channel = MemChannel
MyTwitAgent.sinks.HDFS.type = hdfs
MyTwitAgent.sinks.HDFS.hdfs.path = hdfs://:54310/user/hduser/flume/tweets/
MyTwitAgent.sinks.HDFS.hdfs.fileType = DataStream
MyTwitAgent.sinks.HDFS.hdfs.writeFormat = Text
MyTwitAgent.sinks.HDFS.hdfs.batchSize = 1000
MyTwitAgent.sinks.HDFS.hdfs.rollSize = 0
MyTwitAgent.sinks.HDFS.hdfs.rollCount = 10000
MyTwitAgent.channels.MemChannel.type = memory
MyTwitAgent.channels.MemChannel.capacity = 10000
MyTwitAgent.channels.MemChannel.transactionCapacity = 1000

Modify 'Flume.conf' File

步骤 2)另外,将 **TwitterAgent.sinks.HDFS.hdfs.path** 设置为如下所示:

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://<主机名>:<端口号>/<HDFS 主目录>/flume/tweets/

Modify 'Flume.conf' File

要了解<主机名><端口号><HDFS 主目录>,请查看 $HADOOP_HOME/etc/hadoop/core-site.xml 中设置的‘fs.defaultFS’参数值。

Modify 'Flume.conf' File

步骤 3)为了在数据到达时将其刷新到 HDFS,请删除以下条目(如果存在):

TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

示例:使用 Flume 流式传输 Twitter 数据

步骤 1)以写入模式打开‘flume-env.sh’并为以下参数设置值 —

JAVA_HOME=<Installation directory of Java>
FLUME_CLASSPATH="<Flume Installation Directory>/lib/MyTwitterSourceForFlume.jar"

Streaming Twitter Data using Flume

步骤 2)启动 Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

步骤 3)Flume tarball 中的两个 JAR 文件与 Hadoop 2.2.0 不兼容。因此,在此 Apache Flume 示例中,我们需要遵循以下步骤使 Flume 与 Hadoop 2.2.0 兼容。

a.将 **protobuf-java-2.4.1.jar** 移出‘<Flume 安装目录>/lib’

转到‘<Flume 安装目录>/lib’

cd <Flume 安装目录>/lib

sudo mv protobuf-java-2.4.1.jar ~/

Streaming Twitter Data using Flume

b.查找 JAR 文件 ‘guava’ 如下 —

find . -name "guava*"

Streaming Twitter Data using Flume

将 **guava-10.0.1.jar** 移出‘<Flume 安装目录>/lib’

sudo mv guava-10.0.1.jar ~/

Streaming Twitter Data using Flume

c.http://mvnrepository.com/artifact/com.google.guava/guava/17.0 下载 **guava-17.0.jar**

Streaming Twitter Data using Flume

现在,将此下载的 jar 文件复制到‘<Flume 安装目录>/lib’

步骤 4)转到‘<Flume 安装目录>/bin’并按如下方式启动 Flume —

./flume-ng agent -n MyTwitAgent -c conf -f <Flume Installation Directory>/conf/flume.conf

Streaming Twitter Data using Flume

Flume 正在获取 Tweets 的命令提示窗口 —

Streaming Twitter Data using Flume

从命令行消息中,我们可以看到输出已写入 **/user/hduser/flume/tweets/ **目录。

现在,使用 Web 浏览器打开此目录。

步骤 5)要通过浏览器查看数据加载结果,请打开 **https://:50070/** 并浏览文件系统,然后转到数据已加载的目录,即 —

<HDFS 主目录>/flume/tweets/

Streaming Twitter Data using Flume