# 多进程架构

通常,我们使用Master/Worker进程模型使应用程序可以最大化使用CPU资源:

  • Master进程:协调进程资源,比如:创建/重启Worker进程、全局状态/行为维护等。

  • Worker进程:处理实际业务。

在多进程架构的应用程序中,Master进程中执行的操作具有应用程序级别的唯一性;而不同的进程拥有完全独立的内存空间,进程内的状态完全隔离。因此,对于在多个Worker进程中共享的状态的场景,我们可以使用Corejs提供的全局对象实现。

注意

在多Worker进程的应用程序中,每个Worker进程需要严格保证无状态设计,否则可能产生奇怪的问题。


Corejs引入了ClusterCoreAppMain以实现应用程序的多进程架构。其中:

  • ClusterCore:是多进程架构的核心,其中包含进程管理、通讯相关的API。

  • AppMain:实现应用程序生命周期的各个阶段,将被ClusterCore加载和调用。

提示

通常,我们应将应用程序的业务逻辑全部装入AppMain,在AppMain外部只需要:

  1. 执行Core.ClusterCore.init()指定应用程序使用的AppMain

  2. 执行Core.ClusterCore.start()启动ClusterCore

ClusterCore将在恰当的时间点自动调用AppMain中对应生命周期以驱动应用程序运行。

# 多进程模型

多进程模型

# AppMain

AppMain抽象了应用模型进程模型,覆盖了应用程序生命周期的各个阶段:

提示

我们在实现AppMain时,需要继承自Core.AppMain

# 实例属性

ClusterCore执行初始化时将自动创建AppMain实例。因此,在应用程序的任意生命周期中,可以直接使用this访问AppMain的实例属性:

  • processId:当前进程的ID。

    说明

    Master进程Worker进程中的processId有不同的构成规则:

    • Master进程中,进程ID为'M'

    • Worker进程中,进程ID为'W:<%进程偏移%>'

    进程偏移是一个>= 1的整数,反映了Master进程创建Worker进程的时序。

  • clusterCore:创建并加载AppMainClusterCore实例。

  • launchParams:进程的初始化参数,即Master进程中的process.argv

注意

AppMain的实例属性将在onProcessDidInit()的默认行为中被设置。

因此,我们在重写onProcessDidInit()时必须执行super操作,以保证AppMain中实例属性的正确性。

# 进程生命周期

我们可以在AppMain中指定应用程序在进程维度生命周期中的行为:

  • Master进程Worker进程初始化完成时。

  • Master进程检测到有Worker进程退出时。

提示

对于Master进程的退出事件,我们可以Master进程的业务层中使用process.on()


# onProcessDidInit(processId, launchParams)

# 参数列表
  • processId:初始化完成的进程ID。

  • launchParams:进程的初始化参数,即Master进程中的process.argv

# 使用场景

Master进程Worker进程初始化完成时将触发此生命周期方法。通常,我们在此方法中根据processId判断当前的进程环境执行不同的逻辑:

  • 当前是Master进程时,使用Core.ClusterCore.fork()创建Worker进程,或执行一些在期望在应用程序维度保证唯一性的操作。

  • 当前是Worker进程时,执行实际业务,比如:启动ServiceCore等。

提示

在微服务架构中,如果需要在多个应用程序间同步或共享数据,我们可以使用分布式协调工具,比如:ZooKeeperRedis等。


# onWorkerProcessDidExit(exitedProcessId, exitedDetail, reboot)

# 参数列表
  • exitedProcessId:已退出的进程ID。

  • exitedDetail:进程退出详情,结构为{ code, signal }

  • reboot:重新拉起Worker进程的函数。使用reboot()重新拉起的进程将使用exitedProcessId作为进程ID。

# 使用场景

Master进程检测到进程组中有Worker进程退出时将触发此生命周期方法。我们在此方法中有两种重新拉起Worker进程的方式:

  • 使用reboot():新的Worker进程将复用退出进程的processId

  • 使用Core.ClusterCore.fork():新的Worker进程将使用进程组内的进程偏移创建processId


# 实现原理

Master进程中,执行Core.ClusterCore.start()将触发Master进程的初始化动作:

  1. 首先,使用nodejs原生模块cluster监听Worker进程的退出消息,在Worker进程退出时调用onWorkerProcessDidExit()通知业务层。

  2. 接下来,使用同样方式监听Worker进程的通信消息。

  3. 最后,调用onProcessDidInit()通知业务层Master进程已初始化完成。


Worker进程中执行Core.ClusterCore.start()时,将触发Worker进程的初始化动作:

  1. 首先,使用process.on()监听来自Master进程的通信消息。

  2. 接下来,向Master进程发起TraceIPC以获取进程的初始化信息。

  3. 最后,在收到Master进程应答的初始化信息时调用onProcessDidInit()通知业务层Worker进程已初始化完成。

# ClusterCore

在设计上,ClusterCore是进程级别的单例,在实例化时将自动根据当前运行进程的类型加载对应的API。即使在不同的进程环境中,相同的API在使用体验上完全相同。因此,我们在使用ClusterCore时通常无需关注进程类型。

需要注意的是,运行在Master进程中的ClusterCore拥有创建Worker进程关闭应用程序的能力。即运行在Worker进程中的ClusterCore无法调用以下API:

  • fork(workerNum):创建指定数量的Worker进程

  • shutdown([exitCode]):关闭进程组中的所有Master进程Worker进程


我们已经知道,ClusterCore将自动调用AppMain中对应生命周期方法以驱动应用程序的运行。因此,在使用ClusterCore前需要指定AppMain进行初始化。

接下来,让我们来看一个使用ClusterCoreAppMain的标准样例:

const Core = require('node-corejs');

/**
 * 实现AppMain
 */
class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    // 重写时必须执行super操作
    super.onProcessDidInit(processId, launchParams);

    // 在Master进程初始化完成后 - 创建4个Worker进程
    if (processId === 'M') {
      console.log(`Master进程初始化完成`);
      Core.ClusterCore.fork(4);
    }
    // 在Worker进程初始化完成后 - 执行业务逻辑
    else {
      console.log(`Worker进程初始化完成 -> ${processId}`);
      // 模拟worker进程退出触发重启
      setTimeout(() => { process.exit() }, 1500);
    }
  }

  /**
   * Worker进程退出
   * @override
   */
  onWorkerProcessDidExit(exitedProcessId, exitedDetail, reboot) {
    // 在worker进程退出时自动重启
    reboot();
  }
}

// 初始化并启动ClusterCore
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

# 进程间通信

进程间通信有两种场景:

  1. 发送方接收方单向发送消息,无需关注接收方的应答情况,即发起IPC

  2. 发送方接收方发送消息后,需要接收到接收方的应答消息后才认为通信完成,即发起TraceIPC

# API设计

ClusterCore提供了在进程组中的任意进程间发起IPCTraceIPC的API:

注意

ClusterCore将拒绝在相同的进程间发起的通信动作。


# sendData(processId, data[, callBack])

# 使用场景

用于发送方接收方发送单向的进程间通信消息,即发起IPC

# 参数列表
  • processId接收方的进程ID,必填项。

    提示

    我们可以使用Core.ClusterCore.getAllProcessIds()获取进程组中的所有进程ID。

  • data发送方接收方发送的自定义数据,必填项。我们将在消息结构一节中讨论具体细节。

  • callBack:发起进程间通信动作的执行结果,非必填项。是一个cps风格的Function,其参数列表为(error)

    提示

    我们可以根据callBack回调的error判断进程间通信是否执行成功:

    • 当执行成功时,error的值为null
    • 当执行失败时,error的值为通信失败的原因。

# sendDataWithTraceCallBack(processId, data, options[, callBack])

# 使用场景

发送方接收方发送需要应答的进程间通信消息,即发起TraceIPC

# 参数列表
  • processId接收方的进程ID,必填项。

    提示

    我们可以使用Core.ClusterCore.getAllProcessIds()获取进程组中的所有进程ID。

  • data发送方接收方发送的自定义数据,必填项。我们将在消息结构一节中讨论具体细节。

  • options:进程间通信的配置项,必填项,支持使用ObjectFunction类型的配置项。

    提示

    当我们使用Object类型的options时,其结构为{ timeout, traceCallBack }

    • timeout:判定应答超时的毫秒数,默认为10000

      在指定时间内未收到来自接收方应答消息时,将触发AppMain中的onProcessTraceMessageTimeout()进行消息超时处理。

    • traceCallBack发送方收到应答消息时执行的回调函数,必填项。其参数列表为(resData, next)

      我们可以使用next()使应答消息进入AppMain中的onProcessDidReceiveMessage()继续处理。

    当使用Function类型的options时,将直接应用于traceCallBack,且此时timeout10000

  • callBack:进程间通信的执行结果,非必填项。是一个cps风格的Function,其参数列表为(error)

    提示

    我们可以根据callBack回调的error判断进程间通信是否执行成功:

    • 当执行成功时,error的值为null
    • 当执行失败时,error的值为通信失败的原因。

# 消息结构

进程间通信传输的消息将被ClusterCore包装为由元数据meta和自定义数据data构成的Object

元数据meta中存储了消息的基本信息,通常不被业务层感知:

  • traceId:消息的链路追踪ID。

  • toProcessId接收方的进程ID。

  • fromProcessId发送方的进程ID。

  • isRes:是否为TraceIPC的应答消息。

  • isTransitRes:是否为转发结果的应答消息。

  • transitTraceId:转发结果消息的链路追踪ID。


自定义数据data由业务层定义,主要用于存储业务功能所需的信息,通常包括触发动作附属数据

我们在进行进程间通信时,必须在自定义数据中指定消息的触发动作,即action。另外,推荐使用payload作为附属数据的键名。

通常,进程间通信消息的元数据不向业务层暴露。在以下场景中,ClusterCore仅向业务层抛出自定义数据data

  • onProcessWillReceiveMessage()中进程收到的进程间通信消息。

  • onProcessDidReceiveMessage()中进程决定处理的进程间通信消息。

  • onProcessDidDiscardMessage()中进程决定丢弃的进程间通信消息。

  • onProcessTraceMessageTimeout()中超时未被应答的进程间通信消息。

  • sendDataWithTraceCallBack()中触发traceCallBack()时收到的应答消息。


我们可以使用data.getOriginData()在业务层中取得消息的原始结构以访问元数据,即包含metadataObject

说明

在实现原理上,ClusterCore接收到进程间通信的消息时,将对消息结构的原始状态进行Deep Copy生成快照,业务层执行getOriginData()时将取得此快照。

因此,在业务层中修改自定义数据data并不会影响消息的原始结构。

另外,业务层中的自定义数据data可以使用getTraceDetail()获取消息的链路追踪信息,用于应答TraceIPC

执行data.getTraceDetail()将得到结构为{ traceId, responsive, resTrace }Object,其中:

  • traceId:消息的链路追踪ID。

  • responsive:消息是否可应答。

  • resTrace:消息的快捷应答方法,其参数列表为(data[, callBack])


注意

在进行进程间通信时,我们通常仅需指定消息的自定义数据,即data

为避免nodejs在进程间通信过程中自动执行的序列化对消息的完整性产生影响,我们应使用基本类型组成data

  • Array
  • Object
  • Number
  • String
  • Boolean

# IPC

对于无需关注接收方应答的单向进程间通信,使用sendData()直接向目标进程发送消息即可。

让我们来看一个单向进程间通信的🌰:

const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // Master进程 - 创建两个Worker进程
    if (processId === 'M') {
      Core.ClusterCore.fork(2);
    }
    // Worker进程 - 在两个Worker进程间进行通信
    else {
      Core.ClusterCore.getAllProcessIds((err, processIds) => {
        if (err || processIds.length !== 3) {
          return;
        }
        const toProcessId = processIds[1];
        const fromProcessId = processIds[2];
        this.processId === fromProcessId && Core.ClusterCore.sendData(toProcessId, {
          // 设置消息的触发动作
          action: 'TEST_IPC_ACTION',
          // 设置消息的附属数据
          payload: { value: '这是一个🌰' }
        });
      });
    }
  }

  /**
   * 进程确认处理自定义通信消息
   * @override
   */
  onProcessDidReceiveMessage(fromProcessId, data) {
    console.log(`进程[${this.processId}]处理来自进程[${fromProcessId}]的消息:[${JSON.stringify(data)}]`);
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

# TraceIPC

对于需要接收方应答的双向的进程间通信,需要发送方接收方进行协作:

# 发起TraceIPC


发送方使用sendDataWithTraceCallBack()向目标进程发起TraceIPC

ClusterCore将在发送方发起TraceIPC时执行三个操作:

  1. 生成traceId填充至元数据meta中以追踪TraceIPC链路。

  2. 使用步骤①中生成的traceId在当前进程中注册此次TraceIPC的追踪信息。

    即:消息接收方的进程ID和收到应答消息时执行的回调函数traceCallBack

  3. 当接收到的消息元数据中包含已被注册的traceId且来源方与注册信息匹配时,认为此消息为TraceIPC的应答消息。

    此时将分发消息进入与traceId对应的traceCallBack并注销其注册状态。

提示

我们可以在发起TraceIPC时指定判定应答超时的时间,当发送方在指定时间内未收到来自接收方的应答消息时,将自动注销此次TraceIPC使用的traceId对应的注册信息并触发AppMain中的onProcessTraceMessageTimeout()以对超时消息进行处理。

另外,在TraceIPC超时后,如果收到了应答消息将直接触发AppMain中的onProcessDidDiscardMessage()进行舍弃处理。

对于traceId的结构:

  • 由24位字符组成。
  • 前12位为当前时间戳的16进制表示,位数不足填充0
  • 第13-16位为当前进程PID的16进制表示,位数不足填充0
  • 第16-24位为随机数,位数不足填充0

ClusterCore每次生成traceId时,将与当前进程中已注册的traceId进行冲突检验,以保证其进程级别的唯一性。

# 应答TraceIPC


接收方应答TraceIPC时需要进行两个步骤:

  1. 执行getTraceDetail()取得消息的链路追踪信息。

  2. 使用链路追踪信息中提供的resTrace()向发送应答消息。

提示

通常,我们在接收方处理进程间通信消息时,使用消息的触发动作判断是否需要向发送方发送应答消息。

需要注意的是,应答TraceIPC时仅允许应答一次,链路追踪信息中的responsive表示是否允许进行应答。

让我们来看一个使用TraceIPC的🌰:

const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // Master进程 - 创建两个Worker进程
    if (processId === 'M') {
      Core.ClusterCore.fork(2);
    }
    // Worker进程 - 在两个Worker进程间进行通信
    else {
      Core.ClusterCore.getAllProcessIds((err, processIds) => {
        if (err || processIds.length !== 3) {
          return;
        }
        const toProcessId = processIds[1];
        const fromProcessId = processIds[2];
        // 发起TraceIPC
        this.processId === fromProcessId && Core.ClusterCore.sendDataWithTraceCallBack(toProcessId, {
          // 设置消息的触发动作
          action: 'TEST_TRACE_IPC_ACTION',
          // 设置消息的附属数据
          payload: { value: '这是一个🌰' }
        }, (resData, next) => {
          console.log(`TraceIPC收到了应答消息:[${JSON.stringify(resData)}]`);
          // 使用next()分发应答消息进入AppMain中的确认处理
          next();
        });
      });
    }
  }

  /**
   * 进程确认处理自定义通信消息
   * @override
   */
  onProcessDidReceiveMessage(fromProcessId, data) {
    console.log(`进程[${this.processId}]处理来自进程[${fromProcessId}]的消息:[${JSON.stringify(data)}]`);
    const { action, payload } = data;
    // 使用action判断是否需要应答
    if (action === 'TEST_TRACE_IPC_ACTION') {
      const { value } = payload;
      // 获取链路追踪信息
      const { responsive, resTrace } = data.getTraceDetail();
      // 发送应答消息
      const resData = {
        action: 'TEST_TRACE_IPC_RES_ACTION',
        payload: { value: value + '🌰' }
      };
      responsive && resTrace(resData);
    }
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61

# 消息处理

说明

本章中使用的宏变量存储在Core.Macros中。

在收到进程间通信消息时,ClusterCore将根据进程间通信消息自定义数据中的action内部通信消息自定义通信消息进行预分类:

  • 对于自定义通信消息,将直接进入AppMain中进行消息处理。

  • 对于内部通信消息,将仅执行内部处理,不再进入AppMain中的消息处理流程。


进程间通信消息自定义数据中的action指定为以下宏变量时,将被ClusterCore认为是内部通信消息,此类消息不会进入AppMain中的消息处理流程:

宏变量 描述
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_GET_INITIAL_INFO Worker进程向Master进程发起获取初始化信息
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_GET_INITIAL_INFO Master进程向Worker进程应答初始化信息
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_SET_GLOBAL_OBJECT Worker进程向Master进程发起设置全局对象
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_SET_GLOBAL_OBJECT Master进程向Worker进程应答设置全局对象执行结果
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_GET_GLOBAL_OBJECT Worker进程向Master进程发起获取全局对象
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_GET_GLOBAL_OBJECT Master进程向Worker进程应答获取全局对象执行结果
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_REMOVE_GLOBAL_OBJECT Worker进程向Master进程发起删除全局对象
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_REMOVE_GLOBAL_OBJECT Master进程向Worker进程应答删除全局对象执行结果
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_GET_ALL_PROCESS_IDS Worker进程向Master进程发起获取进程组内的所有进程ID
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_GET_ALL_PROCESS_IDS Master进程向Worker进程应答进程组内所有进程ID
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_QUERY_GLOBAL_OBJECT Worker进程向Master进程发起自定义读取全局对象
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_QUERY_GLOBAL_OBJECT Master进程向Worker进程应答自定义读取全局对象执行结果
CLUSTER_CORE_WORKER_TO_MASTER_ACTION_REQ_UPDATE_GLOBAL_OBJECT Worker进程向Master进程发起自定义更新全局对象
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_UPDATE_GLOBAL_OBJECT Master进程向Worker进程应答自定义更新全局对象执行结果
CLUSTER_CORE_MASTER_TO_WORKER_ACTION_RES_TRANSIT_RESULT Master进程向Worker进程应答通信消息转发结果

我们可以重写AppMain中消息处理相关的生命周期方法,以定制进程间通信消息的处理流程。

提示

在消息处理的相关生命周期方法中,仅带入进程间通信消息的自定义数据部分,即data。我们可以通过:

  • data.getOriginData():读取完整的消息结构

  • data.getTraceDetail():读取消息的链路追踪信息,即我们在消息结构中提到的{ traceId, responsive, resTrace }

需要注意的是,消息处理流程在实现细节上使用中间件模式,任何对dataoriginData的修改将被保留并分发至后续的处理流程。因此,我们在业务层应尽量避免修改源消息。

接下来,我们将讨论AppMain中消息处理相关的生命周期方法:

# onProcessWillReceiveMessage(fromProcessId, data, next)

# 参数列表
  • fromProcessId:消息发送方的进程ID。

  • data:消息的自定义数据。

  • next:流程分发函数。我们将在接下来的使用场景中介绍流程分发函数的使用方法。

# 使用场景

当前进程接收到自定义通信消息时触发此生命周期方法。通常,我们在此生命周期方法中对进程间通信消息进行分流,使用流程控制函数next()确认或舍弃收到的消息。

  • 执行next():确认进程间通信消息,分发源消息进入onProcessDidReceiveMessage()。是onProcessWillReceiveMessage()的默认行为。

  • 执行next(data):确认进程间通信消息,将next()带入的data覆盖源消息的自定义数据data中的payloadonProcessDidReceiveMessage()

  • 执行next(CLUSTER_CORE_MESSAGE_COMMAND_DISCARD):舍弃进程间通信消息,分发源消息进入onProcessDidDiscardMessage()

提示

Master进程Worker进程收到自定义通信消息时都将触发AppMain中的onProcessWillReceiveMessage()。因此,我们通常根据消息的action实现分流逻辑,无需关注当前运行进程环境。


# onProcessDidReceiveMessage(fromProcessId, data)

# 参数列表
  • fromProcessId:消息发送方的进程ID。

  • data:消息的自定义数据。

# 使用场景

此生命周期方法有两种触发方式:

  • 当前进程中收到的自定义通信消息在分流阶段被确认。
  • 当前进程中收到的TraceIPC应答消息触发的traceCallBack()中执行了next()

通常,我们在此生命周期方法中根据消息的action实现消息触发的实际业务逻辑,比如:应答TraceIPC等。


# onProcessDidDiscardMessage(fromProcessId, data)

# 参数列表
  • fromProcessId:消息发送方的进程ID。

  • data:消息的自定义数据。

# 使用场景

此生命周期方法有两种触发方式:

  • 当前进程中收到的自定义通信消息在分流阶段被舍弃。
  • 当前进程中在TraceIPC超时后收到了应答消息。

通常,我们在此生命周期方法中对被舍弃的进程间通信消息进行统一处理。


# onProcessTraceMessageTimeout(toProcessId, data)

# 参数列表
  • fromProcessId:超时的TraceIPC消息接收方的进程ID。

  • data:消息的自定义数据。

# 使用场景

当前进程发起TraceIPC且在指定时间内未收到接收方应答时触发此生命周期方法。通常,我们在此生命周期方法对超时的TraceIPC消息进行统一处理,比如:重新尝试发起TraceIPC等。

# 全局对象

我们已经知道,在多进程架构的应用程序中,不同的进程拥有完全独立的内存空间,进程内的状态完全隔离。因此,Corejs提供了在进程组中共享状态的能力,即:全局对象

注意

全局对象仅限于在单个应用程序中的多个进程之间共享数据,无法取代分布式协调工具,比如:ZooKeeperRedis等。

# API设计

在实现细节上,全局对象是一个存储在Master进程中的ObjectWorker进程通过进程间通信Master进程发起读写指令以实现对全局对象的访问。

提示

因此,全局对象中值的类型受制于进程间通信时nodejs自动执行序列化的影响。

为避免序列化造成的问题,我们设置全局对象时同样应使用基本类型构成的field,即:

  • Array
  • Object
  • Number
  • String
  • Boolean

ClusterCore提供了对全局对象进行简单读写操作的API:


# getGlobalObject([keyPath], callBack)

# 使用说明

我们可以使用getGlobalObject()读取整个全局对象,或读取全局对象中指定键名/键路径对应的值。

# 参数列表
  • keyPath:期望读取的键名或键路径,非必填项。指定键路径时,使用键名链路字符串构成的Array即可。

    当此项未指定或指定为nullundefinedNaN''[]{}时将读取整个全局对象

    提示

    尝试读取全局对象时,如果键路径中存在键名指向的field不存在或指向的值不为引用类型,则将在callBack中得到一个异常。

  • callBack:读取全局对象的执行结果,必填项,是一个cps风格的Function。其参数列表为(error, value)

    • error:读取全局对象失败的原因,为null时表示读取动作执行成功。
    • value:在全局对象中读取到的值,当操作执行失败时为undefined

      需要注意的是,对值为引用类型的field中不存在的键名进行取值时也将得到undefined

    说明

    在执行getGlobalObject()时,即使没有指定callBack也不会产生阻塞性异常。

    出于性能考虑,在检测到没有指定calllBack时将直接退出,不再触发实际读取逻辑。


# setGlobalObject(keyPath[, value][, callBack])

提示

本节中使用的宏变量存储在Core.Macros中。

# 使用说明

我们可以使用setGlobalObject()设置或更新指定键名或键路径指向的field

另外,我们可以结合数组指令快速对全局对象中的Array进行操作。

# 参数列表
  • keyPath:期望设置或更新的键名或键路径,必填项。指定键路径时,使用键名链路字符串构成的Array即可。

    说明

    setGlobalObject()不允许对全局对象进行不安全的写入,在执行以下操作时将在callBack中得到一个异常:

    • 尝试在不存在的field中创建新field
    • 尝试在值为非引用类型的field中创建新field
  • value:期望设置/更新的键值或数组指令的参数,非必填项。

    说明

    在执行setGlobalObject()时,将根据业务层实际使用的参数列表动态生成一个参数Array以作为value

    • 当参数列表中最后一个参数为Function时,使用业务层实际使用的参数列表中从第二个参数至倒数第二个参数作为value
    • 当参数列表中最后一个参数为非Function时,使用参数列表中第二个参数至倒数第一个参数作为value

    我们已经知道,value是一个Array,对于value的应用于全局对象的效果:

    • 在指定的keyPath中不包含数组指令时,将value.pop()的值设置至指定的keyPath中。
    • 在指定的keyPath中使用了数组指令时,将...value作为数组指令的执行参数应用至指定的keyPath中。

    需要注意的是,一些无需参数即可执行的数组指令可以不传入value,比如:

    • CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_POP
    • CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_SHIFT
    • CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_REVERSE
  • callBack:设置或更新全局对象的执行结果,非必填项,是一个cps风格的Function。其参数列表为(error, detail)

    • error:设置或更新全局对象失败的原因,为null时表示操作执行成功。
    • detail:设置或更新全局对象的操作详情,其结构为{ globalObject, commandResult }

    说明

    对于设置或更新全局对象的操作详情:

    • globalObject:执行设置或更新操作后的全局对象,当设置/更新操作执行失败时为undefined
    • commandResult数组指令的执行结果,当没有使用数组指令或设置/更新操作执行失败时为undefined

      对空数组使用CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_POP指令,也将得到undefined

# 使用样例

接下来,让我们来看一个使用setGlobalObject()设置全局对象的🌰:

const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // 主进程中创建键名为processDetail.M的field存储其进程pid
    if (processId === 'M') {
      // 首先创建键名processDetail指向的field
      Core.ClusterCore.setGlobalObject('processDetail', {}, (err) => {
        if (err) {
          return;
        }
        // 在创建首层field成功后写入主进程的pid并派生子进程
        Core.ClusterCore.setGlobalObject(['processDetail', 'M'], process.pid);
        Core.ClusterCore.fork(4);
      });
    }
    // 子进程中创建键名为processDetail.[进程偏移]的field存储其进程pid
    else {
      const processOffset = processId.split(':').pop();
      Core.ClusterCore.setGlobalObject(['processDetail', processOffset], process.pid);
    }
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

我们可以在onProcessDidInit()中使用getGlobalObject()检查设置全局对象的成果:

// 查看全局对象的设置结果
setTimeout(() => {
  const fieldName = processId === 'M' ? 'M' : processId.split(':').pop();
  const keyPath = ['processDetail', fieldName];
  Core.ClusterCore.getGlobalObject(keyPath, (err, value) => {
    !err && console.log(`进程[${fieldName}]的PID -> ${value}`);
  });
}, 1000);
1
2
3
4
5
6
7
8

# 数组指令

setGlobalObject()时,如果指定的键路径对应了全局对象中的值为Array类型,我们可以在键路径中追加以下数组指令以快捷实现数组变异操作:

数组指令 作用
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_POP 删除数组尾部的第一个元素,即执行pop()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_PUSH 向数组尾部追加新元素,即执行push()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_FILL 数组填充,即执行fill()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_SHIFT 删除数组头部的第一个元素,即执行shift()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_SPLICE 对数组执行铰接操作,即执行splice()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_UNSHIFT 向数组头部添加新元素,即执行unshift()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_REVERSE 数组翻转,即执行reverse()
CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_COPYWITHIN 数组内部替换,即执行copywhthin()
# 使用样例

接下来,让我们来看一个在setGlobalObject()使用数组指令设置全局对象的🌰:

const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // 在主进程中创建键名为processDetail的field,并推入主进程信息
    if (processId === 'M') {
      // 首先创建键名processDetail指向的field
      Core.ClusterCore.setGlobalObject('processDetail', [], (err) => {
        if (err) {
          return;
        }
        // 在创建首层field成功后推入主进程信息并派生子进程
        Core.ClusterCore.setGlobalObject(
          ['processDetail', Core.Macros.CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_PUSH],
          { processId, processPid: process.pid }
        );
        Core.ClusterCore.fork(4);
      });
    }
    // 子进程中将子进程信息推入processDetail指向的field
    else {
      Core.ClusterCore.setGlobalObject(
        ['processDetail', Core.Macros.CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_PUSH],
        { processId, processPid: process.pid }
      );
    }
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

同样,我们可以在onProcessDidInit()中使用getGlobalObject()检查设置全局对象的成果:

// 查看全局对象的设置结果
setTimeout(() => {
  Core.ClusterCore.getGlobalObject((err, value) => {
    if (err) {
      return;
    }
    console.log(`当前应用程序中的全局对象为 -> ${JSON.stringify(value)}`);
  });
}, 1000);
1
2
3
4
5
6
7
8
9

# removeGlobalObject(keyPath[, callBack])

# 使用说明

我们可以使用removeGlobalObject()移除指定键名或键路径指向的field

# 参数列表
  • keyPath:期望移除的键名或键路径,必填项。指定键路径时,使用键名链路字符串构成的Array即可。

    说明

    removeGlobalObject()将因期望移除的键名或键路径对应的值类型有不同的删除行为:

    • 当键名或键路径对应的值为Array时,如果期望移除的键名为Number,则执行splice()移除键名对应位置的元素。
    • 其余场景下,在期望移除的键名所处的全局对象上下文中执行delete操作。

    如果键路径中存在键名指向的field不存在或指向的值不为引用类型,则将在callBack中得到一个异常。

  • callBack全局对象移除操作的执行结果,非必填项,是一个cps风格的Function。其参数列表为(error, detail)

    • error全局对象移除操作执行失败的原因,为null时表示操作执行成功。
    • globalObject:执行移除操作后的全局对象,当操作执行失败时为undefined
# 使用样例

让我们基于设置全局对象的样例,来使用removeGlobalObject()删除全局对象中指定的field






















 
 
 
 
 
 
 
 
 
 
 













const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // 主进程中创建键名为processDetail.M的field存储其进程pid
    if (processId === 'M') {
      // 首先创建键名processDetail指向的field
      Core.ClusterCore.setGlobalObject('processDetail', {}, (err) => {
        if (err) {
          return;
        }
        // 在创建首层field成功后写入主进程的pid并派生子进程
        Core.ClusterCore.setGlobalObject(['processDetail', 'M'], process.pid);
        Core.ClusterCore.fork(4);
      });

      // 尝试删除全局对象中的field
      setTimeout(()=>{
        // 移除processDetail.M指向的field
        Core.ClusterCore.removeGlobalObject(['processDetail', 'M'], (err, globalObject) => {
          !err && console.log(`移除processDetail.M后的全局对象 -> ${JSON.stringify(globalObject)}`);
        });
        // 移除processDetail指向的field
        Core.ClusterCore.removeGlobalObject('processDetail', (err, globalObject) => {
          !err && console.log(`移除processDetail后的全局对象 -> ${JSON.stringify(globalObject)}`);
        });
      }, 1000);
    }
    // 子进程中创建键名为processDetail.[进程偏移]的field存储其进程pid
    else {
      const processOffset = processId.split(':').pop();
      Core.ClusterCore.setGlobalObject(['processDetail', processOffset], process.pid);
    }
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44

# 数据一致性

在多进程架构下,对全局对象的并发操作可能会导致数据一致性问题。为保证操作全局对象的事务性,ClusterCore提供了对全局对象进行自定义事务操作的API:

注意

自定义读取/更新全局对象的API在Master进程中使用Sandbox严格同步执行代码片段的方式以保证全局对象在同一时刻仅被一个操作访问。

因此,操作规则queryFnupdateFn不允许执行异步逻辑无法访问外部变量,我们可以通过context将外部资源植入Sandbox。

需要注意的是,context受制于进程间通信时nodejs自动执行序列化的影响,应使用基本类型构成,即:

  • Array
  • Object
  • Number
  • String
  • Boolean

# queryGlobalObject([context,] [queryFn,] callBack)

# 使用说明

在自定义读取全局对象时,我们可以使用以下方式:

  • queryGlobalObject(callBack):适用于直接读取整个全局对象的场景。

  • queryGlobalObject(queryFn, callBack):适用于使用不访问任何外部资源的自定义规则读取全局对象的场景。

  • queryGlobalObject(context, queryFn, callBack):适用于使用依赖外部资源的自定义规则读取全局对象的场景。

# 参数列表
  • context:应用于读取规则的资源上下文,非必填项。

  • queryFn全局对象的自定义读取规则,非必填项,是一个Function。其参数列表为(globalObject, context)

    • globalObject:执行读取操作时应用程序中全局对象的快照。
    • context:外部资源上下文。当执行自定义读取操作未指定资源上下文时,则此值为undefined

    注意

    ClusterCore将在未指定自定义读取规则queryFn时返回整个全局对象。另外,queryFn不允许执行异步逻辑无法访问外部变量

    通常,我们在queryFn中根据外部依赖资源contextglobalObject进行解析和计算,并将期望的数据结构使用return指令返回。

  • callBack:自定义读取全局对象的执行结果,必填项,是一个cps风格的Function。其参数列表为(error, value)

    • error:自定义读取全局对象失败的原因,为null时表示读取动作执行成功。
    • value:自定义读取规则的返回值,即queryFnreturn的结果,当操作执行失败时为undefined

    说明

    在执行queryGlobalObject()时,即使没有指定callBack也不会产生阻塞性异常。

    出于性能考虑,在检测到没有指定calllBack时将直接退出,不再触发实际读取逻辑。

# 使用样例

让我们基于使用数组指令设置全局对象的样例,来演示如何使用queryGlobalObject()自定义读取全局对象

































 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 







const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // 在主进程中创建键名为processDetail的field,并推入主进程信息
    if (processId === 'M') {
      // 首先创建键名processDetail指向的field
      Core.ClusterCore.setGlobalObject('processDetail', [], (err) => {
        if (err) {
          return;
        }
        // 在创建首层field成功后推入主进程信息并派生子进程
        Core.ClusterCore.setGlobalObject(
          ['processDetail', Core.Macros.CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_PUSH],
          { processId, processPid: process.pid }
        );
        Core.ClusterCore.fork(4);
      });
    }
    // 子进程中将子进程信息推入processDetail指向的field
    else {
      Core.ClusterCore.setGlobalObject(
        ['processDetail', Core.Macros.CLUSTER_CORE_GLOBAL_OBJECT_ARRAY_PUSH],
        { processId, processPid: process.pid }
      );
    }

    // 自定义读取全局对象
    setTimeout(() => {
      // 定义读取规则中依赖的外部变量为资源上下文
      const context = { processId };
      Core.ClusterCore.queryGlobalObject(
        // 设置外部资源上下文
        context,
        // 设置自定义读取规则
        (globalObject, context) => {
          const { processId } = context;
          const { processDetail } = globalObject;
          return processDetail.find((detail) => detail.processId === processId);
        },
        // 处理读取结果
        (err, value) => {
          if (err) {
            return;
          }
          console.log(`当前进程的信息 -> ${JSON.stringify(value)}`);
        });
    }, 1000);
  }
}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

# updateGlobalObject([context,] updateFn[, callBack])

# 使用说明

在自定义更新全局对象时,我们可以使用以下方式:

  • updateGlobalObject(updateFn):适用于直接更新全局对象不关注执行结果的场景。

  • updateGlobalObject(context, updateFn):适用于使用依赖外部资源的自定义规则更新全局对象但不关注执行结果的场景。

  • updateGlobalObject(context, updateFn, callBack):适用于使用依赖外部资源的自定义规则更新全局对象的场景。

# 参数列表
  • context:应用于更新规则的资源上下文,非必填项。

  • updateFn全局对象的自定义更新规则,必填项,是一个Function。其参数列表为(globalObject, context)

    • globalObject:执行更新操作时应用程序中全局对象的快照。
    • context:外部资源上下文。当执行自定义更新操作未指定资源上下文时,则此值为undefined

    提示

    自定义更新规则updateFn不允许执行异步逻辑无法访问外部变量ClusterCore将使用Object.assign()updateFn的执行结果应用至全局对象

    通常,我们在updateFn中根据外部依赖资源context和当前globalObject完成解析和计算,并通过使用Spread等操作符重组新的全局对象使用return指令返回。

  • callBack:自定义更新全局对象的执行结果,非必填项,是一个cps风格的Function。其参数列表为(error, globalObject)

    • error:自定义更新全局对象失败的原因,为null时表示更新动作执行成功。
    • globalObject:自定义更新操作执行成功后的全局对象,当操作执行失败时为undefined
# 使用样例

接下来,让我们借助全局对象来实现一个在进程组中竞争资源的Demo:

const Core = require('node-corejs');

class AppMain extends Core.AppMain {
  /**
   * 进程初始化完成
   * @override
   */
  onProcessDidInit(processId, launchParams) {
    super.onProcessDidInit(processId, launchParams);
    // 在主进程中设置资源总数为9
    // 创建8个Worker进程对资源进行竞争
    if (processId === 'M') {
      Core.ClusterCore.setGlobalObject('totalCount', 9, (err) => {
        if (err) {
          return;
        }
        Core.ClusterCore.fork(8);
      });
    }
    // 在子进程中竞争资源,并关闭未获得资源的进程
    // 偏移为奇数的进程占用1个资源
    // 偏移为偶数的进程占用2个资源
    else {
      // 计算进程偏移并定义更新规则依赖的资源上下文
      const processOffset = parseInt(processId.split(':').pop());
      const context = { processOffset };
      Core.ClusterCore.updateGlobalObject(
        // 设置外部资源上下文
        context,
        // 设置自定义更新规则
        (globalObject, context) => {
          let { totalCount } = globalObject;
          const { processOffset } = context;

          // 根据进程偏移尝试对资源进行预占位
          if (processOffset % 2) {
            totalCount -= 1;
          } else {
            totalCount -= 2;
          }

          // 当资源不足时抛出异常,不改变全局对象
          if (totalCount < 0) {
            throw new Error('资源不足');
          }
          // 资源预占位成功时更新全局对象扣除对应的资源
          else {
            return {
              ...globalObject,
              totalCount,
            };
          }
        },
        // 处理更新结果
        (err, globalObject) => {
          // 当更新全局对象失败时说明竞争资源失败,退出进程
          err && process.exit();
          console.log(`进程[${processId}]获取资源成功,当前剩余资源 -> ${globalObject.totalCount}`);
        }
      )
    }
  }

  /**
   * Worker进程退出
   * @override
   */
  onWorkerProcessDidExit(exitedProcessId, exitedDetail, reboot) {
    console.log(`进程[${exitedProcessId}]因未获取到资源退出`);
  }

}

// 使用AppMain初始化ClusterCore并启动
Core.ClusterCore.init(AppMain);
Core.ClusterCore.start();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76