roost/node/index.js

628 lines
19 KiB
JavaScript

const dgram = require("dgram");
const fs = require("fs");
const path = require("path");
const mysql = require("mysql2");
const cellularChunking = require("./cellularChunking");
const message_parser = require("./messages");
let thisDate = new Date().toISOString().slice(0, 10);
const filePath = "./records";
let message_id = 0;
let sentFiles = true;
let dataBuffer = [];
const response_record = {};
const settings = {};
const pool = createPool();
if (!(fs.existsSync(filePath) && fs.lstatSync(filePath).isDirectory())) {
fs.mkdirSync(filePath);
}
function createPool() {
const pool = mysql.createPool({
host: process.env.db_host,
user: process.env.db_user,
password: process.env.db_password,
database: process.env.db_database,
port: process.env.db_port,
timezone: "+00:00",
});
pool.getConnection(function (err, connection) {
if (err) {
console.log("error setting the database: ", err);
} else {
connection.execute(
`SELECT
datavis_node2group.node_id,
datavis_node2group.group_id,
datavis_nodeconfiguration.measurement_interval_minutes,
datavis_nodeconfiguration.offset_from_midnight_minutes,
datavis_nodeconfiguration.wake_window_length_minutes
FROM datavis_grouptype
JOIN datavis_group2type
ON datavis_grouptype.unique_id=datavis_group2type.type_id
JOIN datavis_node2group
ON datavis_group2type.group_id=datavis_node2group.group_id
JOIN datavis_nodeconfiguration
ON datavis_nodeconfiguration.node_id=datavis_node2group.node_id
WHERE datavis_grouptype.name = 'Temperature Site';`,
[],
function (err2, results) {
if (err2) {
console.log(`Some other error getting node settings: ${err2}`);
//connection.release();
} else {
results.forEach(function (thisRow) {
settings[thisRow.node_id] = {
measurement_interval: thisRow.measurement_interval_minutes,
measurement_offset: thisRow.offset_from_midnight_minutes,
wake_window_length: thisRow.wake_window_length_minutes,
temperature_site: thisRow.group_id,
};
});
connection.release();
}
},
);
}
});
return pool;
}
const socket = dgram.createSocket("udp4");
let stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {
flags: "a+",
});
function make_string_string_message(
msg_type,
version,
source,
target,
key,
value,
this_message_id,
) {
theHeader = make_message_header(
msg_type,
version,
source,
target,
this_message_id,
);
//console.log("message header: ", theHeader);
theData = Buffer.alloc(key.length + value.length + 3);
theData.write(key, 0);
theData.writeUInt8(255, key.length);
theData.write(value, key.length + 1);
theData.writeUInt8(255, key.length + value.length + 2);
return Buffer.concat([theHeader, theData, Buffer.from([0])]);
}
function make_message_header(
msg_type,
version,
source,
target,
this_message_id,
) {
target = ("0000000000000000" + target).slice(-16);
target_buf = Buffer.from(target, "hex");
source = (source = "0000000000000000" + source).slice(-16);
source_buf = Buffer.from(source, "hex");
timestamp = Math.floor(Date.now() / 1000) - 946684800; // timestamp in the format that the ESP32 expects it, seconds since 2000-01-01
timestamp_buf = Buffer.alloc(4);
timestamp_buf.writeUInt32BE(timestamp);
buf = Buffer.alloc(4);
buf.writeUInt16BE(this_message_id, 0);
buf.writeUInt8(msg_type, 2);
buf.writeUInt8(version, 3);
header = Buffer.concat([buf, source_buf, target_buf, timestamp_buf]);
return header;
}
function sendFile(socket, rinfo, thisFilePath, fileList) {
theFilePath = path.join(thisFilePath, fileList[0]);
fs.readFile(theFilePath, "utf-8", (err, data) => {
console.log("have read file");
if (err) {
console.log(err);
} else {
console.log("make the message to send");
const theMsg = Buffer.from(
make_string_string_message(
8,
1,
"00",
"4827e28f7778",
fileList[0],
data,
537,
),
).toString("hex");
console.log("chunk the message");
const theRespChunks = cellularChunking.chunk_message(theMsg, message_id);
console.log("have chunks");
thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList);
}
});
}
function handleMessage(message, rinfo) {
//console.log("store message: ", message);
// this is going to be for storing the new format for the data
// this needs to be set up to handle different message types differently, or it could be done when the databuffer is storedu
//console.log(
// new Date().toISOString(),
// " store data 2 route ",
// Buffer.from(message, "hex").toString("hex"),
//);
const parsed_data = message_parser.parse_messages({ data: message });
let settingsString = Buffer.from([]);
//const seenIDs = []
parsed_data.forEach(function (thisData) {
if (
thisData.source.replaceAll("0", "") === "" ||
thisData.source.startsWith("f") ||
thisData.source.startsWith("8")
) {
return;
} else {
//console.log("parsed data: ", thisData);
}
dataBuffer.push(thisData);
if (
!response_record[thisData.source] ||
response_record[thisData.source] + 1000 * 600 < Date.now()
) {
// respond at moch once every 600 seconds to any single node
console.log("respond with settings");
//settingsString = Buffer.concat([settingsString, makeSettingsString(thisData)])
response_record[thisData.source] = Date.now();
}
});
if (settingsString.length > 0) {
//console.log("settingsString: ", settingsString.toString("hex"));
message_id = message_id + 1;
stream.write(
"< " +
new Date().toISOString() +
" " +
settingsString.toString("hex") +
"\n",
);
// breaking in here so we can test sending a file
const theRespChunks = cellularChunking.chunk_message(
settingsString,
message_id,
);
//console.log("msg chunks: ", theRespChunks);
theRespChunks.forEach(function (thisChunk) {
//console.log("thisChunk: ", thisChunk);
socket.send(thisChunk, rinfo.port, rinfo.address, (err) => {
if (err) {
console.log(err);
} else {
console.log("sent message on");
}
});
});
}
if (!sentFiles) {
console.log("send files");
sentFiles = true;
const thiFilePath = "./firmware/Components";
fs.readdir(thiFilePath, (err, theFileList) => {
console.log(theFileList);
if (err) {
console.log(err);
} else {
sendFile(socket, rinfo, thiFilePath, theFileList);
}
});
}
}
function makeSettingsString(msgData) {
const device_id = msgData.device_id;
msgData.version = msgData.version || 1;
// make a message with all the info in it
const location_id = status.devices[device_id]?.location_id;
const site_id =
status.locations[location_id]?.site_id ||
"7d931bd4-bf0d-459f-8864-6d6405908b9e";
const timezoneOffset = getOffset(status.sites[site_id]?.timezone || "UTC");
const measurementInterval =
status.settings[status.settings_groups[device_id]]?.sleep_interval ||
"00:30:00";
const wakeWindowLength =
status.settings[status.settings_groups[device_id]]?.window_length ||
"00:06:00";
const measurementOffset =
status.settings[status.settings_groups[device_id]]?.start_time ||
"00:00:00";
msg = Buffer.from([]);
settings = {
"\x4B": [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset],
};
msg = message_parser.make_two_byte_data_message(
3,
msgData.version,
"00",
device_id,
settings,
);
settings2 = {
"\x01": status.sites[site_id]?.name || "Unregistered",
"\x02": status.devices[device_id]?.name || "Unregistered",
};
msg = Buffer.concat([
msg,
message_parser.make_byte_string_message(
7,
msgData.version,
"00",
device_id,
settings2,
),
]);
settings3 = {
"\x03": [sqlTimeToNumber(measurementInterval)],
"\x04": [sqlTimeToNumber(measurementOffset)],
"\x05": [sqlTimeToNumber(wakeWindowLength)],
};
msg = Buffer.concat([
msg,
message_parser.make_four_byte_data_message(
4,
msgData.version,
"00",
device_id,
settings3,
),
]);
return msg;
}
function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) {
socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => {
if (err) {
console.log(err);
} else {
console.log("send chunk");
if (theChunks.length > 1) {
setTimeout(
thisSend,
1000,
theChunks.slice(1),
socket,
rinfo,
thisFilePath,
fileList,
);
} else if (fileList.length > 1) {
sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
}
}
});
}
function sqlTimeToNumber(theTime) {
try {
const parts = theTime.split(":");
timeOut =
Number(parts[0]) * 60 * 60 + Number(parts[1]) * 60 + Number(parts[2]);
if (timeOut > 2 ** 16) {
timeOut = 2 ** 16 - 1;
}
return timeOut;
} catch (e) {
if (theTime !== null) {
console.log("sqlTimeToNumber error: ", e);
}
return 0;
}
}
function storeType1Messages(msgs) {
// 2 byte measurement data
try {
if (msgs.length === 0) {
return;
} else if (msgs.length > 2500) {
// you can't do more than 65k entries in a paramaterized sql statement, there are 17 things per message, 65000 / 17 = 3823.529..., use 2500 to have a big margin
// you could have thousands of measurement messages if a bunch of probes are sending back data and there is a large deployment. It is 20 nodes over about a month.
const num_batches = Math.floor(msgs.length / 2500);
for (let i = 0; i < num_batches; i++) {
storeType1Messages(msgs.slice(i * 2500, (i + 1) * 2500));
}
storeType1Messages(msgs.slice(-1 * (msgs.length % 2500)));
} else {
let theseParams = [];
msgs.forEach(function (msg) {
// YYYY-MM-DD HH:mm:ss
const server_received_time = sqlCurrentTimestamp();
theseParams.push(
msg.source.slice(4),
msg.reporting_node || "000000000000",
settings[msg.source.slice(4)]?.temperature_site || "Unknowable 1",
msg.timestamp,
msg.data["18_inch_temperature"] || 500000,
msg.data["36_inch_temperature"] || 500000,
msg.data.device_temperature || 500000,
msg.data.ambient_temperature || 500000,
msg.data.relative_humidity || 500000,
msg.data.barometric_pressure || 500000,
msg.data.accelerometer_x || 500000,
msg.data.accelerometer_y || 500000,
msg.data.accelerometer_z || 500000,
msg.data.battery_charge_percent || 500000,
msg.data.battery_voltage || 500000,
msg.data.remaining_charge_capacity || 500000,
server_received_time,
);
});
const thisSQL =
"INSERT IGNORE INTO datavis_measurement (source_node_id, reporting_node_id, associated_group_id, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity, server_received_time) VALUES " +
"(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?),"
.repeat(msgs.length)
.slice(0, -1) +
";";
pool.getConnection(function (err, connection) {
if (err) {
console.log(`Error storing type 1 message: ${err}`);
} else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 1 message: ${err2}`);
if(connection) {
try {
connection.release();
} catch (e) {
console.log('some error releasing connection: ', e)
}
}
} else {
connection.release();
// TODO: anything here?
}
});
}
});
}
} catch (e) {
console.log(`Some outer error storing type 1 message: ${e}`);
}
}
function storeType6Messages(msgs) {
// four byte device status
try {
let theseParams = [];
let n = msgs.length;
msgs.forEach(function (msg) {
theseParams.push(
msg.source.slice(4),
msg.timestamp,
msg.data.measurement_interval || 360,
msg.data.wake_window_length || 4,
msg.data.measurement_offset || 0,
msg.data.sleep_duration || 0,
0,
new Date(msg.data.previous_update_time * 1000 || 0 + 946684800000)
.toISOString()
.slice(0, -5),
sqlCurrentTimestamp(),
);
});
const thisSQL =
"INSERT IGNORE INTO datavis_nodereportedstatus (node_id,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated,server_received_time) VALUES " +
"(?,?,?,?,?,?,?,?,?),".repeat(n).slice(0, -1) +
";";
pool.getConnection(function (err, connection) {
if (err) {
console.log(`Error storing type 6 message: ${err}`);
} else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 6 message: ${err2}`);
if(connection) {
try {
connection.release();
} catch (e) {
console.log('some error releasing connection: ', e)
}
}
} else {
connection.release();
// TODO: anything here?
}
});
}
});
} catch (e) {
console.log(`Some outer error storing type 6 message: ${e}`);
}
}
function storeType13Messages(msgs) {
// file version manifest
try {
let theseParams = [];
let n = 0;
msgs.forEach(function (msg) {
Object.keys(msg.data).forEach(function (thisFileId) {
n += 1;
theseParams.push(
msg.source.slice(4),
msg.timestamp,
thisFileId,
msg.data[thisFileId] || 0,
sqlCurrentTimestamp(),
);
});
});
const thisSQL =
"INSERT INTO datavis_nodefilemanifest (node_id,collection_time,program_id,program_version,server_received_time) VALUES " +
"(?,?,?,?,?),".repeat(n).slice(0, -1) +
";";
pool.getConnection(function (err, connection) {
if (err) {
console.log(`Error storing type 13 message: ${err}`);
} else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 13 message: ${err2}`);
if(connection) {
try {
connection.release();
} catch (e) {
console.log('some error releasing connection: ', e)
}
}
} else {
connection.release();
// TODO: anything here?
}
});
}
});
} catch (e) {
console.log(`Some outer error storing type 13 message: ${e}`);
}
}
function storeType17Messages(msgs) {
// rssi data
try {
let theseParams = [];
let n = 0;
msgs.forEach(function (msg) {
Object.keys(msg.data).forEach(function (thisId) {
n += 1;
theseParams.push(
msg.source.slice(4),
thisId,
msg.timestamp,
msg.data[thisId] || 0,
sqlCurrentTimestamp(),
);
});
});
if(n === 0) {
return // don't try to store anything if there is nothing to store
}
const thisSQL =
"INSERT IGNORE INTO datavis_noderssirecord (node_id,neighbor_id,collection_time,rssi,server_received_time) VALUES " +
"(?,?,?,?,?),".repeat(n).slice(0, -1) +
";";
pool.getConnection(function (err, connection) {
if (err) {
console.log(`Error storing type 17 message: ${err}`);
} else {
connection.execute(thisSQL, theseParams, function (err2, results) {
if (err2) {
console.log(`Some other error storing type 17 message: ${err2}`);
if(connection) {
try {
connection.release();
} catch (e) {
console.log('some error releasing connection: ', e)
}
}
} else {
connection.release();
// TODO: anything here?
}
});
}
});
} catch (e) {
console.log(`Some outer error storing type 17 message: ${e}`);
}
}
function storeMessages() {
if(dataBuffer.length == 0) {
return
}
console.log("store messages!!");
try {
const type1Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 1;
});
const type6Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 6;
});
const type13Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 13;
});
const type17Messages = dataBuffer.filter(function (thisMsg) {
return thisMsg.msg_type == 17;
});
// store the data in the database
// check message type, then store it based on that
if (type1Messages.length > 0) {
console.log("type 1 messages!");
storeType1Messages(type1Messages);
}
if (type6Messages.length > 0) {
console.log("type 6 messages!");
storeType6Messages(type6Messages);
}
if (type13Messages.length > 0) {
console.log("type 13 messages!");
storeType13Messages(type13Messages);
}
if (type17Messages.length > 0) {
console.log("type 17 messages!");
storeType17Messages(type17Messages);
}
dataBuffer = [];
} catch (e) {
console.log(`Some error storing messages: ${e}`);
}
}
function sqlCurrentTimestamp() {
// YYYY-MM-DDTHH:mm:ss.mmmZ
const this_instant = new Date().toISOString();
// YYYY-MM-DD, HH:mm:ss.mmmZ
const [date, time] = this_instant.split("T");
// YYYY-MM-DD HH:mm:ss
const server_received_time = `${date} ${time.slice(0, 8)}`;
return server_received_time;
}
setInterval(storeMessages, 10000); // store messages every minute
socket.on("listening", () => {
let addr = socket.address();
console.log(`Listening for UDP packets at ${addr.address}:${addr.port}`);
});
socket.on("error", (err) => {
console.error(`UDP error: ${err.stack}`);
});
socket.on("message", (msg, rinfo) => {
if (thisDate !== new Date().toISOString().slice(0, 10)) {
thisDate = new Date().toISOString().slice(0, 10);
stream.close();
stream = fs.createWriteStream(`${filePath}/${thisDate}-log.dat`, {
flags: "a+",
});
}
stream.write(
"> " + new Date().toISOString() + " " + msg.toString("hex") + "\n",
);
cellularChunking.receive_chunk(msg, handleMessage, rinfo);
});
socket.bind(57321);