您好,登录后才能下订单哦!
本篇内容主要讲解“Storm-kafka接口怎么实现”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Storm-kafka接口怎么实现”吧!
阅读背景: 如有需要,尽情参看本空间的另外一篇文档
阅读目的:了解Storm 如何来封装kafka接口,如何处理Connection连接的封装性问题
package com.mixbox.storm.kafka; import kafka.javaapi.consumer.SimpleConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.trident.IBrokerReader; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; /** * 2014/07/22 * 动态的【分区连接】 * @author Yin Shuai */ public class DynamicPartitionConnections { public static final Logger LOG = LoggerFactory .getLogger(DynamicPartitionConnections.class); /** * 持有了一个 kafka底层的SimpleConsumer对象 * 持有了 具体的分区 * * @author Yin Shuai */ static class ConnectionInfo { //内部维持了一个SimpleConsumer SimpleConsumer consumer; //分区 Set<Integer> partitions = new HashSet(); public ConnectionInfo(SimpleConsumer consumer) { this.consumer = consumer; } } /** * 也就是kafka的每一个节点都维持了一个COnnectionInfo,ConnectionInfo */ Map<Broker, ConnectionInfo> _connections = new HashMap(); // kafkaConfig KafkaConfig _config; /** * IBrokerReader 基本上 IbroerReader这里初始化的是ZkBrokerReader */ IBrokerReader _reader; /** * @param config * kafka配置 * @param brokerReader * IBrokerReader-用于拿到当前的接口 */ public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) { _config = config; _reader = brokerReader; } /** * @param partition 分区 * @return */ public SimpleConsumer register(Partition partition) { /** * 依据你所拥有的partition号,拿到你所对应的Broker * GlobalPartitionInformation中有Map<Integer, Broker> * partitionMap,记录了分区号与Broker所对应的关系 */ Broker broker = _reader.getCurrentBrokers().getBrokerFor( partition.partition); return register(broker, partition.partition); } /** * @param host * 主机 * @param partition * 分区 * @return 底层的SimpleConsumer 对象,这里存在一个注册的行为,将主机和端口【broker】,和分区【partition】 注册到 connections连接之中 */ public SimpleConsumer register(Broker host, int partition) { // Map<Broker, ConnectionInfo> _connections = new HashMap(); //如果连接之中没有包含了Broker,那么建立一个新的连接,并且将这个 主机和连接注册到 _connections之中 if (!_connections.containsKey(host)) { _connections.put(host, new ConnectionInfo(new SimpleConsumer( host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId))); } // --------- 在这里,不管之前有没有都只取一次 ------------- //当包含了,那就直接取出 ConnectionInfo info = _connections.get(host); info.partitions.add(partition); return info.consumer; } public SimpleConsumer getConnection(Partition partition) { // ConnectionInfo 之中封装了一个simpleConsumer ConnectionInfo info = _connections.get(partition.host); if (info != null) { return info.consumer; } return null; } /** * @param port 固定的Broker * @param partition 固定的分区 */ public void unregister(Broker port, int partition) { ConnectionInfo info = _connections.get(port); info.partitions.remove(partition); if (info.partitions.isEmpty()) { info.consumer.close(); _connections.remove(port); } } public void unregister(Partition partition) { unregister(partition.host, partition.partition); } public void clear() { for (ConnectionInfo info : _connections.values()) { info.consumer.close(); } _connections.clear(); } }
与前文有关
1: 在DynamicPartitionConnections之中,我们持有了一个 IBrokerReader的接口对象。
2 : 由于IBrokerReader 派生出了
2.1 StaticBrokerReader
2.2 ZBrokerReader
在这个序列的一系列博文之中,ZBrokerReader已经进行了详尽的分析,并且在赋值的过程之中,IBrokerReader也是实例化为ZBrokerReader了。
内部类:
DynamicPartitionConnections 持有了一个 CinnectionInfo的内部类
static class ConnectionInfo { //内部维持了一个SimpleConsumer SimpleConsumer consumer; //分区 Set<Integer> partitions = new HashSet(); public ConnectionInfo(SimpleConsumer consumer) { this.consumer = consumer; } }
1: 对于每一个Connection内部都维持了一个SimpleConsumer ,以及一个 Set集合 partitions
2 :在DynamicPartitionConnections里面我们维持了一个_connections的对象
Map<Broker, ConnectionInfo> _connections = new HashMap();
3 :在连接维护之中,关键的地方是维护一个 register注册的行为:
public SimpleConsumer register(Broker host, int partition) {
4: 如果_connections之中没有包含Broker,那么将会再建立一个新的连接,并且将Broker和Connection 注册到_connections之中
5:在注册的过程之中,不包含就注册,最后都直接取出SimpleConsumer,这个SimpleConsumer
封装了
new ConnectionInfo(new SimpleConsumer(
host.host, host.port, _config.socketTimeoutMs,
_config.bufferSizeBytes, _config.clientId)):
到此,相信大家对“Storm-kafka接口怎么实现”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。