runoops.com

nodejs midwayjs 使用 Server-Sent Events(SSE)

一、前端 EventSource 实现:

在客户端,我们使用 EventSource 来接收来自服务器的数据:

// client.js
const eventSource = new EventSource('http://localhost:3000');

eventSource.onmessage = function(event) {
    console.log('Received message:', event.data);
};

eventSource.onerror = function(event) {
    console.error('EventSource error:', event);
};

二、后端 发送一些消息给客户端实现:

// server.js
const http = require('http');

http.createServer((req, res) => {
    res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'Access-Control-Allow-Origin': '*',
    });

    setInterval(() => {
        const message = `data: Server time: ${new Date().toLocaleTimeString()}\n\n`;
        res.write(message);
    }, 3000);

}).listen(3000, () => {
    console.log('Server running on port 3000');
});

运行后,客户端将每 3 秒接收到一次来自服务器的消息。

实例:使用 nodejs midwayjs 实现,处理产品信息,定时返回进度条(部分代码):

  /**
   * sse 更新产品,返回进度条
   * @param queue 
   * @returns 
   */
  @Get('/syncUpdateProductList')
  async syncUpdateProductList(@Query() queryData: any) {
    const defaultQueue = 1;

    // 设置响应头
    this.ctx.set('Content-Type', 'text/event-stream');
    this.ctx.set('Cache-Control', 'no-cache');
    this.ctx.set('Connection', 'keep-alive');
    this.ctx.set('Access-Control-Allow-Origin', '*');

    // SSE (Server-Sent Events)响应
    const res = new HttpServerResponse(this.ctx).sse();

    try {
      res.send({
        data: `${JSON.stringify({ progress:0, message: 'OK',  timestamp: new Date() })}`
      });
      const jobData:any = await this.productSyncService.syncUpdateProductList({...queryData, user: this.ctx.user, queue:defaultQueue });
      const { jobId } = jobData;
      console.log('--sse-jobId-',jobId);

      // 每隔 5 秒发送一个事件
      let upprogress:any = 0;

      let count = 0;
      const intervalId = setInterval(async () => {
        upprogress = await this.productSyncService.getUpdateProgress({ user: this.ctx.user, jobId });
        console.log('--sse--setInterval-upprogress-jobId=' + jobId + '--', upprogress);
        const state = _.get(upprogress, 'state');
        const progress = _.get(upprogress, 'progress');
        
        if(state == 'failed'){
          res.send({
            data: `${JSON.stringify({progress,jobId,message: 'failed',  timestamp: new Date()})}`
          });
          // 清除定时器
          clearInterval(intervalId);
            res.sendEnd({
              data: 'end'
            });
        }
        else if (state == 'completed' || parseInt(progress) == 100) {
          res.send({
            data: `${JSON.stringify({ progress, jobId, message: 'OK', timestamp: new Date() })}`
          });
          // 清除定时器
          clearInterval(intervalId);
            res.sendEnd({
              data: 'end'
            });
        }else{
          res.send({
            data: `${JSON.stringify({ progress, jobId, message: 'OK', timestamp: new Date() })}`
          });
        }
        
        count++;
      }, 5000);

      // 当客户端断开连接时清除定时器
      this.ctx.req.on('close', () => {
        clearInterval(intervalId);
        res.sendEnd({
          data: 'end'
        });
      });



      // //for 实现, 限制最大数
      // const loopCount = 720; //  720*5/3600 = 1h
      // for (let i = 0; i < loopCount; i++) {
      //   progress= await this.productSyncService.getUpdateProgress({user: this.ctx.user, jobId});
      //   console.log('--sse--forloop-progress-jobId='+jobId+'--',progress);
      //   res.send({
      //     data: `${JSON.stringify({...progress,jobId,message: 'OK',  timestamp: new Date()})}`
      //   });
      //   // const progressVal  = _.get(progress,'progress',0);
      //   // if(progressVal >= 100){
      //   //   res.sendEnd({
      //   //     data: 'end'
      //   //   });
      //   // }else{
      //   //   await sleep(5000);
      //   // }

      //   await sleep(5000);
      // }

    } catch (e) {
      console.log('update error==========',e.message)
      res.sendError(new Error('Update error!'));
    }

    return res;

    //  // 每隔 2 秒发送一个事件
    //  const intervalId = setInterval(() => {
    //   res.send({
    //     data: `${JSON.stringify({ message: 'This is a message from the server aaaa',  timestamp: new Date() })}\n\n`
    //   });
    // }, 2000);

    // // 当客户端断开连接时清除定时器
    // this.ctx.req.on('close', () => {
    //   clearInterval(intervalId);
    //   res.sendEnd({
    //     data: 'end'
    //   });
    // });
    // return res;
  }

建立SSE连接的时候携带token

三、客户端携带Token

如果想要在建立SSE连接的时候携带token,需要用到 event-source-polyfill

EventSourcePolyfill 是 EventSource 封装好了的一个方法,可以直接配置请求头。

npm install event-source-polyfill --save
...
eventSource = new EventSourcePolyfill(url, {
            heartbeatTimeout: 3 * 60 * 1000,
            headers: {
                Authorization: 'Bearer ' + getToken(), //token 信息
                Accept: 'text/event-stream'
            },
            withCredentials: true,
        })
...

四、总结

本文我们深入介绍了如何在前端使用 EventSource 接收服务器的实时推送消息,如何通过 URL 参数传递数据以及配置请求头,最后还介绍了如何使用 EventSourcePolyfill 使得 EventSource 能在旧版浏览器中正常工作。EventSource 是一种非常轻量级的实现实时数据更新的技术,适用于需要服务器单向推送消息的场景,特别是在实时数据展示和动态更新方面有广泛的应用。

0 笔记

分享笔记

Inline Feedbacks
View all notes