From 4cd6bed8d240aeb42f4a5f8d3cdb0043594cefe9 Mon Sep 17 00:00:00 2001 From: Kyle Carberry Date: Wed, 23 Jan 2019 11:52:58 -0600 Subject: [PATCH] Add stdio sources (#17) --- packages/protocol/src/browser/client.ts | 14 ++++- packages/protocol/src/browser/command.ts | 18 +++--- packages/protocol/src/node/command.ts | 18 ++++-- packages/protocol/src/node/server.ts | 15 ++++- packages/protocol/src/proto/command.proto | 10 +++- packages/protocol/src/proto/command_pb.d.ts | 18 ++++-- packages/protocol/src/proto/command_pb.js | 65 ++++++++++++++++----- packages/protocol/test/command.test.ts | 19 ++++-- packages/protocol/test/forker.js | 4 +- packages/server/src/vscode/bootstrapFork.ts | 23 ++++++-- 10 files changed, 157 insertions(+), 47 deletions(-) diff --git a/packages/protocol/src/browser/client.ts b/packages/protocol/src/browser/client.ts index 4fda7ea5..d4e58ad9 100644 --- a/packages/protocol/src/browser/client.ts +++ b/packages/protocol/src/browser/client.ts @@ -284,8 +284,18 @@ export class Client { return; } const data = new TextDecoder().decode(output.getData_asU8()); - const stream = output.getFd() === SessionOutputMessage.FD.STDOUT ? s.stdout : s.stderr; - stream.emit("data", data); + const source = output.getSource(); + switch (source) { + case SessionOutputMessage.Source.STDOUT: + case SessionOutputMessage.Source.STDERR: + (source === SessionOutputMessage.Source.STDOUT ? s.stdout : s.stderr).emit("data", data); + break; + case SessionOutputMessage.Source.IPC: + s.emit("message", JSON.parse(data)); + break; + default: + throw new Error(`Unknown source ${source}`); + } } else if (message.hasIdentifySession()) { const s = this.sessions.get(message.getIdentifySession()!.getId()); if (!s) { diff --git a/packages/protocol/src/browser/command.ts b/packages/protocol/src/browser/command.ts index f6ef21a3..ba094d9f 100644 --- a/packages/protocol/src/browser/command.ts +++ b/packages/protocol/src/browser/command.ts @@ -23,8 +23,11 @@ export interface ChildProcess { readonly pid: number | undefined; kill(signal?: string): void; - send(message: string | Uint8Array): void; + send(message: string | Uint8Array, ipc?: false): void; + send(message: any, ipc: true): void; + + on(event: "message", listener: (data: any) => void): void; on(event: "error", listener: (err: Error) => void): void; on(event: "exit", listener: (code: number, signal: string) => void): void; @@ -45,10 +48,6 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess { private readonly hasTty: boolean = false, ) { super(); - this.connection.onMessage((message) => { - this.emit("message", message); - }); - if (!this.hasTty) { delete this.resize; } @@ -71,10 +70,15 @@ export class ServerProcess extends events.EventEmitter implements ChildProcess { this._killed = true; } - public send(message: string | Uint8Array): void { + public send(message: string | Uint8Array | any, ipc: boolean = false): void { const send = new WriteToSessionMessage(); send.setId(this.id); - send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message); + send.setSource(ipc ? WriteToSessionMessage.Source.IPC : WriteToSessionMessage.Source.STDIN); + if (ipc) { + send.setData(new TextEncoder().encode(JSON.stringify(message))); + } else { + send.setData(typeof message === "string" ? new TextEncoder().encode(message) : message); + } const client = new ClientMessage(); client.setWriteToSession(send); this.connection.send(client.serializeBinary()); diff --git a/packages/protocol/src/node/command.ts b/packages/protocol/src/node/command.ts index 128dbb55..9f9cd195 100644 --- a/packages/protocol/src/node/command.ts +++ b/packages/protocol/src/node/command.ts @@ -8,6 +8,7 @@ import { SendableConnection } from "../common/connection"; import { ServerOptions } from "./server"; export interface Process { + stdio?: Array; stdin?: stream.Writable; stdout?: stream.Readable; stderr?: stream.Readable; @@ -69,27 +70,34 @@ export const handleNewSession = (connection: SendableConnection, newSession: New }; } - const sendOutput = (_fd: SessionOutputMessage.FD, msg: string | Uint8Array): void => { + const sendOutput = (_source: SessionOutputMessage.Source, msg: string | Uint8Array): void => { const serverMsg = new ServerMessage(); const d = new SessionOutputMessage(); d.setId(newSession.getId()); d.setData(typeof msg === "string" ? new TextEncoder().encode(msg) : msg); - d.setFd(SessionOutputMessage.FD.STDOUT); + d.setSource(_source); serverMsg.setSessionOutput(d); connection.send(serverMsg.serializeBinary()); }; if (process.stdout && process.stderr) { process.stdout.on("data", (data) => { - sendOutput(SessionOutputMessage.FD.STDOUT, data); + sendOutput(SessionOutputMessage.Source.STDOUT, data); }); process.stderr.on("data", (data) => { - sendOutput(SessionOutputMessage.FD.STDERR, data); + sendOutput(SessionOutputMessage.Source.STDERR, data); }); } else { process.on("data", (data) => { - sendOutput(SessionOutputMessage.FD.STDOUT, Buffer.from(data)); + sendOutput(SessionOutputMessage.Source.STDOUT, Buffer.from(data)); + }); + } + + if (process.stdio && process.stdio[3]) { + // We have ipc fd + process.stdio[3].on("data", (data) => { + sendOutput(SessionOutputMessage.Source.IPC, data); }); } diff --git a/packages/protocol/src/node/server.ts b/packages/protocol/src/node/server.ts index 2c9f5b32..e7093cdf 100644 --- a/packages/protocol/src/node/server.ts +++ b/packages/protocol/src/node/server.ts @@ -1,11 +1,11 @@ import * as os from "os"; import * as cp from "child_process"; import * as path from "path"; -import { mkdir } from "fs"; +import { mkdir, WriteStream } from "fs"; import { promisify } from "util"; import { TextDecoder } from "text-encoding"; import { logger, field } from "@coder/logger"; -import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage } from "../proto"; +import { ClientMessage, WorkingInitMessage, ServerMessage, NewSessionMessage, WriteToSessionMessage } from "../proto"; import { evaluate } from "./evaluate"; import { ReadWriteConnection } from "../common/connection"; import { Process, handleNewSession, handleNewConnection } from "./command"; @@ -120,7 +120,16 @@ export class Server { if (!s) { return; } - s.write(new TextDecoder().decode(message.getWriteToSession()!.getData_asU8())); + const data = new TextDecoder().decode(message.getWriteToSession()!.getData_asU8()); + const source = message.getWriteToSession()!.getSource(); + if (source === WriteToSessionMessage.Source.IPC) { + if (!s.stdio || !s.stdio[3]) { + throw new Error("Cannot send message via IPC to process without IPC"); + } + (s.stdio[3] as WriteStream).write(data); + } else { + s.write(data); + } } else if (message.hasNewConnection()) { const socket = handleNewConnection(this.connection, message.getNewConnection()!, () => { this.connections.delete(message.getNewConnection()!.getId()); diff --git a/packages/protocol/src/proto/command.proto b/packages/protocol/src/proto/command.proto index 64419af0..740ffcd1 100644 --- a/packages/protocol/src/proto/command.proto +++ b/packages/protocol/src/proto/command.proto @@ -45,6 +45,11 @@ message IdentifySessionMessage { message WriteToSessionMessage { uint64 id = 1; bytes data = 2; + enum Source { + Stdin = 0; + Ipc = 1; + } + Source source = 3; } // Resizes the TTY of the session identified by the id. @@ -67,11 +72,12 @@ message ShutdownSessionMessage { // SessionOutputMessage carries data read from the stdout or stderr of the session identified by the id. message SessionOutputMessage { uint64 id = 1; - enum FD { + enum Source { Stdout = 0; Stderr = 1; + Ipc = 2; } - FD fd = 2; + Source source = 2; bytes data = 3; } diff --git a/packages/protocol/src/proto/command_pb.d.ts b/packages/protocol/src/proto/command_pb.d.ts index 0c555275..bb134041 100644 --- a/packages/protocol/src/proto/command_pb.d.ts +++ b/packages/protocol/src/proto/command_pb.d.ts @@ -144,6 +144,9 @@ export class WriteToSessionMessage extends jspb.Message { getData_asB64(): string; setData(value: Uint8Array | string): void; + getSource(): WriteToSessionMessage.Source; + setSource(value: WriteToSessionMessage.Source): void; + serializeBinary(): Uint8Array; toObject(includeInstance?: boolean): WriteToSessionMessage.AsObject; static toObject(includeInstance: boolean, msg: WriteToSessionMessage): WriteToSessionMessage.AsObject; @@ -158,6 +161,12 @@ export namespace WriteToSessionMessage { export type AsObject = { id: number, data: Uint8Array | string, + source: WriteToSessionMessage.Source, + } + + export enum Source { + STDIN = 0, + IPC = 1, } } @@ -235,8 +244,8 @@ export class SessionOutputMessage extends jspb.Message { getId(): number; setId(value: number): void; - getFd(): SessionOutputMessage.FD; - setFd(value: SessionOutputMessage.FD): void; + getSource(): SessionOutputMessage.Source; + setSource(value: SessionOutputMessage.Source): void; getData(): Uint8Array | string; getData_asU8(): Uint8Array; @@ -256,13 +265,14 @@ export class SessionOutputMessage extends jspb.Message { export namespace SessionOutputMessage { export type AsObject = { id: number, - fd: SessionOutputMessage.FD, + source: SessionOutputMessage.Source, data: Uint8Array | string, } - export enum FD { + export enum Source { STDOUT = 0, STDERR = 1, + IPC = 2, } } diff --git a/packages/protocol/src/proto/command_pb.js b/packages/protocol/src/proto/command_pb.js index a24d0677..0b6ab6c7 100644 --- a/packages/protocol/src/proto/command_pb.js +++ b/packages/protocol/src/proto/command_pb.js @@ -22,10 +22,11 @@ goog.exportSymbol('proto.NewSessionMessage', null, global); goog.exportSymbol('proto.ResizeSessionTTYMessage', null, global); goog.exportSymbol('proto.SessionDoneMessage', null, global); goog.exportSymbol('proto.SessionOutputMessage', null, global); -goog.exportSymbol('proto.SessionOutputMessage.FD', null, global); +goog.exportSymbol('proto.SessionOutputMessage.Source', null, global); goog.exportSymbol('proto.ShutdownSessionMessage', null, global); goog.exportSymbol('proto.TTYDimensions', null, global); goog.exportSymbol('proto.WriteToSessionMessage', null, global); +goog.exportSymbol('proto.WriteToSessionMessage.Source', null, global); /** * Generated by JsPbCodeGenerator. @@ -1047,7 +1048,8 @@ proto.WriteToSessionMessage.prototype.toObject = function(opt_includeInstance) { proto.WriteToSessionMessage.toObject = function(includeInstance, msg) { var f, obj = { id: msg.getId(), - data: msg.getData_asB64() + data: msg.getData_asB64(), + source: msg.getSource() }; if (includeInstance) { @@ -1092,6 +1094,10 @@ proto.WriteToSessionMessage.deserializeBinaryFromReader = function(msg, reader) var value = /** @type {!Uint8Array} */ (reader.readBytes()); msg.setData(value); break; + case 3: + var value = /** @type {!proto.WriteToSessionMessage.Source} */ (reader.readEnum()); + msg.setSource(value); + break; default: reader.skipField(); break; @@ -1144,6 +1150,13 @@ proto.WriteToSessionMessage.prototype.serializeBinaryToWriter = function (writer f ); } + f = this.getSource(); + if (f !== 0.0) { + writer.writeEnum( + 3, + f + ); + } }; @@ -1210,6 +1223,29 @@ proto.WriteToSessionMessage.prototype.setData = function(value) { }; +/** + * optional Source source = 3; + * @return {!proto.WriteToSessionMessage.Source} + */ +proto.WriteToSessionMessage.prototype.getSource = function() { + return /** @type {!proto.WriteToSessionMessage.Source} */ (jspb.Message.getFieldProto3(this, 3, 0)); +}; + + +/** @param {!proto.WriteToSessionMessage.Source} value */ +proto.WriteToSessionMessage.prototype.setSource = function(value) { + jspb.Message.setField(this, 3, value); +}; + + +/** + * @enum {number} + */ +proto.WriteToSessionMessage.Source = { + STDIN: 0, + IPC: 1 +}; + /** * Generated by JsPbCodeGenerator. @@ -1805,7 +1841,7 @@ proto.SessionOutputMessage.prototype.toObject = function(opt_includeInstance) { proto.SessionOutputMessage.toObject = function(includeInstance, msg) { var f, obj = { id: msg.getId(), - fd: msg.getFd(), + source: msg.getSource(), data: msg.getData_asB64() }; @@ -1848,8 +1884,8 @@ proto.SessionOutputMessage.deserializeBinaryFromReader = function(msg, reader) { msg.setId(value); break; case 2: - var value = /** @type {!proto.SessionOutputMessage.FD} */ (reader.readEnum()); - msg.setFd(value); + var value = /** @type {!proto.SessionOutputMessage.Source} */ (reader.readEnum()); + msg.setSource(value); break; case 3: var value = /** @type {!Uint8Array} */ (reader.readBytes()); @@ -1900,7 +1936,7 @@ proto.SessionOutputMessage.prototype.serializeBinaryToWriter = function (writer) f ); } - f = this.getFd(); + f = this.getSource(); if (f !== 0.0) { writer.writeEnum( 2, @@ -1942,16 +1978,16 @@ proto.SessionOutputMessage.prototype.setId = function(value) { /** - * optional FD fd = 2; - * @return {!proto.SessionOutputMessage.FD} + * optional Source source = 2; + * @return {!proto.SessionOutputMessage.Source} */ -proto.SessionOutputMessage.prototype.getFd = function() { - return /** @type {!proto.SessionOutputMessage.FD} */ (jspb.Message.getFieldProto3(this, 2, 0)); +proto.SessionOutputMessage.prototype.getSource = function() { + return /** @type {!proto.SessionOutputMessage.Source} */ (jspb.Message.getFieldProto3(this, 2, 0)); }; -/** @param {!proto.SessionOutputMessage.FD} value */ -proto.SessionOutputMessage.prototype.setFd = function(value) { +/** @param {!proto.SessionOutputMessage.Source} value */ +proto.SessionOutputMessage.prototype.setSource = function(value) { jspb.Message.setField(this, 2, value); }; @@ -1998,9 +2034,10 @@ proto.SessionOutputMessage.prototype.setData = function(value) { /** * @enum {number} */ -proto.SessionOutputMessage.FD = { +proto.SessionOutputMessage.Source = { STDOUT: 0, - STDERR: 1 + STDERR: 1, + IPC: 2 }; diff --git a/packages/protocol/test/command.test.ts b/packages/protocol/test/command.test.ts index f299f8ab..a9a6a408 100644 --- a/packages/protocol/test/command.test.ts +++ b/packages/protocol/test/command.test.ts @@ -1,3 +1,4 @@ +import * as cp from "child_process"; import * as net from "net"; import * as os from "os"; import * as path from "path"; @@ -8,7 +9,15 @@ import { createClient } from "./helpers"; (global).TextEncoder = TextEncoder; describe("spawn", () => { - const client = createClient(); + const client = createClient({ + dataDirectory: "", + workingDirectory: "", + forkProvider: (msg) => { + return cp.spawn(msg.getCommand(), msg.getArgsList(), { + stdio: [null, null, null, "pipe"], + }); + }, + }); it("should execute command and return output", (done) => { const proc = client.spawn("echo", ["test"]); @@ -124,11 +133,13 @@ describe("spawn", () => { proc.on("exit", () => done()); }); - it("should fork", (done) => { + it("should fork and echo messages", (done) => { const proc = client.fork(path.join(__dirname, "forker.js")); - proc.stdout.on("data", (data) => { - expect(data).toEqual("test"); + proc.on("message", (msg) => { + expect(msg.bananas).toBeTruthy(); + proc.kill(); }); + proc.send({ bananas: true }, true); proc.on("exit", () => done()); }); }); diff --git a/packages/protocol/test/forker.js b/packages/protocol/test/forker.js index 02525d0f..6f8b9593 100755 --- a/packages/protocol/test/forker.js +++ b/packages/protocol/test/forker.js @@ -1 +1,3 @@ -console.log("test"); \ No newline at end of file +process.on("message", (data) => { + process.send(data); +}); \ No newline at end of file diff --git a/packages/server/src/vscode/bootstrapFork.ts b/packages/server/src/vscode/bootstrapFork.ts index 281e4ff0..03361fe3 100644 --- a/packages/server/src/vscode/bootstrapFork.ts +++ b/packages/server/src/vscode/bootstrapFork.ts @@ -1,5 +1,6 @@ import * as cp from "child_process"; import * as fs from "fs"; +import * as net from "net"; import * as path from "path"; import { logger, field } from "@coder/logger/src"; @@ -8,6 +9,18 @@ declare var __non_webpack_require__: typeof require; export const requireModule = (modulePath: string): void => { process.env.AMD_ENTRYPOINT = modulePath; process.env.VSCODE_ALLOW_IO = "true"; + + if (!process.send) { + const socket = new net.Socket({ fd: 3 }); + socket.on("data", (data) => { + process.emit("message", JSON.parse(data.toString()), undefined); + }); + + process.send = (message: any): void => { + socket.write(JSON.stringify(message)); + }; + } + const content = fs.readFileSync(path.join(process.env.BUILD_DIR as string || path.join(__dirname, "../.."), "./build/bootstrap-fork.js")); eval(content.toString()); }; @@ -36,13 +49,13 @@ export const forkModule = (modulePath: string, stdio?: boolean): cp.ChildProcess let proc: cp.ChildProcess | undefined; const args = ["--bootstrap-fork", modulePath]; + const options: cp.SpawnOptions = { + stdio: [null, null, null, "pipe"], + }; if (process.env.CLI === "true") { - proc = stdio ? cp.spawn(process.execPath, args) : cp.fork(process.execPath, args); - } else if (stdio) { - proc = cp.spawn("npm", ["start", "--scripts-prepend-node-path", "--", ...args]); + proc = stdio ? cp.spawn(process.execPath, args, options) : cp.fork(process.execPath, args, options); } else { - // TODO: need to fork somehow so we get send/onMessage. - proc = cp.spawn("npm", ["start", "--scripts-prepend-node-path", "--", ...args]); + proc = cp.spawn(process.execArgv[0], ["-r", "tsconfig-paths/register", process.argv[1], ...args], options); } proc.stdout.on("data", (message) => {