Lu Chunli's work notes: a good memory is no match for a written record.
Flume 1.6.0 added full support for Kafka:
Flume Sink and Source for Apache Kafka
A new channel that uses Kafka
Kafka Source(http://flume.apache.org/FlumeUserGuide.html#kafka-source)
Kafka Source is an Apache Kafka consumer that reads messages from a Kafka topic.
If you have multiple Kafka sources running, you can configure them with the same Consumer Group so each will read a unique set of partitions for the topic.
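As a hedged illustration of that consumer-group behavior (the two agent names below are hypothetical; this walkthrough runs only one source, and against a one-partition topic a second source would simply sit idle):

# hypothetical agent 1
agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent1.sources.kafka-source.topic = myhbase
agent1.sources.kafka-source.groupId = flume

# hypothetical agent 2: same groupId, so Kafka assigns it a disjoint set of partitions
agent2.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent2.sources.kafka-source.topic = myhbase
agent2.sources.kafka-source.groupId = flume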
File Channel(http://flume.apache.org/FlumeUserGuide.html#file-channel)
HBase Sink(http://flume.apache.org/FlumeUserGuide.html#hbasesink)
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Create the Kafka topic myhbase
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --create --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --replication-factor 1 --partitions 1 --topic myhbase
Created topic "myhbase".
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-topics.sh --list --zookeeper nnode:2181,dnode1:2181,dnode2:2181
myhbase
mykafka
mytopic - marked for deletion
test - marked for deletion
[hadoop@nnode kafka0.8.2.1]$
HBase table structure
[hadoop@nnode kafka0.8.2.1]$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.0.1, r66a93c09df3b12ff7b86c39bc8475c60e15af82d, Fri Apr 17 22:14:06 PDT 2015

Table name: t_inter_log, column family: cf
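The create statement itself is not shown in these notes; the table was presumably created beforehand with something like:

hbase(main):001:0> create 't_inter_log', 'cf'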
Flume configuration file
vim conf/kafka-hbase.conf

# read from kafka and write to hbase
agent.sources = kafka-source
agent.channels = mem-channel
agent.sinks = hbase-sink

# source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.zookeeperConnect = nnode:2181,dnode1:2181,dnode2:2181
agent.sources.kafka-source.groupId = flume
agent.sources.kafka-source.topic = myhbase
agent.sources.kafka-source.kafka.consumer.timeout.ms = 100

# channel
agent.channels.mem-channel.type = memory

# sink
agent.sinks.hbase-sink.type = hbase
agent.sinks.hbase-sink.table = t_inter_log
agent.sinks.hbase-sink.columnFamily = cf
# agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer

# assemble
agent.sources.kafka-source.channels = mem-channel
agent.sinks.hbase-sink.channel = mem-channel
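Note that the serializer line is commented out, so the sink falls back to its default, org.apache.flume.sink.hbase.SimpleHbaseEventSerializer, which stores each event body whole in a payload column (pCol by default) rather than parsing the "rowkey, cf:column, value" strings the producer below emits. A sketch of what enabling RegexHbaseEventSerializer might look like; the regex, column names, and rowKeyIndex below are illustrative assumptions, not tested settings:

agent.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
# illustrative: capture the rowkey and the value out of "rowkey, cf:count, value"
agent.sinks.hbase-sink.serializer.regex = ^([^,]+), *cf:count, *(.*)$
agent.sinks.hbase-sink.serializer.colNames = ROW_KEY,count
agent.sinks.hbase-sink.serializer.rowKeyIndex = 0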
Start Kafka
[hadoop@nnode kafka0.8.2.1]$ bin/kafka-server-start.sh config/server.properties
Start flume-ng
[hadoop@nnode flume1.6.0]$ bin/flume-ng agent --conf conf --name agent --conf-file conf/kafka-hbase.conf -Dflume.root.logger=INFO,console
Implement the producer with the Java API
package com.lucl.kafka.simple;

import java.util.Properties;

import org.apache.log4j.Logger;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * <p> Copyright: Copyright (c) 2015 </p>
 * <p> Date : 2015-11-17 21:42:50 </p>
 * <p> Description : Java API for a Kafka producer </p>
 *
 * @author luchunli
 * @version 1.0
 */
public class SimpleKafkaProducer {
    private static final Logger logger = Logger.getLogger(SimpleKafkaProducer.class);

    private void execMsgSend() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "192.168.137.117:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // acks=0: fire-and-forget, the producer does not wait for broker acknowledgement
        props.put("request.required.acks", "0");

        ProducerConfig config = new ProducerConfig(props);
        logger.info("set config info(" + config + ") ok.");

        Producer<String, String> producer = new Producer<>(config);

        String topic = "myhbase";
        String columnFamily = "cf";
        String column = "count";
        // send ten events shaped as "rowkey, cf:count, value"
        for (int i = 1; i <= 10; i++) {
            String rowkey = "www.value_" + i + ".com";
            String value = "value_" + i;
            String event = rowkey + ", " + columnFamily + ":" + column + ", " + value;
            logger.info(event);
            KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, event);
            producer.send(msg);
        }
        logger.info("send message over.");

        producer.close();
    }

    public static void main(String[] args) {
        SimpleKafkaProducer simpleProducer = new SimpleKafkaProducer();
        simpleProducer.execMsgSend();
    }
}
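To confirm, independently of Flume, that these messages actually reach the topic, Kafka 0.8.x's bundled console consumer can replay it from the beginning (this verification step is not in the original notes):

[hadoop@nnode kafka0.8.2.1]$ bin/kafka-console-consumer.sh --zookeeper nnode:2181,dnode1:2181,dnode2:2181 --topic myhbase --from-beginning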
Watch the Flume-ng console output
2015-11-21 23:09:47,466 (flume_nnode-1448118584558-54f0a1ba-leader-finder-thread) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] [ConsumerFetcherManager-1448118585060] Added fetcher for partitions ArrayBuffer([[myhbase,0], initOffset 70 to broker id:117,host:nnode,port:9092] )
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: hbase-sink: Successfully registered new MBean.
2015-11-21 23:09:59,147 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: hbase-sink started
2015-11-21 23:15:30,702 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:351)] Failed to commit transaction. Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:377)
        at org.apache.flume.sink.hbase.HBaseSink$3.run(HBaseSink.java:372)
        at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
        at org.apache.flume.sink.hbase.HBaseSink.putEventsAndCommit(HBaseSink.java:372)
        at org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:342)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
2015-11-21 23:15:30,716 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hbase.HBaseSink.process(HBaseSink.java:354)] Failed to commit transaction. Transaction rolled back.
java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        (stack trace identical to the one above)
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchMethodError: org.apache.hadoop.hbase.client.Put.setWriteToWAL(Z)V
        (stack trace identical to the one above)
2015-11-21 23:15:38,090 (agent-shutdown-hook) [INFO - org.apache.flume.lifecycle.LifecycleSupervisor.stop(LifecycleSupervisor.java:79)] Stopping lifecycle supervisor 10
2015-11-21 23:15:38,103 (PollableSourceRunner-KafkaSource-kafka-source) [INFO - org.apache.flume.source.PollableSourceRunner$PollingRunner.run(PollableSourceRunner.java:149)] Source runner interrupted. Exiting
The write failed. The stack trace points to a client API incompatibility: Flume 1.6.0's HBase sink calls Put.setWriteToWAL(boolean), a method the HBase 1.0 client removed (replaced by Mutation.setDurability), so against the HBase 1.0.1 installation shown above the sink throws NoSuchMethodError and rolls back every transaction.
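A minimal sketch of the API change behind the error, assuming an HBase 1.0.x client on the classpath (the class below is illustrative, not Flume code):

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutWalApiSketch {
    public static void main(String[] args) {
        Put put = new Put(Bytes.toBytes("www.value_1.com"));
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes("value_1"));

        // What the Flume 1.6.0 sink invokes; removed from the HBase 1.0+ client,
        // hence the NoSuchMethodError at runtime:
        // put.setWriteToWAL(true);

        // The HBase 1.0+ replacement:
        put.setDurability(Durability.SYNC_WAL);
    }
}

The practical fixes would be to run the sink against a pre-1.0 HBase client, or to rebuild flume-ng-hbase-sink against the HBase 1.0 client API.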
Check the HBase table
hbase(main):004:0> scan 't_inter_log'
ROW                    COLUMN+CELL
0 row(s) in 0.0140 seconds

hbase(main):005:0>