disruptor不過(guò)多介紹了,描述下當(dāng)前的業(yè)務(wù)場(chǎng)景,兩個(gè)應(yīng)用A,B,應(yīng)用 A 向應(yīng)用 B 傳遞數(shù)據(jù) . 數(shù)據(jù)傳送比較快,如果用http直接push數(shù)據(jù)然后入庫(kù),效率不高.有可能導(dǎo)致A應(yīng)用比較大的壓力. 使用mq 太重量級(jí),所以選擇了disruptor. 也可以使用Reactor
BaseQueueHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
/** * lmax.disruptor 高效隊(duì)列處理模板. 支持初始隊(duì)列,即在init()前進(jìn)行發(fā)布。 * * 調(diào)用init()時(shí)才真正啟動(dòng)線程開始處理 系統(tǒng)退出自動(dòng)清理資源. * * @author xielongwang * @create 2018-01-18 下午3:49 * @email [email protected] * @description */ public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> { /** * 記錄所有的隊(duì)列,系統(tǒng)退出時(shí)統(tǒng)一清理資源 */ private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>(); /** * Disruptor 對(duì)象 */ private Disruptor<E> disruptor; /** * RingBuffer */ private RingBuffer<E> ringBuffer; /** * initQueue */ private List<D> initQueue = new ArrayList<D>(); /** * 隊(duì)列大小 * * @return 隊(duì)列長(zhǎng)度,必須是2的冪 */ protected abstract int getQueueSize(); /** * 事件工廠 * * @return EventFactory */ protected abstract EventFactory<E> eventFactory(); /** * 事件消費(fèi)者 * * @return WorkHandler[] */ protected abstract WorkHandler[] getHandler(); /** * 初始化 */ public void init() { ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat( "DisruptorThreadPool" ).build(); disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy()); disruptor.setDefaultExceptionHandler( new MyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer = disruptor.start(); //初始化數(shù)據(jù)發(fā)布 for (D data : initQueue) { ringBuffer.publishEvent( new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } //加入資源清理鉤子 synchronized (queueHelperList) { if (queueHelperList.isEmpty()) { Runtime.getRuntime().addShutdownHook( new Thread() { @Override public void run() { for (BaseQueueHelper baseQueueHelper : queueHelperList) { baseQueueHelper.shutdown(); } } }); } queueHelperList.add( this ); } } /** * 如果要改變線程執(zhí)行優(yōu)先級(jí),override此策略. YieldingWaitStrategy會(huì)提高響應(yīng)并在閑時(shí)占用70%以上CPU, * 慎用SleepingWaitStrategy會(huì)降低響應(yīng)更減少CPU占用,用于日志等場(chǎng)景. * * @return WaitStrategy */ protected abstract WaitStrategy getStrategy(); /** * 插入隊(duì)列消息,支持在對(duì)象init前插入隊(duì)列,則在隊(duì)列建立時(shí)立即發(fā)布到隊(duì)列處理. */ public synchronized void publishEvent(D data) { if (ringBuffer == null ) { initQueue.add(data); return ; } ringBuffer.publishEvent( new EventTranslatorOneArg<E, D>() { @Override public void translateTo(E event, long sequence, D data) { event.setValue(data); } }, data); } /** * 關(guān)閉隊(duì)列 */ public void shutdown() { disruptor.shutdown(); } } |
EventFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
|
/** * @author xielongwang * @create 2018-01-18 下午6:24 * @email [email protected] * @description */ public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> { @Override public SeriesDataEvent newInstance() { return new SeriesDataEvent(); } } |
MyHandlerException.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class MyHandlerException implements ExceptionHandler { private Logger logger = LoggerFactory.getLogger(MyHandlerException. class ); /* * (non-Javadoc) 運(yùn)行過(guò)程中發(fā)生時(shí)的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable * , long, java.lang.Object) */ @Override public void handleEventException(Throwable ex, long sequence, Object event) { ex.printStackTrace(); logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage()); } /* * (non-Javadoc) 啟動(dòng)時(shí)的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. * Throwable) */ @Override public void handleOnStartException(Throwable ex) { logger.error("start disruptor error ==[{}]!", ex.getMessage()); } /* * (non-Javadoc) 關(guān)閉時(shí)的異常 * * @see * com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang * .Throwable) */ @Override public void handleOnShutdownException(Throwable ex) { logger.error( "shutdown disruptor error ==[{}]!" , ex.getMessage()); } } |
SeriesData.java (代表應(yīng)用A發(fā)送給應(yīng)用B的消息)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class SeriesData { private String deviceInfoStr; public SeriesData() { } public SeriesData(String deviceInfoStr) { this .deviceInfoStr = deviceInfoStr; } public String getDeviceInfoStr() { return deviceInfoStr; } public void setDeviceInfoStr(String deviceInfoStr) { this .deviceInfoStr = deviceInfoStr; } @Override public String toString() { return "SeriesData{" + "deviceInfoStr='" + deviceInfoStr + '\ '' + '}' ; } } |
SeriesDataEvent.java
1
2
|
public class SeriesDataEvent extends ValueWrapper<SeriesData> { } |
SeriesDataEventHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> { private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler. class ); @Autowired private DeviceInfoService deviceInfoService; @Override public void onEvent(SeriesDataEvent event) { if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) { logger.warn( "receiver series data is empty!" ); } //業(yè)務(wù)處理 deviceInfoService.processData(event.getValue().getDeviceInfoStr()); } } |
SeriesDataEventQueueHelper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
@Component public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean { private static final int QUEUE_SIZE = 1024 ; @Autowired private List<SeriesDataEventHandler> seriesDataEventHandler; @Override protected int getQueueSize() { return QUEUE_SIZE; } @Override protected com.lmax.disruptor.EventFactory eventFactory() { return new EventFactory(); } @Override protected WorkHandler[] getHandler() { int size = seriesDataEventHandler.size(); SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray( new SeriesDataEventHandler[size]); return paramEventHandlers; } @Override protected WaitStrategy getStrategy() { return new BlockingWaitStrategy(); //return new YieldingWaitStrategy(); } @Override public void afterPropertiesSet() throws Exception { this .init(); } } |
ValueWrapper.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public abstract class ValueWrapper<T> { private T value; public ValueWrapper() {} public ValueWrapper(T value) { this .value = value; } public T getValue() { return value; } public void setValue(T value) { this .value = value; } } |
DisruptorConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
@Configuration @ComponentScan (value = { "com.portal.disruptor" }) //多實(shí)例幾個(gè)消費(fèi)者 public class DisruptorConfig { /** * smsParamEventHandler1 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler1() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler2 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler2() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler3 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler3() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler4 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler4() { return new SeriesDataEventHandler(); } /** * smsParamEventHandler5 * * @return SeriesDataEventHandler */ @Bean public SeriesDataEventHandler smsParamEventHandler5() { return new SeriesDataEventHandler(); } } |
測(cè)試
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//注入SeriesDataEventQueueHelper消息生產(chǎn)者 @Autowired private SeriesDataEventQueueHelper seriesDataEventQueueHelper; @RequestMapping (value = "/data" , method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE) public DataResponseVo<String> receiverDeviceData( @RequestBody String deviceData) { long startTime1 = System.currentTimeMillis(); if (StringUtils.isEmpty(deviceData)) { logger.info( "receiver data is empty !" ); return new DataResponseVo<String>( 400 , "failed" ); } seriesDataEventQueueHelper.publishEvent( new SeriesData(deviceData)); long startTime2 = System.currentTimeMillis(); logger.info( "receiver data ==[{}] millisecond ==[{}]" , deviceData, startTime2 - startTime1); return new DataResponseVo<String>( 200 , "success" ); } |
應(yīng)用A通過(guò)/data 接口把數(shù)據(jù)發(fā)送到應(yīng)用B ,然后通過(guò)seriesDataEventQueueHelper 把消息發(fā)給disruptor隊(duì)列,消費(fèi)者去消費(fèi),整個(gè)過(guò)程對(duì)不會(huì)堵塞應(yīng)用A. 可接受消息丟失, 可以通過(guò)擴(kuò)展SeriesDataEventQueueHelper來(lái)達(dá)到對(duì)disruptor隊(duì)列的監(jiān)控
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。
原文鏈接:http://blog.csdn.net/u014087707/article/details/79340463