基于webSocket的聊天室

不知大家在平时的需求中有没有遇到需要实时处理信息的情况,如站内信,订阅,聊天之类的。在这之前我们通常想到的方法一般都是采用轮训的方式每隔一定的时间向服务器发送请求从而获得最新的数据,但这样会浪费掉很多的资源并且也不是实时的,于是随着HTML5的推出带来了websocket可以根本的解决以上问题实现真正的实时传输。

websocket是什么?

至于websocket是什么、有什么用这样的问题一Google一大把,这里我就简要的说些websocket再本次实例中的作用吧。
由于在本次实例中需要实现的是一个聊天室,一个实时的聊天室。如下图:


采用websocket之后可以让前端和和后端像C/S模式一样实时通信,不再需要每次单独发送请求。由于是基于H5的所以对于老的浏览器如IE7、IE8之类的就没办法了,不过H5是大势所趋这点不用担心。

后端

既然推出了websocket,作为现在主流的Java肯定也有相应的支持,所以在JavaEE7之后也对websocket做出了规范,所以本次的代码理论上是要运行在Java1.7+和Tomcat7.0+之上的。
看过我前面几篇文章的朋友应该都知道本次实例也是运行在之前的SSM之上的,所以这里就不再赘述了。
首先第一步需要加入websocket的依赖:

1 package com.css.tax.mobilebs.util; 2 3 import java.io.IOException; 4 import java.io.UnsupportedEncodingException; 5 import java.util.Date; 6 import java.util.HashMap; 7 import java.util.Iterator; 8 import java.util.Map; 9 import java.util.concurrent.CopyOnWriteArraySet; 10 11 import javax.websocket.OnClose; 12 import javax.websocket.OnError; 13 import javax.websocket.OnMessage; 14 import javax.websocket.OnOpen; 15 import javax.websocket.Session; 16 import javax.websocket.server.PathParam; 17 import javax.websocket.server.ServerEndpoint; 18 19 import org.g4studio.common.dao.Reader; 20 import org.g4studio.common.service.impl.BaseServiceImpl; 21 import org.g4studio.common.util.SpringBeanLoader; 22 import org.g4studio.common.web.BaseAction; 23 import org.g4studio.core.metatype.Dto; 24 import org.g4studio.core.metatype.impl.BaseDto; 25 import org.junit.Test; 26 27 import com.css.tax.mobilebs.Vo.CurrentUserVo; 28 import com.css.tax.mobilebs.serviceI.WebSocketService; 29 import com.css.tax.mobilebs.serviceI.ZjzzService; 30 31 /** 32 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, 33 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端 34 */ 35 @ServerEndpoint("/websocket/{user}/{id}/{ptbz}") 36 public class WebSocket extends BaseAction{ 37 // 静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。 38 private static int onlineCount = 0; 39 // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。若要实现服务端与单一客户端通信的话,可以使用Map来存放,其中Key可以为用户标识 40 private static CopyOnWriteArraySet<Map<String, WebSocket>> webSocketSet = new CopyOnWriteArraySet<Map<String, WebSocket>>(); 41 private static CopyOnWriteArraySet<Map<String, WebSocket>> webSocketSetPt = new CopyOnWriteArraySet<Map<String, WebSocket>>(); 42 private WebSocketService zjzzWebSocketService = (WebSocketService)super.getService("zjzzWebSocketService"); 43 // 与某个客户端的连接会话,需要通过它来给客户端发送数据 44 private Session session; 45 private Map<String, WebSocket> webSocketMap = new HashMap<String, WebSocket>(); 46 private CurrentUserVo currentUserVo = new CurrentUserVo(); 47 48 /** 49 * 连接建立成功调用的方法 50 * 51 * @param session 52 * 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 53 */ 54 @OnOpen 55 public void onOpen(@PathParam("user") String user, 56 @PathParam("id") String id, @PathParam("ptbz") String ptbz, 57 Session session) { 58 String charset = getEncoding(user); 59 try { 60 byte[] b = user.getBytes(charset); 61 user = new String(b, "utf-8"); 62 } catch (UnsupportedEncodingException e) { 63 e.printStackTrace(); 64 } 65 this.session = session; 66 currentUserVo.setFbrmc(user); 67 currentUserVo.setFbr(id); 68 currentUserVo.setPtbz(ptbz); 69 currentUserVo.setTwr_dm(id); 70 currentUserVo.setPtbz(ptbz); 71 webSocketMap.put(id, this); 72 if("pt".equals(ptbz)) { 73 currentUserVo.setZjmc(user); 74 currentUserVo.setZjry_dm(id); 75 currentUserVo.setKhdfwr(id); 76 webSocketSetPt.add(webSocketMap); 77 }else{ 78 currentUserVo.setNsrmc(user); 79 webSocketSet.add(webSocketMap); // 加入set中 80 } 81 addOnlineCount(); // 在线数加 82 System.out.println("有新连接加入!当前在线人数为" + getOnlineCount()); 83 } 84 85 /** 86 * 连接关闭调用的方法 87 */ 88 @OnClose 89 public void onClose() { 90 if("pt".equals(this.currentUserVo.getPtbz())) { 91 webSocketSetPt.remove(this.webSocketMap); 92 }else{ 93 webSocketSet.remove(this.webSocketMap); // 从set中删除 94 } 95 subOnlineCount(); // 在线数减 96 System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); 97 } 98 99 /** 100 * 收到客户端消息后调用的方法 101 * 102 * @param message 103 * 客户端发送过来的消息 104 * @param session 105 * 可选的参数 106 */ 107 @OnMessage 108 public void onMessage(String message,Session session) { 109 boolean boo = false; 110 111 System.out.println("来自" + currentUserVo.getFbrmc() + "的消息:" + message+",发向"+currentUserVo.getJsr_dm()); 112 //判断是否为未读消息 113 int a = message.indexOf("&&&"); 114 System.out.println(a); 115 if(a!=-1) {//为未读消息 116 String[] msgStr = message.split("&&&"); 117 String userMes = msgStr[0]; 118 String[] user = userMes.split("@"); 119 String uuid = ""; 120 String wdjsr = ""; 121 String fbrmc = ""; 122 String fbr = ""; 123 String zjzzuuid = ""; 124 if(user.length>0) { 125 wdjsr = user[0]; 126 uuid = user[1]; 127 fbrmc = user[2]; 128 fbr = user[3]; 129 zjzzuuid = user[4]; 130 } 131 message = msgStr[1]; 132 CopyOnWriteArraySet<Map<String, WebSocket>> websocketSet = null; 133 if("pt".equals(this.currentUserVo.getPtbz())) { 134 websocketSet = webSocketSetPt; 135 }else{ 136 websocketSet = webSocketSet; 137 } 138 if(!"everybody".equals(wdjsr)) { 139 //消息有指定平台用户 140 for (Map<String, WebSocket> itemMap : websocketSet) { 141 WebSocket socket = itemMap.get(wdjsr); 142 if(socket!=null) { 143 try { 144 if("pt".equals(this.currentUserVo.getPtbz())) { 145 socket.sendMessage(fbr+"@@"+fbrmc + "@^&" + message); 146 socket.currentUserVo.setKhdfwr(this.currentUserVo.getFbr()); 147 }else{ 148 socket.sendMessage(fbrmc + "@^&" + message); 149 socket.currentUserVo.setKhdfwr(fbr); 150 } 151 socket.currentUserVo.setJsr_dm(fbr);//设置该对话的接收人代码 152 socket.currentUserVo.setJsr(fbrmc); 153 socket.currentUserVo.setZjzzuuid(zjzzuuid); 154 boo = true; 155 Dto reDto = new BaseDto(); 156 reDto.put("uuid", uuid); 157 reDto.put("ckbz", "Y"); 158 reDto.put("xgr_dm", "111"); 159 reDto.put("xgsj", SystemUtils.dateFormat(new Date())); 160 zjzzWebSocketService.updateLtxxDatas(reDto); 161 } catch (IOException e) { 162 e.printStackTrace(); 163 } 164 } 165 } 166 }else{ 167 //未读消息没有指定平台用户 168 if("pt".equals(this.currentUserVo.getPtbz())) { 169 Object[] objArr = webSocketSetPt.toArray(); 170 int index = (int) (Math.random() * objArr.length); 171 Map<String, WebSocket> map = (Map<String, WebSocket>) objArr[index]; 172 for (WebSocket socket : map.values()) { 173 if(socket!=null) { 174 try { 175 boo = true; 176 socket.sendMessage(fbr+"@@"+fbrmc + "@^&" + message); 177 socket.currentUserVo.setZjzzuuid(zjzzuuid); 178 socket.currentUserVo.setJsr_dm(fbr); 179 socket.currentUserVo.setJsr(fbrmc); 180 socket.currentUserVo.setKhdfwr(socket.currentUserVo.getFbr()); 181 //修改未读对话记录 182 Dto reDto = new BaseDto(); 183 reDto.put("uuid", uuid); 184 reDto.put("jsr", this.currentUserVo.getFbr()); 185 reDto.put("ckbz", "Y"); 186 reDto.put("khdfwr", this.currentUserVo.getKhdfwr()); 187 reDto.put("xgr_dm", "111"); 188 reDto.put("xgsj", SystemUtils.dateFormat(new Date())); 189 zjzzWebSocketService.updateWdWzdDatas(reDto); 190 } catch (IOException e) { 191 e.printStackTrace(); 192 continue; 193 } 194 } 195 } 196 } 197 } 198 }else{ 199 String[] xxArr = message.split("@@"); 200 String bz = ""; 201 String zjzzuuid = ""; 202 String fsmessage = ""; 203 String jsr = ""; 204 if(!"pt".equals(this.currentUserVo.getPtbz())) { 205 String userXx = xxArr[0]; 206 String[] user = userXx.split("&&"); 207 bz = user[0]; 208 zjzzuuid = user[1]; 209 this.currentUserVo.setZjzzuuid(zjzzuuid); 210 fsmessage = xxArr[1]; 211 }else{ 212 String[] xx = message.split("&@"); 213 jsr = xx[0]; 214 fsmessage = xx[1]; 215 } 216 if("pt".equals(this.currentUserVo.getPtbz())) { 217 if(jsr!=null&&jsr.length()>0) { 218 //平台发送消息,发向指定客户端 219 boolean pdboo = false; 220 for (Map<String, WebSocket> itemMap : webSocketSet) { 221 WebSocket socket = itemMap.get(jsr); 222 if(socket!=null) { 223 try { 224 boo = true; 225 pdboo = true; 226 socket.sendMessage(currentUserVo.getFbrmc() + "@^&" + fsmessage); 227 //this.currentUserVo.setJsr_dm(socket.currentUserVo.getFbr()); 228 //this.currentUserVo.setJsr(socket.currentUserVo.getFbrmc()); 229 //存储已查看对话信息 230 if(socket.currentUserVo.getZjzzuuid()==null) { 231 socket.currentUserVo.setZjzzuuid(this.currentUserVo.getZjzzuuid()); 232 } 233 saveYckMessage(socket.currentUserVo.getZjzzuuid(),"Y",fsmessage,"Y",jsr); 234 } catch (IOException e) { 235 e.printStackTrace(); 236 continue; 237 } 238 } 239 } 240 if(!pdboo) { 241 this.currentUserVo.setJsr_dm(jsr); 242 String uuid = zjzzWebSocketService.queryZjzzuuidByJsr(jsr); 243 this.currentUserVo.setZjzzuuid(uuid); 244 boo = true; 245 saveYckMessage(uuid,"Y",fsmessage,"N",this.currentUserVo.getJsr_dm()); 246 } 247 } 248 }else{ 249 if(this.currentUserVo.getJsr_dm()!=null&&this.currentUserVo.getJsr_dm().length()>0) { 250 //该客户端消息已有接收对象,顺利发送给平台 251 for (Map<String, WebSocket> itemMap : webSocketSetPt) { 252 WebSocket socket = itemMap.get(currentUserVo.getJsr_dm()); 253 if(socket!=null) { 254 try { 255 boo = true; 256 socket.sendMessage(currentUserVo.getFbr()+"@@"+currentUserVo.getFbrmc() + "@^&" + fsmessage); 257 //存储已查看对话信息 258 saveYckMessage(zjzzuuid,bz,fsmessage,"Y",this.currentUserVo.getJsr_dm()); 259 } catch (IOException e) { 260 e.printStackTrace(); 261 continue; 262 } 263 } 264 } 265 }else{ 266 //该客户端尚未有接收对象,需要随机指定接收对象,并连接发送消息 267 if(webSocketSetPt.size()>0){ 268 //平台有用户连接 269 Object[] objArr = webSocketSetPt.toArray(); 270 int index = (int) (Math.random() * objArr.length); 271 Map<String, WebSocket> map = (Map<String, WebSocket>) objArr[index]; 272 for (WebSocket socket : map.values()) { 273 if(socket!=null) { 274 try { 275 boo = true; 276 socket.sendMessage(currentUserVo.getFbr()+"@@"+currentUserVo.getFbrmc() + "@^&" + fsmessage); 277 socket.currentUserVo.setJsr_dm(this.currentUserVo.getFbr()); 278 socket.currentUserVo.setJsr(this.currentUserVo.getFbrmc()); 279 this.currentUserVo.setKhdfwr(socket.currentUserVo.getFbr()); 280 this.currentUserVo.setJsr_dm(socket.currentUserVo.getFbr()); 281 this.currentUserVo.setZjry_dm(socket.currentUserVo.getFbr()); 282 this.currentUserVo.setZjmc(socket.currentUserVo.getFbrmc()); 283 this.currentUserVo.setJsr(socket.currentUserVo.getFbrmc()); 284 //存储已查看对话信息 285 saveYckMessage(zjzzuuid,bz,fsmessage,"Y",this.currentUserVo.getJsr_dm()); 286 } catch (IOException e) { 287 e.printStackTrace(); 288 continue; 289 } 290 } 291 } 292 }else{ 293 try { 294 this.sendMessage("系统:消息已发送,但当前未有服务人员链接,无法为您解答疑问!"); 295 //平台没有用户连接,无法发送消息,保存数据为所有人可接收的未读消息 296 this.currentUserVo.setJsr_dm("everybody"); 297 this.currentUserVo.setZjry_dm("everybody"); 298 this.currentUserVo.setKhdfwr("everybody"); 299 saveYckMessage(zjzzuuid,bz,fsmessage,"N",this.currentUserVo.getJsr_dm()); 300 boo = true; 301 } catch (IOException e) { 302 e.printStackTrace(); 303 } 304 } 305 } 306 } 307 308 } 309 if(!boo){ 310 if(this.currentUserVo.getJsr_dm()!=null&&this.currentUserVo.getZjzzuuid()!=null) { 311 if("pt".equals(this.currentUserVo.getPtbz())) { 312 saveYckMessage(this.currentUserVo.getZjzzuuid(),"Y",message,"N",this.currentUserVo.getJsr_dm()); 313 }else{ 314 //存储未查看对话信息 315 String[] xxArr = message.split("@@"); 316 String userXx = xxArr[0]; 317 String[] user = userXx.split("&&"); 318 String bz = user[0]; 319 String zjzzuuid = user[1]; 320 String fsmessage = xxArr[1]; 321 saveYckMessage(zjzzuuid,bz,fsmessage,"N",this.currentUserVo.getJsr_dm()); 322 } 323 }else{ 324 try { 325 this.sendMessage("系统:消息未发出,可能是连接失败!请重新连接"); 326 } catch (IOException e) { 327 e.printStackTrace(); 328 } 329 } 330 } 331 } 332 333 /** 334 * 335 * @param zjzzuuid 对话组uuid 336 * @param bz 是否存在对话组标志 337 * @param message 对话信息 338 * @param ckbz 对话是否查看标志 339 */ 340 private void saveYckMessage(String zjzzuuid,String bz,String message,String ckbz,String jsr) { 341 Dto dto = new BaseDto(); 342 String fbrmc = currentUserVo.getFbrmc(); 343 String fbr = currentUserVo.getFbr(); 344 String khdfwr = currentUserVo.getKhdfwr(); 345 if("N".equals(bz)) {//是否有uuid,判断其在数据库中是否存在已有信息 346 //数据库没有该对话记录,故需创建对话记录 347 dto.put("uuid", zjzzuuid); 348 //以下需前台传值 349 dto.put("twr_dm", currentUserVo.getTwr_dm()); 350 dto.put("nsrsbh", currentUserVo.getNsrsbh()); 351 dto.put("nsrmc", currentUserVo.getNsrmc()); 352 dto.put("zjmc", currentUserVo.getZjmc()); 353 dto.put("zjry_dm", currentUserVo.getZjry_dm()); 354 dto.put("fbr", fbr); 355 dto.put("fbrmc", fbrmc); 356 dto.put("jsr", jsr); 357 dto.put("lrr_dm", fbr); 358 dto.put("lrrq", SystemUtils.dateFormat(new Date())); 359 zjzzWebSocketService.insertZjzzDhjl(dto); 360 361 Dto dto1 = new BaseDto(); 362 dto1.put("lrr_dm", fbr); 363 dto1.put("lrrq", SystemUtils.dateFormat(new Date())); 364 dto1.put("uuid", SystemUtils.genUUID()); 365 dto1.put("zjzzuuid", zjzzuuid); 366 dto1.put("dhnr", message); 367 dto1.put("dhsj", SystemUtils.dateFormat(new Date())); 368 dto1.put("fbr", fbr); 369 dto1.put("ckbz", ckbz); 370 dto1.put("khdfwr", khdfwr); 371 dto1.put("fbrmc", fbrmc); 372 dto1.put("jsr", jsr); 373 zjzzWebSocketService.insertZjzzMxDhjlByUuid(dto1); 374 }else{ 375 Dto redto = new BaseDto(); 376 redto.put("lrr_dm", fbr); 377 redto.put("lrrq", SystemUtils.dateFormat(new Date())); 378 redto.put("uuid", SystemUtils.genUUID()); 379 redto.put("zjzzuuid", zjzzuuid); 380 redto.put("dhnr", message); 381 redto.put("dhsj", SystemUtils.dateFormat(new Date())); 382 redto.put("fbr", fbr); 383 redto.put("ckbz", ckbz); 384 redto.put("khdfwr", khdfwr); 385 redto.put("fbrmc", fbrmc); 386 redto.put("jsr", jsr); 387 zjzzWebSocketService.insertZjzzMxDhjlByUuid(redto); 388 } 389 } 390 391 /** 392 * 发生错误时调用 393 * 394 * @param session 395 * @param error 396 */ 397 @OnError 398 public void onError(Session session, Throwable error) { 399 System.out.println("发生错误"); 400 error.printStackTrace(); 401 } 402 403 /** 404 * 这个方法与上面几个方法不一样。没有用注解,是根据自己需要添加的方法。 405 * 406 * @param message 407 * @throws IOException 408 */ 409 public void sendMessage(String message) throws IOException { 410 this.session.getBasicRemote().sendText(message); 411 // this.session.getAsyncRemote().sendText(message); 412 } 413 414 public static synchronized int getOnlineCount() { 415 return onlineCount; 416 } 417 418 public static synchronized void addOnlineCount() { 419 WebSocket.onlineCount++; 420 } 421 422 public static synchronized void subOnlineCount() { 423 WebSocket.onlineCount--; 424 } 425 426 public static String getEncoding(String str) { 427 String encode = "GB2312"; 428 try { 429 if (str.equals(new String(str.getBytes(encode), encode))) { // 判断是不是GB2312 430 String s = encode; 431 return s; // 是的话,返回GB2312,以下代码同理 432 } 433 } catch (Exception e) { 434 e.printStackTrace(); 435 } 436 encode = "ISO-8859-1"; 437 try { 438 if (str.equals(new String(str.getBytes(encode), encode))) { // 判断是不是ISO-8859-1 439 String s1 = encode; 440 return s1; 441 } 442 } catch (Exception e) { 443 e.printStackTrace(); 444 } 445 encode = "UTF-8"; 446 try { 447 if (str.equals(new String(str.getBytes(encode), encode))) { // 判断是不是UTF-8编码 448 String s2 = encode; 449 return s2; 450 } 451 } catch (Exception e) { 452 e.printStackTrace(); 453 } 454 encode = "GBK"; 455 try { 456 if (str.equals(new String(str.getBytes(encode), encode))) { // 判断是不是GBK 457 String s3 = encode; 458 return s3; 459 } 460 } catch (Exception e) { 461 e.printStackTrace(); 462 } 463 return ""; // 到这一步,你就应该检查是不是其他编码啦 464 } 465 }

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zgjwgw.html