Flume数据收集测试-2

10/14/2015

关于Flume的配置我们已经在上一个章节中的内容中简单的描述完毕。这一个章节主要是介绍下Flume的输入源和输出源的配置问题,以及哪些默认的配置源我们可以直接使用。

1. Flume数据源分类

Avro源

Avro同样也是Apache管理的一个独立项目,用于数据的序列化和压缩等。avro支持跨编程语言实现(C, C++, C#,Java, Python, Ruby, PHP),动态加载可以使序列化更加的方便高效。关于Avro这里不多介绍。 下面的就是一个简单的使用avro作为数据源的例子,绑定端口到4141。并且可以配置使用ipFilterRules来设置访问的限制。允许本地访问,而拒绝其他的IP。并且可以支持SSL加密的数据传输。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
a1.sources.r1.ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
Exec Source

定义一个Unix的可执行命令,该命令可以不断的产生数据出来,比如tail -F .另外可以定义是否在进程死掉以后重新启动以及每次发送的数据条目等等。但是不能保证错误发生,以及数据丢失。为了更好地利用flume建议使用一个SDK去集成flume绑定数据流或者使用Spooling Director源。以下为一个执行的实例。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1

或者是调用shell脚本执行命令:

a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done
Spooling Directory

Spooling Direcory可以使你观察放置在目录中的新的文件,并将文件中的数据解析出来,并且文件一旦被读取完毕,则重新命名文件或者删除文件。不同于Exec的是,这个源是非常可靠的,不会丢失数据,即便是flume被重启或者杀掉。另外不要尝试去修改已经放入该目录中的数据。

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
Kafka Source

flume支持我们直接从kafka的队列中读取消息使用,注意这里我们使用的一些参数比如 timeout 指代的是我们使用Kafka消费数据的时候为了等待新的数据我们至少要等待100ms.

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100
网络监听数据源

监听每一个请求行到一个时间,打开一个特定的端口去监听所有的数据请求

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.bind = 6666
a1.sources.r1.channels = c1
序列产生器

简单的产生一个每次加1的序列

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
Syslog Sources

对于TCP 或者UDP的syslog支持直接生成events。

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

UDP数据源实例:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

对于多个syslog数据源的数据收集:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = multiport_syslogtcp
a1.sources.r1.channels = c1
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.ports = 10001 10002 10003
a1.sources.r1.portHeader = port  

这里使用了端口作为header的一部分,这样我们就可以使用selort选择不同的数据源的数据

压力测试数据源

定义发送的每个event的负载大小和总计的数目。用于测试使用

a1.sources = stresssource-1
a1.channels = memoryChannel-1
a1.sources.stresssource-1.type = org.apache.flume.source.StressSource
a1.sources.stresssource-1.size = 10240
a1.sources.stresssource-1.maxTotalEvents = 1000000
a1.sources.stresssource-1.channels = memoryChannel-1
自定义数据源

另外我们还可以自定义数据源,但是需要自己去集成Source的接口并包含Flume agent的classpath在启动的时候。自定义的格式类似于:

a1.sources = r1
a1.channels = c1
a1.sources.r1.type = org.example.MySource
a1.sources.r1.channels = c1

2. Flume数据Sinks分类

Kafka Sink

这里我们使用Kafka作为数据的目的地,在配置上我们需要配置输出为kafka sink. 以下是一个使用的实例,监听一个目录下的文件,如果该文件夹下有新的数据文件到来,则捕获数据并且将其解释为events后输出到kafka.

# Name the components on this agent
a1.sources = src-1
a1.sinks = k1
a1.channels = ch-1

# Describe/configure the source
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
# Describe the sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = my-topic
a1.sinks.k1.brokerList = 192.168.1.115:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# Use a channel which buffers events in memory
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 1000
a1.channels.ch-1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.src-1.channels = ch-1
a1.sinks.k1.channel = ch-1
ElasticSearchSink

Flume可以直接的将数据写入elasticsearch中,通过kibana的图形界面就可以展示他们,就像是使用logstash记录一样。

a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1

3. Flume的Channels

MemoryChannel

内存通道定义最大的传输数量和存储在channel中的数量。缺省定义80%的最大内存使用量。

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
KafkaChannel

数据从输入读入后,可直接进入kafka队列中保存,这里配置中的brokerList最好列举两个,这样就可以保证HA。

a1.channels.channel1.type   = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.capacity = 10000
a1.channels.channel1.transactionCapacity = 1000
a1.channels.channel1.brokerList=kafka-2:9092,kafka-3:9092
a1.channels.channel1.topic=channel1
a1.channels.channel1.zookeeperConnect=kafka-1:2181
FileChannel

这里需要两种文件夹类型 一种用来存放检查点文件的另一种存放数据文件的。对于数据文件可以列举多个分离磁盘上的文件夹增加性能

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data

另外支持对于文件的加密存储

####4. Flume Selectors

对于选择器默认情况下的通道选择器为复制,所有的数据会被分发到每个通道上如下面所示,

a1.sources = r1
a1.channels = c1 c2 c3
a1.source.r1.selector.type = replicating
a1.source.r1.channels = c1 c2 c3
a1.source.r1.selector.optional = c3

但是由于c3被标记为optional所以他在处理上如果遇到错误则简单的忽略。其他的通道错误导致数据的事务失败。下面的例子中我们使用了multiplexing模式。该模式下需要配置响应的标签用于区分不同的流量走向,下面的意思是匹配header中的state字符。如果为CZ则通过c1出去。如果匹配的us则从c2.c3,缺省从c5

a1.sources = r1
a1.channels = c1 c2 c3 c4
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2 c3
a1.sources.r1.selector.default = c4

数据分析 Flume 页面已被访问2503次

发表评论