影视聚合站 科技 文章内容

Node.js 多进程/线程 —— 日志系统架构优化实践

发布时间:2021-11-11 21:54:32来源:腾讯IMWeb前端团队

  在日常的项目中,常常需要在用户侧记录一些关键的行为,以日志的形式存储在用户本地,对日志进行定期上报。这样能够在用户反馈问题时,准确及时的对问题进行定位。

  为了保证日志信息传输的安全、缩小日志文件的体积,在实际的日志上传过程中会对日志进行加密和压缩,最后上传由若干个加密文件组成的一个压缩包。

  为了更清晰的查看用户的日志信息。需要搭建一个用户日志管理系统,在管理系统中可以清晰的查看用户的日志信息。但是用户上传的都是经过加密和压缩过的文件,所以就需要在用户上传日志后,实时的对用户上传的日志进行解密和解压缩,还原出用户的关键操作。如下图所示,是一个用户基本的使用过程。

  但是解密和解压缩都是十分耗时的操作,需要进行大量的计算,在众多用户庞大的日志量的情况下无法立即完成所有的解密操作,所以上传的日志拥有状态。(解密中、解密完成、解密失败等)

  一个常见的日志系统架构如下:

  其中按照解密状态的变化,大体分为三个阶段:

用户终端上传日志到cos并通知后台日志服务已经上传了日志,后台日志服务记录这条日志,将其状态设置为未解密

日志服务通知解密服务对刚上传的日志进行解密,收到响应后将日志的状态更改为解密中

解密服务进行解密,完成后将明文日志上传并通知日志服务已完成解密,日志服务将解密状态更改为解密完成。如果过程中出现错误,则将日志解密状态更改为解密失败

  但是在实际的项目使用过程中,发现系统中有很多问题,具体表现如下:

有些日志在上传很久以后,状态仍然为解密中。

日志会大量解密失败。(只要有一个步骤出现错误,状态就会设置为解密失败)

接下来将以这些问题为线索,对其背后的技术实现进行深入探索。

  第一个问题是有些日志上传很久之后,状态仍然为解密中。根据表现,可以初步确定问题出现在上述的阶段3(日志状态已设置为解密中,但并未进行进一步的状态设置),因此,可以判断是解密服务内部出现异常。

  解密服务使用Node.js实现,整体架构如下:

  解密服务Master主进程负责进程调度与负载均衡,由它开启多个工作进程(WorkProcess)处理cgi请求,同时它也开启一个解密进程专用于解密操作。下面将着重介绍Node.js实现多进程和其通信的方法

  进程是资源分配的最小单位,不同进程之间是隔离开来,内存不共享的,使用多进程将相对复杂且独立的内容分隔开来,能降低代码的复杂度,每个进程只需要关注其具体工作内容即可,降低了程序之间的耦合。并且子进程崩溃不影响主进程的稳定性,能够增加系统的鲁棒性。  进程作为线程的容器,使用多进程也充分享受多线程所带来的好处。在下文会有多线程的详细介绍。

  进程作为资源分配的最小单位,启动一个进程必须分配给它独立的内存地址空间,需要建立众多的数据表来维护它的代码段、堆栈段和数据段,在进程切换时开销很大,速度较为缓慢。除此之外,进程之间的数据不共享,进程之间的数据传输会造成一定的消耗。

  因此,在使用多进程时应充分考虑程序的可靠性、运行效率等,创建适量的进程。

  Node.js内部通过两个库创建子进程:child_process和cluster,下文先介绍child_process模块。

  child_process模块提供了四个创建子进程的函数,分别为:spawn、execFile、exec、fork,可以根据实际的需求选用适当的方法,各个函数的区别如下:

  其中fork用于开启Node.js应用,在Node.js中较为常用,其用法如下:

  一个简单的demo如下:

//demo/parent.jsconstChildProcess=require('child_process');console.log(`parentpid:${process.pid}`)constchildProcess=ChildProcess.fork('./child.js');childProcess.on('message',(msg)=>{console.log("parentreceived:",msg);})//demo/child.jsconsole.log(`childpid:${process.pid}`)setInterval(()=>{process.send(newDate());},2000)$cddemo&&nodeparent.js//在demo目录下执行parent.js文件  结果:

  在任务管理器(活动监视器)中看到,确实创建了对应pid的Node.js进程:

  试想有以下两个独立的进程,它们通过执行两个js文件创建,那么如何在它们之间传递信息呢?

//Process1console.log("PID1:",process.pid);setInterval(()=>{//保持进程不退出console.log("PROCESS1isalive");},5000)//Process2console.log("PID2:",process.pid);setInterval(()=>{//保持进程不退出console.log("PROCESS2isalive");},5000)  接下来介绍几种通信方式:

1.信号

  信号是一种通信机制,程序运行时会接受并处理一系列信号,并且可以发送信号。

1.1发送信号

  可以通过kill指令向指定进程发送信号,如下例子表示向pid为3000的进程发送USR2信号(用户自定义信号)

//shell指令,可以直接在命令行中输入$kill-USR230001.2接收信号

  定义process在指定信号事件时,执行处理函数即可接收并处理信号。在收到未定义处理函数的信号时进程会直接退出

//javascriptprocess.on('SIGUSR2',()=>{console.log("接收到了信号USR2");}1.3示例

//Receiverconsole.log("PID",process.pid);setInterval(()=>{console.log("PROCESS1isalive");},5000)process.on('SIGUSR2',()=>{console.log("收到了USR2信号");})  假设Receiver执行之后的pid为58241,则:

//SenderconstChildProcess=require('child_process');console.log("PID",process.pid);setInterval(()=>{console.log("PROCESS2isalive");},5000)constresult=ChildProcess.execSync('kill-USR258241');  在运行Sender后,Receiver成功收到信号,实现了进程间的通信。同样的方式,Receiver也可以向Sender发送信号。

2.套接字通信

  通过在接受方和发送方之间建立socket连接实现全双工通信,例如在两者间建立TCP连接:

//Serverconstnet=require('net');letserver=net.createServer((client)=>{client.on('data',(msg)=>{console.log("ONCE",String(msg));client.write('serversendmessage');})})server.listen(8087);//Clientconstnet=require('net');constclient=newnet.Socket();client.connect('8087','127.0.0.1');client.on('data',data=>console.log(String(data)));client.write('clientsendmessage');  创建server和client进程后成功发送并接收消息,分别输出以下内容:

3.共享内存

  在两个进程之间共享部分内存段,两个进程都可以访问,可用于进程之间的通信。Node.js中暂无原生的共享内存方式,可通过使用cpp扩展模块实现,实现较为复杂,在此不再举例。

4.命名管道

  命名管道可以在不相关的进程之间和不同的计算机之间使用,建立命名管道时给他指定一个名字,任何进程都可以使用名字将其打开,根据给定权限进行通信。

  例如我们创建一个命名管道,通过它在server和client之间传输信息,例如server向client发送消息:

//shell$mkfifo/tmp/nfifo//Serverconstfs=require('fs');fs.writeFile('/tmp/tmpipe','infotosend',(data,err)=>console.log(data,err));//Clientconstfs=require('fs');fs.readFile('/tmp/tmpipe',(err,data)=>{console.log(err,String(data));})先创建命名管道/tmp/nfifo后运行client,与读取一般的文件不同,读取一般的文件会直接返回结果,而读取fifo则会等待,在fifo有数据写入时返回结果,然后开启server,server向fifo中写入信息,client将收到信息,并打印结果,如下所示:

5.匿名管道

  匿名管道与命名管道类似,但是它是在调用pipe函数生成匿名管道后返回一个读端和一个写端,而不具备名字,没有具名管道灵活,在此不做过多介绍。

  原生的Node.js在windows中使用命名管道实现,在*nix系统采用unixdomainsocket(套接字)实现,它们都可以实现全双工通信,Node.js对这些底层实现进行了封装,表现在应用层上的进程间通信,只有简单的message事件和send()方法,例如父子进程发送消息:

//主进程process.jsconstfork=require('child_process').fork;constworker=fork('./child_process.js');worker.send('start');worker.on('message',(msg)=>{console.log(`SERVERRECEIVED:${msg}`)})//子进程child_process.jsprocess.on('message',(msg)=>{console.log("CLIENTRECEIVED",msg)process.send('done');});2.2.3兄弟进程之间通信的实现  Node.js创建进程时便实现了其进程间通信,但这种方式只能够用于父子进程之间的通信,而不能在兄弟进程之间通信,若要利用原生的方式实现兄弟进程之间的通信,则需要借助它们公共的父进程,发送消息的子进程将消息发送给父进程,然后父进程收到消息时将消息转发给接收消息的进程。但是使用这种方式进行进程间的通信经过父进程的转发效率低下,所以我们可以根据Node.js原生的进程间通信方式实现兄弟进程的通信:在windows上使用命名管道,在*nix上使用unix域套接字,该方法与上文套接字通信类似,只是这里不是监听一个端口,而是使用一个文件。

//Serverconstnet=require('net');letserver=net.createServer(()=>{console.log("Serverstart");})server.on('connection',(client)=>{client.on('data',(msg)=>{console.log(String(msg));client.write('serversendmessage');})})server.listen('/tmp/unix.sock');//Clientconstnet=require('net');constclient=newnet.Socket();client.connect('/tmp/unix.sock');client.on('data',data=>console.log(String(data)));client.write('clientsendmessage');  启动server后会在指定路径创建文件,用于ipc通信。

  本项目中通过一个requestManager实现兄弟进程之间的通信,set方法用于设定当指定序列号收到消息时执行的回调函数。

  在本项目中过程如下:

  1和2流程:

  3流程:

  解密数据处理片段:

  而本项目的第一个问题,就出现在这里:程序在返回结果时,调用了res.toString方法,在出现异常时调用e.toString方法获取异常字符串,而实际中项目抛出的异常可能为空异常null,null不具有toString方法,所以向客户端写入数据失败,导致了解密状态的更新没有触发。

  提示:在处理异常时,返回的异常信息一般情况下应该能描述具体的异常,而不应该返回空值;其次,可以使用String(e)代替e.toString(),并且不应该在捕获到异常时静默处理。

  在解决完上述的问题后,发现bug并没有完全解决,于是发现了另一个问题:接收端每次接受的数据并不一定是发送的单条数据,而可能是多条数据的合体。当发送端只发送单条JSON数据时,服务端JSON.parse单条数据顺利处理消息;然而,当接收端同时接受多条消息时,便会出现错误,最终造成进程间通信超时:

UncaughtSyntaxError:Unexpectedtoken{inJSON2.3.1“粘包”问题的出现原因  由于TCP协议是面向字节流的,为了减少网络中报文的数量,默认采取Nagle算法进行优化,当向缓冲区写入数据后不会立即将缓冲区的数据发送出去,而可能在写入多条数据后将数据一同发送出去,所以接收端收到的消息可能是多条数据的组合体。除此之外,也有可能是发送端一次发送一条数据,但是接收端没有及时读取,导致后续一次读取多条消息。

  “粘包”问题的根本原因就在于传输的数据边界不明确,因此确定数据边界即可。

  可以通过在发送的消息前指定消息的长度大小,服务端读取指定长度大小的数据。

  除此之外,还能够制定消息的起始和结束符号,起始符和结束符中间的内容即为一条消息。

  在本项目中,解密会大量失败,而大量失败的原因是进程间通信失败,查看具体原因后发现是解密进程已经退出,导致大量的失败。接下来将探讨Node.js进程退出的原因和其解决办法。

在实际Node.js进程使用中,如果异常处理不当,会造成进程的退出,使服务不可用。Node.js退出的原因有以下几种:

Node.js事件循环不再需要执行任何额外的工作,这是一种最常见的进程退出原因,当运行一个js文件时,发现文件执行完成之后,进程会自动退出,其原因就是因为事件循环不需要执行额外的工作。阻止此类进程退出可以不断在事件循环中添加事件,如使用setInterval方法定时添加任务。

显式调用process.exit()方法,该方法可接受一个参数,表示返回代码,代码为0表示正常退出,否则为异常。

未捕获的异常,未捕获的异常会导致进程退出并打印错误信息。使用process.setUncaughtExceptionCaptureCallback(fn)可以在有未捕获异常时调用fn,防止进程的退出。

未兑现的承诺,未捕获的Promise.reject在高版本的Node.js(v15以后)会导致进程的退出,而在低版本不会。

未监听的错误事件,newEventEmitter().emit('error')若没有监听error事件则会导致进程退出,处理方法同未捕获的异常

未处理的信号,在向进程发送信号时,若没有设置监听函数,则进程会退出。

$kill-USR2<程序中输出的pid>2.4.2处理异常的方式对于上述造成Node.js退出的原因,都有其解决办法。

Node.js事件循环不再需要执行任何额外的工作,可以在事件循环中定时添加任务,例如setInterval会定时添加任务,阻止进程退出。

显示调用process.exit()方法,在程序中非必要情况下,不要调用exit方法。

未捕获的异常,使用try{...}catch(e){}对异常进行捕获,并且可以设置process.setUncaughtExceptionCaptureCallback(fn)可以在有未捕获异常时调用fn,防止进程的退出,作为兜底策略。

未兑现的承诺,在promise后调用.catch方法或者设置process.on('unhandledRejection',fn),防止进程退出,作为兜底策略。

未监听的错误事件,在触发'error'事件前,可以通过EventEmitter.listenerCount方法查看其监听器的个数,如果没有监听器,则使用其它策略提示错误。

未处理的信号,对于信号量,设置监听函数process.on('信号量',fn)监听其信号量的接受,防止进程退出。

process.on('uncaughtException',err=>console.log(err));letpro=newPromise((resolve,reject)=>{thrownewError('error');});setInterval(()=>console.log(pro),1000);这种情况这个promise的状态如何呢?在promise内部既没有调用resolve方法,也没有调用reject方法。那么promise的状态为pending吗?

--答案是否定的,在promise内部抛出异常,会立即将promise的状态更改为reject,而不会使promise的状态始终为pending。

那么又有另外一个问题,如果当前不捕获异常的情况下,这里使用那个事件捕获异常呢?

unhandledRejection?uncaughtException?

答案是都可以,这个异常会先由unhandledRejection的handler处理,如果该事件未定义则由uncaughtException的handler处理,如果两个事件都未定义则会提示错误并终止进程,具体原因在此处不作过多讨论。

  由于需要进行大量的解密和解压缩操作,在本项目中的解密进程中,创建了多个线程,接下来将对Node.js多线程做详细的介绍。

  前文已经提到过,进程是资源分配的最小单位,使用多进程能够将关联很小的部分隔离开来,使其各自关注自己的职责。

  而线程则是CPU调度的最小单位,使用多线程能够充分利用CPU的多核特性,在每一个核心中执行一个线程,多线程并发执行,提高CPU的利用率,适合用于计算密集型任务。

  在Node.js中,内置了用于实现多线程的模块worker_threads,该模块提供了如下方法/变量:

isMainThread:当线程不运行在Worker线程中时,为true。

Worker类:代表独立的javascript执行线程:

parentPort:用于父子线程之间的信息传输:

//子线程->父线程//子线程中const{parentPort}=require('worker_threads');parentPort.postMessage(`msg`);//父线程中const{Worker}=require('worker_threads');constworker=newWorker('filepath');worker.on('message',(msg)=>{console.log(msg)});//父线程->子线程//父线程中const{Worker}=require('worker_threads');constworker=newWorker('filepath');worker.postMessage(`msg`);//子线程中const{parentPort}=require('worker_threads');parentPort.on('message',(msg)=>console.log(msg));2.5.3单线程、多线程、多进程的比较  接下来,将使用单线程、多线程、多进程完成相同的操作。

//单线程console.time('timer');letj;for(leti=0;i<6e9;i++){Math.random();}console.timeEnd('timer');//多线程//主线程thread.jsconsole.time('timer');const{Worker,isMainThread}=require('worker_threads')letcnt=15;for(leti=0;i<15;i++){constworker=newWorker('./worker.js');worker.postMessage('start');worker.on('message',()=>{if(--cnt===0){console.timeEnd('timer');process.exit(0);}})}//工作线程worker.jsconst{parentPort,isMainThread}=require('worker_threads');parentPort.on('message',()=>{for(leti=0;i<1e9;i++){Math.random();}parentPort.postMessage('done');})//多进程//主进程process.jsconsole.time('timer');constfork=require('child_process').fork;letcnt=15;for(leti=0;i<15;i++){constworker=fork('./child_process.js');worker.send('start');worker.on('message',()=>{if(--cnt===0){console.timeEnd('timer');process.exit(0);}})}//子进程child_process.jsprocess.on('message',()=>{for(leti=0;i<1e9;i++){Math.random();}process.send('done');});  实际运行结果如下(测试机为8核CPU):

  分别为单个线程、6个线程、6个进程的运行结果,(在多次实验后)结果有以下规律:

  多线程<多进程<单线程<(多线程/多进程)*6

  其原因如下:

  多线程:由于充分利用CPU,所以执行的最快。

  多进程:由于每个进程中都有一个线程,同样能充分利用CPU,但是进程创建的开销要比线程大,所以执行的略慢于多线程。

  单线程:由于CPU利用不充分所以慢于多线程和多进程,但是由于多线程/多进程的创建需要一定的开销,所以快于单个线程执行时间*线程个数。

  结果与预期一致。

  在本系统中,实现了一个线程池,它能够在线程持续空闲的时候将线程退出,它会在线程创建时监听它的退出事件。

worker.on('exit',()=>{//找到该线程对应的数据结构,然后删除该线程的数据结构constposition=this.workerQueue.findIndex(({worker})=>{returnworker.threadId===threadId;});constexitedThread=this.workerQueue.splice(position,1);//退出时状态是BUSY说明还在处理任务(非正常退出)this.totalWork-=exitedThread.state===THREAD_STATE.BUSY?1:0;});  当线程一段时间内都是空闲时,调用线程的terminate方法,将其退出。然而,这段代码中的问题是,线程在调用terminate函数退出后,其threadId自动重置为-1,所以这段代码并不会在线程池中将其移除,而由于splice(-1,1)会将线程池中的最后一个线程移出。这样,当线程池分配任务时,会分配给已经退出的线程,而已经退出的线程不具备处理任务的能力,因此造成进程间通信超时。

  在实际的应用中一个服务端项目往往都会持续运行很长时间,Node.js会自动对没有引用的变量所占用的内存进行回收,但是还有很多内存泄漏的问题,系统并不能够自动对其进行处理,例如使用对象作为缓存,在对象上不断添加数据,而不对无用的缓存做清除,则会导致这个对象占用的内存越来越大,直到达到内存分配的最大限度后进程自动退出。下文将介绍如何分析内存泄漏问题。

  分析内存泄漏问题最基本的方式是通过内存快照,在Node.js中可以通过heapdump库获取内存快照,内存快照可以用于查看内存的具体占用情况。

constheapdump=require('heapdump');constA={//4587b:{//4585c:{//4583d:newArray(1e7),}}}heapdump.writeSnapshot();  例如在Chrome调试工具中查看内存快照:

  在Summary快照总览中可以看到内存分配的详细信息。

  第一列是Constructor构造函数,表示该内存的对象由该构造函数创建,()包裹的部分为内置的底层构造函数,后方的x1407表示有1407个实例通过该构造函数创建,下方Object@4583表示该Object实例的唯一内存标识为4583,下方d::Array表示其内部的键为d的值为一个Array类型的数据;

  第二列为Distance,距离顶层GCroot的距离,例如直接在全局作用域中的变量。

  第三列为ShallowSize,表示其自身真实占用的内存大小。

  第四列为RetainedSize,表示与其关联的内存大小,此处和此处可释放的子节点占用的内存总和。

  从上图可以看出,标记为4583的对象,它的键为d的数组下真实分配了80000016字节大小的数据,占总堆分配的数据的98%,点击它查看详情,可以看到它以c这个键存在于标记为4585对象下,查看4585对象可以看到,它以b这个键存在于标记为4587的对象下:

  查看标记为4587的对象可以看到,它直接存在于垃圾回收根节点上GCRoot,与代码完全对应,相关对应关系如下:

constA={//4587b:{//4585c:{//4583d:e,}}}2.6.2本案例中的内存泄漏问题  在本案例中,也发现其一些任务始终存在于内存中,下图为时间间隔为一天后内存的占用量,可以看出内存占用量提升的非常快,

  查看其内存占用后发现是线程池中部分任务,由于进程间通信超时,始终没有得到释放,解决进程间通信超时问题,并且设置一个timeout超时释放即可。

  在一个大型项目中,往往需要用到多方面的技术,如果各方面内容的实现都放在一起,会比较杂乱,耦合度高,难以阅读和维护。因此,需要对程序的模块进行划分,对每一个模块进行良好的设计,让每一个模块都各司其职,最后组成整个程序。

  在本项目中的nodejs-i-p-c进程间通信库,nodejs-threadpool线程池均以包的形式发布到了npm上。接下来将介绍基本的npm包发布流程。

注册npm账号(在npm官网使用邮箱注册账号,需要注意的是npm官网登录只能使用用户名+密码登录,而不能使用邮箱+密码登录!

初始化本地npm包。在一个本地的空文件夹中运行npminit指令,创建一个npm仓库,仓库的名称即为将要发布的包的名称。(package.json文件中的name字段)

登录npm账号在本地命令行中运行npmlogin指令即可进行登录操作,在输入用户名、密码、邮箱后即可完成,登录成功则会提示Loggedinasonhttps://registry.npmjs.org/.npmwhoami指令可以查看当前登录的账户。

在(2)中初始化的仓库中运行npmpublish即可快速发布当前包如果发布失败,可能是因为包名重复,提示没有权限发布该包,需要更改包名重新发布。

使用npmview验证包发布,如果出现该包的详细信息则说明包发布成功了!

  在包发布成功之后其他人都能够访问到该包,通过npmi即可安装您发布的包使用啦。

  处理前:日志解密大量失败,一些日志持续停留在解密中状态

  处理后:解密全部成功,无其它异常。

紧追技术前沿,深挖专业领域

扫码关注我们吧!

© 2016-2021 ysjhz.com Inc.

站点统计| 举报| Archiver| 手机版| 小黑屋| 影视聚合站 ( 皖ICP备16004362号-1 )