151 lines
3.2 KiB
Go
151 lines
3.2 KiB
Go
/*
|
|
*
|
|
|
|
Roost - where the birds come to sleep.
|
|
|
|
Network ingest point for remote sensor modules.
|
|
*/
|
|
package main
|
|
|
|
import (
|
|
"fmt"
|
|
"log/slog"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"jjb.dev/birdwhisperer"
|
|
)
|
|
|
|
const LOG_DIR = "./data-archive"
|
|
|
|
func runServer(ip string, port int) {
|
|
// Unbuffered channel; can add a buffer if other complexity results in drops
|
|
outbox := make(chan birdwhisperer.Packet)
|
|
// Max MTU is 64K for IPv4
|
|
receiveBuffer := make([]byte, 1<<16)
|
|
|
|
hostAddr := net.UDPAddr{
|
|
Port: port,
|
|
IP: net.ParseIP(ip),
|
|
}
|
|
|
|
udpConnection, err := net.ListenUDP("udp", &hostAddr)
|
|
if err != nil {
|
|
slog.Error("Failed to create UDP server!", "error", err)
|
|
}
|
|
|
|
// Service our outbox
|
|
go serviceOutbox(outbox, udpConnection)
|
|
|
|
for {
|
|
messageLength, remoteAddr, err := udpConnection.ReadFromUDP(receiveBuffer)
|
|
|
|
if err != nil {
|
|
slog.Error("Failed to read from UDP", "error", err)
|
|
continue
|
|
}
|
|
|
|
msg := make([]byte, messageLength)
|
|
copy(msg, receiveBuffer[:messageLength])
|
|
|
|
packet := birdwhisperer.Packet{TargetAddr: remoteAddr, Buffer: msg}
|
|
outbox <- packet
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Listens for packets on the channel and writes them using the provided udpConnection
|
|
*/
|
|
func serviceOutbox(channel <-chan birdwhisperer.Packet, udpConnection *net.UDPConn) {
|
|
for {
|
|
packet := <-channel
|
|
_, err := udpConnection.WriteToUDP(packet.Buffer, packet.TargetAddr)
|
|
if err != nil {
|
|
slog.Error("Failed to send UDP response", "error", err, "target_address", packet.TargetAddr.String())
|
|
}
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Handles a message that originates from an external source
|
|
*/
|
|
func processProbeMessages(channel <-chan birdwhisperer.Packet) {
|
|
var logFile *os.File = nil
|
|
var err error = nil
|
|
var currentLogDate string = ""
|
|
|
|
var bw birdwhisperer.BirdWhisperer = birdwhisperer.NewBirdWhisperer()
|
|
|
|
reconstructedChunkStream := make(chan []byte)
|
|
|
|
// Store packet
|
|
// Parse chunk
|
|
// Wait for complete message
|
|
// Store it in database
|
|
for {
|
|
packet, ok := <-channel
|
|
|
|
if !ok {
|
|
// Channel is closed
|
|
if logFile != nil {
|
|
logFile.Close()
|
|
}
|
|
break
|
|
}
|
|
|
|
today := time.Now()
|
|
|
|
// Note: the date format is go-specific! You can't use random dates or numbers
|
|
formattedDate := today.Format("2006-01-02")
|
|
if formattedDate != currentLogDate {
|
|
fileName := fmt.Sprintf("%s/%s-log.dat", LOG_DIR, formattedDate)
|
|
|
|
// Close previous
|
|
if logFile != nil {
|
|
logFile.Close()
|
|
}
|
|
|
|
// os.O_WRONLY only writes
|
|
// os.O_CREATE create if not exists
|
|
// os.O_APPEND append, don't overwrite existing
|
|
logFile, err = os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
slog.Error("Failed to open daily log file", "file_name", fileName)
|
|
}
|
|
}
|
|
|
|
if logFile != nil {
|
|
_, err = logFile.Write(packet.Buffer)
|
|
|
|
if err != nil {
|
|
slog.Error("Failed to write packet to archive log")
|
|
}
|
|
}
|
|
|
|
// Parse packet
|
|
bw.ReceiveChunk(packet, reconstructedChunkStream)
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
sigs := make(chan os.Signal, 1)
|
|
|
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
const port = 3000
|
|
const ip = "0.0.0.0"
|
|
|
|
slog.Info("Starting server", "IP", ip, "Port", port)
|
|
go runServer(ip, port)
|
|
|
|
for {
|
|
<-sigs
|
|
fmt.Println("Gracefully exiting...")
|
|
// TODO: Context cancel?
|
|
os.Exit(0)
|
|
}
|
|
}
|