From 3e9dc244e2ee33aa5a722e1ccb581b7d4ddd1e6c Mon Sep 17 00:00:00 2001
From: ookjosh
Date: Mon, 10 Nov 2025 07:54:36 -0700
Subject: [PATCH] Initial commit

---
 .gitignore                          |   3 +
 README.md                           |  40 +++++
 birdwhisperer/birdwhisperer.go      | 309 ++++++++++++++++++++++++++++
 birdwhisperer/birdwhisperer_test.go | 161 +++++++++++++++
 birdwhisperer/go.mod                |   3 +
 db/query.sql                        |   0
 db/schema.sql                       |   0
 db/sqlc.yaml                        |  11 ++
 go.mod                              |   9 +
 main.go                             | 181 +++++++++++++++++
 10 files changed, 717 insertions(+)
 create mode 100644 .gitignore
 create mode 100644 README.md
 create mode 100644 birdwhisperer/birdwhisperer.go
 create mode 100644 birdwhisperer/birdwhisperer_test.go
 create mode 100644 birdwhisperer/go.mod
 create mode 100644 db/query.sql
 create mode 100644 db/schema.sql
 create mode 100644 db/sqlc.yaml
 create mode 100644 go.mod
 create mode 100644 main.go

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d4620b1
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+
+# Binary
+roost
diff --git a/README.md b/README.md
new file mode 100644
--- /dev/null
+++ b/README.md
@@ -0,0 +1,40 @@
+# Roost
+> Where the birds come home to sleep.
+
+Data ingest server, providing an endpoint for nodes to send data for archival.
+
+## Go Details
+Currently 1.25.3, will stay updated as quickly as possible. Go is *very* good with backwards compatibility so breaking changes are unlikely.
+
+[Install Go](https://go.dev/learn/)
+
+Once Go is installed, `go version` should return >= 1.25.3.
+
+### Install Dependencies
+Go will take care of downloading the actual library dependencies (no need for an `npm i` equivalent) but we have some tooling dependencies as well.
+
+- sqlc is a code gen library that gives type-safe APIs against a schema at compile time. Very handy
+    - `go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest`
+
+### Learn Go
+[Use Go By Example](https://gobyexample.com) - it is an incredible site and will get you up to speed quickly. Go is a very simple language so most of the points are easy to look up.
+
+## To run `roost`:
+
+- `cd path/to/roost`
+- `go run .`
+
+## To build `roost` for your current target:
+
+- `cd path/to/roost`
+- `go build [-o binary-name] .`
+    - Default binary name is `roost`, in the current directory
+
+## To build for a different target:
+Go is statically linked (and all the code we use is as well, no additional runtime libraries required), so it is trivial to compile on one architecture and copy the binary over to another.
+
+- `cd path/to/roost`
+- `go tool dist list`
+- Using the relevant pair above (e.g., `linux/amd64`):
+    - `GOOS=linux GOARCH=amd64 go build -o roost-linux-amd64`
+    - The `-o` names the output binary, it could be `bob`
diff --git a/birdwhisperer/birdwhisperer.go b/birdwhisperer/birdwhisperer.go
new file mode 100644
--- /dev/null
+++ b/birdwhisperer/birdwhisperer.go
@@ -0,0 +1,309 @@
+// Package birdwhisperer implements the chunked message framing used between
+// sensor nodes and the roost ingest server: splitting a message into
+// MTU-sized chunks and reassembling received chunks into complete messages.
+package birdwhisperer
+
+import (
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"log/slog"
+	"net"
+	"time"
+)
+
+const (
+	// CHUNK_HEADER_VERSION_OFFSET is the byte offset of the version field.
+	// ReceiveChunk reads it before it knows which header version follows.
+	CHUNK_HEADER_VERSION_OFFSET = 4
+
+	// CHUNK_V1_HEADER_LENGTH is the size in bytes of a serialized V1 header.
+	CHUNK_V1_HEADER_LENGTH = 9
+
+	// RECONSTRUCTED_BUFFER_SIZE_ALERT is the largest message, in bytes, that
+	// ReconstructMessageFromChunks will assemble.
+	RECONSTRUCTED_BUFFER_SIZE_ALERT = 1e8
+
+	// CHUNK_MAX_LENGTH is the largest payload carried by one chunk. Based on
+	// an MTU of 1500 bytes, with some flexibility for overhead.
+	CHUNK_MAX_LENGTH = 1400
+)
+
+// Packet is a datagram payload together with the UDP address it was received
+// from or should be sent to.
+type Packet struct {
+	TargetAddr *net.UDPAddr
+	Buffer     []byte
+}
+
+// ChunkHeaderV1 is the version 1 chunk header. On the wire it occupies
+// CHUNK_V1_HEADER_LENGTH bytes, with multi-byte fields in big-endian order:
+//
+//	[0:2] MessageID, [2:4] Length, [4] Version, [5:7] Index, [7:9] NumChunks
+//
+// Length is the size of the whole chunk (header plus payload), Index is
+// zero-based and NumChunks is the number of chunks in the message.
+// HeaderLength is not transmitted; the parser derives it from the version.
+type ChunkHeaderV1 struct {
+	MessageID    uint16
+	Length       uint16
+	Version      uint8
+	Index        uint16
+	NumChunks    uint16
+	HeaderLength uint8
+}
+
+// toBytes serializes the header into its wire format. The version byte is
+// always written as 1 because this is the V1 layout.
+func (h *ChunkHeaderV1) toBytes() []byte {
+	result := make([]byte, CHUNK_V1_HEADER_LENGTH)
+	binary.BigEndian.PutUint16(result[0:2], h.MessageID)
+	binary.BigEndian.PutUint16(result[2:4], h.Length)
+	result[CHUNK_HEADER_VERSION_OFFSET] = 1
+	binary.BigEndian.PutUint16(result[5:7], h.Index)
+	// NumChunks fills the last two header bytes. HeaderLength is deliberately
+	// not written: storing it in result[8] would overwrite the low byte of
+	// NumChunks.
+	binary.BigEndian.PutUint16(result[7:9], h.NumChunks)
+
+	return result
+}
+
+// toHexString returns the serialized header encoded as hexadecimal text.
+func (h *ChunkHeaderV1) toHexString() []byte {
+	return []byte(hex.EncodeToString(h.toBytes()))
+}
+
+// ReconstructedMessage tracks the chunks received so far for one message ID.
+// ReceivedTime and CompletedTime are Unix timestamps in milliseconds;
+// CompletedTime stays 0 until every chunk has arrived.
+type ReconstructedMessage struct {
+	Chunks         []Packet
+	ReceivedTime   int64
+	CompletedTime  int64
+	NumChunks      uint16
+	ReceivedChunks uint16
+}
+
+// BirdWhisperer reassembles chunked messages. ChunkRecord is keyed by message
+// ID. Its methods do no locking, so use it from one goroutine at a time.
+type BirdWhisperer struct {
+	ChunkRecord map[uint16]ReconstructedMessage
+}
+
+// NewBirdWhisperer returns a BirdWhisperer with an empty chunk record.
+func NewBirdWhisperer() BirdWhisperer {
+	return BirdWhisperer{
+		ChunkRecord: make(map[uint16]ReconstructedMessage),
+	}
+}
+
+// ReconstructMessageFromChunks concatenates the chunk payloads in index order.
+// It returns an error if the combined payload is larger than
+// RECONSTRUCTED_BUFFER_SIZE_ALERT bytes.
+func ReconstructMessageFromChunks(chunkRecord ReconstructedMessage) ([]byte, error) {
+	reconstructedLength := 0
+	for i := range chunkRecord.Chunks {
+		reconstructedLength += len(chunkRecord.Chunks[i].Buffer)
+	}
+
+	if reconstructedLength > RECONSTRUCTED_BUFFER_SIZE_ALERT {
+		// We have some huge thing...
+		return nil, fmt.Errorf("Attempted to reconstruct message bigger than allowed size: [%d] bytes", reconstructedLength)
+	}
+
+	reconstructedMessage := make([]byte, 0, reconstructedLength)
+	for i := range chunkRecord.Chunks {
+		reconstructedMessage = append(reconstructedMessage, chunkRecord.Chunks[i].Buffer...)
+	}
+
+	return reconstructedMessage, nil
+}
+
+// PruneChunkRecord removes stale entries from the chunk record.
+// currentTimestamp is a Unix timestamp in milliseconds. Completed messages
+// are removed 30 seconds after completion; until then their record makes
+// ReceiveChunk ignore late duplicates. Incomplete messages are removed three
+// minutes after their first chunk arrived.
+func (bw *BirdWhisperer) PruneChunkRecord(currentTimestamp int64) {
+	const CHUNK_PRUNING_TIMEOUT_MS = 30_000
+	const CHUNK_RECEIVE_COMPLETE_TIMEOUT_MS = 60_000 * 3
+	for k, v := range bw.ChunkRecord {
+		if v.CompletedTime > 0 && currentTimestamp-v.CompletedTime > CHUNK_PRUNING_TIMEOUT_MS {
+			// Remove this from map - safe to do in loop
+			delete(bw.ChunkRecord, k)
+		} else if v.ReceivedTime > 0 && v.CompletedTime == 0 && currentTimestamp-v.ReceivedTime > CHUNK_RECEIVE_COMPLETE_TIMEOUT_MS {
+			// Remove messages that were never completed after enough time
+			delete(bw.ChunkRecord, k)
+		}
+	}
+}
+
+// ReceiveChunk parses one chunk from the front of chunk.Buffer and stores its
+// payload in the record for its message. When the last missing chunk of a
+// message arrives, the reassembled message is sent on completedMessages; that
+// send blocks until a receiver is ready, so the channel must be serviced.
+// Duplicate chunks and chunks of already completed messages are ignored. Any
+// bytes after the chunk are treated as further chunks and handled the same way.
+//
+// It returns an error if the buffer is too short, the version is unsupported,
+// or the header contradicts the buffer or earlier chunks of the same message.
+func (bw *BirdWhisperer) ReceiveChunk(chunk Packet, completedMessages chan []byte) error {
+	if len(chunk.Buffer) <= CHUNK_HEADER_VERSION_OFFSET {
+		return fmt.Errorf("Packet not long enough to be a chunk!")
+	}
+
+	switch chunk.Buffer[CHUNK_HEADER_VERSION_OFFSET] {
+	case 1:
+		header, err := ParseChunkHeaderV1(chunk)
+		if err != nil {
+			return err
+		}
+
+		// Length covers header and payload, so it can be neither shorter than
+		// the header nor longer than the data that was actually received.
+		chunkEnd := int(header.Length)
+		if chunkEnd < int(header.HeaderLength) || chunkEnd > len(chunk.Buffer) {
+			return fmt.Errorf("chunk length %d is invalid for a %d byte buffer", header.Length, len(chunk.Buffer))
+		}
+		if header.Index >= header.NumChunks {
+			return fmt.Errorf("chunk index %d is out of range for %d chunks", header.Index, header.NumChunks)
+		}
+
+		// The payload sits between the header and the end of this chunk.
+		payload := Packet{
+			TargetAddr: chunk.TargetAddr,
+			Buffer:     chunk.Buffer[header.HeaderLength:chunkEnd],
+		}
+		if err := bw.storeChunkV1(header, payload, completedMessages); err != nil {
+			return err
+		}
+
+		if chunkEnd < len(chunk.Buffer) {
+			// There is more data in our incoming chunk, perhaps another message appended
+			return bw.ReceiveChunk(Packet{TargetAddr: chunk.TargetAddr, Buffer: chunk.Buffer[chunkEnd:]}, completedMessages)
+		}
+
+	default:
+		return fmt.Errorf("Packet has unsupported chunk version")
+	}
+
+	return nil
+}
+
+// storeChunkV1 records one validated chunk payload and, if it completes the
+// message, sends the reassembled message on completedMessages. The caller
+// must already have checked that header.Index < header.NumChunks.
+func (bw *BirdWhisperer) storeChunkV1(header ChunkHeaderV1, payload Packet, completedMessages chan []byte) error {
+	chunkRecord, exists := bw.ChunkRecord[header.MessageID]
+	if !exists {
+		chunkRecord = ReconstructedMessage{
+			Chunks:       make([]Packet, header.NumChunks),
+			ReceivedTime: time.Now().UnixMilli(),
+			NumChunks:    header.NumChunks,
+		}
+	}
+
+	if chunkRecord.CompletedTime > 0 {
+		// Already received all of the chunks for this message
+		return nil
+	}
+	if len(chunkRecord.Chunks) != int(header.NumChunks) {
+		return fmt.Errorf("message %d: chunk announces %d chunks but %d were expected", header.MessageID, header.NumChunks, len(chunkRecord.Chunks))
+	}
+	// Buffer is a sub-slice of the packet and therefore non-nil even for an
+	// empty payload, so nil reliably means "slot not filled yet".
+	if chunkRecord.Chunks[header.Index].Buffer != nil {
+		// Duplicate of a chunk we already hold
+		return nil
+	}
+
+	chunkRecord.Chunks[header.Index] = payload
+	chunkRecord.ReceivedChunks++
+	// The map stores values, not pointers, so write the updated copy back.
+	bw.ChunkRecord[header.MessageID] = chunkRecord
+
+	if chunkRecord.ReceivedChunks != chunkRecord.NumChunks {
+		return nil
+	}
+
+	// Have all chunks: reconstruct the message and pass it along for processing
+	msg, err := ReconstructMessageFromChunks(chunkRecord)
+	if err != nil {
+		return err
+	}
+	chunkRecord.CompletedTime = time.Now().UnixMilli()
+	bw.ChunkRecord[header.MessageID] = chunkRecord
+
+	slog.Debug("Received all chunks of a message", "firstReceived", chunkRecord.ReceivedTime, "lastReceived", chunkRecord.CompletedTime)
+	completedMessages <- msg
+
+	return nil
+}
+
+// ParseChunkHeaderV1 decodes the V1 header at the start of chunk.Buffer. It
+// returns an error if the buffer is shorter than a V1 header or the version
+// byte is not 1. HeaderLength is set to CHUNK_V1_HEADER_LENGTH rather than
+// read from the buffer.
+func ParseChunkHeaderV1(chunk Packet) (header ChunkHeaderV1, err error) {
+	if len(chunk.Buffer) < CHUNK_V1_HEADER_LENGTH {
+		return header, fmt.Errorf("Malformed chunk")
+	}
+
+	// We could check the version beforehand, but it doesn't cost
+	// much to just have it parsed and then check anyway and this
+	// makes the numeric ordering clear
+	header.MessageID = binary.BigEndian.Uint16(chunk.Buffer[0:2])
+	header.Length = binary.BigEndian.Uint16(chunk.Buffer[2:4])
+	header.Version = chunk.Buffer[CHUNK_HEADER_VERSION_OFFSET]
+	header.Index = binary.BigEndian.Uint16(chunk.Buffer[5:7])
+	header.NumChunks = binary.BigEndian.Uint16(chunk.Buffer[7:9])
+	header.HeaderLength = CHUNK_V1_HEADER_LENGTH
+
+	if header.Version != 1 {
+		return header, fmt.Errorf("Not a V1 header!")
+	}
+
+	return header, nil
+}
+
+// ChunkMessage splits message into chunks of at most CHUNK_MAX_LENGTH payload
+// bytes, each prefixed with a V1 header carrying messageId, the zero-based
+// chunk index and the total number of chunks. An empty message yields no
+// chunks. It returns nil if the message needs more chunks than the 16-bit
+// NumChunks header field can count.
+func ChunkMessage(message []byte, messageId uint16) [][]byte {
+	const maxChunks = 1<<16 - 1
+
+	// Ceiling division: a partial trailing chunk still needs its own slot.
+	numChunks := (len(message) + CHUNK_MAX_LENGTH - 1) / CHUNK_MAX_LENGTH
+	if numChunks > maxChunks {
+		return nil
+	}
+
+	messageChunks := make([][]byte, numChunks)
+
+	// Offsets are kept as int: i*CHUNK_MAX_LENGTH stops fitting in a uint16
+	// once i reaches 47.
+	for i := range numChunks {
+		start := i * CHUNK_MAX_LENGTH
+		end := min(start+CHUNK_MAX_LENGTH, len(message))
+		payload := message[start:end]
+
+		thisHeader := ChunkHeaderV1{
+			MessageID:    messageId,
+			Index:        uint16(i),
+			NumChunks:    uint16(numChunks),
+			Length:       uint16(len(payload) + CHUNK_V1_HEADER_LENGTH),
+			Version:      1,
+			HeaderLength: CHUNK_V1_HEADER_LENGTH,
+		}
+
+		// Header + portion of data it represents
+		chunk := make([]byte, 0, CHUNK_V1_HEADER_LENGTH+len(payload))
+		chunk = append(chunk, thisHeader.toBytes()...)
+		messageChunks[i] = append(chunk, payload...)
+	}
+
+	return messageChunks
+}
diff --git a/birdwhisperer/birdwhisperer_test.go b/birdwhisperer/birdwhisperer_test.go
new file mode 100644
--- /dev/null
+++ b/birdwhisperer/birdwhisperer_test.go
@@ -0,0 +1,161 @@
+package birdwhisperer
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestPruningChunkRecord(t *testing.T) {
+	bw := NewBirdWhisperer()
+
+	incompleteMessageNotExpired := ReconstructedMessage{
+		ReceivedTime:   195_000, // "Just started"
+		CompletedTime:  0,
+		NumChunks:      2,
+		ReceivedChunks: 1,
+	}
+	incompleteMessageExpired := ReconstructedMessage{
+		ReceivedTime:   100, // Started a long time ago
+		CompletedTime:  0,
+		NumChunks:      2,
+		ReceivedChunks: 1,
+	}
+	completeMessageNotExpired := ReconstructedMessage{
+		ReceivedTime:   100,
+		CompletedTime:  199_000, // "Just finished"
+		NumChunks:      2,
+		ReceivedChunks: 2,
+	}
+	completeMessageExpired := ReconstructedMessage{
+		ReceivedTime:   100,
+		CompletedTime:  10_000, // Finished a long time ago
+		NumChunks:      2,
+		ReceivedChunks: 2,
+	}
+
+	bw.ChunkRecord[0] = incompleteMessageNotExpired
+	bw.ChunkRecord[1] = incompleteMessageExpired
+	bw.ChunkRecord[2] = completeMessageNotExpired
+	bw.ChunkRecord[3] = completeMessageExpired
+
+	bw.PruneChunkRecord(200_000)
+
+	if _, exists := bw.ChunkRecord[0]; !exists {
+		t.Errorf("Pruned an incomplete message that hasn't expired yet")
+	}
+
+	if _, exists := bw.ChunkRecord[1]; exists {
+		t.Errorf("Didn't prune an incomplete message that expired")
+	}
+	if _, exists := bw.ChunkRecord[2]; !exists {
+		t.Errorf("Pruned a complete message that hasn't expired yet")
+	}
+	if _, exists := bw.ChunkRecord[3]; exists {
+		t.Errorf("Didn't prune a complete message that expired")
+	}
+}
+
+func TestChunkMessageEmpty(t *testing.T) {
+	if chunks := ChunkMessage(nil, 123); len(chunks) != 0 {
+		t.Errorf("An empty message should produce no chunks, got %d", len(chunks))
+	}
+}
+
+func TestChunkMessageSingleChunk(t *testing.T) {
+	messageId := uint16(123)
+	message := make([]byte, CHUNK_MAX_LENGTH-1)
+	chunks := ChunkMessage(message, messageId)
+
+	if len(chunks) != 1 {
+		t.Errorf("A message that should be 1 chunk is actually %d chunks", len(chunks))
+	}
+}
+
+func TestChunkMessageMultipleChunks(t *testing.T) {
+	messageId := uint16(123)
+	message := make([]byte, CHUNK_MAX_LENGTH*2)
+	chunks := ChunkMessage(message, messageId)
+
+	if len(chunks) != 2 {
+		t.Fatalf("A message that should be 2 chunks is actually %d chunks", len(chunks))
+	}
+
+	// An exact multiple of CHUNK_MAX_LENGTH must still fill the last chunk
+	const expectedChunkLength = CHUNK_MAX_LENGTH + CHUNK_V1_HEADER_LENGTH
+	for i, chunk := range chunks {
+		if len(chunk) != expectedChunkLength {
+			t.Errorf("Chunk %d should be %d, is %d", i, expectedChunkLength, len(chunk))
+		}
+	}
+}
+
+func TestChunkMessageMultipleChunksAndSome(t *testing.T) {
+	messageId := uint16(123)
+	extraDataLen := 1
+	message := make([]byte, CHUNK_MAX_LENGTH*2+extraDataLen)
+	chunks := ChunkMessage(message, messageId)
+
+	if len(chunks) != 3 {
+		t.Fatalf("A message that should be 3 chunks is actually %d chunks", len(chunks))
+	}
+
+	const expectedChunkLength = CHUNK_MAX_LENGTH + CHUNK_V1_HEADER_LENGTH
+	if len(chunks[0]) != expectedChunkLength {
+		t.Errorf("Chunk 0 should be %d, is %d", expectedChunkLength, len(chunks[0]))
+	}
+	if len(chunks[1]) != expectedChunkLength {
+		t.Errorf("Chunk 1 should be %d, is %d", expectedChunkLength, len(chunks[1]))
+	}
+	if len(chunks[2]) != extraDataLen+CHUNK_V1_HEADER_LENGTH {
+		t.Errorf("Chunk 2 should be %d, is %d", extraDataLen+CHUNK_V1_HEADER_LENGTH, len(chunks[2]))
+	}
+}
+
+func TestChunkHeaderRoundTrip(t *testing.T) {
+	// NumChunks has a non-trivial low byte so a clobbered header byte shows up
+	want := ChunkHeaderV1{
+		MessageID:    0x0102,
+		Length:       0x0304,
+		Version:      1,
+		Index:        0x0506,
+		NumChunks:    0x0708,
+		HeaderLength: CHUNK_V1_HEADER_LENGTH,
+	}
+
+	got, err := ParseChunkHeaderV1(Packet{Buffer: want.toBytes()})
+	if err != nil {
+		t.Fatalf("Failed to parse a serialized header: %v", err)
+	}
+	if got != want {
+		t.Errorf("Header round trip mismatch: got %+v, want %+v", got, want)
+	}
+}
+
+func TestChunkAndReceiveRoundTrip(t *testing.T) {
+	message := make([]byte, CHUNK_MAX_LENGTH*2+10)
+	for i := range message {
+		message[i] = byte(i)
+	}
+
+	bw := NewBirdWhisperer()
+	// Buffered so ReceiveChunk can deliver the message without a reader
+	completed := make(chan []byte, 1)
+
+	for i, chunk := range ChunkMessage(message, 7) {
+		// Send every chunk twice: duplicates must not be counted
+		for range 2 {
+			if err := bw.ReceiveChunk(Packet{Buffer: chunk}, completed); err != nil {
+				t.Fatalf("Chunk %d was rejected: %v", i, err)
+			}
+		}
+	}
+
+	select {
+	case got := <-completed:
+		if !bytes.Equal(got, message) {
+			t.Errorf("Reconstructed message differs from the original")
+		}
+	default:
+		t.Fatalf("No message was reconstructed")
+	}
+}
diff --git a/birdwhisperer/go.mod b/birdwhisperer/go.mod
new file mode 100644
index 0000000..b89640b
--- /dev/null
+++ b/birdwhisperer/go.mod
@@ -0,0 +1,3 @@
+module jjb.dev/birdwhisperer
+
+go 1.25.3
diff --git a/db/query.sql b/db/query.sql
new file mode 100644
index 0000000..e69de29
diff --git a/db/schema.sql b/db/schema.sql
new file mode 100644
index 0000000..e69de29
diff --git a/db/sqlc.yaml b/db/sqlc.yaml
new file mode 100644
index 0000000..87520f9
--- /dev/null
+++ b/db/sqlc.yaml
@@ -0,0 +1,11 @@
+version: "2"
+sql:
+  - engine: "mysql"
+    queries: "query.sql"
+    schema: "schema.sql"
+    gen:
+      go:
+        # go package name to use
+        package: "roost_mysql"
+        # folder to generate
+        out: "mysql"
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..1fd71dd
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,9 @@
+module jjb.dev/roost
+
+go 1.25.3
+
+require (
+	jjb.dev/birdwhisperer v0.0.0
+)
+
+replace jjb.dev/birdwhisperer => ./birdwhisperer
diff --git a/main.go b/main.go
new file mode 100644
--- /dev/null
+++ b/main.go
@@ -0,0 +1,181 @@
+// Roost - where the birds come to sleep.
+//
+// Network ingest point for remote sensor modules.
+package main
+
+import (
+	"fmt"
+	"log/slog"
+	"net"
+	"os"
+	"os/signal"
+	"path/filepath"
+	"syscall"
+	"time"
+
+	"jjb.dev/birdwhisperer"
+)
+
+// LOG_DIR is the directory the daily raw packet archives are written to.
+const LOG_DIR = "./data-archive"
+
+// runServer listens for UDP datagrams on ip:port and queues every received
+// datagram, addressed to its sender, on the outbox. If the socket cannot be
+// opened it logs the error and exits the process.
+func runServer(ip string, port int) {
+	// Unbuffered channel; can add a buffer if other complexity results in drops
+	outbox := make(chan birdwhisperer.Packet)
+	// 64 KiB is enough to hold the largest possible UDP datagram
+	receiveBuffer := make([]byte, 1<<16)
+
+	hostAddr := net.UDPAddr{
+		Port: port,
+		IP:   net.ParseIP(ip),
+	}
+
+	udpConnection, err := net.ListenUDP("udp", &hostAddr)
+	if err != nil {
+		// Without a socket every read below fails immediately, so the receive
+		// loop would spin forever logging errors. Give up instead.
+		slog.Error("Failed to create UDP server!", "error", err)
+		os.Exit(1)
+	}
+
+	// Service our outbox
+	go serviceOutbox(outbox, udpConnection)
+
+	// NOTE(review): received packets are echoed back to their sender through
+	// the outbox; nothing starts processProbeMessages yet - confirm intent.
+	for {
+		messageLength, remoteAddr, err := udpConnection.ReadFromUDP(receiveBuffer)
+
+		if err != nil {
+			slog.Error("Failed to read from UDP", "error", err)
+			continue
+		}
+
+		// Copy out of receiveBuffer, which the next read overwrites
+		msg := make([]byte, messageLength)
+		copy(msg, receiveBuffer[:messageLength])
+
+		packet := birdwhisperer.Packet{TargetAddr: remoteAddr, Buffer: msg}
+		outbox <- packet
+	}
+}
+
+// serviceOutbox listens for packets on the channel and writes them using the
+// provided udpConnection. It returns when the channel is closed.
+func serviceOutbox(channel <-chan birdwhisperer.Packet, udpConnection *net.UDPConn) {
+	for packet := range channel {
+		_, err := udpConnection.WriteToUDP(packet.Buffer, packet.TargetAddr)
+		if err != nil {
+			slog.Error("Failed to send UDP response", "error", err, "target_address", packet.TargetAddr.String())
+		}
+	}
+}
+
+// openLogFile opens fileName for appending, creating the file and its parent
+// directory if they do not exist yet.
+func openLogFile(fileName string) (*os.File, error) {
+	if err := os.MkdirAll(filepath.Dir(fileName), 0755); err != nil {
+		return nil, err
+	}
+
+	// os.O_WRONLY only writes
+	// os.O_CREATE create if not exists
+	// os.O_APPEND append, don't overwrite existing
+	return os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
+}
+
+// processProbeMessages handles messages that originate from an external
+// source. Every packet is appended to the archive file of the current day and
+// then passed to the chunk reassembler. It returns when the channel is closed.
+func processProbeMessages(channel <-chan birdwhisperer.Packet) {
+	var logFile *os.File
+	var currentLogDate string
+	var lastPrune time.Time
+
+	bw := birdwhisperer.NewBirdWhisperer()
+
+	// ReceiveChunk blocks while handing over a completed message, so the
+	// stream needs a reader for as long as packets are processed.
+	reconstructedChunkStream := make(chan []byte)
+	go func() {
+		for msg := range reconstructedChunkStream {
+			// TODO: Store it in database
+			slog.Debug("Reconstructed a complete message", "bytes", len(msg))
+		}
+	}()
+
+	defer func() {
+		// Channel is closed: stop the reader and release the archive file
+		close(reconstructedChunkStream)
+		if logFile != nil {
+			logFile.Close()
+		}
+	}()
+
+	for packet := range channel {
+		today := time.Now()
+
+		// Note: the date format is go-specific! You can't use random dates or numbers
+		formattedDate := today.Format("2006-01-02")
+		if formattedDate != currentLogDate {
+			fileName := filepath.Join(LOG_DIR, formattedDate+"-log.dat")
+
+			// Close previous
+			if logFile != nil {
+				logFile.Close()
+			}
+
+			var err error
+			logFile, err = openLogFile(fileName)
+			if err != nil {
+				// logFile is nil now. The date is not recorded, so the open
+				// is retried when the next packet arrives.
+				slog.Error("Failed to open daily log file", "file_name", fileName, "error", err)
+			} else {
+				// Remember the date so the file is opened once per day
+				// instead of once per packet.
+				currentLogDate = formattedDate
+			}
+		}
+
+		if logFile != nil {
+			if _, err := logFile.Write(packet.Buffer); err != nil {
+				slog.Error("Failed to write packet to archive log", "error", err)
+			}
+		}
+
+		// Parse packet
+		if err := bw.ReceiveChunk(packet, reconstructedChunkStream); err != nil {
+			slog.Error("Failed to process chunk", "error", err)
+		}
+
+		// Drop finished and abandoned messages so their IDs can be reused and
+		// the record stays small. Once a second is plenty, the timeouts are
+		// tens of seconds.
+		if today.Sub(lastPrune) >= time.Second {
+			bw.PruneChunkRecord(today.UnixMilli())
+			lastPrune = today
+		}
+	}
+}
+
+func main() {
+	sigs := make(chan os.Signal, 1)
+
+	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
+
+	const port = 3000
+	const ip = "0.0.0.0"
+
+	slog.Info("Starting server", "IP", ip, "Port", port)
+	go runServer(ip, port)
+
+	// Block until we are asked to stop
+	<-sigs
+	fmt.Println("Gracefully exiting...")
+	// TODO: Context cancel?
+	os.Exit(0)
+}