您好,登录后才能下订单哦!
Disruptor中怎么实现一个高性能队列,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
import java.util.concurrent.ThreadFactory
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
import com.lmax.disruptor.{BlockingWaitStrategy,EventFactory,EventHandler,EventTranslatorOneArg,WaitStrategy}
object DisruptorTest {
val disruptor = {
val factory = new EventFactory[Event] {
override def newInstance(): Event = Event(-1)
}
val threadFactory = new ThreadFactory(){
override def newThread(r: Runnable): Thread = new Thread(r)
}
val disruptor = new Disruptor[Event](factory, 4, threadFactory, ProducerType.SINGLE,
new BlockingWaitStrategy())
disruptor.handleEventsWith(TestHandler).`then`(ThenHandler)
disruptor
}
val translator = new EventTranslatorOneArg[Event, Int]() {
override def translateTo(event: Event, sequence: Long, arg: Int): Unit = {
event.id = arg
println(s"translator: ${event}, sequence: ${sequence}, arg: ${arg}")
}
}
def main(args: Array[String]): Unit = {
disruptor.start()
(0 until 100).foreach { i =>
disruptor.publishEvent(translator, i)
}
disruptor.shutdown()
}
}
case class Event(var id: Int) {
override def toString: String = s"event: ${id}"
}
object TestHandler extends EventHandler[Event] {
override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
}
}
object ThenHandler extends EventHandler[Event] {
override def onEvent(event: Event, sequence: Long, endOfBatch: Boolean): Unit = {
println(s"${this.getClass.getSimpleName} ${System.nanoTime()} ${event}")
}
}先看 Disruptor 构造方法
public Disruptor(final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy) {
this(RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}在看 RingBuffer.create, 最终通过 fill 方法 将 eventFactory.newInstance() 作为默认值,塞到 ringBuffer 里面
public static <E> RingBuffer<E> create(ProducerType producerType,
EventFactory<E> factory, int bufferSize, WaitStrategy waitStrategy) {
switch (producerType) {
case SINGLE:
return createSingleProducer(factory, bufferSize, waitStrategy);
case MULTI:
return createMultiProducer(factory, bufferSize, waitStrategy);
default:
throw new IllegalStateException(producerType.toString());
}
}
public static <E> RingBuffer<E> createSingleProducer(EventFactory<E> factory, int bufferSize,
WaitStrategy waitStrategy) {
SingleProducerSequencer sequencer = new SingleProducerSequencer(bufferSize, waitStrategy);
return new RingBuffer<E>(factory, sequencer);
}
RingBufferFields(EventFactory<E> eventFactory, Sequencer sequencer) {
this.sequencer = sequencer;
this.bufferSize = sequencer.getBufferSize();
if (bufferSize < 1) {
throw new IllegalArgumentException("bufferSize must not be less than 1");
}
if (Integer.bitCount(bufferSize) != 1) {
throw new IllegalArgumentException("bufferSize must be a power of 2");
}
this.indexMask = bufferSize - 1;
this.entries = new Object[sequencer.getBufferSize() + 2 * BUFFER_PAD];
fill(eventFactory);
}
private void fill(EventFactory<E> eventFactory) {
for (int i = 0; i < bufferSize; i++) {
entries[BUFFER_PAD + i] = eventFactory.newInstance();
}
}首先看 disruptor.start(): 消费事件消息入口
private final ConsumerRepository<T> consumerRepository = new ConsumerRepository<>();
public RingBuffer<T> start() {
checkOnlyStartedOnce();
for (final ConsumerInfo consumerInfo : consumerRepository) {
consumerInfo.start(executor);
}
return ringBuffer;
}consumerRepository 类型由 disruptor.handleEventsWith(TestHandler) 初始化, 并构造事件消息处理链
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences, final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor = new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}回头看 disruptor.start() 中的 consumerInfo.start(executor) executor = new BasicExecutor(threadFactory),BasicExecutor 在每次 execute 任务时,都会 new thread **但是 consumerRepository 的数量是有限的,所以 new thread 也没啥问题
public Disruptor(
final EventFactory<T> eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy) {
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
private Disruptor(final RingBuffer<T> ringBuffer, final Executor executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
@Override
public void start(final java.util.concurrent.Executor executor){
//EventProcessor extends Runnable
//executor = BasicExecutor
executor.execute(eventprocessor);
}
public final class BatchEventProcessor<T> implements EventProcessor {
@Override
public void run() {
if (running.compareAndSet(IDLE, RUNNING)) {
sequenceBarrier.clearAlert();
notifyStart();
try {
if (running.get() == RUNNING) {
processEvents();
}
} finally {
notifyShutdown();
running.set(IDLE);
}
} else {
if (running.get() == RUNNING) {
throw new IllegalStateException("Thread is already running");
} else {
earlyExit();
}
}
}
}
private void processEvents() {
T event = null;
long nextSequence = sequence.get() + 1L;
while (true) {
try {
final long availableSequence = sequenceBarrier.waitFor(nextSequence);
if (batchStartAware != null) {
batchStartAware.onBatchStart(availableSequence - nextSequence + 1);
}
while (nextSequence <= availableSequence) {
event = dataProvider.get(nextSequence);
eventHandler.onEvent(event, nextSequence, nextSequence == availableSequence);
nextSequence++;
}
sequence.set(availableSequence);
} catch (final TimeoutException e) {
notifyTimeout(sequence.get());
} catch (final AlertException ex) {
if (running.get() != RUNNING) {
break;
}
} catch (final Throwable ex) {
exceptionHandler.handleEventException(ex, nextSequence, event);
sequence.set(nextSequence);
nextSequence++;
}
}
}executor.execute 也就是 BasicExecutor.execute(eventHandler) 会异步的执行 eventHandler, 也就是调用 BatchEventProcessor.run 方法
问题来了,既然是异步执行,多个 eventHandler 是怎么按照顺序去处理事件消息的?
我们看 processEvents 方法执行逻辑
先获取 BatchEventProcessor.sequence 并 +1
通过 sequenceBarrier.waitFor 也就是 WaitStrategy.waitFor 获取到可用的 availableSequence
先看下 BlockingWaitStrategy.waitFor 的实现
public long waitFor(long sequence, Sequence cursorSequence, Sequence dependentSequence,
SequenceBarrier barrier)
throws AlertException, InterruptedException {
long availableSequence;
if (cursorSequence.get() < sequence) {
lock.lock();
try {
while (cursorSequence.get() < sequence) {
barrier.checkAlert();
processorNotifyCondition.await();
}
}
finally {
lock.unlock();
}
}
while ((availableSequence = dependentSequence.get()) < sequence) {
barrier.checkAlert();
}
return availableSequence;
}如果 cursorSequence(ringbuffer 的索引) < sequence(batchEventProcessor 的索引) 则batchEventProcessor挂起等待 否则 就用 dependentSequence 作为 availableSequence 返回 然后 batchEventProcessor 会将 availableSequence 索引之前的数据一次性处理完,并更新自身的 sequence 索引值
dependentSequence 由 ProcessingSequenceBarrier 构造方法初始化
final class ProcessingSequenceBarrier implements SequenceBarrier {
private final WaitStrategy waitStrategy;
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private final Sequence cursorSequence;
private final Sequencer sequencer;
ProcessingSequenceBarrier(final Sequencer sequencer, final WaitStrategy waitStrategy,
final Sequence cursorSequence, final Sequence[] dependentSequences) {
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length) {
dependentSequence = cursorSequence;
} else {
dependentSequence = new FixedSequenceGroup(dependentSequences);
}
}
}在 Disruptor.createEventProcessors 中的, 进行了初始化 ProcessingSequenceBarrier final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences) createEventProcessors 仅会被 Disruptor.handleEventsWith 和 EventHandlerGroup.handleEventsWith
public class Disruptor<T> {
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return createEventProcessors(new Sequence[0], handlers);
}
EventHandlerGroup<T> createEventProcessors(final Sequence[] barrierSequences,
final EventHandler<? super T>[] eventHandlers) {
checkNotStarted();
final Sequence[] processorSequences = new Sequence[eventHandlers.length];
final SequenceBarrier barrier = ringBuffer.newBarrier(barrierSequences);
for (int i = 0, eventHandlersLength = eventHandlers.length; i < eventHandlersLength; i++) {
final EventHandler<? super T> eventHandler = eventHandlers[i];
final BatchEventProcessor<T> batchEventProcessor =
new BatchEventProcessor<>(ringBuffer, barrier, eventHandler);
if (exceptionHandler != null) {
batchEventProcessor.setExceptionHandler(exceptionHandler);
}
consumerRepository.add(batchEventProcessor, eventHandler, barrier);
processorSequences[i] = batchEventProcessor.getSequence();
}
updateGatingSequencesForNextInChain(barrierSequences, processorSequences);
return new EventHandlerGroup<>(this, consumerRepository, processorSequences);
}
}
public class EventHandlerGroup<T> {
private final Disruptor<T> disruptor;
private final ConsumerRepository<T> consumerRepository;
private final Sequence[] sequences;
EventHandlerGroup(final Disruptor<T> disruptor, final ConsumerRepository<T> consumerRepository,
final Sequence[] sequences) {
this.disruptor = disruptor;
this.consumerRepository = consumerRepository;
this.sequences = Arrays.copyOf(sequences, sequences.length);
}
public final EventHandlerGroup<T> handleEventsWith(final EventHandler<? super T>... handlers) {
return disruptor.createEventProcessors(sequences, handlers);
}
public final EventHandlerGroup<T> then(final EventHandler<? super T>... handlers) {
return handleEventsWith(handlers);
}
}EventHandlerGroup 会拷贝一份 batchEventProcessor 中的 sequence demo 例子中 disruptor.handleEventsWith(TestHandler).then(ThenHandler) 通过 then 方法将 TestHandler 中的 sequence 传递给 ThenHandler 这样 ThenHandler 就依赖了 TestHandler, ThenHandler 就会在 TestHandler 后执行
接着看 disruptor.publishEvent(translator, i) 就是往 ringBuffer 里面放数据,
public <A> void publishEvent(EventTranslatorOneArg<E, A> translator, A arg0) {
final long sequence = sequencer.next();
translateAndPublish(translator, sequence, arg0);
}
private <A> void translateAndPublish(EventTranslatorOneArg<E, A> translator, long sequence, A arg0) {
try {
translator.translateTo(get(sequence), sequence, arg0);
} finally {
sequencer.publish(sequence);
}
}
public E get(long sequence) {
return elementAt(sequence);
}get(sequence) 根据 sequence [ringbuffer 索引] 获取 ringbuffer 数组里的对象 translator 将其处理替换完后,ringbuffer 数组的的值将是新的值,publish 将会更新索引的标记位 waitStrategy.signalAllWhenBlocking() 会通知阻塞等待的消费者去继续消费消息
protected final Sequence cursor = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
@Override
public void publish(long sequence) {
cursor.set(sequence);
waitStrategy.signalAllWhenBlocking();
}流程理清楚了,我们看看 知识点
ringbuffer
内存使用率很高,不会造成内存碎片,几乎没有浪费。业务处理的同一时间,访问的内存数据段集中。 可以更好的适应不同系统,取得较高的性能。内存的物理布局简单单一,不太容易发生内存越界、悬空指针等 bug,出了问题也容易在内存级别分析调试。 做出来的系统容易保持健壮。
cpu cache
CPU 访问内存时会等待,导致计算资源大量闲置,降低 CPU 整体吞吐量。 由于内存数据访问的热点集中性,在 CPU 和内存之间用较为快速而成本较高(相对于内存)的介质做一层缓存,就显得性价比极高了
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注亿速云行业资讯频道,感谢您对亿速云的支持。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。