const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress'); const fs = require('fs'); const path = require('path'); const { pipeline } = require('stream'); const { promisify } = require('util'); // Configuration constants to control which tables get imported const IMPORT_PRODUCT_CURRENT_PRICES = false; const IMPORT_DAILY_INVENTORY = false; const IMPORT_PRODUCT_STAT_HISTORY = true; // For product stat history, limit to more recent data for faster initial import const USE_RECENT_MONTHS = 12; // Just use the most recent months for product_stat_history /** * Validates a date from MySQL before inserting it into PostgreSQL * @param {string|Date|null} mysqlDate - Date string or object from MySQL * @returns {string|null} Valid date string or null if invalid */ function validateDate(mysqlDate) { // Handle null, undefined, or empty values if (!mysqlDate) { return null; } // Convert to string if it's not already const dateStr = String(mysqlDate); // Handle MySQL zero dates and empty values if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00' || dateStr.indexOf('0000-00-00') !== -1 || dateStr === '') { return null; } // Check if the date is valid const date = new Date(mysqlDate); // If the date is invalid or suspiciously old (pre-1970), return null if (isNaN(date.getTime()) || date.getFullYear() < 1970) { return null; } return mysqlDate; } /** * Imports historical data from MySQL to PostgreSQL */ async function importHistoricalData( prodConnection, localConnection, options = {} ) { const { incrementalUpdate = true, oneYearAgo = new Date(new Date().setFullYear(new Date().getFullYear() - 1)) } = options; const oneYearAgoStr = oneYearAgo.toISOString().split('T')[0]; const startTime = Date.now(); // Use larger batch sizes to improve performance const BATCH_SIZE = 5000; // For fetching from small tables const INSERT_BATCH_SIZE = 500; // For inserting to small tables const LARGE_BATCH_SIZE = 10000; // For fetching from large tables const LARGE_INSERT_BATCH_SIZE = 1000; // For inserting to large tables // Calculate date for recent data const recentDateStr = new Date( new Date().setMonth(new Date().getMonth() - USE_RECENT_MONTHS) ).toISOString().split('T')[0]; console.log(`Starting import with: - One year ago date: ${oneYearAgoStr} - Recent months date: ${recentDateStr} (for product_stat_history) - Incremental update: ${incrementalUpdate} - Standard batch size: ${BATCH_SIZE} - Standard insert batch size: ${INSERT_BATCH_SIZE} - Large table batch size: ${LARGE_BATCH_SIZE} - Large table insert batch size: ${LARGE_INSERT_BATCH_SIZE} - Import product_current_prices: ${IMPORT_PRODUCT_CURRENT_PRICES} - Import daily_inventory: ${IMPORT_DAILY_INVENTORY} - Import product_stat_history: ${IMPORT_PRODUCT_STAT_HISTORY}`); try { // Get last sync time for incremental updates const lastSyncTimes = {}; if (incrementalUpdate) { try { const syncResult = await localConnection.query(` SELECT table_name, last_sync_timestamp FROM sync_status WHERE table_name IN ( 'imported_product_current_prices', 'imported_daily_inventory', 'imported_product_stat_history' ) `); // Add check for rows existence and type if (syncResult && Array.isArray(syncResult.rows)) { for (const row of syncResult.rows) { lastSyncTimes[row.table_name] = row.last_sync_timestamp; console.log(`Last sync time for ${row.table_name}: ${row.last_sync_timestamp}`); } } else { console.warn('Sync status query did not return expected rows. Proceeding without last sync times.'); } } catch (error) { console.error('Error fetching sync status:', error); } } // Determine how many tables will be imported const tablesCount = [ IMPORT_PRODUCT_CURRENT_PRICES, IMPORT_DAILY_INVENTORY, IMPORT_PRODUCT_STAT_HISTORY ].filter(Boolean).length; // Run all imports sequentially for better reliability console.log(`Starting sequential imports for ${tablesCount} tables...`); outputProgress({ status: "running", operation: "Historical data import", message: `Starting sequential imports for ${tablesCount} tables...`, current: 0, total: tablesCount, elapsed: formatElapsedTime(startTime) }); let progressCount = 0; let productCurrentPricesResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; let dailyInventoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; let productStatHistoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] }; // Import product current prices if (IMPORT_PRODUCT_CURRENT_PRICES) { console.log('Importing product current prices...'); productCurrentPricesResult = await importProductCurrentPrices( prodConnection, localConnection, oneYearAgoStr, lastSyncTimes['imported_product_current_prices'], BATCH_SIZE, INSERT_BATCH_SIZE, incrementalUpdate, startTime ); progressCount++; outputProgress({ status: "running", operation: "Historical data import", message: `Completed import ${progressCount} of ${tablesCount}`, current: progressCount, total: tablesCount, elapsed: formatElapsedTime(startTime) }); } // Import daily inventory if (IMPORT_DAILY_INVENTORY) { console.log('Importing daily inventory...'); dailyInventoryResult = await importDailyInventory( prodConnection, localConnection, oneYearAgoStr, lastSyncTimes['imported_daily_inventory'], BATCH_SIZE, INSERT_BATCH_SIZE, incrementalUpdate, startTime ); progressCount++; outputProgress({ status: "running", operation: "Historical data import", message: `Completed import ${progressCount} of ${tablesCount}`, current: progressCount, total: tablesCount, elapsed: formatElapsedTime(startTime) }); } // Import product stat history - using optimized approach if (IMPORT_PRODUCT_STAT_HISTORY) { console.log('Importing product stat history...'); productStatHistoryResult = await importProductStatHistory( prodConnection, localConnection, recentDateStr, // Use more recent date for this massive table lastSyncTimes['imported_product_stat_history'], LARGE_BATCH_SIZE, LARGE_INSERT_BATCH_SIZE, incrementalUpdate, startTime, USE_RECENT_MONTHS // Pass the recent months constant ); progressCount++; outputProgress({ status: "running", operation: "Historical data import", message: `Completed import ${progressCount} of ${tablesCount}`, current: progressCount, total: tablesCount, elapsed: formatElapsedTime(startTime) }); } // Aggregate results const totalRecordsAdded = productCurrentPricesResult.recordsAdded + dailyInventoryResult.recordsAdded + productStatHistoryResult.recordsAdded; const totalRecordsUpdated = productCurrentPricesResult.recordsUpdated + dailyInventoryResult.recordsUpdated + productStatHistoryResult.recordsUpdated; const totalProcessed = productCurrentPricesResult.totalProcessed + dailyInventoryResult.totalProcessed + productStatHistoryResult.totalProcessed; const allErrors = [ ...productCurrentPricesResult.errors, ...dailyInventoryResult.errors, ...productStatHistoryResult.errors ]; // Log import summary console.log(` Historical data import complete: ------------------------------- Records added: ${totalRecordsAdded} Records updated: ${totalRecordsUpdated} Total processed: ${totalProcessed} Errors: ${allErrors.length} Time taken: ${formatElapsedTime(startTime)} `); // Final progress update outputProgress({ status: "complete", operation: "Historical data import", message: `Import complete. Added: ${totalRecordsAdded}, Updated: ${totalRecordsUpdated}, Errors: ${allErrors.length}`, current: tablesCount, total: tablesCount, elapsed: formatElapsedTime(startTime) }); // Log any errors if (allErrors.length > 0) { console.log('Errors encountered during import:'); console.log(JSON.stringify(allErrors, null, 2)); } // Calculate duration const endTime = Date.now(); const durationSeconds = Math.round((endTime - startTime) / 1000); const finalStatus = allErrors.length === 0 ? 'complete' : 'failed'; const errorMessage = allErrors.length > 0 ? JSON.stringify(allErrors) : null; // Update import history await localConnection.query(` INSERT INTO import_history ( table_name, end_time, duration_seconds, records_added, records_updated, is_incremental, status, error_message, additional_info ) VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7, $8) `, [ 'historical_data_combined', durationSeconds, totalRecordsAdded, totalRecordsUpdated, incrementalUpdate, finalStatus, errorMessage, JSON.stringify({ totalProcessed, tablesImported: { imported_product_current_prices: IMPORT_PRODUCT_CURRENT_PRICES, imported_daily_inventory: IMPORT_DAILY_INVENTORY, imported_product_stat_history: IMPORT_PRODUCT_STAT_HISTORY } }) ]); // Return summary return { recordsAdded: totalRecordsAdded, recordsUpdated: totalRecordsUpdated, totalProcessed, errors: allErrors, timeTaken: formatElapsedTime(startTime) }; } catch (error) { console.error('Error importing historical data:', error); // Final progress update on error outputProgress({ status: "failed", operation: "Historical data import", message: `Import failed: ${error.message}`, elapsed: formatElapsedTime(startTime) }); throw error; } } /** * Imports product_current_prices data from MySQL to PostgreSQL */ async function importProductCurrentPrices( prodConnection, localConnection, oneYearAgoStr, lastSyncTime, batchSize, insertBatchSize, incrementalUpdate, startTime ) { let recordsAdded = 0; let recordsUpdated = 0; let totalProcessed = 0; let errors = []; let offset = 0; let allProcessed = false; try { // Get total count for progress reporting const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM product_current_prices WHERE (date_active >= ? OR date_deactive >= ?) ${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''} `, [oneYearAgoStr, oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); const totalCount = countResult[0].total; outputProgress({ status: "running", operation: "Historical data import - Product Current Prices", message: `Found ${totalCount} records to process`, current: 0, total: totalCount, elapsed: formatElapsedTime(startTime) }); // Process in batches for better performance while (!allProcessed) { try { // Fetch batch from production const [rows] = await prodConnection.query(` SELECT price_id, pid, qty_buy, is_min_qty_buy, price_each, qty_limit, no_promo, checkout_offer, active, date_active, date_deactive FROM product_current_prices WHERE (date_active >= ? OR date_deactive >= ?) ${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''} ORDER BY price_id LIMIT ? OFFSET ? `, [ oneYearAgoStr, oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), batchSize, offset ]); if (rows.length === 0) { allProcessed = true; break; } // Process rows in smaller batches for better performance for (let i = 0; i < rows.length; i += insertBatchSize) { const batch = rows.slice(i, i + insertBatchSize); if (batch.length === 0) continue; try { // Build parameterized query to handle NULL values properly const values = []; const placeholders = []; let placeholderIndex = 1; for (const row of batch) { const rowPlaceholders = [ `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}` ]; placeholders.push(`(${rowPlaceholders.join(', ')})`); values.push( row.price_id, row.pid, row.qty_buy, row.is_min_qty_buy ? true : false, row.price_each, row.qty_limit, // PostgreSQL will handle null values properly row.no_promo ? true : false, row.checkout_offer ? true : false, row.active ? true : false, validateDate(row.date_active), validateDate(row.date_deactive) ); } // Execute batch insert const result = await localConnection.query(` WITH ins AS ( INSERT INTO imported_product_current_prices ( price_id, pid, qty_buy, is_min_qty_buy, price_each, qty_limit, no_promo, checkout_offer, active, date_active, date_deactive ) VALUES ${placeholders.join(',\n')} ON CONFLICT (price_id) DO UPDATE SET pid = EXCLUDED.pid, qty_buy = EXCLUDED.qty_buy, is_min_qty_buy = EXCLUDED.is_min_qty_buy, price_each = EXCLUDED.price_each, qty_limit = EXCLUDED.qty_limit, no_promo = EXCLUDED.no_promo, checkout_offer = EXCLUDED.checkout_offer, active = EXCLUDED.active, date_active = EXCLUDED.date_active, date_deactive = EXCLUDED.date_deactive, updated = CURRENT_TIMESTAMP RETURNING (xmax = 0) AS inserted ) SELECT COUNT(*) FILTER (WHERE inserted) AS inserted_count, COUNT(*) FILTER (WHERE NOT inserted) AS updated_count FROM ins `, values); // Safely update counts based on the result if (result && result.rows && result.rows.length > 0) { const insertedCount = parseInt(result.rows[0].inserted_count || 0); const updatedCount = parseInt(result.rows[0].updated_count || 0); recordsAdded += insertedCount; recordsUpdated += updatedCount; } } catch (error) { console.error(`Error in batch import of product_current_prices at offset ${i}:`, error); errors.push({ table: 'imported_product_current_prices', batchOffset: i, batchSize: batch.length, error: error.message }); } } totalProcessed += rows.length; offset += rows.length; // Update progress outputProgress({ status: "running", operation: "Historical data import - Product Current Prices", message: `Processed ${totalProcessed} of ${totalCount} records`, current: totalProcessed, total: totalCount, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, totalProcessed, totalCount), rate: calculateRate(startTime, totalProcessed) }); } catch (error) { console.error('Error in batch import of product_current_prices:', error); errors.push({ table: 'imported_product_current_prices', error: error.message, offset: offset, batchSize: batchSize }); // Try to continue with next batch offset += batchSize; } } // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('imported_product_current_prices', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); return { recordsAdded, recordsUpdated, totalProcessed, errors }; } catch (error) { console.error('Error in product current prices import:', error); return { recordsAdded, recordsUpdated, totalProcessed, errors: [...errors, { table: 'imported_product_current_prices', error: error.message }] }; } } /** * Imports daily_inventory data from MySQL to PostgreSQL */ async function importDailyInventory( prodConnection, localConnection, oneYearAgoStr, lastSyncTime, batchSize, insertBatchSize, incrementalUpdate, startTime ) { let recordsAdded = 0; let recordsUpdated = 0; let totalProcessed = 0; let errors = []; let offset = 0; let allProcessed = false; try { // Get total count for progress reporting const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM daily_inventory WHERE date >= ? ${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''} `, [oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); const totalCount = countResult[0].total; outputProgress({ status: "running", operation: "Historical data import - Daily Inventory", message: `Found ${totalCount} records to process`, current: 0, total: totalCount, elapsed: formatElapsedTime(startTime) }); // Process in batches for better performance while (!allProcessed) { try { // Fetch batch from production const [rows] = await prodConnection.query(` SELECT date, pid, amountsold, times_sold, qtyreceived, price, costeach, stamp FROM daily_inventory WHERE date >= ? ${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''} ORDER BY date, pid LIMIT ? OFFSET ? `, [ oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), batchSize, offset ]); if (rows.length === 0) { allProcessed = true; break; } // Process rows in smaller batches for better performance for (let i = 0; i < rows.length; i += insertBatchSize) { const batch = rows.slice(i, i + insertBatchSize); if (batch.length === 0) continue; try { // Build parameterized query to handle NULL values properly const values = []; const placeholders = []; let placeholderIndex = 1; for (const row of batch) { const rowPlaceholders = [ `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}` ]; placeholders.push(`(${rowPlaceholders.join(', ')})`); values.push( validateDate(row.date), row.pid, row.amountsold || 0, row.times_sold || 0, row.qtyreceived || 0, row.price || 0, row.costeach || 0, validateDate(row.stamp) ); } // Execute batch insert const result = await localConnection.query(` WITH ins AS ( INSERT INTO imported_daily_inventory ( date, pid, amountsold, times_sold, qtyreceived, price, costeach, stamp ) VALUES ${placeholders.join(',\n')} ON CONFLICT (date, pid) DO UPDATE SET amountsold = EXCLUDED.amountsold, times_sold = EXCLUDED.times_sold, qtyreceived = EXCLUDED.qtyreceived, price = EXCLUDED.price, costeach = EXCLUDED.costeach, stamp = EXCLUDED.stamp, updated = CURRENT_TIMESTAMP RETURNING (xmax = 0) AS inserted ) SELECT COUNT(*) FILTER (WHERE inserted) AS inserted_count, COUNT(*) FILTER (WHERE NOT inserted) AS updated_count FROM ins `, values); // Safely update counts based on the result if (result && result.rows && result.rows.length > 0) { const insertedCount = parseInt(result.rows[0].inserted_count || 0); const updatedCount = parseInt(result.rows[0].updated_count || 0); recordsAdded += insertedCount; recordsUpdated += updatedCount; } } catch (error) { console.error(`Error in batch import of daily_inventory at offset ${i}:`, error); errors.push({ table: 'imported_daily_inventory', batchOffset: i, batchSize: batch.length, error: error.message }); } } totalProcessed += rows.length; offset += rows.length; // Update progress outputProgress({ status: "running", operation: "Historical data import - Daily Inventory", message: `Processed ${totalProcessed} of ${totalCount} records`, current: totalProcessed, total: totalCount, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, totalProcessed, totalCount), rate: calculateRate(startTime, totalProcessed) }); } catch (error) { console.error('Error in batch import of daily_inventory:', error); errors.push({ table: 'imported_daily_inventory', error: error.message, offset: offset, batchSize: batchSize }); // Try to continue with next batch offset += batchSize; } } // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('imported_daily_inventory', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); return { recordsAdded, recordsUpdated, totalProcessed, errors }; } catch (error) { console.error('Error in daily inventory import:', error); return { recordsAdded, recordsUpdated, totalProcessed, errors: [...errors, { table: 'imported_daily_inventory', error: error.message }] }; } } /** * Imports product_stat_history data from MySQL to PostgreSQL * Using fast direct inserts without conflict checking */ async function importProductStatHistory( prodConnection, localConnection, recentDateStr, // Use more recent date instead of one year ago lastSyncTime, batchSize, insertBatchSize, incrementalUpdate, startTime, recentMonths // Add parameter for recent months ) { let recordsAdded = 0; let recordsUpdated = 0; let totalProcessed = 0; let errors = []; let offset = 0; let allProcessed = false; let lastRateCheck = Date.now(); let lastProcessed = 0; try { // Get total count for progress reporting const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM product_stat_history WHERE date >= ? ${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''} `, [recentDateStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]); const totalCount = countResult[0].total; console.log(`Found ${totalCount} records to process in product_stat_history (using recent date: ${recentDateStr})`); // Progress indicator outputProgress({ status: "running", operation: "Historical data import - Product Stat History", message: `Found ${totalCount} records to process (last ${recentMonths} months only)`, current: 0, total: totalCount, elapsed: formatElapsedTime(startTime) }); // If not incremental, truncate the table first for better performance if (!incrementalUpdate) { console.log('Truncating imported_product_stat_history for full import...'); await localConnection.query('TRUNCATE TABLE imported_product_stat_history'); } else if (lastSyncTime) { // For incremental updates, delete records that will be reimported console.log(`Deleting records from imported_product_stat_history since ${lastSyncTime}...`); await localConnection.query('DELETE FROM imported_product_stat_history WHERE date > $1', [lastSyncTime]); } // Process in batches for better performance while (!allProcessed) { try { // Fetch batch from production with minimal filtering and no sorting const [rows] = await prodConnection.query(` SELECT pid, date, COALESCE(score, 0) as score, COALESCE(score2, 0) as score2, COALESCE(qty_in_baskets, 0) as qty_in_baskets, COALESCE(qty_sold, 0) as qty_sold, COALESCE(notifies_set, 0) as notifies_set, COALESCE(visibility_score, 0) as visibility_score, COALESCE(health_score, 0) as health_score, COALESCE(sold_view_score, 0) as sold_view_score FROM product_stat_history WHERE date >= ? ${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''} LIMIT ? OFFSET ? `, [ recentDateStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []), batchSize, offset ]); if (rows.length === 0) { allProcessed = true; break; } // Process rows in smaller batches for better performance for (let i = 0; i < rows.length; i += insertBatchSize) { const batch = rows.slice(i, i + insertBatchSize); if (batch.length === 0) continue; try { // Build parameterized query to handle NULL values properly const values = []; const placeholders = []; let placeholderIndex = 1; for (const row of batch) { const rowPlaceholders = [ `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}`, `$${placeholderIndex++}` ]; placeholders.push(`(${rowPlaceholders.join(', ')})`); values.push( row.pid, validateDate(row.date), row.score, row.score2, row.qty_in_baskets, row.qty_sold, row.notifies_set, row.visibility_score, row.health_score, row.sold_view_score ); } // Execute direct batch insert without conflict checking await localConnection.query(` INSERT INTO imported_product_stat_history ( pid, date, score, score2, qty_in_baskets, qty_sold, notifies_set, visibility_score, health_score, sold_view_score ) VALUES ${placeholders.join(',\n')} `, values); // All inserts are new records when using this approach recordsAdded += batch.length; } catch (error) { console.error(`Error in batch insert of product_stat_history at offset ${i}:`, error); errors.push({ table: 'imported_product_stat_history', batchOffset: i, batchSize: batch.length, error: error.message }); } } totalProcessed += rows.length; offset += rows.length; // Calculate current rate every 10 seconds or 100,000 records const now = Date.now(); if (now - lastRateCheck > 10000 || totalProcessed - lastProcessed > 100000) { const timeElapsed = (now - lastRateCheck) / 1000; // seconds const recordsProcessed = totalProcessed - lastProcessed; const currentRate = Math.round(recordsProcessed / timeElapsed); console.log(`Current import rate: ${currentRate} records/second`); lastRateCheck = now; lastProcessed = totalProcessed; } // Update progress outputProgress({ status: "running", operation: "Historical data import - Product Stat History", message: `Processed ${totalProcessed} of ${totalCount} records`, current: totalProcessed, total: totalCount, elapsed: formatElapsedTime(startTime), remaining: estimateRemaining(startTime, totalProcessed, totalCount), rate: calculateRate(startTime, totalProcessed) }); } catch (error) { console.error('Error in batch import of product_stat_history:', error); errors.push({ table: 'imported_product_stat_history', error: error.message, offset: offset, batchSize: batchSize }); // Try to continue with next batch offset += batchSize; } } // Update sync status await localConnection.query(` INSERT INTO sync_status (table_name, last_sync_timestamp) VALUES ('imported_product_stat_history', NOW()) ON CONFLICT (table_name) DO UPDATE SET last_sync_timestamp = NOW() `); return { recordsAdded, recordsUpdated, totalProcessed, errors }; } catch (error) { console.error('Error in product stat history import:', error); return { recordsAdded, recordsUpdated, totalProcessed, errors: [...errors, { table: 'imported_product_stat_history', error: error.message }] }; } } module.exports = importHistoricalData;