[golang] fixed golang api client design mistakes
This commit is contained in:
parent
d0f6c4343e
commit
0d00bf9fd2
9 changed files with 268 additions and 149 deletions
|
@ -2,86 +2,143 @@ package api
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"container/heap"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Error[T any] struct {
|
||||
Code int `json:"code"`
|
||||
Data T `json:"data"`
|
||||
Message string `json:"message"`
|
||||
type APIError struct {
|
||||
Code int `json:"code"`
|
||||
Data json.RawMessage `json:"data"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type APIMessage[T any, E any] struct {
|
||||
Data T `json:"data"`
|
||||
Error Error[E] `json:"error"`
|
||||
func (e *APIError) Error() string {
|
||||
return fmt.Sprintf("unhandled APIError code %d, message \"%s\", data: %s", e.Code, e.Message, string(e.Data))
|
||||
}
|
||||
|
||||
type APIMessage struct {
|
||||
Data json.RawMessage `json:"data"`
|
||||
Error *APIError `json:"error"`
|
||||
//meta
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
index int
|
||||
priority int
|
||||
|
||||
method string
|
||||
uri *url.URL
|
||||
payload any
|
||||
responseChannel chan *Response
|
||||
}
|
||||
|
||||
type Response struct {
|
||||
Response []byte
|
||||
Err error
|
||||
Message *APIMessage
|
||||
Err error
|
||||
}
|
||||
|
||||
func Send[T any](c *Client, method, path string, payload any) (message APIMessage[T, any], err error) {
|
||||
resp := make(chan *Response)
|
||||
c.channel <- &Request{
|
||||
method: method,
|
||||
path: path,
|
||||
payload: payload,
|
||||
priority: 10,
|
||||
resp: resp,
|
||||
func (c *Client) Send(method string, uriRef *url.URL, payload any) (*APIMessage, error) {
|
||||
responseChannel := make(chan *Response)
|
||||
c.requestsChannel <- &Request{
|
||||
method: method,
|
||||
payload: payload,
|
||||
priority: 10,
|
||||
responseChannel: responseChannel,
|
||||
uri: c.baseURI.ResolveReference(uriRef),
|
||||
}
|
||||
res := <-resp
|
||||
if res.Err != nil {
|
||||
return message, res.Err
|
||||
}
|
||||
err = json.Unmarshal(res.Response, &message)
|
||||
return message, err
|
||||
res := <-responseChannel
|
||||
return res.Message, res.Err
|
||||
}
|
||||
|
||||
func (c *Client) sendOne(method, path string, payload any) (body []byte, err error) {
|
||||
slog.Debug("Request", "method", method, "path", path, "payload", payload)
|
||||
var req *http.Request
|
||||
if payload != nil {
|
||||
body, err = json.Marshal(payload)
|
||||
if err == nil {
|
||||
req, err = http.NewRequest(method, c.baseURL+path, bytes.NewBuffer(body))
|
||||
} else {
|
||||
return nil, err
|
||||
func queueProcessor(client *Client) {
|
||||
var (
|
||||
req *Request
|
||||
ok bool
|
||||
)
|
||||
for {
|
||||
// The queue is empty so we do this blocking call
|
||||
select {
|
||||
case <-client.ctx.Done():
|
||||
return
|
||||
case req, ok = <-client.requestsChannel:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
heap.Push(client.pq, req)
|
||||
}
|
||||
// we enqueue all values read from the channel and process the queue's
|
||||
// contents until empty. We keep reading the channel as long as this
|
||||
// emptying goes on
|
||||
for {
|
||||
select {
|
||||
case <-client.ctx.Done():
|
||||
return
|
||||
case req, ok = <-client.requestsChannel:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
heap.Push(client.pq, req)
|
||||
default:
|
||||
if client.pq.Len() == 0 {
|
||||
break
|
||||
}
|
||||
// we process one
|
||||
if req, ok = heap.Pop(client.pq).(*Request); !ok {
|
||||
panic("queueProcessor got something other than a Request on its channel")
|
||||
}
|
||||
msg, err := client.sendOne(req.method, req.uri, req.payload)
|
||||
req.responseChannel <- &Response{
|
||||
Message: msg,
|
||||
Err: err,
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
req, err = http.NewRequest(method, c.baseURL+path, nil)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) sendOne(method string, uri *url.URL, payload any) (*APIMessage, error) {
|
||||
slog.Debug("request", "method", method, "path", uri.Path, "payload", payload)
|
||||
var payloadReader io.Reader
|
||||
if payload != nil {
|
||||
if body, err := json.Marshal(payload); err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal payload: %w", err)
|
||||
} else {
|
||||
payloadReader = bytes.NewReader(body)
|
||||
}
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(c.ctx, method, uri.String(), payloadReader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
req.Header = *c.headers
|
||||
req = req.WithContext(c.ctx)
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
slog.Error("sendOne Do", "method", method, "path", path, "error", err)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to do request: %w", err)
|
||||
}
|
||||
defer func() {
|
||||
if e := resp.Body.Close(); err == nil {
|
||||
err = e
|
||||
}
|
||||
}()
|
||||
if body, err = io.ReadAll(resp.Body); err != nil {
|
||||
slog.Error("sendOne ReadAll", "method", method, "path", path, "error", err)
|
||||
return nil, err
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
slog.Debug("Response", "body", string(body))
|
||||
|
||||
var msg APIMessage
|
||||
if err = json.Unmarshal(body, &msg); err != nil {
|
||||
return nil, fmt.Errorf("failed to unmarshal response body: %w", err)
|
||||
}
|
||||
slog.Debug("response", "code", resp.StatusCode, "message", msg)
|
||||
switch resp.StatusCode {
|
||||
case 429:
|
||||
e := decode429(body)
|
||||
time.Sleep(time.Duration(e.Error.Data.RetryAfter * float64(time.Second)))
|
||||
return c.sendOne(method, path, payload)
|
||||
e := decodeRateLimitError(msg.Error.Data)
|
||||
time.Sleep(e.RetryAfter.Duration() * time.Second)
|
||||
return c.sendOne(method, uri, payload)
|
||||
}
|
||||
return body, nil
|
||||
return &msg, nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue