From 1a3fc868948b5b044410ccdebe6dbba1e083f0d1 Mon Sep 17 00:00:00 2001 From: Asher Date: Mon, 15 Jul 2019 18:55:30 -0500 Subject: [PATCH] Reconnection works --- src/cli.ts | 2 +- src/connection.ts | 131 +++++++++++++++++++--------------------------- src/protocol.ts | 27 +++------- src/server.ts | 10 ++-- 4 files changed, 70 insertions(+), 100 deletions(-) diff --git a/src/cli.ts b/src/cli.ts index d06b7d2c..5a7abe80 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -195,7 +195,7 @@ const main = async (): Promise => { if (args["open"]) { await open(serverAddress).catch(console.error); - console.log(" - Opened URL"); + console.log(` - Opened ${serverAddress}`); } }; diff --git a/src/connection.ts b/src/connection.ts index 3c29a97c..0a39202d 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -4,7 +4,7 @@ import { getPathFromAmdModule } from "vs/base/common/amd"; import { VSBuffer } from "vs/base/common/buffer"; import { Emitter } from "vs/base/common/event"; import { ISocket } from "vs/base/parts/ipc/common/ipc.net"; -import { NodeSocket, WebSocketNodeSocket } from "vs/base/parts/ipc/node/ipc.net"; +import { NodeSocket } from "vs/base/parts/ipc/node/ipc.net"; import { ILogService } from "vs/platform/log/common/log"; import { IExtHostReadyMessage, IExtHostSocketMessage } from "vs/workbench/services/extensions/common/extensionHostProtocol"; @@ -12,76 +12,54 @@ import { Protocol } from "vs/server/src/protocol"; import { uriTransformerPath } from "vs/server/src/util"; export abstract class Connection { - private readonly _onClose = new Emitter(); + protected readonly _onClose = new Emitter(); public readonly onClose = this._onClose.event; + protected disposed: boolean = false; - private timeout: NodeJS.Timeout | undefined; - private readonly wait = 1000 * 60; - - private closed: boolean = false; - - public constructor(protected protocol: Protocol) { - // onClose seems to mean we want to disconnect, so close immediately. - protocol.onClose(() => this.close()); - - // If the socket closes, we want to wait before closing so we can - // reconnect in the meantime. - protocol.onSocketClose(() => { - this.timeout = setTimeout(() => { - this.close(); - }, this.wait); - }); - } + public constructor(protected protocol: Protocol) {} /** * Set up the connection on a new socket. */ - public reconnect(protocol: Protocol, buffer: VSBuffer): void { - if (this.closed) { - throw new Error("Cannot reconnect to closed connection"); - } - clearTimeout(this.timeout as any); // Not sure why the type doesn't work. - this.protocol = protocol; - this.connect(protocol.getSocket(), buffer); - } - - /** - * Close and clean up connection. This will also kill the socket the - * connection is on. Probably not safe to reconnect once this has happened. - */ - protected close(): void { - if (!this.closed) { - this.closed = true; - this.protocol.sendDisconnect(); - this.dispose(); - this.protocol.dispose(); - this._onClose.fire(); - } - } + public abstract reconnect(socket: ISocket, buffer: VSBuffer): void; /** * Clean up the connection. */ protected abstract dispose(): void; - - /** - * Connect to a new socket. - */ - protected abstract connect(socket: ISocket, buffer: VSBuffer): void; } /** * Used for all the IPC channels. */ export class ManagementConnection extends Connection { - protected dispose(): void { - // Nothing extra to do here. + private timeout: NodeJS.Timeout | undefined; + private readonly wait = 1000 * 60; + + public constructor(protocol: Protocol) { + super(protocol); + protocol.onClose(() => this.dispose()); + protocol.onSocketClose(() => { + this.timeout = setTimeout(() => this.dispose(), this.wait); + }); } - protected connect(socket: ISocket, buffer: VSBuffer): void { + public reconnect(socket: ISocket, buffer: VSBuffer): void { + clearTimeout(this.timeout as any); // Not sure why the type doesn't work. this.protocol.beginAcceptReconnection(socket, buffer); this.protocol.endAcceptReconnection(); } + + protected dispose(): void { + if (!this.disposed) { + clearTimeout(this.timeout as any); // Not sure why the type doesn't work. + this.disposed = true; + this.protocol.sendDisconnect(); + this.protocol.dispose(); + this.protocol.getSocket().end(); + this._onClose.fire(); + } + } } /** @@ -90,38 +68,45 @@ export class ManagementConnection extends Connection { export class ExtensionHostConnection extends Connection { private process: cp.ChildProcess; - public constructor(protocol: Protocol, private readonly log: ILogService) { + public constructor( + protocol: Protocol, buffer: VSBuffer, + private readonly log: ILogService, + ) { super(protocol); - const socket = this.protocol.getSocket(); - const buffer = this.protocol.readEntireBuffer(); - this.process = this.spawn(socket, buffer); + protocol.dispose(); + this.process = this.spawn(buffer); } protected dispose(): void { - this.process.kill(); + if (!this.disposed) { + this.disposed = true; + this.process.kill(); + this.protocol.getSocket().end(); + this._onClose.fire(); + } } - protected connect(socket: ISocket, buffer: VSBuffer): void { - this.sendInitMessage(socket, buffer); + public reconnect(socket: ISocket, buffer: VSBuffer): void { + // This is just to set the new socket. + this.protocol.beginAcceptReconnection(socket, null); + this.protocol.dispose(); + this.sendInitMessage(buffer); } - private sendInitMessage(nodeSocket: ISocket, buffer: VSBuffer): void { - const socket = nodeSocket instanceof NodeSocket - ? nodeSocket.socket - : (nodeSocket as WebSocketNodeSocket).socket.socket; - + private sendInitMessage(buffer: VSBuffer): void { + const socket = this.protocol.getUnderlyingSocket(); socket.pause(); const initMessage: IExtHostSocketMessage = { type: "VSCODE_EXTHOST_IPC_SOCKET", initialDataChunk: (buffer.buffer as Buffer).toString("base64"), - skipWebSocketFrames: nodeSocket instanceof NodeSocket, + skipWebSocketFrames: this.protocol.getSocket() instanceof NodeSocket, }; this.process.send(initMessage, socket); } - private spawn(socket: ISocket, buffer: VSBuffer): cp.ChildProcess { + private spawn(buffer: VSBuffer): cp.ChildProcess { const proc = cp.fork( getPathFromAmdModule(require, "bootstrap-fork"), [ @@ -142,20 +127,15 @@ export class ExtensionHostConnection extends Connection { }, ); - proc.on("error", (error) => { - console.error(error); - this.close(); - }); - - proc.on("exit", (code, signal) => { - console.error("Extension host exited", { code, signal }); - this.close(); - }); + proc.on("error", () => this.dispose()); + proc.on("exit", () => this.dispose()); proc.stdout.setEncoding("utf8"); proc.stderr.setEncoding("utf8"); - proc.stdout.on("data", (data) => this.log.info("Extension host stdout", data)); - proc.stderr.on("data", (data) => this.log.error("Extension host stderr", data)); + + proc.stdout.on("data", (d) => this.log.info("Extension host stdout", d)); + proc.stderr.on("data", (d) => this.log.error("Extension host stderr", d)); + proc.on("message", (event) => { if (event && event.type === "__$console") { const severity = this.log[event.severity] ? event.severity : "info"; @@ -166,10 +146,9 @@ export class ExtensionHostConnection extends Connection { const listen = (message: IExtHostReadyMessage) => { if (message.type === "VSCODE_EXTHOST_IPC_READY") { proc.removeListener("message", listen); - this.sendInitMessage(socket, buffer); + this.sendInitMessage(buffer); } }; - proc.on("message", listen); return proc; diff --git a/src/protocol.ts b/src/protocol.ts index b11ecfc5..1ecbaed8 100644 --- a/src/protocol.ts +++ b/src/protocol.ts @@ -13,8 +13,6 @@ export interface SocketOptions { } export class Protocol extends PersistentProtocol { - private disposed: boolean = false; - public constructor( secWebsocketKey: string, socket: net.Socket, @@ -25,15 +23,14 @@ export class Protocol extends PersistentProtocol { ? new NodeSocket(socket) : new WebSocketNodeSocket(new NodeSocket(socket)), ); - socket.on("error", () => this.dispose()); - socket.on("end", () => this.dispose()); + socket.on("error", () => socket.destroy()); + socket.on("end", () => socket.destroy()); // This magic value is specified by the websocket spec. const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; const reply = crypto.createHash("sha1") .update(secWebsocketKey + magic) .digest("base64"); - socket.write([ "HTTP/1.1 101 Switching Protocols", "Upgrade: websocket", @@ -42,21 +39,11 @@ export class Protocol extends PersistentProtocol { ].join("\r\n") + "\r\n\r\n"); } - public sendDisconnect(): void { - if (!this.disposed) { - super.sendDisconnect(); - } - } - - public dispose(error?: Error): void { - if (!this.disposed) { - this.disposed = true; - if (error) { - this.sendMessage({ type: "error", reason: error.message }); - } - super.dispose(); - this.getSocket().dispose(); - } + public getUnderlyingSocket(): net.Socket { + const socket = this.getSocket(); + return socket instanceof NodeSocket + ? socket.socket + : (socket as WebSocketNodeSocket).socket.socket; } /** diff --git a/src/server.ts b/src/server.ts index 8fac0f2d..cafd76fd 100644 --- a/src/server.ts +++ b/src/server.ts @@ -376,7 +376,9 @@ export class MainServer extends Server { try { await this.connect(await protocol.handshake(), protocol); } catch (error) { - protocol.dispose(error); + protocol.sendMessage({ type: "error", reason: error.message }); + protocol.dispose(); + protocol.getSocket().dispose(); } }); @@ -539,7 +541,7 @@ export class MainServer extends Server { protocol.sendMessage(ok); const buffer = protocol.readEntireBuffer(); protocol.dispose(); - return connections.get(token)!.reconnect(protocol, buffer); + return connections.get(token)!.reconnect(protocol.getSocket(), buffer); } if (protocol.options.reconnection || connections.has(token)) { @@ -559,8 +561,10 @@ export class MainServer extends Server { onDidClientDisconnect: connection.onClose, }); } else { + const buffer = protocol.readEntireBuffer(); connection = new ExtensionHostConnection( - protocol, this.services.get(ILogService) as ILogService, + protocol, buffer, + this.services.get(ILogService) as ILogService, ); } connections.set(protocol.options.reconnectionToken, connection);