Restore accidentally removed files, a few forecast tweaks
This commit is contained in:
188
inventory-server/scripts/import/stock-snapshots.js
Normal file
188
inventory-server/scripts/import/stock-snapshots.js
Normal file
@@ -0,0 +1,188 @@
|
||||
const { outputProgress, formatElapsedTime, calculateRate } = require('../metrics-new/utils/progress');
|
||||
|
||||
const BATCH_SIZE = 5000;
|
||||
|
||||
/**
|
||||
* Imports daily stock snapshots from MySQL's snap_product_value table to PostgreSQL.
|
||||
* This provides historical end-of-day stock quantities per product, dating back to 2012.
|
||||
*
|
||||
* MySQL source table: snap_product_value (date, pid, count, pending, value)
|
||||
* - date: snapshot date (typically yesterday's date, recorded daily by cron)
|
||||
* - pid: product ID
|
||||
* - count: end-of-day stock quantity (sum of product_inventory.count)
|
||||
* - pending: pending/on-order quantity
|
||||
* - value: total inventory value at cost (sum of costeach * count)
|
||||
*
|
||||
* PostgreSQL target table: stock_snapshots (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
|
||||
*
|
||||
* @param {object} prodConnection - MySQL connection to production DB
|
||||
* @param {object} localConnection - PostgreSQL connection wrapper
|
||||
* @param {boolean} incrementalUpdate - If true, only fetch new snapshots since last import
|
||||
* @returns {object} Import statistics
|
||||
*/
|
||||
async function importStockSnapshots(prodConnection, localConnection, incrementalUpdate = true) {
|
||||
const startTime = Date.now();
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Stock snapshots import',
|
||||
message: 'Starting stock snapshots import...',
|
||||
current: 0,
|
||||
total: 0,
|
||||
elapsed: formatElapsedTime(startTime)
|
||||
});
|
||||
|
||||
// Ensure target table exists
|
||||
await localConnection.query(`
|
||||
CREATE TABLE IF NOT EXISTS stock_snapshots (
|
||||
snapshot_date DATE NOT NULL,
|
||||
pid BIGINT NOT NULL,
|
||||
stock_quantity INT NOT NULL DEFAULT 0,
|
||||
pending_quantity INT NOT NULL DEFAULT 0,
|
||||
stock_value NUMERIC(14, 4) NOT NULL DEFAULT 0,
|
||||
PRIMARY KEY (snapshot_date, pid)
|
||||
)
|
||||
`);
|
||||
|
||||
// Create index for efficient lookups by pid
|
||||
await localConnection.query(`
|
||||
CREATE INDEX IF NOT EXISTS idx_stock_snapshots_pid ON stock_snapshots (pid)
|
||||
`);
|
||||
|
||||
// Determine the start date for the import
|
||||
let startDate = '2020-01-01'; // Default: match the orders/snapshots date range
|
||||
if (incrementalUpdate) {
|
||||
const [result] = await localConnection.query(`
|
||||
SELECT MAX(snapshot_date)::text AS max_date FROM stock_snapshots
|
||||
`);
|
||||
if (result.rows[0]?.max_date) {
|
||||
// Start from the day after the last imported date
|
||||
startDate = result.rows[0].max_date;
|
||||
}
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Stock snapshots import',
|
||||
message: `Fetching stock snapshots from MySQL since ${startDate}...`,
|
||||
current: 0,
|
||||
total: 0,
|
||||
elapsed: formatElapsedTime(startTime)
|
||||
});
|
||||
|
||||
// Count total rows to import
|
||||
const [countResult] = await prodConnection.query(
|
||||
`SELECT COUNT(*) AS total FROM snap_product_value WHERE date > ?`,
|
||||
[startDate]
|
||||
);
|
||||
const totalRows = countResult[0].total;
|
||||
|
||||
if (totalRows === 0) {
|
||||
outputProgress({
|
||||
status: 'complete',
|
||||
operation: 'Stock snapshots import',
|
||||
message: 'No new stock snapshots to import',
|
||||
current: 0,
|
||||
total: 0,
|
||||
elapsed: formatElapsedTime(startTime)
|
||||
});
|
||||
return { recordsAdded: 0, recordsUpdated: 0, status: 'complete' };
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Stock snapshots import',
|
||||
message: `Found ${totalRows.toLocaleString()} stock snapshot rows to import`,
|
||||
current: 0,
|
||||
total: totalRows,
|
||||
elapsed: formatElapsedTime(startTime)
|
||||
});
|
||||
|
||||
// Process in batches using date-based pagination (more efficient than OFFSET)
|
||||
let processedRows = 0;
|
||||
let recordsAdded = 0;
|
||||
let currentDate = startDate;
|
||||
|
||||
while (processedRows < totalRows) {
|
||||
// Fetch a batch of dates
|
||||
const [dateBatch] = await prodConnection.query(
|
||||
`SELECT DISTINCT date FROM snap_product_value
|
||||
WHERE date > ? ORDER BY date LIMIT 10`,
|
||||
[currentDate]
|
||||
);
|
||||
|
||||
if (dateBatch.length === 0) break;
|
||||
|
||||
const dates = dateBatch.map(r => r.date);
|
||||
const lastDate = dates[dates.length - 1];
|
||||
|
||||
// Fetch all rows for these dates
|
||||
const [rows] = await prodConnection.query(
|
||||
`SELECT date, pid, count AS stock_quantity, pending AS pending_quantity, value AS stock_value
|
||||
FROM snap_product_value
|
||||
WHERE date > ? AND date <= ?
|
||||
ORDER BY date, pid`,
|
||||
[currentDate, lastDate]
|
||||
);
|
||||
|
||||
if (rows.length === 0) break;
|
||||
|
||||
// Batch insert into PostgreSQL using UNNEST for efficiency
|
||||
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
|
||||
const batch = rows.slice(i, i + BATCH_SIZE);
|
||||
|
||||
const dates = batch.map(r => r.date);
|
||||
const pids = batch.map(r => r.pid);
|
||||
const quantities = batch.map(r => r.stock_quantity);
|
||||
const pending = batch.map(r => r.pending_quantity);
|
||||
const values = batch.map(r => r.stock_value);
|
||||
|
||||
try {
|
||||
const [result] = await localConnection.query(`
|
||||
INSERT INTO stock_snapshots (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
|
||||
SELECT * FROM UNNEST(
|
||||
$1::date[], $2::bigint[], $3::int[], $4::int[], $5::numeric[]
|
||||
)
|
||||
ON CONFLICT (snapshot_date, pid) DO UPDATE SET
|
||||
stock_quantity = EXCLUDED.stock_quantity,
|
||||
pending_quantity = EXCLUDED.pending_quantity,
|
||||
stock_value = EXCLUDED.stock_value
|
||||
`, [dates, pids, quantities, pending, values]);
|
||||
|
||||
recordsAdded += batch.length;
|
||||
} catch (err) {
|
||||
console.error(`Error inserting batch at offset ${i} (date range ending ${currentDate}):`, err.message);
|
||||
}
|
||||
}
|
||||
|
||||
processedRows += rows.length;
|
||||
currentDate = lastDate;
|
||||
|
||||
outputProgress({
|
||||
status: 'running',
|
||||
operation: 'Stock snapshots import',
|
||||
message: `Imported ${processedRows.toLocaleString()} / ${totalRows.toLocaleString()} rows (through ${currentDate})`,
|
||||
current: processedRows,
|
||||
total: totalRows,
|
||||
elapsed: formatElapsedTime(startTime),
|
||||
rate: calculateRate(processedRows, startTime)
|
||||
});
|
||||
}
|
||||
|
||||
outputProgress({
|
||||
status: 'complete',
|
||||
operation: 'Stock snapshots import',
|
||||
message: `Stock snapshots import complete: ${recordsAdded.toLocaleString()} rows`,
|
||||
current: processedRows,
|
||||
total: totalRows,
|
||||
elapsed: formatElapsedTime(startTime)
|
||||
});
|
||||
|
||||
return {
|
||||
recordsAdded,
|
||||
recordsUpdated: 0,
|
||||
status: 'complete'
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = importStockSnapshots;
|
||||
Reference in New Issue
Block a user