Fix coping and moving files around using the file tree (#568)

* Implement write/read buffers in electron fill

This makes cutting and copy files from the file tree work.

* Implement fs.createReadStream

This is used by the file tree to copy files.

* Allow passing proxies back from client to server

This makes things like piping streams possible.

* Synchronously bind to proxy events

This eliminates any chance whatsoever of missing events due to binding
too late.

* Make it possible to bind some events on demand

* Add some protocol documentation
This commit is contained in:
Asher 2019-04-24 10:38:21 -05:00 committed by GitHub
parent 30b8565e2d
commit c9f91e77cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 546 additions and 278 deletions

View File

@ -171,8 +171,10 @@ const newCreateElement = <K extends keyof HTMLElementTagNameMap>(tagName: K): HT
document.createElement = newCreateElement; document.createElement = newCreateElement;
class Clipboard { class Clipboard {
public has(): boolean { private readonly buffers = new Map<string, Buffer>();
return false;
public has(format: string): boolean {
return this.buffers.has(format);
} }
public readFindText(): string { public readFindText(): string {
@ -190,6 +192,14 @@ class Clipboard {
public readText(): Promise<string> { public readText(): Promise<string> {
return clipboard.readText(); return clipboard.readText();
} }
public writeBuffer(format: string, buffer: Buffer): void {
this.buffers.set(format, buffer);
}
public readBuffer(format: string): Buffer | undefined {
return this.buffers.get(format);
}
} }
class Shell { class Shell {

View File

@ -0,0 +1,47 @@
# Protocol
This module provides a way for the browser to run Node modules like `fs`, `net`,
etc.
## Internals
### Server-side proxies
The server-side proxies are regular classes that call native Node functions. The
only thing special about them is that they must return promises and they must
return serializable values.
The only exception to the promise rule are event-related methods such as
`onEvent` and `onDone` (these are synchronous). The server will simply
immediately bind and push all events it can to the client. It doesn't wait for
the client to start listening. This prevents issues with the server not
receiving the client's request to start listening in time.
However, there is a way to specify events that should not bind immediately and
should wait for the client to request it, because some events (like `data` on a
stream) cannot be bound immediately (because doing so changes how the stream
behaves).
### Client-side proxies
Client-side proxies are `Proxy` instances. They simply make remote calls for any
method you call on it. The only exception is for events. Each client proxy has a
local emitter which it uses in place of a remote call (this allows the call to
be completed synchronously on the client). Then when an event is received from
the server, it gets emitted on that local emitter.
When an event is listened to, the proxy also notifies the server so it can start
listening in case it isn't already (see the `data` example above). This only
works for events that only fire after they are bound.
### Client-side fills
The client-side fills implement the Node API and make calls to the server-side
proxies using the client-side proxies.
When a proxy returns a proxy (for example `fs.createWriteStream`), that proxy is
a promise (since communicating with the server is asynchronous). We have to
return the fill from `fs.createWriteStream` synchronously, so that means the
fill has to contain a proxy promise. To eliminate the need for calling `then`
and to keep the code looking clean every time you use the proxy, the proxy is
itself wrapped in another proxy which just calls the method after a `then`. This
works since all the methods return promises (aside from the event methods, but
those are not used by the fills directly—they are only used internally to
forward events to the fill if it is an event emitter).

View File

@ -4,7 +4,7 @@ import { promisify } from "util";
import { Emitter } from "@coder/events"; import { Emitter } from "@coder/events";
import { logger, field } from "@coder/logger"; import { logger, field } from "@coder/logger";
import { ReadWriteConnection, InitData, SharedProcessData } from "../common/connection"; import { ReadWriteConnection, InitData, SharedProcessData } from "../common/connection";
import { Module, ServerProxy } from "../common/proxy"; import { ClientServerProxy, Module, ServerProxy } from "../common/proxy";
import { argumentToProto, protoToArgument, moduleToProto, protoToModule, protoToOperatingSystem } from "../common/util"; import { argumentToProto, protoToArgument, moduleToProto, protoToModule, protoToOperatingSystem } from "../common/util";
import { Argument, Ping, ServerMessage, ClientMessage, Method, Event, Callback } from "../proto"; import { Argument, Ping, ServerMessage, ClientMessage, Method, Event, Callback } from "../proto";
import { FsModule, ChildProcessModule, NetModule, NodePtyModule, SpdlogModule, TrashModule } from "./modules"; import { FsModule, ChildProcessModule, NetModule, NodePtyModule, SpdlogModule, TrashModule } from "./modules";
@ -224,7 +224,11 @@ export class Client {
field("method", method), field("method", method),
]); ]);
proxyMessage.setArgsList(args.map((a) => argumentToProto(a, storeCallback))); proxyMessage.setArgsList(args.map((a) => argumentToProto<ClientServerProxy>(
a,
storeCallback,
(p) => p.proxyId,
)));
const clientMessage = new ClientMessage(); const clientMessage = new ClientMessage();
clientMessage.setMethod(message); clientMessage.setMethod(message);
@ -429,7 +433,7 @@ export class Client {
/** /**
* Return a proxy that makes remote calls. * Return a proxy that makes remote calls.
*/ */
private createProxy<T>(proxyId: number | Module, promise: Promise<any> = Promise.resolve()): T { private createProxy<T extends ClientServerProxy>(proxyId: number | Module, promise: Promise<any> = Promise.resolve()): T {
logger.trace(() => [ logger.trace(() => [
"creating proxy", "creating proxy",
field("proxyId", proxyId), field("proxyId", proxyId),
@ -449,7 +453,7 @@ export class Client {
cb(event.event, ...event.args); cb(event.event, ...event.args);
}); });
}, },
}, { } as ClientServerProxy, {
get: (target: any, name: string): any => { get: (target: any, name: string): any => {
// When resolving a promise with a proxy, it will check for "then". // When resolving a promise with a proxy, it will check for "then".
if (name === "then") { if (name === "then") {

View File

@ -2,13 +2,22 @@ import * as cp from "child_process";
import * as net from "net"; import * as net from "net";
import * as stream from "stream"; import * as stream from "stream";
import { callbackify } from "util"; import { callbackify } from "util";
import { ClientProxy } from "../../common/proxy"; import { ClientProxy, ClientServerProxy } from "../../common/proxy";
import { ChildProcessModuleProxy, ChildProcessProxy, ChildProcessProxies } from "../../node/modules/child_process"; import { ChildProcessModuleProxy, ChildProcessProxy } from "../../node/modules/child_process";
import { Readable, Writable } from "./stream"; import { ClientWritableProxy, ClientReadableProxy, Readable, Writable } from "./stream";
// tslint:disable completed-docs // tslint:disable completed-docs
export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.ChildProcess { export interface ClientChildProcessProxy extends ChildProcessProxy, ClientServerProxy<cp.ChildProcess> {}
export interface ClientChildProcessProxies {
childProcess: ClientChildProcessProxy;
stdin?: ClientWritableProxy | null;
stdout?: ClientReadableProxy | null;
stderr?: ClientReadableProxy | null;
}
export class ChildProcess extends ClientProxy<ClientChildProcessProxy> implements cp.ChildProcess {
public readonly stdin: stream.Writable; public readonly stdin: stream.Writable;
public readonly stdout: stream.Readable; public readonly stdout: stream.Readable;
public readonly stderr: stream.Readable; public readonly stderr: stream.Readable;
@ -18,7 +27,7 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C
private _killed: boolean = false; private _killed: boolean = false;
private _pid = -1; private _pid = -1;
public constructor(proxyPromises: Promise<ChildProcessProxies>) { public constructor(proxyPromises: Promise<ClientChildProcessProxies>) {
super(proxyPromises.then((p) => p.childProcess)); super(proxyPromises.then((p) => p.childProcess));
this.stdin = new Writable(proxyPromises.then((p) => p.stdin!)); this.stdin = new Writable(proxyPromises.then((p) => p.stdin!));
this.stdout = new Readable(proxyPromises.then((p) => p.stdout!)); this.stdout = new Readable(proxyPromises.then((p) => p.stdout!));
@ -99,8 +108,14 @@ export class ChildProcess extends ClientProxy<ChildProcessProxy> implements cp.C
} }
} }
interface ClientChildProcessModuleProxy extends ChildProcessModuleProxy, ClientServerProxy {
exec(command: string, options?: { encoding?: string | null } & cp.ExecOptions | null, callback?: ((error: cp.ExecException | null, stdin: string | Buffer, stdout: string | Buffer) => void)): Promise<ClientChildProcessProxies>;
fork(modulePath: string, args?: string[], options?: cp.ForkOptions): Promise<ClientChildProcessProxies>;
spawn(command: string, args?: string[], options?: cp.SpawnOptions): Promise<ClientChildProcessProxies>;
}
export class ChildProcessModule { export class ChildProcessModule {
public constructor(private readonly proxy: ChildProcessModuleProxy) {} public constructor(private readonly proxy: ClientChildProcessModuleProxy) {}
public exec = ( public exec = (
command: string, command: string,

View File

@ -1,12 +1,11 @@
import * as fs from "fs"; import * as fs from "fs";
import { callbackify } from "util"; import { callbackify } from "util";
import { ClientProxy, Batch } from "../../common/proxy"; import { Batch, ClientProxy, ClientServerProxy } from "../../common/proxy";
import { IEncodingOptions, IEncodingOptionsCallback } from "../../common/util"; import { IEncodingOptions, IEncodingOptionsCallback } from "../../common/util";
import { FsModuleProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs"; import { FsModuleProxy, ReadStreamProxy, Stats as IStats, WatcherProxy, WriteStreamProxy } from "../../node/modules/fs";
import { Writable } from "./stream"; import { Readable, Writable } from "./stream";
// tslint:disable no-any // tslint:disable completed-docs no-any
// tslint:disable completed-docs
class StatBatch extends Batch<IStats, { path: fs.PathLike }> { class StatBatch extends Batch<IStats, { path: fs.PathLike }> {
public constructor(private readonly proxy: FsModuleProxy) { public constructor(private readonly proxy: FsModuleProxy) {
@ -38,7 +37,9 @@ class ReaddirBatch extends Batch<Buffer[] | fs.Dirent[] | string[], { path: fs.P
} }
} }
class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher { interface ClientWatcherProxy extends WatcherProxy, ClientServerProxy<fs.FSWatcher> {}
class Watcher extends ClientProxy<ClientWatcherProxy> implements fs.FSWatcher {
public close(): void { public close(): void {
this.catch(this.proxy.close()); this.catch(this.proxy.close());
} }
@ -48,7 +49,25 @@ class Watcher extends ClientProxy<WatcherProxy> implements fs.FSWatcher {
} }
} }
class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream { interface ClientReadStreamProxy extends ReadStreamProxy, ClientServerProxy<fs.ReadStream> {}
class ReadStream extends Readable<ClientReadStreamProxy> implements fs.ReadStream {
public get bytesRead(): number {
throw new Error("not implemented");
}
public get path(): string | Buffer {
throw new Error("not implemented");
}
public close(): void {
this.catch(this.proxy.close());
}
}
interface ClientWriteStreamProxy extends WriteStreamProxy, ClientServerProxy<fs.WriteStream> {}
class WriteStream extends Writable<ClientWriteStreamProxy> implements fs.WriteStream {
public get bytesWritten(): number { public get bytesWritten(): number {
throw new Error("not implemented"); throw new Error("not implemented");
} }
@ -62,12 +81,18 @@ class WriteStream extends Writable<WriteStreamProxy> implements fs.WriteStream {
} }
} }
interface ClientFsModuleProxy extends FsModuleProxy, ClientServerProxy {
createReadStream(path: fs.PathLike, options?: any): Promise<ClientReadStreamProxy>;
createWriteStream(path: fs.PathLike, options?: any): Promise<ClientWriteStreamProxy>;
watch(filename: fs.PathLike, options?: IEncodingOptions): Promise<ClientWatcherProxy>;
}
export class FsModule { export class FsModule {
private readonly statBatch: StatBatch; private readonly statBatch: StatBatch;
private readonly lstatBatch: LstatBatch; private readonly lstatBatch: LstatBatch;
private readonly readdirBatch: ReaddirBatch; private readonly readdirBatch: ReaddirBatch;
public constructor(private readonly proxy: FsModuleProxy) { public constructor(private readonly proxy: ClientFsModuleProxy) {
this.statBatch = new StatBatch(this.proxy); this.statBatch = new StatBatch(this.proxy);
this.lstatBatch = new LstatBatch(this.proxy); this.lstatBatch = new LstatBatch(this.proxy);
this.readdirBatch = new ReaddirBatch(this.proxy); this.readdirBatch = new ReaddirBatch(this.proxy);
@ -110,6 +135,10 @@ export class FsModule {
); );
} }
public createReadStream = (path: fs.PathLike, options?: any): fs.ReadStream => {
return new ReadStream(this.proxy.createReadStream(path, options));
}
public createWriteStream = (path: fs.PathLike, options?: any): fs.WriteStream => { public createWriteStream = (path: fs.PathLike, options?: any): fs.WriteStream => {
return new WriteStream(this.proxy.createWriteStream(path, options)); return new WriteStream(this.proxy.createWriteStream(path, options));
} }

View File

@ -1,16 +1,18 @@
import * as net from "net"; import * as net from "net";
import { callbackify } from "util"; import { callbackify } from "util";
import { ClientProxy } from "../../common/proxy"; import { ClientProxy, ClientServerProxy } from "../../common/proxy";
import { NetModuleProxy, NetServerProxy, NetSocketProxy } from "../../node/modules/net"; import { NetModuleProxy, NetServerProxy, NetSocketProxy } from "../../node/modules/net";
import { Duplex } from "./stream"; import { Duplex } from "./stream";
// tslint:disable completed-docs // tslint:disable completed-docs
export class Socket extends Duplex<NetSocketProxy> implements net.Socket { interface ClientNetSocketProxy extends NetSocketProxy, ClientServerProxy<net.Socket> {}
export class Socket extends Duplex<ClientNetSocketProxy> implements net.Socket {
private _connecting: boolean = false; private _connecting: boolean = false;
private _destroyed: boolean = false; private _destroyed: boolean = false;
public constructor(proxyPromise: Promise<NetSocketProxy> | NetSocketProxy, connecting?: boolean) { public constructor(proxyPromise: Promise<ClientNetSocketProxy> | ClientNetSocketProxy, connecting?: boolean) {
super(proxyPromise); super(proxyPromise);
if (connecting) { if (connecting) {
this._connecting = connecting; this._connecting = connecting;
@ -126,12 +128,16 @@ export class Socket extends Duplex<NetSocketProxy> implements net.Socket {
} }
} }
export class Server extends ClientProxy<NetServerProxy> implements net.Server { interface ClientNetServerProxy extends NetServerProxy, ClientServerProxy<net.Server> {
onConnection(cb: (proxy: ClientNetSocketProxy) => void): Promise<void>;
}
export class Server extends ClientProxy<ClientNetServerProxy> implements net.Server {
private socketId = 0; private socketId = 0;
private readonly sockets = new Map<number, net.Socket>(); private readonly sockets = new Map<number, net.Socket>();
private _listening: boolean = false; private _listening: boolean = false;
public constructor(proxyPromise: Promise<NetServerProxy> | NetServerProxy) { public constructor(proxyPromise: Promise<ClientNetServerProxy> | ClientNetServerProxy) {
super(proxyPromise); super(proxyPromise);
this.catch(this.proxy.onConnection((socketProxy) => { this.catch(this.proxy.onConnection((socketProxy) => {
@ -208,11 +214,17 @@ export class Server extends ClientProxy<NetServerProxy> implements net.Server {
type NodeNet = typeof net; type NodeNet = typeof net;
interface ClientNetModuleProxy extends NetModuleProxy, ClientServerProxy {
createSocket(options?: net.SocketConstructorOpts): Promise<ClientNetSocketProxy>;
createConnection(target: string | number | net.NetConnectOpts, host?: string): Promise<ClientNetSocketProxy>;
createServer(options?: { allowHalfOpen?: boolean, pauseOnConnect?: boolean }): Promise<ClientNetServerProxy>;
}
export class NetModule implements NodeNet { export class NetModule implements NodeNet {
public readonly Socket: typeof net.Socket; public readonly Socket: typeof net.Socket;
public readonly Server: typeof net.Server; public readonly Server: typeof net.Server;
public constructor(private readonly proxy: NetModuleProxy) { public constructor(private readonly proxy: ClientNetModuleProxy) {
// @ts-ignore this is because Socket is missing things from the Stream // @ts-ignore this is because Socket is missing things from the Stream
// namespace but I'm unsure how best to provide them (finished, // namespace but I'm unsure how best to provide them (finished,
// finished.__promisify__, pipeline, and some others) or if it even matters. // finished.__promisify__, pipeline, and some others) or if it even matters.

View File

@ -1,15 +1,17 @@
import * as pty from "node-pty"; import * as pty from "node-pty";
import { ClientProxy } from "../../common/proxy"; import { ClientProxy, ClientServerProxy } from "../../common/proxy";
import { NodePtyModuleProxy, NodePtyProcessProxy } from "../../node/modules/node-pty"; import { NodePtyModuleProxy, NodePtyProcessProxy } from "../../node/modules/node-pty";
// tslint:disable completed-docs // tslint:disable completed-docs
export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements pty.IPty { interface ClientNodePtyProcessProxy extends NodePtyProcessProxy, ClientServerProxy {}
export class NodePtyProcess extends ClientProxy<ClientNodePtyProcessProxy> implements pty.IPty {
private _pid = -1; private _pid = -1;
private _process = ""; private _process = "";
public constructor( public constructor(
private readonly moduleProxy: NodePtyModuleProxy, private readonly moduleProxy: ClientNodePtyModuleProxy,
private readonly file: string, private readonly file: string,
private readonly args: string[] | string, private readonly args: string[] | string,
private readonly options: pty.IPtyForkOptions, private readonly options: pty.IPtyForkOptions,
@ -18,10 +20,12 @@ export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements
this.on("process", (process) => this._process = process); this.on("process", (process) => this._process = process);
} }
protected initialize(proxyPromise: Promise<NodePtyProcessProxy>): void { protected initialize(proxyPromise: Promise<ClientNodePtyProcessProxy>): ClientNodePtyProcessProxy {
super.initialize(proxyPromise); const proxy = super.initialize(proxyPromise);
this.catch(this.proxy.getPid().then((p) => this._pid = p)); this.catch(this.proxy.getPid().then((p) => this._pid = p));
this.catch(this.proxy.getProcess().then((p) => this._process = p)); this.catch(this.proxy.getProcess().then((p) => this._process = p));
return proxy;
} }
public get pid(): number { public get pid(): number {
@ -53,8 +57,12 @@ export class NodePtyProcess extends ClientProxy<NodePtyProcessProxy> implements
type NodePty = typeof pty; type NodePty = typeof pty;
interface ClientNodePtyModuleProxy extends NodePtyModuleProxy, ClientServerProxy {
spawn(file: string, args: string[] | string, options: pty.IPtyForkOptions): Promise<ClientNodePtyProcessProxy>;
}
export class NodePtyModule implements NodePty { export class NodePtyModule implements NodePty {
public constructor(private readonly proxy: NodePtyModuleProxy) {} public constructor(private readonly proxy: ClientNodePtyModuleProxy) {}
public spawn = (file: string, args: string[] | string, options: pty.IPtyForkOptions): pty.IPty => { public spawn = (file: string, args: string[] | string, options: pty.IPtyForkOptions): pty.IPty => {
return new NodePtyProcess(this.proxy, file, args, options); return new NodePtyProcess(this.proxy, file, args, options);

View File

@ -1,12 +1,14 @@
import * as spdlog from "spdlog"; import * as spdlog from "spdlog";
import { ClientProxy } from "../../common/proxy"; import { ClientProxy, ClientServerProxy } from "../../common/proxy";
import { RotatingLoggerProxy, SpdlogModuleProxy } from "../../node/modules/spdlog"; import { RotatingLoggerProxy, SpdlogModuleProxy } from "../../node/modules/spdlog";
// tslint:disable completed-docs // tslint:disable completed-docs
class RotatingLogger extends ClientProxy<RotatingLoggerProxy> implements spdlog.RotatingLogger { interface ClientRotatingLoggerProxy extends RotatingLoggerProxy, ClientServerProxy {}
class RotatingLogger extends ClientProxy<ClientRotatingLoggerProxy> implements spdlog.RotatingLogger {
public constructor( public constructor(
private readonly moduleProxy: SpdlogModuleProxy, private readonly moduleProxy: ClientSpdlogModuleProxy,
private readonly name: string, private readonly name: string,
private readonly filename: string, private readonly filename: string,
private readonly filesize: number, private readonly filesize: number,
@ -31,10 +33,14 @@ class RotatingLogger extends ClientProxy<RotatingLoggerProxy> implements spdlog.
} }
} }
interface ClientSpdlogModuleProxy extends SpdlogModuleProxy, ClientServerProxy {
createLogger(name: string, filePath: string, fileSize: number, fileCount: number): Promise<ClientRotatingLoggerProxy>;
}
export class SpdlogModule { export class SpdlogModule {
public readonly RotatingLogger: typeof spdlog.RotatingLogger; public readonly RotatingLogger: typeof spdlog.RotatingLogger;
public constructor(private readonly proxy: SpdlogModuleProxy) { public constructor(private readonly proxy: ClientSpdlogModuleProxy) {
this.RotatingLogger = class extends RotatingLogger { this.RotatingLogger = class extends RotatingLogger {
public constructor(name: string, filename: string, filesize: number, filecount: number) { public constructor(name: string, filename: string, filesize: number, filecount: number) {
super(proxy, name, filename, filesize, filecount); super(proxy, name, filename, filesize, filecount);

View File

@ -1,11 +1,14 @@
import * as stream from "stream"; import * as stream from "stream";
import { callbackify } from "util"; import { callbackify } from "util";
import { ClientProxy } from "../../common/proxy"; import { ClientProxy, ClientServerProxy } from "../../common/proxy";
import { DuplexProxy, IReadableProxy, WritableProxy } from "../../node/modules/stream"; import { isPromise } from "../../common/util";
import { DuplexProxy, ReadableProxy, WritableProxy } from "../../node/modules/stream";
// tslint:disable completed-docs // tslint:disable completed-docs no-any
export class Writable<T extends WritableProxy = WritableProxy> extends ClientProxy<T> implements stream.Writable { export interface ClientWritableProxy extends WritableProxy, ClientServerProxy<stream.Writable> {}
export class Writable<T extends ClientWritableProxy = ClientWritableProxy> extends ClientProxy<T> implements stream.Writable {
public get writable(): boolean { public get writable(): boolean {
throw new Error("not implemented"); throw new Error("not implemented");
} }
@ -50,7 +53,6 @@ export class Writable<T extends WritableProxy = WritableProxy> extends ClientPro
return this.catch(this.proxy.setDefaultEncoding(encoding)); return this.catch(this.proxy.setDefaultEncoding(encoding));
} }
// tslint:disable-next-line no-any
public write(chunk: any, encoding?: string | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean { public write(chunk: any, encoding?: string | ((error?: Error | null) => void), callback?: (error?: Error | null) => void): boolean {
if (typeof encoding === "function") { if (typeof encoding === "function") {
callback = encoding; callback = encoding;
@ -65,7 +67,6 @@ export class Writable<T extends WritableProxy = WritableProxy> extends ClientPro
return true; // Always true since we can't get this synchronously. return true; // Always true since we can't get this synchronously.
} }
// tslint:disable-next-line no-any
public end(data?: any | (() => void), encoding?: string | (() => void), callback?: (() => void)): void { public end(data?: any | (() => void), encoding?: string | (() => void), callback?: (() => void)): void {
if (typeof data === "function") { if (typeof data === "function") {
callback = data; callback = data;
@ -88,7 +89,9 @@ export class Writable<T extends WritableProxy = WritableProxy> extends ClientPro
} }
} }
export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientProxy<T> implements stream.Readable { export interface ClientReadableProxy extends ReadableProxy, ClientServerProxy<stream.Readable> {}
export class Readable<T extends ClientReadableProxy = ClientReadableProxy> extends ClientProxy<T> implements stream.Readable {
public get readable(): boolean { public get readable(): boolean {
throw new Error("not implemented"); throw new Error("not implemented");
} }
@ -141,11 +144,20 @@ export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientP
throw new Error("not implemented"); throw new Error("not implemented");
} }
public pipe<T>(): T { public pipe<P extends NodeJS.WritableStream>(destination: P, options?: { end?: boolean }): P {
throw new Error("not implemented"); const writableProxy = (destination as any as Writable).proxyPromise;
if (!writableProxy) {
throw new Error("can only pipe stream proxies");
}
this.catch(
isPromise(writableProxy)
? writableProxy.then((p) => this.proxy.pipe(p, options))
: this.proxy.pipe(writableProxy, options),
);
return destination;
} }
// tslint:disable-next-line no-any
public [Symbol.asyncIterator](): AsyncIterableIterator<any> { public [Symbol.asyncIterator](): AsyncIterableIterator<any> {
throw new Error("not implemented"); throw new Error("not implemented");
} }
@ -164,7 +176,9 @@ export class Readable<T extends IReadableProxy = IReadableProxy> extends ClientP
} }
} }
export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> implements stream.Duplex, stream.Readable { export interface ClientDuplexProxy extends DuplexProxy, ClientServerProxy<stream.Duplex> {}
export class Duplex<T extends ClientDuplexProxy = ClientDuplexProxy> extends Writable<T> implements stream.Duplex, stream.Readable {
private readonly _readable: Readable; private readonly _readable: Readable;
public constructor(proxyPromise: Promise<T> | T) { public constructor(proxyPromise: Promise<T> | T) {
@ -228,7 +242,6 @@ export class Duplex<T extends DuplexProxy = DuplexProxy> extends Writable<T> imp
this._readable.unshift(); this._readable.unshift();
} }
// tslint:disable-next-line no-any
public [Symbol.asyncIterator](): AsyncIterableIterator<any> { public [Symbol.asyncIterator](): AsyncIterableIterator<any> {
return this._readable[Symbol.asyncIterator](); return this._readable[Symbol.asyncIterator]();
} }

View File

@ -1,10 +1,13 @@
import * as trash from "trash"; import * as trash from "trash";
import { ClientServerProxy } from "../../common/proxy";
import { TrashModuleProxy } from "../../node/modules/trash"; import { TrashModuleProxy } from "../../node/modules/trash";
// tslint:disable completed-docs // tslint:disable completed-docs
interface ClientTrashModuleProxy extends TrashModuleProxy, ClientServerProxy {}
export class TrashModule { export class TrashModule {
public constructor(private readonly proxy: TrashModuleProxy) {} public constructor(private readonly proxy: ClientTrashModuleProxy) {}
public trash = (path: string, options?: trash.Options): Promise<void> => { public trash = (path: string, options?: trash.Options): Promise<void> => {
return this.proxy.trash(path, options); return this.proxy.trash(path, options);

View File

@ -1,13 +1,13 @@
import { EventEmitter } from "events"; import { EventEmitter } from "events";
import { isPromise } from "./util"; import { isPromise, EventCallback } from "./util";
// tslint:disable no-any // tslint:disable no-any
/** /**
* Allow using a proxy like it's returned synchronously. This only works because * Allow using a proxy like it's returned synchronously. This only works because
* all proxy methods return promises. * all proxy methods must return promises.
*/ */
const unpromisify = <T extends ServerProxy>(proxyPromise: Promise<T>): T => { const unpromisify = <T extends ClientServerProxy>(proxyPromise: Promise<T>): T => {
return new Proxy({}, { return new Proxy({}, {
get: (target: any, name: string): any => { get: (target: any, name: string): any => {
if (typeof target[name] === "undefined") { if (typeof target[name] === "undefined") {
@ -24,23 +24,23 @@ const unpromisify = <T extends ServerProxy>(proxyPromise: Promise<T>): T => {
}; };
/** /**
* Client-side emitter that just forwards proxy events to its own emitter. * Client-side emitter that just forwards server proxy events to its own
* It also turns a promisified proxy into a non-promisified proxy so we don't * emitter. It also turns a promisified server proxy into a non-promisified
* need a bunch of `then` calls everywhere. * proxy so we don't need a bunch of `then` calls everywhere.
*/ */
export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter { export abstract class ClientProxy<T extends ClientServerProxy> extends EventEmitter {
private _proxy: T | undefined; private _proxy: T;
/** /**
* You can specify not to bind events in order to avoid emitting twice for * You can specify not to bind events in order to avoid emitting twice for
* duplex streams. * duplex streams.
*/ */
public constructor( public constructor(
proxyPromise: Promise<T> | T, private _proxyPromise: Promise<T> | T,
private readonly bindEvents: boolean = true, private readonly bindEvents: boolean = true,
) { ) {
super(); super();
this.initialize(proxyPromise); this._proxy = this.initialize(this._proxyPromise);
if (this.bindEvents) { if (this.bindEvents) {
this.on("disconnected", (error) => { this.on("disconnected", (error) => {
try { try {
@ -64,11 +64,34 @@ export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
return this; return this;
} }
protected get proxy(): T { /**
if (!this._proxy) { * Bind the event locally and ensure the event is bound on the server.
throw new Error("not initialized"); */
} public addListener(event: string, listener: (...args: any[]) => void): this {
this.catch(this.proxy.bindDelayedEvent(event));
return super.on(event, listener);
}
/**
* Alias for `addListener`.
*/
public on(event: string, listener: (...args: any[]) => void): this {
return this.addListener(event, listener);
}
/**
* Original promise for the server proxy. Can be used to be passed as an
* argument.
*/
public get proxyPromise(): Promise<T> | T {
return this._proxyPromise;
}
/**
* Server proxy.
*/
protected get proxy(): T {
return this._proxy; return this._proxy;
} }
@ -76,13 +99,18 @@ export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
* Initialize the proxy by unpromisifying if necessary and binding to its * Initialize the proxy by unpromisifying if necessary and binding to its
* events. * events.
*/ */
protected initialize(proxyPromise: Promise<T> | T): void { protected initialize(proxyPromise: Promise<T> | T): T {
this._proxy = isPromise(proxyPromise) ? unpromisify(proxyPromise) : proxyPromise; this._proxyPromise = proxyPromise;
this._proxy = isPromise(this._proxyPromise)
? unpromisify(this._proxyPromise)
: this._proxyPromise;
if (this.bindEvents) { if (this.bindEvents) {
this.catch(this.proxy.onEvent((event, ...args): void => { this.proxy.onEvent((event, ...args): void => {
this.emit(event, ...args); this.emit(event, ...args);
})); });
} }
return this._proxy;
} }
/** /**
@ -102,34 +130,107 @@ export abstract class ClientProxy<T extends ServerProxy> extends EventEmitter {
} }
} }
export interface ServerProxyOptions<T> {
/**
* The events to bind immediately.
*/
bindEvents: string[];
/**
* Events that signal the proxy is done.
*/
doneEvents: string[];
/**
* Events that should only be bound when asked
*/
delayedEvents?: string[];
/**
* Whatever is emitting events (stream, child process, etc).
*/
instance: T;
}
/** /**
* Proxy to the actual instance on the server. Every method must only accept * The actual proxy instance on the server. Every method must only accept
* serializable arguments and must return promises with serializable values. If * serializable arguments and must return promises with serializable values.
* a proxy itself has proxies on creation (like how ChildProcess has stdin), *
* If a proxy itself has proxies on creation (like how ChildProcess has stdin),
* then it should return all of those at once, otherwise you will miss events * then it should return all of those at once, otherwise you will miss events
* from those child proxies and fail to dispose them properly. * from those child proxies and fail to dispose them properly.
*
* Events listeners are added client-side (since all events automatically
* forward to the client), so onDone and onEvent do not need to be asynchronous.
*/ */
export interface ServerProxy { export abstract class ServerProxy<T extends EventEmitter = EventEmitter> {
public readonly instance: T;
private readonly callbacks = <EventCallback[]>[];
public constructor(private readonly options: ServerProxyOptions<T>) {
this.instance = options.instance;
}
/** /**
* Dispose the proxy. * Dispose the proxy.
*/ */
dispose(): Promise<void>; public async dispose(): Promise<void> {
this.instance.removeAllListeners();
}
/** /**
* This is used instead of an event to force it to be implemented since there * This is used instead of an event to force it to be implemented since there
* would be no guarantee the implementation would remember to emit the event. * would be no guarantee the implementation would remember to emit the event.
*/ */
onDone(cb: () => void): Promise<void>; public onDone(cb: () => void): void {
this.options.doneEvents.forEach((event) => {
this.instance.on(event, cb);
});
}
/**
* Bind an event that will not fire without first binding it and shouldn't be
* bound immediately.
* For example, binding to `data` switches a stream to flowing mode, so we
* don't want to do it until we're asked. Otherwise something like `pipe`
* won't work because potentially some or all of the data will already have
* been flushed out.
*/
public async bindDelayedEvent(event: string): Promise<void> {
if (this.options.delayedEvents
&& this.options.delayedEvents.includes(event)
&& !this.options.bindEvents.includes(event)) {
this.options.bindEvents.push(event);
this.callbacks.forEach((cb) => {
this.instance.on(event, (...args: any[]) => cb(event, ...args));
});
}
}
/** /**
* Listen to all possible events. On the client, this is to reduce boilerplate * Listen to all possible events. On the client, this is to reduce boilerplate
* that would just be a bunch of error-prone forwarding of each individual * that would just be a bunch of error-prone forwarding of each individual
* event from the proxy to its own emitter. It also fixes a timing issue * event from the proxy to its own emitter.
* because we just always send all events from the server, so we never miss *
* any due to listening too late. * It also fixes a timing issue because we just always send all events from
* the server, so we never miss any due to listening too late.
*
* This cannot be async because then we can bind to the events too late.
*/ */
// tslint:disable-next-line no-any public onEvent(cb: EventCallback): void {
onEvent(cb: (event: string, ...args: any[]) => void): Promise<void>; this.callbacks.push(cb);
this.options.bindEvents.forEach((event) => {
this.instance.on(event, (...args: any[]) => cb(event, ...args));
});
}
}
/**
* A server-side proxy stored on the client. The proxy ID only exists on the
* client-side version of the server proxy. The event listeners are handled by
* the client and the remaining methods are proxied to the server.
*/
export interface ClientServerProxy<T extends EventEmitter = EventEmitter> extends ServerProxy<T> {
proxyId: number | Module;
} }
/** /**

View File

@ -1,6 +1,6 @@
import { Argument, Module as ProtoModule, WorkingInit } from "../proto"; import { Argument, Module as ProtoModule, WorkingInit } from "../proto";
import { OperatingSystem } from "../common/connection"; import { OperatingSystem } from "../common/connection";
import { Module, ServerProxy } from "./proxy"; import { ClientServerProxy, Module, ServerProxy } from "./proxy";
// tslint:disable no-any // tslint:disable no-any
@ -19,6 +19,8 @@ export const escapePath = (path: string): string => {
return `'${path.replace(/'/g, "'\\''")}'`; return `'${path.replace(/'/g, "'\\''")}'`;
}; };
export type EventCallback = (event: string, ...args: any[]) => void;
export type IEncodingOptions = { export type IEncodingOptions = {
encoding?: BufferEncoding | null; encoding?: BufferEncoding | null;
flag?: string; flag?: string;
@ -34,15 +36,26 @@ export type IEncodingOptionsCallback = IEncodingOptions | ((err: NodeJS.ErrnoExc
* If sending a function is possible, provide `storeFunction`. * If sending a function is possible, provide `storeFunction`.
* If sending a proxy is possible, provide `storeProxy`. * If sending a proxy is possible, provide `storeProxy`.
*/ */
export const argumentToProto = ( export const argumentToProto = <P = ClientServerProxy | ServerProxy>(
value: any, value: any,
storeFunction?: (fn: () => void) => number, storeFunction?: (fn: () => void) => number,
storeProxy?: (proxy: ServerProxy) => number, storeProxy?: (proxy: P) => number | Module,
): Argument => { ): Argument => {
const convert = (currentValue: any): Argument => { const convert = (currentValue: any): Argument => {
const message = new Argument(); const message = new Argument();
if (currentValue instanceof Error if (isProxy<P>(currentValue)) {
if (!storeProxy) {
throw new Error("no way to serialize proxy");
}
const arg = new Argument.ProxyValue();
const id = storeProxy(currentValue);
if (typeof id === "string") {
throw new Error("unable to serialize module proxy");
}
arg.setId(id);
message.setProxy(arg);
} else if (currentValue instanceof Error
|| (currentValue && typeof currentValue.message !== "undefined" || (currentValue && typeof currentValue.message !== "undefined"
&& typeof currentValue.stack !== "undefined")) { && typeof currentValue.stack !== "undefined")) {
const arg = new Argument.ErrorValue(); const arg = new Argument.ErrorValue();
@ -58,13 +71,6 @@ export const argumentToProto = (
const arg = new Argument.ArrayValue(); const arg = new Argument.ArrayValue();
arg.setDataList(currentValue.map(convert)); arg.setDataList(currentValue.map(convert));
message.setArray(arg); message.setArray(arg);
} else if (isProxy(currentValue)) {
if (!storeProxy) {
throw new Error("no way to serialize proxy");
}
const arg = new Argument.ProxyValue();
arg.setId(storeProxy(currentValue));
message.setProxy(arg);
} else if (currentValue instanceof Date } else if (currentValue instanceof Date
|| (currentValue && typeof currentValue.getTime === "function")) { || (currentValue && typeof currentValue.getTime === "function")) {
const arg = new Argument.DateValue(); const arg = new Argument.DateValue();
@ -218,7 +224,7 @@ export const platformToProto = (platform: NodeJS.Platform): WorkingInit.Operatin
} }
}; };
export const isProxy = (value: any): value is ServerProxy => { export const isProxy = <P = ClientServerProxy | ServerProxy>(value: any): value is P => {
return value && typeof value === "object" && typeof value.onEvent === "function"; return value && typeof value === "object" && typeof value.onEvent === "function";
}; };

View File

@ -7,29 +7,35 @@ import { WritableProxy, ReadableProxy } from "./stream";
export type ForkProvider = (modulePath: string, args?: string[], options?: cp.ForkOptions) => cp.ChildProcess; export type ForkProvider = (modulePath: string, args?: string[], options?: cp.ForkOptions) => cp.ChildProcess;
export class ChildProcessProxy implements ServerProxy { export class ChildProcessProxy extends ServerProxy<cp.ChildProcess> {
public constructor(private readonly process: cp.ChildProcess) {} public constructor(instance: cp.ChildProcess) {
super({
bindEvents: ["close", "disconnect", "error", "exit", "message"],
doneEvents: ["close"],
instance,
});
}
public async kill(signal?: string): Promise<void> { public async kill(signal?: string): Promise<void> {
this.process.kill(signal); this.instance.kill(signal);
} }
public async disconnect(): Promise<void> { public async disconnect(): Promise<void> {
this.process.disconnect(); this.instance.disconnect();
} }
public async ref(): Promise<void> { public async ref(): Promise<void> {
this.process.ref(); this.instance.ref();
} }
public async unref(): Promise<void> { public async unref(): Promise<void> {
this.process.unref(); this.instance.unref();
} }
// tslint:disable-next-line no-any // tslint:disable-next-line no-any
public async send(message: any): Promise<void> { public async send(message: any): Promise<void> {
return new Promise((resolve, reject): void => { return new Promise((resolve, reject): void => {
this.process.send(message, (error) => { this.instance.send(message, (error) => {
if (error) { if (error) {
reject(error); reject(error);
} else { } else {
@ -40,25 +46,13 @@ export class ChildProcessProxy implements ServerProxy {
} }
public async getPid(): Promise<number> { public async getPid(): Promise<number> {
return this.process.pid; return this.instance.pid;
}
public async onDone(cb: () => void): Promise<void> {
this.process.on("close", cb);
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.process.kill(); this.instance.kill();
setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap. setTimeout(() => this.instance.kill("SIGKILL"), 5000); // Double tap.
} await super.dispose();
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
this.process.on("close", (code, signal) => cb("close", code, signal));
this.process.on("disconnect", () => cb("disconnect"));
this.process.on("error", (error) => cb("error", error));
this.process.on("exit", (exitCode, signal) => cb("exit", exitCode, signal));
this.process.on("message", (message) => cb("message", message));
} }
} }
@ -98,8 +92,10 @@ export class ChildProcessModuleProxy {
return { return {
childProcess: new ChildProcessProxy(process), childProcess: new ChildProcessProxy(process),
stdin: process.stdin && new WritableProxy(process.stdin), stdin: process.stdin && new WritableProxy(process.stdin),
stdout: process.stdout && new ReadableProxy(process.stdout), // Child processes streams appear to immediately flow so we need to bind
stderr: process.stderr && new ReadableProxy(process.stderr), // to the data event right away.
stdout: process.stdout && new ReadableProxy(process.stdout, ["data"]),
stderr: process.stderr && new ReadableProxy(process.stderr, ["data"]),
}; };
} }
} }

View File

@ -2,9 +2,9 @@ import * as fs from "fs";
import { promisify } from "util"; import { promisify } from "util";
import { ServerProxy } from "../../common/proxy"; import { ServerProxy } from "../../common/proxy";
import { IEncodingOptions } from "../../common/util"; import { IEncodingOptions } from "../../common/util";
import { WritableProxy } from "./stream"; import { ReadableProxy, WritableProxy } from "./stream";
// tslint:disable completed-docs // tslint:disable completed-docs no-any
/** /**
* A serializable version of fs.Stats. * A serializable version of fs.Stats.
@ -37,45 +37,52 @@ export interface Stats {
_isSocket: boolean; _isSocket: boolean;
} }
export class WriteStreamProxy extends WritableProxy<fs.WriteStream> { export class ReadStreamProxy extends ReadableProxy<fs.ReadStream> {
public constructor(stream: fs.ReadStream) {
super(stream, ["open"]);
}
public async close(): Promise<void> { public async close(): Promise<void> {
this.stream.close(); this.instance.close();
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.instance.close();
await super.dispose(); await super.dispose();
this.stream.close();
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
await super.onEvent(cb);
this.stream.on("open", (fd) => cb("open", fd));
} }
} }
export class WatcherProxy implements ServerProxy { export class WriteStreamProxy extends WritableProxy<fs.WriteStream> {
public constructor(private readonly watcher: fs.FSWatcher) {} public constructor(stream: fs.WriteStream) {
super(stream, ["open"]);
}
public async close(): Promise<void> { public async close(): Promise<void> {
this.watcher.close(); this.instance.close();
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.watcher.close(); this.instance.close();
this.watcher.removeAllListeners(); await super.dispose();
}
}
export class WatcherProxy extends ServerProxy<fs.FSWatcher> {
public constructor(watcher: fs.FSWatcher) {
super({
bindEvents: ["change", "close", "error"],
doneEvents: ["close", "error"],
instance: watcher,
});
} }
public async onDone(cb: () => void): Promise<void> { public async close(): Promise<void> {
this.watcher.on("close", cb); this.instance.close();
this.watcher.on("error", cb);
} }
// tslint:disable-next-line no-any public async dispose(): Promise<void> {
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> { this.instance.close();
this.watcher.on("change", (event, filename) => cb("change", event, filename)); await super.dispose();
this.watcher.on("close", () => cb("close"));
this.watcher.on("error", (error) => cb("error", error));
} }
} }
@ -84,7 +91,6 @@ export class FsModuleProxy {
return promisify(fs.access)(path, mode); return promisify(fs.access)(path, mode);
} }
// tslint:disable-next-line no-any
public appendFile(file: fs.PathLike | number, data: any, options?: fs.WriteFileOptions): Promise<void> { public appendFile(file: fs.PathLike | number, data: any, options?: fs.WriteFileOptions): Promise<void> {
return promisify(fs.appendFile)(file, data, options); return promisify(fs.appendFile)(file, data, options);
} }
@ -105,7 +111,10 @@ export class FsModuleProxy {
return promisify(fs.copyFile)(src, dest, flags); return promisify(fs.copyFile)(src, dest, flags);
} }
// tslint:disable-next-line no-any public async createReadStream(path: fs.PathLike, options?: any): Promise<ReadStreamProxy> {
return new ReadStreamProxy(fs.createReadStream(path, options));
}
public async createWriteStream(path: fs.PathLike, options?: any): Promise<WriteStreamProxy> { public async createWriteStream(path: fs.PathLike, options?: any): Promise<WriteStreamProxy> {
return new WriteStreamProxy(fs.createWriteStream(path, options)); return new WriteStreamProxy(fs.createWriteStream(path, options));
} }
@ -236,7 +245,6 @@ export class FsModuleProxy {
return promisify(fs.write)(fd, buffer, offset, length, position); return promisify(fs.write)(fd, buffer, offset, length, position);
} }
// tslint:disable-next-line no-any
public writeFile (path: fs.PathLike | number, data: any, options: IEncodingOptions): Promise<void> { public writeFile (path: fs.PathLike | number, data: any, options: IEncodingOptions): Promise<void> {
return promisify(fs.writeFile)(path, data, options); return promisify(fs.writeFile)(path, data, options);
} }

View File

@ -2,78 +2,65 @@ import * as net from "net";
import { ServerProxy } from "../../common/proxy"; import { ServerProxy } from "../../common/proxy";
import { DuplexProxy } from "./stream"; import { DuplexProxy } from "./stream";
// tslint:disable completed-docs // tslint:disable completed-docs no-any
export class NetSocketProxy extends DuplexProxy<net.Socket> { export class NetSocketProxy extends DuplexProxy<net.Socket> {
public constructor(socket: net.Socket) {
super(socket, ["connect", "lookup", "timeout"]);
}
public async connect(options: number | string | net.SocketConnectOpts, host?: string): Promise<void> { public async connect(options: number | string | net.SocketConnectOpts, host?: string): Promise<void> {
this.stream.connect(options as any, host as any); // tslint:disable-line no-any this works fine this.instance.connect(options as any, host as any);
} }
public async unref(): Promise<void> { public async unref(): Promise<void> {
this.stream.unref(); this.instance.unref();
} }
public async ref(): Promise<void> { public async ref(): Promise<void> {
this.stream.ref(); this.instance.ref();
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.stream.removeAllListeners(); this.instance.end();
this.stream.end(); this.instance.destroy();
this.stream.destroy(); this.instance.unref();
this.stream.unref(); await super.dispose();
}
public async onDone(cb: () => void): Promise<void> {
this.stream.on("close", cb);
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
await super.onEvent(cb);
this.stream.on("connect", () => cb("connect"));
this.stream.on("lookup", (error, address, family, host) => cb("lookup", error, address, family, host));
this.stream.on("timeout", () => cb("timeout"));
} }
} }
export class NetServerProxy implements ServerProxy { export class NetServerProxy extends ServerProxy<net.Server> {
public constructor(private readonly server: net.Server) {} public constructor(instance: net.Server) {
super({
bindEvents: ["close", "error", "listening"],
doneEvents: ["close"],
instance,
});
}
public async listen(handle?: net.ListenOptions | number | string, hostname?: string | number, backlog?: number): Promise<void> { public async listen(handle?: net.ListenOptions | number | string, hostname?: string | number, backlog?: number): Promise<void> {
this.server.listen(handle, hostname as any, backlog as any); // tslint:disable-line no-any this is fine this.instance.listen(handle, hostname as any, backlog as any);
} }
public async ref(): Promise<void> { public async ref(): Promise<void> {
this.server.ref(); this.instance.ref();
} }
public async unref(): Promise<void> { public async unref(): Promise<void> {
this.server.unref(); this.instance.unref();
} }
public async close(): Promise<void> { public async close(): Promise<void> {
this.server.close(); this.instance.close();
} }
public async onConnection(cb: (proxy: NetSocketProxy) => void): Promise<void> { public async onConnection(cb: (proxy: NetSocketProxy) => void): Promise<void> {
this.server.on("connection", (socket) => cb(new NetSocketProxy(socket))); this.instance.on("connection", (socket) => cb(new NetSocketProxy(socket)));
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.server.close(); this.instance.close();
this.server.removeAllListeners(); this.instance.removeAllListeners();
}
public async onDone(cb: () => void): Promise<void> {
this.server.on("close", cb);
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
this.server.on("close", () => cb("close"));
this.server.on("error", (error) => cb("error", error));
this.server.on("listening", () => cb("listening"));
} }
} }
@ -83,7 +70,7 @@ export class NetModuleProxy {
} }
public async createConnection(target: string | number | net.NetConnectOpts, host?: string): Promise<NetSocketProxy> { public async createConnection(target: string | number | net.NetConnectOpts, host?: string): Promise<NetSocketProxy> {
return new NetSocketProxy(net.createConnection(target as any, host)); // tslint:disable-line no-any defeat stubborness return new NetSocketProxy(net.createConnection(target as any, host));
} }
public async createServer(options?: { allowHalfOpen?: boolean, pauseOnConnect?: boolean }): Promise<NetServerProxy> { public async createServer(options?: { allowHalfOpen?: boolean, pauseOnConnect?: boolean }): Promise<NetServerProxy> {

View File

@ -9,18 +9,25 @@ import { preserveEnv } from "../../common/util";
/** /**
* Server-side IPty proxy. * Server-side IPty proxy.
*/ */
export class NodePtyProcessProxy implements ServerProxy { export class NodePtyProcessProxy extends ServerProxy {
private readonly emitter = new EventEmitter();
public constructor(private readonly process: pty.IPty) { public constructor(private readonly process: pty.IPty) {
super({
bindEvents: ["process", "data", "exit"],
doneEvents: ["exit"],
instance: new EventEmitter(),
});
this.process.on("data", (data) => this.instance.emit("data", data));
this.process.on("exit", (exitCode, signal) => this.instance.emit("exit", exitCode, signal));
let name = process.process; let name = process.process;
setTimeout(() => { // Need to wait for the caller to listen to the event. setTimeout(() => { // Need to wait for the caller to listen to the event.
this.emitter.emit("process", name); this.instance.emit("process", name);
}, 1); }, 1);
const timer = setInterval(() => { const timer = setInterval(() => {
if (process.process !== name) { if (process.process !== name) {
name = process.process; name = process.process;
this.emitter.emit("process", name); this.instance.emit("process", name);
} }
}, 200); }, 200);
@ -47,21 +54,10 @@ export class NodePtyProcessProxy implements ServerProxy {
this.process.write(data); this.process.write(data);
} }
public async onDone(cb: () => void): Promise<void> {
this.process.on("exit", cb);
}
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.process.kill(); this.process.kill();
setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap. setTimeout(() => this.process.kill("SIGKILL"), 5000); // Double tap.
this.emitter.removeAllListeners(); await super.dispose();
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
this.emitter.on("process", (process) => cb("process", process));
this.process.on("data", (data) => cb("data", data));
this.process.on("exit", (exitCode, signal) => cb("exit", exitCode, signal));
} }
} }

View File

@ -5,10 +5,14 @@ import { ServerProxy } from "../../common/proxy";
// tslint:disable completed-docs // tslint:disable completed-docs
export class RotatingLoggerProxy implements ServerProxy { export class RotatingLoggerProxy extends ServerProxy<EventEmitter> {
private readonly emitter = new EventEmitter(); public constructor(private readonly logger: spdlog.RotatingLogger) {
super({
public constructor(private readonly logger: spdlog.RotatingLogger) {} bindEvents: [],
doneEvents: ["dispose"],
instance: new EventEmitter(),
});
}
public async trace (message: string): Promise<void> { this.logger.trace(message); } public async trace (message: string): Promise<void> { this.logger.trace(message); }
public async debug (message: string): Promise<void> { this.logger.debug(message); } public async debug (message: string): Promise<void> { this.logger.debug(message); }
@ -21,19 +25,10 @@ export class RotatingLoggerProxy implements ServerProxy {
public async flush (): Promise<void> { this.logger.flush(); } public async flush (): Promise<void> { this.logger.flush(); }
public async drop (): Promise<void> { this.logger.drop(); } public async drop (): Promise<void> { this.logger.drop(); }
public async onDone(cb: () => void): Promise<void> {
this.emitter.on("dispose", cb);
}
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
await this.flush(); await this.flush();
this.emitter.emit("dispose"); this.instance.emit("dispose");
this.emitter.removeAllListeners(); await super.dispose();
}
// tslint:disable-next-line no-any
public async onEvent(_cb: (event: string, ...args: any[]) => void): Promise<void> {
// No events.
} }
} }

View File

@ -1,32 +1,38 @@
import { EventEmitter } from "events";
import * as stream from "stream"; import * as stream from "stream";
import { ServerProxy } from "../../common/proxy"; import { ServerProxy } from "../../common/proxy";
// tslint:disable completed-docs // tslint:disable completed-docs no-any
export class WritableProxy<T extends stream.Writable = stream.Writable> implements ServerProxy { export class WritableProxy<T extends stream.Writable = stream.Writable> extends ServerProxy<T> {
public constructor(protected readonly stream: T) {} public constructor(instance: T, bindEvents: string[] = [], delayedEvents?: string[]) {
super({
public async destroy(): Promise<void> { bindEvents: ["close", "drain", "error", "finish"].concat(bindEvents),
this.stream.destroy(); doneEvents: ["close"],
delayedEvents,
instance,
});
}
public async destroy(): Promise<void> {
this.instance.destroy();
} }
// tslint:disable-next-line no-any
public async end(data?: any, encoding?: string): Promise<void> { public async end(data?: any, encoding?: string): Promise<void> {
return new Promise((resolve): void => { return new Promise((resolve): void => {
this.stream.end(data, encoding, () => { this.instance.end(data, encoding, () => {
resolve(); resolve();
}); });
}); });
} }
public async setDefaultEncoding(encoding: string): Promise<void> { public async setDefaultEncoding(encoding: string): Promise<void> {
this.stream.setDefaultEncoding(encoding); this.instance.setDefaultEncoding(encoding);
} }
// tslint:disable-next-line no-any
public async write(data: any, encoding?: string): Promise<void> { public async write(data: any, encoding?: string): Promise<void> {
return new Promise((resolve, reject): void => { return new Promise((resolve, reject): void => {
this.stream.write(data, encoding, (error) => { this.instance.write(data, encoding, (error) => {
if (error) { if (error) {
reject(error); reject(error);
} else { } else {
@ -37,22 +43,8 @@ export class WritableProxy<T extends stream.Writable = stream.Writable> implemen
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.stream.end(); this.instance.end();
this.stream.removeAllListeners(); await super.dispose();
}
public async onDone(cb: () => void): Promise<void> {
this.stream.on("close", cb);
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
// Sockets have an extra argument on "close".
// tslint:disable-next-line no-any
this.stream.on("close", (...args: any[]) => cb("close", ...args));
this.stream.on("drain", () => cb("drain"));
this.stream.on("error", (error) => cb("error", error));
this.stream.on("finish", () => cb("finish"));
} }
} }
@ -60,50 +52,58 @@ export class WritableProxy<T extends stream.Writable = stream.Writable> implemen
* This noise is because we can't do multiple extends and we also can't seem to * This noise is because we can't do multiple extends and we also can't seem to
* do `extends WritableProxy<T> implement ReadableProxy<T>` (for `DuplexProxy`). * do `extends WritableProxy<T> implement ReadableProxy<T>` (for `DuplexProxy`).
*/ */
export interface IReadableProxy extends ServerProxy { export interface IReadableProxy<T extends EventEmitter> extends ServerProxy<T> {
destroy(): Promise<void>; pipe<P extends WritableProxy>(destination: P, options?: { end?: boolean; }): Promise<void>;
setEncoding(encoding: string): Promise<void>; setEncoding(encoding: string): Promise<void>;
dispose(): Promise<void>;
onDone(cb: () => void): Promise<void>;
} }
export class ReadableProxy<T extends stream.Readable = stream.Readable> implements IReadableProxy { export class ReadableProxy<T extends stream.Readable = stream.Readable> extends ServerProxy<T> implements IReadableProxy<T> {
public constructor(protected readonly stream: T) {} public constructor(instance: T, bindEvents: string[] = []) {
super({
bindEvents: ["close", "end", "error"].concat(bindEvents),
doneEvents: ["close"],
delayedEvents: ["data"],
instance,
});
}
public async pipe<P extends WritableProxy>(destination: P, options?: { end?: boolean; }): Promise<void> {
this.instance.pipe(destination.instance, options);
// `pipe` switches the stream to flowing mode and makes data start emitting.
await this.bindDelayedEvent("data");
}
public async destroy(): Promise<void> { public async destroy(): Promise<void> {
this.stream.destroy(); this.instance.destroy();
} }
public async setEncoding(encoding: string): Promise<void> { public async setEncoding(encoding: string): Promise<void> {
this.stream.setEncoding(encoding); this.instance.setEncoding(encoding);
} }
public async dispose(): Promise<void> { public async dispose(): Promise<void> {
this.stream.destroy(); this.instance.destroy();
} await super.dispose();
public async onDone(cb: () => void): Promise<void> {
this.stream.on("close", cb);
}
// tslint:disable-next-line no-any
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> {
this.stream.on("close", () => cb("close"));
this.stream.on("data", (chunk) => cb("data", chunk));
this.stream.on("end", () => cb("end"));
this.stream.on("error", (error) => cb("error", error));
} }
} }
export class DuplexProxy<T extends stream.Duplex = stream.Duplex> extends WritableProxy<T> implements IReadableProxy { export class DuplexProxy<T extends stream.Duplex = stream.Duplex> extends WritableProxy<T> implements IReadableProxy<T> {
public constructor(stream: T, bindEvents: string[] = []) {
super(stream, ["end"].concat(bindEvents), ["data"]);
}
public async pipe<P extends WritableProxy>(destination: P, options?: { end?: boolean; }): Promise<void> {
this.instance.pipe(destination.instance, options);
// `pipe` switches the stream to flowing mode and makes data start emitting.
await this.bindDelayedEvent("data");
}
public async setEncoding(encoding: string): Promise<void> { public async setEncoding(encoding: string): Promise<void> {
this.stream.setEncoding(encoding); this.instance.setEncoding(encoding);
} }
// tslint:disable-next-line no-any public async dispose(): Promise<void> {
public async onEvent(cb: (event: string, ...args: any[]) => void): Promise<void> { this.instance.destroy();
await super.onEvent(cb); await super.dispose();
this.stream.on("data", (chunk) => cb("data", chunk));
this.stream.on("end", () => cb("end"));
} }
} }

View File

@ -136,6 +136,7 @@ export class Server {
const args = proxyMessage.getArgsList().map((a) => protoToArgument( const args = proxyMessage.getArgsList().map((a) => protoToArgument(
a, a,
(id, args) => this.sendCallback(proxyId, id, args), (id, args) => this.sendCallback(proxyId, id, args),
(id) => this.getProxy(id).instance,
)); ));
logger.trace(() => [ logger.trace(() => [
@ -241,9 +242,7 @@ export class Server {
this.proxies.set(proxyId, { instance }); this.proxies.set(proxyId, { instance });
if (isProxy(instance)) { if (isProxy(instance)) {
instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args)).catch((error) => { instance.onEvent((event, ...args) => this.sendEvent(proxyId, event, ...args));
logger.error(error.message);
});
instance.onDone(() => { instance.onDone(() => {
// It might have finished because we disposed it due to a disconnect. // It might have finished because we disposed it due to a disconnect.
if (!this.disconnected) { if (!this.disconnected) {
@ -255,8 +254,6 @@ export class Server {
this.removeProxy(proxyId); this.removeProxy(proxyId);
}, this.responseTimeout); }, this.responseTimeout);
} }
}).catch((error) => {
logger.error(error.message);
}); });
} }

View File

@ -10,7 +10,7 @@ describe("child_process", () => {
const cp = client.modules[Module.ChildProcess]; const cp = client.modules[Module.ChildProcess];
const getStdout = async (proc: ChildProcess): Promise<string> => { const getStdout = async (proc: ChildProcess): Promise<string> => {
return new Promise((r): Readable => proc.stdout!.on("data", r)) return new Promise((r): Readable => proc.stdout!.once("data", r))
.then((s) => s.toString()); .then((s) => s.toString());
}; };

View File

@ -131,6 +131,42 @@ describe("fs", () => {
}); });
}); });
describe("createReadStream", () => {
it("should read a file", async () => {
const file = helper.tmpFile();
const content = "foobar";
await util.promisify(nativeFs.writeFile)(file, content);
const reader = fs.createReadStream(file);
await expect(new Promise((resolve, reject): void => {
let data = "";
reader.once("error", reject);
reader.once("end", () => resolve(data));
reader.on("data", (d) => data += d.toString());
})).resolves.toBe(content);
});
it("should pipe to a writable stream", async () => {
const source = helper.tmpFile();
const content = "foo";
await util.promisify(nativeFs.writeFile)(source, content);
const destination = helper.tmpFile();
const reader = fs.createReadStream(source);
const writer = fs.createWriteStream(destination);
await new Promise((resolve, reject): void => {
reader.once("error", reject);
writer.once("error", reject);
writer.once("close", resolve);
reader.pipe(writer);
});
await expect(util.promisify(nativeFs.readFile)(destination, "utf8")).resolves.toBe(content);
});
});
describe("exists", () => { describe("exists", () => {
it("should output file exists", async () => { it("should output file exists", async () => {
await expect(util.promisify(fs.exists)(__filename)) await expect(util.promisify(fs.exists)(__filename))
@ -279,10 +315,9 @@ describe("fs", () => {
.resolves.toBeUndefined(); .resolves.toBeUndefined();
}); });
// TODO: Doesn't fail on my system?
it("should fail to lchown nonexistent file", async () => { it("should fail to lchown nonexistent file", async () => {
await expect(util.promisify(fs.lchown)(helper.tmpFile(), 1, 1)) await expect(util.promisify(fs.lchown)(helper.tmpFile(), 1, 1))
.resolves.toBeUndefined(); .rejects.toThrow("ENOENT");
}); });
}); });