Flume数据收集测试-1

10/11/2015

Flumes是一套分布式的,可靠地数据收集系统,他可以将大量的数据源传递的数据(log data等)传递到一个集中的数据源中进行统一的管理。他可以实现数据的收集和聚合,而且收集的数据可以是日志数据,也可以网络信息,社交媒体产生的信息等等。现存的版本主要是0.9.x还有1.x版本,本章节文章中使用的是官方的1.6.0版本。主要内容是通过Flume产生的数据收集起来后放入kafka队列中。为后期的数据消费做准备。

系统运行条件

  1. JRE环境(1.6+)
  2. 内存,充足的内存可以更加方便系统的运行和分配(本节实例:虚拟机2G内存)
  3. 磁盘空间(本节实例:虚拟机30G)
  4. 文件读写权限(测试环境运行root用户)

Flume架构

数据传输模型

在Flume中事件(event)被定义为一组数据流拥有数据payload和一个可选的字符串属性。一个Flume agent代表了一个JVM进程用于传递数据的节点。

image

在Flume中数据源产生数据后,在Flume中的Source消费这些数据后,通过一个传输的通道传递到Sink(目的地)中。之后再将这些数据传递到一个数据库或者HDFS中。在Flume中一旦Source接收到数据后,就会将数据传递到一个或者多个Channel中,数据会一直保留在其中直到Sink消费了该数据后,在Channel中才会删除。这个Channel可以是文件Channel,这样数据就会被保存在本地的磁盘上。

对于Flume支持多级的传输,也就是多个Agent的级联,这样可以跨越多个节点到达最终的目的地。数据在传输过程中,只会存储在Channel中或者最终的目的地。Flume提供了事务机制保证了数据的可靠传递,在多级传递中每个Agent都有自己的事务机制运行确保数据在该节点上的可靠传递。

Channel的区别:

  1. 文件Channel保证了数据的持久化和可恢复性。
  2. 内存Channel无法保证数据可恢复(当进程Down掉的时候)

安装与配置Flume

关于Flume的下载与安装,可以在官方Flume下载链接上下载文件。 下载完成后可以放置在/usr/share/目录上,

[root@localhost apache-flume-1.6.0-bin]# ls
bin  CHANGELOG  conf  DEVNOTES  docs  lib  LICENSE  NOTICE  README  RELEASE-NOTES  tools

这是整个的下载目录,可以在conf目录中找到对应的配置文件

以下是一个简单的配置文件的实例example.conf:

# Name the components on this agent
a1.sources = r1   
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

定义了一个agent a1的结构:

  1. a1拥有数据源r1,传输通道c1和传输目的地k1
  2. a1的数据源r1类型是网络类型监听所有发送到本地44444端口的数据
  3. a1的传输通道类型是内存型,传输的容量为1G,传输的事务容量为100Mb
  4. a1的传输目的地是一个定义为logger的目的地(在参数中可定义)
  5. 绑定传输的源到传输通道和目的地到传输通道

编写配置文件完成后就可以直接的运行下面的指令了

$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dflume.root.logger=INFO,console

这里我们定义了logger的意义,console终端,并且INFO类型的数据。

测试的话,就可以直接在本地telnet 44444并数据一些内容在终端上就会显示出以下的内容:

2015-05-02 01:00:30,197 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 20 0D          hello world . }

这样一个单节点的数据传输Flow就完成了。

多节点数据传输

image

上述图示代表了一个较为复杂的数据传输流,每个webServer节点上都部署上Flume用于采集数据,并通过设置相同的目的地来将数据传递到数据中心并通过该节点存储数据到HDFS。在这里我们实际上是并行的方式运行了多个Flume实例来传递数据。有时候我们可能需要将一份数据广播到多个数据终点,比如下面的图示中:

image

我们将采集到的数据通过多个Channel共享给了多个数据Sinks,并将数据传递到了多个不同的目的地中。为了实现多个灵活的数据采集方案,我们需要首先搞清楚如何通过修改配置系统文件来完成上述功能。

配置Flume

单一节点配置

为了定义一个单一的Agent,我们需要通过channels将源sources和目的地sinks连接,因此在配置中我们需要列出所有的源和目的地以及通道,并且将源和目的地指向一个通道。一个源可以定义多个channels,但是一个目的地sinks只能链接一个channel 上述代表了一个从avro到HDFS sink的配置文件

# list the sources, sinks and channels for the agent
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1

# set channel for source
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1

# set channel for sink
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1

同样我们虽然绑定了输入和输出 ,但是每个组件都有自己的配置信息,比如url 端口和类型等等,这些也需要我们从配置文件中定义好:

agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000

# properties of mem-channel-1
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100

# properties of hdfs-Cluster1-sink
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata

在一个配置文件中我们可以定义多个传输流,并存于同一个节点上 比如

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# flow #1 configuration
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1

# flow #2 configuration
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

这样对于同一个agent内部可以实现两套收集和处理数据流。

多通道节点配置

对于上述的将一个数据源分散到多个channel中,分为两种方式,一种是复制一种是多路,区别在于传递数据的方式的不同。复制意味着一个数据将被复制分发到所有的配置channels.而多路的含义就是指的是数据被分配到一个子通道,(这里需要定义分配的选择器)。

例如下面的实例中:

# List the sources, sinks and channels for the agent
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>

# set list of channels for source (separated by space)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>

# set channel for sinks
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>

<Agent>.sources.<Source1>.selector.type = replicating

上述的例子中我们将数据每条都复制到相同的路径上进行传递。也就是每个目的地址都会收到相同的数据内容

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2

# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2

# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

数据传输中我们可以检查 header中的“State”,按照State不同将数据分发到不同的传输channel上,若不存在则缺省到内存channel,另外选择器还支持可选的映射关系 :

agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

可选的处理逻辑是:

  1. 数据先从必须的(mapping)映射通道上传递。
  2. 数据传递在必须的通道上时候,由于通道问题(数据无法被消费)传输失败后,数据将会在所有的这些必须的通道上重新尝试传递,建立事务。
  3. 一旦所有的必须的通道消费了数据,数据会被写入可选的channels,在此上述通道上传递的数据将不会被因为错误导致重传。
  4. 上述的mapping.CA = mem-channel-1 ;optional.CA = mem-channel-1 file-channel-2 标示数据在mem-channel-1上上是必须传输的,一旦失败了会重传,但是在file-channel-2上则是可选的,出错了也不会重新尝试发送。

总结:可选的通道处理优先级较低,并且不提供数据传输保证。


数据分析 Flume 页面已被访问2003次

发表评论