这个是我在 CSDN 的第一百篇原则博文,留念😎
#1 需求说明
先说下项目结构,后端基于 Spring Boot 3,前端为 node.js 开发的控制台程序。现在希望能够在前端模拟 tail 命令,持续输出后端的日志文件。
#2 技术方案
#2.1 基于轮询(PASS)
这个方案实施较为简单,通过前端不断(定时)发起请求,并携带已读的内容坐标(position),询问后端日志文件是否有更新,判断依据为当前文件大小大于 position。若有变动,则读取更新的内容,回显在前端控制台。
此方案会产生非常多的请求,如果定时间隔设置不好,会有明显的延迟,故不采用。
#2.2 WebSocket 长连接
- 前端开启一个
WebSocket
- 后端监听到长连接后,启动文件变动检测线程
- 若文件发生变动,则读取更新内容,发送到前端
#3 实施
#3.1 后端改造
关于 Spring Boot 与 WebSocket 的集成,请转到:springboot集成websocket持久连接(权限过滤+拦截)
首先,我们定义一个监听文件变动并读取最新内容的工具类(借助于 common-io
包):
class FileTail(val path:Path, val handler: Consumer<String>, delay:Long=1000): FileAlterationListenerAdaptor() {private val watcher = FileSystems.getDefault().newWatchService()private val MODE = "r"private var reader = RandomAccessFile(path.toFile(), MODE)private var position= reader.length()// 使用 JDK 自带的 WatchService ,发现不能正常读取文件追加的内容private var monitor: FileAlterationMonitor = FileAlterationMonitor(delay)init {// 初始化监视器,只检测同名的文件FileAlterationObserver(path.parent.toFile()) { f: File -> f.name == path.name }.also { observer->observer.addListener(this)monitor.addObserver(observer)monitor.start()}}override fun onFileChange(file: File) {reader.seek(position)val bytes = mutableListOf<Byte>()val tmp = ByteArray(1024)var readSize: Intwhile ((reader.read(tmp).also { readSize = it }) != -1) {for (i in 0..< readSize){bytes.add(tmp[i])}}position += bytes.sizehandler.accept(String(bytes.toByteArray()))}fun stop() {reader.close()monitor.stop()}
}
再定义长连接的通信处理类:
@Component
class FileTailWsHandler : TextWebSocketHandler() {private val logger = LoggerFactory.getLogger(javaClass)companion object {val monitors = mutableMapOf<String, FileTail>()}override fun afterConnectionEstablished(session: WebSocketSession) {try{val textFile = Paths.get("logs/spring.log")// 加入队列monitors[session.id] = FileTail(textFile,{ text -> session.sendMessage(TextMessage(text)) })}catch (e:Exception){logger.error("处理客户端消息失败", e)session.sendMessage(TextMessage("服务器出错:${ExceptionUtils.getMessage(e)}"))session.close(CloseStatus.SERVER_ERROR)}}override fun afterConnectionClosed(session: WebSocketSession, status: CloseStatus) {logger.info("客户端(${session.id})${session.remoteAddress} 断开连接...")monitors.remove(session.id)?.stop()}
}
编写配置类,启用上述的组件:
@Component
class WsInterceptor : HandshakeInterceptor {private val logger = LoggerFactory.getLogger(javaClass)override fun beforeHandshake(request: ServerHttpRequest,response: ServerHttpResponse,wsHandler: WebSocketHandler,attributes: MutableMap<String, Any>): Boolean {if(logger.isDebugEnabled){logger.debug("WS 握手开始:${request.uri} 客户端=${request.remoteAddress}")request.headers.forEach { name, v -> logger.debug("[HEADER] $name = $v") }}//此处可以进行鉴权//写入属性值,方便在 handler 中获取attributes[F.PARAMS] = request.headers.getFirst(F.PARAMS)?: EMPTY// 返回 true 才能建立连接return true}override fun afterHandshake(request: ServerHttpRequest,response: ServerHttpResponse,wsHandler: WebSocketHandler,exception: Exception?) {}
}@Configuration
@EnableWebSocket
class SocketConfig : WebSocketConfigurer {private val logger = LoggerFactory.getLogger(javaClass)@Resourcelateinit var interceptor: WsInterceptor@Resourcelateinit var fileTailHandler:FileTailWsHandleroverride fun registerWebSocketHandlers(registry: WebSocketHandlerRegistry) {registry.addHandler(fileTailHandler, "/ws/file-tail").addInterceptors(interceptor)}
}
#3.2 前端(node.js)
请先安装依赖:
npm i -D ws
/*** 跟踪远程日志文件* @param {*} ps*/
const _tailRemoteFile = async ps=>{let url = remoteUrl("/ws/file-tail")let index = url.indexOf("://")let headers = {}headers.params = JSON.stringify(ps)const client = new WebSocket(`ws${url.substring(index)}`, { headers })client.on('open', ()=> console.debug(chalk.magenta(`与服务器连接成功 🤝`)))// client.on('close',()=> console.debug(chalk.magenta(`\n与服务器连接关闭 👋`)))client.on('error', e=> {console.debug(chalk.red(e))})client.on('message', /** @param {Buffer} buf */buf=>{let line = buf.toString()if(line.endsWith("\n") || line.endsWith("\r\n"))line = line.substring(0, line.length-2)console.debug(line)})
}