Move connection logic into connection class

- Moved everything I could into the class itself.
- Improve the logging situation a bit.
- Switch some trace logs to debug.
- Get debug port from message arguments.
This commit is contained in:
Asher 2021-04-20 11:17:18 -05:00
parent ae6089f852
commit f0bafa387f
No known key found for this signature in database
GPG Key ID: D63C1EF81242354A
4 changed files with 110 additions and 79 deletions

View File

@ -3,30 +3,49 @@ import * as cp from 'child_process';
import { VSBuffer } from 'vs/base/common/buffer';
import { Emitter } from 'vs/base/common/event';
import { FileAccess } from 'vs/base/common/network';
import { ISocket } from 'vs/base/parts/ipc/common/ipc.net';
import { INativeEnvironmentService } from 'vs/platform/environment/common/environment';
import { IRemoteExtensionHostStartParams } from 'vs/platform/remote/common/remoteAgentConnection';
import { getNlsConfiguration } from 'vs/server/node/nls';
import { Protocol } from 'vs/server/node/protocol';
import { IExtHostReadyMessage } from 'vs/workbench/services/extensions/common/extensionHostProtocol';
export abstract class Connection {
private readonly _onClose = new Emitter<void>();
/**
* Fire when the connection is closed (not just disconnected). This should
* only happen when the connection is offline and old or has an error.
*/
public readonly onClose = this._onClose.event;
private disposed = false;
private _offline: number | undefined;
public constructor(protected protocol: Protocol, public readonly token: string) {}
protected readonly logger: Logger;
public constructor(
protected readonly protocol: Protocol,
public readonly name: string,
) {
this.logger = logger.named(
this.name,
field('token', this.protocol.options.reconnectionToken),
);
this.logger.debug('Connecting...');
this.onClose(() => this.logger.debug('Closed'));
}
public get offline(): number | undefined {
return this._offline;
}
public reconnect(socket: ISocket, buffer: VSBuffer): void {
public reconnect(protocol: Protocol): void {
this.logger.debug('Reconnecting...');
this._offline = undefined;
this.doReconnect(socket, buffer);
this.doReconnect(protocol);
}
public dispose(): void {
public dispose(reason?: string): void {
this.logger.debug('Disposing...', field('reason', reason));
if (!this.disposed) {
this.disposed = true;
this.doDispose();
@ -35,6 +54,7 @@ export abstract class Connection {
}
protected setOffline(): void {
this.logger.debug('Disconnected');
if (!this._offline) {
this._offline = Date.now();
}
@ -43,7 +63,11 @@ export abstract class Connection {
/**
* Set up the connection on a new socket.
*/
protected abstract doReconnect(socket: ISocket, buffer: VSBuffer): void;
protected abstract doReconnect(protcol: Protocol): void;
/**
* Dispose/destroy everything permanently.
*/
protected abstract doDispose(): void;
}
@ -51,19 +75,22 @@ export abstract class Connection {
* Used for all the IPC channels.
*/
export class ManagementConnection extends Connection {
public constructor(protected protocol: Protocol, token: string) {
super(protocol, token);
public constructor(protocol: Protocol) {
super(protocol, 'management');
protocol.onDidDispose(() => this.dispose()); // Explicit close.
protocol.onSocketClose(() => this.setOffline()); // Might reconnect.
protocol.sendMessage({ type: 'ok' });
}
protected doDispose(): void {
this.protocol.destroy();
}
protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
this.protocol.beginAcceptReconnection(socket, buffer);
protected doReconnect(protocol: Protocol): void {
protocol.sendMessage({ type: 'ok' });
this.protocol.beginAcceptReconnection(protocol.getSocket(), protocol.readEntireBuffer());
this.protocol.endAcceptReconnection();
protocol.dispose();
}
}
@ -82,17 +109,21 @@ type ExtHostMessage = DisconnectedMessage | ConsoleMessage | IExtHostReadyMessag
export class ExtensionHostConnection extends Connection {
private process?: cp.ChildProcess;
private readonly logger: Logger;
public constructor(
locale: string, protocol: Protocol, buffer: VSBuffer, token: string,
protocol: Protocol,
private readonly params: IRemoteExtensionHostStartParams,
private readonly environment: INativeEnvironmentService,
) {
super(protocol, token);
this.logger = logger.named('exthost', field('token', token));
this.protocol.dispose();
this.spawn(locale, buffer).then((p) => this.process = p);
this.protocol.getUnderlyingSocket().pause();
super(protocol, 'exthost');
protocol.sendMessage({ debugPort: this.params.port });
const buffer = protocol.readEntireBuffer();
const inflateBytes = protocol.inflateBytes;
protocol.dispose();
protocol.getUnderlyingSocket().pause();
this.spawn(buffer, inflateBytes).then((p) => this.process = p);
}
protected doDispose(): void {
@ -102,34 +133,38 @@ export class ExtensionHostConnection extends Connection {
}
}
protected doReconnect(socket: ISocket, buffer: VSBuffer): void {
this.protocol.setSocket(socket);
this.protocol.dispose();
this.sendInitMessage(buffer);
protected doReconnect(protocol: Protocol): void {
protocol.sendMessage({ debugPort: this.params.port });
const buffer = protocol.readEntireBuffer();
const inflateBytes = protocol.inflateBytes;
protocol.dispose();
protocol.getUnderlyingSocket().pause();
this.protocol.setSocket(protocol.getSocket());
this.sendInitMessage(buffer, inflateBytes);
}
private sendInitMessage(buffer: VSBuffer): void {
private sendInitMessage(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): void {
if (!this.process) {
throw new Error("Tried to initialize VS Code before spawning");
throw new Error('Tried to initialize VS Code before spawning');
}
this.protocol.getUnderlyingSocket().pause();
this.logger.debug('Sending socket');
// TODO: Do something with the debug port.
this.process.send({
type: 'VSCODE_EXTHOST_IPC_SOCKET',
initialDataChunk: Buffer.from(buffer.buffer).toString('base64'),
skipWebSocketFrames: this.protocol.options.skipWebSocketFrames,
permessageDeflate: this.protocol.options.permessageDeflate,
inflateBytes: this.protocol.inflateBytes,
inflateBytes: inflateBytes ? Buffer.from(inflateBytes).toString('base64') : undefined,
}, this.protocol.getUnderlyingSocket());
}
private async spawn(locale: string, buffer: VSBuffer): Promise<cp.ChildProcess> {
this.logger.trace('Getting NLS configuration...');
const config = await getNlsConfiguration(locale, this.environment.userDataPath);
this.logger.trace('Spawning extension host...');
private async spawn(buffer: VSBuffer, inflateBytes: Uint8Array | undefined): Promise<cp.ChildProcess> {
this.logger.debug('Getting NLS configuration...');
const config = await getNlsConfiguration(this.params.language, this.environment.userDataPath);
this.logger.debug('Spawning extension host...');
const proc = cp.fork(
FileAccess.asFileUri('bootstrap-fork', require).fsPath,
// While not technically necessary, makes it easier to tell which process
@ -158,7 +193,7 @@ export class ExtensionHostConnection extends Connection {
this.dispose();
});
proc.on('exit', (code) => {
this.logger.trace('Exited', field('code', code));
this.logger.debug('Exited', field('code', code));
this.dispose();
});
if (proc.stdout && proc.stderr) {
@ -177,12 +212,12 @@ export class ExtensionHostConnection extends Connection {
}
break;
case 'VSCODE_EXTHOST_DISCONNECTED':
this.logger.trace('Going offline');
this.logger.debug('Got disconnected message');
this.setOffline();
break;
case 'VSCODE_EXTHOST_IPC_READY':
this.logger.trace('Got ready message');
this.sendInitMessage(buffer);
this.logger.debug('Handshake completed');
this.sendInitMessage(buffer, inflateBytes);
break;
default:
this.logger.error('Unexpected message', field('event', event));
@ -190,7 +225,7 @@ export class ExtensionHostConnection extends Connection {
}
});
this.logger.trace('Waiting for handshake...');
this.logger.debug('Waiting for handshake...');
return proc;
}
}

View File

@ -130,12 +130,12 @@ export class Protocol extends PersistentProtocol {
}
/**
* Get inflateBytes in base64 format from the current socket.
* Get inflateBytes from the current socket.
*/
public get inflateBytes(): string | undefined {
public get inflateBytes(): Uint8Array | undefined {
const socket = this.getSocket();
return socket instanceof WebSocketNodeSocket
? Buffer.from(socket.recordedInflateBytes.buffer).toString('base64')
? socket.recordedInflateBytes.buffer
: undefined;
}
}

View File

@ -1,4 +1,3 @@
import { field } from '@coder/logger';
import * as fs from 'fs';
import * as net from 'net';
import { release } from 'os';
@ -140,56 +139,61 @@ export class Vscode {
switch (message.desiredConnectionType) {
case ConnectionType.ExtensionHost:
case ConnectionType.Management:
// Initialize connection map for this type of connection.
if (!this.connections.has(message.desiredConnectionType)) {
this.connections.set(message.desiredConnectionType, new Map());
}
const connections = this.connections.get(message.desiredConnectionType)!;
const ok = async () => {
return message.desiredConnectionType === ConnectionType.ExtensionHost
? { debugPort: await this.getDebugPort() }
: { type: 'ok' };
};
const token = protocol.options.reconnectionToken;
if (protocol.options.reconnection && connections.has(token)) {
protocol.sendMessage(await ok());
const buffer = protocol.readEntireBuffer();
protocol.dispose();
return connections.get(token)!.reconnect(protocol.getSocket(), buffer);
} else if (protocol.options.reconnection || connections.has(token)) {
throw new Error(protocol.options.reconnection
? 'Unrecognized reconnection token'
: 'Duplicate reconnection token'
);
let connection = connections.get(token);
if (protocol.options.reconnection && connection) {
return connection.reconnect(protocol);
}
logger.debug('New connection', field('token', token));
protocol.sendMessage(await ok());
// This probably means the process restarted so the session was lost
// while the browser remained open.
if (protocol.options.reconnection) {
throw new Error(`Unable to reconnect; session no longer exists (${token})`);
}
let connection: Connection;
// This will probably never happen outside a chance collision.
if (connection) {
throw new Error('Unable to connect; token is already in use');
}
// Now that the initial exchange has completed we can create the actual
// connection on top of the protocol then send it to whatever uses it.
if (message.desiredConnectionType === ConnectionType.Management) {
connection = new ManagementConnection(protocol, token);
// The management connection is used by firing onDidClientConnect
// which makes the IPC server become aware of the connection.
connection = new ManagementConnection(protocol);
this._onDidClientConnect.fire({
protocol, onDidClientDisconnect: connection.onClose,
protocol,
onDidClientDisconnect: connection.onClose,
});
} else {
const buffer = protocol.readEntireBuffer();
// The extension host connection is used by spawning an extension host
// and passing the socket into it.
connection = new ExtensionHostConnection(
message.args ? message.args.language : 'en',
protocol, buffer, token,
protocol,
{
language: 'en',
...message.args,
},
this.services.get(IEnvironmentService) as INativeEnvironmentService,
);
}
connections.set(token, connection);
connection.onClose(() => {
logger.debug('Connection closed', field('token', token));
connections.delete(token);
});
connection.onClose(() => connections.delete(token));
this.disposeOldOfflineConnections(connections);
logger.debug(`${connections.size} active ${connection.name} connection(s)`);
break;
case ConnectionType.Tunnel: return protocol.tunnel();
default: throw new Error('Unrecognized connection type');
case ConnectionType.Tunnel:
return protocol.tunnel();
default:
throw new Error(`Unrecognized connection type ${message.desiredConnectionType}`);
}
}
@ -197,8 +201,7 @@ export class Vscode {
const offline = Array.from(connections.values())
.filter((connection) => typeof connection.offline !== 'undefined');
for (let i = 0, max = offline.length - this.maxExtraOfflineConnections; i < max; ++i) {
logger.debug('Disposing offline connection', field('token', offline[i].token));
offline[i].dispose();
offline[i].dispose('old');
}
}
@ -292,11 +295,4 @@ export class Vscode {
});
});
}
/**
* TODO: implement.
*/
private async getDebugPort(): Promise<number | undefined> {
return undefined;
}
}

View File

@ -41,7 +41,7 @@ export const register = async (
if (error) {
return reject(error)
}
logger.trace(plural(count, `${count} active connection`))
logger.debug(plural(count, `${count} active connection`))
resolve(count > 0)
})
})