From 31e14a0a069e85398a0cf0acc9d2b267c1413976 Mon Sep 17 00:00:00 2001 From: Julien Dessaux Date: Sat, 10 Jun 2023 00:56:10 +0200 Subject: Reworked the network code to improve sending rate management --- lib/api.js | 125 +++++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 76 insertions(+), 49 deletions(-) diff --git a/lib/api.js b/lib/api.js index d8010a9..7bbd156 100644 --- a/lib/api.js +++ b/lib/api.js @@ -1,12 +1,45 @@ import * as fs from 'fs'; +import * as events from 'events'; import { getToken } from '../database/config.js'; import { PriorityQueue } from './priority_queue.js'; -let busy = false; // lets us know if we are already sending api requests or not. +// queue processor module variables +const bus = new events.EventEmitter(); // a bus to notify the queue processor to start processing messages +let busy = false; // true if we are already sending api requests. +let backoffSeconds = 0; +let running = false; +// other module variables let headers = undefined; // a file scope variable so that we only evaluate these once. let queue = new PriorityQueue(); // a priority queue to hold api calls we want to send, allows for throttling. +// a single queue processor should be running at any time, otherwise there will be trouble! +async function queue_processor() { + if (running) { + throw 'refusing to start a second queue processor'; + } + running = true; + while(true) { + try { + if (backoffSeconds > 0) { + await sleep(backoffSeconds * 1000); + backoffSeconds = 0; + } + if (queue.isEmpty()) { + busy = false; + await new Promise(resolve => bus.once('send', resolve)); + busy = true; + } + await send_this(queue.dequeue().element); + await sleep(400); // 333 should work, but 400 should still allow some manual requests to go through during development + } catch (e) { + running = false; + throw e; + } + } +} +queue_processor(); + // send takes a request object as argument and an optional context ctx // example request: { // endpoint: the path part of the url to call, @@ -22,25 +55,15 @@ export function send(request, ctx) { request: request, resolve: resolve, }; + queue.enqueue(data, request.priority ? request.priority : 10); if (!busy) { - busy = true; - send_this(data); - } else { - queue.enqueue(data, request.priority ? request.priority : 10); + bus.emit('send'); // the queue was previously empty, let's wake up the queue_processor } }); } -function send_next() { - if (queue.isEmpty()) { - busy = false; - } else { - send_this(queue.dequeue().element); - } -} - // send_this take a data object as argument built in the send function above -function send_this(data) { +async function send_this(data) { if (headers === undefined) { const token = getToken(); if (token === null) { @@ -61,41 +84,45 @@ function send_this(data) { options['body'] = JSON.stringify(data.request.payload); } fs.writeFileSync('log', JSON.stringify({event: 'send', date: new Date(), data: data}) + '\n', {flag: 'a+'}); - fetch(`https://api.spacetraders.io/v2${data.request.endpoint}`, options) - .then(response => response.json()) - .then(async response => { - switch(response.error?.code) { - //case 401: // 401 means a server reset happened - // close database file - // rm database file - // logrotate - // spawnSync - // break; - case 429: // 429 means rate limited, let's hold back for 10 seconds - await sleep(10000); - queue.enqueue(data, 1); - break; - default: - fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: response}) + '\n', {flag: 'a+'}); - return data.resolve(response); - }}) - .catch(async err => { - fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: err}) + '\n', {flag: 'a+'}); - switch(err.cause?.code) { - case 503: // 503 means maintenance mode, let's hold back for 1 minute - await sleep(60000); - queue.enqueue(data, 1); - break; - case 'ECONNRESET': - queue.enqueue(data, 1); - break; - case 'UND_ERR_CONNECT_TIMEOUT': - queue.enqueue(data, 1); - break; - default: - data.reject(err); - }}); - setTimeout(send_next, 400); // 333 should work, but 400 will still allow manual requests to go through during development + let response = await fetch(`https://api.spacetraders.io/v2${data.request.endpoint}`, options); + if (response.ok) { + response = await response.json(); + switch(response.error?.code) { + //case 401: // TODO 401 means a server reset happened + // close database file + // rm database file + // logrotate + // spawnSync + // break; + case 429: // 429 means rate limited, let's hold back as instructed + backoffSeconds = response.error.data.retryAfter; + queue.enqueue(data, 1); + break; + default: // no error! + fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: response}) + '\n', {flag: 'a+'}); + return data.resolve(response); + } + } else { + fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: response}) + '\n', {flag: 'a+'}); + switch(response.cause?.code) { + case 503: // 503 means maintenance mode, let's hold back for 1 minute + backoffSeconds = 60; + queue.enqueue(data, 1); + break; + case 'EAI_AGAIN': // DNS lookup timed out error, let's hold back for 5 seconds + backoffSeconds = 5; + queue.enqueue(data, 1); + break; + case 'ECONNRESET': + queue.enqueue(data, 1); + break; + case 'UND_ERR_CONNECT_TIMEOUT': + queue.enqueue(data, 1); + break; + default: + data.reject(response); + } + } } export function debugLog(ctx) { -- cgit v1.2.3