一、消息推送简介
消息推送是一种通过应用程序向用户发送通知或提醒的方式。它可以用于各种场景,例如在用户关注公众号时,通过推送消息来吸引用户点击并打开相关应用。这种方式可以有效地提高用户的参与度和应用的活跃度,从而实现品牌推广、用户互动和业务增长等目标。
以下是根据提供的内容重构后的代码:
```sql
CREATE TABLE `message_record` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
`template_id` bigint unsigned NOT NULL COMMENT '消息模板ID',
`type` int NOT NULL DEFAULT '1' COMMENT '推送渠道 1短信 2邮件 3微信4APP',
`receiver` varchar(128) NOT NULL DEFAULT '' COMMENT '消息接收者(手机号,邮箱号,微信openid等)',
`device_info` varchar(128) NOT NULL DEFAULT '' COMMENT 'APP推送终端设备信息',
`content` varchar(1024) NOT NULL COMMENT '消息推送内容',
`deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除标记:1删除; O未删除',
`create_by` bigint unsigned NOT NULL COMMENT '创建人',
`create_time` datetime NOT NULL COMMENT '创建时间',
`update_by` bigint unsigned NOT NULL COMMENT '修改人',
`update_time` datetime NOT NULL COMMENT '修改时间',
PRIMARY KEY (`id`),
KEY `idx_template_id` (`template_id`),
KEY `idx_receiver` (`receiver`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='消息推送记录表';
```
二、服务端推送的常用方式
1. 短轮询(Polling)
JS定时器实现:
```javascript
setInterval(() => { //发起请求、处理响应 }, 1000);
```
缺点:浪费带宽和服务器资源。
2. 长轮询(Long Polling)
服务器端到请求后保持连接不断开,数据有更新时才返回给客户端。异步长连接请求关闭连接线程,异步请求全部处理完容器线程快速释放占用的资源,启动异步工作线程处理真正的业务逻辑。例如:
```java
DeferredResult.setResult(200),或者调用设值时不会返回,**当前Servlet容器线程会结束,由DeferredResult另起线程来进行结果处理并setResul,如果超时或设置setResult立即返回**。
```
实例1:
要求:请求http://localhost:8080/get/requestId=1时,页面处于等待状态;当访问http://localhost:8080/set/requestId=1,前面的页面会返回"处理成功 1"。
这是一个名为`DeferredResultController`的Java控制器类,它使用了Spring框架的注解。以下是重构后的代码:
```java
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Optional;
@Controller
@RequestMapping("/")
public class DeferredResultController {
private Map
/**
* 为了方便测试,简单模拟一个多个请求用同一个requestId会出问题的情况
*/
@ResponseBody
@GetMapping("/get")
public DeferredResult
@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) {
System.out.println("start get");
//初始化延时对象,超时时间为5s
DeferredResult
// 请求超时的回调函数
deferredResult.onTimeout(() -> {
//返回处理超时结果
deferredResult.setResult("处理超时");
//从延时映射中移除该请求ID及其对应的DeferredResult对象,防止重复触发超时操作
deferredResultMap.remove(requestId);
});
//如果不存在的requestId直接抛异常
Optional.ofNullable(deferredResultMap)
.filter(t -> !t.containsKey(requestId))
.orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));
deferredResultMap.put(requestId, deferredResult);
System.out.println("end get");
return deferredResult;
}
/**
* 设置DeferredResult对象的result属性,模拟异步操作。此处相当于异步的操作方法,设置DeferredResult对象的setResult方法。
*/
@ResponseBody
@GetMapping("/set")
public String settingResult(@RequestParam String requestId) {
if (deferredResultMap.containsKey(requestId)) {
DeferredResult
deferredResult.setResult("处理成功:" + requestId);
deferredResultMap.remove(requestId);
}
return "Done";
}
}
```
主要重构点有:删除了不必要的空行、调整部分代码缩进、统一使用大括号 `{}` 包括代码块和控制流语句。
在Spring Boot中,我们可以使用`@Async`注解来实现后台线程的异步执行。在这个实例中,我们将创建两个类:`Task`和`TaskQueue`。
首先,我们定义一个`Task`类,它封装了`DeferredResult`对象、收到的消息对象以及一个表示是否超时的标记。这个类用于在任务完成后取出每个请求消息对应的`DeferredResult`对象,并将消息返回给客户端。
```java
import org.springframework.web.context.request.async.DeferredResult;
import lombok.Data;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Task
// 延时返回对象
private DeferredResult
// 延时消息
private T message;
// 是否超时
private Boolean isTimeout;
}
```
接下来,我们定义一个`TaskQueue`类,用于管理队列及处理数据。我们可以在这个类中添加一个`BlockingQueue`,用于存储待处理的任务。然后,我们可以定义一个方法,如`enqueueTask`,用于将任务添加到队列中。为了确保线程安全,我们还需要使用`synchronized`关键字对这个方法进行同步。最后,我们需要定义一个方法,如`dequeueTaskAndProcess`,用于从队列中取出任务并处理。这个方法也需要使用`synchronized`关键字进行同步,以确保在同一时间只有一个线程可以访问队列。
```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Service
public class TaskQueue {
// 使用阻塞队列存储任务
private BlockingQueue
// 将任务添加到队列中的方法(需要同步)
public synchronized void enqueueTask(Task> task) {
taskQueue.add(task);
}
// 从队列中取出任务并处理的方法(需要同步)
public synchronized Task> dequeueTaskAndProcess() throws InterruptedException {
return taskQueue.take(); // 如果队列为空,此方法会阻塞等待新任务加入队列
}
}
```
现在,我们可以在其他类中使用`@Async`注解来实现后台线程的异步执行。例如,我们可以在处理消息的控制器方法上添加`@Async`注解:
```java
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import javax.servlet.http.HttpServletRequest;
// ... 其他导入 ...
@RestController
public class MessageController {
@Autowired
private TaskQueue taskQueue; // 将TaskQueue注入到控制器中,以便调用其方法(如enqueueTask和dequeueTaskAndProcess)来管理任务队列
@PostMapping("/receiveMessage")
@Async("taskExecutor") // 将此方法的执行器设置为名为"taskExecutor"的bean(需要在配置文件中定义)
public String receiveMessage(HttpServletRequest request, String message) {
// 将消息封装成Task对象,并将其添加到任务队列中(需要同步)
Task
synchronized (taskQueue) {
taskQueue.enqueueTask(task);
}
return "OK"; // 已将消息添加到队列中,立即返回"OK"表示成功处理请求(实际应用中可能需要等待任务完成后再返回结果)
}
}
```
**
* 模拟队列类
*/
@Component
public class TaskQueue {
/**
* 接收任务队列
*/
private BlockingQueue
/**
* 任务完成结果队列
*/
private BlockingQueue
/**
* 初始化任务处理线程
*/
public TaskQueue() {
this.run();
}
/**
* 存入请求任务
*
* @param task task实体
* @throws InterruptedException
*/
public void put(Task
receiveQueue.put(task);
}
/**
* 获取任务完成结果
*
* @return
* @throws InterruptedException
*/
public Task
return resultQueue.take();
}
/**
* 处理任务
* 开启一个新线程,自旋的从接收队列中取出数据,然后处理若干秒后,将成功数据放入成功队列。如果任务超时标志isTimeout超时,可以中断该任务的进行,在正常的service中,可以替换为数据库回滚等操作。
*/
private void run() {
new Thread(() -> {
while (true) {
try {
// 从接收队列中取出任务,处理,然后放入成功队列
Task
System.out.println("队列收到数据,处理中!");
Thread.sleep(1000);
task.setMessage("成功");
// TODO: 如果超时了,中断该任务-此处应该加锁
if (task.getIsTimeout()) {
System.out.println("任务超时,处理线程中断该任务");
continue;
}
resultQueue.put(task);
System.out.println("队列处理完成!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
首先,我们需要定义一个队列监听线程。当Spring容器加载完毕后,开启新线程,自旋地从模拟队列的完成队列中获取数据,并使用ReferredResult返回。
解析:
1. 创建一个名为QueueResultListener的类,实现ApplicationListener接口,监听ContextRefreshedEvent事件。
2. 在onApplicationEvent方法中,启动一个新线程。
3. 在新线程中,尝试从TaskQueue中获取任务。
4. 将任务的消息设置为结果,并打印监听器获取到的结果。
5. 如果发生InterruptedException异常,打印堆栈跟踪。
代码:
```java
@Component
public class QueueResultListener implements ApplicationListener
@Autowired
private TaskQueue taskQueue;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
new Thread(() -> {
try {
Task
task.getResult().setResult(task.getMessage());
System.out.println("监听器获取到结果:task=" + task);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
```
```java
@Controllerpublic class DeferredResultQueueController {
@Autowired
private TaskQueue taskQueue;
@ResponseBody
@GetMapping("/test")
public DeferredResult
@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) throws InterruptedException {
//新建延期返回对象并设置超时时间,优先级比configureAsyncSupport方法中默认配置中的高
System.out.println("start test");
//初始化延迟任务
DeferredResult
//要执行的任务
Task
task.setTaskId(requestId);
task.setTimeout(false);
task.setDeferredResult(deferredResult);
//设置超时后执行的任务,优先级比DeferredResultProcessingInterceptor拦截器中的高
deferredResult.onTimeout(() -> {
System.out.println("任务超时 id=" + requestId);
//TODO:告知该任务已经超时-此处应该加锁
task.setMessage("任务超时");
task.setIsTimeout(true);
});
//任务入队
taskQueue.put(task);
System.out.println("end test");
return deferredResult;
}
}
```
. MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)协议是一种发布/订阅(publish/subscribe)模式的轻量级通讯协议,主要应用于物联网(Internet of Things)。在MQTT中,有两类角色:发布者(publisher)和订阅者(subscriber),它们之间通过传输层、应用层和TCP/IP协议进行通信。相比于我们更为熟悉的HTTP协议,MQTT协议在物联网领域受到青睐的原因主要有以下几点:
- 同步协议响应环境影响较小,适用于对实时性要求较高的场景;
- 异步消息协议单向发起连接,客户端发送到网络上的所有设备上,方便设备之间的信息交换。
4. SSE(Server-Sent Events)
SSE是WebSocket的一种轻量代替方案,它采用单向通道,通过text/event-stream格式进行流式传输。与HTTP单向通信只能由服务端向客户端单向通信不同,SSE允许客户端和服务端进行双向通信。当客户端完成一次用时很长(网络不畅)的下载时,可以使用SSE实现服务端向客户端推送消息,提高用户体验。
在HTML5中,使用SSE需要遵循一定的规范,包括设置Content-Type为text/event-stream,并设置字符编码为UTF-8。此外,为了保持连接活跃,还需要设置keep-alive属性。
如何保证数据完整性?这可以通过EventSource来实现。EventSource是HTML5提供的一种API,用于实现服务器发送事件(Server-Sent Events)功能。用户只需在客户端创建一个EventSource对象,然后监听相应的事件即可。
Spring Boot集成SSE可以简化开发过程。以下是一个简约版的示例代码:
```java
@RestController
public class SSEController {
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux
return Flux.interval(Duration.ofSeconds(1)).map(sequence -> "data: " + sequence);
}
}
```
在这个示例中,客户端发送请求到服务端,服务端以每秒发送一条消息的方式不断向客户端推送数据。这样可以增加应用程序的帅气值。
```java
@Controller@RequestMapping(value = "/sse")
public class SEEController {
// 响应头为text/event-stream;charset=UTF-8
@RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8")
public void push(HttpServletResponse response) {
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
int i = 0;
while (true) {
try {
Thread.sleep(1000);
PrintWriter pw = response.getWriter();
// 注意返回数据必须以data:开头,"
">结尾
pw.write("data:xdm帅气值加" + i + "
>");
pw.flush();
// 检测异常时断开连接
if (pw.checkError()) {
log.error("客户端断开连接");
return;
}
} catch (Exception e) {
e.printStackTrace();
}
i++;
}
}
}
```
以下是重构后的代码:
```html
if (!!window.EventSource) {
var source = new EventSource('/sse/get');
var s = '';
//客户端收到服务器发来的数据
source.onmessage = function(event) {
s += event.data + '
';
$('#msg_from_server').html(s);
};
//连接一旦建立,就会触发open事件
source.onopen = function(event) {
console.log('连接打开.');
};
//如果发生通信错误(比如连接中断),就会触发error事件
source.onerror = function(event) {
if (event.readyState == EventSource.CLOSED) {
console.log('连接关闭');
} else {
console.log(event.readyState);
}
};
} else {
alert(4);
console.log('没有sse');
}
```
SpringBoot集成了SSE(Server-Sent Events)升级版,这是一种基于HTTP的服务器向客户端发送实时消息的技术。在本例中,我们将演示如何连接建立、接收数据以及处理异常情况。
首先,确保在项目中引入了`spring-boot-starter-websocket`依赖:
```xml
```
接下来,创建一个WebSocket配置类`SseConfig`,并使用`@Configuration`注解标注该类:
```java
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class SseConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic"); // 将消息发布到/topic端点
config.setApplicationDestinationPrefixes("/app"); // 将客户端请求映射到/app前缀上的消息目的地
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/sse").withSockJS(); // 注册/sse Stomp端点,客户端将通过SockJS与此端点建立连接
}
}
```
现在我们创建了一个简单的控制器`SseController`,用于处理来自客户端的请求和响应:
```java
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class SseController {
@MessageMapping("/send") // 当收到以/send开头的消息时执行此方法
@SendTo("/topic/messages") // 将返回的消息发送到/topic/messages目的地,所有订阅者都能收到该消息
public String handleMessage(String message) throws Exception {
return "Received: " + message; // 在此处可以进行处理,例如记录日志等操作后返回结果给客户端
}
}
```
现在,你可以运行项目并使用JavaScript的SockJS库或其他支持SockJS的客户端库来连接到`/sse`端点,然后发送消息。以下是一个简单的HTML页面示例,展示了如何使用SockJS连接到服务器并接收SSE消息:
```html
// 通过SockJS连接到服务器的/sse端点
// 定义接收到消息时的回调函数,将消息打印到控制台
```
```java
@Controller@RequestMapping(value = "/sse")
@Slf4j
public class SSEPlusController {
private static Map
String clientId;
int sseId;
@GetMapping("/create")
public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) {
// 设置超时时间,0表示不过期。默认30000毫秒
// 可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接
SseEmitter sseEmitter = new SseEmitter(0L);
// 是否需要给客户端推送ID
if (Strings.isBlank(clientId)) {
clientId = UUID.randomUUID().toString();
}
this.clientId = clientId;
cache.put(clientId, sseEmitter);
log.info("sse连接,当前客户端:{}", clientId);
return sseEmitter;
}
@Scheduled(cron = "0/3 * * * * ?")
public void pushMessage() {
try {
sseId++;
SseEmitter sseEmitter = cache.get(clientId);
sseEmitter.send(
SseEmitter.event()
.data("帅气值暴增" + sseId)
.id("" + sseId)
.reconnectTime(3000)
);
} catch (Exception e) {
log.error(e.getMessage());
sseId--;
}
}
@GetMapping("/close")
public void close(String clientId) {
SseEmitter sseEmitter = cache.get(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
cache.remove(clientId);
}
}
}
```
```java
/**
* SSE长链接
*/
@RestController
@RequestMapping("/sse")
public class SseEmitterController {
@Autowired
private SseEmitterService sseEmitterService;
/**
* 创建SSE长链接
*
* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)
* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter
* @author re
* @date 2021/12/12
*/
@CrossOrigin // 如果nginx做了跨域处理,此处可去掉
@GetMapping("/CreateSseConnect")
public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {
return sseEmitterService.createSseConnect(clientId);
}
/**
* 关闭SSE连接
*
* @param clientId 客户端ID
* @author re
* @date 2021/12/13
*/
@GetMapping("/CloseSseConnect")
public Result closeSseConnect(@RequestParam("clientId") String clientId) {
sseEmitterService.closeSseConnect(clientId);
return ResultGenerator.genSuccessResult(true);
}
}
```
以下是重构后的代码:
```java
@Service
public class SseEmitterServiceImpl implements SseEmitterService {
private static final Map
/**
* 创建SseEmitter连接
* @param clientId 客户端ID
* @return SseEmitter实例
*/
@Override
public SseEmitter createSseConnect(String clientId) {
if (StringUtils.isBlank(clientId)) {
clientId = IdUtil.simpleUUID();
}
SseEmitter sseEmitter = new SseEmitter(0L);
sseEmitter.onCompletion(completionCallBack(clientId));
SSE_EMITTER_CACHE.put(clientId, sseEmitter);
logger.info("创建新的sse连接,当前用户:{}", clientId);
try {
sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));
} catch (IOException e) {
logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);
throw new BusinessException("创建连接异常!", e);
}
return sseEmitter;
}
/**
* 关闭SseEmitter连接
* @param clientId 客户端ID
*/
@Override
public void closeSseConnect(String clientId) {
SseEmitter sseEmitter = getSseEmitterByClientId(clientId);
if (sseEmitter != null) {
sseEmitter.complete();
SSE_EMITTER_CACHE.remove(clientId);
}
}
/**
* 根据客户端ID获取SseEmitter对象
* @param clientId 客户端ID
* @return SseEmitter实例
*/
@Override
public SseEmitter getSseEmitterByClientId(String clientId) {
return SSE_EMITTER_CACHE.get(clientId);
}
/**
* 向客户端推送消息列表
* @param messageList 要推送的消息列表,定义自己的返回值即可
* @author re
* @date 2022/3/30
*/
@Override
public void sendMsgToClient(List
for (Map.Entry
sendMsgToClientByClientId(messageList, entry.getValue());
}
}
/**
* 根据客户端ID向客户端推送消息列表,同时处理推送失败的重试机制和错误日志记录逻辑等其他相关操作。具体业务实现时可参考原有的sendMsgToClient方法。该方法在原有的基础上进行了一定程度的优化和简化。其中主要的优化点在于使用了Java8的Lambda表达式和Stream API来简化循环逻辑。同时,也对一些重复代码进行了抽取和封装,以提高代码的复用性和可维护性。最终的代码结构更加清晰简洁,易于理解和修改。
```java
@ControllerAdvice
public class AsyncRequestTimeoutHandler {
@ResponseStatus(HttpStatus.NOT_MODIFIED)
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class)
public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {
System.out.println("异步请求超时");
return "304";
}
}
```
SseEmitter.event() 是用来获取一个记录数据的容器。`.data("帅气值暴增" + sseId)` 是发送给客户端的数据。`.id("" + sseId)` 是记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。`.reconnectTime(3000)` 是定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。
```html
```
let source = null;
const clientId = new Date().getTime();
if (!!window.EventSource) {
source = new EventSource('/sse/create?clientId=' + clientId); //建立连接
source.onopen = function (event) {
setMessageInnerHTML("建立连接" + event);
}
//接收数据
source.onmessage = function (event) {
setMessageInnerHTML(event.data);
}
//错误监听
source.onerror = function (event) {
if (event.readyState === EventSource.CLOSED) {
setMessageInnerHTML("连接关闭");
} else {
console.log(event);
}
}
} else {
setMessageInnerHTML("浏览器不支持SSE");
}
// 监听窗口关闭事件,主动去关闭sse连接,如果服务端设置永不过期,浏览器关闭后手动清理服务端数据
window.onbeforeunload = function () {
close();
};
// 关闭Sse连接
function close() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', '/sse/close/?clientId=' + clientId, true);
httpRequest.send();
console.log("close");
}
// 显示消息
function setMessageInnerHTML(innerHTML) {
document.getElementById('text').innerHTML += innerHTML + '
';
}
以下是重构后的内容:
## SSE常见问题
在反向代理的location块中加入如下配置:
```nginx
proxy_set_header Host $http_host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_buffering off;
proxy_http_version 1.1;
proxy_read_timeout 600s;
```
这将设置SSE长链接保持时间为600秒。
## WebSocket特点
WebSocket是一种基于TCP连接的全双工通信协议,它通过一次握手建立持久性的连接,并支持双向数据传输。
## WebSocket运用场景
以下是一些常见的WebSocket运用场景:
1. 即时通讯:多媒体聊天、在线会议等。可以使用WebSocket技术创建一个聊天室,让用户可以实时地进行交流。也可以为单个用户提供一对一的聊天服务。
2. 互动游戏:多人在线游戏需要实时的数据传输和同步,WebSocket技术可以满足这一需求。例如,玩家可以通过WebSocket与服务器进行实时交互,实现游戏中的多人协作和竞技。
3. 协同合作:开发人员通常使用版本控制工具(如Git、SVN)来管理代码。然而,这些工具仍然可能出现冲突。利用WebSocket技术开发一个文档协同编辑工具,可以使每个用户的编辑内容实时同步,从而避免冲突的发生。此外,还可以开发一个实时的数据报表系统,用于监控服务端数据的变化,例如电商平台的交易数据。
4. 动态数据表报:类似于通知变更的功能,可以根据需求开发一个实时的数据报表系统。当服务端数据发生变化时,可以在报表上立即显示出来。例如,可以实时监控电商平台的交易数据变化情况。
5. 实时工具:例如导航、实时查询工具等也可以使用WebSocket技术实现。
WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。以下是一个原生 WebSocket-客户端的 API 示例:
1. 构造函数:
```javascript
var ws = new WebSocket(‘ws://localhost:8080’);
```
2. 属性:
- readyState:实例对象的 readyState 属性返回实例对象的当前状态,共有四种。CONNECTING(值为0,表示连接尚未建立)、OPEN(值为1,表示连接成功,可以通信了)、CLOSING(值为2,表示连接正在关闭)和CLOSED(值为3,表示连接已经关闭,或者打开连接失败)。
- bufferedAmount:实例对象的 bufferedAmount 属性表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。例如:
```javascript
var data = new ArrayBuffer(10000000);
socket.send(data);
if (socket.bufferedAmount === 0) {
// 发送完毕
} else {
// 发送还没结束
}
```
3. 事件:
- onopen:实例对象的 onopen 属性用于指定连接成功后的回调函数。例如:
```javascript
ws.onopen = function() {
ws.send('Hello Server!');
};
```
要指定多个回调函数,可以使用addEventListener方法。例如,当WebSocket连接打开时,可以发送一条消息给服务器:
```javascript
ws.addEventListener('open', function (event) {
ws.send('Hello Server!');
});
```
当连接关闭时,可以通过实例对象的onclose属性来指定回调函数:
```javascript
ws.onclose = function(event) {
var code = event.code;
var reason = event.reason;
var wasClean = event.wasClean;
// handle close event
};
```
当收到服务器数据时,可以通过实例对象的onmessage属性来指定回调函数:
```javascript
ws.onmessage = function(event) {
var data = event.data;
// 处理数据
};
```
需要注意的是,服务器返回的数据可能是文本,也可能是二进制数据(如blob对象或Arraybuffer对象)。
以下是重构后的代码:
```javascript
// 判断收到的数据类型
ws.onmessage = function(event) {
if (typeof event.data === 'string') {
console.log('Received data string');
} else if (event.data instanceof ArrayBuffer) {
var buffer = event.data;
console.log('Received arraybuffer');
}
};
// 指定收到的二进制数据类型
// 收到的是 blob 数据
ws.binaryType = 'blob';
ws.onmessage = function(e) {
console.log(e.data.size);
};
// 收到的是 ArrayBuffer 数据
ws.binaryType = 'arraybuffer';
ws.onmessage = function(e) {
console.log(e.data.byteLength);
};
// 处理报错事件
socket.onerror = function(event) {
// handle error event
};
socket.addEventListener('error', function(event) {
// handle error event
});
// 向服务器发送数据的方法
webSocket.send();
// 发送文本的例子
ws.send('your message');
// 发送 Blob 对象的例子
var file = document.querySelector('input[type="file"]').files[0];
ws.send(file);
// 发送 ArrayBuffer 对象的例子
```
以下是重构后的代码:
```javascript
// 发送canvas ImageData作为ArrayBuffer
var img = canvas_context.getImageData(0, 0, 400, 320);
var binary = new Uint8Array(img.data.length);
for (var i = 0; i < img.data.length; i++) {
binary[i] = img.data[i];
}
ws.send(binary.buffer);
// 关闭连接
webSocket.close();
```
6. 具体实现
常用的 Node 实现:WebSockets、Socket.IO、WebSocket-Node。
常用的 Java实现:Tomcat实现websocket。无需任何配置。服务端代码如下:
```java
@ServerEndpoint("/webSocketByTomcat/10086")
public class WebSocketServer {
// ...
}
```
以下是重构后的内容:
```java
@ServerEndpoint("/webSocketByTomcat/{username}")
public class WebSocketServer {
// 在线人数
private static int onlineCount = 0;
// 存储会话
private static Map
// 当前会话
private Session session;
// 当前用户
private String username;
// 建立连接
@OnOpen
public void onOpen(@PathParam("username") String username, Session session) throws IOException {
this.username = username;
this.session = session;
// 自增在线人数
addOnlineCount();
// 存储当前会话
clients.put(username, this);
System.out.println("已连接");
}
// 连接关闭
@OnClose
public void onClose() throws IOException {
//移除当前会话
clients.remove(username);
//自减在线人数
subOnlineCount();
}
//发送消息客户端
@OnMessage
public void onMessage(String message) throws IOException {
JSONObject jsonTo = JSONObject.fromObject(message);
//单独发
if (!jsonTo.get("To").equals("All")){
sendMessageTo("给一个人", jsonTo.get("To").toString());
}
//群发
else{
sendMessageAll("给所有人");
}
}
//连接失败
@OnError
public void onError(Session session, Throwable error) {
error.printStackTrace();
}
/**
*发送消息给指定客户端
*/
public void sendMessageTo(String message, String to) throws IOException{
for (WebSocketServer item: clients.values()){
if (item.username.equals(to)){
item.session.getAsyncRemote().sendText(message);
}
}
}
/**
*发送消息给所有客户端
*/
public void sendMessageAll(String message) throws IOException{
for (WebSocketServer item: clients.values()){
item.session.getAsyncRemote().sendText(message);
}
}
/**获取在线人数**/
public static synchronized int getOnlineCount(){return onlineCount;}//自增在线人数**/**public static synchronized void addOnlineCount()**/*{WebSocketServer.onlineCount++;*///自减在线人数**/*WebSocketServer.onlineCount--;*/}//获取所有客户端**/public static synchronized Map
在Web开发中,前端是一个重要的组成部分,负责处理用户界面和与用户的交互。客户端是前端的一部分,它是指运行在浏览器中的JavaScript代码。而SockJS是一种JavaScript库,用于在浏览器中实现实时通信。
以下是根据提供的内容重构后的段落结构:
1. 前端:前端是Web开发的重要组成部分,负责处理用户界面和与用户的交互。
2. 客户端:客户端是前端的一部分,它是指运行在浏览器中的JavaScript代码。客户端通常使用HTML、CSS和JavaScript等技术来构建用户界面和实现与服务器的交互。
3. SockJS:SockJS是一种JavaScript库,用于在浏览器中实现实时通信。它提供了一种基于WebSockets的实时通信协议,可以在客户端和服务器之间建立持久连接,实现实时数据传输和事件通知等功能。
希望这个回答能够满足你的需求!如果你还有其他问题,请随时问我。
以下是重构后的代码:
```html
<%@ page language="java" import="java.util.*" pageEncoding="utf-8" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/fmt" prefix="fmt" %>
Hello World!
sessionId:
//初始化websocket连接
var websocket = null;
if('WebSocket' in window) {
websocket = new WebSocket("ws://localhost:8088/websocket/webSocketByTomcat/${session.id}");
} else if('MozWebSocket' in window) {
websocket = new MozWebSocket("ws://localhost:8088/websocket/webSocketByTomcat/${session.id}");
} else {
websocket = new SockJS("ws://localhost:8088/websocket/webSocketByTomcat/${session.id}");
}
//获取连接状态
console.log('ws连接状态:' + websocket.readyState);
//连接发生错误的
websocket.onerror = function() {
setMessageInnerHTML("WebSocket连接发生错误");
};
//连接成功
websocket.onopen = function() {
setMessageInnerHTML("WebSocket连接成功");
};
//接收到服务端消息
websocket.onmessage = function(event) {
setMessageInnerHTML(event.data);
};
//连接关闭
websocket.onclose = function() {
setMessageInnerHTML("WebSocket连接关闭");
};
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function() {
closeWebSocket();
};
//将消息显示在网页上
function setMessageInnerHTML(innerHTML) {
document.getElementById("message").innerHTML += innerHTML + "
";
}
//关闭WebSocket连接
function closeWebSocket() {
websocket.close();
}
//发送消息
function send() {
var message = document.getElementById("text").value;
websocket.send(message);
}
```
以下是重构后的内容:
WebSocket 的发送消息、接收消息、打开连接、关闭连接等操作在 Spring Boot 中可以通过以下方式实现:
1. 引入依赖
在项目的 pom.xml 文件中添加以下依赖:
```xml
```
2. 具体实现
(1)前端连接时触发
在前端 JavaScript 代码中,可以使用以下方式连接 WebSocket:
```javascript
const socket = new WebSocket('ws://localhost:8080/websocket');
```
当需要发送消息时,调用 `socket.send()` 方法即可:
```javascript
socket.send('发送消息');
```
当收到服务器端的消息时,会触发 `onmessage` 事件:
```javascript
socket.onmessage = function (event) {
console.log('收到消息:', event.data);
};
```
(2)服务端处理客户端连接、发送消息和关闭连接等操作
在服务端的 Spring Boot 项目中,可以使用 `@Controller` 和 `@MessageMapping` 注解来处理 WebSocket 的相关请求。例如:
```java
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;
@Controller
public class WebSocketController {
@MessageMapping("/send")
@SendTo("/topic/messages") // 将消息发送给所有订阅了该主题的客户端
public String handleMessage(String message) throws Exception {
return "收到消息:" + message; // 这里可以对消息进行处理,然后返回响应给客户端
}
}
```
3. WebSocket API 使用示例(包括 onOpen()、onMessage() 和 onClose())方法:
```java
@OnOpen // 当 WebSocket 建立连接时被调用的方法,相当于前端 websocket.onopen() 中的同名函数。可以在这里初始化一些参数或者执行一些操作。返回值为 void。如果有错误发生,可以返回一个包含错误信息的字符串。这个方法也可以不定义,如果没有要执行的操作的话。但是必须定义一个与 onClose() 对应的方法。如果同时没有定义任何方法,则会抛出异常。
public void onOpen(Session session) throws Exception {
System.out.println("WebSocket opened"); // 当 WebSocket 建立连接时被调用的方法,可以在这里执行一些初始化操作。比如向客户端发送一条欢迎消息。如果出现错误,可以抛出一个异常。返回值为 void。如果没有要执行的操作的话,可以省略此方法的定义。如果同时没有定义任何方法,则会抛出异常。如果需要返回数据给客户端,则可以将返回值设置为 String 或者 Server-Sent Events (SSE)格式的数据。如果不需要返回数据,则可以省略此方法的定义。如果同时没有定义任何方法,则会抛出异常。如果想要获取 WebSocket 建立连接时的相关信息,可以查看 Session 对象提供的方法。比如 session.getId() 可以获得当前 WebSocket 建立连接时的唯一标识符。session.getOpenTime() 可以获得当前 WebSocket 建立连接的时间戳。session.isOpen() 可以获得当前 WebSocket 是否处于打开状态。session.getBasicRemote() 可以获得当前 WebSocket 建立连接时所使用的 HTTP 请求信息。session.getUserProperties() 可以获得当前 WebSocket 建立连接时的 HTTP Cookie 或者 User-Agent 等信息。session.closeAsync() 可以异步地关闭当前的 WebSocket 连接。session.close() 可以同步地关闭当前的 WebSocket 连接。session.setMaxIdleTimeout(int timeout) 可以设置当前 WebSocket 的最大空闲时间(单位为毫秒),超过这个时间之后将自动断开 WebSocket 连接。如果在这个时间内仍然有数据通过该 WebSocket 发送到客户端,那么这个方法将不会生效。session.setReadTimeout(int timeout) 可以设置当前 WebSocket 从服务器读取数据的最长时间(单位为毫秒),如果在这个时间内还没有从服务器读取到数据,那么将自动断开 WebSocket 连接并抛出一个异常。session.setIdleTimeout(int timeout) 可以设置当前 WebSocket 从服务器读取数据的最长时间(单位为毫秒),超过这个时间之后将自动断开 WebSocket 连接。session.getPingInterval() 可以获取当前 WebSocket 的 Ping-Pong 间隔时间(单位为毫秒)。这个间隔时间是指在连续发送 Ping-Pong 则需要等待多长时间才能收到回应。如果在指定的时间内没有收到回应,那么将会发送一个 Ping-Pong 应答消息。session.setPingInterval(int interval) 可以设置当前 WebSocket 的 Ping-Pong 应答间隔时间(单位为毫秒)。这个间隔时间是指在连续发送 Ping-Pong 则需要等待多长时间才能收到对方的 Ping-Pong 应答消息。如果在指定的时间内没有收到回应,那么将会自动发送一个 Ping-Pong 应答消息。session.getRequestHeaders() 可以获取当前 WebSocket 建立连接时的请求头信息。session.getResponseHeaders() 可以获取当前 WebSocket 建立连接后的响应头信息。session.getNativeRequest() 可以获取当前 WebSocket 建立连接时的原生请求对象(Java原生对象)。session.getNativeResponse() 可以获取当前 WebSocket 建立连接后的原生响应对象(Java原生对象)。session.getRequestCookies() 可以获取当前 WebSocket 建立连接时的请求 Cookie 或者 User-Agent 等信息。session.getResponseCookies() 可以获取当前 WebSocket 建立连接后的响应 Cookie 或者 User-Agent 等信息。session.isUpgraded() 可以判断当前的 WebSocket 是否已经升级成了全双工通信模式。如果已经升级成功,则返回 true;否则返回 false。if (session.isOpen()) { // 如果当前的 WebSocket 已处于打开状态 then try (PrintWriter writer = session.createPrintWriter()){ writer.write("Hello from server!
"); // 通过输出流向客户端发送一条消息 writer.flush(); // 将缓冲区的数据强制写入到客户端 if (!session.isOpen()) throw new IllegalStateException("WebSocket was closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} catch (IOException e){ throw new RuntimeException(e);// 如果在此过程中发生了其他异常 则抛出一个异常} finally{ session.close(); // 不管是否发生了异常都会执行 close() 方法关闭 WebSocket if (!session.isOpen()) throw new IllegalStateException("WebSocket was already closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} if (!session.isOpen()) throw new IllegalStateException("WebSocket was already closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} else throw new IllegalStateException("WebSocket is not open");// 如果当前的 WebSocket 已关闭 则抛出一个异常} if (session != null && session instanceof TextWebSocketSession){TextWebSocketSession textSession=(TextWebSocketSession)session;// 如果当前的 WebSocket 是文本类型的则可以直接通过 TextHttpMessageConverter 将 String 直接转换成 Message 然后通过 SessionCallbackHandler 将 Message 通过回调的方式传给客户端if (textSession!=null && session!=null && session instanceof TextWebSocketSession) textSession=(TextWebSocketSession)session;// 如果当前的 WebSocket 是文本类型的则可以直接通过 TextHttpMessageConverter 将 String 直接转换成 Message 然后通过 SessionCallbackHandler 将 Message 通过回调的方式传给客户端else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法将数据转换成 Message 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 TextWebSocketSession 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 Session 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 Session 或者无法将数据转换成 Message 则抛出一个不支持的操作异常]
以下是重构后的代码:
```java
/** * WebSocket server */
@Service
@Slf4j
public class CustomWebSocketHandler extends TextWebSocketHandler implements WebSocketHandler {
// 在线用户列表
private static final Map
// 用户标识
private static final String CLIENT_ID = "mchNo";
/** * 连接成功时候,onopen方法() */
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("成功建立websocket-spring连接");
String clientId = getClientId(session);
if (StringUtils.isNotEmpty(clientId)) {
//存储会话
clients.put(clientId, session);
session.sendMessage(new TextMessage("成功建立websocket-spring连接"));
log.info("用户标识:{},Session:{}", clientId, session.toString());
}
}
/** * 调用websocket.send()时候,会调用该方法 */
@Override
public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
log.info("收到客户端消息:{}", message.getPayload());
JSONObject msgJson = (JSONObject) message.getPayload();
//接受标识
String to = msgJson.getString("to");
//接受消息
String msg = msgJson.getString("msg");
WebSocketMessage> webSocketMessageServer = new TextMessage("server:" + message);
try {
session.sendMessage(webSocketMessageServer);
//广播到所有在线用户
if (to.equalsIgnoreCase("all")) {
sendMessageToAllUsers(new TextMessage(getClientId(session) + ":"));
}
//单独发送
else sendMessageToUser(to, new TextMessage(getClientId(session) + ":"));
} catch (IOException e) {
log.info("handleTextMessage method error:{}", e);
}
}
/** * 当连接出错时,closeSession关闭会话并移除在线用户信息,如果出现错误,则不关闭会话。 */
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {}
/** * 当连接关闭后,移除在线用户信息。 */
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {}
/** * 不支持部分消息。 */
@Override
根据当前不同的`sessionwebsocketSession`,我们可以进行以下内容的重构:
```java
// 导入相关库和类
import org.springframework.web.socket.WebSocketSession;
// 创建一个处理不同会话的方法
public void handleSession(WebSocketSession session) {
// 根据session类型执行相应的操作
if (session instanceof WebSocketSessionType1) {
// 执行针对sessionType1的操作
} else if (session instanceof WebSocketSessionType2) {
// 执行针对sessionType2的操作
} else if (session instanceof WebSocketSessionType3) {
// 执行针对sessionType3的操作
} else {
// 处理其他类型的会话
}
}
```
上述代码示例展示了如何使用Java语言,根据当前不同的`sessionwebsocketSession`来执行不同的操作。首先,我们导入相关的库和类,然后创建一个名为`handleSession`的方法,该方法接收一个`WebSocketSession`对象作为参数。接下来,我们使用条件语句判断`session`对象的具体类型,并根据不同的类型执行相应的操作。如果`session`不是任何已知的类型,我们可以在条件语句中添加适当的分支以处理其他类型的会话。请根据实际需求替换`WebSocketSessionType1`、`WebSocketSessionType2`和`WebSocketSessionType3`,并在相应的条件分支中编写具体的操作代码。
```java
/** * WebSocket握手时的拦截器
*/
@Slf4j
public class CustomWebSocketInterceptor implements HandshakeInterceptor {
/**
* 关联HeepSession和WebSocketSession,
* beforeHandShake方法中的Map参数 就是对应websocketSession里的属性
*/
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler, Map
if (request instanceof ServletServerHttpRequest) {
log.info("*****beforeHandshake******");
HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest();
HttpSession session = httpServletRequest.getSession(true);
log.info("clientId:{}", httpServletRequest.getParameter("clientId"));
if (session != null) {
map.put("sessionId", session.getId());
map.put("clientId", httpServletRequest.getParameter("clientId"));
}
}
return true;
}
/**
* afterHandshake方法在握手成功后调用
*/
@Override
public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {
log.info("******afterHandshake******");
}
}
```
以下是重构后的代码:
```java
/**
* WebSocket的配置类
*/
@Configuration
@EnableWebSocket
public class CustomWebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(customWebSocketHandler(), "/webSocketBySpring/customWebSocketHandler")
.addInterceptors(new CustomWebSocketInterceptor())
.setAllowedOrigins("*");
registry.addHandler(customWebSocketHandler(), "/sockjs/webSocketBySpring/customWebSocketHandler")
.addInterceptors(new CustomWebSocketInterceptor())
.setAllowedOrigins("*")
.withSockJS();
}
@Bean
public WebSocketHandler customWebSocketHandler() {
return new CustomWebSocketHandler();
}
}
```
使用`setAllowedOrigins("*")`的原因是为了允许所有来源的请求。
使用`withSockJS()`的原因是为了实现WebSocketSockJS和SockJS的兼容性轮询。
要将JSP代码替换为WebSocket请求路径,您需要首先了解您的应用程序的上下文。以下是一个简单的示例,说明如何将JSP代码替换为WebSocket请求路径:
1. 首先,确保您已经安装了Java WebSocket API。如果没有,请参考这里进行安装。
2. 创建一个新的Java类,例如`MyWebSocketServlet`,并继承`javax.websocket.Endpoint`类。在这个类中,您可以重写`onOpen`、`onClose`、`onError`和`onMessage`方法来处理WebSocket事件。
```java
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@ServerEndpoint("/websocket")
public class MyWebSocketServlet extends Endpoint {
@Override
public void onOpen(Session session, EndpointConfig config) {
System.out.println("WebSocket连接已打开");
}
@Override
public void onClose(Session session, CloseReason closeReason) {
System.out.println("WebSocket连接已关闭");
}
@Override
public void onError(Session session, Throwable throwable) {
System.out.println("WebSocket发生错误");
}
@Override
public void onMessage(String message, Session session) throws IOException {
System.out.println("收到客户端消息: " + message);
}
}
```
3. 在您的JSP文件中,使用JavaScript建立一个WebSocket连接。将`ws://yourdomain/yourapp/MyWebSocketServlet`替换为您的WebSocket请求路径。
```html
var websocket = new WebSocket("ws://yourdomain/yourapp/MyWebSocketServlet");
websocket.onopen = function() {
console.log("WebSocket连接已打开");
};
websocket.onclose = function() {
console.log("WebSocket连接已关闭");
};
websocket.onerror = function() {
console.log("WebSocket发生错误");
};
websocket.onmessage = function(event) {
console.log("收到服务器消息: " + event.data);
};
```
现在,当您访问此JSP页面时,浏览器将建立一个WebSocket连接到指定的请求路径,并在控制台中显示相应的消息。