Kafka开发日志流系统(环境搭建)

9/25/2015

image

运行环境准备

  1. CentOS(运行Kafka)
  2. Java运行环境
  3. Node开发环境(可在一台机器上开发)

搭建运行环境 kafka 运行于一台虚拟机器中,操作系统为CentOS.安装完成后需要先安装Java运行环境。Linux版本Java下载地址 下载完毕后编译安装java jre。 这里注意对于二进制文件需要添加环境变量到系统中,才能在下一步执行操作中不报错。

vim /etc/profile
在文档最后,添加:
export PATH="/path/jre-path/bin:$PATH"
保存,退出,然后运行:
source /etc/profile
不报错则成功。

安装完成后下载linux版本的Kafka二进制包,直接解压缩即可使用。Kafka

Kafka测试

对于Kafka的测试可以参考下面的测试实例完成,由于Kafka需要Zookeeper来协调分布式的部署和运行。所以在下载的文件中已经包含了可以使用的Zookeeper这里简单的描述测试的流程。这里面涉及几个Kafka的概念:topic,producer,consumer其中producer作为一个数据源用于生成数据,topic作为消息的分类,比如用户消息,日志消息单独的topic中保存此类消息。consumer则是消费这些信息的客体。本例子就是通过使用node来处理kafka的生产消息和消费消息的整个数据流。

**启动zookeeper**

bin/zookeeper-server-start.sh config/zookeeper.properties

[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

**启动Kafka服务器**

bin/kafka-server-start.sh config/server.properties

[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)

[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

**创建一个topic**

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

**查看创建的topic实例**

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

**生产一个消息到topic**

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a messageThis is another message

**开启一个消费客户端**

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

This is a message
This is another message

测试完成后就可以直接开始接触实际代码了(node环境的搭建可参考主页的node官网链接)

注意一定在配置server的时候设置好参考的IP地址和端口后,否则链接有问题

配置需要在/usr/local/kafka_2.9.1-0.8.2.1/config下面找到server.properties然后将其中的host.name配置为client可以访问到的地址,否则会导致客户端代码无法正常的连接和更新数据。

advertised.host.name=192.168.1.104  
# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
#advertised.port=<port accessible by clients>

# The number of threads handling network requests
num.network.threads=3

附录

使用python代码管理数据。其中的Producer和Consumer工作都可以通过模块 KafkaConsumer 来实现。

 pip install KafkaConsumer 
 easy_install KafkaConsumer

 或者是直接使用git下载编译安装
 git clone https://github.com/mumrah/kafka-python
 pip install ./kafka-python

一旦下载完成后,即可在客户端代码中import 该模块,并调用模块的生成数据的方法将数据传递给kafka的broker。以下为测试代码 供测试使用

客户端的Producer.py代码如下:

#encoding: utf-8
from kafka import SimpleProducer, KafkaClient

# To send messages synchronously
kafka = KafkaClient('192.168.1.104:9092')
producer = SimpleProducer(kafka)

# Note that the application is responsible for encoding messages to type bytes
producer.send_messages(b'test', b'some message')
producer.send_messages(b'test', b'this method', b'is variadic')

# Send unicode message
producer.send_messages(b'test', u'你怎么样?'.encode('utf-8'))

客户端消费数据的代码 Consumer.py

import logging
from kafka import KafkaConsumer

# To consume messages
consumer = KafkaConsumer('test',
                         group_id='my_group',
                         bootstrap_servers=['192.168.1.104:9092'])
for message in consumer:
    # message value is raw byte string -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))

如果需要了解更多的API使用,可参考模块的官方链接 http://kafka-python.readthedocs.org/en/latest/install.html


Kafka python 页面已被访问3028次

发表评论