summaryrefslogtreecommitdiff
path: root/nodejs/lib/api.ts
blob: f023ccf0e60c2a62ff77e02c6ba7ba1c5aa91467 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import fs from 'fs';
import events from 'events';

import { PriorityQueue } from './priority_queue.ts';
import { getToken } from '../database/tokens.ts';
import { APIError, Request, RequestPromise, Response } from '../model/api.ts';
import { RateLimitError } from '../model/errors.ts';

// queue processor module variables
const bus = new events.EventEmitter(); // a bus to notify the queue processor to start processing messages
let busy = false; // true if we are already sending api requests.
let backoffSeconds = 0;
let running = false;
// other module variables
let headers: {[key:string]:string}|null = null; // a file scoped variable so that we only evaluate these once.
let queue = new PriorityQueue(); // a priority queue to hold api calls we want to send, allows for throttling.

async function queue_processor(): Promise<void> {
	if (running) {
		throw 'refusing to start a second queue processor';
	}
	running = true;
	while(true) {
		try {
			if (backoffSeconds > 0) {
				await sleep(backoffSeconds * 1000);
				backoffSeconds = 0;
			}
			if (queue.isEmpty()) {
				busy = false;
				await new Promise(resolve => bus.once('send', resolve));
				busy = true;
			}
			const before = new Date().getTime();
			const data = queue.dequeue() as RequestPromise<unknown>;
			await send_this(data);
			const duration = new Date().getTime() - before;
			if (duration < 400) { // 333 should work, but 400 should still allow some manual requests to go through during development
				await sleep(400 - duration);
			}
		} catch (e) {
			running = false;
			throw e;
		}
	}
}
queue_processor();

export async function send<T>(request: Request): Promise<Response<T>> {
	return new Promise((resolve, reject) => {
		const data: RequestPromise<T> = {
			reject: reject,
			request: request,
			resolve: resolve,
		};
		queue.enqueue(data, request.priority ? request.priority : 10);
		if (!busy) {
			bus.emit('send'); // the queue was previously empty, let's wake up the queue_processor
		}
	});
}

export async function sendPaginated<T>(request: Request): Promise<Array<T>> {
	if (request.page === undefined) request.page = 1;
	let ret: Array<T> = [];
	while (true) {
		const response = await send<T>(request);
		if (response.meta === undefined) {
			throw {"message": "paginated request did not return a meta block", "request": request, "response": response};
		} else if (response.error) {
			throw {"message": "paginated request returned an error", "request": request, "response": response};
		}
		ret = ret.concat(response.data);
		if (response.meta.limit * response.meta.page >= response.meta.total) {
			return ret;
		}
		request.page++;
	}
}

// send_this take a data object as argument built in the send function above
async function send_this(data: RequestPromise<unknown>): Promise<void> {
	if (headers === null) {
		const token = getToken();
		if (token === null) {
			throw 'Could not get token from the database. Did you init or register yet?';
		}
		headers = {
			'Content-Type': 'application/json',
			'Authorization': `Bearer ${token}`
		};
	}
	let options: {[key:string]:any} = {
		headers: headers,
	};
	if (data.request.method !== undefined) {
		options['method'] = data.request.method;
	}
	if (data.request.payload !== undefined) {
		options['body'] = JSON.stringify(data.request.payload);
	}
	let pagination = "";
	if (data.request.page !== undefined) {
		pagination=`?limit=20&page=${data.request.page}`;
	}
	fs.writeFileSync('log', JSON.stringify({event: 'send', date: new Date(), data: data}) + '\n', {flag: 'a+'});
	try {
		const response = await fetch(`https://api.spacetraders.io/v2${data.request.endpoint}${pagination}`, options);
		const json = await response.json() as Response<unknown>;
		switch(json.error?.code) {
			case 401: // TODO 401 means a server reset happened
				throw json;
				// TODO reject all promises in queue
				// reset database
				// logrotate
				// spawnSync?
				// break;
			case 429:  // 429 means rate limited, let's hold back as instructed
				const errorData = json.error.data as RateLimitError;
				backoffSeconds = errorData.retryAfter;
				queue.enqueue(data, 1);
				break;
			case 503:  // 503 means maintenance mode, let's hold back for 1 minute
				backoffSeconds = 60;
				queue.enqueue(data, 1);
				break;
			default: // no error!
				fs.writeFileSync('log', JSON.stringify({event: 'response', date: new Date(), data: json}) + '\n', {flag: 'a+'});
				return data.resolve(json);
		}
	} catch (_err) {
		const err = _err as {cause?: {code: string}};
		fs.writeFileSync('log', JSON.stringify({event: 'error', date: new Date(), data: err}) + '\n', {flag: 'a+'});
		switch(err.cause?.code) {
			case 'EAI_AGAIN': // DNS lookup timed out error, let's hold back for 5 seconds
				backoffSeconds = 5;
				queue.enqueue(data, 1);
				break;
			case 'ECONNRESET':
				queue.enqueue(data, 1);
				break;
			case 'UND_ERR_CONNECT_TIMEOUT':
				queue.enqueue(data, 1);
				break;
			default:
				data.reject(err);
		}
	}
}

export function debugLog(ctx: any): void {
	console.log(`--- ${Date()} -----------------------------------------------------------------------------`);
	console.log(JSON.stringify(ctx, null, 2));
}

export function sleep(delay: number): Promise<unknown> {
	return new Promise((resolve) => setTimeout(resolve, delay));
}