一、消息推送简介

消息推送是一种通过应用程序向用户发送通知或提醒的方式。它可以用于各种场景,例如在用户关注公众号时,通过推送消息来吸引用户点击并打开相关应用。这种方式可以有效地提高用户的参与度和应用的活跃度,从而实现品牌推广、用户互动和业务增长等目标。

以下是根据提供的内容重构后的代码:

```sql

CREATE TABLE `message_record` (

`id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',

`template_id` bigint unsigned NOT NULL COMMENT '消息模板ID',

`type` int NOT NULL DEFAULT '1' COMMENT '推送渠道 1短信 2邮件 3微信4APP',

`receiver` varchar(128) NOT NULL DEFAULT '' COMMENT '消息接收者(手机号,邮箱号,微信openid等)',

`device_info` varchar(128) NOT NULL DEFAULT '' COMMENT 'APP推送终端设备信息',

`content` varchar(1024) NOT NULL COMMENT '消息推送内容',

`deleted` tinyint NOT NULL DEFAULT '0' COMMENT '逻辑删除标记:1删除; O未删除',

`create_by` bigint unsigned NOT NULL COMMENT '创建人',

`create_time` datetime NOT NULL COMMENT '创建时间',

`update_by` bigint unsigned NOT NULL COMMENT '修改人',

`update_time` datetime NOT NULL COMMENT '修改时间',

PRIMARY KEY (`id`),

KEY `idx_template_id` (`template_id`),

KEY `idx_receiver` (`receiver`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='消息推送记录表';

```

二、服务端推送的常用方式

1. 短轮询(Polling)

JS定时器实现:

```javascript

setInterval(() => { //发起请求、处理响应 }, 1000);

```

缺点:浪费带宽和服务器资源。

2. 长轮询(Long Polling)

服务器端到请求后保持连接不断开,数据有更新时才返回给客户端。异步长连接请求关闭连接线程,异步请求全部处理完容器线程快速释放占用的资源,启动异步工作线程处理真正的业务逻辑。例如:

```java

DeferredResult.setResult(200),或者调用设值时不会返回,**当前Servlet容器线程会结束,由DeferredResult另起线程来进行结果处理并setResul,如果超时或设置setResult立即返回**。

```

实例1:

要求:请求http://localhost:8080/get/requestId=1时,页面处于等待状态;当访问http://localhost:8080/set/requestId=1,前面的页面会返回"处理成功 1"。

这是一个名为`DeferredResultController`的Java控制器类,它使用了Spring框架的注解。以下是重构后的代码:

```java

import org.springframework.web.bind.annotation.*;

import java.util.concurrent.ConcurrentHashMap;

import java.util.Optional;

@Controller

@RequestMapping("/")

public class DeferredResultController {

private Map> deferredResultMap = new ConcurrentHashMap<>();

/**

* 为了方便测试,简单模拟一个多个请求用同一个requestId会出问题的情况

*/

@ResponseBody

@GetMapping("/get")

public DeferredResult get(@RequestParam String requestId,

@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) {

System.out.println("start get");

//初始化延时对象,超时时间为5s

DeferredResult deferredResult = new DeferredResult<>(timeout);

// 请求超时的回调函数

deferredResult.onTimeout(() -> {

//返回处理超时结果

deferredResult.setResult("处理超时");

//从延时映射中移除该请求ID及其对应的DeferredResult对象,防止重复触发超时操作

deferredResultMap.remove(requestId);

});

//如果不存在的requestId直接抛异常

Optional.ofNullable(deferredResultMap)

.filter(t -> !t.containsKey(requestId))

.orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId)));

deferredResultMap.put(requestId, deferredResult);

System.out.println("end get");

return deferredResult;

}

/**

* 设置DeferredResult对象的result属性,模拟异步操作。此处相当于异步的操作方法,设置DeferredResult对象的setResult方法。

*/

@ResponseBody

@GetMapping("/set")

public String settingResult(@RequestParam String requestId) {

if (deferredResultMap.containsKey(requestId)) {

DeferredResult deferredResult = deferredResultMap.get(requestId);

deferredResult.setResult("处理成功:" + requestId);

deferredResultMap.remove(requestId);

}

return "Done";

}

}

```

主要重构点有:删除了不必要的空行、调整部分代码缩进、统一使用大括号 `{}` 包括代码块和控制流语句。

在Spring Boot中,我们可以使用`@Async`注解来实现后台线程的异步执行。在这个实例中,我们将创建两个类:`Task`和`TaskQueue`。

首先,我们定义一个`Task`类,它封装了`DeferredResult`对象、收到的消息对象以及一个表示是否超时的标记。这个类用于在任务完成后取出每个请求消息对应的`DeferredResult`对象,并将消息返回给客户端。

```java

import org.springframework.web.context.request.async.DeferredResult;

import lombok.Data;

import lombok.AllArgsConstructor;

import lombok.NoArgsConstructor;

@Data

@AllArgsConstructor

@NoArgsConstructor

public class Task {

// 延时返回对象

private DeferredResult result;

// 延时消息

private T message;

// 是否超时

private Boolean isTimeout;

}

```

接下来,我们定义一个`TaskQueue`类,用于管理队列及处理数据。我们可以在这个类中添加一个`BlockingQueue`,用于存储待处理的任务。然后,我们可以定义一个方法,如`enqueueTask`,用于将任务添加到队列中。为了确保线程安全,我们还需要使用`synchronized`关键字对这个方法进行同步。最后,我们需要定义一个方法,如`dequeueTaskAndProcess`,用于从队列中取出任务并处理。这个方法也需要使用`synchronized`关键字进行同步,以确保在同一时间只有一个线程可以访问队列。

```java

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

@Service

public class TaskQueue {

// 使用阻塞队列存储任务

private BlockingQueue> taskQueue = new LinkedBlockingQueue<>();

// 将任务添加到队列中的方法(需要同步)

public synchronized void enqueueTask(Task task) {

taskQueue.add(task);

}

// 从队列中取出任务并处理的方法(需要同步)

public synchronized Task dequeueTaskAndProcess() throws InterruptedException {

return taskQueue.take(); // 如果队列为空,此方法会阻塞等待新任务加入队列

}

}

```

现在,我们可以在其他类中使用`@Async`注解来实现后台线程的异步执行。例如,我们可以在处理消息的控制器方法上添加`@Async`注解:

```java

import org.springframework.web.bind.annotation.PostMapping;

import org.springframework.web.bind.annotation.RestController;

import org.springframework.web.context.request.async.DeferredResult;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Async;

import javax.servlet.http.HttpServletRequest;

// ... 其他导入 ...

@RestController

public class MessageController {

@Autowired

private TaskQueue taskQueue; // 将TaskQueue注入到控制器中,以便调用其方法(如enqueueTask和dequeueTaskAndProcess)来管理任务队列

@PostMapping("/receiveMessage")

@Async("taskExecutor") // 将此方法的执行器设置为名为"taskExecutor"的bean(需要在配置文件中定义)

public String receiveMessage(HttpServletRequest request, String message) {

// 将消息封装成Task对象,并将其添加到任务队列中(需要同步)

Task task = new Task<>(new MyDeferredResult<>(message), message, false); // MyDeferredResult是一个自定义的DeferredResult子类,用于封装接收到的消息和其他信息(需要实现该类)

synchronized (taskQueue) {

taskQueue.enqueueTask(task);

}

return "OK"; // 已将消息添加到队列中,立即返回"OK"表示成功处理请求(实际应用中可能需要等待任务完成后再返回结果)

}

}

```

**

* 模拟队列类

*/

@Component

public class TaskQueue {

/**

* 接收任务队列

*/

private BlockingQueue> receiveQueue = new LinkedBlockingDeque<>(5000);

/**

* 任务完成结果队列

*/

private BlockingQueue> resultQueue = new LinkedBlockingDeque<>(5000);

/**

* 初始化任务处理线程

*/

public TaskQueue() {

this.run();

}

/**

* 存入请求任务

*

* @param task task实体

* @throws InterruptedException

*/

public void put(Task task) throws InterruptedException {

receiveQueue.put(task);

}

/**

* 获取任务完成结果

*

* @return

* @throws InterruptedException

*/

public Task get() throws InterruptedException {

return resultQueue.take();

}

/**

* 处理任务

* 开启一个新线程,自旋的从接收队列中取出数据,然后处理若干秒后,将成功数据放入成功队列。如果任务超时标志isTimeout超时,可以中断该任务的进行,在正常的service中,可以替换为数据库回滚等操作。

*/

private void run() {

new Thread(() -> {

while (true) {

try {

// 从接收队列中取出任务,处理,然后放入成功队列

Task task = receiveQueue.take();

System.out.println("队列收到数据,处理中!");

Thread.sleep(1000);

task.setMessage("成功");

// TODO: 如果超时了,中断该任务-此处应该加锁

if (task.getIsTimeout()) {

System.out.println("任务超时,处理线程中断该任务");

continue;

}

resultQueue.put(task);

System.out.println("队列处理完成!");

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}).start();

}

}

首先,我们需要定义一个队列监听线程。当Spring容器加载完毕后,开启新线程,自旋地从模拟队列的完成队列中获取数据,并使用ReferredResult返回。

解析:

1. 创建一个名为QueueResultListener的类,实现ApplicationListener接口,监听ContextRefreshedEvent事件。

2. 在onApplicationEvent方法中,启动一个新线程。

3. 在新线程中,尝试从TaskQueue中获取任务。

4. 将任务的消息设置为结果,并打印监听器获取到的结果。

5. 如果发生InterruptedException异常,打印堆栈跟踪。

代码:

```java

@Component

public class QueueResultListener implements ApplicationListener {

@Autowired

private TaskQueue taskQueue;

@Override

public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {

new Thread(() -> {

try {

Task task = taskQueue.get();

task.getResult().setResult(task.getMessage());

System.out.println("监听器获取到结果:task=" + task);

} catch (InterruptedException e) {

e.printStackTrace();

}

}).start();

}

}

```

```java

@Controllerpublic class DeferredResultQueueController {

@Autowired

private TaskQueue taskQueue;

@ResponseBody

@GetMapping("/test")

public DeferredResult test(@RequestParam String requestId,

@RequestParam(value = "timeout", required = false, defaultValue = "5000") Long timeout) throws InterruptedException {

//新建延期返回对象并设置超时时间,优先级比configureAsyncSupport方法中默认配置中的高

System.out.println("start test");

//初始化延迟任务

DeferredResult deferredResult = new DeferredResult<>(timeout);

//要执行的任务

Task task = new Task<>("任务", false);

task.setTaskId(requestId);

task.setTimeout(false);

task.setDeferredResult(deferredResult);

//设置超时后执行的任务,优先级比DeferredResultProcessingInterceptor拦截器中的高

deferredResult.onTimeout(() -> {

System.out.println("任务超时 id=" + requestId);

//TODO:告知该任务已经超时-此处应该加锁

task.setMessage("任务超时");

task.setIsTimeout(true);

});

//任务入队

taskQueue.put(task);

System.out.println("end test");

return deferredResult;

}

}

```

. MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)协议是一种发布/订阅(publish/subscribe)模式的轻量级通讯协议,主要应用于物联网(Internet of Things)。在MQTT中,有两类角色:发布者(publisher)和订阅者(subscriber),它们之间通过传输层、应用层和TCP/IP协议进行通信。相比于我们更为熟悉的HTTP协议,MQTT协议在物联网领域受到青睐的原因主要有以下几点:

- 同步协议响应环境影响较小,适用于对实时性要求较高的场景;

- 异步消息协议单向发起连接,客户端发送到网络上的所有设备上,方便设备之间的信息交换。

4. SSE(Server-Sent Events)

SSE是WebSocket的一种轻量代替方案,它采用单向通道,通过text/event-stream格式进行流式传输。与HTTP单向通信只能由服务端向客户端单向通信不同,SSE允许客户端和服务端进行双向通信。当客户端完成一次用时很长(网络不畅)的下载时,可以使用SSE实现服务端向客户端推送消息,提高用户体验。

在HTML5中,使用SSE需要遵循一定的规范,包括设置Content-Type为text/event-stream,并设置字符编码为UTF-8。此外,为了保持连接活跃,还需要设置keep-alive属性。

如何保证数据完整性?这可以通过EventSource来实现。EventSource是HTML5提供的一种API,用于实现服务器发送事件(Server-Sent Events)功能。用户只需在客户端创建一个EventSource对象,然后监听相应的事件即可。

Spring Boot集成SSE可以简化开发过程。以下是一个简约版的示例代码:

```java

@RestController

public class SSEController {

@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)

public Flux sse() {

return Flux.interval(Duration.ofSeconds(1)).map(sequence -> "data: " + sequence);

}

}

```

在这个示例中,客户端发送请求到服务端,服务端以每秒发送一条消息的方式不断向客户端推送数据。这样可以增加应用程序的帅气值。

```java

@Controller@RequestMapping(value = "/sse")

public class SEEController {

// 响应头为text/event-stream;charset=UTF-8

@RequestMapping(value = "/get", produces = "text/event-stream;charset=UTF-8")

public void push(HttpServletResponse response) {

response.setContentType("text/event-stream");

response.setCharacterEncoding("utf-8");

int i = 0;

while (true) {

try {

Thread.sleep(1000);

PrintWriter pw = response.getWriter();

// 注意返回数据必须以data:开头,"

">结尾

pw.write("data:xdm帅气值加" + i + "

>");

pw.flush();

// 检测异常时断开连接

if (pw.checkError()) {

log.error("客户端断开连接");

return;

}

} catch (Exception e) {

e.printStackTrace();

}

i++;

}

}

}

```

以下是重构后的代码:

```html

SSE Demo

```

SpringBoot集成了SSE(Server-Sent Events)升级版,这是一种基于HTTP的服务器向客户端发送实时消息的技术。在本例中,我们将演示如何连接建立、接收数据以及处理异常情况。

首先,确保在项目中引入了`spring-boot-starter-websocket`依赖:

```xml

org.springframework.boot

spring-boot-starter-websocket

```

接下来,创建一个WebSocket配置类`SseConfig`,并使用`@Configuration`注解标注该类:

```java

import org.springframework.context.annotation.Configuration;

import org.springframework.messaging.simp.config.MessageBrokerRegistry;

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;

import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration

@EnableWebSocketMessageBroker

public class SseConfig implements WebSocketMessageBrokerConfigurer {

@Override

public void configureMessageBroker(MessageBrokerRegistry config) {

config.enableSimpleBroker("/topic"); // 将消息发布到/topic端点

config.setApplicationDestinationPrefixes("/app"); // 将客户端请求映射到/app前缀上的消息目的地

}

@Override

public void registerStompEndpoints(StompEndpointRegistry registry) {

registry.addEndpoint("/sse").withSockJS(); // 注册/sse Stomp端点,客户端将通过SockJS与此端点建立连接

}

}

```

现在我们创建了一个简单的控制器`SseController`,用于处理来自客户端的请求和响应:

```java

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.SendTo;

import org.springframework.stereotype.Controller;

@Controller

public class SseController {

@MessageMapping("/send") // 当收到以/send开头的消息时执行此方法

@SendTo("/topic/messages") // 将返回的消息发送到/topic/messages目的地,所有订阅者都能收到该消息

public String handleMessage(String message) throws Exception {

return "Received: " + message; // 在此处可以进行处理,例如记录日志等操作后返回结果给客户端

}

}

```

现在,你可以运行项目并使用JavaScript的SockJS库或其他支持SockJS的客户端库来连接到`/sse`端点,然后发送消息。以下是一个简单的HTML页面示例,展示了如何使用SockJS连接到服务器并接收SSE消息:

```html

SSE Demo

// 通过SockJS连接到服务器的/sse端点

// 定义接收到消息时的回调函数,将消息打印到控制台

```

```java

@Controller@RequestMapping(value = "/sse")

@Slf4j

public class SSEPlusController {

private static Map cache = new ConcurrentHashMap<>();

String clientId;

int sseId;

@GetMapping("/create")

public SseEmitter create(@RequestParam(name = "clientId", required = false) String clientId) {

// 设置超时时间,0表示不过期。默认30000毫秒

// 可以在客户端一直断网、直接关闭页面但未提醒后端的情况下,服务端在一定时间等待后自动关闭网络连接

SseEmitter sseEmitter = new SseEmitter(0L);

// 是否需要给客户端推送ID

if (Strings.isBlank(clientId)) {

clientId = UUID.randomUUID().toString();

}

this.clientId = clientId;

cache.put(clientId, sseEmitter);

log.info("sse连接,当前客户端:{}", clientId);

return sseEmitter;

}

@Scheduled(cron = "0/3 * * * * ?")

public void pushMessage() {

try {

sseId++;

SseEmitter sseEmitter = cache.get(clientId);

sseEmitter.send(

SseEmitter.event()

.data("帅气值暴增" + sseId)

.id("" + sseId)

.reconnectTime(3000)

);

} catch (Exception e) {

log.error(e.getMessage());

sseId--;

}

}

@GetMapping("/close")

public void close(String clientId) {

SseEmitter sseEmitter = cache.get(clientId);

if (sseEmitter != null) {

sseEmitter.complete();

cache.remove(clientId);

}

}

}

```

```java

/**

* SSE长链接

*/

@RestController

@RequestMapping("/sse")

public class SseEmitterController {

@Autowired

private SseEmitterService sseEmitterService;

/**

* 创建SSE长链接

*

* @param clientId 客户端唯一ID(如果为空,则由后端生成并返回给前端)

* @return org.springframework.web.servlet.mvc.method.annotation.SseEmitter

* @author re

* @date 2021/12/12

*/

@CrossOrigin // 如果nginx做了跨域处理,此处可去掉

@GetMapping("/CreateSseConnect")

public SseEmitter createSseConnect(@RequestParam(name = "clientId", required = false) String clientId) {

return sseEmitterService.createSseConnect(clientId);

}

/**

* 关闭SSE连接

*

* @param clientId 客户端ID

* @author re

* @date 2021/12/13

*/

@GetMapping("/CloseSseConnect")

public Result closeSseConnect(@RequestParam("clientId") String clientId) {

sseEmitterService.closeSseConnect(clientId);

return ResultGenerator.genSuccessResult(true);

}

}

```

以下是重构后的代码:

```java

@Service

public class SseEmitterServiceImpl implements SseEmitterService {

private static final Map SSE_EMITTER_CACHE = new ConcurrentHashMap<>();

/**

* 创建SseEmitter连接

* @param clientId 客户端ID

* @return SseEmitter实例

*/

@Override

public SseEmitter createSseConnect(String clientId) {

if (StringUtils.isBlank(clientId)) {

clientId = IdUtil.simpleUUID();

}

SseEmitter sseEmitter = new SseEmitter(0L);

sseEmitter.onCompletion(completionCallBack(clientId));

SSE_EMITTER_CACHE.put(clientId, sseEmitter);

logger.info("创建新的sse连接,当前用户:{}", clientId);

try {

sseEmitter.send(SseEmitter.event().id(SseEmitterConstant.CLIENT_ID).data(clientId));

} catch (IOException e) {

logger.error("SseEmitterServiceImpl[createSseConnect]: 创建长链接异常,客户端ID:{}", clientId, e);

throw new BusinessException("创建连接异常!", e);

}

return sseEmitter;

}

/**

* 关闭SseEmitter连接

* @param clientId 客户端ID

*/

@Override

public void closeSseConnect(String clientId) {

SseEmitter sseEmitter = getSseEmitterByClientId(clientId);

if (sseEmitter != null) {

sseEmitter.complete();

SSE_EMITTER_CACHE.remove(clientId);

}

}

/**

* 根据客户端ID获取SseEmitter对象

* @param clientId 客户端ID

* @return SseEmitter实例

*/

@Override

public SseEmitter getSseEmitterByClientId(String clientId) {

return SSE_EMITTER_CACHE.get(clientId);

}

/**

* 向客户端推送消息列表

* @param messageList 要推送的消息列表,定义自己的返回值即可

* @author re

* @date 2022/3/30

*/

@Override

public void sendMsgToClient(List messageList) {

for (Map.Entry entry : getSseEmittersByClientIds()) {

sendMsgToClientByClientId(messageList, entry.getValue());

}

}

/**

* 根据客户端ID向客户端推送消息列表,同时处理推送失败的重试机制和错误日志记录逻辑等其他相关操作。具体业务实现时可参考原有的sendMsgToClient方法。该方法在原有的基础上进行了一定程度的优化和简化。其中主要的优化点在于使用了Java8的Lambda表达式和Stream API来简化循环逻辑。同时,也对一些重复代码进行了抽取和封装,以提高代码的复用性和可维护性。最终的代码结构更加清晰简洁,易于理解和修改。

```java

@ControllerAdvice

public class AsyncRequestTimeoutHandler {

@ResponseStatus(HttpStatus.NOT_MODIFIED)

@ResponseBody

@ExceptionHandler(AsyncRequestTimeoutException.class)

public String asyncRequestTimeoutHandler(AsyncRequestTimeoutException e) {

System.out.println("异步请求超时");

return "304";

}

}

```

SseEmitter.event() 是用来获取一个记录数据的容器。`.data("帅气值暴增" + sseId)` 是发送给客户端的数据。`.id("" + sseId)` 是记录发送数据的标识,服务端可以通过HttpServletRequest的请求头中拿到这个id,判断是否中间有误漏发数据。`.reconnectTime(3000)` 是定义在网络连接断开后,客户端向后端发起重连的时间间隔(以毫秒为单位)。

```html

```

Springboot集成SSE升级版

以下是重构后的内容:

## SSE常见问题

在反向代理的location块中加入如下配置:

```nginx

proxy_set_header Host $http_host;

proxy_set_header X-Real-IP $remote_addr;

proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;

proxy_set_header X-Forwarded-Proto $scheme;

proxy_buffering off;

proxy_http_version 1.1;

proxy_read_timeout 600s;

```

这将设置SSE长链接保持时间为600秒。

## WebSocket特点

WebSocket是一种基于TCP连接的全双工通信协议,它通过一次握手建立持久性的连接,并支持双向数据传输。

## WebSocket运用场景

以下是一些常见的WebSocket运用场景:

1. 即时通讯:多媒体聊天、在线会议等。可以使用WebSocket技术创建一个聊天室,让用户可以实时地进行交流。也可以为单个用户提供一对一的聊天服务。

2. 互动游戏:多人在线游戏需要实时的数据传输和同步,WebSocket技术可以满足这一需求。例如,玩家可以通过WebSocket与服务器进行实时交互,实现游戏中的多人协作和竞技。

3. 协同合作:开发人员通常使用版本控制工具(如Git、SVN)来管理代码。然而,这些工具仍然可能出现冲突。利用WebSocket技术开发一个文档协同编辑工具,可以使每个用户的编辑内容实时同步,从而避免冲突的发生。此外,还可以开发一个实时的数据报表系统,用于监控服务端数据的变化,例如电商平台的交易数据。

4. 动态数据表报:类似于通知变更的功能,可以根据需求开发一个实时的数据报表系统。当服务端数据发生变化时,可以在报表上立即显示出来。例如,可以实时监控电商平台的交易数据变化情况。

5. 实时工具:例如导航、实时查询工具等也可以使用WebSocket技术实现。

WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。以下是一个原生 WebSocket-客户端的 API 示例:

1. 构造函数:

```javascript

var ws = new WebSocket(‘ws://localhost:8080’);

```

2. 属性:

- readyState:实例对象的 readyState 属性返回实例对象的当前状态,共有四种。CONNECTING(值为0,表示连接尚未建立)、OPEN(值为1,表示连接成功,可以通信了)、CLOSING(值为2,表示连接正在关闭)和CLOSED(值为3,表示连接已经关闭,或者打开连接失败)。

- bufferedAmount:实例对象的 bufferedAmount 属性表示还有多少字节的二进制数据没有发送出去。它可以用来判断发送是否结束。例如:

```javascript

var data = new ArrayBuffer(10000000);

socket.send(data);

if (socket.bufferedAmount === 0) {

// 发送完毕

} else {

// 发送还没结束

}

```

3. 事件:

- onopen:实例对象的 onopen 属性用于指定连接成功后的回调函数。例如:

```javascript

ws.onopen = function() {

ws.send('Hello Server!');

};

```

要指定多个回调函数,可以使用addEventListener方法。例如,当WebSocket连接打开时,可以发送一条消息给服务器:

```javascript

ws.addEventListener('open', function (event) {

ws.send('Hello Server!');

});

```

当连接关闭时,可以通过实例对象的onclose属性来指定回调函数:

```javascript

ws.onclose = function(event) {

var code = event.code;

var reason = event.reason;

var wasClean = event.wasClean;

// handle close event

};

```

当收到服务器数据时,可以通过实例对象的onmessage属性来指定回调函数:

```javascript

ws.onmessage = function(event) {

var data = event.data;

// 处理数据

};

```

需要注意的是,服务器返回的数据可能是文本,也可能是二进制数据(如blob对象或Arraybuffer对象)。

以下是重构后的代码:

```javascript

// 判断收到的数据类型

ws.onmessage = function(event) {

if (typeof event.data === 'string') {

console.log('Received data string');

} else if (event.data instanceof ArrayBuffer) {

var buffer = event.data;

console.log('Received arraybuffer');

}

};

// 指定收到的二进制数据类型

// 收到的是 blob 数据

ws.binaryType = 'blob';

ws.onmessage = function(e) {

console.log(e.data.size);

};

// 收到的是 ArrayBuffer 数据

ws.binaryType = 'arraybuffer';

ws.onmessage = function(e) {

console.log(e.data.byteLength);

};

// 处理报错事件

socket.onerror = function(event) {

// handle error event

};

socket.addEventListener('error', function(event) {

// handle error event

});

// 向服务器发送数据的方法

webSocket.send();

// 发送文本的例子

ws.send('your message');

// 发送 Blob 对象的例子

var file = document.querySelector('input[type="file"]').files[0];

ws.send(file);

// 发送 ArrayBuffer 对象的例子

```

以下是重构后的代码:

```javascript

// 发送canvas ImageData作为ArrayBuffer

var img = canvas_context.getImageData(0, 0, 400, 320);

var binary = new Uint8Array(img.data.length);

for (var i = 0; i < img.data.length; i++) {

binary[i] = img.data[i];

}

ws.send(binary.buffer);

// 关闭连接

webSocket.close();

```

6. 具体实现

常用的 Node 实现:WebSockets、Socket.IO、WebSocket-Node。

常用的 Java实现:Tomcat实现websocket。无需任何配置。服务端代码如下:

```java

@ServerEndpoint("/webSocketByTomcat/10086")

public class WebSocketServer {

// ...

}

```

以下是重构后的内容:

```java

@ServerEndpoint("/webSocketByTomcat/{username}")

public class WebSocketServer {

// 在线人数

private static int onlineCount = 0;

// 存储会话

private static Map clients = new ConcurrentHashMap<>();

// 当前会话

private Session session;

// 当前用户

private String username;

// 建立连接

@OnOpen

public void onOpen(@PathParam("username") String username, Session session) throws IOException {

this.username = username;

this.session = session;

// 自增在线人数

addOnlineCount();

// 存储当前会话

clients.put(username, this);

System.out.println("已连接");

}

// 连接关闭

@OnClose

public void onClose() throws IOException {

//移除当前会话

clients.remove(username);

//自减在线人数

subOnlineCount();

}

//发送消息客户端

@OnMessage

public void onMessage(String message) throws IOException {

JSONObject jsonTo = JSONObject.fromObject(message);

//单独发

if (!jsonTo.get("To").equals("All")){

sendMessageTo("给一个人", jsonTo.get("To").toString());

}

//群发

else{

sendMessageAll("给所有人");

}

}

//连接失败

@OnError

public void onError(Session session, Throwable error) {

error.printStackTrace();

}

/**

*发送消息给指定客户端

*/

public void sendMessageTo(String message, String to) throws IOException{

for (WebSocketServer item: clients.values()){

if (item.username.equals(to)){

item.session.getAsyncRemote().sendText(message);

}

}

}

/**

*发送消息给所有客户端

*/

public void sendMessageAll(String message) throws IOException{

for (WebSocketServer item: clients.values()){

item.session.getAsyncRemote().sendText(message);

}

}

/**获取在线人数**/

public static synchronized int getOnlineCount(){return onlineCount;}//自增在线人数**/**public static synchronized void addOnlineCount()**/*{WebSocketServer.onlineCount++;*///自减在线人数**/*WebSocketServer.onlineCount--;*/}//获取所有客户端**/public static synchronized Map getClients(){return clients;}/*****************************************///endregion///region 其他方法和属性声明///endregion*/import java.util.concurrent.ConcurrentHashMap;import javax.websocket.*;import javax.websocket.server.ServerEndpoint;import org.json.JSONObject;import org.json.JSONObject;import org.json.JSONException;@ServerEndpoint("/webSocketByTomcat/{username}")public class WebSocketServer{private static int onlineCount=0;private static Map clients=new concurrentHashMap<>();private Session session;private String username;@OnOpenpublic void onOpen(@PathParam("username")String username,Session session)throws Exception{this.username=username;this.session=session;addOnlineCount();clients.put(username,this);System.out.println("已连接");}}@OnClosepublic void onClose()throws Exception{}@OnMessagepublic void onMessage(String message)throws Exception{JSONObject jsonTo=JSONObject.fromObject(message);if(!jsonTo.get("To").equals("All")){sendMessageTo("给一个人",jsonTo.get("To").toString());}}}@OnErrorpublic void onError(Session session,Throwable throwable){throwable.printStackTrace();}public static synchronized int getOnlineCount(){return onlineCount;}public static synchronized void addOnlineCount(){WebSocketServer.onlineCount++;}public static synchronized void subOnlineCount(){WebSocketServer.onlineCount--;}public static synchronized Map getClients(){return clients;}}

在Web开发中,前端是一个重要的组成部分,负责处理用户界面和与用户的交互。客户端是前端的一部分,它是指运行在浏览器中的JavaScript代码。而SockJS是一种JavaScript库,用于在浏览器中实现实时通信。

以下是根据提供的内容重构后的段落结构:

1. 前端:前端是Web开发的重要组成部分,负责处理用户界面和与用户的交互。

2. 客户端:客户端是前端的一部分,它是指运行在浏览器中的JavaScript代码。客户端通常使用HTML、CSS和JavaScript等技术来构建用户界面和实现与服务器的交互。

3. SockJS:SockJS是一种JavaScript库,用于在浏览器中实现实时通信。它提供了一种基于WebSockets的实时通信协议,可以在客户端和服务器之间建立持久连接,实现实时数据传输和事件通知等功能。

希望这个回答能够满足你的需求!如果你还有其他问题,请随时问我。

以下是重构后的代码:

```html

<%@ page language="java" import="java.util.*" pageEncoding="utf-8" %>

<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>

<%@ taglib uri="http://java.sun.com/jsp/jstl/fmt" prefix="fmt" %>

登录测试

Hello World!

sessionId:



```

以下是重构后的内容:

WebSocket 的发送消息、接收消息、打开连接、关闭连接等操作在 Spring Boot 中可以通过以下方式实现:

1. 引入依赖

在项目的 pom.xml 文件中添加以下依赖:

```xml

org.springframework.boot

spring-boot-starter-websocket

```

2. 具体实现

(1)前端连接时触发

在前端 JavaScript 代码中,可以使用以下方式连接 WebSocket:

```javascript

const socket = new WebSocket('ws://localhost:8080/websocket');

```

当需要发送消息时,调用 `socket.send()` 方法即可:

```javascript

socket.send('发送消息');

```

当收到服务器端的消息时,会触发 `onmessage` 事件:

```javascript

socket.onmessage = function (event) {

console.log('收到消息:', event.data);

};

```

(2)服务端处理客户端连接、发送消息和关闭连接等操作

在服务端的 Spring Boot 项目中,可以使用 `@Controller` 和 `@MessageMapping` 注解来处理 WebSocket 的相关请求。例如:

```java

import org.springframework.messaging.handler.annotation.MessageMapping;

import org.springframework.messaging.handler.annotation.SendTo;

import org.springframework.stereotype.Controller;

@Controller

public class WebSocketController {

@MessageMapping("/send")

@SendTo("/topic/messages") // 将消息发送给所有订阅了该主题的客户端

public String handleMessage(String message) throws Exception {

return "收到消息:" + message; // 这里可以对消息进行处理,然后返回响应给客户端

}

}

```

3. WebSocket API 使用示例(包括 onOpen()、onMessage() 和 onClose())方法:

```java

@OnOpen // 当 WebSocket 建立连接时被调用的方法,相当于前端 websocket.onopen() 中的同名函数。可以在这里初始化一些参数或者执行一些操作。返回值为 void。如果有错误发生,可以返回一个包含错误信息的字符串。这个方法也可以不定义,如果没有要执行的操作的话。但是必须定义一个与 onClose() 对应的方法。如果同时没有定义任何方法,则会抛出异常。

public void onOpen(Session session) throws Exception {

System.out.println("WebSocket opened"); // 当 WebSocket 建立连接时被调用的方法,可以在这里执行一些初始化操作。比如向客户端发送一条欢迎消息。如果出现错误,可以抛出一个异常。返回值为 void。如果没有要执行的操作的话,可以省略此方法的定义。如果同时没有定义任何方法,则会抛出异常。如果需要返回数据给客户端,则可以将返回值设置为 String 或者 Server-Sent Events (SSE)格式的数据。如果不需要返回数据,则可以省略此方法的定义。如果同时没有定义任何方法,则会抛出异常。如果想要获取 WebSocket 建立连接时的相关信息,可以查看 Session 对象提供的方法。比如 session.getId() 可以获得当前 WebSocket 建立连接时的唯一标识符。session.getOpenTime() 可以获得当前 WebSocket 建立连接的时间戳。session.isOpen() 可以获得当前 WebSocket 是否处于打开状态。session.getBasicRemote() 可以获得当前 WebSocket 建立连接时所使用的 HTTP 请求信息。session.getUserProperties() 可以获得当前 WebSocket 建立连接时的 HTTP Cookie 或者 User-Agent 等信息。session.closeAsync() 可以异步地关闭当前的 WebSocket 连接。session.close() 可以同步地关闭当前的 WebSocket 连接。session.setMaxIdleTimeout(int timeout) 可以设置当前 WebSocket 的最大空闲时间(单位为毫秒),超过这个时间之后将自动断开 WebSocket 连接。如果在这个时间内仍然有数据通过该 WebSocket 发送到客户端,那么这个方法将不会生效。session.setReadTimeout(int timeout) 可以设置当前 WebSocket 从服务器读取数据的最长时间(单位为毫秒),如果在这个时间内还没有从服务器读取到数据,那么将自动断开 WebSocket 连接并抛出一个异常。session.setIdleTimeout(int timeout) 可以设置当前 WebSocket 从服务器读取数据的最长时间(单位为毫秒),超过这个时间之后将自动断开 WebSocket 连接。session.getPingInterval() 可以获取当前 WebSocket 的 Ping-Pong 间隔时间(单位为毫秒)。这个间隔时间是指在连续发送 Ping-Pong 则需要等待多长时间才能收到回应。如果在指定的时间内没有收到回应,那么将会发送一个 Ping-Pong 应答消息。session.setPingInterval(int interval) 可以设置当前 WebSocket 的 Ping-Pong 应答间隔时间(单位为毫秒)。这个间隔时间是指在连续发送 Ping-Pong 则需要等待多长时间才能收到对方的 Ping-Pong 应答消息。如果在指定的时间内没有收到回应,那么将会自动发送一个 Ping-Pong 应答消息。session.getRequestHeaders() 可以获取当前 WebSocket 建立连接时的请求头信息。session.getResponseHeaders() 可以获取当前 WebSocket 建立连接后的响应头信息。session.getNativeRequest() 可以获取当前 WebSocket 建立连接时的原生请求对象(Java原生对象)。session.getNativeResponse() 可以获取当前 WebSocket 建立连接后的原生响应对象(Java原生对象)。session.getRequestCookies() 可以获取当前 WebSocket 建立连接时的请求 Cookie 或者 User-Agent 等信息。session.getResponseCookies() 可以获取当前 WebSocket 建立连接后的响应 Cookie 或者 User-Agent 等信息。session.isUpgraded() 可以判断当前的 WebSocket 是否已经升级成了全双工通信模式。如果已经升级成功,则返回 true;否则返回 false。if (session.isOpen()) { // 如果当前的 WebSocket 已处于打开状态 then try (PrintWriter writer = session.createPrintWriter()){ writer.write("Hello from server!

"); // 通过输出流向客户端发送一条消息 writer.flush(); // 将缓冲区的数据强制写入到客户端 if (!session.isOpen()) throw new IllegalStateException("WebSocket was closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} catch (IOException e){ throw new RuntimeException(e);// 如果在此过程中发生了其他异常 则抛出一个异常} finally{ session.close(); // 不管是否发生了异常都会执行 close() 方法关闭 WebSocket if (!session.isOpen()) throw new IllegalStateException("WebSocket was already closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} if (!session.isOpen()) throw new IllegalStateException("WebSocket was already closed");// 如果在此过程中 WebSocket 已关闭 则抛出一个异常} else throw new IllegalStateException("WebSocket is not open");// 如果当前的 WebSocket 已关闭 则抛出一个异常} if (session != null && session instanceof TextWebSocketSession){TextWebSocketSession textSession=(TextWebSocketSession)session;// 如果当前的 WebSocket 是文本类型的则可以直接通过 TextHttpMessageConverter 将 String 直接转换成 Message 然后通过 SessionCallbackHandler 将 Message 通过回调的方式传给客户端if (textSession!=null && session!=null && session instanceof TextWebSocketSession) textSession=(TextWebSocketSession)session;// 如果当前的 WebSocket 是文本类型的则可以直接通过 TextHttpMessageConverter 将 String 直接转换成 Message 然后通过 SessionCallbackHandler 将 Message 通过回调的方式传给客户端else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法将数据转换成 Message 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 TextWebSocketSession 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 Session 则抛出一个不支持的操作异常} else throw new UnsupportedOperationException("Unsupported type of data to send");// 如果无法获取到 Session 或者无法将数据转换成 Message 则抛出一个不支持的操作异常]

以下是重构后的代码:

```java

/** * WebSocket server */

@Service

@Slf4j

public class CustomWebSocketHandler extends TextWebSocketHandler implements WebSocketHandler {

// 在线用户列表

private static final Map clients = new HashMap<>();

// 用户标识

private static final String CLIENT_ID = "mchNo";

/** * 连接成功时候,onopen方法() */

@Override

public void afterConnectionEstablished(WebSocketSession session) throws Exception {

log.info("成功建立websocket-spring连接");

String clientId = getClientId(session);

if (StringUtils.isNotEmpty(clientId)) {

//存储会话

clients.put(clientId, session);

session.sendMessage(new TextMessage("成功建立websocket-spring连接"));

log.info("用户标识:{},Session:{}", clientId, session.toString());

}

}

/** * 调用websocket.send()时候,会调用该方法 */

@Override

public void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {

log.info("收到客户端消息:{}", message.getPayload());

JSONObject msgJson = (JSONObject) message.getPayload();

//接受标识

String to = msgJson.getString("to");

//接受消息

String msg = msgJson.getString("msg");

WebSocketMessage webSocketMessageServer = new TextMessage("server:" + message);

try {

session.sendMessage(webSocketMessageServer);

//广播到所有在线用户

if (to.equalsIgnoreCase("all")) {

sendMessageToAllUsers(new TextMessage(getClientId(session) + ":"));

}

//单独发送

else sendMessageToUser(to, new TextMessage(getClientId(session) + ":"));

} catch (IOException e) {

log.info("handleTextMessage method error:{}", e);

}

}

/** * 当连接出错时,closeSession关闭会话并移除在线用户信息,如果出现错误,则不关闭会话。 */

@Override

public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {}

/** * 当连接关闭后,移除在线用户信息。 */

@Override

public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {}

/** * 不支持部分消息。 */

@Override

根据当前不同的`sessionwebsocketSession`,我们可以进行以下内容的重构:

```java

// 导入相关库和类

import org.springframework.web.socket.WebSocketSession;

// 创建一个处理不同会话的方法

public void handleSession(WebSocketSession session) {

// 根据session类型执行相应的操作

if (session instanceof WebSocketSessionType1) {

// 执行针对sessionType1的操作

} else if (session instanceof WebSocketSessionType2) {

// 执行针对sessionType2的操作

} else if (session instanceof WebSocketSessionType3) {

// 执行针对sessionType3的操作

} else {

// 处理其他类型的会话

}

}

```

上述代码示例展示了如何使用Java语言,根据当前不同的`sessionwebsocketSession`来执行不同的操作。首先,我们导入相关的库和类,然后创建一个名为`handleSession`的方法,该方法接收一个`WebSocketSession`对象作为参数。接下来,我们使用条件语句判断`session`对象的具体类型,并根据不同的类型执行相应的操作。如果`session`不是任何已知的类型,我们可以在条件语句中添加适当的分支以处理其他类型的会话。请根据实际需求替换`WebSocketSessionType1`、`WebSocketSessionType2`和`WebSocketSessionType3`,并在相应的条件分支中编写具体的操作代码。

```java

/** * WebSocket握手时的拦截器

*/

@Slf4j

public class CustomWebSocketInterceptor implements HandshakeInterceptor {

/**

* 关联HeepSession和WebSocketSession,

* beforeHandShake方法中的Map参数 就是对应websocketSession里的属性

*/

@Override

public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler handler, Map map) throws Exception {

if (request instanceof ServletServerHttpRequest) {

log.info("*****beforeHandshake******");

HttpServletRequest httpServletRequest = ((ServletServerHttpRequest) request).getServletRequest();

HttpSession session = httpServletRequest.getSession(true);

log.info("clientId:{}", httpServletRequest.getParameter("clientId"));

if (session != null) {

map.put("sessionId", session.getId());

map.put("clientId", httpServletRequest.getParameter("clientId"));

}

}

return true;

}

/**

* afterHandshake方法在握手成功后调用

*/

@Override

public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) {

log.info("******afterHandshake******");

}

}

```

以下是重构后的代码:

```java

/**

* WebSocket的配置类

*/

@Configuration

@EnableWebSocket

public class CustomWebSocketConfig implements WebSocketConfigurer {

@Override

public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {

registry.addHandler(customWebSocketHandler(), "/webSocketBySpring/customWebSocketHandler")

.addInterceptors(new CustomWebSocketInterceptor())

.setAllowedOrigins("*");

registry.addHandler(customWebSocketHandler(), "/sockjs/webSocketBySpring/customWebSocketHandler")

.addInterceptors(new CustomWebSocketInterceptor())

.setAllowedOrigins("*")

.withSockJS();

}

@Bean

public WebSocketHandler customWebSocketHandler() {

return new CustomWebSocketHandler();

}

}

```

使用`setAllowedOrigins("*")`的原因是为了允许所有来源的请求。

使用`withSockJS()`的原因是为了实现WebSocketSockJS和SockJS的兼容性轮询。

要将JSP代码替换为WebSocket请求路径,您需要首先了解您的应用程序的上下文。以下是一个简单的示例,说明如何将JSP代码替换为WebSocket请求路径:

1. 首先,确保您已经安装了Java WebSocket API。如果没有,请参考这里进行安装。

2. 创建一个新的Java类,例如`MyWebSocketServlet`,并继承`javax.websocket.Endpoint`类。在这个类中,您可以重写`onOpen`、`onClose`、`onError`和`onMessage`方法来处理WebSocket事件。

```java

import javax.websocket.*;

import javax.websocket.server.ServerEndpoint;

import java.io.IOException;

@ServerEndpoint("/websocket")

public class MyWebSocketServlet extends Endpoint {

@Override

public void onOpen(Session session, EndpointConfig config) {

System.out.println("WebSocket连接已打开");

}

@Override

public void onClose(Session session, CloseReason closeReason) {

System.out.println("WebSocket连接已关闭");

}

@Override

public void onError(Session session, Throwable throwable) {

System.out.println("WebSocket发生错误");

}

@Override

public void onMessage(String message, Session session) throws IOException {

System.out.println("收到客户端消息: " + message);

}

}

```

3. 在您的JSP文件中,使用JavaScript建立一个WebSocket连接。将`ws://yourdomain/yourapp/MyWebSocketServlet`替换为您的WebSocket请求路径。

```html

WebSocket示例

```

现在,当您访问此JSP页面时,浏览器将建立一个WebSocket连接到指定的请求路径,并在控制台中显示相应的消息。