const dgram = require("dgram");
const fs = require("fs");
const path = require("path");
const mysql = require("mysql2");
const cellularChunking = require("./cellularChunking");
const message_parser = require("./messages");

// Seconds between the Unix epoch and 2000-01-01T00:00:00Z, the epoch the
// ESP32 nodes use for their timestamps.
const ESP32_EPOCH_OFFSET_SECONDS = 946684800;
// Value stored in a measurement column when the node did not report it.
const MISSING_MEASUREMENT = 500000;
// Rows per INSERT statement. A parameterized statement cannot carry more than
// ~65k placeholders; the widest table here has 17 columns
// (65000 / 17 = 3823), so 2500 leaves a large margin.
const MAX_ROWS_PER_INSERT = 2500;
// How often the buffered messages are flushed to the database.
const STORE_INTERVAL_MS = 10000;
// Minimum time between settings responses to any single node.
const RESPONSE_INTERVAL_MS = 1000 * 600;
const UDP_PORT = 57321;

let thisDate = new Date().toISOString().slice(0, 10);
const filePath = "./records";
let message_id = 0;
// Firmware file push is disabled while this is true; handleMessage only
// starts sending files when it is false.
let sentFiles = true;
let dataBuffer = [];
const response_record = {};
const settings = {};
const pool = createPool();

// Make sure the directory for the daily packet logs exists.
fs.mkdirSync(filePath, { recursive: true });

/**
 * Creates the MySQL connection pool from the `db_*` environment variables and
 * kicks off an asynchronous query that fills the module-level `settings` map
 * (keyed by node id) for every node in a 'Temperature Site' group.
 *
 * @returns {object} the mysql2 pool; returned immediately, before the
 *   settings query has completed.
 */
function createPool() {
  const newPool = mysql.createPool({
    host: process.env.db_host,
    user: process.env.db_user,
    password: process.env.db_password,
    database: process.env.db_database,
    port: process.env.db_port,
    timezone: "+00:00",
  });

  const nodeSettingsSql = [
    "SELECT datavis_node2group.node_id, datavis_node2group.group_id, datavis_nodeconfiguration.measurement_interval_minutes, datavis_nodeconfiguration.offset_from_midnight_minutes, datavis_nodeconfiguration.wake_window_length_minutes",
    "FROM datavis_grouptype",
    "JOIN datavis_group2type ON datavis_grouptype.unique_id=datavis_group2type.type_id",
    "JOIN datavis_node2group ON datavis_group2type.group_id=datavis_node2group.group_id",
    "JOIN datavis_nodeconfiguration ON datavis_nodeconfiguration.node_id=datavis_node2group.node_id",
    "WHERE datavis_grouptype.name = 'Temperature Site';",
  ].join(" ");

  newPool.getConnection((err, connection) => {
    if (err) {
      console.log(`error setting the database: ${err}`);
      return;
    }
    connection.execute(nodeSettingsSql, [], (err2, results) => {
      connection.release();
      if (err2) {
        console.log(`Some other error getting node settings: ${err2}`);
        return;
      }
      results.forEach((thisRow) => {
        settings[thisRow.node_id] = {
          measurement_interval: thisRow.measurement_interval_minutes,
          measurement_offset: thisRow.offset_from_midnight_minutes,
          wake_window_length: thisRow.wake_window_length_minutes,
          temperature_site: thisRow.group_id,
        };
      });
    });
  });

  return newPool;
}

const socket = dgram.createSocket("udp4");
let stream = openLogStream(thisDate);

/**
 * Opens the append-mode packet log file for the given day inside `filePath`.
 *
 * @param {string} dateString - day in YYYY-MM-DD form, used in the file name.
 * @returns {fs.WriteStream} write stream for `<filePath>/<date>-log.dat`.
 */
function openLogStream(dateString) {
  return fs.createWriteStream(`${filePath}/${dateString}-log.dat`, {
    flags: "a+",
  });
}

/**
 * Builds a message whose payload is one string key and one string value.
 *
 * Payload layout: key bytes, 0xFF, value bytes, one 0x00 byte, 0xFF; a final
 * 0x00 byte is appended after the payload.
 *
 * @param {number} msg_type - message type byte.
 * @param {number} version - protocol version byte.
 * @param {string} source - source id as hex (left-padded to 16 hex digits).
 * @param {string} target - target id as hex (left-padded to 16 hex digits).
 * @param {string} key - payload key.
 * @param {string} value - payload value.
 * @param {number} this_message_id - 16-bit message id.
 * @returns {Buffer} header followed by the payload and the trailing 0x00.
 */
function make_string_string_message(
  msg_type,
  version,
  source,
  target,
  key,
  value,
  this_message_id,
) {
  const theHeader = make_message_header(
    msg_type,
    version,
    source,
    target,
    this_message_id,
  );
  // Encode first and size by BYTE length: sizing by string length truncates
  // the value and misplaces the terminators for any non-ASCII content.
  const keyBytes = Buffer.from(key);
  const valueBytes = Buffer.from(value);
  const theData = Buffer.alloc(keyBytes.length + valueBytes.length + 3);
  keyBytes.copy(theData, 0);
  theData.writeUInt8(255, keyBytes.length);
  valueBytes.copy(theData, keyBytes.length + 1);
  // NOTE(review): this leaves one zero byte between the value and its 0xFF
  // terminator, exactly as the previous implementation did — confirm the
  // receiver expects that byte before "tidying" it away.
  theData.writeUInt8(255, keyBytes.length + valueBytes.length + 2);
  return Buffer.concat([theHeader, theData, Buffer.from([0])]);
}

/**
 * Builds the 24-byte message header: message id (2 bytes, big-endian),
 * message type (1), version (1), source id (8), target id (8) and a
 * big-endian 32-bit timestamp in seconds since 2000-01-01.
 *
 * @param {number} msg_type - message type byte.
 * @param {number} version - protocol version byte.
 * @param {string} source - source id as hex; padded/truncated to 16 digits.
 * @param {string} target - target id as hex; padded/truncated to 16 digits.
 * @param {number} this_message_id - 16-bit message id.
 * @returns {Buffer} the encoded header.
 */
function make_message_header(
  msg_type,
  version,
  source,
  target,
  this_message_id,
) {
  const targetBuf = Buffer.from(`0000000000000000${target}`.slice(-16), "hex");
  const sourceBuf = Buffer.from(`0000000000000000${source}`.slice(-16), "hex");

  // Timestamp in the format the ESP32 expects: seconds since 2000-01-01.
  const timestamp = Math.floor(Date.now() / 1000) - ESP32_EPOCH_OFFSET_SECONDS;
  const timestampBuf = Buffer.alloc(4);
  timestampBuf.writeUInt32BE(timestamp);

  const prefix = Buffer.alloc(4);
  prefix.writeUInt16BE(this_message_id, 0);
  prefix.writeUInt8(msg_type, 2);
  prefix.writeUInt8(version, 3);

  return Buffer.concat([prefix, sourceBuf, targetBuf, timestampBuf]);
}

/**
 * Reads the first file of `fileList`, wraps it in a type-8 string/string
 * message (file name as key, contents as value), chunks it and hands the
 * chunks to `thisSend`, which continues with the remaining files.
 *
 * @param {dgram.Socket} socket - socket used for sending.
 * @param {{port: number, address: string}} rinfo - destination.
 * @param {string} thisFilePath - directory containing the files.
 * @param {string[]} fileList - file names still to send; first one is sent.
 */
function sendFile(socket, rinfo, thisFilePath, fileList) {
  if (!fileList || fileList.length === 0) {
    console.log("no files to send");
    return;
  }
  const fileName = fileList[0];
  const theFilePath = path.join(thisFilePath, fileName);
  fs.readFile(theFilePath, "utf-8", (err, data) => {
    console.log("have read file");
    if (err) {
      console.log(err);
      return;
    }
    console.log("make the message to send");
    // NOTE(review): the target id and message id are hard-coded here.
    const theMsg = make_string_string_message(
      8,
      1,
      "00",
      "4827e28f7778",
      fileName,
      data,
      537,
    ).toString("hex");
    console.log("chunk the message");
    const theRespChunks = cellularChunking.chunk_message(theMsg, message_id);
    console.log("have chunks");
    thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList);
  });
}

/**
 * Tells whether messages from this source id are dropped: ids consisting only
 * of zeros, or starting with "f" or "8".
 *
 * @param {string} source - source id as a hex string.
 * @returns {boolean} true when the message should be ignored.
 */
function isIgnoredSource(source) {
  return (
    source.replaceAll("0", "") === "" ||
    source.startsWith("f") ||
    source.startsWith("8")
  );
}

/**
 * Handles one reassembled message: parses it, buffers every accepted record
 * for the next database flush, records when each node was last answered and,
 * if enabled, starts the firmware file push.
 *
 * @param {Buffer|string} message - reassembled payload from cellularChunking.
 * @param {{port: number, address: string}} rinfo - sender of the packet.
 */
function handleMessage(message, rinfo) {
  let parsed_data;
  try {
    parsed_data = message_parser.parse_messages({ data: message });
  } catch (e) {
    // The payload comes straight off the network; a malformed packet must not
    // take the whole server down.
    console.log(`Error parsing message: ${e}`);
    return;
  }

  // TODO: settingsString stays empty until the makeSettingsString call below
  // is re-enabled, so the response branch is currently never taken.
  let settingsString = Buffer.from([]);

  parsed_data.forEach((thisData) => {
    if (isIgnoredSource(thisData.source)) {
      return;
    }
    dataBuffer.push(thisData);

    const lastResponse = response_record[thisData.source];
    if (!lastResponse || lastResponse + RESPONSE_INTERVAL_MS < Date.now()) {
      // respond at most once every 600 seconds to any single node
      console.log("respond with settings");
      //settingsString = Buffer.concat([settingsString, makeSettingsString(thisData)])
      response_record[thisData.source] = Date.now();
    }
  });

  if (settingsString.length > 0) {
    message_id = message_id + 1;
    stream.write(
      `< ${new Date().toISOString()} ${settingsString.toString("hex")}\n`,
    );
    const theRespChunks = cellularChunking.chunk_message(
      settingsString,
      message_id,
    );
    theRespChunks.forEach((thisChunk) => {
      socket.send(thisChunk, rinfo.port, rinfo.address, (err) => {
        if (err) {
          console.log(err);
        } else {
          console.log("sent message on");
        }
      });
    });
  }

  if (!sentFiles) {
    console.log("send files");
    sentFiles = true;
    const firmwarePath = "./firmware/Components";
    fs.readdir(firmwarePath, (err, theFileList) => {
      console.log(theFileList);
      if (err) {
        console.log(err);
      } else {
        sendFile(socket, rinfo, firmwarePath, theFileList);
      }
    });
  }
}

/**
 * Builds the concatenated settings messages (timezone, names, timing) for one
 * device.
 *
 * NOTE(review): this function reads `status` and `getOffset`, neither of which
 * is defined in this file, and it is not currently called (the call in
 * handleMessage is commented out). It also reads `msgData.device_id`, while
 * handleMessage works with `source` — confirm before re-enabling.
 *
 * @param {object} msgData - parsed message; `version` is defaulted to 1 on
 *   the object itself when missing.
 * @returns {Buffer} the three settings messages back to back.
 */
function makeSettingsString(msgData) {
  const device_id = msgData.device_id;
  msgData.version = msgData.version || 1;

  const location_id = status.devices[device_id]?.location_id;
  const site_id =
    status.locations[location_id]?.site_id ||
    "7d931bd4-bf0d-459f-8864-6d6405908b9e";
  const timezoneOffset = getOffset(status.sites[site_id]?.timezone || "UTC");
  const deviceSettings = status.settings[status.settings_groups[device_id]];
  const measurementInterval = deviceSettings?.sleep_interval || "00:30:00";
  const wakeWindowLength = deviceSettings?.window_length || "00:06:00";
  const measurementOffset = deviceSettings?.start_time || "00:00:00";

  // Local names only: the previous code assigned to the module-level
  // `const settings`, which throws a TypeError at runtime.
  // NOTE(review): `256 - timezoneOffset` for a negative offset yields a value
  // above 256 (e.g. -5 -> 261); a two's-complement byte would be
  // `256 + timezoneOffset`. Left unchanged pending confirmation of the
  // encoding the nodes expect.
  const timezoneSettings = {
    "\x4B": [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset],
  };
  const nameSettings = {
    "\x01": status.sites[site_id]?.name || "Unregistered",
    "\x02": status.devices[device_id]?.name || "Unregistered",
  };
  const timingSettings = {
    "\x03": [sqlTimeToNumber(measurementInterval)],
    "\x04": [sqlTimeToNumber(measurementOffset)],
    "\x05": [sqlTimeToNumber(wakeWindowLength)],
  };

  return Buffer.concat([
    message_parser.make_two_byte_data_message(
      3,
      msgData.version,
      "00",
      device_id,
      timezoneSettings,
    ),
    message_parser.make_byte_string_message(
      7,
      msgData.version,
      "00",
      device_id,
      nameSettings,
    ),
    message_parser.make_four_byte_data_message(
      4,
      msgData.version,
      "00",
      device_id,
      timingSettings,
    ),
  ]);
}

/**
 * Sends the first chunk, then schedules the rest one second apart; once the
 * last chunk of a file is out, moves on to the next file in `fileList`.
 * A send error stops the whole chain.
 *
 * @param {Array<Buffer|string>} theChunks - chunks still to send.
 * @param {dgram.Socket} socket - socket used for sending.
 * @param {{port: number, address: string}} rinfo - destination.
 * @param {string} thisFilePath - directory containing the files.
 * @param {string[]} fileList - files still to send; the first is in flight.
 */
function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) {
  if (!theChunks || theChunks.length === 0) {
    // Nothing to transmit for this file; carry on with the next one instead
    // of passing `undefined` to socket.send (which throws).
    if (fileList.length > 1) {
      sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
    }
    return;
  }
  socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => {
    if (err) {
      console.log(err);
      return;
    }
    console.log("send chunk");
    if (theChunks.length > 1) {
      setTimeout(
        thisSend,
        1000,
        theChunks.slice(1),
        socket,
        rinfo,
        thisFilePath,
        fileList,
      );
    } else if (fileList.length > 1) {
      sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
    }
  });
}

/**
 * Converts an SQL TIME string ("HH:MM:SS") to a number of seconds, clamped to
 * what fits in 16 bits.
 *
 * @param {string|null} theTime - time string; null yields 0 silently.
 * @returns {number} seconds in the range 0..65535 for well-formed input; 0
 *   when the input is not a string or does not parse to a finite number.
 */
function sqlTimeToNumber(theTime) {
  try {
    const [hours, minutes, seconds] = theTime.split(":");
    const total =
      Number(hours) * 60 * 60 + Number(minutes) * 60 + Number(seconds);
    if (!Number.isFinite(total)) {
      console.log("sqlTimeToNumber error: ", `unparseable time "${theTime}"`);
      return 0;
    }
    // Clamp to the largest 16-bit value (the old `> 2 ** 16` test let 65536
    // itself through).
    return Math.min(total, 2 ** 16 - 1);
  } catch (e) {
    if (theTime !== null) {
      console.log("sqlTimeToNumber error: ", e);
    }
    return 0;
  }
}

/**
 * Returns the reported measurement, or the "missing" sentinel when the node
 * did not report one. A genuine reading of 0 is kept.
 *
 * @param {*} value - value taken from the parsed message data.
 * @returns {*} `value`, or MISSING_MEASUREMENT when it is absent/falsy.
 */
function measurementOrSentinel(value) {
  return value === 0 ? 0 : value || MISSING_MEASUREMENT;
}

/**
 * Inserts rows with one multi-row parameterized INSERT per batch of at most
 * MAX_ROWS_PER_INSERT rows. Errors are logged, never thrown to the caller
 * from the asynchronous part.
 *
 * @param {string} label - text used in log messages, e.g. "type 1".
 * @param {string} sqlPrefix - statement up to and including "VALUES ".
 * @param {Array<Array<*>>} rows - one array of bind values per row; all rows
 *   must have the same length.
 */
function runBatchedInsert(label, sqlPrefix, rows) {
  if (rows.length === 0) {
    // "VALUES ;" is not valid SQL — nothing to do.
    return;
  }
  const rowPlaceholder = `(${Array(rows[0].length).fill("?").join(",")})`;

  for (let start = 0; start < rows.length; start += MAX_ROWS_PER_INSERT) {
    const batch = rows.slice(start, start + MAX_ROWS_PER_INSERT);
    const sql = `${sqlPrefix}${Array(batch.length)
      .fill(rowPlaceholder)
      .join(",")};`;
    const params = batch.flat();

    pool.getConnection((err, connection) => {
      if (err) {
        console.log(`Error storing ${label} message: ${err}`);
        return;
      }
      try {
        connection.execute(sql, params, (err2) => {
          connection.release();
          if (err2) {
            console.log(`Some other error storing ${label} message: ${err2}`);
          }
        });
      } catch (e) {
        // execute() can throw synchronously (e.g. on an `undefined` bind
        // value); without this the connection leaks and the exception
        // escapes as an uncaught error.
        connection.release();
        console.log(`Some other error storing ${label} message: ${e}`);
      }
    });
  }
}

/**
 * Stores type 1 messages (two-byte measurement data) in datavis_measurement.
 *
 * @param {object[]} msgs - parsed type 1 messages.
 */
function storeType1Messages(msgs) {
  try {
    const sqlPrefix =
      "INSERT IGNORE INTO datavis_measurement (source_node_id, reporting_node_id, associated_group_id, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity, server_received_time) VALUES ";
    const rows = msgs.map((msg) => {
      const nodeId = msg.source.slice(4);
      const { data } = msg;
      return [
        nodeId,
        msg.reporting_node || "000000000000",
        settings[nodeId]?.temperature_site || "Unknowable 1",
        msg.timestamp,
        measurementOrSentinel(data["18_inch_temperature"]),
        measurementOrSentinel(data["36_inch_temperature"]),
        measurementOrSentinel(data.device_temperature),
        measurementOrSentinel(data.ambient_temperature),
        measurementOrSentinel(data.relative_humidity),
        measurementOrSentinel(data.barometric_pressure),
        measurementOrSentinel(data.accelerometer_x),
        measurementOrSentinel(data.accelerometer_y),
        measurementOrSentinel(data.accelerometer_z),
        measurementOrSentinel(data.battery_charge_percent),
        measurementOrSentinel(data.battery_voltage),
        measurementOrSentinel(data.remaining_charge_capacity),
        sqlCurrentTimestamp(), // YYYY-MM-DD HH:mm:ss
      ];
    });
    runBatchedInsert("type 1", sqlPrefix, rows);
  } catch (e) {
    console.log(`Some outer error storing type 1 message: ${e}`);
  }
}

/**
 * Stores type 6 messages (four-byte device status) in
 * datavis_nodereportedstatus.
 *
 * @param {object[]} msgs - parsed type 6 messages.
 */
function storeType6Messages(msgs) {
  try {
    const sqlPrefix =
      "INSERT IGNORE INTO datavis_nodereportedstatus (node_id,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated,server_received_time) VALUES ";
    const rows = msgs.map((msg) => {
      // The old expression `t * 1000 || 0 + 946684800000` only applied the
      // 2000-01-01 offset when t was missing (operator precedence).
      // NOTE(review): assumes previous_update_time is seconds since
      // 2000-01-01 like the other node timestamps; if message_parser already
      // converts it to Unix time, drop the offset here.
      const lastUpdateSeconds = Number(msg.data.previous_update_time) || 0;
      const lastUpdated = new Date(
        (lastUpdateSeconds + ESP32_EPOCH_OFFSET_SECONDS) * 1000,
      )
        .toISOString()
        .slice(0, -5);
      return [
        msg.source.slice(4),
        msg.timestamp,
        msg.data.measurement_interval || 360,
        msg.data.wake_window_length || 4,
        msg.data.measurement_offset || 0,
        msg.data.sleep_duration || 0,
        0,
        lastUpdated,
        sqlCurrentTimestamp(),
      ];
    });
    runBatchedInsert("type 6", sqlPrefix, rows);
  } catch (e) {
    console.log(`Some outer error storing type 6 message: ${e}`);
  }
}

/**
 * Stores type 13 messages (file version manifest) in datavis_nodefilemanifest,
 * one row per file id reported in each message.
 *
 * @param {object[]} msgs - parsed type 13 messages.
 */
function storeType13Messages(msgs) {
  try {
    const sqlPrefix =
      "INSERT INTO datavis_nodefilemanifest (node_id,collection_time,program_id,program_version,server_received_time) VALUES ";
    const rows = msgs.flatMap((msg) =>
      Object.keys(msg.data).map((thisFileId) => [
        msg.source.slice(4),
        msg.timestamp,
        thisFileId,
        msg.data[thisFileId] || 0,
        sqlCurrentTimestamp(),
      ]),
    );
    runBatchedInsert("type 13", sqlPrefix, rows);
  } catch (e) {
    console.log(`Some outer error storing type 13 message: ${e}`);
  }
}

/**
 * Stores type 17 messages (RSSI data) in datavis_noderssirecord, one row per
 * neighbor id reported in each message.
 *
 * @param {object[]} msgs - parsed type 17 messages.
 */
function storeType17Messages(msgs) {
  try {
    const sqlPrefix =
      "INSERT IGNORE INTO datavis_noderssirecord (node_id,neighbor_id,collection_time,rssi,server_received_time) VALUES ";
    const rows = msgs.flatMap((msg) =>
      Object.keys(msg.data).map((thisId) => [
        msg.source.slice(4),
        thisId,
        msg.timestamp,
        msg.data[thisId] || 0,
        sqlCurrentTimestamp(),
      ]),
    );
    runBatchedInsert("type 17", sqlPrefix, rows);
  } catch (e) {
    console.log(`Some outer error storing type 17 message: ${e}`);
  }
}

/**
 * Flushes the buffered messages to the database, dispatching by message type
 * (1, 6, 13, 17), then empties the buffer. Messages of any other type are
 * discarded. The inserts are asynchronous, so the buffer is cleared before
 * their outcome is known.
 */
function storeMessages() {
  console.log("store messages!!");
  try {
    const storeByType = [
      [1, storeType1Messages],
      [6, storeType6Messages],
      [13, storeType13Messages],
      [17, storeType17Messages],
    ];
    storeByType.forEach(([msgType, store]) => {
      const matching = dataBuffer.filter(
        (thisMsg) => Number(thisMsg.msg_type) === msgType,
      );
      if (matching.length > 0) {
        console.log(`type ${msgType} messages!`);
        store(matching);
      }
    });
    dataBuffer = [];
  } catch (e) {
    console.log(`Some error storing messages: ${e}`);
  }
}

/**
 * Returns the current UTC time formatted for SQL.
 *
 * @returns {string} "YYYY-MM-DD HH:mm:ss".
 */
function sqlCurrentTimestamp() {
  // toISOString() gives YYYY-MM-DDTHH:mm:ss.mmmZ
  const [date, time] = new Date().toISOString().split("T");
  return `${date} ${time.slice(0, 8)}`;
}

// Flush buffered messages to the database every 10 seconds.
setInterval(storeMessages, STORE_INTERVAL_MS);

socket.on("listening", () => {
  const addr = socket.address();
  console.log(`Listening for UDP packets at ${addr.address}:${addr.port}`);
});

socket.on("error", (err) => {
  console.error(`UDP error: ${err.stack}`);
});

socket.on("message", (msg, rinfo) => {
  // Roll over to a new log file when the UTC date changes.
  const today = new Date().toISOString().slice(0, 10);
  if (thisDate !== today) {
    thisDate = today;
    stream.end();
    stream = openLogStream(thisDate);
  }
  stream.write(`> ${new Date().toISOString()} ${msg.toString("hex")}\n`);
  cellularChunking.receive_chunk(msg, handleMessage, rinfo);
});

socket.bind(UDP_PORT);