Commit b96c69c7 authored by Bengfort's avatar Bengfort
Browse files

implement message broker in go

parent aae61ee8
Pipeline #10995 passed with stages
in 1 minute and 47 seconds
......@@ -27,3 +27,6 @@ test:
makemessages:
$(MANAGEPY) makemessages -l de -d django
$(MANAGEPY) makemessages -l de -d djangojs
%: %.go
go build -buildmode=pie -ldflags="-s -w" -o $@ $<
// This service translates normal HTTP to a custom protocol here requests are
// sent as Server-Sent-Events (SSE) and responses are sent as HTTP POST
// requests.
//
// This is useful to send requests to services behind a firewall.
//
// # establish SSE connection
// curl "http://localhost:8001/castellum/"
//
// # send a request (blocks until response is available or timeout)
// curl "http://localhost:8001/scheduler/" --data foo
//
// # send a response
// curl "http://localhost:8001/castellum/?id=1&success" --data bar
package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"strconv"
"sync"
"time"
)
type RequestMsg struct {
id int
Data []byte
}
type ResponseMsg struct {
success bool
Data []byte
}
var mux = &sync.RWMutex{}
var verbose = false
var connected = false
var lastId = 1
var jobs = make(map[int](chan ResponseMsg))
var sse = make(chan RequestMsg)
func releaseConnection() {
connected = false
}
func createJob() (int, chan ResponseMsg) {
mux.Lock()
defer mux.Unlock()
lastId += 1
ch := make(chan ResponseMsg)
jobs[lastId] = ch
return lastId, ch
}
func deleteJob(id int) {
mux.Lock()
delete(jobs, id)
mux.Unlock()
}
func schedulerPost(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("error reading request body:", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
id, ch := createJob()
defer deleteJob(id)
sse <- RequestMsg{id, body}
ctx := r.Context()
timeout := time.NewTimer(10 * time.Second)
select {
case <-ctx.Done():
return
case <-timeout.C:
http.Error(w, "", http.StatusGatewayTimeout)
case msg := <-ch:
if !msg.success {
http.Error(w, "", http.StatusInternalServerError)
}
w.Write(msg.Data)
}
}
func parseUrl(r *http.Request) (int, bool, bool) {
query := r.URL.Query()
ids, ok := query["id"]
if !ok {
return 0, false, false
}
if len(ids) != 1 {
return 0, false, false
}
id, err := strconv.Atoi(ids[0])
if err != nil {
return 0, false, false
}
_, success := query["success"]
return id, success, true
}
func castellumPost(w http.ResponseWriter, r *http.Request) {
id, success, ok := parseUrl(r)
if !ok {
http.Error(w, "", http.StatusNotFound)
return
}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("error reading request body:", err)
http.Error(w, "", http.StatusInternalServerError)
return
}
mux.RLock()
ch, ok := jobs[id]
mux.RUnlock()
if !ok {
http.Error(w, "", http.StatusNotFound)
return
}
ch <- ResponseMsg{success, body}
}
func castellumGet(w http.ResponseWriter, r *http.Request) {
if connected {
http.Error(w, "", http.StatusInternalServerError)
return
} else {
connected = true
defer releaseConnection()
}
ctx := r.Context()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, ": ping\n\n")
flusher.Flush()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Fprintf(w, ": ping\n\n")
flusher.Flush()
case msg := <-sse:
fmt.Fprintf(w, "id: %d\ndata: %s\n\n", msg.id, msg.Data)
flusher.Flush()
}
}
}
func handler(w http.ResponseWriter, r *http.Request) {
if verbose {
log.Println(r.Method, r.URL)
}
if r.URL.Path == "/castellum/" {
if r.Method == http.MethodGet {
castellumGet(w, r)
} else if r.Method == http.MethodPost {
castellumPost(w, r)
} else {
http.Error(w, "", http.StatusMethodNotAllowed)
}
} else if r.URL.Path == "/scheduler/" {
if r.Method == http.MethodPost {
schedulerPost(w, r)
} else {
http.Error(w, "", http.StatusMethodNotAllowed)
}
} else {
http.Error(w, "", http.StatusNotFound)
}
}
func main() {
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "foo [-v] [port]\n")
flag.PrintDefaults()
}
flag.BoolVar(&verbose, "v", false, "enable verbose logs")
flag.Parse()
addr := "localhost:8001"
if len(flag.Args()) > 0 {
addr = fmt.Sprintf("localhost:%s", flag.Args()[0])
}
http.HandleFunc("/", handler)
log.Printf("Serving on http://%s", addr)
log.Fatal(http.ListenAndServe(addr, nil))
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment