diff --git a/node/index.js b/node/index.js index 0120b93..a4a9f9c 100644 --- a/node/index.js +++ b/node/index.js @@ -1,24 +1,24 @@ -const dgram = require('dgram') -const fs = require('fs') -const path = require('path') +const dgram = require("dgram"); +const fs = require("fs"); +const path = require("path"); const mysql = require("mysql2"); -const cellularChunking = require('./cellularChunking') -const message_parser = require('./messages') +const cellularChunking = require("./cellularChunking"); +const message_parser = require("./messages"); -let thisDate = new Date().toISOString().slice(0,10) -const filePath = './records' -let message_id = 0 +let thisDate = new Date().toISOString().slice(0, 10); +const filePath = "./records"; +let message_id = 0; -let sentFiles = true +let sentFiles = true; -let dataBuffer = [] -const response_record = {} -const settings = {} +let dataBuffer = []; +const response_record = {}; +const settings = {}; -const pool = createPool() +const pool = createPool(); -if(!(fs.existsSync(filePath) && fs.lstatSync(filePath).isDirectory())) { - fs.mkdirSync(filePath) +if (!(fs.existsSync(filePath) && fs.lstatSync(filePath).isDirectory())) { + fs.mkdirSync(filePath); } function createPool() { @@ -28,413 +28,570 @@ function createPool() { password: process.env.db_password, database: process.env.db_database, port: process.env.db_port, - timezone: '+00:00' + timezone: "+00:00", }); - pool.getConnection(function(err,connection) { + pool.getConnection(function (err, connection) { if (err) { - console.log('error setting the database') + console.log("error setting the database"); } else { connection.execute( - "USE Chickens;", + `SELECT + datavis_node2group.node_id, + datavis_node2group.group_id, + datavis_nodeconfiguration.measurement_interval_minutes, + datavis_nodeconfiguration.offset_from_midnight_minutes, + datavis_nodeconfiguration.wake_window_length_minutes + FROM datavis_grouptype + JOIN datavis_group2type + ON datavis_grouptype.unique_id=datavis_group2type.type_id + JOIN datavis_node2group + ON datavis_group2type.group_id=datavis_node2group.group_id + JOIN datavis_nodeconfiguration + ON datavis_nodeconfiguration.node_id=datavis_node2group.node_id + WHERE datavis_grouptype.name = 'Temperature Site';`, [], - function(err2, results) { - if(err2) { - console.log(`Some other error setting database: ${err2}`) - connection.release() + function (err2, results) { + if (err2) { + console.log(`Some other error getting node settings: ${err2}`); + connection.release(); } else { - connection.execute( - "SELECT Node2Group.node,Node2Group.node_group,NodeConfiguration.measurement_interval_minutes,NodeConfiguration.offset_from_midnight_minutes,NodeConfiguration.wake_window_length_minutes FROM GroupType JOIN Group2Type ON GroupType.name=Group2Type.type JOIN Node2Group ON Group2Type.group_name=Node2Group.node_group JOIN NodeConfiguration ON NodeConfiguration.node=Node2Group.node WHERE Group2Type.type = 'Temperature Site';", - [], - function (err2, results) { - if (err2) { - console.log(`Some other error getting node settings: ${err2}`) - connection.release() - } else { - results.forEach(function(thisRow) { - settings[thisRow.node] = { - measurement_interval: thisRow.measurement_interval_minutes, - measurement_offset: thisRow.offset_from_midnight_minutes, - wake_window_length: thisRow.wake_window_length_minutes, - temperature_site: thisRow.node_group - } - }) - connection.release() - } - } - ) + results.forEach(function (thisRow) { + settings[thisRow.node_id] = { + measurement_interval: thisRow.measurement_interval_minutes, + measurement_offset: thisRow.offset_from_midnight_minutes, + wake_window_length: thisRow.wake_window_length_minutes, + temperature_site: thisRow.group_id, + }; + }); + connection.release(); } - } - ) + }, + ); } - }) - return pool + }); + return pool; } -const socket = dgram.createSocket('udp4'); -let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'}) +const socket = dgram.createSocket("udp4"); +let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, { + flags: "a+", +}); -function make_string_string_message(msg_type, version, source, target, key, value, this_message_id) { - theHeader = make_message_header(msg_type, version, source, target, this_message_id) - console.log('message header: ', theHeader) - theData = Buffer.alloc(key.length + value.length + 3) - theData.write(key,0) - theData.writeUInt8(255, key.length) - theData.write(value, key.length+1) - theData.writeUInt8(255, key.length+value.length+2) - return Buffer.concat([theHeader, theData, Buffer.from([0])]) +function make_string_string_message( + msg_type, + version, + source, + target, + key, + value, + this_message_id, +) { + theHeader = make_message_header( + msg_type, + version, + source, + target, + this_message_id, + ); + console.log("message header: ", theHeader); + theData = Buffer.alloc(key.length + value.length + 3); + theData.write(key, 0); + theData.writeUInt8(255, key.length); + theData.write(value, key.length + 1); + theData.writeUInt8(255, key.length + value.length + 2); + return Buffer.concat([theHeader, theData, Buffer.from([0])]); } -function make_message_header(msg_type, version, source, target, this_message_id) { - target = ('0000000000000000' + target).slice(-16) - target_buf = Buffer.from(target, 'hex') - source = (source = '0000000000000000' + source).slice(-16) - source_buf = Buffer.from(source, 'hex') - timestamp = Math.floor(Date.now() / 1000) - 946684800 // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01 - timestamp_buf = Buffer.alloc(4) - timestamp_buf.writeUInt32BE(timestamp) - buf = Buffer.alloc(4) - buf.writeUInt16BE(this_message_id, 0) - buf.writeUInt8(msg_type, 2) - buf.writeUInt8(version, 3) - header = Buffer.concat([buf, source_buf, target_buf, timestamp_buf]) - return header +function make_message_header( + msg_type, + version, + source, + target, + this_message_id, +) { + target = ("0000000000000000" + target).slice(-16); + target_buf = Buffer.from(target, "hex"); + source = (source = "0000000000000000" + source).slice(-16); + source_buf = Buffer.from(source, "hex"); + timestamp = Math.floor(Date.now() / 1000) - 946684800; // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01 + timestamp_buf = Buffer.alloc(4); + timestamp_buf.writeUInt32BE(timestamp); + buf = Buffer.alloc(4); + buf.writeUInt16BE(this_message_id, 0); + buf.writeUInt8(msg_type, 2); + buf.writeUInt8(version, 3); + header = Buffer.concat([buf, source_buf, target_buf, timestamp_buf]); + return header; } function sendFile(socket, rinfo, thisFilePath, fileList) { - theFilePath = path.join(thisFilePath, fileList[0]) - fs.readFile(theFilePath, 'utf-8', (err,data) => { - console.log('have read file') - if(err) { - console.log(err) - } else { - console.log('make the message to send') - const theMsg = Buffer.from(make_string_string_message(8, 1, '00', '4827e28f7778', fileList[0], data, 537)).toString('hex') - console.log('chunk the message') - const theRespChunks = cellularChunking.chunk_message(theMsg, message_id) - console.log('have chunks') - thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList) - } - }) + theFilePath = path.join(thisFilePath, fileList[0]); + fs.readFile(theFilePath, "utf-8", (err, data) => { + console.log("have read file"); + if (err) { + console.log(err); + } else { + console.log("make the message to send"); + const theMsg = Buffer.from( + make_string_string_message( + 8, + 1, + "00", + "4827e28f7778", + fileList[0], + data, + 537, + ), + ).toString("hex"); + console.log("chunk the message"); + const theRespChunks = cellularChunking.chunk_message(theMsg, message_id); + console.log("have chunks"); + thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList); + } + }); } function handleMessage(message, rinfo) { - console.log('store message: ', message) + console.log("store message: ", message); // this is going to be for storing the new format for the data // this needs to be set up to handle different message types differently, or it could be done when the databuffer is storedu - console.log(new Date().toISOString(), ' store data 2 route ', Buffer.from(message, 'hex').toString('hex')) - const parsed_data = message_parser.parse_messages({data: message}) - let settingsString = Buffer.from([]) + console.log( + new Date().toISOString(), + " store data 2 route ", + Buffer.from(message, "hex").toString("hex"), + ); + const parsed_data = message_parser.parse_messages({ data: message }); + let settingsString = Buffer.from([]); //const seenIDs = [] - parsed_data.forEach(function(thisData) { - if(thisData.source.replaceAll('0', '') === '' || thisData.source.startsWith('f') || thisData.source.startsWith('8')) { - return + parsed_data.forEach(function (thisData) { + if ( + thisData.source.replaceAll("0", "") === "" || + thisData.source.startsWith("f") || + thisData.source.startsWith("8") + ) { + return; } else { - console.log('parsed data: ', thisData) + console.log("parsed data: ", thisData); } - dataBuffer.push(thisData) - if((!response_record[thisData.source]) || response_record[thisData.source] + 1000 * 600 < Date.now()) { // respond at moch once every 600 seconds to any single node - console.log("respond with settings") + dataBuffer.push(thisData); + if ( + !response_record[thisData.source] || + response_record[thisData.source] + 1000 * 600 < Date.now() + ) { + // respond at moch once every 600 seconds to any single node + console.log("respond with settings"); //settingsString = Buffer.concat([settingsString, makeSettingsString(thisData)]) - response_record[thisData.source] = Date.now() + response_record[thisData.source] = Date.now(); } - }) + }); - if(settingsString.length > 0) { - console.log('settingsString: ', settingsString.toString('hex')) - message_id = message_id+1 - stream.write('< ' + (new Date().toISOString()) + ' ' + settingsString.toString('hex') + '\n') + if (settingsString.length > 0) { + console.log("settingsString: ", settingsString.toString("hex")); + message_id = message_id + 1; + stream.write( + "< " + + new Date().toISOString() + + " " + + settingsString.toString("hex") + + "\n", + ); // breaking in here so we can test sending a file - const theRespChunks = cellularChunking.chunk_message(settingsString, message_id) - console.log('msg chunks: ', theRespChunks) - theRespChunks.forEach(function(thisChunk) { - console.log('thisChunk: ', thisChunk) + const theRespChunks = cellularChunking.chunk_message( + settingsString, + message_id, + ); + console.log("msg chunks: ", theRespChunks); + theRespChunks.forEach(function (thisChunk) { + console.log("thisChunk: ", thisChunk); socket.send(thisChunk, rinfo.port, rinfo.address, (err) => { - if(err) { - console.log(err) + if (err) { + console.log(err); } else { - console.log('sent message on') + console.log("sent message on"); } - }) - }) + }); + }); } - if(!sentFiles) { - console.log('send files') - sentFiles = true - const thiFilePath = './firmware/Components' + if (!sentFiles) { + console.log("send files"); + sentFiles = true; + const thiFilePath = "./firmware/Components"; fs.readdir(thiFilePath, (err, theFileList) => { - console.log(theFileList) - if(err) { - console.log(err) + console.log(theFileList); + if (err) { + console.log(err); } else { - sendFile(socket, rinfo, thiFilePath, theFileList) + sendFile(socket, rinfo, thiFilePath, theFileList); } - }) + }); } } function makeSettingsString(msgData) { - const device_id = msgData.device_id - msgData.version = msgData.version || 1 - // make a message with all the info in it - const location_id = status.devices[device_id]?.location_id - const site_id = status.locations[location_id]?.site_id || '7d931bd4-bf0d-459f-8864-6d6405908b9e' - const timezoneOffset = getOffset(status.sites[site_id]?.timezone || 'UTC') - const measurementInterval = status.settings[status.settings_groups[device_id]]?.sleep_interval || '00:30:00' - const wakeWindowLength = status.settings[status.settings_groups[device_id]]?.window_length || '00:06:00' - const measurementOffset = status.settings[status.settings_groups[device_id]]?.start_time || '00:00:00' - msg = Buffer.from([]) - settings = { - '\x4B': [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset], - } + const device_id = msgData.device_id; + msgData.version = msgData.version || 1; + // make a message with all the info in it + const location_id = status.devices[device_id]?.location_id; + const site_id = + status.locations[location_id]?.site_id || + "7d931bd4-bf0d-459f-8864-6d6405908b9e"; + const timezoneOffset = getOffset(status.sites[site_id]?.timezone || "UTC"); + const measurementInterval = + status.settings[status.settings_groups[device_id]]?.sleep_interval || + "00:30:00"; + const wakeWindowLength = + status.settings[status.settings_groups[device_id]]?.window_length || + "00:06:00"; + const measurementOffset = + status.settings[status.settings_groups[device_id]]?.start_time || + "00:00:00"; + msg = Buffer.from([]); + settings = { + "\x4B": [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset], + }; - msg = message_parser.make_two_byte_data_message(3, msgData.version, '00', device_id, settings) - settings2 = { - '\x01': status.sites[site_id]?.name || 'Unregistered', - '\x02': status.devices[device_id]?.name || 'Unregistered' - } - msg = Buffer.concat([msg, message_parser.make_byte_string_message(7, msgData.version, '00', device_id, settings2)]) - settings3 = { - '\x03': [sqlTimeToNumber(measurementInterval)], - '\x04': [sqlTimeToNumber(measurementOffset)], - '\x05': [sqlTimeToNumber(wakeWindowLength)], - } - msg = Buffer.concat([msg, message_parser.make_four_byte_data_message(4, msgData.version, '00', device_id, settings3)]) - return msg + msg = message_parser.make_two_byte_data_message( + 3, + msgData.version, + "00", + device_id, + settings, + ); + settings2 = { + "\x01": status.sites[site_id]?.name || "Unregistered", + "\x02": status.devices[device_id]?.name || "Unregistered", + }; + msg = Buffer.concat([ + msg, + message_parser.make_byte_string_message( + 7, + msgData.version, + "00", + device_id, + settings2, + ), + ]); + settings3 = { + "\x03": [sqlTimeToNumber(measurementInterval)], + "\x04": [sqlTimeToNumber(measurementOffset)], + "\x05": [sqlTimeToNumber(wakeWindowLength)], + }; + msg = Buffer.concat([ + msg, + message_parser.make_four_byte_data_message( + 4, + msgData.version, + "00", + device_id, + settings3, + ), + ]); + return msg; } function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) { - socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => { - if(err) { - console.log(err) - } else { - console.log("send chunk") - if (theChunks.length > 1) { - setTimeout(thisSend, 1000, theChunks.slice(1), socket, rinfo, thisFilePath, fileList) - } else if (fileList.length > 1) { - sendFile(socket, rinfo, thisFilePath, fileList.slice(1)) - } - } - }) + socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => { + if (err) { + console.log(err); + } else { + console.log("send chunk"); + if (theChunks.length > 1) { + setTimeout( + thisSend, + 1000, + theChunks.slice(1), + socket, + rinfo, + thisFilePath, + fileList, + ); + } else if (fileList.length > 1) { + sendFile(socket, rinfo, thisFilePath, fileList.slice(1)); + } + } + }); } function sqlTimeToNumber(theTime) { try { - const parts = theTime.split(':') - timeOut = Number(parts[0]) * 60 * 60 + Number(parts[1]) * 60 + Number(parts[2]) - if(timeOut > 2**16) { - timeOut = 2**16-1 + const parts = theTime.split(":"); + timeOut = + Number(parts[0]) * 60 * 60 + Number(parts[1]) * 60 + Number(parts[2]); + if (timeOut > 2 ** 16) { + timeOut = 2 ** 16 - 1; } - return timeOut + return timeOut; } catch (e) { - if(theTime !== null) { - console.log('sqlTimeToNumber error: ', e) + if (theTime !== null) { + console.log("sqlTimeToNumber error: ", e); } - return 0 + return 0; } } -function storeType1Messages(msgs) { // 2 byte measurement data +function storeType1Messages(msgs) { + // 2 byte measurement data try { - if(msgs.length === 0) { - return - } else if(msgs.length > 2500) { // you can't do more than 65k entries in a paramaterized sql statement, there are 17 things per message, 65000 / 17 = 3823.529..., use 2500 to have a big margin + if (msgs.length === 0) { + return; + } else if (msgs.length > 2500) { + // you can't do more than 65k entries in a paramaterized sql statement, there are 17 things per message, 65000 / 17 = 3823.529..., use 2500 to have a big margin // you could have thousands of measurement messages if a bunch of probes are sending back data and there is a large deployment. It is 20 nodes over about a month. - const num_batches = Math.floor(msgs.length / 2500) + const num_batches = Math.floor(msgs.length / 2500); for (let i = 0; i < num_batches; i++) { - storeType1Messages(msgs.slice(i * 2500, (i+1) * 2500)) + storeType1Messages(msgs.slice(i * 2500, (i + 1) * 2500)); } - storeType1Messages(msgs.slice(-1 * (msgs.length % 2500))) + storeType1Messages(msgs.slice(-1 * (msgs.length % 2500))); } else { - let theseParams = [] - msgs.forEach(function(msg) { - theseParams.push(msg.source.slice(4),msg.reporting_node || "000000000000",settings[msg.source.slice(4)].temperature_site,msg.timestamp,msg.data["18_inch_temperature"] || 500000,msg.data["36_inch_temperature"] || 500000,msg.data.device_temperature || 500000,msg.data.ambient_temperature || 500000,msg.data.relative_humidity || 500000,msg.data.barometric_pressure || 500000,msg.data.accelerometer_x || 500000,msg.data.accelerometer_y || 500000,msg.data.accelerometer_z || 500000,msg.data.battery_charge_percent || 500000,msg.data.battery_voltage || 500000,msg.data.remaining_charge_capacity || 500000) - }) - const thisSQL = "INSERT IGNORE INTO Measurement (source_node, reporting_node, associated_group, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity) VALUES " + "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),".repeat(msgs.length).slice(0,-1) + ";" - pool.getConnection( - function(err, connection) { - if(err) { - console.log(`Error storing type 1 message: ${err}`) - } else { - connection.execute( - thisSQL, - theseParams, - function(err2, results) { - if(err2) { - console.log(`Some other error storing type 1 message: ${err2}`) - connection.release() - } else { - connection.release() - // TODO: anything here? - } - } - ) - } + let theseParams = []; + msgs.forEach(function (msg) { + // YYYY-MM-DD HH:mm:ss + const server_received_time = sqlCurrentTimestamp(); + theseParams.push( + msg.source.slice(4), + msg.reporting_node || "000000000000", + settings[msg.source.slice(4)].temperature_site, + msg.timestamp, + msg.data["18_inch_temperature"] || 500000, + msg.data["36_inch_temperature"] || 500000, + msg.data.device_temperature || 500000, + msg.data.ambient_temperature || 500000, + msg.data.relative_humidity || 500000, + msg.data.barometric_pressure || 500000, + msg.data.accelerometer_x || 500000, + msg.data.accelerometer_y || 500000, + msg.data.accelerometer_z || 500000, + msg.data.battery_charge_percent || 500000, + msg.data.battery_voltage || 500000, + msg.data.remaining_charge_capacity || 500000, + server_received_time, + ); + }); + const thisSQL = + "INSERT IGNORE INTO datavis_measurement (source_node_id, reporting_node_id, associated_group_id, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity, server_received_time) VALUES " + + "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)," + .repeat(msgs.length) + .slice(0, -1) + + ";"; + pool.getConnection(function (err, connection) { + if (err) { + console.log(`Error storing type 1 message: ${err}`); + } else { + connection.execute(thisSQL, theseParams, function (err2, results) { + if (err2) { + console.log(`Some other error storing type 1 message: ${err2}`); + connection.release(); + } else { + connection.release(); + // TODO: anything here? + } + }); } - ) + }); } } catch (e) { - console.log(`Some outer error storing type 1 message: ${e}`) + console.log(`Some outer error storing type 1 message: ${e}`); } } -function storeType6Messages(msgs) { // four byte device status +function storeType6Messages(msgs) { + // four byte device status try { - let theseParams = [] - let n = msgs.length - msgs.forEach(function(msg) { - theseParams.push(msg.source.slice(4),msg.timestamp,msg.data.measurement_interval || 360,msg.data.wake_window_length || 4,msg.data.measurement_offset || 0,msg.data.sleep_duration || 0,0,new Date(msg.data.previous_update_time * 1000 || 0 + 946684800000).toISOString().slice(0,-5)) - }) - const thisSQL = "INSERT IGNORE INTO NodeReportedStatus (node,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated) VALUES " + "(?,?,?,?,?,?,?,?),".repeat(n).slice(0,-1) + ";" - pool.getConnection( - function(err, connection) { - if(err) { - console.log(`Error storing type 6 message: ${err}`) - } else { - connection.execute( - thisSQL, - theseParams, - function(err2, results) { - if(err2) { - console.log(`Some other error storing type 6 message: ${err2}`) - connection.release() - } else { - connection.release() - // TODO: anything here? - } - } - ) - } + let theseParams = []; + let n = msgs.length; + msgs.forEach(function (msg) { + theseParams.push( + msg.source.slice(4), + msg.timestamp, + msg.data.measurement_interval || 360, + msg.data.wake_window_length || 4, + msg.data.measurement_offset || 0, + msg.data.sleep_duration || 0, + 0, + new Date(msg.data.previous_update_time * 1000 || 0 + 946684800000) + .toISOString() + .slice(0, -5), + sqlCurrentTimestamp(), + ); + }); + const thisSQL = + "INSERT IGNORE INTO datavis_nodereportedstatus (node_id,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated,server_received_time) VALUES " + + "(?,?,?,?,?,?,?,?,?),".repeat(n).slice(0, -1) + + ";"; + pool.getConnection(function (err, connection) { + if (err) { + console.log(`Error storing type 6 message: ${err}`); + } else { + connection.execute(thisSQL, theseParams, function (err2, results) { + if (err2) { + console.log(`Some other error storing type 6 message: ${err2}`); + connection.release(); + } else { + connection.release(); + // TODO: anything here? + } + }); } - ) + }); } catch (e) { - console.log(`Some outer error storing type 6 message: ${e}`) + console.log(`Some outer error storing type 6 message: ${e}`); } } -function storeType13Messages(msgs) { // file version manifest +function storeType13Messages(msgs) { + // file version manifest try { - let theseParams = [] - let n = 0 - msgs.forEach(function(msg) { - Object.keys(msg.data).forEach(function(thisFileId) { - n += 1 - theseParams.push(msg.source.slice(4),msg.timestamp,thisFileId,msg.data[thisFileId] || 0) - }) - }) - const thisSQL = "INSERT INTO NodeFileManifest (node,collection_time,program_id,program_version) VALUES " + "(?,?,?,?),".repeat(n).slice(0,-1) + ";" - pool.getConnection( - function(err, connection) { - if(err) { - console.log(`Error storing type 13 message: ${err}`) - } else { - connection.execute( - thisSQL, - theseParams, - function(err2, results) { - if(err2) { - console.log(`Some other error storing type 13 message: ${err}`) - connection.release() - } else { - connection.release() - // TODO: anything here? - } - } - ) - } + let theseParams = []; + let n = 0; + msgs.forEach(function (msg) { + Object.keys(msg.data).forEach(function (thisFileId) { + n += 1; + theseParams.push( + msg.source.slice(4), + msg.timestamp, + thisFileId, + msg.data[thisFileId] || 0, + sqlCurrentTimestamp(), + ); + }); + }); + const thisSQL = + "INSERT INTO datavis_nodefilemanifest (node_id,collection_time,program_id,program_version,server_received_time) VALUES " + + "(?,?,?,?,?),".repeat(n).slice(0, -1) + + ";"; + pool.getConnection(function (err, connection) { + if (err) { + console.log(`Error storing type 13 message: ${err}`); + } else { + connection.execute(thisSQL, theseParams, function (err2, results) { + if (err2) { + console.log(`Some other error storing type 13 message: ${err}`); + connection.release(); + } else { + connection.release(); + // TODO: anything here? + } + }); } - ) + }); } catch (e) { - console.log(`Some outer error storing type 13 message: ${e}`) + console.log(`Some outer error storing type 13 message: ${e}`); } } -function storeType17Messages(msgs) { // rssi data +function storeType17Messages(msgs) { + // rssi data try { - let theseParams = [] - let n = 0 - msgs.forEach(function(msg) { - Object.keys(msg.data).forEach(function(thisId) { - n += 1 - theseParams.push(msg.source.slice(4),thisId,msg.timestamp, msg.data[thisId] || 0) - }) - }) - const thisSQL = "INSERT IGNORE INTO NodeRssiRecord (node,neighbor,collection_time,rssi) VALUES " + "(?,?,?,?),".repeat(n).slice(0,-1) + ";" - pool.getConnection( - function(err, connection) { - if(err) { - console.log(`Error storing type 17 message: ${err}`) - } else { - connection.execute( - thisSQL, - theseParams, - function(err2, results) { - if(err2) { - console.log(`Some other error storing type 17 message: ${err2}`) - connection.release() - } else { - connection.release() - // TODO: anything here? - } - } - ) - } + let theseParams = []; + let n = 0; + msgs.forEach(function (msg) { + Object.keys(msg.data).forEach(function (thisId) { + n += 1; + theseParams.push( + msg.source.slice(4), + thisId, + msg.timestamp, + msg.data[thisId] || 0, + sqlCurrentTimestamp(), + ); + }); + }); + const thisSQL = + "INSERT IGNORE INTO datavis_noderssirecord (node_id,neighbor_id,collection_time,rssi,server_received_time) VALUES " + + "(?,?,?,?,?),".repeat(n).slice(0, -1) + + ";"; + pool.getConnection(function (err, connection) { + if (err) { + console.log(`Error storing type 17 message: ${err}`); + } else { + connection.execute(thisSQL, theseParams, function (err2, results) { + if (err2) { + console.log(`Some other error storing type 17 message: ${err2}`); + connection.release(); + } else { + connection.release(); + // TODO: anything here? + } + }); } - ) + }); } catch (e) { - console.log(`Some outer error storing type 17 message: ${e}`) + console.log(`Some outer error storing type 17 message: ${e}`); } } function storeMessages() { - console.log('store messages!!') + console.log("store messages!!"); try { - const type1Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 1 }) - const type6Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 6 }) - const type13Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 13 }) - const type17Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 17 }) + const type1Messages = dataBuffer.filter(function (thisMsg) { + return thisMsg.msg_type == 1; + }); + const type6Messages = dataBuffer.filter(function (thisMsg) { + return thisMsg.msg_type == 6; + }); + const type13Messages = dataBuffer.filter(function (thisMsg) { + return thisMsg.msg_type == 13; + }); + const type17Messages = dataBuffer.filter(function (thisMsg) { + return thisMsg.msg_type == 17; + }); // store the data in the database // check message type, then store it based on that if (type1Messages.length > 0) { - console.log("type 1 messages!") - storeType1Messages(type1Messages) + console.log("type 1 messages!"); + storeType1Messages(type1Messages); } if (type6Messages.length > 0) { - console.log("type 6 messages!") - storeType6Messages(type6Messages) + console.log("type 6 messages!"); + storeType6Messages(type6Messages); } if (type13Messages.length > 0) { - console.log("type 13 messages!") - storeType13Messages(type13Messages) + console.log("type 13 messages!"); + storeType13Messages(type13Messages); } if (type17Messages.length > 0) { - console.log("type 17 messages!") - storeType17Messages(type17Messages) + console.log("type 17 messages!"); + storeType17Messages(type17Messages); } - dataBuffer = [] + dataBuffer = []; } catch (e) { - console.log(`Some error storing messages: ${e}`) + console.log(`Some error storing messages: ${e}`); } } -setInterval(storeMessages, 10000) // store messages every minute +function sqlCurrentTimestamp() { + // YYYY-MM-DDTHH:mm:ss.mmmZ + const this_instant = new Date().toISOString(); + // YYYY-MM-DD, HH:mm:ss.mmmZ + const [date, time] = this_instant.split("T"); + // YYYY-MM-DD HH:mm:ss + const server_received_time = `${date} ${time.slice(0, 8)}`; + return server_received_time; +} -socket.on('listening', () => { +setInterval(storeMessages, 10000); // store messages every minute + +socket.on("listening", () => { let addr = socket.address(); console.log(`Listening for UDP packets at ${addr.address}:${addr.port}`); }); -socket.on('error', (err) => { +socket.on("error", (err) => { console.error(`UDP error: ${err.stack}`); }); -socket.on('message', (msg, rinfo) => { - if(thisDate !== new Date().toISOString().slice(0,10)) { - thisDate = new Date().toISOString().slice(0,10) - stream.close() - stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'}) - } - stream.write('> ' + (new Date().toISOString()) + ' ' + msg.toString('hex') + '\n') - cellularChunking.receive_chunk(msg, handleMessage, rinfo) -}) +socket.on("message", (msg, rinfo) => { + if (thisDate !== new Date().toISOString().slice(0, 10)) { + thisDate = new Date().toISOString().slice(0, 10); + stream.close(); + stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, { + flags: "a+", + }); + } + stream.write( + "> " + new Date().toISOString() + " " + msg.toString("hex") + "\n", + ); + cellularChunking.receive_chunk(msg, handleMessage, rinfo); +}); socket.bind(57321); diff --git a/node/test_sender/index.js b/node/test_sender/index.js index 23ccda5..328cd8f 100644 --- a/node/test_sender/index.js +++ b/node/test_sender/index.js @@ -1,28 +1,32 @@ -const dgram = require('dgram') -const fs = require('fs') -const path = require('path') -const cellularChunking = require('./../cellularChunking') -const messageParser = require('./../messages') +const dgram = require("dgram"); +const fs = require("fs"); +const path = require("path"); +const cellularChunking = require("./../cellularChunking"); +const messageParser = require("./../messages"); -const socket = dgram.createSocket('udp4'); +const socket = dgram.createSocket("udp4"); function send_next_line(i, theLines) { + console.log(`sending line ${i} of ${theLines.length}`); if (theLines.length > i) { - socket.send(Buffer.from(theLines[i], 'hex'), 57321, '127.0.0.1', function (err) { - setTimeout(send_next_line, 100, i+1, theLines) - //send_next_line(i + 1, theLines) - }) + socket.send( + Buffer.from(theLines[i], "hex"), + 57321, + "127.0.0.1", + function (err) { + setTimeout(send_next_line, 100, i + 1, theLines); + //send_next_line(i + 1, theLines) + }, + ); } else { - process.exit() + process.exit(); } } // open the file with the messages and then send them try { - const fileData = fs.readFileSync('./data/test_data.txt', 'utf8') - const theLines = fileData.split('\n') - let i = 0 - send_next_line(i,theLines) -} catch (e) { - -} + const fileData = fs.readFileSync("./data/test_data.txt", "utf8"); + const theLines = fileData.split("\n"); + let i = 0; + send_next_line(i, theLines); +} catch (e) {}