Node: Update to work with django schema

main
ookjosh 2026-01-12 00:12:33 -07:00
parent 97896772ce
commit 926a68d6b6
2 changed files with 496 additions and 335 deletions

View File

@ -1,24 +1,24 @@
const dgram = require('dgram') const dgram = require("dgram");
const fs = require('fs') const fs = require("fs");
const path = require('path') const path = require("path");
const mysql = require("mysql2"); const mysql = require("mysql2");
const cellularChunking = require('./cellularChunking') const cellularChunking = require("./cellularChunking");
const message_parser = require('./messages') const message_parser = require("./messages");
let thisDate = new Date().toISOString().slice(0,10) let thisDate = new Date().toISOString().slice(0, 10);
const filePath = './records' const filePath = "./records";
let message_id = 0 let message_id = 0;
let sentFiles = true let sentFiles = true;
let dataBuffer = [] let dataBuffer = [];
const response_record = {} const response_record = {};
const settings = {} const settings = {};
const pool = createPool() const pool = createPool();
if(!(fs.existsSync(filePath) && fs.lstatSync(filePath).isDirectory())) { if (!(fs.existsSync(filePath) && fs.lstatSync(filePath).isDirectory())) {
fs.mkdirSync(filePath) fs.mkdirSync(filePath);
} }
function createPool() { function createPool() {
@ -28,413 +28,570 @@ function createPool() {
password: process.env.db_password, password: process.env.db_password,
database: process.env.db_database, database: process.env.db_database,
port: process.env.db_port, port: process.env.db_port,
timezone: '+00:00' timezone: "+00:00",
}); });
pool.getConnection(function(err,connection) { pool.getConnection(function (err, connection) {
if (err) { if (err) {
console.log('error setting the database') console.log("error setting the database");
} else { } else {
connection.execute( connection.execute(
"USE Chickens;", `SELECT
datavis_node2group.node_id,
datavis_node2group.group_id,
datavis_nodeconfiguration.measurement_interval_minutes,
datavis_nodeconfiguration.offset_from_midnight_minutes,
datavis_nodeconfiguration.wake_window_length_minutes
FROM datavis_grouptype
JOIN datavis_group2type
ON datavis_grouptype.unique_id=datavis_group2type.type_id
JOIN datavis_node2group
ON datavis_group2type.group_id=datavis_node2group.group_id
JOIN datavis_nodeconfiguration
ON datavis_nodeconfiguration.node_id=datavis_node2group.node_id
WHERE datavis_grouptype.name = 'Temperature Site';`,
[], [],
function(err2, results) { function (err2, results) {
if(err2) { if (err2) {
console.log(`Some other error setting database: ${err2}`) console.log(`Some other error getting node settings: ${err2}`);
connection.release() connection.release();
} else { } else {
connection.execute( results.forEach(function (thisRow) {
"SELECT Node2Group.node,Node2Group.node_group,NodeConfiguration.measurement_interval_minutes,NodeConfiguration.offset_from_midnight_minutes,NodeConfiguration.wake_window_length_minutes FROM GroupType JOIN Group2Type ON GroupType.name=Group2Type.type JOIN Node2Group ON Group2Type.group_name=Node2Group.node_group JOIN NodeConfiguration ON NodeConfiguration.node=Node2Group.node WHERE Group2Type.type = 'Temperature Site';", settings[thisRow.node_id] = {
[], measurement_interval: thisRow.measurement_interval_minutes,
function (err2, results) { measurement_offset: thisRow.offset_from_midnight_minutes,
if (err2) { wake_window_length: thisRow.wake_window_length_minutes,
console.log(`Some other error getting node settings: ${err2}`) temperature_site: thisRow.group_id,
connection.release() };
} else { });
results.forEach(function(thisRow) { connection.release();
settings[thisRow.node] = {
measurement_interval: thisRow.measurement_interval_minutes,
measurement_offset: thisRow.offset_from_midnight_minutes,
wake_window_length: thisRow.wake_window_length_minutes,
temperature_site: thisRow.node_group
}
})
connection.release()
}
}
)
} }
} },
) );
} }
}) });
return pool return pool;
} }
const socket = dgram.createSocket('udp4'); const socket = dgram.createSocket("udp4");
let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'}) let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {
flags: "a+",
});
function make_string_string_message(msg_type, version, source, target, key, value, this_message_id) { function make_string_string_message(
theHeader = make_message_header(msg_type, version, source, target, this_message_id) msg_type,
console.log('message header: ', theHeader) version,
theData = Buffer.alloc(key.length + value.length + 3) source,
theData.write(key,0) target,
theData.writeUInt8(255, key.length) key,
theData.write(value, key.length+1) value,
theData.writeUInt8(255, key.length+value.length+2) this_message_id,
return Buffer.concat([theHeader, theData, Buffer.from([0])]) ) {
theHeader = make_message_header(
msg_type,
version,
source,
target,
this_message_id,
);
console.log("message header: ", theHeader);
theData = Buffer.alloc(key.length + value.length + 3);
theData.write(key, 0);
theData.writeUInt8(255, key.length);
theData.write(value, key.length + 1);
theData.writeUInt8(255, key.length + value.length + 2);
return Buffer.concat([theHeader, theData, Buffer.from([0])]);
} }
function make_message_header(msg_type, version, source, target, this_message_id) { function make_message_header(
target = ('0000000000000000' + target).slice(-16) msg_type,
target_buf = Buffer.from(target, 'hex') version,
source = (source = '0000000000000000' + source).slice(-16) source,
source_buf = Buffer.from(source, 'hex') target,
timestamp = Math.floor(Date.now() / 1000) - 946684800 // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01 this_message_id,
timestamp_buf = Buffer.alloc(4) ) {
timestamp_buf.writeUInt32BE(timestamp) target = ("0000000000000000" + target).slice(-16);
buf = Buffer.alloc(4) target_buf = Buffer.from(target, "hex");
buf.writeUInt16BE(this_message_id, 0) source = (source = "0000000000000000" + source).slice(-16);
buf.writeUInt8(msg_type, 2) source_buf = Buffer.from(source, "hex");
buf.writeUInt8(version, 3) timestamp = Math.floor(Date.now() / 1000) - 946684800; // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01
header = Buffer.concat([buf, source_buf, target_buf, timestamp_buf]) timestamp_buf = Buffer.alloc(4);
return header timestamp_buf.writeUInt32BE(timestamp);
buf = Buffer.alloc(4);
buf.writeUInt16BE(this_message_id, 0);
buf.writeUInt8(msg_type, 2);
buf.writeUInt8(version, 3);
header = Buffer.concat([buf, source_buf, target_buf, timestamp_buf]);
return header;
} }
function sendFile(socket, rinfo, thisFilePath, fileList) { function sendFile(socket, rinfo, thisFilePath, fileList) {
theFilePath = path.join(thisFilePath, fileList[0]) theFilePath = path.join(thisFilePath, fileList[0]);
fs.readFile(theFilePath, 'utf-8', (err,data) => { fs.readFile(theFilePath, "utf-8", (err, data) => {
console.log('have read file') console.log("have read file");
if(err) { if (err) {
console.log(err) console.log(err);
} else { } else {
console.log('make the message to send') console.log("make the message to send");
const theMsg = Buffer.from(make_string_string_message(8, 1, '00', '4827e28f7778', fileList[0], data, 537)).toString('hex') const theMsg = Buffer.from(
console.log('chunk the message') make_string_string_message(
const theRespChunks = cellularChunking.chunk_message(theMsg, message_id) 8,
console.log('have chunks') 1,
thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList) "00",
} "4827e28f7778",
}) fileList[0],
data,
537,
),
).toString("hex");
console.log("chunk the message");
const theRespChunks = cellularChunking.chunk_message(theMsg, message_id);
console.log("have chunks");
thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList);
}
});
} }
function handleMessage(message, rinfo) { function handleMessage(message, rinfo) {
console.log('store message: ', message) console.log("store message: ", message);
// this is going to be for storing the new format for the data // this is going to be for storing the new format for the data
// this needs to be set up to handle different message types differently, or it could be done when the databuffer is storedu // this needs to be set up to handle different message types differently, or it could be done when the databuffer is storedu
console.log(new Date().toISOString(), ' store data 2 route ', Buffer.from(message, 'hex').toString('hex')) console.log(
const parsed_data = message_parser.parse_messages({data: message}) new Date().toISOString(),
let settingsString = Buffer.from([]) " store data 2 route ",
Buffer.from(message, "hex").toString("hex"),
);
const parsed_data = message_parser.parse_messages({ data: message });
let settingsString = Buffer.from([]);
//const seenIDs = [] //const seenIDs = []
parsed_data.forEach(function(thisData) { parsed_data.forEach(function (thisData) {
if(thisData.source.replaceAll('0', '') === '' || thisData.source.startsWith('f') || thisData.source.startsWith('8')) { if (
return thisData.source.replaceAll("0", "") === "" ||
thisData.source.startsWith("f") ||
thisData.source.startsWith("8")
) {
return;
} else { } else {
console.log('parsed data: ', thisData) console.log("parsed data: ", thisData);
} }
dataBuffer.push(thisData) dataBuffer.push(thisData);
if((!response_record[thisData.source]) || response_record[thisData.source] + 1000 * 600 < Date.now()) { // respond at moch once every 600 seconds to any single node if (
console.log("respond with settings") !response_record[thisData.source] ||
response_record[thisData.source] + 1000 * 600 < Date.now()
) {
// respond at moch once every 600 seconds to any single node
console.log("respond with settings");
//settingsString = Buffer.concat([settingsString, makeSettingsString(thisData)]) //settingsString = Buffer.concat([settingsString, makeSettingsString(thisData)])
response_record[thisData.source] = Date.now() response_record[thisData.source] = Date.now();
} }
}) });
if(settingsString.length > 0) { if (settingsString.length > 0) {
console.log('settingsString: ', settingsString.toString('hex')) console.log("settingsString: ", settingsString.toString("hex"));
message_id = message_id+1 message_id = message_id + 1;
stream.write('< ' + (new Date().toISOString()) + ' ' + settingsString.toString('hex') + '\n') stream.write(
"< " +
new Date().toISOString() +
" " +
settingsString.toString("hex") +
"\n",
);
// breaking in here so we can test sending a file // breaking in here so we can test sending a file
const theRespChunks = cellularChunking.chunk_message(settingsString, message_id) const theRespChunks = cellularChunking.chunk_message(
console.log('msg chunks: ', theRespChunks) settingsString,
theRespChunks.forEach(function(thisChunk) { message_id,
console.log('thisChunk: ', thisChunk) );
console.log("msg chunks: ", theRespChunks);
theRespChunks.forEach(function (thisChunk) {
console.log("thisChunk: ", thisChunk);
socket.send(thisChunk, rinfo.port, rinfo.address, (err) => { socket.send(thisChunk, rinfo.port, rinfo.address, (err) => {
if(err) { if (err) {
console.log(err) console.log(err);
} else { } else {
console.log('sent message on') console.log("sent message on");
} }
}) });
}) });
} }
if(!sentFiles) { if (!sentFiles) {
console.log('send files') console.log("send files");
sentFiles = true sentFiles = true;
const thiFilePath = './firmware/Components' const thiFilePath = "./firmware/Components";
fs.readdir(thiFilePath, (err, theFileList) => { fs.readdir(thiFilePath, (err, theFileList) => {
console.log(theFileList) console.log(theFileList);
if(err) { if (err) {
console.log(err) console.log(err);
} else { } else {
sendFile(socket, rinfo, thiFilePath, theFileList) sendFile(socket, rinfo, thiFilePath, theFileList);
} }
}) });
} }
} }
function makeSettingsString(msgData) { function makeSettingsString(msgData) {
const device_id = msgData.device_id const device_id = msgData.device_id;
msgData.version = msgData.version || 1 msgData.version = msgData.version || 1;
// make a message with all the info in it // make a message with all the info in it
const location_id = status.devices[device_id]?.location_id const location_id = status.devices[device_id]?.location_id;
const site_id = status.locations[location_id]?.site_id || '7d931bd4-bf0d-459f-8864-6d6405908b9e' const site_id =
const timezoneOffset = getOffset(status.sites[site_id]?.timezone || 'UTC') status.locations[location_id]?.site_id ||
const measurementInterval = status.settings[status.settings_groups[device_id]]?.sleep_interval || '00:30:00' "7d931bd4-bf0d-459f-8864-6d6405908b9e";
const wakeWindowLength = status.settings[status.settings_groups[device_id]]?.window_length || '00:06:00' const timezoneOffset = getOffset(status.sites[site_id]?.timezone || "UTC");
const measurementOffset = status.settings[status.settings_groups[device_id]]?.start_time || '00:00:00' const measurementInterval =
msg = Buffer.from([]) status.settings[status.settings_groups[device_id]]?.sleep_interval ||
settings = { "00:30:00";
'\x4B': [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset], const wakeWindowLength =
} status.settings[status.settings_groups[device_id]]?.window_length ||
"00:06:00";
const measurementOffset =
status.settings[status.settings_groups[device_id]]?.start_time ||
"00:00:00";
msg = Buffer.from([]);
settings = {
"\x4B": [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset],
};
msg = message_parser.make_two_byte_data_message(3, msgData.version, '00', device_id, settings) msg = message_parser.make_two_byte_data_message(
settings2 = { 3,
'\x01': status.sites[site_id]?.name || 'Unregistered', msgData.version,
'\x02': status.devices[device_id]?.name || 'Unregistered' "00",
} device_id,
msg = Buffer.concat([msg, message_parser.make_byte_string_message(7, msgData.version, '00', device_id, settings2)]) settings,
settings3 = { );
'\x03': [sqlTimeToNumber(measurementInterval)], settings2 = {
'\x04': [sqlTimeToNumber(measurementOffset)], "\x01": status.sites[site_id]?.name || "Unregistered",
'\x05': [sqlTimeToNumber(wakeWindowLength)], "\x02": status.devices[device_id]?.name || "Unregistered",
} };
msg = Buffer.concat([msg, message_parser.make_four_byte_data_message(4, msgData.version, '00', device_id, settings3)]) msg = Buffer.concat([
return msg msg,
message_parser.make_byte_string_message(
7,
msgData.version,
"00",
device_id,
settings2,
),
]);
settings3 = {
"\x03": [sqlTimeToNumber(measurementInterval)],
"\x04": [sqlTimeToNumber(measurementOffset)],
"\x05": [sqlTimeToNumber(wakeWindowLength)],
};
msg = Buffer.concat([
msg,
message_parser.make_four_byte_data_message(
4,
msgData.version,
"00",
device_id,
settings3,
),
]);
return msg;
} }
function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) { function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) {
socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => { socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => {
if(err) { if (err) {
console.log(err) console.log(err);
} else { } else {
console.log("send chunk") console.log("send chunk");
if (theChunks.length > 1) { if (theChunks.length > 1) {
setTimeout(thisSend, 1000, theChunks.slice(1), socket, rinfo, thisFilePath, fileList) setTimeout(
} else if (fileList.length > 1) { thisSend,
sendFile(socket, rinfo, thisFilePath, fileList.slice(1)) 1000,
} theChunks.slice(1),
} socket,
}) rinfo,
thisFilePath,
fileList,
);
} else if (fileList.length > 1) {
sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
}
}
});
} }
function sqlTimeToNumber(theTime) { function sqlTimeToNumber(theTime) {
try { try {
const parts = theTime.split(':') const parts = theTime.split(":");
timeOut = Number(parts[0]) * 60 * 60 + Number(parts[1]) * 60 + Number(parts[2]) timeOut =
if(timeOut > 2**16) { Number(parts[0]) * 60 * 60 + Number(parts[1]) * 60 + Number(parts[2]);
timeOut = 2**16-1 if (timeOut > 2 ** 16) {
timeOut = 2 ** 16 - 1;
} }
return timeOut return timeOut;
} catch (e) { } catch (e) {
if(theTime !== null) { if (theTime !== null) {
console.log('sqlTimeToNumber error: ', e) console.log("sqlTimeToNumber error: ", e);
} }
return 0 return 0;
} }
} }
function storeType1Messages(msgs) { // 2 byte measurement data function storeType1Messages(msgs) {
// 2 byte measurement data
try { try {
if(msgs.length === 0) { if (msgs.length === 0) {
return return;
} else if(msgs.length > 2500) { // you can't do more than 65k entries in a paramaterized sql statement, there are 17 things per message, 65000 / 17 = 3823.529..., use 2500 to have a big margin } else if (msgs.length > 2500) {
// you can't do more than 65k entries in a paramaterized sql statement, there are 17 things per message, 65000 / 17 = 3823.529..., use 2500 to have a big margin
// you could have thousands of measurement messages if a bunch of probes are sending back data and there is a large deployment. It is 20 nodes over about a month. // you could have thousands of measurement messages if a bunch of probes are sending back data and there is a large deployment. It is 20 nodes over about a month.
const num_batches = Math.floor(msgs.length / 2500) const num_batches = Math.floor(msgs.length / 2500);
for (let i = 0; i < num_batches; i++) { for (let i = 0; i < num_batches; i++) {
storeType1Messages(msgs.slice(i * 2500, (i+1) * 2500)) storeType1Messages(msgs.slice(i * 2500, (i + 1) * 2500));
} }
storeType1Messages(msgs.slice(-1 * (msgs.length % 2500))) storeType1Messages(msgs.slice(-1 * (msgs.length % 2500)));
} else { } else {
let theseParams = [] let theseParams = [];
msgs.forEach(function(msg) { msgs.forEach(function (msg) {
theseParams.push(msg.source.slice(4),msg.reporting_node || "000000000000",settings[msg.source.slice(4)].temperature_site,msg.timestamp,msg.data["18_inch_temperature"] || 500000,msg.data["36_inch_temperature"] || 500000,msg.data.device_temperature || 500000,msg.data.ambient_temperature || 500000,msg.data.relative_humidity || 500000,msg.data.barometric_pressure || 500000,msg.data.accelerometer_x || 500000,msg.data.accelerometer_y || 500000,msg.data.accelerometer_z || 500000,msg.data.battery_charge_percent || 500000,msg.data.battery_voltage || 500000,msg.data.remaining_charge_capacity || 500000) // YYYY-MM-DD HH:mm:ss
}) const server_received_time = sqlCurrentTimestamp();
const thisSQL = "INSERT IGNORE INTO Measurement (source_node, reporting_node, associated_group, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity) VALUES " + "(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),".repeat(msgs.length).slice(0,-1) + ";" theseParams.push(
pool.getConnection( msg.source.slice(4),
function(err, connection) { msg.reporting_node || "000000000000",
if(err) { settings[msg.source.slice(4)].temperature_site,
console.log(`Error storing type 1 message: ${err}`) msg.timestamp,
} else { msg.data["18_inch_temperature"] || 500000,
connection.execute( msg.data["36_inch_temperature"] || 500000,
thisSQL, msg.data.device_temperature || 500000,
theseParams, msg.data.ambient_temperature || 500000,
function(err2, results) { msg.data.relative_humidity || 500000,
if(err2) { msg.data.barometric_pressure || 500000,
console.log(`Some other error storing type 1 message: ${err2}`) msg.data.accelerometer_x || 500000,
connection.release() msg.data.accelerometer_y || 500000,
} else { msg.data.accelerometer_z || 500000,
connection.release() msg.data.battery_charge_percent || 500000,
// TODO: anything here? msg.data.battery_voltage || 500000,
} msg.data.remaining_charge_capacity || 500000,
} server_received_time,
) );
} });
const thisSQL =
"INSERT IGNORE INTO datavis_measurement (source_node_id, reporting_node_id, associated_group_id, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity, server_received_time) VALUES " +
"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),"
.repeat(msgs.length)
.slice(0, -1) +
";";
pool.getConnection(function (err, connection) {
if (err) {
console.log(`Error storing type 1 message: ${err}`);
} else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 1 message: ${err2}`);
connection.release();
} else {
connection.release();
// TODO: anything here?
}
});
} }
) });
} }
} catch (e) { } catch (e) {
console.log(`Some outer error storing type 1 message: ${e}`) console.log(`Some outer error storing type 1 message: ${e}`);
} }
} }
function storeType6Messages(msgs) { // four byte device status function storeType6Messages(msgs) {
// four byte device status
try { try {
let theseParams = [] let theseParams = [];
let n = msgs.length let n = msgs.length;
msgs.forEach(function(msg) { msgs.forEach(function (msg) {
theseParams.push(msg.source.slice(4),msg.timestamp,msg.data.measurement_interval || 360,msg.data.wake_window_length || 4,msg.data.measurement_offset || 0,msg.data.sleep_duration || 0,0,new Date(msg.data.previous_update_time * 1000 || 0 + 946684800000).toISOString().slice(0,-5)) theseParams.push(
}) msg.source.slice(4),
const thisSQL = "INSERT IGNORE INTO NodeReportedStatus (node,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated) VALUES " + "(?,?,?,?,?,?,?,?),".repeat(n).slice(0,-1) + ";" msg.timestamp,
pool.getConnection( msg.data.measurement_interval || 360,
function(err, connection) { msg.data.wake_window_length || 4,
if(err) { msg.data.measurement_offset || 0,
console.log(`Error storing type 6 message: ${err}`) msg.data.sleep_duration || 0,
} else { 0,
connection.execute( new Date(msg.data.previous_update_time * 1000 || 0 + 946684800000)
thisSQL, .toISOString()
theseParams, .slice(0, -5),
function(err2, results) { sqlCurrentTimestamp(),
if(err2) { );
console.log(`Some other error storing type 6 message: ${err2}`) });
connection.release() const thisSQL =
} else { "INSERT IGNORE INTO datavis_nodereportedstatus (node_id,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated,server_received_time) VALUES " +
connection.release() "(?,?,?,?,?,?,?,?,?),".repeat(n).slice(0, -1) +
// TODO: anything here? ";";
} pool.getConnection(function (err, connection) {
} if (err) {
) console.log(`Error storing type 6 message: ${err}`);
} } else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 6 message: ${err2}`);
connection.release();
} else {
connection.release();
// TODO: anything here?
}
});
} }
) });
} catch (e) { } catch (e) {
console.log(`Some outer error storing type 6 message: ${e}`) console.log(`Some outer error storing type 6 message: ${e}`);
} }
} }
function storeType13Messages(msgs) { // file version manifest function storeType13Messages(msgs) {
// file version manifest
try { try {
let theseParams = [] let theseParams = [];
let n = 0 let n = 0;
msgs.forEach(function(msg) { msgs.forEach(function (msg) {
Object.keys(msg.data).forEach(function(thisFileId) { Object.keys(msg.data).forEach(function (thisFileId) {
n += 1 n += 1;
theseParams.push(msg.source.slice(4),msg.timestamp,thisFileId,msg.data[thisFileId] || 0) theseParams.push(
}) msg.source.slice(4),
}) msg.timestamp,
const thisSQL = "INSERT INTO NodeFileManifest (node,collection_time,program_id,program_version) VALUES " + "(?,?,?,?),".repeat(n).slice(0,-1) + ";" thisFileId,
pool.getConnection( msg.data[thisFileId] || 0,
function(err, connection) { sqlCurrentTimestamp(),
if(err) { );
console.log(`Error storing type 13 message: ${err}`) });
} else { });
connection.execute( const thisSQL =
thisSQL, "INSERT INTO datavis_nodefilemanifest (node_id,collection_time,program_id,program_version,server_received_time) VALUES " +
theseParams, "(?,?,?,?,?),".repeat(n).slice(0, -1) +
function(err2, results) { ";";
if(err2) { pool.getConnection(function (err, connection) {
console.log(`Some other error storing type 13 message: ${err}`) if (err) {
connection.release() console.log(`Error storing type 13 message: ${err}`);
} else { } else {
connection.release() connection.execute(thisSQL, theseParams, function (err2, results) {
// TODO: anything here? if (err2) {
} console.log(`Some other error storing type 13 message: ${err}`);
} connection.release();
) } else {
} connection.release();
// TODO: anything here?
}
});
} }
) });
} catch (e) { } catch (e) {
console.log(`Some outer error storing type 13 message: ${e}`) console.log(`Some outer error storing type 13 message: ${e}`);
} }
} }
function storeType17Messages(msgs) { // rssi data function storeType17Messages(msgs) {
// rssi data
try { try {
let theseParams = [] let theseParams = [];
let n = 0 let n = 0;
msgs.forEach(function(msg) { msgs.forEach(function (msg) {
Object.keys(msg.data).forEach(function(thisId) { Object.keys(msg.data).forEach(function (thisId) {
n += 1 n += 1;
theseParams.push(msg.source.slice(4),thisId,msg.timestamp, msg.data[thisId] || 0) theseParams.push(
}) msg.source.slice(4),
}) thisId,
const thisSQL = "INSERT IGNORE INTO NodeRssiRecord (node,neighbor,collection_time,rssi) VALUES " + "(?,?,?,?),".repeat(n).slice(0,-1) + ";" msg.timestamp,
pool.getConnection( msg.data[thisId] || 0,
function(err, connection) { sqlCurrentTimestamp(),
if(err) { );
console.log(`Error storing type 17 message: ${err}`) });
} else { });
connection.execute( const thisSQL =
thisSQL, "INSERT IGNORE INTO datavis_noderssirecord (node_id,neighbor_id,collection_time,rssi,server_received_time) VALUES " +
theseParams, "(?,?,?,?,?),".repeat(n).slice(0, -1) +
function(err2, results) { ";";
if(err2) { pool.getConnection(function (err, connection) {
console.log(`Some other error storing type 17 message: ${err2}`) if (err) {
connection.release() console.log(`Error storing type 17 message: ${err}`);
} else { } else {
connection.release() connection.execute(thisSQL, theseParams, function (err2, results) {
// TODO: anything here? if (err2) {
} console.log(`Some other error storing type 17 message: ${err2}`);
} connection.release();
) } else {
} connection.release();
// TODO: anything here?
}
});
} }
) });
} catch (e) { } catch (e) {
console.log(`Some outer error storing type 17 message: ${e}`) console.log(`Some outer error storing type 17 message: ${e}`);
} }
} }
function storeMessages() { function storeMessages() {
console.log('store messages!!') console.log("store messages!!");
try { try {
const type1Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 1 }) const type1Messages = dataBuffer.filter(function (thisMsg) {
const type6Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 6 }) return thisMsg.msg_type == 1;
const type13Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 13 }) });
const type17Messages = dataBuffer.filter(function (thisMsg) { return thisMsg.msg_type == 17 }) const type6Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 6;
});
const type13Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 13;
});
const type17Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 17;
});
// store the data in the database // store the data in the database
// check message type, then store it based on that // check message type, then store it based on that
if (type1Messages.length > 0) { if (type1Messages.length > 0) {
console.log("type 1 messages!") console.log("type 1 messages!");
storeType1Messages(type1Messages) storeType1Messages(type1Messages);
} }
if (type6Messages.length > 0) { if (type6Messages.length > 0) {
console.log("type 6 messages!") console.log("type 6 messages!");
storeType6Messages(type6Messages) storeType6Messages(type6Messages);
} }
if (type13Messages.length > 0) { if (type13Messages.length > 0) {
console.log("type 13 messages!") console.log("type 13 messages!");
storeType13Messages(type13Messages) storeType13Messages(type13Messages);
} }
if (type17Messages.length > 0) { if (type17Messages.length > 0) {
console.log("type 17 messages!") console.log("type 17 messages!");
storeType17Messages(type17Messages) storeType17Messages(type17Messages);
} }
dataBuffer = [] dataBuffer = [];
} catch (e) { } catch (e) {
console.log(`Some error storing messages: ${e}`) console.log(`Some error storing messages: ${e}`);
} }
} }
setInterval(storeMessages, 10000) // store messages every minute function sqlCurrentTimestamp() {
// YYYY-MM-DDTHH:mm:ss.mmmZ
const this_instant = new Date().toISOString();
// YYYY-MM-DD, HH:mm:ss.mmmZ
const [date, time] = this_instant.split("T");
// YYYY-MM-DD HH:mm:ss
const server_received_time = `${date} ${time.slice(0, 8)}`;
return server_received_time;
}
socket.on('listening', () => { setInterval(storeMessages, 10000); // store messages every minute
socket.on("listening", () => {
let addr = socket.address(); let addr = socket.address();
console.log(`Listening for UDP packets at ${addr.address}:${addr.port}`); console.log(`Listening for UDP packets at ${addr.address}:${addr.port}`);
}); });
socket.on('error', (err) => { socket.on("error", (err) => {
console.error(`UDP error: ${err.stack}`); console.error(`UDP error: ${err.stack}`);
}); });
socket.on('message', (msg, rinfo) => { socket.on("message", (msg, rinfo) => {
if(thisDate !== new Date().toISOString().slice(0,10)) { if (thisDate !== new Date().toISOString().slice(0, 10)) {
thisDate = new Date().toISOString().slice(0,10) thisDate = new Date().toISOString().slice(0, 10);
stream.close() stream.close();
stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {flags: 'a+'}) stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {
} flags: "a+",
stream.write('> ' + (new Date().toISOString()) + ' ' + msg.toString('hex') + '\n') });
cellularChunking.receive_chunk(msg, handleMessage, rinfo) }
}) stream.write(
"> " + new Date().toISOString() + " " + msg.toString("hex") + "\n",
);
cellularChunking.receive_chunk(msg, handleMessage, rinfo);
});
socket.bind(57321); socket.bind(57321);

View File

@ -1,28 +1,32 @@
const dgram = require('dgram') const dgram = require("dgram");
const fs = require('fs') const fs = require("fs");
const path = require('path') const path = require("path");
const cellularChunking = require('./../cellularChunking') const cellularChunking = require("./../cellularChunking");
const messageParser = require('./../messages') const messageParser = require("./../messages");
const socket = dgram.createSocket('udp4'); const socket = dgram.createSocket("udp4");
function send_next_line(i, theLines) { function send_next_line(i, theLines) {
console.log(`sending line ${i} of ${theLines.length}`);
if (theLines.length > i) { if (theLines.length > i) {
socket.send(Buffer.from(theLines[i], 'hex'), 57321, '127.0.0.1', function (err) { socket.send(
setTimeout(send_next_line, 100, i+1, theLines) Buffer.from(theLines[i], "hex"),
//send_next_line(i + 1, theLines) 57321,
}) "127.0.0.1",
function (err) {
setTimeout(send_next_line, 100, i + 1, theLines);
//send_next_line(i + 1, theLines)
},
);
} else { } else {
process.exit() process.exit();
} }
} }
// open the file with the messages and then send them // open the file with the messages and then send them
try { try {
const fileData = fs.readFileSync('./data/test_data.txt', 'utf8') const fileData = fs.readFileSync("./data/test_data.txt", "utf8");
const theLines = fileData.split('\n') const theLines = fileData.split("\n");
let i = 0 let i = 0;
send_next_line(i,theLines) send_next_line(i, theLines);
} catch (e) { } catch (e) {}
}