roost/node/index.js

441 lines
17 KiB
JavaScript

const dgram = require('dgram')
const fs = require('fs')
const path = require('path')
const mysql = require("mysql2");
const cellularChunking = require('./cellularChunking')
const message_parser = require('./messages')
// UTC calendar date (YYYY-MM-DD) used in the name of the current log file.
let thisDate = new Date().toISOString().slice(0, 10);
// Directory that receives the daily raw-traffic log files.
const filePath = './records';
// Counter incremented in handleMessage and handed to cellularChunking.chunk_message.
let message_id = 0;
// While true, handleMessage skips the one-off push of the firmware component files.
let sentFiles = true;
// Parsed messages collected by handleMessage until storeMessages writes them out.
let dataBuffer = [];
// Per-source time (ms since the Unix epoch) at which handleMessage last recorded a response.
const response_record = {};
// Per-node configuration, filled in asynchronously by createPool().
const settings = {};
// Shared MySQL connection pool.
const pool = createPool();

// Create the log directory when it is missing (or when the path is not a directory).
if (!fs.existsSync(filePath) || !fs.lstatSync(filePath).isDirectory()) {
  fs.mkdirSync(filePath);
}
/**
 * Create the MySQL connection pool and kick off an asynchronous load of the
 * per-node settings into the module-level `settings` map.
 *
 * The pool is returned immediately; `settings` is populated later, once the
 * settings query has completed. Failures are logged and leave `settings`
 * untouched.
 *
 * @returns {object} the pool created by `mysql.createPool`.
 */
function createPool() {
  const newPool = mysql.createPool({
    host: process.env.db_host,
    user: process.env.db_user,
    password: process.env.db_password,
    database: process.env.db_database,
    port: process.env.db_port,
    timezone: '+00:00'
  });

  const nodeSettingsSQL = "SELECT Node2Group.node,Node2Group.node_group,NodeConfiguration.measurement_interval_minutes,NodeConfiguration.offset_from_midnight_minutes,NodeConfiguration.wake_window_length_minutes FROM GroupType JOIN Group2Type ON GroupType.name=Group2Type.type JOIN Node2Group ON Group2Type.group_name=Node2Group.node_group JOIN NodeConfiguration ON NodeConfiguration.node=Node2Group.node WHERE Group2Type.type = 'Temperature Site';";

  newPool.getConnection(function (connErr, connection) {
    if (connErr) {
      // Include the error itself so a database outage can actually be diagnosed.
      console.log(`error setting the database: ${connErr}`);
      return;
    }
    connection.execute("USE Chickens;", [], function (useErr) {
      if (useErr) {
        connection.release();
        console.log(`Some other error setting database: ${useErr}`);
        return;
      }
      connection.execute(nodeSettingsSQL, [], function (queryErr, rows) {
        // The rows are already in memory, so hand the connection back first;
        // an exception while processing them can then no longer leak it.
        connection.release();
        if (queryErr) {
          console.log(`Some other error getting node settings: ${queryErr}`);
          return;
        }
        rows.forEach(function (row) {
          settings[row.node] = {
            measurement_interval: row.measurement_interval_minutes,
            measurement_offset: row.offset_from_midnight_minutes,
            wake_window_length: row.wake_window_length_minutes,
            temperature_site: row.node_group
          };
        });
      });
    });
  });

  return newPool;
}
// IPv4 UDP socket; its event handlers and the bind call are at the bottom of the file.
const socket = dgram.createSocket('udp4');
// Append-mode log stream for the current date; the 'message' handler replaces it when the date changes.
let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'})
/**
 * Build a complete "string key / string value" message.
 *
 * Layout after the header: key bytes, 0xFF, value bytes, 0x00, 0xFF, followed
 * by one trailing 0x00 byte.
 *
 * Lengths are measured in encoded UTF-8 bytes rather than characters, so
 * keys or values containing multi-byte characters are no longer truncated.
 *
 * @param {number} msg_type message type byte for the header.
 * @param {number} version protocol version byte for the header.
 * @param {string} source hex id of the sender.
 * @param {string} target hex id of the recipient.
 * @param {string} key payload key.
 * @param {string} value payload value.
 * @param {number} this_message_id message id for the header.
 * @returns {Buffer} header + payload + trailing zero byte.
 */
function make_string_string_message(msg_type, version, source, target, key, value, this_message_id) {
  const headerBytes = make_message_header(msg_type, version, source, target, this_message_id);
  console.log('message header: ', headerBytes);
  const payload = Buffer.concat([
    Buffer.from(key, 'utf8'),
    Buffer.from([255]),     // separator between key and value
    Buffer.from(value, 'utf8'),
    Buffer.from([0, 255]),  // zero byte, then the closing 0xFF marker
  ]);
  return Buffer.concat([headerBytes, payload, Buffer.from([0])]);
}
/**
 * Build the 24-byte message header.
 *
 * Layout: message id (uint16 BE), message type (uint8), version (uint8),
 * source id (8 bytes), target id (8 bytes), timestamp (uint32 BE).
 *
 * All working values are local; nothing is written to the global scope.
 *
 * @param {number} msg_type message type byte.
 * @param {number} version protocol version byte.
 * @param {string} source hex id of the sender; left-padded with zeros / cut to the last 16 hex digits.
 * @param {string} target hex id of the recipient; same padding rule.
 * @param {number} this_message_id value written as an unsigned 16-bit integer.
 * @returns {Buffer} the assembled header.
 */
function make_message_header(msg_type, version, source, target, this_message_id) {
  // Normalise an id to exactly 16 hex digits (8 bytes).
  const toIdBuffer = (id) => Buffer.from(String(id).padStart(16, '0').slice(-16), 'hex');

  const targetBuf = toIdBuffer(target);
  const sourceBuf = toIdBuffer(source);

  // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01
  const secondsSince2000 = Math.floor(Date.now() / 1000) - 946684800;
  const timestampBuf = Buffer.alloc(4);
  timestampBuf.writeUInt32BE(secondsSince2000);

  const prefix = Buffer.alloc(4);
  prefix.writeUInt16BE(this_message_id, 0);
  prefix.writeUInt8(msg_type, 2);
  prefix.writeUInt8(version, 3);

  return Buffer.concat([prefix, sourceBuf, targetBuf, timestampBuf]);
}
/**
 * Read the first file in `fileList` and send it to the remote peer as a
 * chunked string/string message (file name as key, file content as value).
 *
 * The remaining files are sent by `thisSend`, which calls back into this
 * function once the last chunk of the current file has gone out.
 *
 * The message header uses fixed values (type 8, version 1, source '00',
 * target '4827e28f7778', message id 537); the chunking uses the module-level
 * `message_id`.
 *
 * @param {object} socket UDP socket used for sending.
 * @param {object} rinfo remote address info (`address`, `port`).
 * @param {string} thisFilePath directory containing the files.
 * @param {string[]} fileList names of the files still to send.
 */
function sendFile(socket, rinfo, thisFilePath, fileList) {
  // An empty directory listing would otherwise make path.join throw inside an
  // asynchronous callback and take the whole process down.
  if (!fileList || fileList.length === 0) {
    console.log('no files to send');
    return;
  }

  const fileName = fileList[0];
  const fullPath = path.join(thisFilePath, fileName);
  fs.readFile(fullPath, 'utf-8', (err, data) => {
    console.log('have read file');
    if (err) {
      console.log(err);
      return;
    }
    console.log('make the message to send');
    const theMsg = Buffer.from(make_string_string_message(8, 1, '00', '4827e28f7778', fileName, data, 537)).toString('hex');
    console.log('chunk the message');
    const theRespChunks = cellularChunking.chunk_message(theMsg, message_id);
    console.log('have chunks');
    thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList);
  });
}
/**
 * Handle one reassembled message: parse it, queue the parsed records for
 * storage, and (optionally) answer the sender.
 *
 * Records whose source is all zeros, or starts with 'f' or '8', are ignored.
 * Everything else is appended to `dataBuffer` for `storeMessages`.
 *
 * The settings response is currently switched off: the line that would build
 * it is commented out, so `settingsString` stays empty and nothing is sent.
 * The per-source response time is still recorded.
 *
 * @param {Buffer|string} message the reassembled message.
 * @param {object} rinfo remote address info (`address`, `port`) of the sender.
 */
function handleMessage(message, rinfo) {
  console.log('store message: ', message);
  // This is for storing the new data format. Different message types still
  // need different handling, either here or when the data buffer is stored.
  console.log(new Date().toISOString(), ' store data 2 route ', Buffer.from(message, 'hex').toString('hex'));

  const parsedMessages = message_parser.parse_messages({data: message});
  let settingsString = Buffer.from([]);

  parsedMessages.forEach((entry) => {
    const sourceId = entry.source;
    const skipSource = sourceId.replaceAll('0', '') === '' || sourceId.startsWith('f') || sourceId.startsWith('8');
    if (skipSource) {
      return;
    }
    console.log('parsed data: ', entry);
    dataBuffer.push(entry);

    // Respond at most once every 600 seconds to any single node.
    const lastResponseAt = response_record[sourceId];
    const responseDue = !lastResponseAt || lastResponseAt + 1000 * 600 < Date.now();
    if (responseDue) {
      console.log("respond with settings");
      //settingsString = Buffer.concat([settingsString, makeSettingsString(entry)])
      response_record[sourceId] = Date.now();
    }
  });

  if (settingsString.length > 0) {
    const settingsHex = settingsString.toString('hex');
    console.log('settingsString: ', settingsHex);
    message_id += 1;
    stream.write(`< ${new Date().toISOString()} ${settingsHex}\n`);
    const responseChunks = cellularChunking.chunk_message(settingsString, message_id);
    console.log('msg chunks: ', responseChunks);
    for (const chunk of responseChunks) {
      console.log('thisChunk: ', chunk);
      socket.send(chunk, rinfo.port, rinfo.address, (err) => {
        if (err) {
          console.log(err);
          return;
        }
        console.log('sent message on');
      });
    }
  }

  // One-off push of the firmware component files (used to test file sending).
  if (sentFiles) {
    return;
  }
  console.log('send files');
  sentFiles = true;
  const componentsDir = './firmware/Components';
  fs.readdir(componentsDir, (err, theFileList) => {
    console.log(theFileList);
    if (err) {
      console.log(err);
      return;
    }
    sendFile(socket, rinfo, componentsDir, theFileList);
  });
}
/**
 * Build the settings reply for one device: a two-byte-data message with the
 * timezone offset, a byte/string message with the site and device names, and
 * a four-byte-data message with the measurement timing, concatenated in that
 * order.
 *
 * All intermediate values are local. The earlier version assigned to the
 * module-level `settings` constant (which throws, and would otherwise have
 * replaced the node settings map used when storing measurements).
 *
 * NOTE(review): `status` and `getOffset` are not defined in the part of the
 * file visible here — confirm they exist before enabling the caller.
 *
 * @param {object} msgData parsed message; `device_id` is read, and `version`
 *   is defaulted to 1 on the object itself when it is missing or falsy.
 * @returns {Buffer} the three messages joined together.
 */
function makeSettingsString(msgData) {
  const device_id = msgData.device_id;
  // Kept as a write to the argument so callers still see the defaulted version.
  msgData.version = msgData.version || 1;
  const version = msgData.version;

  // Resolve device -> location -> site, falling back to a fixed default site id.
  const location_id = status.devices[device_id]?.location_id;
  const site_id = status.locations[location_id]?.site_id || '7d931bd4-bf0d-459f-8864-6d6405908b9e';
  const timezoneOffset = getOffset(status.sites[site_id]?.timezone || 'UTC');

  const groupSettings = status.settings[status.settings_groups[device_id]];
  const measurementInterval = groupSettings?.sleep_interval || '00:30:00';
  const wakeWindowLength = groupSettings?.window_length || '00:06:00';
  const measurementOffset = groupSettings?.start_time || '00:00:00';

  // NOTE(review): for a negative offset this yields 256 + |offset|, not the
  // two's-complement value 256 + offset — confirm which encoding the firmware expects.
  const timezoneFields = {
    '\x4B': [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset],
  };
  const timezoneMsg = message_parser.make_two_byte_data_message(3, version, '00', device_id, timezoneFields);

  const nameFields = {
    '\x01': status.sites[site_id]?.name || 'Unregistered',
    '\x02': status.devices[device_id]?.name || 'Unregistered'
  };
  const nameMsg = message_parser.make_byte_string_message(7, version, '00', device_id, nameFields);

  const timingFields = {
    '\x03': [sqlTimeToNumber(measurementInterval)],
    '\x04': [sqlTimeToNumber(measurementOffset)],
    '\x05': [sqlTimeToNumber(wakeWindowLength)],
  };
  const timingMsg = message_parser.make_four_byte_data_message(4, version, '00', device_id, timingFields);

  return Buffer.concat([timezoneMsg, nameMsg, timingMsg]);
}
/**
 * Send the first chunk in `theChunks`; once it has gone out, schedule the
 * remaining chunks one second later, or — when this was the last chunk —
 * move on to the next file in `fileList`.
 *
 * A send error is logged and stops the chain.
 *
 * @param {Array} theChunks chunks still to send for the current file.
 * @param {object} socket UDP socket used for sending.
 * @param {object} rinfo remote address info (`address`, `port`).
 * @param {string} thisFilePath directory containing the files.
 * @param {string[]} fileList files still to send, the current one first.
 */
function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) {
  const afterSend = (err) => {
    if (err) {
      console.log(err);
      return;
    }
    console.log("send chunk");

    const moreChunksPending = theChunks.length > 1;
    if (moreChunksPending) {
      // Pace the chunks: the next one goes out 1000 ms from now.
      setTimeout(thisSend, 1000, theChunks.slice(1), socket, rinfo, thisFilePath, fileList);
      return;
    }

    const moreFilesPending = fileList.length > 1;
    if (moreFilesPending) {
      sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
    }
  };

  socket.send(theChunks[0], rinfo.port, rinfo.address, afterSend);
}
/**
 * Convert an SQL TIME string ("HH:MM:SS") into a number of seconds, capped at
 * 65535 (2**16 - 1).
 *
 * Returns 0 when the input cannot be converted. A `null` input is treated as
 * an expected "no value" and is not logged; every other failure is.
 *
 * @param {string|null} theTime time string to convert.
 * @returns {number} seconds in the range the cap allows, or 0 on failure.
 */
function sqlTimeToNumber(theTime) {
  const MAX_SECONDS = 2 ** 16 - 1;
  try {
    const [hours, minutes, seconds] = theTime.split(':').map(Number);
    const totalSeconds = hours * 60 * 60 + minutes * 60 + seconds;
    if (!Number.isFinite(totalSeconds)) {
      // Malformed strings such as "00:30" or "abc" used to leak NaN to the caller.
      console.log('sqlTimeToNumber error: ', new Error(`cannot parse time "${theTime}"`));
      return 0;
    }
    // Cap at MAX_SECONDS; the old `> 2**16` test let exactly 65536 through.
    return Math.min(totalSeconds, MAX_SECONDS);
  } catch (e) {
    if (theTime !== null) {
      console.log('sqlTimeToNumber error: ', e);
    }
    return 0;
  }
}
/**
 * Store type 1 messages (2 byte measurement data) in the Measurement table.
 *
 * Large inputs are split into batches of 2500 rows: a parameterised SQL
 * statement cannot carry more than about 65k values and each row uses 16, so
 * 2500 leaves a wide margin. Thousands of rows can arrive at once when many
 * probes report a backlog.
 *
 * Changes from the earlier version:
 *  - batching walks the array in fixed steps; the old "remainder" slice
 *    re-submitted the whole array whenever the length was an exact multiple
 *    of the batch size, which recursed without end;
 *  - a message from a node with no entry in `settings` is skipped and logged
 *    instead of throwing and discarding every other message in the batch;
 *  - a reading of 0 is stored as 0; only missing readings get the 500000
 *    placeholder.
 *
 * @param {Array<object>} msgs parsed type 1 messages (`source`, `timestamp`,
 *   `data`, optional `reporting_node`).
 */
function storeType1Messages(msgs) { // 2 byte measurement data
  const BATCH_SIZE = 2500;
  const MISSING_VALUE = 500000;
  const COLUMNS_PER_ROW = 16;
  // Substitute the placeholder only when the reading is absent, never for a real 0.
  const orMissing = (value) => (value === undefined || value === null || Number.isNaN(value)) ? MISSING_VALUE : value;

  try {
    if (msgs.length === 0) {
      return;
    }
    if (msgs.length > BATCH_SIZE) {
      for (let start = 0; start < msgs.length; start += BATCH_SIZE) {
        storeType1Messages(msgs.slice(start, start + BATCH_SIZE));
      }
      return;
    }

    const theseParams = [];
    let rowCount = 0;
    msgs.forEach(function (msg) {
      const node = msg.source.slice(4);
      const nodeSettings = settings[node];
      if (!nodeSettings) {
        console.log(`No settings for node ${node}, skipping type 1 message`);
        return;
      }
      const data = msg.data;
      theseParams.push(
        node,
        msg.reporting_node || "000000000000",
        nodeSettings.temperature_site,
        msg.timestamp,
        orMissing(data["18_inch_temperature"]),
        orMissing(data["36_inch_temperature"]),
        orMissing(data.device_temperature),
        orMissing(data.ambient_temperature),
        orMissing(data.relative_humidity),
        orMissing(data.barometric_pressure),
        orMissing(data.accelerometer_x),
        orMissing(data.accelerometer_y),
        orMissing(data.accelerometer_z),
        orMissing(data.battery_charge_percent),
        orMissing(data.battery_voltage),
        orMissing(data.remaining_charge_capacity)
      );
      rowCount += 1;
    });
    if (rowCount === 0) {
      return;
    }

    const rowPlaceholder = "(" + Array(COLUMNS_PER_ROW).fill("?").join(",") + ")";
    const thisSQL = "INSERT IGNORE INTO Measurement (source_node, reporting_node, associated_group, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity) VALUES " + Array(rowCount).fill(rowPlaceholder).join(",") + ";";

    pool.getConnection(function (err, connection) {
      if (err) {
        console.log(`Error storing type 1 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, function (err2) {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 1 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 1 message: ${e}`);
  }
}
/**
 * Store type 6 messages (four byte device status) in NodeReportedStatus.
 *
 * `previous_update_time` is converted from seconds since 2000-01-01 to an ISO
 * date-time string. The epoch offset is now added in every case: previously
 * `x * 1000 || 0 + 946684800000` parsed as `(x * 1000) || 946684800000`, so a
 * reported time was stored without the offset.
 *
 * An empty input returns without touching the database, because the generated
 * statement would have no value tuples.
 *
 * @param {Array<object>} msgs parsed type 6 messages (`source`, `timestamp`, `data`).
 */
function storeType6Messages(msgs) { // four byte device status
  const EPOCH_2000_MS = 946684800000; // 2000-01-01T00:00:00Z in Unix milliseconds
  try {
    if (msgs.length === 0) {
      return;
    }

    const theseParams = [];
    msgs.forEach(function (msg) {
      const data = msg.data;
      const lastUpdateSeconds = data.previous_update_time || 0;
      const lastUpdate = new Date(lastUpdateSeconds * 1000 + EPOCH_2000_MS).toISOString().slice(0, -5);
      theseParams.push(
        msg.source.slice(4),
        msg.timestamp,
        data.measurement_interval || 360,
        data.wake_window_length || 4,
        data.measurement_offset || 0,
        data.sleep_duration || 0,
        0, // number_saved_measurements is always stored as 0 here
        lastUpdate
      );
    });

    const thisSQL = "INSERT IGNORE INTO NodeReportedStatus (node,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated) VALUES " + Array(msgs.length).fill("(?,?,?,?,?,?,?,?)").join(",") + ";";

    pool.getConnection(function (err, connection) {
      if (err) {
        console.log(`Error storing type 6 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, function (err2) {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 6 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 6 message: ${e}`);
  }
}
/**
 * Store type 13 messages (file version manifest) in NodeFileManifest, one row
 * per key of each message's `data` object.
 *
 * When the messages carry no entries at all the function returns without
 * touching the database, because the generated statement would have no value
 * tuples. A failed insert now logs the actual query error (the earlier code
 * printed the connection error variable, which is always null at that point).
 *
 * @param {Array<object>} msgs parsed type 13 messages (`source`, `timestamp`,
 *   `data` mapping file id to version).
 */
function storeType13Messages(msgs) { // file version manifest
  try {
    const theseParams = [];
    let rowCount = 0;
    msgs.forEach(function (msg) {
      const node = msg.source.slice(4);
      Object.keys(msg.data).forEach(function (fileId) {
        theseParams.push(node, msg.timestamp, fileId, msg.data[fileId] || 0);
        rowCount += 1;
      });
    });
    if (rowCount === 0) {
      return;
    }

    const thisSQL = "INSERT INTO NodeFileManifest (node,collection_time,program_id,program_version) VALUES " + Array(rowCount).fill("(?,?,?,?)").join(",") + ";";

    pool.getConnection(function (err, connection) {
      if (err) {
        console.log(`Error storing type 13 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, function (err2) {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 13 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 13 message: ${e}`);
  }
}
/**
 * Store type 17 messages (rssi data) in NodeRssiRecord, one row per key of
 * each message's `data` object (the key is stored as the neighbor).
 *
 * When the messages carry no entries at all the function returns without
 * touching the database; the generated statement would otherwise end in
 * "VALUES ;" and be rejected.
 *
 * @param {Array<object>} msgs parsed type 17 messages (`source`, `timestamp`,
 *   `data` mapping neighbor id to rssi).
 */
function storeType17Messages(msgs) { // rssi data
  try {
    const theseParams = [];
    let rowCount = 0;
    msgs.forEach(function (msg) {
      const node = msg.source.slice(4);
      Object.keys(msg.data).forEach(function (neighborId) {
        theseParams.push(node, neighborId, msg.timestamp, msg.data[neighborId] || 0);
        rowCount += 1;
      });
    });
    if (rowCount === 0) {
      return;
    }

    const thisSQL = "INSERT IGNORE INTO NodeRssiRecord (node,neighbor,collection_time,rssi) VALUES " + Array(rowCount).fill("(?,?,?,?)").join(",") + ";";

    pool.getConnection(function (err, connection) {
      if (err) {
        console.log(`Error storing type 17 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, function (err2) {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 17 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 17 message: ${e}`);
  }
}
/**
 * Flush `dataBuffer`: split the buffered messages by `msg_type`, pass each
 * non-empty group to its store function (types 1, 6, 13 and 17, in that
 * order), then reset the buffer. Messages of any other type are dropped when
 * the buffer is reset.
 */
function storeMessages() {
  console.log('store messages!!');
  try {
    // Loose equality is intentional here: it is what the comparison has always used.
    const ofType = (wanted) => dataBuffer.filter((candidate) => candidate.msg_type == wanted);

    // All groups are collected before any of them is stored.
    const groups = [
      { announce: "type 1 messages!", batch: ofType(1), store: (batch) => storeType1Messages(batch) },
      { announce: "type 6 messages!", batch: ofType(6), store: (batch) => storeType6Messages(batch) },
      { announce: "type 13 messages!", batch: ofType(13), store: (batch) => storeType13Messages(batch) },
      { announce: "type 17 messages!", batch: ofType(17), store: (batch) => storeType17Messages(batch) },
    ];

    for (const { announce, batch, store } of groups) {
      if (batch.length === 0) {
        continue;
      }
      console.log(announce);
      store(batch);
    }

    dataBuffer = [];
  } catch (e) {
    console.log(`Some error storing messages: ${e}`);
  }
}
// Flush the buffered messages to the database every 10000 ms (10 seconds).
setInterval(storeMessages, 10000);

// Report the bound address once the socket is ready.
socket.on('listening', () => {
  const { address, port } = socket.address();
  console.log(`Listening for UDP packets at ${address}:${port}`);
});

socket.on('error', (err) => {
  console.error(`UDP error: ${err.stack}`);
});

// Log every datagram as hex, then feed it to the chunk reassembler, which
// calls handleMessage with the result.
socket.on('message', (msg, rinfo) => {
  // Roll over to a new log file when the UTC date has changed.
  const today = new Date().toISOString().slice(0, 10);
  if (today !== thisDate) {
    thisDate = today;
    stream.close();
    stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'});
  }
  stream.write(`> ${new Date().toISOString()} ${msg.toString('hex')}\n`);
  cellularChunking.receive_chunk(msg, handleMessage, rinfo);
});

socket.bind(57321);