const { outputProgress, formatElapsedTime, calculateRate } = require('../metrics-new/utils/progress');

// Maximum rows per UNNEST insert sent to PostgreSQL in one statement.
const BATCH_SIZE = 5000;

/**
 * Imports daily stock snapshots from MySQL's snap_product_value table to PostgreSQL.
 * This provides historical end-of-day stock quantities per product, dating back to 2012.
 *
 * MySQL source table: snap_product_value (date, pid, count, pending, value)
 *   - date:    snapshot date (typically yesterday's date, recorded daily by cron)
 *   - pid:     product ID
 *   - count:   end-of-day stock quantity (sum of product_inventory.count)
 *   - pending: pending/on-order quantity
 *   - value:   total inventory value at cost (sum of costeach * count)
 *
 * PostgreSQL target table: stock_snapshots
 *   (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
 *
 * @param {object} prodConnection - MySQL connection to production DB
 * @param {object} localConnection - PostgreSQL connection wrapper
 * @param {boolean} incrementalUpdate - If true, only fetch snapshots newer than the
 *   latest snapshot_date already present in stock_snapshots
 * @returns {object} Import statistics: { recordsAdded, recordsUpdated, status }.
 *   Note: recordsAdded counts every upserted row (both fresh inserts and
 *   ON CONFLICT updates); recordsUpdated is always 0 because the two cases are
 *   not distinguished.
 */
async function importStockSnapshots(prodConnection, localConnection, incrementalUpdate = true) {
  const startTime = Date.now();

  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: 'Starting stock snapshots import...',
    current: 0,
    total: 0,
    elapsed: formatElapsedTime(startTime)
  });

  // Ensure target table exists
  await localConnection.query(`
    CREATE TABLE IF NOT EXISTS stock_snapshots (
      snapshot_date DATE NOT NULL,
      pid BIGINT NOT NULL,
      stock_quantity INT NOT NULL DEFAULT 0,
      pending_quantity INT NOT NULL DEFAULT 0,
      stock_value NUMERIC(14, 4) NOT NULL DEFAULT 0,
      PRIMARY KEY (snapshot_date, pid)
    )
  `);

  // Create index for efficient lookups by pid
  await localConnection.query(`
    CREATE INDEX IF NOT EXISTS idx_stock_snapshots_pid ON stock_snapshots (pid)
  `);

  // Determine the (exclusive) lower bound for the import.
  let startDate = '2020-01-01'; // Default: match the orders/snapshots date range

  if (incrementalUpdate) {
    const [result] = await localConnection.query(`
      SELECT MAX(snapshot_date)::text AS max_date FROM stock_snapshots
    `);
    if (result.rows[0]?.max_date) {
      // Resume from the last imported date; all source queries below use a
      // strict "date > ?" predicate, so that day itself is not re-fetched.
      startDate = result.rows[0].max_date;
    }
  }

  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: `Fetching stock snapshots from MySQL since ${startDate}...`,
    current: 0,
    total: 0,
    elapsed: formatElapsedTime(startTime)
  });

  // Count total rows to import
  const [countResult] = await prodConnection.query(
    `SELECT COUNT(*) AS total FROM snap_product_value WHERE date > ?`,
    [startDate]
  );
  const totalRows = countResult[0].total;

  if (totalRows === 0) {
    outputProgress({
      status: 'complete',
      operation: 'Stock snapshots import',
      message: 'No new stock snapshots to import',
      current: 0,
      total: 0,
      elapsed: formatElapsedTime(startTime)
    });
    return { recordsAdded: 0, recordsUpdated: 0, status: 'complete' };
  }

  outputProgress({
    status: 'running',
    operation: 'Stock snapshots import',
    message: `Found ${totalRows.toLocaleString()} stock snapshot rows to import`,
    current: 0,
    total: totalRows,
    elapsed: formatElapsedTime(startTime)
  });

  // Process in batches using date-based pagination (more efficient than OFFSET)
  let processedRows = 0;
  let recordsAdded = 0;
  let currentDate = startDate;

  while (processedRows < totalRows) {
    // Fetch the next window of up to 10 distinct snapshot dates.
    const [dateBatch] = await prodConnection.query(
      `SELECT DISTINCT date FROM snap_product_value WHERE date > ? ORDER BY date LIMIT 10`,
      [currentDate]
    );
    if (dateBatch.length === 0) break;

    const windowDates = dateBatch.map(r => r.date);
    const lastDate = windowDates[windowDates.length - 1];

    // Fetch all rows for these dates
    const [rows] = await prodConnection.query(
      `SELECT date, pid, count AS stock_quantity, pending AS pending_quantity, value AS stock_value
       FROM snap_product_value
       WHERE date > ? AND date <= ?
       ORDER BY date, pid`,
      [currentDate, lastDate]
    );
    if (rows.length === 0) break;

    // Batch insert into PostgreSQL using UNNEST for efficiency
    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);
      const snapshotDates = batch.map(r => r.date);
      const pids = batch.map(r => r.pid);
      const quantities = batch.map(r => r.stock_quantity);
      const pending = batch.map(r => r.pending_quantity);
      const values = batch.map(r => r.stock_value);

      try {
        await localConnection.query(`
          INSERT INTO stock_snapshots (snapshot_date, pid, stock_quantity, pending_quantity, stock_value)
          SELECT * FROM UNNEST(
            $1::date[], $2::bigint[], $3::int[], $4::int[], $5::numeric[]
          )
          ON CONFLICT (snapshot_date, pid) DO UPDATE SET
            stock_quantity = EXCLUDED.stock_quantity,
            pending_quantity = EXCLUDED.pending_quantity,
            stock_value = EXCLUDED.stock_value
        `, [snapshotDates, pids, quantities, pending, values]);
        recordsAdded += batch.length;
      } catch (err) {
        // lastDate (not currentDate) is the end of the window being inserted;
        // currentDate is its exclusive lower bound. Failed batches are logged
        // and skipped so one bad window doesn't abort the whole import.
        console.error(`Error inserting batch at offset ${i} (date range ending ${lastDate}):`, err.message);
      }
    }

    processedRows += rows.length;
    currentDate = lastDate;

    outputProgress({
      status: 'running',
      operation: 'Stock snapshots import',
      message: `Imported ${processedRows.toLocaleString()} / ${totalRows.toLocaleString()} rows (through ${currentDate})`,
      current: processedRows,
      total: totalRows,
      elapsed: formatElapsedTime(startTime),
      rate: calculateRate(processedRows, startTime)
    });
  }

  outputProgress({
    status: 'complete',
    operation: 'Stock snapshots import',
    message: `Stock snapshots import complete: ${recordsAdded.toLocaleString()} rows`,
    current: processedRows,
    total: totalRows,
    elapsed: formatElapsedTime(startTime)
  });

  return { recordsAdded, recordsUpdated: 0, status: 'complete' };
}

module.exports = importStockSnapshots;