// Listing metadata: 598 lines, 18 KiB, JavaScript.
const dgram = require("dgram");
|
|
const fs = require("fs");
|
|
const path = require("path");
|
|
const mysql = require("mysql2");
|
|
const cellularChunking = require("./cellularChunking");
|
|
const message_parser = require("./messages");
|
|
|
|
// ---- Module-level state ----------------------------------------------------

// UTC calendar day (YYYY-MM-DD) used to name the current log file.
let thisDate = new Date().toISOString().slice(0, 10);

// Directory that holds the daily "<date>-log.dat" files.
const filePath = "./records";

// Id handed to cellularChunking.chunk_message for outgoing messages.
let message_id = 0;

// While true, handleMessage does not start the firmware file transfer.
let sentFiles = true;

// Parsed messages collected by handleMessage until storeMessages runs.
let dataBuffer = [];

// source id -> Date.now() of the most recent "respond with settings" event.
const response_record = {};

// node id -> settings row; filled in asynchronously by createPool().
const settings = {};

// Connection pool; creating it also kicks off the node-settings query.
const pool = createPool();

// Create the records directory unless it already exists as a directory.
if (!fs.existsSync(filePath) || !fs.lstatSync(filePath).isDirectory()) {
  fs.mkdirSync(filePath);
}
|
|
|
|
/**
 * Create the MySQL connection pool and start loading per-node settings.
 *
 * Connection parameters are read from the db_host, db_user, db_password,
 * db_database and db_port environment variables. As soon as a connection is
 * available, the configuration of every node that belongs to a group of type
 * 'Temperature Site' is queried and written into the module-level `settings`
 * object, keyed by node id. The query is asynchronous, so `settings` may
 * still be empty when this function returns.
 *
 * @returns {object} The pool returned by mysql.createPool.
 */
function createPool() {
  const nodeSettingsSql = `SELECT
      datavis_node2group.node_id,
      datavis_node2group.group_id,
      datavis_nodeconfiguration.measurement_interval_minutes,
      datavis_nodeconfiguration.offset_from_midnight_minutes,
      datavis_nodeconfiguration.wake_window_length_minutes
    FROM datavis_grouptype
    JOIN datavis_group2type
      ON datavis_grouptype.unique_id=datavis_group2type.type_id
    JOIN datavis_node2group
      ON datavis_group2type.group_id=datavis_node2group.group_id
    JOIN datavis_nodeconfiguration
      ON datavis_nodeconfiguration.node_id=datavis_node2group.node_id
    WHERE datavis_grouptype.name = 'Temperature Site';`;

  // Named newPool so it does not shadow the module-level `pool`.
  const newPool = mysql.createPool({
    host: process.env.db_host,
    user: process.env.db_user,
    password: process.env.db_password,
    database: process.env.db_database,
    port: process.env.db_port,
    timezone: "+00:00",
  });

  newPool.getConnection((err, connection) => {
    if (err) {
      // Include the underlying error so a failed start-up can be diagnosed.
      console.log(`error setting the database: ${err}`);
      return;
    }

    connection.execute(nodeSettingsSql, [], (err2, results) => {
      // Release first, exactly once, so the connection cannot leak if the
      // row processing below throws.
      connection.release();

      if (err2) {
        console.log(`Some other error getting node settings: ${err2}`);
        return;
      }

      results.forEach((thisRow) => {
        settings[thisRow.node_id] = {
          measurement_interval: thisRow.measurement_interval_minutes,
          measurement_offset: thisRow.offset_from_midnight_minutes,
          wake_window_length: thisRow.wake_window_length_minutes,
          temperature_site: thisRow.group_id,
        };
      });
    });
  });

  return newPool;
}
|
|
|
|
// UDP (IPv4) socket used both to receive node traffic and to send replies.
const socket = dgram.createSocket("udp4");

// Append-mode stream for the current day's traffic log; it is replaced by
// the "message" handler when the UTC date changes.
let stream = fs.createWriteStream(filePath + "/" + thisDate + "-log.dat", {
  flags: "a+",
});
|
|
|
|
/**
 * Build a complete key/value string message: a header followed by the
 * encoded key and value.
 *
 * Payload layout (byte-for-byte what was produced before for ASCII input):
 *   key bytes, 0xFF, value bytes, 0x00, 0xFF, 0x00
 *
 * Sizes are computed from the UTF-8 byte length of the strings rather than
 * their character count, so multi-byte characters (for example in file
 * contents read as "utf-8") are no longer truncated.
 *
 * NOTE(review): the 0x00 between the value and the trailing 0xFF comes from
 * the original "+ 3" sizing; confirm against the receiving firmware that
 * this gap byte is expected.
 *
 * @param {number} msg_type - Message type byte, forwarded to the header.
 * @param {number} version - Protocol version byte, forwarded to the header.
 * @param {string} source - Hex id of the sender, forwarded to the header.
 * @param {string} target - Hex id of the recipient, forwarded to the header.
 * @param {string} key - Key string.
 * @param {string} value - Value string.
 * @param {number} this_message_id - Message id, forwarded to the header.
 * @returns {Buffer} Header + payload + final 0x00 byte.
 */
function make_string_string_message(
  msg_type,
  version,
  source,
  target,
  key,
  value,
  this_message_id,
) {
  const theHeader = make_message_header(
    msg_type,
    version,
    source,
    target,
    this_message_id,
  );
  console.log("message header: ", theHeader);

  // Encode once so that every offset below is a byte offset.
  const keyBytes = Buffer.from(key, "utf8");
  const valueBytes = Buffer.from(value, "utf8");

  const theData = Buffer.alloc(keyBytes.length + valueBytes.length + 3);
  keyBytes.copy(theData, 0);
  theData.writeUInt8(255, keyBytes.length);
  valueBytes.copy(theData, keyBytes.length + 1);
  // The byte at keyBytes.length + valueBytes.length + 1 is left as 0x00.
  theData.writeUInt8(255, keyBytes.length + valueBytes.length + 2);

  return Buffer.concat([theHeader, theData, Buffer.from([0])]);
}
|
|
|
|
/**
 * Build the 24-byte message header.
 *
 * Layout (all multi-byte integers big-endian):
 *   bytes 0-1   message id (uint16)
 *   byte  2     message type
 *   byte  3     version
 *   bytes 4-11  source id
 *   bytes 12-19 target id
 *   bytes 20-23 timestamp, seconds since 2000-01-01 (uint32)
 *
 * Source and target are hex strings; each is left-padded with "0" to 16 hex
 * digits, and only the last 16 digits are kept if it is longer.
 *
 * @param {number} msg_type - Message type, written as one byte.
 * @param {number} version - Protocol version, written as one byte.
 * @param {string} source - Hex id of the sender.
 * @param {string} target - Hex id of the recipient.
 * @param {number} this_message_id - Message id, written as uint16.
 * @returns {Buffer} The assembled header.
 */
function make_message_header(
  msg_type,
  version,
  source,
  target,
  this_message_id,
) {
  const ID_PADDING = "0000000000000000";
  // Unix time of 2000-01-01T00:00:00Z, the epoch the ESP32 expects.
  const SECONDS_1970_TO_2000 = 946684800;

  const targetHex = (ID_PADDING + target).slice(-16);
  const target_buf = Buffer.from(targetHex, "hex");

  const sourceHex = (ID_PADDING + source).slice(-16);
  const source_buf = Buffer.from(sourceHex, "hex");

  const timestamp = Math.floor(Date.now() / 1000) - SECONDS_1970_TO_2000;
  const timestamp_buf = Buffer.alloc(4);
  timestamp_buf.writeUInt32BE(timestamp);

  const idTypeVersion = Buffer.alloc(4);
  idTypeVersion.writeUInt16BE(this_message_id, 0);
  idTypeVersion.writeUInt8(msg_type, 2);
  idTypeVersion.writeUInt8(version, 3);

  return Buffer.concat([idTypeVersion, source_buf, target_buf, timestamp_buf]);
}
|
|
|
|
/**
 * Read the first file named in fileList and send it as a chunked
 * string/string message (key = file name, value = file contents); thisSend
 * then continues with the remaining files.
 *
 * An empty or missing file list is a no-op. Previously it made path.join
 * throw inside the fs.readdir callback that calls this function.
 *
 * NOTE(review): the target id "4827e28f7778" and the header message id 537
 * are hard-coded, and the module-level message_id is what is passed to
 * chunk_message — confirm these are intended outside of testing.
 *
 * @param {object} socket - UDP socket, passed through to thisSend.
 * @param {object} rinfo - Remote address info (port, address) of the peer.
 * @param {string} thisFilePath - Directory containing the files.
 * @param {string[]} fileList - File names still to be sent; [0] is sent now.
 */
function sendFile(socket, rinfo, thisFilePath, fileList) {
  if (!Array.isArray(fileList) || fileList.length === 0) {
    console.log("no files to send");
    return;
  }

  const fileName = fileList[0];
  const theFilePath = path.join(thisFilePath, fileName);

  fs.readFile(theFilePath, "utf-8", (err, data) => {
    console.log("have read file");
    if (err) {
      console.log(err);
      return;
    }

    console.log("make the message to send");
    const theMsg = make_string_string_message(
      8,
      1,
      "00",
      "4827e28f7778",
      fileName,
      data,
      537,
    ).toString("hex");

    console.log("chunk the message");
    const theRespChunks = cellularChunking.chunk_message(theMsg, message_id);
    console.log("have chunks");

    thisSend(theRespChunks, socket, rinfo, thisFilePath, fileList);
  });
}
|
|
|
|
/**
 * Handle one reassembled message: parse it, buffer the parsed records for
 * the next storeMessages run, and note when each source was last answered.
 *
 * Records whose source id consists only of "0" characters, or starts with
 * "f" or "8", are ignored.
 *
 * The settings reply is currently disabled (settingsString stays empty), so
 * the reply branch below never executes; it is kept for when the call to
 * makeSettingsString is re-enabled.
 *
 * @param {*} message - Reassembled message, passed to the parser as `data`.
 * @param {object} rinfo - Remote address info (port, address) of the sender.
 */
function handleMessage(message, rinfo) {
  // Respond at most once every 600 seconds to any single node.
  const RESPONSE_INTERVAL_MS = 1000 * 600;

  console.log("store message: ", message);
  console.log(
    new Date().toISOString(),
    " store data 2 route ",
    Buffer.from(message, "hex").toString("hex"),
  );

  const parsed_data = message_parser.parse_messages({ data: message });
  const settingsString = Buffer.from([]);

  parsed_data.forEach((record) => {
    const origin = record.source;
    const ignored =
      origin.replaceAll("0", "") === "" ||
      origin.startsWith("f") ||
      origin.startsWith("8");
    if (ignored) {
      return;
    }

    console.log("parsed data: ", record);
    dataBuffer.push(record);

    const lastResponse = response_record[origin];
    const responseDue =
      !lastResponse || lastResponse + RESPONSE_INTERVAL_MS < Date.now();
    if (!responseDue) {
      return;
    }

    console.log("respond with settings");
    // Disabled: settingsString = Buffer.concat([settingsString, makeSettingsString(record)])
    response_record[origin] = Date.now();
  });

  if (settingsString.length > 0) {
    const settingsHex = settingsString.toString("hex");
    console.log("settingsString: ", settingsHex);
    message_id += 1;
    stream.write(`< ${new Date().toISOString()} ${settingsHex}\n`);

    const outgoing = cellularChunking.chunk_message(settingsString, message_id);
    console.log("msg chunks: ", outgoing);
    outgoing.forEach((chunk) => {
      console.log("thisChunk: ", chunk);
      socket.send(chunk, rinfo.port, rinfo.address, (err) => {
        console.log(err ? err : "sent message on");
      });
    });
  }

  // One-shot firmware transfer; only runs while sentFiles is false.
  if (sentFiles) {
    return;
  }
  console.log("send files");
  sentFiles = true;

  const firmwareDir = "./firmware/Components";
  fs.readdir(firmwareDir, (err, names) => {
    console.log(names);
    if (err) {
      console.log(err);
      return;
    }
    sendFile(socket, rinfo, firmwareDir, names);
  });
}
|
|
|
|
/**
 * Build the settings reply for one device as three concatenated messages:
 *   1. a two-byte data message (type 3) carrying the timezone offset under
 *      key "\x4B",
 *   2. a byte-string message (type 7) carrying the site name ("\x01") and
 *      device name ("\x02"),
 *   3. a four-byte data message (type 4) carrying the measurement interval
 *      ("\x03"), measurement offset ("\x04") and wake window length
 *      ("\x05"), each converted with sqlTimeToNumber.
 *
 * All intermediate values are local. The previous version assigned to
 * `settings`, which is a module-level const and therefore threw a TypeError,
 * and leaked `msg`, `settings2` and `settings3` as implicit globals.
 *
 * NOTE(review): `status` and `getOffset` are not defined in the visible part
 * of this file — confirm they exist before re-enabling the (currently
 * commented-out) call in handleMessage.
 *
 * @param {object} msgData - Parsed message; `device_id` is read and
 *   `version` is defaulted to 1 in place when missing.
 * @returns {Buffer} The three messages concatenated in the order above.
 */
function makeSettingsString(msgData) {
  const device_id = msgData.device_id;
  // Kept as an in-place default so the caller-visible object is unchanged
  // relative to the previous behaviour.
  msgData.version = msgData.version || 1;

  const location_id = status.devices[device_id]?.location_id;
  const site_id =
    status.locations[location_id]?.site_id ||
    "7d931bd4-bf0d-459f-8864-6d6405908b9e";
  const timezoneOffset = getOffset(status.sites[site_id]?.timezone || "UTC");

  const groupSettings = status.settings[status.settings_groups[device_id]];
  const measurementInterval = groupSettings?.sleep_interval || "00:30:00";
  const wakeWindowLength = groupSettings?.window_length || "00:06:00";
  const measurementOffset = groupSettings?.start_time || "00:00:00";

  // NOTE(review): for a negative offset this yields 256 + |offset| (for
  // example -5 -> 261), which is not an 8-bit two's-complement value.
  // Left unchanged; confirm the encoding the firmware expects.
  const timezoneSettings = {
    "\x4B": [timezoneOffset < 0 ? 256 - timezoneOffset : timezoneOffset],
  };
  const timezoneMessage = message_parser.make_two_byte_data_message(
    3,
    msgData.version,
    "00",
    device_id,
    timezoneSettings,
  );

  const nameSettings = {
    "\x01": status.sites[site_id]?.name || "Unregistered",
    "\x02": status.devices[device_id]?.name || "Unregistered",
  };
  const nameMessage = message_parser.make_byte_string_message(
    7,
    msgData.version,
    "00",
    device_id,
    nameSettings,
  );

  const timingSettings = {
    "\x03": [sqlTimeToNumber(measurementInterval)],
    "\x04": [sqlTimeToNumber(measurementOffset)],
    "\x05": [sqlTimeToNumber(wakeWindowLength)],
  };
  const timingMessage = message_parser.make_four_byte_data_message(
    4,
    msgData.version,
    "00",
    device_id,
    timingSettings,
  );

  return Buffer.concat([timezoneMessage, nameMessage, timingMessage]);
}
|
|
|
|
/**
 * Send a list of chunks one at a time, waiting 1000 ms between chunks, and
 * move on to the next file once the last chunk of the current one is sent.
 *
 * If a send fails the error is logged and the chain stops (unchanged). An
 * empty chunk list is now skipped instead of passing `undefined` to
 * socket.send, which throws.
 *
 * @param {Array} theChunks - Chunks still to be sent; [0] is sent now.
 * @param {object} socket - UDP socket used for sending.
 * @param {object} rinfo - Remote address info (port, address) of the peer.
 * @param {string} thisFilePath - Directory of the files being transferred.
 * @param {string[]} fileList - Files still to be sent; [0] is the current one.
 */
function thisSend(theChunks, socket, rinfo, thisFilePath, fileList) {
  const sendNextFile = () => {
    if (fileList.length > 1) {
      sendFile(socket, rinfo, thisFilePath, fileList.slice(1));
    }
  };

  if (theChunks.length === 0) {
    // Nothing to transmit for this file; continue with the next one.
    console.log("no chunks to send");
    sendNextFile();
    return;
  }

  socket.send(theChunks[0], rinfo.port, rinfo.address, (err) => {
    if (err) {
      console.log(err);
      return;
    }

    console.log("send chunk");
    if (theChunks.length > 1) {
      // Pace the transfer: one chunk per second.
      setTimeout(
        thisSend,
        1000,
        theChunks.slice(1),
        socket,
        rinfo,
        thisFilePath,
        fileList,
      );
    } else {
      sendNextFile();
    }
  });
}
|
|
|
|
/**
 * Convert an "HH:MM:SS" string to a number of seconds, capped at
 * 2 ** 16 - 1 (65535).
 *
 * The cap now also applies to exactly 65536 seconds, which previously
 * slipped through the `> 2 ** 16` comparison unclamped. Input that cannot
 * be parsed yields 0 instead of NaN, matching what is returned when the
 * input is not a string at all.
 *
 * @param {string|null} theTime - Time of day / duration as "HH:MM:SS".
 * @returns {number} Seconds (at most 65535), or 0 when the input is
 *   unusable. Errors are logged unless the input is null.
 */
function sqlTimeToNumber(theTime) {
  const MAX_SECONDS = 2 ** 16 - 1;

  try {
    const [hours, minutes, seconds] = theTime.split(":").map(Number);
    const total = hours * 60 * 60 + minutes * 60 + seconds;

    if (Number.isNaN(total)) {
      console.log("sqlTimeToNumber error: ", `unparseable time "${theTime}"`);
      return 0;
    }

    return Math.min(total, MAX_SECONDS);
  } catch (e) {
    // null is an expected "no value" input, so it is not worth logging.
    if (theTime !== null) {
      console.log("sqlTimeToNumber error: ", e);
    }
    return 0;
  }
}
|
|
|
|
/**
 * Store type 1 messages (two-byte measurement data) in datavis_measurement.
 *
 * Fixes relative to the previous version:
 *  - Batching: input longer than 2500 messages is split into consecutive
 *    slices of at most 2500. The old remainder slice, `slice(-0)`, returned
 *    the whole array when the length was an exact multiple of 2500 and
 *    recursed without end.
 *  - A message from a node that has no entry in `settings` is skipped and
 *    logged, instead of throwing and discarding the entire batch.
 *  - Readings of 0 are stored as 0; only missing (null/undefined/NaN)
 *    readings are replaced with the 500000 sentinel.
 *
 * @param {object[]} msgs - Parsed messages with `source`, `timestamp`,
 *   `data` and optionally `reporting_node`.
 */
function storeType1Messages(msgs) {
  // A parameterised statement cannot carry more than ~65k values; at 17
  // values per message, 2500 messages leaves a wide margin.
  const BATCH_SIZE = 2500;
  const COLUMNS_PER_ROW = 17;
  const MISSING_READING = 500000;
  const readingOrSentinel = (value) =>
    value == null || Number.isNaN(value) ? MISSING_READING : value;

  try {
    if (msgs.length === 0) {
      return;
    }

    if (msgs.length > BATCH_SIZE) {
      for (let start = 0; start < msgs.length; start += BATCH_SIZE) {
        storeType1Messages(msgs.slice(start, start + BATCH_SIZE));
      }
      return;
    }

    const theseParams = [];
    let rowCount = 0;

    msgs.forEach((msg) => {
      const nodeId = msg.source.slice(4);
      const nodeSettings = settings[nodeId];
      if (!nodeSettings) {
        console.log(
          `No settings for node ${nodeId}; skipping type 1 message`,
        );
        return;
      }

      const { data } = msg;
      theseParams.push(
        nodeId,
        msg.reporting_node || "000000000000",
        nodeSettings.temperature_site,
        msg.timestamp,
        readingOrSentinel(data["18_inch_temperature"]),
        readingOrSentinel(data["36_inch_temperature"]),
        readingOrSentinel(data.device_temperature),
        readingOrSentinel(data.ambient_temperature),
        readingOrSentinel(data.relative_humidity),
        readingOrSentinel(data.barometric_pressure),
        readingOrSentinel(data.accelerometer_x),
        readingOrSentinel(data.accelerometer_y),
        readingOrSentinel(data.accelerometer_z),
        readingOrSentinel(data.battery_charge_percent),
        readingOrSentinel(data.battery_voltage),
        readingOrSentinel(data.remaining_charge_capacity),
        sqlCurrentTimestamp(), // server_received_time, YYYY-MM-DD HH:mm:ss
      );
      rowCount += 1;
    });

    if (rowCount === 0) {
      // Every message was skipped; an INSERT without rows is invalid SQL.
      return;
    }

    const rowPlaceholder = `(${Array(COLUMNS_PER_ROW).fill("?").join(",")})`;
    const thisSQL =
      "INSERT IGNORE INTO datavis_measurement (source_node_id, reporting_node_id, associated_group_id, collection_time, temperature_18_inch, temperature_36_inch, device_temperature, ambient_temperature, relative_humidity, barometric_pressure, accelerometer_x, accelerometer_y, accelerometer_z, battery_charge_percent, battery_voltage, remaining_battery_capacity, server_received_time) VALUES " +
      Array(rowCount).fill(rowPlaceholder).join(",") +
      ";";

    pool.getConnection((err, connection) => {
      if (err) {
        console.log(`Error storing type 1 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, (err2) => {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 1 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 1 message: ${e}`);
  }
}
|
|
|
|
/**
 * Store type 6 messages (four-byte device status) in
 * datavis_nodereportedstatus.
 *
 * The "when_time_was_last_updated" column is derived from
 * data.previous_update_time as seconds since 2000-01-01. The previous
 * expression, `x * 1000 || 0 + 946684800000`, only added the 2000-01-01
 * offset when the value was missing, because `+` binds tighter than `||`.
 * NOTE(review): this assumes previous_update_time uses the same
 * seconds-since-2000 epoch as the message header timestamp — confirm
 * against the firmware/parser.
 *
 * An empty input is now a no-op instead of producing an INSERT with no rows.
 *
 * @param {object[]} msgs - Parsed messages with `source`, `timestamp` and
 *   `data`.
 */
function storeType6Messages(msgs) {
  // 2000-01-01T00:00:00Z expressed in Unix milliseconds.
  const Y2K_EPOCH_MS = 946684800000;

  try {
    if (msgs.length === 0) {
      return;
    }

    const theseParams = [];
    msgs.forEach((msg) => {
      const reportedSeconds = Number(msg.data.previous_update_time);
      const secondsSince2000 = Number.isFinite(reportedSeconds)
        ? reportedSeconds
        : 0;
      // YYYY-MM-DDTHH:mm:ss (milliseconds and "Z" stripped).
      const lastUpdated = new Date(secondsSince2000 * 1000 + Y2K_EPOCH_MS)
        .toISOString()
        .slice(0, -5);

      theseParams.push(
        msg.source.slice(4),
        msg.timestamp,
        msg.data.measurement_interval || 360,
        msg.data.wake_window_length || 4,
        msg.data.measurement_offset || 0,
        msg.data.sleep_duration || 0,
        0, // number_saved_measurements is always stored as 0 here
        lastUpdated,
        sqlCurrentTimestamp(),
      );
    });

    const thisSQL =
      "INSERT IGNORE INTO datavis_nodereportedstatus (node_id,collection_time,measurement_interval_minutes,wake_window_length_minutes,offset_from_midnight_minutes,sleep_duration_minutes,number_saved_measurements,when_time_was_last_updated,server_received_time) VALUES " +
      Array(msgs.length).fill("(?,?,?,?,?,?,?,?,?)").join(",") +
      ";";

    pool.getConnection((err, connection) => {
      if (err) {
        console.log(`Error storing type 6 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, (err2) => {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 6 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 6 message: ${e}`);
  }
}
|
|
|
|
/**
 * Store type 13 messages (file version manifest) in
 * datavis_nodefilemanifest, one row per key of each message's `data`.
 *
 * Fixes relative to the previous version: a failed execute now logs the
 * actual execute error (it used to print the always-null connection error),
 * and nothing is sent to the database when there are no rows to insert.
 *
 * @param {object[]} msgs - Parsed messages with `source`, `timestamp` and a
 *   `data` object mapping file/program id to version.
 */
function storeType13Messages(msgs) {
  try {
    const theseParams = [];
    let rowCount = 0;

    msgs.forEach((msg) => {
      const nodeId = msg.source.slice(4);
      Object.keys(msg.data).forEach((thisFileId) => {
        rowCount += 1;
        theseParams.push(
          nodeId,
          msg.timestamp,
          thisFileId,
          msg.data[thisFileId] || 0,
          sqlCurrentTimestamp(),
        );
      });
    });

    if (rowCount === 0) {
      // An INSERT without any VALUES rows is a syntax error.
      return;
    }

    const thisSQL =
      "INSERT INTO datavis_nodefilemanifest (node_id,collection_time,program_id,program_version,server_received_time) VALUES " +
      Array(rowCount).fill("(?,?,?,?,?)").join(",") +
      ";";

    pool.getConnection((err, connection) => {
      if (err) {
        console.log(`Error storing type 13 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, (err2) => {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 13 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 13 message: ${e}`);
  }
}
|
|
|
|
/**
 * Store type 17 messages (RSSI data) in datavis_noderssirecord, one row per
 * key of each message's `data` (the key is stored as neighbor_id, the value
 * as rssi).
 *
 * Nothing is sent to the database when there are no rows to insert;
 * previously that case produced an INSERT with an empty VALUES list.
 *
 * @param {object[]} msgs - Parsed messages with `source`, `timestamp` and a
 *   `data` object mapping neighbor id to RSSI.
 */
function storeType17Messages(msgs) {
  try {
    const theseParams = [];
    let rowCount = 0;

    msgs.forEach((msg) => {
      const nodeId = msg.source.slice(4);
      Object.keys(msg.data).forEach((thisId) => {
        rowCount += 1;
        theseParams.push(
          nodeId,
          thisId,
          msg.timestamp,
          msg.data[thisId] || 0,
          sqlCurrentTimestamp(),
        );
      });
    });

    if (rowCount === 0) {
      // An INSERT without any VALUES rows is a syntax error.
      return;
    }

    const thisSQL =
      "INSERT IGNORE INTO datavis_noderssirecord (node_id,neighbor_id,collection_time,rssi,server_received_time) VALUES " +
      Array(rowCount).fill("(?,?,?,?,?)").join(",") +
      ";";

    pool.getConnection((err, connection) => {
      if (err) {
        console.log(`Error storing type 17 message: ${err}`);
        return;
      }
      connection.execute(thisSQL, theseParams, (err2) => {
        connection.release();
        if (err2) {
          console.log(`Some other error storing type 17 message: ${err2}`);
        }
      });
    });
  } catch (e) {
    console.log(`Some outer error storing type 17 message: ${e}`);
  }
}
|
|
|
|
/**
 * Flush the buffered messages: group them by msg_type, hand each non-empty
 * group (types 1, 6, 13 and 17) to its store function, then empty the
 * buffer. Messages of any other type are dropped when the buffer is reset.
 */
function storeMessages() {
  console.log("store messages!!");

  try {
    // Loose equality on purpose: msg_type is matched whether it is a number
    // or a numeric string.
    const ofType = (wanted) =>
      dataBuffer.filter((candidate) => candidate.msg_type == wanted);

    const measurements = ofType(1);
    const deviceStatuses = ofType(6);
    const fileManifests = ofType(13);
    const rssiRecords = ofType(17);

    if (measurements.length > 0) {
      console.log("type 1 messages!");
      storeType1Messages(measurements);
    }
    if (deviceStatuses.length > 0) {
      console.log("type 6 messages!");
      storeType6Messages(deviceStatuses);
    }
    if (fileManifests.length > 0) {
      console.log("type 13 messages!");
      storeType13Messages(fileManifests);
    }
    if (rssiRecords.length > 0) {
      console.log("type 17 messages!");
      storeType17Messages(rssiRecords);
    }

    dataBuffer = [];
  } catch (e) {
    console.log(`Some error storing messages: ${e}`);
  }
}
|
|
|
|
/**
 * Current UTC time formatted as "YYYY-MM-DD HH:mm:ss".
 *
 * @returns {string} The date part of the ISO timestamp, a space, and the
 *   first eight characters of its time part.
 */
function sqlCurrentTimestamp() {
  // ISO form is YYYY-MM-DDTHH:mm:ss.mmmZ
  const iso = new Date().toISOString();
  const separator = iso.indexOf("T");
  const datePart = iso.slice(0, separator);
  const timePart = iso.slice(separator + 1, separator + 9);
  return datePart + " " + timePart;
}
|
|
|
|
setInterval(storeMessages, 10000); // flush buffered messages to the database every 10 seconds (10000 ms)
|
|
|
|
// Report the bound address once the socket is ready.
socket.on("listening", () => {
  const { address, port } = socket.address();
  console.log(`Listening for UDP packets at ${address}:${port}`);
});

// Socket-level errors are logged only.
socket.on("error", (err) => {
  console.error(`UDP error: ${err.stack}`);
});

// Every datagram is appended to the daily log ("> <ISO time> <hex>") and
// then handed to the chunk reassembler, which calls handleMessage.
socket.on("message", (msg, rinfo) => {
  const utcDay = () => new Date().toISOString().slice(0, 10);

  // Roll over to a new log file when the UTC date has changed.
  if (utcDay() !== thisDate) {
    thisDate = utcDay();
    stream.close();
    stream = fs.createWriteStream(filePath + "/" + thisDate + "-log.dat", {
      flags: "a+",
    });
  }

  const receivedAt = new Date().toISOString();
  stream.write(`> ${receivedAt} ${msg.toString("hex")}\n`);

  cellularChunking.receive_chunk(msg, handleMessage, rinfo);
});

socket.bind(57321);
|