Import/metrics calc fixes

This commit is contained in:
2026-02-08 22:44:57 -05:00
parent 12cc7a4639
commit 7c41a7f799
11 changed files with 828 additions and 55 deletions

View File

@@ -0,0 +1,184 @@
const { outputProgress, formatElapsedTime, calculateRate } = require('../metrics-new/utils/progress');
// Maximum number of rows sent to PostgreSQL in a single UNNEST bulk insert.
const BATCH_SIZE = 5000;
/**
 * Imports daily stock snapshots from MySQL's snap_product_value table to PostgreSQL.
 * This provides historical end-of-day stock quantities per product, dating back to 2012.
 *
 * MySQL source table: snap_product_value (date, pid, count, pending, value)
 * - date: snapshot date (typically yesterday's date, recorded daily by cron)
 * - pid: product ID
 * - count: end-of-day stock quantity (sum of product_inventory.count)
 * - pending: pending/on-order quantity
 * - value: total inventory value at cost (sum of costeach * count)
 *
 * PostgreSQL target table: stock_snapshots (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
 *
 * @param {object} prodConnection - MySQL connection to production DB
 * @param {object} localConnection - PostgreSQL connection wrapper
 * @param {boolean} incrementalUpdate - If true, only fetch new snapshots since last import
 * @returns {object} Import statistics: { recordsAdded, recordsUpdated, status }.
 *   Note: recordsAdded counts every upserted row (inserts AND conflict-updates);
 *   recordsUpdated is always 0 because ON CONFLICT does not report which occurred.
 */
async function importStockSnapshots(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();
  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: 'Starting stock snapshots import...',
    current: 0,
    total: 0,
    elapsed: formatElapsedTime(startTime)
  });

  // Ensure target table exists
  await localConnection.query(`
    CREATE TABLE IF NOT EXISTS stock_snapshots (
      snapshot_date DATE NOT NULL,
      pid BIGINT NOT NULL,
      stock_quantity INT NOT NULL DEFAULT 0,
      pending_quantity INT NOT NULL DEFAULT 0,
      stock_value NUMERIC(14, 4) NOT NULL DEFAULT 0,
      PRIMARY KEY (snapshot_date, pid)
    )
  `);

  // Create index for efficient lookups by pid
  await localConnection.query(`
    CREATE INDEX IF NOT EXISTS idx_stock_snapshots_pid ON stock_snapshots (pid)
  `);

  // Determine the start date for the import
  let startDate = '2020-01-01'; // Default: match the orders/snapshots date range
  if (incrementalUpdate) {
    const [result] = await localConnection.query(`
      SELECT MAX(snapshot_date)::text AS max_date FROM stock_snapshots
    `);
    if (result.rows[0]?.max_date) {
      // Every source query below uses a strict `date > ?` comparison, so
      // resuming from the last imported date skips rows already imported
      // and fetches only strictly newer snapshots.
      startDate = result.rows[0].max_date;
    }
  }

  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: `Fetching stock snapshots from MySQL since ${startDate}...`,
    current: 0,
    total: 0,
    elapsed: formatElapsedTime(startTime)
  });

  // Count total rows to import. Coerce to Number: depending on driver
  // configuration, COUNT(*) may arrive as a string or BigInt, which would
  // defeat the strict === 0 check and loop condition below.
  const [countResult] = await prodConnection.query(
    `SELECT COUNT(*) AS total FROM snap_product_value WHERE date > ?`,
    [startDate]
  );
  const totalRows = Number(countResult[0].total);

  if (totalRows === 0) {
    outputProgress({
      status: 'complete',
      operation: 'Stock snapshots import',
      message: 'No new stock snapshots to import',
      current: 0,
      total: 0,
      elapsed: formatElapsedTime(startTime)
    });
    return { recordsAdded: 0, recordsUpdated: 0, status: 'complete' };
  }

  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: `Found ${totalRows.toLocaleString()} stock snapshot rows to import`,
    current: 0,
    total: totalRows,
    elapsed: formatElapsedTime(startTime)
  });

  // Process in batches using date-based pagination (more efficient than OFFSET)
  let processedRows = 0;
  let recordsAdded = 0;
  let currentDate = startDate;

  while (processedRows < totalRows) {
    // Fetch the next window of up to 10 distinct snapshot dates; each date
    // typically holds one row per product, so this bounds memory per pass.
    const [dateBatch] = await prodConnection.query(
      `SELECT DISTINCT date FROM snap_product_value
       WHERE date > ? ORDER BY date LIMIT 10`,
      [currentDate]
    );
    if (dateBatch.length === 0) break;
    const lastDate = dateBatch[dateBatch.length - 1].date;

    // Fetch all rows for these dates
    const [rows] = await prodConnection.query(
      `SELECT date, pid, count AS stock_quantity, pending AS pending_quantity, value AS stock_value
       FROM snap_product_value
       WHERE date > ? AND date <= ?
       ORDER BY date, pid`,
      [currentDate, lastDate]
    );
    if (rows.length === 0) break;

    // Batch insert into PostgreSQL using UNNEST for efficiency
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);
      const snapshotDates = batch.map(r => r.date);
      const pids = batch.map(r => r.pid);
      const quantities = batch.map(r => r.stock_quantity);
      const pending = batch.map(r => r.pending_quantity);
      const values = batch.map(r => r.stock_value);
      await localConnection.query(`
        INSERT INTO stock_snapshots (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
        SELECT * FROM UNNEST(
          $1::date[], $2::bigint[], $3::int[], $4::int[], $5::numeric[]
        )
        ON CONFLICT (snapshot_date, pid) DO UPDATE SET
          stock_quantity = EXCLUDED.stock_quantity,
          pending_quantity = EXCLUDED.pending_quantity,
          stock_value = EXCLUDED.stock_value
      `, [snapshotDates, pids, quantities, pending, values]);
      recordsAdded += batch.length;
    }

    processedRows += rows.length;
    currentDate = lastDate;
    outputProgress({
      status: 'running',
      operation: 'Stock snapshots import',
      message: `Imported ${processedRows.toLocaleString()} / ${totalRows.toLocaleString()} rows (through ${currentDate})`,
      current: processedRows,
      total: totalRows,
      elapsed: formatElapsedTime(startTime),
      rate: calculateRate(processedRows, startTime)
    });
  }

  outputProgress({
    status: 'complete',
    operation: 'Stock snapshots import',
    message: `Stock snapshots import complete: ${recordsAdded.toLocaleString()} rows`,
    current: processedRows,
    total: totalRows,
    elapsed: formatElapsedTime(startTime)
  });

  return {
    recordsAdded,
    recordsUpdated: 0,
    status: 'complete'
  };
}
// Expose the importer as this module's single entry point.
module.exports = importStockSnapshots;