1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
|
import * as fs from 'fs';
import * as events from 'events';
import { getToken } from '../database/config.js';
import { PriorityQueue } from './priority_queue.js';
// queue processor module variables
const bus = new events.EventEmitter(); // a bus to notify the queue processor to start processing messages
let busy = false; // true if we are already sending api requests.
let backoffSeconds = 0;
let running = false;
// other module variables
let headers = undefined; // a file scope variable so that we only evaluate these once.
let queue = new PriorityQueue(); // a priority queue to hold api calls we want to send, allows for throttling.
// a single queue processor should be running at any time, otherwise there will be trouble!
async function queue_processor() {
if (running) {
throw 'refusing to start a second queue processor';
}
running = true;
while(true) {
try {
if (backoffSeconds > 0) {
await sleep(backoffSeconds * 1000);
backoffSeconds = 0;
}
if (queue.isEmpty()) {
busy = false;
await new Promise(resolve => bus.once('send', resolve));
busy = true;
}
const before = new Date();
await send_this(queue.dequeue().element);
const duration = new Date() - before;
if (duration < 400) { // 333 should work, but 400 should still allow some manual requests to go through during development
await sleep(400 - duration);
}
} catch (e) {
running = false;
throw e;
}
}
}
queue_processor();
// send takes a request object as argument and an optional context ctx
// example request: {
// endpoint: the path part of the url to call,
// method: HTTP method for `fetch` call, defaults to 'GET',
// payload: optional json object that will be send along with the request,
// priority: optional priority value (defaults to 10, lower than 10 means the message will be sent faster)
// }
export function send(request, ctx) {
return new Promise((resolve, reject) => {
let data = {
ctx: ctx,
reject: reject,
request: request,
resolve: resolve,
};
queue.enqueue(data, request.priority ? request.priority : 10);
if (!busy) {
bus.emit('send'); // the queue was previously empty, let's wake up the queue_processor
}
});
}
// send_this take a data object as argument built in the send function above
async function send_this(data) {
if (headers === undefined) {
const token = getToken();
if (token === null) {
throw 'Could not get token from the database. Did you init or register yet?';
}
headers = {
'Content-Type': 'application/json',
'Authorization': `Bearer ${token}`
};
}
let options = {
headers: headers,
};
if (data.request.method !== undefined) {
options['method'] = data.request.method;
}
if (data.request.payload !== undefined) {
options['body'] = JSON.stringify(data.request.payload);
}
fs.writeFileSync('log', JSON.stringify({event: 'send', date: new Date(), data: data}) + '\n', {flag: 'a+'});
try {
let response = await fetch(`https://api.spacetraders.io/v2${data.request.endpoint}`, options);
response = await response.json();
switch(response.error?.code) {
//case 401: // TODO 401 means a server reset happened
// TODO reject all promises in queue
// close database file
// rm database file
// logrotate
// spawnSync
// break;
case 429: // 429 means rate limited, let's hold back as instructed
backoffSeconds = response.error.data.retryAfter;
queue.enqueue(data, 1);
break;
case 503: // 503 means maintenance mode, let's hold back for 1 minute
backoffSeconds = 60;
queue.enqueue(data, 1);
break;
default: // no error!
fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: response}) + '\n', {flag: 'a+'});
return data.resolve(response);
}
} catch (err) {
fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: err}) + '\n', {flag: 'a+'});
switch(err.cause?.code) {
case 'EAI_AGAIN': // DNS lookup timed out error, let's hold back for 5 seconds
backoffSeconds = 5;
queue.enqueue(data, 1);
break;
case 'ECONNRESET':
queue.enqueue(data, 1);
break;
case 'UND_ERR_CONNECT_TIMEOUT':
queue.enqueue(data, 1);
break;
default:
data.reject(response);
}
}
}
export function debugLog(ctx) {
console.log(`--- ${Date()} -----------------------------------------------------------------------------`);
console.log(JSON.stringify(ctx, null, 2));
}
export function sleep(delay) {
return new Promise((resolve) => setTimeout(resolve, delay))
}
|