Reconnection works

This commit is contained in:
Asher 2019-07-15 18:55:30 -05:00
parent 83819cb3f9
commit 1a3fc86894
No known key found for this signature in database
GPG Key ID: D63C1EF81242354A
4 changed files with 70 additions and 100 deletions

View File

@ -195,7 +195,7 @@ const main = async (): Promise<void> => {
if (args["open"]) { if (args["open"]) {
await open(serverAddress).catch(console.error); await open(serverAddress).catch(console.error);
console.log(" - Opened URL"); console.log(` - Opened ${serverAddress}`);
} }
}; };

View File

@ -4,7 +4,7 @@ import { getPathFromAmdModule } from "vs/base/common/amd";
import { VSBuffer } from "vs/base/common/buffer"; import { VSBuffer } from "vs/base/common/buffer";
import { Emitter } from "vs/base/common/event"; import { Emitter } from "vs/base/common/event";
import { ISocket } from "vs/base/parts/ipc/common/ipc.net"; import { ISocket } from "vs/base/parts/ipc/common/ipc.net";
import { NodeSocket, WebSocketNodeSocket } from "vs/base/parts/ipc/node/ipc.net"; import { NodeSocket } from "vs/base/parts/ipc/node/ipc.net";
import { ILogService } from "vs/platform/log/common/log"; import { ILogService } from "vs/platform/log/common/log";
import { IExtHostReadyMessage, IExtHostSocketMessage } from "vs/workbench/services/extensions/common/extensionHostProtocol"; import { IExtHostReadyMessage, IExtHostSocketMessage } from "vs/workbench/services/extensions/common/extensionHostProtocol";
@ -12,76 +12,54 @@ import { Protocol } from "vs/server/src/protocol";
import { uriTransformerPath } from "vs/server/src/util"; import { uriTransformerPath } from "vs/server/src/util";
export abstract class Connection { export abstract class Connection {
private readonly _onClose = new Emitter<void>(); protected readonly _onClose = new Emitter<void>();
public readonly onClose = this._onClose.event; public readonly onClose = this._onClose.event;
protected disposed: boolean = false;
private timeout: NodeJS.Timeout | undefined; public constructor(protected protocol: Protocol) {}
private readonly wait = 1000 * 60;
private closed: boolean = false;
public constructor(protected protocol: Protocol) {
// onClose seems to mean we want to disconnect, so close immediately.
protocol.onClose(() => this.close());
// If the socket closes, we want to wait before closing so we can
// reconnect in the meantime.
protocol.onSocketClose(() => {
this.timeout = setTimeout(() => {
this.close();
}, this.wait);
});
}
/** /**
* Set up the connection on a new socket. * Set up the connection on a new socket.
*/ */
public reconnect(protocol: Protocol, buffer: VSBuffer): void { public abstract reconnect(socket: ISocket, buffer: VSBuffer): void;
if (this.closed) {
throw new Error("Cannot reconnect to closed connection");
}
clearTimeout(this.timeout as any); // Not sure why the type doesn't work.
this.protocol = protocol;
this.connect(protocol.getSocket(), buffer);
}
/**
* Close and clean up connection. This will also kill the socket the
* connection is on. Probably not safe to reconnect once this has happened.
*/
protected close(): void {
if (!this.closed) {
this.closed = true;
this.protocol.sendDisconnect();
this.dispose();
this.protocol.dispose();
this._onClose.fire();
}
}
/** /**
* Clean up the connection. * Clean up the connection.
*/ */
protected abstract dispose(): void; protected abstract dispose(): void;
/**
* Connect to a new socket.
*/
protected abstract connect(socket: ISocket, buffer: VSBuffer): void;
} }
/** /**
* Used for all the IPC channels. * Used for all the IPC channels.
*/ */
export class ManagementConnection extends Connection { export class ManagementConnection extends Connection {
protected dispose(): void { private timeout: NodeJS.Timeout | undefined;
// Nothing extra to do here. private readonly wait = 1000 * 60;
public constructor(protocol: Protocol) {
super(protocol);
protocol.onClose(() => this.dispose());
protocol.onSocketClose(() => {
this.timeout = setTimeout(() => this.dispose(), this.wait);
});
} }
protected connect(socket: ISocket, buffer: VSBuffer): void { public reconnect(socket: ISocket, buffer: VSBuffer): void {
clearTimeout(this.timeout as any); // Not sure why the type doesn't work.
this.protocol.beginAcceptReconnection(socket, buffer); this.protocol.beginAcceptReconnection(socket, buffer);
this.protocol.endAcceptReconnection(); this.protocol.endAcceptReconnection();
} }
protected dispose(): void {
if (!this.disposed) {
clearTimeout(this.timeout as any); // Not sure why the type doesn't work.
this.disposed = true;
this.protocol.sendDisconnect();
this.protocol.dispose();
this.protocol.getSocket().end();
this._onClose.fire();
}
}
} }
/** /**
@ -90,38 +68,45 @@ export class ManagementConnection extends Connection {
export class ExtensionHostConnection extends Connection { export class ExtensionHostConnection extends Connection {
private process: cp.ChildProcess; private process: cp.ChildProcess;
public constructor(protocol: Protocol, private readonly log: ILogService) { public constructor(
protocol: Protocol, buffer: VSBuffer,
private readonly log: ILogService,
) {
super(protocol); super(protocol);
const socket = this.protocol.getSocket(); protocol.dispose();
const buffer = this.protocol.readEntireBuffer(); this.process = this.spawn(buffer);
this.process = this.spawn(socket, buffer);
} }
protected dispose(): void { protected dispose(): void {
this.process.kill(); if (!this.disposed) {
this.disposed = true;
this.process.kill();
this.protocol.getSocket().end();
this._onClose.fire();
}
} }
protected connect(socket: ISocket, buffer: VSBuffer): void { public reconnect(socket: ISocket, buffer: VSBuffer): void {
this.sendInitMessage(socket, buffer); // This is just to set the new socket.
this.protocol.beginAcceptReconnection(socket, null);
this.protocol.dispose();
this.sendInitMessage(buffer);
} }
private sendInitMessage(nodeSocket: ISocket, buffer: VSBuffer): void { private sendInitMessage(buffer: VSBuffer): void {
const socket = nodeSocket instanceof NodeSocket const socket = this.protocol.getUnderlyingSocket();
? nodeSocket.socket
: (nodeSocket as WebSocketNodeSocket).socket.socket;
socket.pause(); socket.pause();
const initMessage: IExtHostSocketMessage = { const initMessage: IExtHostSocketMessage = {
type: "VSCODE_EXTHOST_IPC_SOCKET", type: "VSCODE_EXTHOST_IPC_SOCKET",
initialDataChunk: (buffer.buffer as Buffer).toString("base64"), initialDataChunk: (buffer.buffer as Buffer).toString("base64"),
skipWebSocketFrames: nodeSocket instanceof NodeSocket, skipWebSocketFrames: this.protocol.getSocket() instanceof NodeSocket,
}; };
this.process.send(initMessage, socket); this.process.send(initMessage, socket);
} }
private spawn(socket: ISocket, buffer: VSBuffer): cp.ChildProcess { private spawn(buffer: VSBuffer): cp.ChildProcess {
const proc = cp.fork( const proc = cp.fork(
getPathFromAmdModule(require, "bootstrap-fork"), getPathFromAmdModule(require, "bootstrap-fork"),
[ [
@ -142,20 +127,15 @@ export class ExtensionHostConnection extends Connection {
}, },
); );
proc.on("error", (error) => { proc.on("error", () => this.dispose());
console.error(error); proc.on("exit", () => this.dispose());
this.close();
});
proc.on("exit", (code, signal) => {
console.error("Extension host exited", { code, signal });
this.close();
});
proc.stdout.setEncoding("utf8"); proc.stdout.setEncoding("utf8");
proc.stderr.setEncoding("utf8"); proc.stderr.setEncoding("utf8");
proc.stdout.on("data", (data) => this.log.info("Extension host stdout", data));
proc.stderr.on("data", (data) => this.log.error("Extension host stderr", data)); proc.stdout.on("data", (d) => this.log.info("Extension host stdout", d));
proc.stderr.on("data", (d) => this.log.error("Extension host stderr", d));
proc.on("message", (event) => { proc.on("message", (event) => {
if (event && event.type === "__$console") { if (event && event.type === "__$console") {
const severity = this.log[event.severity] ? event.severity : "info"; const severity = this.log[event.severity] ? event.severity : "info";
@ -166,10 +146,9 @@ export class ExtensionHostConnection extends Connection {
const listen = (message: IExtHostReadyMessage) => { const listen = (message: IExtHostReadyMessage) => {
if (message.type === "VSCODE_EXTHOST_IPC_READY") { if (message.type === "VSCODE_EXTHOST_IPC_READY") {
proc.removeListener("message", listen); proc.removeListener("message", listen);
this.sendInitMessage(socket, buffer); this.sendInitMessage(buffer);
} }
}; };
proc.on("message", listen); proc.on("message", listen);
return proc; return proc;

View File

@ -13,8 +13,6 @@ export interface SocketOptions {
} }
export class Protocol extends PersistentProtocol { export class Protocol extends PersistentProtocol {
private disposed: boolean = false;
public constructor( public constructor(
secWebsocketKey: string, secWebsocketKey: string,
socket: net.Socket, socket: net.Socket,
@ -25,15 +23,14 @@ export class Protocol extends PersistentProtocol {
? new NodeSocket(socket) ? new NodeSocket(socket)
: new WebSocketNodeSocket(new NodeSocket(socket)), : new WebSocketNodeSocket(new NodeSocket(socket)),
); );
socket.on("error", () => this.dispose()); socket.on("error", () => socket.destroy());
socket.on("end", () => this.dispose()); socket.on("end", () => socket.destroy());
// This magic value is specified by the websocket spec. // This magic value is specified by the websocket spec.
const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; const magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
const reply = crypto.createHash("sha1") const reply = crypto.createHash("sha1")
.update(secWebsocketKey + magic) .update(secWebsocketKey + magic)
.digest("base64"); .digest("base64");
socket.write([ socket.write([
"HTTP/1.1 101 Switching Protocols", "HTTP/1.1 101 Switching Protocols",
"Upgrade: websocket", "Upgrade: websocket",
@ -42,21 +39,11 @@ export class Protocol extends PersistentProtocol {
].join("\r\n") + "\r\n\r\n"); ].join("\r\n") + "\r\n\r\n");
} }
public sendDisconnect(): void { public getUnderlyingSocket(): net.Socket {
if (!this.disposed) { const socket = this.getSocket();
super.sendDisconnect(); return socket instanceof NodeSocket
} ? socket.socket
} : (socket as WebSocketNodeSocket).socket.socket;
public dispose(error?: Error): void {
if (!this.disposed) {
this.disposed = true;
if (error) {
this.sendMessage({ type: "error", reason: error.message });
}
super.dispose();
this.getSocket().dispose();
}
} }
/** /**

View File

@ -376,7 +376,9 @@ export class MainServer extends Server {
try { try {
await this.connect(await protocol.handshake(), protocol); await this.connect(await protocol.handshake(), protocol);
} catch (error) { } catch (error) {
protocol.dispose(error); protocol.sendMessage({ type: "error", reason: error.message });
protocol.dispose();
protocol.getSocket().dispose();
} }
}); });
@ -539,7 +541,7 @@ export class MainServer extends Server {
protocol.sendMessage(ok); protocol.sendMessage(ok);
const buffer = protocol.readEntireBuffer(); const buffer = protocol.readEntireBuffer();
protocol.dispose(); protocol.dispose();
return connections.get(token)!.reconnect(protocol, buffer); return connections.get(token)!.reconnect(protocol.getSocket(), buffer);
} }
if (protocol.options.reconnection || connections.has(token)) { if (protocol.options.reconnection || connections.has(token)) {
@ -559,8 +561,10 @@ export class MainServer extends Server {
onDidClientDisconnect: connection.onClose, onDidClientDisconnect: connection.onClose,
}); });
} else { } else {
const buffer = protocol.readEntireBuffer();
connection = new ExtensionHostConnection( connection = new ExtensionHostConnection(
protocol, this.services.get(ILogService) as ILogService, protocol, buffer,
this.services.get(ILogService) as ILogService,
); );
} }
connections.set(protocol.options.reconnectionToken, connection); connections.set(protocol.options.reconnectionToken, connection);