在Spring整合websocket整合應(yīng)用示例(上)文章中,我們已經(jīng)實現(xiàn)了websocket,但還有一個核心的業(yè)務(wù)實現(xiàn)類沒有實現(xiàn),這里我們就實現(xiàn)這個業(yè)務(wù)核心類,因為老夫參與的這個系統(tǒng)使用websocket發(fā)送消息,所以其實現(xiàn)就是如何發(fā)送消息了。
7. NewsListenerImpl的實現(xiàn)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
package cn.bridgeli.websocket; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.lagou.common.base.util.date.DateUtil; import com.lagou.platform.news.api.enumeration.PlatNewsCategoryType; import com.lagou.platform.news.web.dao.ext.model.PlatNewsVo; import com.lagou.platform.news.web.dao.ext.model.SearchCondition; import com.lagou.platform.news.web.quartz.impl.TimingJob; import com.lagou.platform.news.web.service.PlatNewsService; import org.apache.commons.lang.StringUtils; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.quartz.*; import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.web.socket.TextMessage; import java.io.IOException; import java.util.Date; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @Description : 站內(nèi)消息監(jiān)聽器實現(xiàn) * @Date : 16-3-7 */ @Component public class NewsListenerImpl implements NewsListener{ private static final Logger logger = LoggerFactory.getLogger(NewsListenerImpl. class ); Gson gson = new GsonBuilder().setDateFormat( "yyyy-MM-dd HH:mm:ss" ).create(); //線程池 private ExecutorService executorService = Executors.newCachedThreadPool(); //任務(wù)調(diào)度 private SchedulerFactory sf = new StdSchedulerFactory(); @Autowired private PlatNewsService platNewsService; @Override public void afterPersist(PlatNewsVo platNewsVo) { logger.info( "監(jiān)聽到有新消息添加。。。" ); logger.info( "新消息為:" +gson.toJson(platNewsVo)); //啟動線程 if ( null != platNewsVo && !StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){ //如果是定時消息 if (platNewsVo.getNewsType() == PlatNewsCategoryType.TIMING_TIME.getCategoryId()){ startTimingTask(platNewsVo); //定時推送 } else { //立即推送 executorService.execute( new AfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail())); } } } @Override public void afterConnectionEstablished(String email) { logger.info( "建立websocket連接后推送新消息。。。" ); if (!StringUtils.isBlank(email)){ executorService.execute( new AfterConnectionEstablishedTask(email)); } } /** * @Description : 如果新添加了定時消息,啟動定時消息任務(wù) * @param platNewsVo */ private void startTimingTask(PlatNewsVo platNewsVo){ logger.info( "開始定時推送消息任務(wù)。。。" ); Date timingTime = platNewsVo.getTimingTime(); if ( null == timingTime){ logger.info( "定時消息時間為null。" ); return ; } logger.info( "定時推送任務(wù)時間為:" +DateUtil.date2String(timingTime)); JobDetail jobDetail= JobBuilder.newJob(TimingJob. class ) .withIdentity(platNewsVo.getCurrentoperatoremail()+ "定時消息" +platNewsVo.getId(), "站內(nèi)消息" ) .build(); //傳遞參數(shù) jobDetail.getJobDataMap().put( "platNewsService" ,platNewsService); jobDetail.getJobDataMap().put( "userEmail" ,platNewsVo.getCurrentoperatoremail()); Trigger trigger= TriggerBuilder .newTrigger() .withIdentity( "定時消息觸發(fā)" +platNewsVo.getId(), "站內(nèi)消息" ) .startAt(timingTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds( 0 ) //時間間隔 .withRepeatCount( 0 ) //重復(fù)次數(shù) ) .build(); //啟動定時任務(wù) try { Scheduler sched = sf.getScheduler(); sched.scheduleJob(jobDetail,trigger); if (!sched.isShutdown()){ sched.start(); } } catch (SchedulerException e) { logger.info(e.toString()); } logger.info( "完成開啟定時推送消息任務(wù)。。。" ); } /** * @Description : 建立websocket鏈接后的推送線程 */ class AfterConnectionEstablishedTask implements Runnable{ String email ; public AfterConnectionEstablishedTask(String email){ this .email = email; } @Override public void run() { logger.info( "開始推送消息給用戶:" +email+ "。。。" ); if (!StringUtils.isBlank(email)){ SearchCondition searchCondition = new SearchCondition(); searchCondition.setOperatorEmail(email); JSONArray jsonArray = new JSONArray(); for (PlatNewsCategoryType type : PlatNewsCategoryType.values()){ searchCondition.setTypeId(type.getCategoryId()); int count = platNewsService.countPlatNewsByExample(searchCondition); JSONObject object = new JSONObject(); object.put( "name" ,type.name()); object.put( "description" ,type.getDescription()); object.put( "count" ,count); jsonArray.add(object); } if ( null != jsonArray && jsonArray.size()> 0 ){ UserSocketVo userSocketVo = WSSessionLocalCache.get(email); TextMessage reMessage = new TextMessage(gson.toJson(jsonArray)); try { if ( null != userSocketVo){ //推送消息 userSocketVo.getWebSocketSession().sendMessage(reMessage); //更新推送時間 userSocketVo.setLastSendTime(DateUtil.getNowDate()); logger.info( "完成推送新消息給用戶:" +userSocketVo.getUserEmail()+ "。。。" ); } } catch (IOException e) { logger.error(e.toString()); logger.info( "站內(nèi)消息推送失敗。。。" +e.toString()); } } } logger.info( "結(jié)束推送消息給" +email+ "。。。" ); } } } |
這個類就是websocket的核心業(yè)務(wù)的實現(xiàn),其具體肯定和業(yè)務(wù)相關(guān),由于業(yè)務(wù)的不同,實現(xiàn)肯定不同,因為老夫參與的系統(tǒng)是發(fā)送消息,所以里面最核心的一句就是:
1
|
userSocketVo.getWebSocketSession().sendMessage(reMessage); |
通過WebSocketSession的sendMessage方法把我們的消息發(fā)送出去。另外,這主要是后端的實現(xiàn),至于前端的實現(xiàn),因為老夫是后端程序猿比較關(guān)注后端,所以前端就不多做介紹了,大家可以自己去網(wǎng)上查資料。最后需要說明的是,老夫之前搜一些學(xué)習(xí)資料的時候,發(fā)現(xiàn)老夫該同事的寫法和有一篇文章幾乎一樣,我想該同事應(yīng)該是參考了這篇文章,所以列在下面,算作參考資料。