<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.2</version> </dependency> public class Event { private String data; } public class EventHandler implements EventHandler<Event> { @Override public void onEvent(Event event, long sequence, boolean endOfBatch) { System.out.println("Processing event: " + event.getData()); } } RingBuffer<Event> ringBuffer = RingBuffer.createSingleProducer( Event::new, BUFFER_SIZE, new YieldingWaitStrategy() ); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); EventHandler eventHandler = new EventHandler(); WorkerPool<Event> workerPool = new WorkerPool<>( ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), eventHandler ); Sequence[] sequences = workerPool.getWorkerSequences(); ringBuffer.addGatingSequences(sequences); Executor executor = Executors.newCachedThreadPool(); workerPool.start(executor); EventProducer eventProducer = new EventProducer(ringBuffer); eventProducer.publishEvent("Hello, Disruptor!"); workerPool.halt(); executor.shutdown(); import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkerPool; import com.lmax.disruptor.YieldingWaitStrategy; import java.util.concurrent.Executor; import java.util.concurrent.Executors; public class DisruptorDemo { private static final int BUFFER_SIZE = 1024; public static void main(String[] args) { RingBuffer<Event> ringBuffer = RingBuffer.createSingleProducer( Event::new, BUFFER_SIZE, new YieldingWaitStrategy() ); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); EventHandler eventHandler = new EventHandler(); WorkerPool<Event> workerPool = new WorkerPool<>( ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), eventHandler ); Sequence[] sequences = workerPool.getWorkerSequences(); ringBuffer.addGatingSequences(sequences); Executor executor = Executors.newCachedThreadPool(); workerPool.start(executor); EventProducer eventProducer = new EventProducer(ringBuffer); eventProducer.publishEvent("Hello, Disruptor!"); workerPool.halt(); executor.shutdown(); } public static class Event { private String data; public String getData() { return data; } public void setData(String data) { this.data = data; } } public static class EventHandler implements EventHandler<Event> { @Override public void onEvent(Event event, long sequence, boolean endOfBatch) { System.out.println("Processing event: " + event.getData()); } } public static class EventProducer { private final RingBuffer<Event> ringBuffer; public EventProducer(RingBuffer<Event> ringBuffer) { this.ringBuffer = ringBuffer; } public void publishEvent(String data) { long next = ringBuffer.next(); try { Event event = ringBuffer.get(next); event.setData(data); } finally { ringBuffer.publish(next); } } } }


上一篇:
下一篇:
切换中文