Initial commit

main
ookjosh 2025-11-10 07:54:36 -07:00
commit 3e9dc244e2
10 changed files with 556 additions and 0 deletions

3
.gitignore vendored 100644
View File

@ -0,0 +1,3 @@
# Binary
roost

40
README.md 100644
View File

@ -0,0 +1,40 @@
# Roost
> Where the birds come home to sleep.
Data ingest server, providing an endpoint for nodes to send data for archival.
## Go Details
Currently 1.25.3, will stay updated as quickly as possible. Go is *very* good with backwards compatibility so breaking changes are unlikely.
[Install Go](https://go.dev/learn/)
Once go is installed, `go version` should return >= 1.25.3.
### Install Dependencies
Go will take care of downloading the actual library dependencies (no need for an `npm i` equivalent) but we have some tooling dependencies as well.
- sqlc is a code gen library that gives type-safe APIs against a schema at compile time. Very handy
- `go install github.com/sqlc-dev/sqlc/cmd/sqlc@latest`
### Learn Go
[Use Go By Example](https://gobyexample.com) - it is an incredible site and will get you up to speed quickly. Go is a very simple language, so most of the points are easy to look up.
## To run `roost`:
- `cd path/to/roost`
- `go run .`
## To build `roost` for your current target:
- `cd path/to/roost`
- `go build [-o binary-name] .`
- Default binary name is `roost`, in the current directory
## To build for different target:
Go is statically linked (and all the code we use is as well, no additional runtime libraries required), so it is trivial to compile on one architecture and copy the binary over to another.
- `cd path/to/roost`
- `go tool dist list`
- Using the relevant pair above (e.g., `linux/amd64`):
- `GOOS=linux GOARCH=amd64 go build -o roost-linux-amd64`
- The `-o` flag names the output binary; any name works (e.g. `bob`)

View File

@ -0,0 +1,243 @@
package birdwhisperer
import (
"encoding/binary"
"encoding/hex"
"fmt"
"log/slog"
"net"
"time"
)
// Packet couples a raw datagram with the UDP endpoint it belongs to.
type Packet struct {
	// TargetAddr is the remote UDP address associated with the datagram.
	TargetAddr *net.UDPAddr

	// Buffer holds the datagram's raw bytes.
	Buffer []byte
}
// ChunkHeaderV1 is the parsed form of a version 1 chunk header.
//
// On the wire the header is big-endian: bytes 0-1 MessageID, bytes 2-3
// Length, byte 4 Version, bytes 5-6 Index, bytes 7-8 NumChunks.
type ChunkHeaderV1 struct {
	// MessageID identifies which message the chunk belongs to.
	MessageID uint16

	// Length is the chunk length carried in the header; ChunkMessage sets
	// it to the payload length plus the header length.
	Length uint16

	// Version is the header format version byte.
	Version uint8

	// Index is the position of this chunk within its message.
	Index uint16

	// NumChunks is the chunk count for the message as carried in the header.
	NumChunks uint16

	// HeaderLength is not read from the wire by ParseChunkHeaderV1; the
	// parser sets it to CHUNK_V1_HEADER_LENGTH.
	HeaderLength uint8
}
// toBytes serializes the header into its CHUNK_V1_HEADER_LENGTH-byte,
// big-endian wire form:
//
//	bytes 0-1  MessageID
//	bytes 2-3  Length
//	byte  4    version (always 1)
//	bytes 5-6  Index
//	bytes 7-8  NumChunks
//
// HeaderLength is not part of the wire format. An earlier version stored it
// in byte 8, which clobbered the low byte of NumChunks.
func (h *ChunkHeaderV1) toBytes() []byte {
	result := make([]byte, CHUNK_V1_HEADER_LENGTH)
	binary.BigEndian.PutUint16(result[0:2], h.MessageID)
	binary.BigEndian.PutUint16(result[2:4], h.Length)
	// This serializer only produces the V1 layout, so the version byte is fixed.
	result[CHUNK_HEADER_VERSION_OFFSET] = 1
	binary.BigEndian.PutUint16(result[5:7], h.Index)
	binary.BigEndian.PutUint16(result[7:9], h.NumChunks)
	return result
}
// toHexString returns the serialized header encoded as hexadecimal ASCII bytes.
func (h *ChunkHeaderV1) toHexString() []byte {
	raw := h.toBytes()
	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw)
	return encoded
}
// ReconstructedMessage tracks the reassembly state of one chunked message.
//
// The field order is significant: the struct may be built with a positional
// composite literal.
type ReconstructedMessage struct {
	// Chunks holds one slot per expected chunk, indexed by chunk index.
	Chunks []Packet

	// ReceivedTime is the timestamp recorded when the record was created.
	// PruneChunkRecord compares it with the caller-supplied clock, so both
	// must use the same unit.
	ReceivedTime int64

	// CompletedTime is zero until every chunk has arrived, then holds the
	// completion timestamp (same unit as ReceivedTime).
	CompletedTime int64

	// NumChunks is the number of chunks the message is expected to have.
	NumChunks uint16

	// ReceivedChunks counts the chunks stored so far.
	ReceivedChunks uint16
}
const (
	// CHUNK_HEADER_VERSION_OFFSET is the byte offset of the version field
	// within a chunk header.
	CHUNK_HEADER_VERSION_OFFSET = 4

	// CHUNK_V1_HEADER_LENGTH is the size in bytes of a version 1 chunk header.
	CHUNK_V1_HEADER_LENGTH = 9

	// RECONSTRUCTED_BUFFER_SIZE_ALERT is the largest reassembled message, in
	// bytes, that ReconstructMessageFromChunks will build.
	RECONSTRUCTED_BUFFER_SIZE_ALERT = 1e8

	// CHUNK_MAX_LENGTH is the maximum payload size of a chunk. It is based on
	// an MTU of 1500 bytes, leaving some room for overhead.
	CHUNK_MAX_LENGTH = 1400
)
// BirdWhisperer reassembles chunked messages.
type BirdWhisperer struct {
	// ChunkRecord maps a message ID to the reassembly state for that message.
	ChunkRecord map[uint16]ReconstructedMessage
}
// NewBirdWhisperer returns a BirdWhisperer with an empty, ready-to-use
// ChunkRecord map.
func NewBirdWhisperer() BirdWhisperer {
	var bw BirdWhisperer
	bw.ChunkRecord = map[uint16]ReconstructedMessage{}
	return bw
}
// ReconstructMessageFromChunks concatenates the buffers of every chunk in
// chunkRecord, in slice order, into a single message.
//
// It returns an error, and no data, when the combined size exceeds
// RECONSTRUCTED_BUFFER_SIZE_ALERT.
func ReconstructMessageFromChunks(chunkRecord ReconstructedMessage) ([]byte, error) {
	// First pass: measure, so the size limit is enforced before allocating.
	total := 0
	for _, part := range chunkRecord.Chunks {
		total += len(part.Buffer)
	}
	if total > RECONSTRUCTED_BUFFER_SIZE_ALERT {
		return nil, fmt.Errorf("Attempted to reconstruct message bigger than allowed size: [%d] bytes", total)
	}

	// Second pass: stitch the payloads together into one allocation.
	assembled := make([]byte, 0, total)
	for _, part := range chunkRecord.Chunks {
		assembled = append(assembled, part.Buffer...)
	}
	return assembled, nil
}
// PruneChunkRecord removes stale entries from ChunkRecord.
//
// A completed message is dropped once currentTimestamp is more than 30,000
// past its CompletedTime. A message that has a ReceivedTime but was never
// completed is dropped once currentTimestamp is more than 180,000 past its
// ReceivedTime. The constants are named as milliseconds, so currentTimestamp
// and the stored times are expected to be in milliseconds.
func (bw *BirdWhisperer) PruneChunkRecord(currentTimestamp int64) {
	const (
		completedRetentionMs  = 30_000
		incompleteRetentionMs = 60_000 * 3
	)

	for id, record := range bw.ChunkRecord {
		expired := false
		switch {
		case record.CompletedTime > 0:
			expired = currentTimestamp-record.CompletedTime > completedRetentionMs
		case record.ReceivedTime > 0 && record.CompletedTime == 0:
			expired = currentTimestamp-record.ReceivedTime > incompleteRetentionMs
		}
		if expired {
			// Deleting the current key while ranging over a map is safe in Go.
			delete(bw.ChunkRecord, id)
		}
	}
}
// ReceiveChunk parses an incoming packet as a chunk, stores its payload in the
// reassembly record for the chunk's message ID and, once every chunk of that
// message has arrived, sends the reassembled message on completedMessages.
//
// header.Length is interpreted as the full chunk length (header plus
// payload), which is how ChunkMessage fills it in. Any bytes in the packet
// beyond that length are treated as a further chunk and processed
// recursively.
// NOTE(review): confirm that remote senders use the same meaning for Length.
//
// Record timestamps are stored in Unix milliseconds so that they are
// comparable with the millisecond timeouts used by PruneChunkRecord.
//
// chunk is the packet to process; its Buffer is retained (sub-sliced) by the
// record, so the caller must not reuse it. completedMessages receives each
// fully reassembled message; the send is a plain blocking channel send.
//
// An error is returned when the packet is too short, carries an unsupported
// version, declares a length that does not fit the packet, carries a chunk
// index outside the message's chunk count, or when reassembly fails. Chunks
// for an already-completed message and duplicate chunks are ignored.
func (bw *BirdWhisperer) ReceiveChunk(chunk Packet, completedMessages chan []byte) error {
	if len(chunk.Buffer) <= CHUNK_HEADER_VERSION_OFFSET {
		return fmt.Errorf("Packet not long enough to be a chunk!")
	}

	switch chunk.Buffer[CHUNK_HEADER_VERSION_OFFSET] {
	case 1:
		header, err := ParseChunkHeaderV1(chunk)
		if err != nil {
			return err
		}

		// Work in int so attacker-controlled uint16 fields cannot wrap.
		headerLen := int(header.HeaderLength)
		chunkLen := int(header.Length)
		if chunkLen < headerLen || chunkLen > len(chunk.Buffer) {
			return fmt.Errorf("chunk length %d does not fit a packet of %d bytes with a %d byte header", chunkLen, len(chunk.Buffer), headerLen)
		}

		chunkRecord, exists := bw.ChunkRecord[header.MessageID]
		if !exists {
			chunkRecord = ReconstructedMessage{
				Chunks:       make([]Packet, header.NumChunks),
				ReceivedTime: time.Now().UnixMilli(),
				NumChunks:    header.NumChunks,
			}
		}

		// Checked against the record rather than the header: a later chunk
		// may disagree with the chunk count the record was created with.
		if int(header.Index) >= len(chunkRecord.Chunks) {
			return fmt.Errorf("chunk index %d out of range for message %d with %d chunks", header.Index, header.MessageID, len(chunkRecord.Chunks))
		}

		if chunkRecord.CompletedTime == 0 {
			slot := &chunkRecord.Chunks[header.Index]
			// A nil Buffer marks an empty slot; a stored payload is always
			// non-nil (even when empty) because it is sliced from the packet.
			if slot.Buffer == nil {
				slot.TargetAddr = chunk.TargetAddr
				// Cap the slice so appends to it cannot touch trailing data.
				slot.Buffer = chunk.Buffer[headerLen:chunkLen:chunkLen]
				chunkRecord.ReceivedChunks++
			}
			bw.ChunkRecord[header.MessageID] = chunkRecord

			// Once every chunk is present, reassemble and hand the message on.
			if chunkRecord.ReceivedChunks == chunkRecord.NumChunks {
				msg, err := ReconstructMessageFromChunks(chunkRecord)
				if err != nil {
					return err
				}
				chunkRecord.CompletedTime = time.Now().UnixMilli()
				bw.ChunkRecord[header.MessageID] = chunkRecord
				slog.Debug("Received all chunks of a message", "firstReceived", chunkRecord.ReceivedTime, "lastReceived", chunkRecord.CompletedTime)
				completedMessages <- msg
			}
		}

		if chunkLen < len(chunk.Buffer) {
			// More data follows this chunk, perhaps another chunk appended.
			// chunkLen >= headerLen > 0, so every recursion consumes bytes.
			return bw.ReceiveChunk(Packet{TargetAddr: chunk.TargetAddr, Buffer: chunk.Buffer[chunkLen:]}, completedMessages)
		}
	default:
		return fmt.Errorf("Packet has unsupported chunk version")
	}
	return nil
}
// ParseChunkHeaderV1 decodes the version 1 chunk header at the start of
// chunk.Buffer.
//
// It returns an error when the buffer is shorter than CHUNK_V1_HEADER_LENGTH
// or when the version byte is not 1. In the latter case the decoded fields
// are still returned alongside the error.
func ParseChunkHeaderV1(chunk Packet) (header ChunkHeaderV1, err error) {
	buf := chunk.Buffer
	if len(buf) < CHUNK_V1_HEADER_LENGTH {
		return header, fmt.Errorf("Malformed chunk")
	}

	// The version could be checked up front, but decoding everything first
	// costs little and keeps the field order of the wire format obvious.
	header = ChunkHeaderV1{
		MessageID:    binary.BigEndian.Uint16(buf[0:2]),
		Length:       binary.BigEndian.Uint16(buf[2:4]),
		Version:      buf[CHUNK_HEADER_VERSION_OFFSET],
		Index:        binary.BigEndian.Uint16(buf[5:7]),
		NumChunks:    binary.BigEndian.Uint16(buf[7:9]),
		HeaderLength: CHUNK_V1_HEADER_LENGTH,
	}

	if header.Version != 1 {
		return header, fmt.Errorf("Not a V1 header!")
	}
	return header, nil
}
// ChunkMessage splits message into version 1 chunks, each consisting of a
// ChunkHeaderV1 followed by at most CHUNK_MAX_LENGTH payload bytes.
//
// message is the raw bytes to send and messageId is written into every chunk
// header. In each header Index is the zero-based position of the chunk,
// NumChunks is the total number of chunks and Length is the payload length
// plus the header length. Zero-based indexing matches ReceiveChunk, which
// uses Index directly as a slice index into NumChunks slots.
//
// An empty message yields an empty (non-nil) slice. A message that would need
// more chunks than a uint16 can count cannot be represented in the header, so
// nil is returned for it.
func ChunkMessage(message []byte, messageId uint16) [][]byte {
	const maxChunks = 1<<16 - 1

	messageLen := len(message)
	// Ceiling division: a trailing partial chunk still needs its own chunk.
	totalChunks := (messageLen + CHUNK_MAX_LENGTH - 1) / CHUNK_MAX_LENGTH
	if totalChunks > maxChunks {
		return nil
	}

	messageChunks := make([][]byte, 0, totalChunks)
	// Offsets are tracked as int: the same arithmetic in uint16 would wrap
	// once the offset passes 65535 bytes.
	for start := 0; start < messageLen; start += CHUNK_MAX_LENGTH {
		end := min(start+CHUNK_MAX_LENGTH, messageLen)
		thisHeader := ChunkHeaderV1{
			MessageID:    messageId,
			Index:        uint16(len(messageChunks)),
			NumChunks:    uint16(totalChunks),
			Length:       uint16(end-start) + CHUNK_V1_HEADER_LENGTH,
			Version:      1,
			HeaderLength: CHUNK_V1_HEADER_LENGTH,
		}
		// Header followed by the portion of the message it describes.
		messageChunks = append(messageChunks, append(thisHeader.toBytes(), message[start:end]...))
	}
	return messageChunks
}

View File

@ -0,0 +1,97 @@
package birdwhisperer
import (
"testing"
)
func TestPruningChunkRecord(t *testing.T) {
bw := NewBirdWhisperer()
incompleteMessageNotExpired := ReconstructedMessage{
ReceivedTime: 195_000, // "Just started"
CompletedTime: 0,
NumChunks: 2,
ReceivedChunks: 1,
}
incompleteMessageExpired := ReconstructedMessage{
ReceivedTime: 100, // Started a long time ago
CompletedTime: 0,
NumChunks: 2,
ReceivedChunks: 1,
}
completeMessageNotExpired := ReconstructedMessage{
ReceivedTime: 100,
CompletedTime: 199_000, // "Just finished"
NumChunks: 2,
ReceivedChunks: 2,
}
completeMessageExpired := ReconstructedMessage{
ReceivedTime: 100,
CompletedTime: 10_000, // Finished a long time ago
NumChunks: 2,
ReceivedChunks: 2,
}
bw.ChunkRecord[0] = incompleteMessageNotExpired
bw.ChunkRecord[1] = incompleteMessageExpired
bw.ChunkRecord[2] = completeMessageNotExpired
bw.ChunkRecord[3] = completeMessageExpired
bw.PruneChunkRecord(200_000)
if _, exists := bw.ChunkRecord[0]; !exists {
t.Errorf("Pruned an incomplete message that hasn't expired yet")
}
if _, exists := bw.ChunkRecord[1]; exists {
t.Errorf("Didn't prune an incomplete message that expired")
}
if _, exists := bw.ChunkRecord[2]; !exists {
t.Errorf("Pruned an complete message that hasn't expired yet")
}
if _, exists := bw.ChunkRecord[3]; exists {
t.Errorf("Didn't prune an incomplete message that expired")
}
}
// TestChunkMessageSingleChunk checks that a message one byte shorter than
// CHUNK_MAX_LENGTH is emitted as exactly one chunk.
func TestChunkMessageSingleChunk(t *testing.T) {
	const wantChunks = 1

	payload := make([]byte, CHUNK_MAX_LENGTH-1)
	gotChunks := len(ChunkMessage(payload, uint16(123)))

	if gotChunks != wantChunks {
		t.Errorf("A message that should be 1 chunk is actually %d chunks", gotChunks)
	}
}
// TestChunkMessageMultipleChunks checks that a message of exactly two full
// chunk payloads is emitted as two chunks.
func TestChunkMessageMultipleChunks(t *testing.T) {
	const wantChunks = 2

	payload := make([]byte, CHUNK_MAX_LENGTH*2)
	gotChunks := len(ChunkMessage(payload, uint16(123)))

	if gotChunks != wantChunks {
		t.Errorf("A message that should be 2 chunks is actually %d chunks", gotChunks)
	}
}
// TestChunkMessageMultipleChunksAndSome checks that a message of two full
// chunk payloads plus one extra byte is emitted as three chunks, and that
// each emitted chunk has the expected total length (header plus payload).
func TestChunkMessageMultipleChunksAndSome(t *testing.T) {
	const (
		extraDataLen        = 1
		expectedChunkLength = CHUNK_MAX_LENGTH + CHUNK_V1_HEADER_LENGTH
		expectedTailLength  = extraDataLen + CHUNK_V1_HEADER_LENGTH
	)

	payload := make([]byte, CHUNK_MAX_LENGTH*2+extraDataLen)
	chunks := ChunkMessage(payload, uint16(123))

	if got := len(chunks); got != 3 {
		t.Errorf("A message that should be 3 chunks is actually %d chunks", got)
	}

	// The two leading chunks carry a full payload each.
	if got := len(chunks[0]); got != expectedChunkLength {
		t.Errorf("Chunk 0 should be %d, is %d", expectedChunkLength, got)
	}
	if got := len(chunks[1]); got != expectedChunkLength {
		t.Errorf("Chunk 1 should be %d, is %d", expectedChunkLength, got)
	}

	// The final chunk carries only the leftover byte.
	if got := len(chunks[2]); got != expectedTailLength {
		t.Errorf("Chunk 2 should be %d, is %d", expectedTailLength, got)
	}
}

View File

@ -0,0 +1,3 @@
module jjb.dev/birdwhisperer
go 1.25.3

0
db/query.sql 100644
View File

0
db/schema.sql 100644
View File

11
db/sqlc.yaml 100644
View File

@ -0,0 +1,11 @@
version: "2"
sql:
- engine: "mysql"
queries: "query.sql"
schema: "schema.sql"
gen:
go:
# go package name to use
package: "roost_mysql"
# folder to generate
out: "mysql"

9
go.mod 100644
View File

@ -0,0 +1,9 @@
module jjb.dev/roost
go 1.25.3
require (
jjb.dev/birdwhisperer v0.0.0
)
replace jjb.dev/birdwhisperer => ./birdwhisperer

150
main.go 100644
View File

@ -0,0 +1,150 @@
/*
*
Roost - where the birds come to sleep.
Network ingest point for remote sensor modules.
*/
package main
import (
"fmt"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"
"jjb.dev/birdwhisperer"
)
// LOG_DIR is the directory prefix used when building the file names of the
// daily packet archive logs.
const LOG_DIR = "./data-archive"
// runServer listens for UDP datagrams on ip:port and forwards a copy of each
// received datagram, tagged with the sender's address, to the outbox channel
// that serviceOutbox drains.
//
// If the socket cannot be opened the error is logged and runServer returns.
// Otherwise it loops forever; read errors are logged and the loop continues.
func runServer(ip string, port int) {
	// Unbuffered channel; can add a buffer if other complexity results in drops
	outbox := make(chan birdwhisperer.Packet)

	// 64 KiB is large enough for any single UDP datagram.
	receiveBuffer := make([]byte, 1<<16)

	hostAddr := net.UDPAddr{
		Port: port,
		IP:   net.ParseIP(ip),
	}
	udpConnection, err := net.ListenUDP("udp", &hostAddr)
	if err != nil {
		slog.Error("Failed to create UDP server!", "error", err)
		// Without a socket there is nothing to serve; entering the read loop
		// with an unusable connection would only produce an endless stream
		// of read errors.
		return
	}

	// Service our outbox
	go serviceOutbox(outbox, udpConnection)

	for {
		messageLength, remoteAddr, err := udpConnection.ReadFromUDP(receiveBuffer)
		if err != nil {
			slog.Error("Failed to read from UDP", "error", err)
			continue
		}

		// receiveBuffer is reused by the next read, so hand off a copy.
		msg := make([]byte, messageLength)
		copy(msg, receiveBuffer[:messageLength])
		outbox <- birdwhisperer.Packet{TargetAddr: remoteAddr, Buffer: msg}
	}
}
// serviceOutbox receives packets from channel and writes each one to its
// TargetAddr using udpConnection. Write failures are logged and do not stop
// the loop.
//
// The function returns when channel is closed, so the goroutine running it
// can terminate instead of repeatedly sending zero-value packets.
func serviceOutbox(channel <-chan birdwhisperer.Packet, udpConnection *net.UDPConn) {
	for packet := range channel {
		if _, err := udpConnection.WriteToUDP(packet.Buffer, packet.TargetAddr); err != nil {
			slog.Error("Failed to send UDP response", "error", err, "target_address", packet.TargetAddr.String())
		}
	}
}
// processProbeMessages handles packets that originate from an external
// source. For every packet received on channel it:
//
//  1. appends the raw bytes to the daily archive file under LOG_DIR, opening
//     a new file only when the date changes (or the previous open failed);
//  2. feeds the packet to a BirdWhisperer for chunk reassembly.
//
// Archive and parsing failures are logged and do not stop processing. The
// function returns when channel is closed, closing the current archive file.
func processProbeMessages(channel <-chan birdwhisperer.Packet) {
	var logFile *os.File
	var currentLogDate string
	bw := birdwhisperer.NewBirdWhisperer()

	// ReceiveChunk sends each completed message on this channel. It must be
	// drained, otherwise the first completed message would block this loop
	// forever.
	// TODO: store completed messages in the database instead of only
	// logging them.
	reconstructedChunkStream := make(chan []byte)
	go func() {
		for msg := range reconstructedChunkStream {
			slog.Debug("Reconstructed complete message", "length", len(msg))
		}
	}()
	// ReceiveChunk is only called from this function, so closing here is
	// safe and lets the drain goroutine finish.
	defer close(reconstructedChunkStream)

	defer func() {
		if logFile != nil {
			logFile.Close()
		}
	}()

	// NOTE(review): nothing here calls bw.PruneChunkRecord, so reassembly
	// records are never removed — confirm where pruning should happen.
	for packet := range channel {
		// Note: the date format is go-specific! You can't use random dates or numbers
		formattedDate := time.Now().Format("2006-01-02")

		if formattedDate != currentLogDate {
			// Close previous
			if logFile != nil {
				logFile.Close()
				logFile = nil
			}

			if err := os.MkdirAll(LOG_DIR, 0755); err != nil {
				slog.Error("Failed to create archive directory", "directory", LOG_DIR, "error", err)
			}

			fileName := fmt.Sprintf("%s/%s-log.dat", LOG_DIR, formattedDate)
			// os.O_WRONLY only writes
			// os.O_CREATE create if not exists
			// os.O_APPEND append, don't overwrite existing
			opened, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
			if err != nil {
				slog.Error("Failed to open daily log file", "file_name", fileName, "error", err)
			} else {
				logFile = opened
				// Remember the date only on success, so a failed open is
				// retried on the next packet while a successful one is
				// reused for the rest of the day.
				currentLogDate = formattedDate
			}
		}

		if logFile != nil {
			if _, err := logFile.Write(packet.Buffer); err != nil {
				slog.Error("Failed to write packet to archive log", "error", err)
			}
		}

		// Parse packet
		if err := bw.ReceiveChunk(packet, reconstructedChunkStream); err != nil {
			slog.Error("Failed to process chunk", "error", err)
		}
	}
}
// main starts the UDP server in a background goroutine and then blocks until
// SIGINT or SIGTERM is received, at which point the process exits with
// status 0.
func main() {
	const (
		ip   = "0.0.0.0"
		port = 3000
	)

	// Buffer of one so a signal delivered before we start waiting is not lost.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	slog.Info("Starting server", "IP", ip, "Port", port)
	go runServer(ip, port)

	<-sigs
	fmt.Println("Gracefully exiting...")
	// TODO: Context cancel?
	os.Exit(0)
}