<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
public class Event {
private String data;
}
public class EventHandler implements EventHandler<Event> {
@Override
public void onEvent(Event event, long sequence, boolean endOfBatch) {
System.out.println("Processing event: " + event.getData());
}
}
RingBuffer<Event> ringBuffer = RingBuffer.createSingleProducer(
Event::new, BUFFER_SIZE, new YieldingWaitStrategy()
);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
EventHandler eventHandler = new EventHandler();
WorkerPool<Event> workerPool = new WorkerPool<>(
ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), eventHandler
);
Sequence[] sequences = workerPool.getWorkerSequences();
ringBuffer.addGatingSequences(sequences);
Executor executor = Executors.newCachedThreadPool();
workerPool.start(executor);
EventProducer eventProducer = new EventProducer(ringBuffer);
eventProducer.publishEvent("Hello, Disruptor!");
workerPool.halt();
executor.shutdown();
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class DisruptorDemo {
private static final int BUFFER_SIZE = 1024;
public static void main(String[] args) {
RingBuffer<Event> ringBuffer = RingBuffer.createSingleProducer(
Event::new, BUFFER_SIZE, new YieldingWaitStrategy()
);
SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();
EventHandler eventHandler = new EventHandler();
WorkerPool<Event> workerPool = new WorkerPool<>(
ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), eventHandler
);
Sequence[] sequences = workerPool.getWorkerSequences();
ringBuffer.addGatingSequences(sequences);
Executor executor = Executors.newCachedThreadPool();
workerPool.start(executor);
EventProducer eventProducer = new EventProducer(ringBuffer);
eventProducer.publishEvent("Hello, Disruptor!");
workerPool.halt();
executor.shutdown();
}
public static class Event {
private String data;
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
}
public static class EventHandler implements EventHandler<Event> {
@Override
public void onEvent(Event event, long sequence, boolean endOfBatch) {
System.out.println("Processing event: " + event.getData());
}
}
public static class EventProducer {
private final RingBuffer<Event> ringBuffer;
public EventProducer(RingBuffer<Event> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void publishEvent(String data) {
long next = ringBuffer.next();
try {
Event event = ringBuffer.get(next);
event.setData(data);
} finally {
ringBuffer.publish(next);
}
}
}
}