如何使用OpenTracing和Jaeger 追踪 Pulsar消息

发布时间:2021-11-09 18:55:49 作者:柒染
来源:亿速云 阅读:224

本篇文章给大家分享的是有关如何使用OpenTracing和Jaeger 追踪 Pulsar消息,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

OpenTracing(https://opentracing.io/) 是针对应用程序和 OSS(Open-Source Software)软件包的开放分布式追踪标准。许多追踪后端服务都支持 OpenTracing API,例如 Jaeger、Zipkin 和 SkyWalking。


准备工作

在开始前,需要安装好 JDK 8、Maven 3 和 Pulsar(集群模式或单机模式)。如果还没有安装 Pulsar,可以查看下方链接,按照提示进行安装。

http://pulsar.apache.org/docs/en/standalone/

第 1 步:启动 Jaeger 后端

1. 在 Docker 中启动 Jaeger 后端。

docker run -d -p 6831:6831/udp -p 16686:16686 jaegertracing/all-in-one:latest

成功启动 Jaeger 后,就可以打开 Jaeger UI 网站。

???? 如何你没有 Jaeger Docker 环境,可以:

2. 访问 `http://localhost:16686`,无需填写用户名或密码就可以打开 Jeager UI 网站。

如何使用OpenTracing和Jaeger 追踪 Pulsar消息

第 2 步:添加 maven dependencies

本示例使用 Open Tracing Pulsar Client。
https://hub.streamnative.io/monitoring/opentracing-pulsar-client/0.1.0

它是 Pulsar Client 与 OpenTracing API(基于 Pulsar Client Interceptors)的集成,用于追踪 Pulsar 消息。OpenTracing Pulsar Client 由 StreamNative 研发,是 StreamNatvie Hub(https://hub.streamnative.io/) 中的监控工具。

添加 Jaeger client dependency 以连接到 Jaeger 后端。

<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.5.1</version></dependency>
<dependency> <groupId>io.streamnative</groupId> <artifactId>opentracing-pulsar-client</artifactId> <version>0.1.0</version></dependency>
<dependency>  <groupId>io.jaegertracing</groupId>  <artifactId>jaeger-client</artifactId>  <version>1.2.0</version></dependency>

第 3 步:使用 OpenTracing Pulsar Client

为便于理解,本示例假设有 2 个 Job 和 2 个 topic。Job-1 向 topic-A 发送消息,Job-2 从 topc-A 消费消息。当 Job 2 收到 topic-A 的消息后,Job 2 会向 topic-B 发送消息,然后 Job-3 从 topic-B 消费消息。因此,在这种情况下有 2 个 topic、2 个 producer 和 2 个 consumer。

要完成上述工作场景中的任务,需要启动三个应用程序。

????Job-1

以下示例为发布消息至 topic-A。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-1").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder()        .serviceUrl("pulsar://localhost:6650")        .build();Producer<String> producerA = client.newProducer(Schema.STRING)        .topic("topic-A")        .intercept(new TracingProducerInterceptor())        .create();for (int i = 0; i < 10; i++) {    producerA.newMessage().value(String.format("[%d] Hello", i)).send();}
????Job-2

以下示例为从 topic-A 消费消息,并将消息发布到 topic-B。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-2").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder()        .serviceUrl("pulsar://localhost:6650")        .build();Consumer<String> consumer = client.newConsumer(Schema.STRING)        .topic("topic-A")        .subscriptionName("open-tracing")        .subscriptionType(SubscriptionType.Shared)        .intercept(new TracingConsumerInterceptor<>())        .subscribe();Producer<String> producerB = client.newProducer(Schema.STRING)        .topic("topic-B")        .intercept(new TracingProducerInterceptor())        .create();while (true) {    Message<String> received = consumer.receive();    SpanContext context = TracingPulsarUtils.extractSpanContext(received, tracer);    TypedMessageBuilder<String> messageBuilder = producerB.newMessage();    messageBuilder.value(received.getValue() + " Pulsar and OpenTracing!");    // Inject parent span context    tracer.inject(context, Format.Builtin.TEXT_MAP, new TypeMessageBuilderInjectAdapter(messageBuilder));    messageBuilder.send();    consumer.acknowledge(received);}

????Job-3

以下示例为从 topic-B 消费消息。

Configuration.SamplerConfiguration samplerConfig = Configuration.SamplerConfiguration.fromEnv().withType("const").withParam(1);Configuration.ReporterConfiguration reporterConfig = Configuration.ReporterConfiguration.fromEnv().withLogSpans(true);Configuration configuration = new Configuration("Job-3").withSampler(samplerConfig).withReporter(reporterConfig);Tracer tracer = configuration.getTracer();GlobalTracer.registerIfAbsent(tracer);PulsarClient client = PulsarClient.builder()        .serviceUrl("pulsar://localhost:6650")        .build();Consumer<String> consumer = client.newConsumer(Schema.STRING)        .topic("topic-B")        .subscriptionName("open-tracing")        .subscriptionType(SubscriptionType.Shared)        .intercept(new TracingConsumerInterceptor<>())        .subscribe();while (true) {    Message<String> received = consumer.receive();    System.out.println(received.getValue());    consumer.acknowledge(received);}

现在,可以分别运行 Job-3、Job-2 和 Job-1。控制台中会出现 Job-3 接收的日志,如下:

[0] Hello Pulsar and OpenTracing![1] Hello Pulsar and OpenTracing!...[9] Hello Pulsar and OpenTracing!

现在,你可以再次打开 Jaeger UI,页面中会出现十条消息追踪链路。

如何使用OpenTracing和Jaeger 追踪 Pulsar消息

点击任务名称即可查看消息追踪链路的详细信息。

如何使用OpenTracing和Jaeger 追踪 Pulsar消息

可以从 span 名称轻松辨别是 producer 还是 consumer 发布了此条消息,span 名称格式为 `To__<topic-name>` 和 `From__<topic-name>__<subscription_name>`。

以上就是如何使用OpenTracing和Jaeger 追踪 Pulsar消息,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。

推荐阅读:
  1. Rabbitmq 简单介绍,安装和go客户端使用
  2. 基于Spring Cloud Netflix的TCC柔性事务和EDA事件驱动示例

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

opentracing jaeger pulsar

上一篇:如何使用Apache SkyWalking对Apache Pulsar进行消息链路追踪与性能监控

下一篇:Django中的unittest应用是什么

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》