diff options
author | Julien Dessaux | 2023-06-10 00:56:10 +0200 |
---|---|---|
committer | Julien Dessaux | 2023-06-10 00:56:10 +0200 |
commit | 31e14a0a069e85398a0cf0acc9d2b267c1413976 (patch) | |
tree | 50f5cc6ad251f4c4efaac7b7eda4f13b2584ea6e | |
parent | Implement a bit more autonomy for the ships (diff) | |
download | spacetraders-31e14a0a069e85398a0cf0acc9d2b267c1413976.tar.gz spacetraders-31e14a0a069e85398a0cf0acc9d2b267c1413976.tar.bz2 spacetraders-31e14a0a069e85398a0cf0acc9d2b267c1413976.zip |
Reworked the network code to improve sending rate management
-rw-r--r-- | lib/api.js | 125 |
1 files changed, 76 insertions, 49 deletions
@@ -1,12 +1,45 @@ import * as fs from 'fs'; +import * as events from 'events'; import { getToken } from '../database/config.js'; import { PriorityQueue } from './priority_queue.js'; -let busy = false; // lets us know if we are already sending api requests or not. +// queue processor module variables +const bus = new events.EventEmitter(); // a bus to notify the queue processor to start processing messages +let busy = false; // true if we are already sending api requests. +let backoffSeconds = 0; +let running = false; +// other module variables let headers = undefined; // a file scope variable so that we only evaluate these once. let queue = new PriorityQueue(); // a priority queue to hold api calls we want to send, allows for throttling. +// a single queue processor should be running at any time, otherwise there will be trouble! +async function queue_processor() { + if (running) { + throw 'refusing to start a second queue processor'; + } + running = true; + while(true) { + try { + if (backoffSeconds > 0) { + await sleep(backoffSeconds * 1000); + backoffSeconds = 0; + } + if (queue.isEmpty()) { + busy = false; + await new Promise(resolve => bus.once('send', resolve)); + busy = true; + } + await send_this(queue.dequeue().element); + await sleep(400); // 333 should work, but 400 should still allow some manual requests to go through during development + } catch (e) { + running = false; + throw e; + } + } +} +queue_processor(); + // send takes a request object as argument and an optional context ctx // example request: { // endpoint: the path part of the url to call, @@ -22,25 +55,15 @@ export function send(request, ctx) { request: request, resolve: resolve, }; + queue.enqueue(data, request.priority ? request.priority : 10); if (!busy) { - busy = true; - send_this(data); - } else { - queue.enqueue(data, request.priority ? request.priority : 10); + bus.emit('send'); // the queue was previously empty, let's wake up the queue_processor } }); } -function send_next() { - if (queue.isEmpty()) { - busy = false; - } else { - send_this(queue.dequeue().element); - } -} - // send_this take a data object as argument built in the send function above -function send_this(data) { +async function send_this(data) { if (headers === undefined) { const token = getToken(); if (token === null) { @@ -61,41 +84,45 @@ function send_this(data) { options['body'] = JSON.stringify(data.request.payload); } fs.writeFileSync('log', JSON.stringify({event: 'send', date: new Date(), data: data}) + '\n', {flag: 'a+'}); - fetch(`https://api.spacetraders.io/v2${data.request.endpoint}`, options) - .then(response => response.json()) - .then(async response => { - switch(response.error?.code) { - //case 401: // 401 means a server reset happened - // close database file - // rm database file - // logrotate - // spawnSync - // break; - case 429: // 429 means rate limited, let's hold back for 10 seconds - await sleep(10000); - queue.enqueue(data, 1); - break; - default: - fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: response}) + '\n', {flag: 'a+'}); - return data.resolve(response); - }}) - .catch(async err => { - fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: err}) + '\n', {flag: 'a+'}); - switch(err.cause?.code) { - case 503: // 503 means maintenance mode, let's hold back for 1 minute - await sleep(60000); - queue.enqueue(data, 1); - break; - case 'ECONNRESET': - queue.enqueue(data, 1); - break; - case 'UND_ERR_CONNECT_TIMEOUT': - queue.enqueue(data, 1); - break; - default: - data.reject(err); - }}); - setTimeout(send_next, 400); // 333 should work, but 400 will still allow manual requests to go through during development + let response = await fetch(`https://api.spacetraders.io/v2${data.request.endpoint}`, options); + if (response.ok) { + response = await response.json(); + switch(response.error?.code) { + //case 401: // TODO 401 means a server reset happened + // close database file + // rm database file + // logrotate + // spawnSync + // break; + case 429: // 429 means rate limited, let's hold back as instructed + backoffSeconds = response.error.data.retryAfter; + queue.enqueue(data, 1); + break; + default: // no error! + fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: response}) + '\n', {flag: 'a+'}); + return data.resolve(response); + } + } else { + fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: response}) + '\n', {flag: 'a+'}); + switch(response.cause?.code) { + case 503: // 503 means maintenance mode, let's hold back for 1 minute + backoffSeconds = 60; + queue.enqueue(data, 1); + break; + case 'EAI_AGAIN': // DNS lookup timed out error, let's hold back for 5 seconds + backoffSeconds = 5; + queue.enqueue(data, 1); + break; + case 'ECONNRESET': + queue.enqueue(data, 1); + break; + case 'UND_ERR_CONNECT_TIMEOUT': + queue.enqueue(data, 1); + break; + default: + data.reject(response); + } + } } export function debugLog(ctx) { |