Embryo!

进程模块 —— 雏形

在确定好设计理念之后,我们就需要来具体实现 OS 的进程系统了。我希望它作为一个模块挂载到 OS 下,这样抽象成模块存在,就可以易于安装、分离与运行。

既然它作为一个独立模块存在,那么它对主 AI 应当只暴露一些公开的接口,所以首先,我们需要设计它暴露的公开接口是什么。这样,即便后来我们换了或者是优化了具体的实现细节,只要公开的接口相同,其他的代码就不用跟着更换。

在此之前,需要先定义一些之前公开的涉及到的类型和常量:

/**
 * 公开的原子函数返回值
 */

/** 下一次执行时, 仍然从当前原子函数重复执行 */
export const OK_STOP_CURRENT = "ok_stop_current"
/** 下一次执行时, 从当前原子函数下一条继续执行 */
export const OK_STOP_NEXT = "ok_stop_next"
/** 下一次执行时, 从指定 Tag 开始继续执行 */
export const OK_STOP_CUSTOM = "ok_stop_custom"
/** 错误, 在下一 tick 重启运行该进程 */
export const STOP_ERR = "stop_err"

/**
 * 特殊的原子函数返回值
 */

/** 阻塞进程 */
const STOP_STUCK = "stop_stuck"

/** 普通原子函数返回值 */
type AtomicFuncReturnCode = OK | typeof OK_STOP_CURRENT | typeof OK_STOP_NEXT | [typeof OK_STOP_CUSTOM, string] | [typeof STOP_ERR, string]
/** 可阻塞的原子函数返回值 */
type StuckableAtomicFuncReturnCode = OK | typeof STOP_STUCK

/**
 * 原子函数定义
 */

/** 原子函数 */
type AtomicFunc = () => AtomicFuncReturnCode | StuckableAtomicFuncReturnCode
/** Tag + 原子函数 */
type AtomicFuncWithTag = [string, AtomicFunc]
/** 条件跳转: "JUMP" + 条件 + Tag */
type AtomicJump = ["JUMP", () => boolean, string]

/** 原子函数描述器, 用于构建进程时 */
type AtomicFuncDescriptor = (AtomicFunc | AtomicFuncWithTag | AtomicJump)[]

现在还没涉及到阻塞的锁、信号量和管程,所以暂且不写,留待以后补充。

接下来,我们需要在进程模块主文件内部来具体定义:

/**
 * 进程模块
 * 
 * 特别的, 暂时我们不允许 kill 进程. 如果引入 kill 进程, 会
 * 牵涉到权限控制等一系列尚未定义的方面.
 */
export interface ProcessModule {
    /**
     * 创建进程
     * @param descriptor 进程具体流程
     * @param description 进程简要描述
     * @returns 进程 Id
     */
    createProc(descriptor: AtomicFuncDescriptor, description: string): ProcId
    /**
     * 在当前 tick 运行一次
     * 
     * 注意: 本函数每 tick 只能运行一次, 否则会报错.
     * 所以, 请在其他功能模块创建完进程后, 再执行.
     * 
     * 注意: 本函数在运行中, 如果有新的进程被创建, 新
     * 的进程也会被执行.
     */
    tick(): void
}

/**
 * 进程
 * 不对外公开
 */
class Process {
    /** 进程号 */
    pid: ProcId
    /** 
     * 进程文字描述
     * 我设定为强制要求, 因为可读的文字描述有利于
     * 监控与调试
     */
    description: string
    /** 进程具体流程 */
    descriptor: AtomicFuncDescriptor
    /** 记录当前进程运行到的原子函数行数 */
    pc: number
    /** Tag 到对应原子函数行数的映射 */
    tagDict: {[tag: string]: number}
    /** 
     * 进程状态: 就绪, 阻塞, 运行
     * 结束状态被省去, 因为进程一旦消亡, 则不会被中断, 
     * 直至处理结束进程的函数结束
     */
    state: "Ready" | "Stuck" | "Running"

    constructor(
        pid: ProcId, 
        description: string, 
        descriptor: AtomicFuncDescriptor
    ) {
        this.pid = pid
        this.description = description
        this.descriptor = descriptor

        // 初始化当前 PC
        this.pc = 0
        // 计算 Tag 到对应原子函数行数的映射
        this.tagDict = {}
        this.descriptor.forEach((value, index) => {
            // 检查属于 AtomicFuncWithTag
            if ( Array.isArray(value) && value.length === 2 )
                this.tagDict[value[0]] = index
        })
        // 初始化状态
        this.state = "Ready"

        // 检查跳转的 Tag 都存在
        for (const desc of this.descriptor) {
            if ( Array.isArray(desc) && desc.length === 3 )
                assertWithMsg(desc[2] in this.tagDict, `在检查进程 ${this} 时发现未识别的跳转后标签 ${desc[2]}`)
        }
    }
    
    toString() {
        return `[Proc ${this.pid}: ${this.description}]`
    }
}

在之后, 锁、信号量和管程会作为进程模块的子模块附加在内。因为他们需要访问进程的私有属性和方法。

📃 Interface 模块内部实现

在 Interface 模块内部,我们需要实现一些私有属性和方法。

/**
 * 进程模块的具体实现
 * Process Module Implementation (PMI)
 * 
 * 特点:
 *  - 在这个实现当中, 同一时刻, 只有一个进程可以运行 (不使用异步函数)
 */
export class PMI implements ProcessModule {
    /** 当前正在运行的进程 Id, 用于内部阻塞原子函数识别进程 */
    static #currentProcId: number = -1
    /** 断言正在运行的进程 Id, 用于检查 */
    static #assertCurrentProcId(id: number, task: string) {
        if ( !(id in this.#procDict) )
            throw `在${task}中, 发现正在运行的进程 Id 为 ${this.#currentProcId}, 找不到应当处于进行态的进程 Id ${id}`
        else
            assertWithMsg( this.#currentProcId === id, `在${task}中, 发现正在运行的进程 Id 为 ${this.#currentProcId}, 但是进程 ${this.#procDict[id]} 的状态为运行态` )
    }
    /** 就绪进程 Id 队列 */
    static #processIdReadyQueue: number[] = []
    /** 阻塞进程 Id 队列 */
    static #processIdStuckQueue: number[] = []
    /** 映射进程 Id 到进程实体 */
    static #procDict: {[ id: number ]: Process} = {}

    /** 可用的进程 Id 链表 */
    static #idLinkList: number[] = [ ...Array(MAX_PROC_ID).keys() ]
    /** 获得下一个进程 Id */
    static #getProcId(): number {
        // 无可用的进程 Id
        if ( PMI.#idLinkList.length === 0 )
            throw "Error: 无可用进程 Id";
        
        // 返回进程 Id 链表中第一个可用 Id
        return PMI.#idLinkList.shift()
    }

    /**
     * 阻塞进程, 不对外暴露
     */
    static #stuckProc(id: number) {
        // 找不到进程 id
        if ( !(id in this.#procDict) )
            throw `Error: 无法找到进程 ${id} 以阻塞`
        
        const proc = this.#procDict[id]
        const state = proc.state

        if ( state === "Ready" ) {
            // 阻塞就绪进程
            // 从就绪进程 Id 队列中删去
            _.pull(this.#processIdReadyQueue, id)
        } else if ( state === "Running" ) {
            // 阻塞运行中进程
            // 确认当前只有一个进程正在运行
            // 并且为正被阻塞的进程
            this.#assertCurrentProcId(id, "阻塞")
            // 重置正在运行的进程 Id
            this.#currentProcId = -1
        } else if ( state === "Stuck" ) {
            throw `Error: 进程 ${proc} 已经处于阻塞态, 无法再次被阻塞`
        }
        // 调整状态为阻塞
        proc.state = "Stuck"
        // 将进程 Id 加入阻塞进程 Id 队列中
        this.#processIdStuckQueue.push(id)
    }

    /**
     * 将进程从阻塞态唤醒到就绪态
     * 
     * 注意: 考虑到群惊效应, 进程可能被唤醒多次. 因此, 若
     * 进程出于就绪态, 不会报错. 但是因为同一时间只有一个
     * 运行的进程, 而自己不能唤醒自己, 会报错.
     */
    static #wakeUpProc(id: number) {
        // 找不到进程 id
        if ( !(id in this.#procDict) )
            throw `Error: 无法找到进程 ${id} 以唤醒`
        
        const proc = this.#procDict[id]
        const state = proc.state

        if ( state === "Ready" )  {
            // 已经处于就绪的进程
            // 无需处理
        } else if ( state === "Stuck" ) {
            // 唤醒阻塞中进程
            // 从阻塞进程 Id 队列中删除
            _.pull(this.#processIdStuckQueue, id)
            // 将进程 Id 加入就绪进程 Id 队列中
            this.#processIdReadyQueue.push(id)
        } else if ( state === "Running" ) {
            // 唤醒运行中进程
            throw `Error: 运行中的进程 ${proc} 无法再次被唤醒`
        }
        // 调整状态为就绪
        proc.state = "Ready"
    }

    /** 
     * 销毁进程, 不对外暴露
     * @todo 给未释放的锁, 信号量或管程提出警告
     */
    static #destroyProc(id: number) {
        // 找不到进程 id
        if ( !(id in this.#procDict) )
            throw `Error: 无法找到进程 ${id} 以结束`
        
        const proc = this.#procDict[id]
        const state = proc.state

        if ( state === "Ready" ) {
            // 销毁就绪进程
            // 从就绪进程 Id 队列中删去
            _.pull(this.#processIdReadyQueue, id)
        } else if ( state === "Running" ) {
            // 销毁运行中进程
            // 确认当前只有一个进程正在运行
            // 并且为正被销毁的进程
            this.#assertCurrentProcId(id, "销毁")
            // 重置正在运行的进程 Id
            this.#currentProcId = -1
        } else if ( state === "Stuck" ) {
            // 销毁阻塞进程
            // 从阻塞进程 Id 队列中删去
            _.pull(this.#processIdStuckQueue, id)
        }
        
        // 从映射中删去实体
        delete this.#procDict[id]
        // 归还进程 Id
        this.#idLinkList.push(id)
    }

    createProc(descriptor: AtomicFuncDescriptor, description: string) {
        const id = PMI.#getProcId()
        const proc = new Process(id, description, descriptor)

        // 将进程注册到映射表中
        PMI.#procDict[id] = proc
        // 将新创建的进程加入到就绪队列当中
        PMI.#processIdReadyQueue.push(id)

        return id
    }
    /** 记录上一次调用 tick 的 Game.time 以保证每 tick 只能执行一次 tick */
    static #lastTick: number = -1

    tick() {
        // 检验为本 tick 第一次调用
        assertWithMsg(PMI.#lastTick === -1 || PMI.#lastTick !== Game.time, `进程模块在 ${Game.time} 被重复调用 tick 函数`)
        // 校验当前没有正在运行的进程
        assertWithMsg(PMI.#currentProcId === -1, `进程模块在 tick 开始时, 发现已有正在运行的进程 Id ${PMI.#currentProcId}`)
        // 创建临时就绪进程 Id 队列
        const processIdReadyQueue = []

        while ( PMI.#processIdReadyQueue.length !== 0 ) {
            // 从就绪队列 Id 中出队头 Id
            const id = PMI.#processIdReadyQueue.shift()
            const proc = PMI.#procDict[id]

            // 修改状态
            PMI.#currentProcId = id
            proc.state = "Running"

            // 运行进程
            // 这个不作为 Process 的成员函数存在, 
            // 是因为要访问进程模块的私有成员
            while ( true ) {
                // 进程执行结束
                if (proc.pc >= proc.descriptor.length) {
                    PMI.#destroyProc(id)
                    break
                }
                // 获得当前原子函数描述
                const desc = proc.descriptor[proc.pc]

                if (Array.isArray(desc) && desc.length === 3) {
                    // 条件跳转
                    const condition = desc[1]()

                    if ( condition )
                        proc.pc = proc.tagDict[desc[2]]
                    else
                        proc.pc++
                } else {
                    // 取得原子函数
                    let atomicFunc: AtomicFunc = null
                    if (Array.isArray(desc) && desc.length === 2) atomicFunc = desc[1]
                    else atomicFunc = desc

                    const returnCode = atomicFunc()
                    if (returnCode === OK) {
                        // 顺序执行下一条原子函数
                        proc.pc++
                    } else if (returnCode === "ok_stop_current") {
                        // 主动停止, 仍然从本条原子函数开始
                        proc.state = "Ready"
                        processIdReadyQueue.push(id)
                        // 复原状态
                        PMI.#currentProcId = -1
                        break
                    } else if (returnCode === "ok_stop_next") {
                        // 主动停止, 从下一条原子函数开始
                        proc.pc++
                        // 特殊情况: 进程结束
                        if ( proc.pc >= proc.descriptor.length )
                            PMI.#destroyProc(id)
                        else {
                            proc.state = "Ready"
                            processIdReadyQueue.push(id)
                            // 复原状态
                            PMI.#currentProcId = -1
                        }
                        break
                    } else if (returnCode === "stop_stuck") {
                        // 阻塞, 下次仍然从同一条原子函数开始执行
                        PMI.#stuckProc(id)
                        break
                    } else if (Array.isArray(returnCode) && returnCode[0] === "stop_err") {
                        // 错误, 资源的释放应当在进程内部完成
                        // 在下一 tick 重启运行该进程
                        proc.state = "Ready"
                        processIdReadyQueue.push(id)
                        proc.pc = 0
                        // 输出错误信息
                        log(LOG_ERR, `运行进程 ${proc} 时, 遇到错误: ${returnCode[1]}`)
                        // 复原状态
                        this.#currentProcId = -1
                        break
                    } else if (Array.isArray(returnCode) && returnCode[0] === "ok_stop_custom") {
                        // 复原状态
                        PMI.#currentProcId = -1

                        // 主动停止, 从特定 Tag 处开始
                        const tag = returnCode[1]
                        if (!(tag in proc.tagDict))
                            throw `Error: 在执行进程 ${proc} 的过程中, 无法跳转到标签 ${tag}`
                        
                        proc.state = "Ready"
                        proc.pc = proc.tagDict[tag]
                        processIdReadyQueue.push(id)
                        break
                    }
                }
            }
        }

        // 将进程模块的就绪进程 Id 队列指向临时变量
        PMI.#processIdReadyQueue = processIdReadyQueue
        // 更新上一次调用函数的时间
        PMI.#lastTick = Game.time
    }
}

锁的实现是进程同步机制中最简单的一个。它只需要实现acquirerelease 两个方法。

但是因为这牵涉到进程的阻塞与唤醒,所以它不能作为一个独立模块存在,而是作为进程模块的子模块存在。

首先,我们定义锁类实体。

/** 随机生成一个锁的 Id */
const getLockId = () => generate_random_hex(8)

/**
 * 锁类实体
 * 不对外公开
 */
class Lock {
    /** 锁 Id */
    lockId: string
    /** 持有锁的进程 Id */
    holder: ProcId
    /** 请求锁而阻塞的进程 Id 列表 */
    stuckList: ProcId[]
    constructor(lockId: string) {
        this.lockId = lockId
        this.holder = null
        this.stuckList = []
    }
}

接下来,我们需要定义锁模块的接口,并加到进程模块中去。

/**
 * 锁模块
 */
interface LockModule {
    /** 创建一个新锁 */
    createLock(): string
    /** 销毁一个锁 */
    destroyLock(lockId: string): void
    /** 获得一个锁, 只能在进程流程中运行使用 */
    acquireLock(lockId: string): StuckableAtomicFuncReturnCode
    /** 释放一个锁, 只能在进程流程中运行使用 */
    releaseLock(lockId: string): OK
}

在进程模块中加入管理锁的部分:

/** 映射锁 Id 到锁实体 */
static #lockDict: {[id: string]: Lock} = {}
/** 映射进程 Id 到使用过的锁 Id, 用于进程销毁时, 检测销毁锁 */
static #proc2Lock: {[id: ProcId]: string[]} = {}
/** 实例上的锁模块 */
lock: LockModule

static #lockCreateLock() {
    const id = getLockId()
    this.#lockDict[id] = new Lock(id)
    return id
}

static #lockDestroyLock(lockId: string) {
    // 如果 lock 不存在
    if ( !(lockId in this.#lockDict) )
        throw `Error: 无法找到锁 ${lockId} 以销毁`
    
    const lock = this.#lockDict[lockId]
    // 当锁销毁时, 会唤醒所有阻塞的进程
    for (const pid of lock.stuckList)
        this.#wakeUpProc(pid)

    delete this.#lockDict[lockId]
}

static #lockAcquireLock(lockId: string): StuckableAtomicFuncReturnCode {
    // 如果 lock 不存在 (可能已经销毁)
    if ( !(lockId in this.#lockDict) )
        return OK
    
    const lock = this.#lockDict[lockId]
    const pid = this.#currentProcId
    // 正在运行的进程不存在
    if ( pid === -1 )
        throw `Error: 在获得锁 ${lockId} 时, 无法找到正在运行的进程`
    
    // 如果正有进程持有锁
    if (lock.holder !== null) {
        assertWithMsg(!lock.stuckList.includes(pid), `锁 ${lockId} 的阻塞列表中包含 ${this.#procDict[pid]}, 该进程却又想获得锁`)
        lock.stuckList.push(pid)
        return STOP_STUCK
    }
    
    // 进程持有锁
    lock.holder = pid
    return OK
}

static #lockReleaseLock(lockId: string) {
    // 如果 lock 不存在 (可能已经销毁)
    if ( !(lockId in this.#lockDict) )
        return OK
    
    const lock = this.#lockDict[lockId]
    const pid = this.#currentProcId
    // 正在运行的进程不存在
    if ( pid === -1 )
        throw `Error: 在释放锁 ${lockId} 时, 无法找到正在运行的进程`
    
    // 释放的进程不持有锁
    if (lock.holder !== pid)
        throw `Error: 进程 ${this.#procDict[pid]} 不持有锁 ${lockId}, 但是却期望释放`
    
    // 重置持有进程
    lock.holder = null
    // 群惊阻塞进程
    for (const id of lock.stuckList)
        this.#wakeUpProc(id)
    // 重置阻塞进程 Id 列表
    lock.stuckList = []
    return OK
}

constructor() {
    // 创建锁子模块
    // 这里之所以采用这种写法, 是因为如果所有相关函数都在
    // 进程模块下, 不太美观. 不如整理到一个子模块下, 但是
    // 子模块又需要访问进程模块的私有属性和方法, 因此采用
    // 共有匿名函数对私有方法包装.
    this.lock = {
        createLock: () => PMI.#lockCreateLock(), 
        destroyLock: (lockId: string) => PMI.#lockDestroyLock(lockId), 
        acquireLock: (lockId: string) => PMI.#lockAcquireLock(lockId), 
        releaseLock: (lockId: string) => PMI.#lockReleaseLock(lockId)
    }
}

Okay,到目前为止,我们的进程模型大略已经完成了,下面需要一些简单的测试。

🚥 信号集的实现

信号集需要实现SwaitSsignal 。而信号集依赖于信号量。实际上,我们只要实现了一个个的信号量,就可以实现信号集。

但是因为这牵涉到进程的阻塞与唤醒,所以它不能作为一个独立模块存在,而是作为进程模块的子模块存在。

首先我们定义信号量类:

/** 随机生成一个信号量的 Id */
const getSignalId = () => generate_random_hex(8)

/**
 * 信号量类实体
 * 不对外公开
 */
class Signal {
    /** 信号量 Id */
    signalId: string
    /** 因该信号量而阻塞的进程 Id 列表 */
    stuckList: ProcId[]
    /** 信号量值 */
    value: number
    constructor(signalId: string, value: number) {
        this.signalId = signalId
        this.stuckList = []
        this.value = value
    }
}

接下来,我们需要定义信号量模块的接口,并加到进程模块中去。

/**
 * 信号量模块
 */
interface SignalModule {
    /** 
     * 创建一个新信号量
     * @param value 初始值
     */
    createSignal(value: number): string
    /** 销毁一个信号量 */
    destroySignal(signalId: string): void
    /** 等待一个信号量集 */
    Swait(...signals: {signalId: string, lowerbound: number, request: number}[]): StuckableAtomicFuncReturnCode
    /** 激活一个信号量集 */
    Ssignal(...signals: {signalId: string, request: number}[]): StuckableAtomicFuncReturnCode
}

接下来,我们在进程模块中具体添加信号量子模块:

/** 映射信号量 Id 到信号量实体 */
static #signalDict: {[id: string]: Signal} = {}
/** 实例上的信号量模块 */
signal: SignalModule

static #signalCreateSignal(value: number) {
    const id = getSignalId()
    this.#signalDict[id] = new Signal(id, value)
    return id
}

static #signalDestroySignal(signalId: string) {
    if ( !(signalId in this.#signalDict) )
        throw `Error: 无法找到信号量 ${signalId} 以销毁`
    
    const signal = this.#signalDict[signalId]
    // 当信号量销毁时, 会唤醒所有阻塞的进程
    for (const pid of signal.stuckList)
        this.#wakeUpProc(pid)
    signal.stuckList = []
    
    delete this.#signalDict[signalId]
}

static #signalSwait(...signals: {signalId: string, lowerbound: number, request: number}[]): StuckableAtomicFuncReturnCode {
    const pid = this.#currentProcId
    // 正在运行的进程不存在
    if ( pid === -1 )
        throw `Error: 在获得信号集 ${signals.map(o => o.signalId)} 时, 无法找到正在运行的进程`
    
    for (const signalDescriptor of signals) {
        const signal = this.#signalDict[signalDescriptor.signalId]
        // 找不到信号量 (可能已经销毁)
        if (!signal) continue
        if (signal.value < signalDescriptor.lowerbound) {
            assertWithMsg(!signal.stuckList.includes(pid), `信号量 ${signal.signalId} 的阻塞列表中包含 ${this.#procDict[pid]}, 该进程却又想获得信号量`)
            signal.stuckList.push(pid)
            return STOP_STUCK
        }
    }

    for (const signalDescriptor of signals) {
        const signal = this.#signalDict[signalDescriptor.signalId]
        // 找不到信号量 (可能已经销毁)
        if (!signal) continue
        signal.value -= signalDescriptor.request
    }
    return OK
}

static #signalSsignal(...signals: {signalId: string, request: number}[]): StuckableAtomicFuncReturnCode {
    const pid = this.#currentProcId
    // 正在运行的进程不存在
    if ( pid === -1 )
        throw `Error: 在激活信号集 ${signals.map(o => o.signalId)} 时, 无法找到正在运行的进程`
    
    for (const signalDescriptor of signals) {
        const signal = this.#signalDict[signalDescriptor.signalId]
        // 找不到信号量 (可能已经销毁)
        if (!signal) continue
        signal.value += signalDescriptor.request
        for (const id of signal.stuckList)
            this.#wakeUpProc(id)
        signal.stuckList = []
    }
    
    return OK
}

constructor() {
    // ...
    // 创建信号量子模块
    this.signal = {
        createSignal: (value: number) => PMI.#signalCreateSignal(value), 
        destroySignal: (signalId: string) => PMI.#signalDestroySignal(signalId), 
        Swait: (...signals: {signalId: string, lowerbound: number, request: number}[]) => PMI.#signalSwait(...signals), 
        Ssignal: (...signals: {signalId: string, request: number}[]) => PMI.#signalSsignal(...signals)
    }
}

信号集的实现十分关键,我们后边资源管理模块也要依赖信号集来实现。

Last updated