您好,登录后才能下订单哦!
密码登录
登录注册
点击 登录注册 即表示同意《亿速云用户服务条款》
这篇文章主要介绍Flink中Connectors如何连接Kafka,文中介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们一定要看完!
通过使用Flink DataStream Connectors 数据流连接器连接到ElasticSearch搜索引擎的文档数据库Index,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1kafka:2.11
数据流输入
DataStreamSource.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Properties; /** * @Description 从Kafka中消费数据 */ public class DataStreamSource { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置并行度(使用几个CPU核心) env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); //1.消费者客户端连接到kafka Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-45"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), props); //setStartFromEarliest()会从最早的数据开始进行消费,忽略存储的offset信息 //consumer.setStartFromEarliest(); //Flink从topic中指定的时间点开始消费,指定时间点之前的数据忽略 //consumer.setStartFromTimestamp(1559801580000L); //Flink从topic中最新的数据开始消费 //consumer.setStartFromLatest(); //Flink从topic中指定的group上次消费的位置开始消费,所以必须配置group.id参数 //consumer.setStartFromGroupOffsets(); //2.在算子中进行处理 DataStream<TUser> sourceStream = env.addSource(consumer) .filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value)) .map((MapFunction<String, TUser>) value -> { System.out.println("print:" + value); //注意,因已开启enableCheckpointing容错定期检查状态机制,当算子出现错误时, //会导致数据流恢复到最新checkpoint的状态,并从存储在checkpoint中的offset开始重新消费Kafka中的消息。 //因此会有可能导制数据重复消费,重复错误,陷入死循环。加上try|catch,捕获错误后再正确输出。 Gson gson = new Gson(); try { TUser user = gson.fromJson(value, TUser.class); return user; }catch(Exception e){ System.out.println("error:" + e.getMessage()); } return new TUser(); }) .returns(TUser.class); sourceStream.print(); //3.执行 env.execute("flink kafka source"); } }
数据流输出
DataStreamSink.java
package com.flink.examples.kafka; import com.flink.examples.TUser; import com.google.gson.Gson; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import java.util.Properties; /** * @Description 将生产者数据写入到kafka */ public class DataStreamSink { /** * 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/kafka.html */ public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //必需设置setParallelism并行度,否则不会输出 env.setParallelism(1); //每隔2000ms进行启动一个检查点 env.enableCheckpointing(2000); //设置模式为exactly-once env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 确保检查点之间有进行500 ms的进度 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 检查点必须在一分钟内完成,或者被丢弃 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一时间只允许进行一个检查点 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); //1.连接kafka Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.110.35:9092"); FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props); //2.创建数据,并写入数据到流中 TUser user = new TUser(); user.setId(8); user.setName("liu3"); user.setAge(22); user.setSex(1); user.setAddress("CN"); user.setCreateTimeSeries(1598889600000L); DataStream<String> sourceStream = env.fromElements(user).map((MapFunction<TUser, String>) value -> new Gson().toJson(value)); //3.将数据流输入到kafka sourceStream.addSink(producer); sourceStream.print(); env.execute("flink kafka sink"); } }
在kafka上创建名称为test的topic
先启动DataStreamSource.java获取输出流,在启动DataStreamSink.java输入流
数据展示
以上是“Flink中Connectors如何连接Kafka”这篇文章的所有内容,感谢各位的阅读!希望分享的内容对大家有帮助,更多相关知识,欢迎关注亿速云行业资讯频道!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。