Move onMessage so it can be used in the wrappers

This commit is contained in:
Asher 2020-11-18 12:05:24 -06:00
parent d55e06936b
commit 247c4ec776
No known key found for this signature in database
GPG Key ID: D63C1EF81242354A
2 changed files with 81 additions and 86 deletions

View File

@ -1,4 +1,4 @@
import { field, logger } from "@coder/logger" import { logger } from "@coder/logger"
import * as cp from "child_process" import * as cp from "child_process"
import * as net from "net" import * as net from "net"
import * as path from "path" import * as path from "path"
@ -8,13 +8,12 @@ import { rootPath } from "./constants"
import { settings } from "./settings" import { settings } from "./settings"
import { SocketProxyProvider } from "./socket" import { SocketProxyProvider } from "./socket"
import { isFile } from "./util" import { isFile } from "./util"
import { wrapper } from "./wrapper" import { onMessage, wrapper } from "./wrapper"
export class VscodeProvider { export class VscodeProvider {
public readonly serverRootPath: string public readonly serverRootPath: string
public readonly vsRootPath: string public readonly vsRootPath: string
private _vscode?: Promise<cp.ChildProcess> private _vscode?: Promise<cp.ChildProcess>
private timeoutInterval = 10000 // 10s, matches VS Code's timeouts.
private readonly socketProvider = new SocketProxyProvider() private readonly socketProvider = new SocketProxyProvider()
public constructor() { public constructor() {
@ -69,10 +68,13 @@ export class VscodeProvider {
vscode, vscode,
) )
const message = await this.onMessage(vscode, (message): message is ipc.OptionsMessage => { const message = await onMessage<ipc.VscodeMessage, ipc.OptionsMessage>(
vscode,
(message): message is ipc.OptionsMessage => {
// There can be parallel initializations so wait for the right ID. // There can be parallel initializations so wait for the right ID.
return message.type === "options" && message.id === id return message.type === "options" && message.id === id
}) },
)
return message.options return message.options
} }
@ -104,61 +106,13 @@ export class VscodeProvider {
dispose() dispose()
}) })
this._vscode = this.onMessage(vscode, (message): message is ipc.ReadyMessage => { this._vscode = onMessage<ipc.VscodeMessage, ipc.ReadyMessage>(vscode, (message): message is ipc.ReadyMessage => {
return message.type === "ready" return message.type === "ready"
}).then(() => vscode) }).then(() => vscode)
return this._vscode return this._vscode
} }
/**
* Listen to a single message from a process. Reject if the process errors,
* exits, or times out.
*
* `fn` is a function that determines whether the message is the one we're
* waiting for.
*/
private onMessage<T extends ipc.VscodeMessage>(
proc: cp.ChildProcess,
fn: (message: ipc.VscodeMessage) => message is T,
): Promise<T> {
return new Promise((resolve, reject) => {
const cleanup = () => {
proc.off("error", onError)
proc.off("exit", onExit)
proc.off("message", onMessage)
clearTimeout(timeout)
}
const timeout = setTimeout(() => {
cleanup()
reject(new Error("timed out"))
}, this.timeoutInterval)
const onError = (error: Error) => {
cleanup()
reject(error)
}
const onExit = (code: number | null) => {
cleanup()
reject(new Error(`VS Code exited unexpectedly with code ${code}`))
}
const onMessage = (message: ipc.VscodeMessage) => {
logger.trace("got message from vscode", field("message", message))
if (fn(message)) {
cleanup()
resolve(message)
}
}
proc.on("message", onMessage)
proc.on("error", onError)
proc.on("exit", onExit)
})
}
/** /**
* VS Code expects a raw socket. It will handle all the web socket frames. * VS Code expects a raw socket. It will handle all the web socket frames.
*/ */

View File

@ -5,6 +5,59 @@ import * as rfs from "rotating-file-stream"
import { Emitter } from "../common/emitter" import { Emitter } from "../common/emitter"
import { paths } from "./util" import { paths } from "./util"
const timeoutInterval = 10000 // 10s, matches VS Code's timeouts.
/**
* Listen to a single message from a process. Reject if the process errors,
* exits, or times out.
*
* `fn` is a function that determines whether the message is the one we're
* waiting for.
*/
export function onMessage<M, T extends M>(
proc: cp.ChildProcess | NodeJS.Process,
fn: (message: M) => message is T,
customLogger?: Logger,
): Promise<T> {
return new Promise((resolve, reject) => {
const cleanup = () => {
proc.off("error", onError)
proc.off("exit", onExit)
proc.off("message", onMessage)
clearTimeout(timeout)
}
const timeout = setTimeout(() => {
cleanup()
reject(new Error("timed out"))
}, timeoutInterval)
const onError = (error: Error) => {
cleanup()
reject(error)
}
const onExit = (code: number) => {
cleanup()
reject(new Error(`exited unexpectedly with code ${code}`))
}
const onMessage = (message: M) => {
;(customLogger || logger).trace("got message", field("message", message))
if (fn(message)) {
cleanup()
resolve(message)
}
}
proc.on("message", onMessage)
// NodeJS.Process doesn't have `error` but binding anyway shouldn't break
// anything. It does have `exit` but the types aren't working.
;(proc as cp.ChildProcess).on("error", onError)
;(proc as cp.ChildProcess).on("exit", onExit)
})
}
interface HandshakeMessage { interface HandshakeMessage {
type: "handshake" type: "handshake"
} }
@ -111,19 +164,15 @@ class ChildProcess extends Process {
/** /**
* Initiate the handshake and wait for a response from the parent. * Initiate the handshake and wait for a response from the parent.
*/ */
public handshake(): Promise<void> { public async handshake(): Promise<void> {
return new Promise((resolve) => {
const onMessage = (message: Message): void => {
logger.debug(`received message from ${this.parentPid}`, field("message", message))
if (message.type === "handshake") {
process.removeListener("message", onMessage)
resolve()
}
}
// Initiate the handshake and wait for the reply.
process.on("message", onMessage)
this.send({ type: "handshake" }) this.send({ type: "handshake" })
}) await onMessage<Message, HandshakeMessage>(
process,
(message): message is HandshakeMessage => {
return message.type === "handshake"
},
this.logger,
)
} }
/** /**
@ -270,23 +319,15 @@ export class ParentProcess extends Process {
/** /**
* Wait for a handshake from the child then reply. * Wait for a handshake from the child then reply.
*/ */
private handshake(child: cp.ChildProcess): Promise<void> { private async handshake(child: cp.ChildProcess): Promise<void> {
return new Promise((resolve, reject) => { await onMessage<Message, HandshakeMessage>(
const onMessage = (message: Message): void => { child,
logger.debug(`received message from ${child.pid}`, field("message", message)) (message): message is HandshakeMessage => {
if (message.type === "handshake") { return message.type === "handshake"
child.removeListener("message", onMessage) },
child.on("message", (msg) => this._onChildMessage.emit(msg)) this.logger,
)
child.send({ type: "handshake" }) child.send({ type: "handshake" })
resolve()
}
}
child.on("message", onMessage)
child.once("error", reject)
child.once("exit", (code) => {
reject(new ProcessError(`Unexpected exit with code ${code}`, code !== null ? code : undefined))
})
})
} }
} }