在C#中开发Kafka客户端,您可以使用Confluent.Kafka
库。这个库提供了对Apache Kafka的完整支持,包括生产者和消费者。以下是一个简单的示例,展示了如何使用Confluent.Kafka
库创建一个生产者和一个消费者。
首先,您需要安装Confluent.Kafka
库。在Visual Studio中,您可以通过NuGet包管理器来安装:
Install-Package Confluent.Kafka
接下来,创建一个生产者:
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaProducer
{
class Program
{
static async Task Main(string[] args)
{
var conf = new ProducerConfig
{
BootstrapServers = "localhost:9092",
KeySerializer = typeof(string).AssemblyQualifiedName,
ValueSerializer = typeof(string).AssemblyQualifiedName
};
using (var producer = new Producer<string, string>(conf))
{
await producer.ProduceAsync("my-topic", new Message<string, string> { Key = "key", Value = "value" });
}
}
}
}
然后,创建一个消费者:
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace KafkaConsumer
{
class Program
{
static async Task Main(string[] args)
{
var conf = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "my-group",
KeyDeserializer = typeof(string).AssemblyQualifiedName,
ValueDeserializer = typeof(string).AssemblyQualifiedName
};
using (var consumer = new Consumer<string, string>(conf))
{
consumer.Subscribe(new[] { "my-topic" });
while (true)
{
var msg = await consumer.ConsumeAsync();
Console.WriteLine($"Received message: key={msg.Key}, value={msg.Value}");
}
}
}
}
}
这个示例中,生产者将一个键值对发送到名为my-topic
的主题,消费者则从该主题接收消息。请注意,您需要根据您的Kafka集群配置修改BootstrapServers
和其他设置。