Merge tag 'v4.2.0-beta2'

This commit is contained in:
bgme 2023-08-22 03:51:20 +08:00
commit 9e38d55101
3010 changed files with 81215 additions and 55173 deletions

View file

@ -1,65 +1,28 @@
// @ts-check
const os = require('os');
const throng = require('throng');
const fs = require('fs');
const http = require('http');
const url = require('url');
const dotenv = require('dotenv');
const express = require('express');
const http = require('http');
const redis = require('redis');
const pg = require('pg');
const log = require('npmlog');
const url = require('url');
const uuid = require('uuid');
const fs = require('fs');
const WebSocket = require('ws');
const { JSDOM } = require('jsdom');
const log = require('npmlog');
const pg = require('pg');
const dbUrlToConfig = require('pg-connection-string').parse;
const metrics = require('prom-client');
const redis = require('redis');
const uuid = require('uuid');
const WebSocket = require('ws');
const env = process.env.NODE_ENV || 'development';
const alwaysRequireAuth = process.env.LIMITED_FEDERATION_MODE === 'true' || process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
const environment = process.env.NODE_ENV || 'development';
dotenv.config({
path: env === 'production' ? '.env.production' : '.env',
path: environment === 'production' ? '.env.production' : '.env',
});
log.level = process.env.LOG_LEVEL || 'verbose';
/**
* @param {string} dbUrl
* @return {Object.<string, any>}
*/
const dbUrlToConfig = (dbUrl) => {
if (!dbUrl) {
return {};
}
const params = url.parse(dbUrl, true);
const config = {};
if (params.auth) {
[config.user, config.password] = params.auth.split(':');
}
if (params.hostname) {
config.host = params.hostname;
}
if (params.port) {
config.port = params.port;
}
if (params.pathname) {
config.database = params.pathname.split('/')[1];
}
const ssl = params.query && params.query.ssl;
if (ssl && ssl === 'true' || ssl === '1') {
config.ssl = true;
}
return config;
};
/**
* @param {Object.<string, any>} defaultConfig
* @param {string} redisUrl
@ -89,8 +52,6 @@ const redisUrlToClient = async (defaultConfig, redisUrl) => {
return client;
};
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
/**
* Attempts to safely parse a string as JSON, used when both receiving a message
* from redis and when receiving a message from a client over a websocket
@ -122,55 +83,79 @@ const parseJSON = (json, req) => {
}
};
const startMaster = () => {
if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
}
log.warn(`Starting streaming API server master with ${numWorkers} workers`);
};
const startWorker = async (workerId) => {
log.warn(`Starting worker ${workerId}`);
/**
* @param {Object.<string, any>} env the `process.env` value to read configuration from
* @returns {Object.<string, any>} the configuration for the PostgreSQL connection
*/
const pgConfigFromEnv = (env) => {
const pgConfigs = {
development: {
user: process.env.DB_USER || pg.defaults.user,
password: process.env.DB_PASS || pg.defaults.password,
database: process.env.DB_NAME || 'mastodon_development',
host: process.env.DB_HOST || pg.defaults.host,
port: process.env.DB_PORT || pg.defaults.port,
user: env.DB_USER || pg.defaults.user,
password: env.DB_PASS || pg.defaults.password,
database: env.DB_NAME || 'mastodon_development',
host: env.DB_HOST || pg.defaults.host,
port: env.DB_PORT || pg.defaults.port,
},
production: {
user: process.env.DB_USER || 'mastodon',
password: process.env.DB_PASS || '',
database: process.env.DB_NAME || 'mastodon_production',
host: process.env.DB_HOST || 'localhost',
port: process.env.DB_PORT || 5432,
user: env.DB_USER || 'mastodon',
password: env.DB_PASS || '',
database: env.DB_NAME || 'mastodon_production',
host: env.DB_HOST || 'localhost',
port: env.DB_PORT || 5432,
},
};
const app = express();
let baseConfig;
app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
if (env.DATABASE_URL) {
baseConfig = dbUrlToConfig(env.DATABASE_URL);
const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL), {
max: process.env.DB_POOL || 10,
// Support overriding the database password in the connection URL
if (!baseConfig.password && env.DB_PASS) {
baseConfig.password = env.DB_PASS;
}
} else {
baseConfig = pgConfigs[environment];
if (env.DB_SSLMODE) {
switch(env.DB_SSLMODE) {
case 'disable':
case '':
baseConfig.ssl = false;
break;
case 'no-verify':
baseConfig.ssl = { rejectUnauthorized: false };
break;
default:
baseConfig.ssl = {};
break;
}
}
}
return {
...baseConfig,
max: env.DB_POOL || 10,
connectionTimeoutMillis: 15000,
ssl: !!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable',
}));
application_name: '',
};
};
const server = http.createServer(app);
const redisNamespace = process.env.REDIS_NAMESPACE || null;
/**
* @param {Object.<string, any>} env the `process.env` value to read configuration from
* @returns {Object.<string, any>} configuration for the Redis connection
*/
const redisConfigFromEnv = (env) => {
const redisNamespace = env.REDIS_NAMESPACE || null;
const redisParams = {
socket: {
host: process.env.REDIS_HOST || '127.0.0.1',
port: process.env.REDIS_PORT || 6379,
host: env.REDIS_HOST || '127.0.0.1',
port: env.REDIS_PORT || 6379,
},
database: process.env.REDIS_DB || 0,
password: process.env.REDIS_PASSWORD || undefined,
database: env.REDIS_DB || 0,
password: env.REDIS_PASSWORD || undefined,
};
if (redisNamespace) {
@ -179,17 +164,101 @@ const startWorker = async (workerId) => {
const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
return {
redisParams,
redisPrefix,
redisUrl: env.REDIS_URL,
};
};
const startServer = async () => {
const app = express();
app.set('trust proxy', process.env.TRUSTED_PROXY_IP ? process.env.TRUSTED_PROXY_IP.split(/(?:\s*,\s*|\s+)/) : 'loopback,uniquelocal');
const pgPool = new pg.Pool(pgConfigFromEnv(process.env));
const server = http.createServer(app);
const { redisParams, redisUrl, redisPrefix } = redisConfigFromEnv(process.env);
/**
* @type {Object.<string, Array.<function(Object<string, any>): void>>}
*/
const subs = {};
const redisSubscribeClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisClient = await redisUrlToClient(redisParams, process.env.REDIS_URL);
const redisSubscribeClient = await redisUrlToClient(redisParams, redisUrl);
const redisClient = await redisUrlToClient(redisParams, redisUrl);
// Collect metrics from Node.js
metrics.collectDefaultMetrics();
new metrics.Gauge({
name: 'pg_pool_total_connections',
help: 'The total number of clients existing within the pool',
collect() {
this.set(pgPool.totalCount);
},
});
new metrics.Gauge({
name: 'pg_pool_idle_connections',
help: 'The number of clients which are not checked out but are currently idle in the pool',
collect() {
this.set(pgPool.idleCount);
},
});
new metrics.Gauge({
name: 'pg_pool_waiting_queries',
help: 'The number of queued requests waiting on a client when all clients are checked out',
collect() {
this.set(pgPool.waitingCount);
},
});
const connectedClients = new metrics.Gauge({
name: 'connected_clients',
help: 'The number of clients connected to the streaming server',
labelNames: ['type'],
});
connectedClients.set({ type: 'websocket' }, 0);
connectedClients.set({ type: 'eventsource' }, 0);
const connectedChannels = new metrics.Gauge({
name: 'connected_channels',
help: 'The number of channels the streaming server is streaming to',
labelNames: [ 'type', 'channel' ]
});
const redisSubscriptions = new metrics.Gauge({
name: 'redis_subscriptions',
help: 'The number of Redis channels the streaming server is subscribed to',
});
// When checking metrics in the browser, the favicon is requested this
// prevents the request from falling through to the API Router, which would
// error for this endpoint:
app.get('/favicon.ico', (req, res) => res.status(404).end());
app.get('/api/v1/streaming/health', (req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('OK');
});
app.get('/metrics', async (req, res) => {
try {
res.set('Content-Type', metrics.register.contentType);
res.end(await metrics.register.metrics());
} catch (ex) {
log.error(ex);
res.status(500).end();
}
});
/**
* @param {string[]} channels
* @return {function(): void}
* @returns {function(): void}
*/
const subscriptionHeartbeat = channels => {
const interval = 6 * 60;
@ -244,6 +313,7 @@ const startWorker = async (workerId) => {
if (subs[channel].length === 0) {
log.verbose(`Subscribe ${channel}`);
redisSubscribeClient.subscribe(channel, onRedisMessage);
redisSubscriptions.inc();
}
subs[channel].push(callback);
@ -265,6 +335,7 @@ const startWorker = async (workerId) => {
if (subs[channel].length === 0) {
log.verbose(`Unsubscribe ${channel}`);
redisSubscribeClient.unsubscribe(channel);
redisSubscriptions.dec();
delete subs[channel];
}
};
@ -283,7 +354,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} value
* @return {boolean}
* @returns {boolean}
*/
const isTruthy = value =>
value && !FALSE_VALUES.includes(value);
@ -291,7 +362,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
* @param {function(Error=): void} next
*/
const allowCrossDomain = (req, res, next) => {
res.header('Access-Control-Allow-Origin', '*');
@ -304,7 +375,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
* @param {function(Error=): void} next
*/
const setRequestId = (req, res, next) => {
req.requestId = uuid.v4();
@ -316,7 +387,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {any} res
* @param {function(Error=): void}
* @param {function(Error=): void} next
*/
const setRemoteAddress = (req, res, next) => {
req.remoteAddress = req.connection.remoteAddress;
@ -327,7 +398,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {string[]} necessaryScopes
* @return {boolean}
* @returns {boolean}
*/
const isInScope = (req, necessaryScopes) =>
req.scopes.some(scope => necessaryScopes.includes(scope));
@ -335,7 +406,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} token
* @param {any} req
* @return {Promise.<void>}
* @returns {Promise.<void>}
*/
const accountFromToken = (token, req) => new Promise((resolve, reject) => {
pgPool.connect((err, client, done) => {
@ -373,25 +444,19 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {boolean=} required
* @return {Promise.<void>}
* @returns {Promise.<void>}
*/
const accountFromRequest = (req, required = true) => new Promise((resolve, reject) => {
const accountFromRequest = (req) => new Promise((resolve, reject) => {
const authorization = req.headers.authorization;
const location = url.parse(req.url, true);
const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
if (!authorization && !accessToken) {
if (required) {
const err = new Error('Missing access token');
err.status = 401;
const err = new Error('Missing access token');
err.status = 401;
reject(err);
return;
} else {
resolve();
return;
}
reject(err);
return;
}
const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
@ -444,8 +509,8 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {string} channelName
* @return {Promise.<void>}
* @param {string|undefined} channelName
* @returns {Promise.<void>}
*/
const checkScopes = (req, channelName) => new Promise((resolve, reject) => {
log.silly(req.requestId, `Checking OAuth scopes for ${channelName}`);
@ -494,7 +559,7 @@ const startWorker = async (workerId) => {
// variables. OAuth scope checks are moved to the point of subscription
// to a specific stream.
accountFromRequest(info.req, alwaysRequireAuth).then(() => {
accountFromRequest(info.req).then(() => {
callback(true, undefined, undefined);
}).catch(err => {
log.error(info.req.requestId, err.toString());
@ -547,10 +612,14 @@ const startWorker = async (workerId) => {
res.on('close', () => {
unsubscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
unsubscribe(`${redisPrefix}${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).dec(2);
});
subscribe(`${redisPrefix}${accessTokenChannelId}`, listener);
subscribe(`${redisPrefix}${systemChannelId}`, listener);
connectedChannels.labels({ type: 'eventsource', channel: 'system' }).inc(2);
};
/**
@ -564,7 +633,19 @@ const startWorker = async (workerId) => {
return;
}
accountFromRequest(req, alwaysRequireAuth).then(() => checkScopes(req, channelNameFromPath(req))).then(() => {
const channelName = channelNameFromPath(req);
// If no channelName can be found for the request, then we should terminate
// the connection, as there's nothing to stream back
if (!channelName) {
const err = new Error('Unknown channel requested');
err.status = 400;
next(err);
return;
}
accountFromRequest(req).then(() => checkScopes(req, channelName)).then(() => {
subscribeHttpToSystemChannel(req, res);
}).then(() => {
next();
@ -594,14 +675,14 @@ const startWorker = async (workerId) => {
/**
* @param {array} arr
* @param {number=} shift
* @return {string}
* @returns {string}
*/
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
/**
* @param {string} listId
* @param {any} req
* @return {Promise.<void>}
* @returns {Promise.<void>}
*/
const authorizeListAccess = (listId, req) => new Promise((resolve, reject) => {
const { accountId } = req;
@ -836,6 +917,7 @@ const startWorker = async (workerId) => {
}).catch(err => {
releasePgConnection();
log.error(err);
releasePgConnection();
});
});
};
@ -854,11 +936,20 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @param {any} res
* @return {function(string, string): void}
* @returns {function(string, string): void}
*/
const streamToHttp = (req, res) => {
const accountId = req.accountId || req.remoteAddress;
const channelName = channelNameFromPath(req);
connectedClients.labels({ type: 'eventsource' }).inc();
// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).inc();
}
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-store');
res.setHeader('Transfer-Encoding', 'chunked');
@ -869,6 +960,14 @@ const startWorker = async (workerId) => {
req.on('close', () => {
log.verbose(req.requestId, `Ending stream for ${accountId}`);
// We decrement these counters here instead of in streamHttpEnd as in that
// method we don't have knowledge of the channel names
connectedClients.labels({ type: 'eventsource' }).dec();
// In theory we'll always have a channel name, but channelNameFromPath can return undefined:
if (typeof channelName === 'string') {
connectedChannels.labels({ type: 'eventsource', channel: channelName }).dec();
}
clearInterval(heartbeat);
});
@ -900,7 +999,7 @@ const startWorker = async (workerId) => {
* @param {any} req
* @param {any} ws
* @param {string[]} streamName
* @return {function(string, string): void}
* @returns {function(string, string): void}
*/
const streamToWs = (req, ws, streamName) => (event, payload) => {
if (ws.readyState !== ws.OPEN) {
@ -908,7 +1007,11 @@ const startWorker = async (workerId) => {
return;
}
ws.send(JSON.stringify({ stream: streamName, event, payload }));
ws.send(JSON.stringify({ stream: streamName, event, payload }), (err) => {
if (err) {
log.error(req.requestId, `Failed to send to websocket: ${err}`);
}
});
};
/**
@ -919,40 +1022,18 @@ const startWorker = async (workerId) => {
res.end(JSON.stringify({ error: 'Not found' }));
};
app.use(setRequestId);
app.use(setRemoteAddress);
app.use(allowCrossDomain);
const api = express.Router();
app.get('/api/v1/streaming/health', (req, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
res.end('OK');
});
app.use(api);
app.get('/metrics', (req, res) => server.getConnections((err, count) => {
res.writeHeader(200, { 'Content-Type': 'application/openmetrics-text; version=1.0.0; charset=utf-8' });
res.write('# TYPE connected_clients gauge\n');
res.write('# HELP connected_clients The number of clients connected to the streaming server\n');
res.write(`connected_clients ${count}.0\n`);
res.write('# TYPE connected_channels gauge\n');
res.write('# HELP connected_channels The number of Redis channels the streaming server is subscribed to\n');
res.write(`connected_channels ${Object.keys(subs).length}.0\n`);
res.write('# TYPE pg_pool_total_connections gauge\n');
res.write('# HELP pg_pool_total_connections The total number of clients existing within the pool\n');
res.write(`pg_pool_total_connections ${pgPool.totalCount}.0\n`);
res.write('# TYPE pg_pool_idle_connections gauge\n');
res.write('# HELP pg_pool_idle_connections The number of clients which are not checked out but are currently idle in the pool\n');
res.write(`pg_pool_idle_connections ${pgPool.idleCount}.0\n`);
res.write('# TYPE pg_pool_waiting_queries gauge\n');
res.write('# HELP pg_pool_waiting_queries The number of queued requests waiting on a client when all clients are checked out\n');
res.write(`pg_pool_waiting_queries ${pgPool.waitingCount}.0\n`);
res.write('# EOF\n');
res.end();
}));
api.use(setRequestId);
api.use(setRemoteAddress);
api.use(allowCrossDomain);
app.use(authenticationMiddleware);
app.use(errorMiddleware);
api.use(authenticationMiddleware);
api.use(errorMiddleware);
app.get('/api/v1/streaming/*', (req, res) => {
api.get('/api/v1/streaming/*', (req, res) => {
channelNameToIds(req, channelNameFromPath(req), req.query).then(({ channelIds, options }) => {
const onSend = streamToHttp(req, res);
const onEnd = streamHttpEnd(req, subscriptionHeartbeat(channelIds));
@ -975,7 +1056,7 @@ const startWorker = async (workerId) => {
/**
* @param {any} req
* @return {string[]}
* @returns {string[]}
*/
const channelsForUserStream = req => {
const arr = [`timeline:${req.accountId}`];
@ -1000,7 +1081,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} str
* @return {string}
* @returns {string}
*/
const foldToASCII = str => {
const regex = new RegExp(NON_ASCII_CHARS.split('').join('|'), 'g');
@ -1013,7 +1094,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} str
* @return {string}
* @returns {string}
*/
const normalizeHashtag = str => {
return foldToASCII(str.normalize('NFKC').toLowerCase()).replace(/[^\p{L}\p{N}_\u00b7\u200c]/gu, '');
@ -1023,7 +1104,7 @@ const startWorker = async (workerId) => {
* @param {any} req
* @param {string} name
* @param {StreamParams} params
* @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
* @returns {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>}
*/
const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => {
switch (name) {
@ -1131,7 +1212,7 @@ const startWorker = async (workerId) => {
/**
* @param {string} channelName
* @param {StreamParams} params
* @return {string[]}
* @returns {string[]}
*/
const streamNameFromChannelName = (channelName, params) => {
if (channelName === 'list') {
@ -1147,15 +1228,16 @@ const startWorker = async (workerId) => {
* @typedef WebSocketSession
* @property {any} socket
* @property {any} request
* @property {Object.<string, { listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
* @property {Object.<string, { channelName: string, listener: SubscriptionListener, stopHeartbeat: function(): void }>} subscriptions
*/
/**
* @param {WebSocketSession} session
* @param {string} channelName
* @param {StreamParams} params
* @returns {void}
*/
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) =>
const subscribeWebsocketToChannel = ({ socket, request, subscriptions }, channelName, params) => {
checkScopes(request, channelName).then(() => channelNameToIds(request, channelName, params)).then(({
channelIds,
options,
@ -1168,7 +1250,10 @@ const startWorker = async (workerId) => {
const stopHeartbeat = subscriptionHeartbeat(channelIds);
const listener = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering);
connectedChannels.labels({ type: 'websocket', channel: channelName }).inc();
subscriptions[channelIds.join(';')] = {
channelName,
listener,
stopHeartbeat,
};
@ -1176,35 +1261,47 @@ const startWorker = async (workerId) => {
log.verbose(request.requestId, 'Subscription error:', err.toString());
socket.send(JSON.stringify({ error: err.toString() }));
});
}
const removeSubscription = (subscriptions, channelIds, request) => {
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);
const subscription = subscriptions[channelIds.join(';')];
if (!subscription) {
return;
}
channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, subscription.listener);
});
connectedChannels.labels({ type: 'websocket', channel: subscription.channelName }).dec();
subscription.stopHeartbeat();
delete subscriptions[channelIds.join(';')];
}
/**
* @param {WebSocketSession} session
* @param {string} channelName
* @param {StreamParams} params
* @returns {void}
*/
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) =>
const unsubscribeWebsocketFromChannel = ({ socket, request, subscriptions }, channelName, params) => {
channelNameToIds(request, channelName, params).then(({ channelIds }) => {
log.verbose(request.requestId, `Ending stream from ${channelIds.join(', ')} for ${request.accountId}`);
const subscription = subscriptions[channelIds.join(';')];
if (!subscription) {
return;
}
const { listener, stopHeartbeat } = subscription;
channelIds.forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, listener);
});
stopHeartbeat();
delete subscriptions[channelIds.join(';')];
removeSubscription(subscriptions, channelIds, request);
}).catch(err => {
log.verbose(request.requestId, 'Unsubscription error:', err);
socket.send(JSON.stringify({ error: err.toString() }));
log.verbose(request.requestId, 'Unsubscribe error:', err);
// If we have a socket that is alive and open still, send the error back to the client:
// FIXME: In other parts of the code ws === socket
if (socket.isAlive && socket.readyState === socket.OPEN) {
socket.send(JSON.stringify({ error: "Error unsubscribing from channel" }));
}
});
}
/**
* @param {WebSocketSession} session
@ -1225,21 +1322,25 @@ const startWorker = async (workerId) => {
subscribe(`${redisPrefix}${systemChannelId}`, listener);
subscriptions[accessTokenChannelId] = {
channelName: 'system',
listener,
stopHeartbeat: () => {
},
};
subscriptions[systemChannelId] = {
channelName: 'system',
listener,
stopHeartbeat: () => {
},
};
connectedChannels.labels({ type: 'websocket', channel: 'system' }).inc(2);
};
/**
* @param {string|string[]} arrayOrString
* @return {string}
* @returns {string}
*/
const firstParam = arrayOrString => {
if (Array.isArray(arrayOrString)) {
@ -1261,6 +1362,8 @@ const startWorker = async (workerId) => {
ws.isAlive = true;
});
connectedClients.labels({ type: 'websocket' }).inc();
/**
* @type {WebSocketSession}
*/
@ -1271,17 +1374,18 @@ const startWorker = async (workerId) => {
};
const onEnd = () => {
const keys = Object.keys(session.subscriptions);
const subscriptions = Object.keys(session.subscriptions);
keys.forEach(channelIds => {
const { listener, stopHeartbeat } = session.subscriptions[channelIds];
channelIds.split(';').forEach(channelId => {
unsubscribe(`${redisPrefix}${channelId}`, listener);
});
stopHeartbeat();
subscriptions.forEach(channelIds => {
removeSubscription(session.subscriptions, channelIds.split(';'), req)
});
// ensure garbage collection:
session.socket = null;
session.request = null;
session.subscriptions = {};
connectedClients.labels({ type: 'websocket' }).dec();
};
ws.on('close', onEnd);
@ -1330,11 +1434,10 @@ const startWorker = async (workerId) => {
}, 30000);
attachServerWithConfig(server, address => {
log.warn(`Worker ${workerId} now listening on ${address}`);
log.warn(`Streaming API now listening on ${address}`);
});
const onExit = () => {
log.warn(`Worker ${workerId} exiting`);
server.close();
process.exit(0);
};
@ -1372,34 +1475,4 @@ const attachServerWithConfig = (server, onSuccess) => {
}
};
/**
* @param {function(Error=): void} onSuccess
*/
const onPortAvailable = onSuccess => {
const testServer = http.createServer();
testServer.once('error', err => {
onSuccess(err);
});
testServer.once('listening', () => {
testServer.once('close', () => onSuccess());
testServer.close();
});
attachServerWithConfig(testServer);
};
onPortAvailable(err => {
if (err) {
log.error('Could not start server, the port or socket is in use');
return;
}
throng({
workers: numWorkers,
lifetime: Infinity,
start: startWorker,
master: startMaster,
});
});
startServer();