From 0d00bf9fd242d3b4f9f3d6caaf5a61f8a3627d7f Mon Sep 17 00:00:00 2001 From: Julien Dessaux Date: Tue, 28 May 2024 13:13:13 +0200 Subject: [golang] fixed golang api client design mistakes --- golang/pkg/api/api.go | 153 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 105 insertions(+), 48 deletions(-) (limited to 'golang/pkg/api/api.go') diff --git a/golang/pkg/api/api.go b/golang/pkg/api/api.go index 662bb87..7aee2fa 100644 --- a/golang/pkg/api/api.go +++ b/golang/pkg/api/api.go @@ -2,86 +2,143 @@ package api import ( "bytes" + "container/heap" "encoding/json" + "fmt" "io" "log/slog" "net/http" + "net/url" "time" ) -type Error[T any] struct { - Code int `json:"code"` - Data T `json:"data"` - Message string `json:"message"` +type APIError struct { + Code int `json:"code"` + Data json.RawMessage `json:"data"` + Message string `json:"message"` } -type APIMessage[T any, E any] struct { - Data T `json:"data"` - Error Error[E] `json:"error"` +func (e *APIError) Error() string { + return fmt.Sprintf("unhandled APIError code %d, message \"%s\", data: %s", e.Code, e.Message, string(e.Data)) +} + +type APIMessage struct { + Data json.RawMessage `json:"data"` + Error *APIError `json:"error"` //meta } +type Request struct { + index int + priority int + + method string + uri *url.URL + payload any + responseChannel chan *Response +} + type Response struct { - Response []byte - Err error + Message *APIMessage + Err error } -func Send[T any](c *Client, method, path string, payload any) (message APIMessage[T, any], err error) { - resp := make(chan *Response) - c.channel <- &Request{ - method: method, - path: path, - payload: payload, - priority: 10, - resp: resp, +func (c *Client) Send(method string, uriRef *url.URL, payload any) (*APIMessage, error) { + responseChannel := make(chan *Response) + c.requestsChannel <- &Request{ + method: method, + payload: payload, + priority: 10, + responseChannel: responseChannel, + uri: c.baseURI.ResolveReference(uriRef), } - res := <-resp - if res.Err != nil { - return message, res.Err + res := <-responseChannel + return res.Message, res.Err +} + +func queueProcessor(client *Client) { + var ( + req *Request + ok bool + ) + for { + // The queue is empty so we do this blocking call + select { + case <-client.ctx.Done(): + return + case req, ok = <-client.requestsChannel: + if !ok { + return + } + heap.Push(client.pq, req) + } + // we enqueue all values read from the channel and process the queue's + // contents until empty. We keep reading the channel as long as this + // emptying goes on + for { + select { + case <-client.ctx.Done(): + return + case req, ok = <-client.requestsChannel: + if !ok { + return + } + heap.Push(client.pq, req) + default: + if client.pq.Len() == 0 { + break + } + // we process one + if req, ok = heap.Pop(client.pq).(*Request); !ok { + panic("queueProcessor got something other than a Request on its channel") + } + msg, err := client.sendOne(req.method, req.uri, req.payload) + req.responseChannel <- &Response{ + Message: msg, + Err: err, + } + } + } } - err = json.Unmarshal(res.Response, &message) - return message, err } -func (c *Client) sendOne(method, path string, payload any) (body []byte, err error) { - slog.Debug("Request", "method", method, "path", path, "payload", payload) - var req *http.Request +func (c *Client) sendOne(method string, uri *url.URL, payload any) (*APIMessage, error) { + slog.Debug("request", "method", method, "path", uri.Path, "payload", payload) + var payloadReader io.Reader if payload != nil { - body, err = json.Marshal(payload) - if err == nil { - req, err = http.NewRequest(method, c.baseURL+path, bytes.NewBuffer(body)) + if body, err := json.Marshal(payload); err != nil { + return nil, fmt.Errorf("failed to marshal payload: %w", err) } else { - return nil, err + payloadReader = bytes.NewReader(body) } - } else { - req, err = http.NewRequest(method, c.baseURL+path, nil) } + + req, err := http.NewRequestWithContext(c.ctx, method, uri.String(), payloadReader) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to create request: %w", err) } req.Header = *c.headers - req = req.WithContext(c.ctx) resp, err := c.httpClient.Do(req) if err != nil { - slog.Error("sendOne Do", "method", method, "path", path, "error", err) - return nil, err + return nil, fmt.Errorf("failed to do request: %w", err) } - defer func() { - if e := resp.Body.Close(); err == nil { - err = e - } - }() - if body, err = io.ReadAll(resp.Body); err != nil { - slog.Error("sendOne ReadAll", "method", method, "path", path, "error", err) - return nil, err + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("failed to read response body: %w", err) + } + + var msg APIMessage + if err = json.Unmarshal(body, &msg); err != nil { + return nil, fmt.Errorf("failed to unmarshal response body: %w", err) } - slog.Debug("Response", "body", string(body)) + slog.Debug("response", "code", resp.StatusCode, "message", msg) switch resp.StatusCode { case 429: - e := decode429(body) - time.Sleep(time.Duration(e.Error.Data.RetryAfter * float64(time.Second))) - return c.sendOne(method, path, payload) + e := decodeRateLimitError(msg.Error.Data) + time.Sleep(e.RetryAfter.Duration() * time.Second) + return c.sendOne(method, uri, payload) } - return body, nil + return &msg, nil } -- cgit v1.2.3