有效封装WebSocket,让你的代码更简洁!

前言

在现代 Web 应用中,实时通信已经成为越来越重要的一部分。而 WebSocket 技术的出现,使得实时通信变得更加高效和便捷。

WebSocket 协议是一种基于 TCP 协议的双向通信协议,它能够在客户端和服务器之间建立起持久性的连接,从而实现实时通信。

在前端开发中,为了更好地利用 WebSocket 技术,我们通常会对其进行封装,以便于全局调用并根据自己的业务做不同的预处理。

本文将介绍如何有效封装一个 WebSocket 供全局使用,并根据自己的业务做不同的预处理,实现更方便的调用,减少重复代码。

具体实现

我们将基于 Web API 提供的 WebSocket 类,封装一个 Socket 类,该类将提供以下功能:

  1. 建立 WebSocket 连接,并支持发送 query 参数。
  2. 发送、接收消息,支持对 WebSocket 的事件进行监听。
  3. 断开 WebSocket 连接。
  4. 支持心跳检测。
  5. 可以根据业务需要,对发送和接收的消息进行预处理。

下面是实现代码:

// socket.js
import modal from '@/plugins/modal'
const baseURL = import.meta.env.VITE_APP_BASE_WS
const EventTypes = ['open', 'close', 'message', 'error', 'reconnect']
const DEFAULT_CHECK_TIME = 55 * 1000 // 心跳检测的默认时间
const DEFAULT_CHECK_COUNT = 3 // 心跳检测默认失败重连次数
const DEFAULT_CHECK_DATA = { Type: 1, Parameters: ['alive'] } // 心跳检测的默认参数 - 跟后端协商的
const CLOSE_ABNORMAL = 1006 // WebSocket非正常关闭code码

class EventMap {
  deps = new Map()
  depend(eventType, callback) {
    this.deps.set(eventType, callback)
  }
  notify(eventType, event) {
    if (this.deps.has(eventType)) {
      this.deps.get(eventType)(event)
    }
  }
}

class Socket extends WebSocket {
  heartCheckData = DEFAULT_CHECK_DATA
  heartCheckTimeout = DEFAULT_CHECK_TIME
  heartCheckInterval = null
  heartCheckCount = DEFAULT_CHECK_COUNT
  constructor(options, dep, reconnectCount = 0) {
    let _baseURL = baseURL
    const {
      url,
      protocols,
      query = {},
      greet = null,
      customBase = null,
    } = options
    const _queryParams = Object.keys(query).reduce((str, key) => {
      if (typeof query[key] !== 'object' && typeof query[key] !== 'function') {
        return (str +=
          str.length > 0 ? `&${key}=${query[key]}` : `${key}=${query[key]}`)
      } else {
        return str
      }
    }, '')
    if (customBase) {
      _baseURL = customBase
    }
    super(`${_baseURL}${url}?${_queryParams}`, protocols)
    this._currentOptions = options
    this._dep = dep
    this._reconnectCount = reconnectCount
    greet &&
      Object.assign(this, {
        heartCheckData: greet,
      })
    this.initSocket()
  }

  // 初始化WebSocket
  initSocket() {
    // 监听webSocket的事件
    this.onopen = function (e) {
      this._dep.notify('open', e)
      this.heartCheckStart()
    }
    this.onclose = function (e) {
      this._dep.notify('close', e)
      // 如果WebSocket是非正常关闭 则进行重连
      if (e.code === CLOSE_ABNORMAL) {
        if (this._reconnectCount < this.heartCheckCount) {
          this._reconnectCount++
          const _socket = new Socket(
            this._currentOptions,
            this._dep,
            this._reconnectCount
          )
          this._dep.notify('reconnect', _socket)
        } else {
          return modal.msgError('WebSocket重连失败, 请联系技术客服!')
        }
      }
    }
    this.onerror = function (e) {
      this._dep.notify('error', e)
    }
    this.onmessage = function (e) {
      // 如果后端返回的是二进制数据
      if (e.data instanceof Blob) {
        const reader = new FileReader()
        reader.readAsArrayBuffer(e.data)
        reader.onload = (ev) => {
          if (ev.target.readyState === FileReader.DONE) {
            this._dep.notify('message', ev.target?.result)
          }
        }
      } else {
        // 处理普通数据
        try {
          const _parseData = JSON.parse(e.data)
          this._dep.notify('message', _parseData)
        } catch (error) {
          console.log(error)
        }
      }
    }
  }

  // 订阅事件
  subscribe(eventType, callback) {
    if (typeof callback !== 'function')
      throw new Error('The second param is must be a function')
    if (!EventTypes.includes(eventType))
      throw new Error('The first param is not supported')
    this._dep.depend(eventType, callback)
  }

  // 发送消息
  sendMessage(data, options = {}) {
    const { transformJSON = true } = options
    let result = data
    // const _jsonData = JSON.stringify(data);
    if (transformJSON) {
      result = JSON.stringify(data)
    }
    this.send(result)
  }

  // 关闭WebSocket
  closeSocket(code, reason) {
    this.close(code, reason)
  }

  // 开始心跳检测
  heartCheckStart() {
    this.heartCheckInterval = setInterval(() => {
      if (this.readyState === this.OPEN) {
        let transformJSON = typeof this.heartCheckData === 'object'
        this.sendMessage(this.heartCheckData, { transformJSON })
      } else {
        this.clearHeartCheck()
      }
    }, this.heartCheckTimeout)
  }

  // 清除心跳检测
  clearHeartCheck() {
    clearInterval(this.heartCheckInterval)
  }

  // 重置心跳检测
  resetHeartCheck() {
    clearInterval(this.heartCheckInterval)
    this.heartCheckStart()
  }
}
// 默认的配置项
const defaultOptions = {
  url: '',
  protocols: '',
  query: {},
}

export const useSocket = (options = defaultOptions) => {
  if (!window.WebSocket)
    return modal.msgWarning('您的浏览器不支持WebSocket, 请更换浏览器!')
  const dep = new EventMap()
  const reconnectCount = 0
  return new Socket(options, dep, reconnectCount)
}

接下来我们从实际使用的角度解释一下上面的代码,首先我们暴露了一个 useSocket 函数,该函数接收一个 options 配置项参数,支持的参数有:

  • url:要连接的 WebSocket URL;
  • protocols:一个协议字符串或者一个包含协议字符串的数组;
  • query:可以通过 URL 传递给后端的查询参数;
  • greet:心跳检测的打招呼信息;
  • customBase:自定义的 baseURL ,否则默认使用环境变量中定义的 env.VITE_APP_BASE_WS

在调用该函数后,我们首先会判断当前用户的浏览器是否支持 WebSocket,如果不支持给予用户提示。

然后我们实例化了一个 EventMap 类的实例对象 dep,你可以把它当作是一个依赖收集桶,当用户订阅了某个 WebSocket 事件时,我们将收集这个事件对应的回调作为依赖,在事件触发时,再通知该依赖,然后调用该事件对应的回调函数。

接下来我们定义了一个初始的重连次数记录值 reconnectCount 为 0,每当这个 WebSocket 重连时,该值会自增。

之后我们实例化了自己封装的 Socket 类,并传入了我们上面的三个参数。 在 Socket 类的构造函数 constructor 中,我们先取出配置项,把 query 内的参数拼接在 URL 上,然后使用 super 调用父类的构造函数进行建立 WebSocket 连接。

之后我们缓存了当前 Socket 实例化时的参数,再调用 initSocket() 方法去进行 WebSocket 事件的监听:

  • onopen:触发 depopen 对应的回调函数并且打开心跳检测;
  • onclose:触发 depclose 对应的回调函数并且对关闭的 code 码进行判断,如果是非正常关闭连接,将会进行重连,如果重连次数达到阈值,则通知给用户;
  • onerror:触发 deperror 对应的回调函数;
  • onmessage:接收到服务端返回的数据,可以先根据自身业务做一些预处理,比如我就根据不同的数据类型进行了数据解析的预处理,之后再触发 depmessage 对应的回调函数并传入处理过后的数据。

我们也暴露了一些成员方法以供实例对象使用:

  • subscribe:订阅 WebSocket 事件,传入事件类型并须是 EventTypes 内的类型之一,第二个参数则是回调函数;
  • sendMessage:同样的,我们在给服务端发送数据之前也可以根据自身业务做一些预处理,比如我将需要转成 JSON 的数据,在这里统一转换后再发送给服务端;
  • closeSocket:关闭 WebSocket 连接;
  • heartCheckStart:开始心跳检测,会创建一个定时器,在一定时间之后(默认是 55s)给服务端发送信息确认连接是否正常;
  • clearHeartCheck:清除心跳检测定时器(如果当前 WebSocket 连接已经关闭,则自动清除);
  • resetHeartCheck:重置心跳检测定时器。

如何使用

让我们看下如何使用这个封装好的 useSocket 函数,以在 Vue3 中使用为例:

// xx.jsx or xx.vue
import { useSocket } from './socket.js'
const socket = ref(null) // WebSocket实例
const initWebSocket = () => {
  const options = {
    url: '/<your url>',
    query: {
      // something params
    },
  }
  socket.value = useSocket(options)
  socket.value.subscribe('open', () => {
    console.log('WebSocket连接成功!')
    const greet = 'hello'
    // 发送打招呼消息
    socket.value.sendMessage(greet)
  })
  socket.value.subscribe('close', (reason) => {
    console.log('WebSocket连接关闭!', reason)
  })
  socket.value.subscribe('message', (result) => {
    console.log('WebSocket接收到消息:', result)
  })
  socket.value.subscribe('error', (err) => {
    console.log('WebSocket捕获错误:', err)
  })
  socket.value.subscribe('reconnect', (_socket) => {
    console.log('WebSocket断开重连:', _socket)
    socket.value = _socket
  })
}
initWebSocket()

最后,如果想 debug 我们的心跳检测是否有效,可以使用下面这段代码:

// 测试心跳检测重连 手动模拟断开的情况
if (this._reconnectCount > 0) return
const tempTimer = setInterval(() => {
  this.close()
  if (this._reconnectCount < 3) {
    console.log('重连')
    this._reconnectCount++
    const _socket = new Socket(
      this._currentOptions,
      this._dep,
      this._reconnectCount
    )
    this._dep.notify('reconnect', _socket)
  } else {
    return clearInterval(tempTimer)
  }
}, 3 * 1000)

initSocket() 方法中的 this.onopen 事件的回调函数内的最后添加上面这段代码即可。

总结

至此,我们实现了一个 WebSocket 类的封装,提供了连接、断开、消息发送、接收和心跳检测等功能,并可以根据业务需要对消息进行预处理。同时,我们还介绍了如何使用封装好的 useSocket 函数。

WebSocket 封装的好处在于可以让我们在全局范围内方便地使用 WebSocket,提高代码的可读性和可维护性,降低代码的复杂度和重复性。在实际开发过程中,我们可以结合自己的业务需求,对封装的 WebSocket 类进行扩展和优化,以达到更好的效果。

尽管我在文中尽可能地详细介绍了每一个步骤和细节,但是难免会存在一些错误和不足之处。如果您在使用本文中介绍的方法时发现了任何错误或者有更好的方法,非常欢迎您指正并提出建议,以便我能够不断改进和提升文章的质量。

我是Ricky,一个兴趣使然的开发者。非常感谢您阅读本文,希望本文对您有所帮助!