java分布式流式处理组件Producer分区的作用是什么

发布时间:2023-03-07 11:31:17 作者:iii
来源:亿速云 阅读:131

这篇文章主要讲解了“java分布式流式处理组件Producer分区的作用是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“java分布式流式处理组件Producer分区的作用是什么”吧!

为什么需要分区

分区的作用

如果将全部的数据存储在一台机器上,那么要对当前数据做副本的时候,由于服务器资源配置不同,就有可能会出现副本数据存放失败,从而增加数据丢失的可能性。

同时,如果单个文件过大,副本放置时间、内容检索时间都会极大的延长,从而导致Kafka性能降低。

刚才我们提到:生产者已分区为单位向Broker发送数据。那么问题来了:

这就是我们接下来要研究的分区策略。

分区策略

private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    // 如果在消息中指定了分区
    if (record.partition() != null)
        return record.partition();
    if (partitioner != null) {
        // 分区器通过计算得到分区
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(
                "The partitioner generated an invalid partition number: %d. Partition number should always be non-negative.", customPartition));
        }
        return customPartition;
    }
    // 通过序列化key计算分区
    if (serializedKey != null && !partitionerIgnoreKeys) {
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        // 返回-1
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}

下面的代码可以说是整个分区器的核心部分,可以通过以下的步骤进行说明:

在Kafka中分区策略我们是可以自定义的。当然Kafka也为我们内置了三种分区策略类。 接下来我们挑个重点来介绍,来给我们自定义分区器做一个铺垫~

java分布式流式处理组件Producer分区的作用是什么

我们已经看到,DefaultPartitionerUniformStickyPartitioner已经被标注为过期类,当然也并不妨碍我们来了解一下。

DefaultPartitioner

在当前版本中,如果没有对partitioner.class进行配置,此时的分区策略就会采用当前类作为默认分区策略类。

而以下是DefaultPartitioner策略类的核心实现方式,并且标记部分的代码实现其实就是UniformStickyPartitioner的计算逻辑

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    if (keyBytes == null) {
        // 就是这段属于UniformStickyPartitioner的实现逻辑
        return stickyPartitionCache.partition(topic, cluster);
    }
    return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}

还有一段代码让我们来一起看看

public static int partitionForKey(final byte[] serializedKey, final int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

这段代码不管有多复杂,调用方法有多少,但最终我们是能够发现:

这是一种比较重要的计算方式,但却不是唯一的方式

java分布式流式处理组件Producer分区的作用是什么

---这是分割线---

接下来继续,我们看看如果无法对序列化Key计算,会是怎么样的计算逻辑?

我们先开始来看一下,是在哪个地方得到的serializedKey,并且什么情况下serializedKey会是NULL

看看下面的这个代码眼熟不?

// 生产者生产消息对象
ProducerRecord<String, String> record = new ProducerRecord<>(
        "newTopic001",
        "data from " + KafkaQuickProducer.class.getName()
);

java分布式流式处理组件Producer分区的作用是什么

// KafkaProducer#doSend()
// line994
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
public class StringSerializer implements Serializer<String> {
    // 省略。。。
    @Override
    public byte[] serialize(String topic, String data) {
        if (data == null) {
            return null;
        } else {
            return data.getBytes(encoding);
        }
    }
}

从上面的代码来看,基本上能够实锤了:

此时相当于我们已经进入到UniformStickyPartitioner的计算逻辑, 当然了在我们使用的3.3版本中当前类也已经被标注为过期

根据前面的说法,粘性分区主要解决了消息无Key的分区计算逻辑,那么粘性分区并不是说每次都使用同一个分区

它是通过一个大的Batch为单位,尽量将batch内的消息固定在同一个分区内,这样在很大程度上能够保证:

而实现方式是采用ConcurrentMap来进行缓存,感兴趣的大家可以看看StickyPartitionCache的源码

而当Batch内消息满足发送条件被发送出去之后,才会开始再次计算下一个分区,为此在KafkaProducer中还专门调用了新的方法

partitioner.onNewBatch(topic, cluster, prevPartition);
public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
}

java分布式流式处理组件Producer分区的作用是什么

RoundRobinPartitioner

这是在当前版本中唯一没有被标注的类,未来说不定会成为默认分区策略类,我们不看,就瞄一眼

private int nextValue(String topic) {
    AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
    return counter.getAndIncrement();
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (!availablePartitions.isEmpty()) {
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
}

这个类的解释,嗯。。你们看那个合适吧~

java分布式流式处理组件Producer分区的作用是什么

其实这个逻辑非常简单:

我们先来做个小实验吧: 将分区策略类修改为RoundRobinPartitioner,也方便后续自定义分区器的配置操作

config.setProperty(
        ProducerConfig.PARTITIONER_CLASS_CONFIG, 
        "org.apache.kafka.clients.producer.RoundRobinPartitioner"
);

就这样就能实现,看结果验证~

java分布式流式处理组件Producer分区的作用是什么

中间穿插了一点小知识,那么接下来就会进入到我们最后一个环节:尝试自定义分区器

自定义分区器

前面我们也提到过,相信大家没有忘记partitioner.class这个配置

那么接下来就进入到重头戏:自定义分区器实战编码环节。

public class CustomPartitioner implements Partitioner {
    @Override
    public void configure(Map<String, ?> configs) {
        // nothing
    }
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 如果keyBytes == null
        // 直接去0号位置
        if (null == keyBytes) {
            return 0;
        }
        // 已默认分区策略实现
        int numPartitions = cluster.partitionsForTopic(topic).size();
        return BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
    }
    @Override
    public void close() {
        // nothing
    }
}

我们就先做的简单一点,主要是想让大家明白自定义分区器的实现:

当自定义分区器实现完成之后,接下来我们就需要通过发送者进行验证。当然了,主要还是通过partitioner.class进行修改

// 给出关键代码,其他的都是一样的。就不赘述了~~~
config.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "top.zopx.kafka.partitioner.CustomPartitioner");

通过执行之后,我们来看看它的运行效果是否满足我们的预期

java分布式流式处理组件Producer分区的作用是什么

另一种运行结果与默认分区器有Key的情况类似,这里就不再重复贴图

感谢各位的阅读,以上就是“java分布式流式处理组件Producer分区的作用是什么”的内容了,经过本文的学习后,相信大家对java分布式流式处理组件Producer分区的作用是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

推荐阅读:
  1. Ubuntu16 安装Oracle JDK 8
  2. Oracle Java SE 任意代码执行漏洞加固

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

java producer

上一篇:go语言int64整型转字符串如何实现

下一篇:windows中program files可不可以移到D盘

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》