const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');
// `outputProgress` is deliberately NOT taken from the shared progress utils:
// this script declares its own `outputProgress` below (it additionally mirrors
// significant events to the import log), and binding the same name twice in
// module scope is a SyntaxError.
const { formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

// Load .env BEFORE anything reads process.env, otherwise values defined only
// in the .env file (including the test limits below) are silently ignored.
dotenv.config({ path: path.join(__dirname, '../.env') });

/**
 * Reads an integer row limit from the environment.
 *
 * @param {string} name - Environment variable name.
 * @param {string} fallback - Value used when the variable is unset or empty.
 * @returns {number} The parsed limit; 0 (meaning "no limit") when unparsable.
 */
function readTestLimit(name, fallback) {
  const parsed = Number.parseInt(process.env[name] || fallback, 10);
  return Number.isNaN(parsed) ? 0 : parsed;
}

// Test limits; a value <= 0 disables the limit for that import.
const PRODUCTS_TEST_LIMIT = readTestLimit('PRODUCTS_TEST_LIMIT', '0');
const ORDERS_TEST_LIMIT = readTestLimit('ORDERS_TEST_LIMIT', '10000');
const PURCHASE_ORDERS_TEST_LIMIT = readTestLimit('PURCHASE_ORDERS_TEST_LIMIT', '10000');

const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true, // schema.sql is executed as a single multi-statement query
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};

// Set up logging
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');

// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR, { recursive: true });
}

// Category names that contain commas and therefore must not be split on ','.
const SPECIAL_CATEGORIES = [
  'Paint, Dyes & Chalk',
  'Fabric Paint, Markers, and Dye',
  'Crystals, Gems & Rhinestones',
  'Pens, Pencils & Markers'
];

// Value order for the products upsert.
// NOTE(review): the products INSERT below has no explicit column list, so this
// order must match the table's column order in schema.sql — confirm.
const PRODUCT_COLUMNS = [
  'product_id', 'title', 'SKU', 'created_at', 'stock_quantity', 'price',
  'regular_price', 'cost_price', 'landing_cost_price', 'barcode', 'updated_at',
  'visible', 'managing_stock', 'replenishable', 'vendor', 'vendor_reference',
  'permalink', 'categories', 'image', 'brand', 'options', 'tags', 'moq', 'uom'
];
const PRODUCT_UPDATE_COLUMNS = PRODUCT_COLUMNS.filter(
  (column) => !['product_id', 'SKU', 'created_at'].includes(column)
);

const ORDER_COLUMNS = [
  'order_number', 'product_id', 'SKU', 'date', 'price', 'quantity', 'discount',
  'tax', 'tax_included', 'shipping', 'customer', 'canceled'
];
const ORDER_UPDATE_COLUMNS = [
  'price', 'quantity', 'discount', 'tax', 'tax_included', 'shipping', 'canceled'
];

const PURCHASE_ORDER_COLUMNS = [
  'po_id', 'vendor', 'date', 'expected_date', 'product_id', 'sku', 'cost_price',
  'status', 'notes', 'ordered', 'received', 'received_date'
];
const PURCHASE_ORDER_UPDATE_COLUMNS = [
  'vendor', 'expected_date', 'cost_price', 'status', 'notes', 'ordered',
  'received', 'received_date'
];

/**
 * Appends an error (message and stack) to the error log and echoes a short
 * form to stderr.
 *
 * @param {Error} error - The error to record.
 * @param {string} [context] - Human-readable description of what was happening.
 */
function logError(error, context = '') {
  const timestamp = new Date().toISOString();
  const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;

  fs.appendFileSync(ERROR_LOG, errorMessage);
  console.error(`\n${context}\nError: ${error.message}`);
}

/**
 * Appends a timestamped line to the import log.
 *
 * @param {string} message - Text to log.
 * @param {boolean} [isSignificant] - Only significant events are written to disk.
 */
function logImport(message, isSignificant = false) {
  if (!isSignificant) {
    return;
  }
  const timestamp = new Date().toISOString();
  fs.appendFileSync(IMPORT_LOG, `[${timestamp}] ${message}\n`);
}

/**
 * Formats a duration as e.g. "1h 2m 3s"; zero-valued units are omitted,
 * except that a zero duration yields "0s".
 *
 * @param {number} seconds - Duration in seconds (fractions are truncated).
 * @returns {string} Human-readable duration.
 */
function formatDuration(seconds) {
  const hours = Math.floor(seconds / 3600);
  const minutes = Math.floor((seconds % 3600) / 60);
  const wholeSeconds = Math.floor(seconds % 60);

  const parts = [];
  if (hours > 0) parts.push(`${hours}h`);
  if (minutes > 0) parts.push(`${minutes}m`);
  if (wholeSeconds > 0 || parts.length === 0) parts.push(`${wholeSeconds}s`);
  return parts.join(' ');
}

/**
 * Emits a progress object as one JSON line on stdout (consumed by the
 * frontend) and mirrors significant events to the import log.
 *
 * @param {object} data - Progress payload (operation, status, message, ...).
 */
function outputProgress(data) {
  process.stdout.write(JSON.stringify(data) + '\n');

  const isSignificant = Boolean(
    // Operation starts
    (data.operation && !data.current) ||
    // Operation completions and errors
    data.status === 'complete' ||
    data.status === 'error' ||
    // Test limits reached
    data.message?.includes('test limit') ||
    // Schema changes
    data.operation?.includes('Creating database schema') ||
    // Parallel import starts
    data.message?.includes('Processing orders and purchase orders simultaneously')
  );

  if (isSignificant) {
    logImport(
      `${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`,
      true
    );
  }
}

/**
 * Counts the data rows of a CSV file (all parsed records minus the header).
 *
 * @param {string} filePath - Path of the CSV file.
 * @returns {Promise<number>} Number of data rows; 0 for an empty file.
 *   Rejects on read or parse errors.
 */
function countRows(filePath) {
  return new Promise((resolve, reject) => {
    let count = 0;
    const source = fs.createReadStream(filePath);
    // pipe() does not forward source errors, so a missing/unreadable file
    // must be handled on the read stream itself.
    source.on('error', reject);
    source
      .pipe(csv.parse())
      .on('data', () => { count += 1; })
      .on('error', reject)
      .on('end', () => resolve(Math.max(count - 1, 0))); // Subtract 1 for header row
  });
}

/**
 * Determines the row total to report for an import, honouring a test limit.
 *
 * @param {string} filePath - Path of the CSV file.
 * @param {number} testLimit - Row cap; values <= 0 mean "no limit".
 * @returns {Promise<number>} Rows that will be processed.
 */
async function resolveTotalRows(filePath, testLimit) {
  const rowsInFile = await countRows(filePath);
  return testLimit > 0 ? Math.min(rowsInFile, testLimit) : rowsInFile;
}

/**
 * Opens a CSV file as an async-iterable stream of records keyed by header.
 *
 * @param {string} filePath - Path of the CSV file.
 * @returns {object} The csv-parse stream (use with `for await`).
 */
function openCsvParser(filePath) {
  const source = fs.createReadStream(filePath);
  const parser = csv.parse({ columns: true, trim: true });
  // Surface file errors through the parser so `for await` rejects instead of
  // the process dying on an unhandled 'error' event.
  source.on('error', (error) => parser.destroy(error));
  // Release the file handle when iteration stops early (e.g. test limit).
  parser.on('close', () => source.destroy());
  return source.pipe(parser);
}

/**
 * Converts a "DD-MM-YYYY" string to "YYYY-MM-DD".
 *
 * @param {string} [dateStr] - Source date; when missing, today's local date is used.
 * @returns {string} Date formatted as YYYY-MM-DD.
 */
function convertDate(dateStr) {
  if (!dateStr) {
    // Default to current date for missing dates
    const now = new Date();
    const year = now.getFullYear();
    const month = String(now.getMonth() + 1).padStart(2, '0');
    const day = String(now.getDate()).padStart(2, '0');
    return `${year}-${month}-${day}`;
  }
  const [day, month, year] = dateStr.split('-');
  return `${year}-${month}-${day}`;
}

/**
 * Builds the "(?, ?, ...),(?, ?, ...)" placeholder list for a multi-row INSERT.
 *
 * @param {number} rowCount - Number of rows.
 * @param {number} columnCount - Number of values per row.
 * @returns {string} Placeholder groups joined by commas.
 */
function buildPlaceholders(rowCount, columnCount) {
  const row = `(${Array(columnCount).fill('?').join(', ')})`;
  return Array(rowCount).fill(row).join(',');
}

/**
 * Builds the assignment list of an ON DUPLICATE KEY UPDATE clause.
 *
 * @param {string[]} columns - Columns to overwrite with the incoming values.
 * @returns {string} e.g. "price = VALUES(price), quantity = VALUES(quantity)".
 */
function buildUpdateClause(columns) {
  return columns.map((column) => `${column} = VALUES(${column})`).join(', ');
}

/**
 * Derives insert/update counts from an upsert result header.
 *
 * NOTE(review): MySQL documents the per-row affected count of
 * INSERT ... ON DUPLICATE KEY UPDATE as 1 for an insert and 2 for an update.
 * This keeps the script's existing arithmetic (inserts = affectedRows -
 * changedRows, updates = changedRows); whether the driver populates
 * `changedRows` for this statement type should be confirmed, as the reported
 * split may be skewed. Only the progress statistics are affected.
 *
 * @param {{affectedRows: number, changedRows: number}} result - Query result header.
 * @returns {{inserted: number, updated: number}} Counts to add to the totals.
 */
function tallyUpsert(result) {
  if (!(result.affectedRows > 0)) {
    return { inserted: 0, updated: 0 };
  }
  return {
    inserted: result.affectedRows - result.changedRows,
    updated: result.changedRows
  };
}

/**
 * Loads every product_id currently in the products table.
 *
 * @param {object} pool - mysql2 promise pool.
 * @returns {Promise<Set<string>>} Product ids as strings (CSV values are strings).
 */
async function loadValidProductIds(pool) {
  const connection = await pool.getConnection();
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    return new Set(rows.map((row) => row.product_id.toString()));
  } finally {
    connection.release();
  }
}

/**
 * Emits a 'running' progress event with rate and time estimates.
 *
 * @param {number} current - Rows processed so far.
 * @param {number} total - Rows expected in total.
 * @param {string} operation - Operation label.
 * @param {number} startTime - Start timestamp (ms since epoch).
 * @param {number} [added] - Rows inserted so far.
 * @param {number} [updated] - Rows updated so far.
 * @param {number} [skipped] - Rows skipped so far.
 */
function updateProgress(current, total, operation, startTime, added = 0, updated = 0, skipped = 0) {
  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate: calculateRate(startTime, current),
    elapsed: formatElapsedTime(startTime),
    remaining: estimateRemaining(startTime, current, total),
    // Avoid "NaN"/"Infinity" when there is nothing to process.
    percentage: total > 0 ? ((current / total) * 100).toFixed(1) : '0.0',
    added,
    updated,
    skipped
  });
}

/**
 * Splits a comma-separated category string into unique, trimmed names while
 * keeping the known comma-containing category names intact.
 *
 * @param {string} categoriesStr - Raw categories value from the CSV.
 * @returns {string[]} Category names (special names first, then the rest).
 */
function parseCategories(categoriesStr) {
  const categories = [];
  let remaining = categoriesStr;

  // Pull out the special names first so their commas are not split on.
  for (const special of SPECIAL_CATEGORIES) {
    if (remaining.includes(special)) {
      categories.push(special);
      remaining = remaining.replace(special, '');
    }
  }

  for (const piece of remaining.split(',')) {
    const name = piece.trim();
    if (name.length > 0 && !categories.includes(name)) {
      categories.push(name);
    }
  }
  return categories;
}

/**
 * Replaces a product's category relationships with those named in
 * `categoriesStr`, creating missing categories. An empty value removes all
 * relationships.
 *
 * @param {object} connection - mysql2 connection (may be inside a transaction).
 * @param {string|number} productId - Product whose categories are rewritten.
 * @param {string} [categoriesStr] - Comma-separated category names.
 */
async function handleCategories(connection, productId, categoriesStr) {
  const categories = categoriesStr ? parseCategories(categoriesStr) : [];

  // Remove existing category relationships for this product
  await connection.query(
    'DELETE FROM product_categories WHERE product_id = ?',
    [productId]
  );

  for (const category of categories) {
    // Insert category if it doesn't exist
    await connection.query(
      'INSERT IGNORE INTO categories (name) VALUES (?)',
      [category]
    );

    // Look up the id and create the relationship in a single statement
    await connection.query(`
      INSERT IGNORE INTO product_categories (product_id, category_id)
      SELECT ?, id FROM categories WHERE name = ?`,
      [productId, category]
    );
  }
}

/**
 * Calculates average units sold per day/week/month for a product over the
 * span between its first and last non-canceled order.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to analyse.
 * @returns {Promise<object>} Row with daily_sales_avg, weekly_sales_avg and
 *   monthly_sales_avg (all 0 when the product has no orders).
 */
async function calculateSalesVelocity(connection, productId) {
  // The daily average uses SUM(quantity) like the weekly/monthly ones, so all
  // three figures are in the same unit (previously it counted order rows).
  const [rows] = await connection.query(`
    SELECT
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0), 0) as daily_sales_avg,
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 7, 0) as weekly_sales_avg,
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 30, 0) as monthly_sales_avg
    FROM orders
    WHERE product_id = ? AND canceled = false
    GROUP BY product_id
  `, [productId]);

  return rows[0] || { daily_sales_avg: 0, weekly_sales_avg: 0, monthly_sales_avg: 0 };
}

/**
 * Derives inventory coverage and reorder figures from current stock.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to analyse.
 * @param {number} dailySalesAvg - Average units sold per day.
 * @returns {Promise<object|null>} Metrics, or null when the product is unknown.
 */
async function calculateStockMetrics(connection, productId, dailySalesAvg) {
  const [product] = await connection.query(
    'SELECT stock_quantity FROM products WHERE product_id = ?',
    [productId]
  );

  if (!product[0]) return null;

  const stockQty = product[0].stock_quantity;
  // 999 is used as the "no sales, effectively unlimited" sentinel.
  const daysOfInventory = dailySalesAvg > 0 ? Math.floor(stockQty / dailySalesAvg) : 999;
  const weeksOfInventory = Math.floor(daysOfInventory / 7);

  // Safety stock: 2 weeks of average sales
  const safetyStock = Math.ceil(dailySalesAvg * 14);
  // Reorder point: safety stock + 1 week of sales
  const reorderPoint = Math.ceil(safetyStock + (dailySalesAvg * 7));

  return {
    days_of_inventory: daysOfInventory,
    weeks_of_inventory: weeksOfInventory,
    safety_stock: safetyStock,
    reorder_point: reorderPoint
  };
}

/**
 * Calculates total revenue and average margin percentage for a product.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to analyse.
 * @returns {Promise<object>} Row with total_revenue and avg_margin_percent.
 */
async function calculateFinancialMetrics(connection, productId) {
  const [rows] = await connection.query(`
    SELECT
      SUM(o.price * o.quantity) as total_revenue,
      AVG((o.price - p.cost_price) / o.price * 100) as avg_margin_percent
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    WHERE o.product_id = ? AND o.canceled = false
    GROUP BY o.product_id
  `, [productId]);

  return rows[0] || { total_revenue: 0, avg_margin_percent: 0 };
}

/**
 * Calculates lead time and latest purchase dates from closed purchase orders.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to analyse.
 * @returns {Promise<object>} Row with avg_lead_time_days, last_purchase_date
 *   and last_received_date.
 */
async function calculatePurchaseMetrics(connection, productId) {
  const [rows] = await connection.query(`
    SELECT
      AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
      MAX(date) as last_purchase_date,
      MAX(received_date) as last_received_date
    FROM purchase_orders
    WHERE product_id = ? AND status = 'closed'
    GROUP BY product_id
  `, [productId]);

  return rows[0] || { avg_lead_time_days: 0, last_purchase_date: null, last_received_date: null };
}

/**
 * Classifies a product by its share of total non-canceled revenue.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to classify.
 * @returns {Promise<string>} 'A' (share >= 20%), 'B' (share >= 5%) or 'C'.
 */
async function calculateABCClass(connection, productId) {
  // Get total revenue for this product
  const [productRevenue] = await connection.query(`
    SELECT SUM(price * quantity) as revenue
    FROM orders
    WHERE product_id = ? AND canceled = false
  `, [productId]);

  // Get total revenue across all products
  const [totalRevenue] = await connection.query(`
    SELECT SUM(price * quantity) as total
    FROM orders
    WHERE canceled = false
  `);

  // SUM() may arrive as a string or null; coerce before comparing.
  const revenue = Number(productRevenue[0]?.revenue) || 0;
  const total = Number(totalRevenue[0]?.total) || 0;

  if (total === 0) return 'C';

  const percentage = (revenue / total) * 100;

  // Thresholds are on this product's own share of revenue (not a cumulative
  // ranking): A at >= 20%, B at >= 5%, otherwise C.
  if (percentage >= 20) return 'A';
  if (percentage >= 5) return 'B';
  return 'C';
}

/**
 * Upserts per-month sales aggregates for a product into
 * product_time_aggregates.
 *
 * NOTE(review): purchase_orders is LEFT JOINed per order row on product and
 * month, so a month with several purchase orders appears to multiply the
 * order rows feeding the SUM()s (and vice versa for po.received/po.ordered).
 * Verify the aggregates; pre-aggregating each side may be needed.
 *
 * @param {object} connection - mysql2 connection.
 * @param {string|number} productId - Product to aggregate.
 */
async function calculateTimeAggregates(connection, productId) {
  await connection.query(`
    INSERT INTO product_time_aggregates (
      product_id, year, month, total_quantity_sold, total_revenue, total_cost,
      order_count, stock_received, stock_ordered, avg_price, profit_margin
    )
    SELECT
      o.product_id,
      YEAR(o.date) as year,
      MONTH(o.date) as month,
      SUM(o.quantity) as total_quantity_sold,
      SUM(o.price * o.quantity) as total_revenue,
      SUM(p.cost_price * o.quantity) as total_cost,
      COUNT(DISTINCT o.order_number) as order_count,
      COALESCE(SUM(po.received), 0) as stock_received,
      COALESCE(SUM(po.ordered), 0) as stock_ordered,
      AVG(o.price) as avg_price,
      CASE
        WHEN SUM(o.price * o.quantity) = 0 THEN 0
        ELSE ((SUM(o.price * o.quantity) - COALESCE(SUM(p.cost_price * o.quantity), 0)) / NULLIF(SUM(o.price * o.quantity), 0) * 100)
      END as profit_margin
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    LEFT JOIN purchase_orders po ON o.product_id = po.product_id
      AND YEAR(o.date) = YEAR(po.date)
      AND MONTH(o.date) = MONTH(po.date)
    WHERE o.product_id = ? AND o.canceled = false
    GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
    ON DUPLICATE KEY UPDATE
      total_quantity_sold = VALUES(total_quantity_sold),
      total_revenue = VALUES(total_revenue),
      total_cost = VALUES(total_cost),
      order_count = VALUES(order_count),
      stock_received = VALUES(stock_received),
      stock_ordered = VALUES(stock_ordered),
      avg_price = VALUES(avg_price),
      profit_margin = VALUES(profit_margin)
  `, [productId]);
}

/**
 * Recomputes lead time and fill rate for every vendor referenced by a
 * product and upserts the result into vendor_metrics.
 *
 * @param {object} connection - mysql2 connection.
 * @throws Rethrows any query error after logging it.
 */
async function calculateVendorMetrics(connection) {
  try {
    // Get list of vendors
    const [vendors] = await connection.query('SELECT DISTINCT vendor FROM products WHERE vendor IS NOT NULL');
    const startTime = Date.now();
    const total = vendors.length;
    let current = 0;

    outputProgress({
      operation: 'Calculating vendor metrics',
      current: 0,
      total,
      percentage: '0.0'
    });

    for (const { vendor } of vendors) {
      // Lead time and order counts from closed purchase orders
      const [leadTimeResult] = await connection.query(`
        SELECT
          AVG(DATEDIFF(received_date, date)) as avg_lead_time,
          COUNT(*) as total_orders,
          SUM(CASE WHEN ordered = received THEN 1 ELSE 0 END) as fulfilled_orders
        FROM purchase_orders
        WHERE vendor = ? AND status = 'closed'
        GROUP BY vendor
      `, [vendor]);

      const metrics = leadTimeResult[0] || {
        avg_lead_time: 0,
        total_orders: 0,
        fulfilled_orders: 0
      };

      // Fill rate: percentage of orders received exactly as ordered
      const fillRate = metrics.total_orders > 0
        ? (metrics.fulfilled_orders / metrics.total_orders * 100)
        : 0;

      await connection.query(`
        INSERT INTO vendor_metrics (
          vendor, avg_lead_time_days, total_orders, fulfilled_orders, fill_rate
        ) VALUES (?, ?, ?, ?, ?)
        ON DUPLICATE KEY UPDATE
          avg_lead_time_days = VALUES(avg_lead_time_days),
          total_orders = VALUES(total_orders),
          fulfilled_orders = VALUES(fulfilled_orders),
          fill_rate = VALUES(fill_rate)
      `, [
        vendor,
        metrics.avg_lead_time || 0,
        metrics.total_orders,
        metrics.fulfilled_orders,
        fillRate
      ]);

      current++;
      updateProgress(current, total, 'Calculating vendor metrics', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Vendor metrics calculation completed',
      current: total,
      total,
      percentage: '100.0'
    });
  } catch (error) {
    logError(error, 'Error calculating vendor metrics');
    throw error;
  }
}

/**
 * Maps a products CSV record to the row object stored in the database.
 *
 * @param {object} record - CSV record keyed by header name.
 * @returns {object} Row keyed by PRODUCT_COLUMNS.
 */
function toProductRow(record) {
  return {
    product_id: record.product_id,
    title: record.title,
    SKU: record.SKU,
    created_at: convertDate(record.created_at),
    stock_quantity: Number.parseInt(record.stock_quantity, 10) || 0,
    price: Number.parseFloat(record.price) || 0,
    regular_price: Number.parseFloat(record.regular_price) || 0,
    cost_price: Number.parseFloat(record.cost_price) || null,
    landing_cost_price: Number.parseFloat(record.landing_cost_price) || null,
    barcode: record.barcode,
    updated_at: convertDate(record.updated_at),
    visible: record.visible === '1',
    managing_stock: record.managing_stock === '1',
    replenishable: record.replenishable === '1',
    vendor: record.vendor,
    vendor_reference: record.vendor_reference,
    permalink: record.permalink,
    categories: record.categories,
    image: record.image,
    brand: record.brand,
    options: record.options,
    tags: record.tags,
    moq: Number.parseInt(record.moq, 10) || 1,
    uom: Number.parseInt(record.uom, 10) || 1
  };
}

/**
 * Imports products from a CSV file in transactional batches, rebuilding each
 * product's category relationships in the same transaction.
 *
 * A failing batch is rolled back and logged; the import continues with the
 * next batch.
 *
 * @param {object} pool - mysql2 promise pool.
 * @param {string} filePath - Path of the products CSV.
 * @throws On CSV read/parse errors or when no connection can be obtained.
 */
async function importProducts(pool, filePath) {
  const totalRows = await resolveTotalRows(filePath, PRODUCTS_TEST_LIMIT);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting products import',
    current: 0,
    total: totalRows,
    testLimit: PRODUCTS_TEST_LIMIT,
    percentage: '0'
  });

  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 100;
  let batch = [];
  const categoryUpdates = new Map(); // product_id -> raw categories string for the current batch

  // One connection is reused for the whole import (transactions need it).
  const connection = await pool.getConnection();

  // Upserts one batch plus its category updates inside a single transaction.
  async function processBatch(records, pendingCategories) {
    if (records.length === 0) return;

    try {
      await connection.beginTransaction();

      const values = records.map((row) => PRODUCT_COLUMNS.map((column) => row[column]));
      const placeholders = buildPlaceholders(records.length, PRODUCT_COLUMNS.length);
      const sql = `INSERT INTO products VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${buildUpdateClause(PRODUCT_UPDATE_COLUMNS)}`;

      const [result] = await connection.query(sql, values.flat());
      const { inserted, updated: changed } = tallyUpsert(result);

      // Process categories within the same transaction
      for (const [productId, categories] of pendingCategories) {
        await handleCategories(connection, productId, categories);
      }

      await connection.commit();

      // Only count rows once the transaction is durable.
      added += inserted;
      updated += changed;
    } catch (error) {
      try {
        await connection.rollback();
      } catch (rollbackError) {
        logError(rollbackError, 'Error rolling back products batch');
      }
      // Continue with next batch instead of failing completely
      logError(
        error,
        `Error processing batch of ${records.length} records\nFirst record: ${JSON.stringify(records[0])}`
      );
    }
  }

  try {
    for await (const record of openCsvParser(filePath)) {
      if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
        // Flush what we have and reset, so the post-loop flush below does not
        // process the same batch a second time.
        await processBatch(batch, categoryUpdates);
        batch = [];
        categoryUpdates.clear();

        outputProgress({
          operation: 'Products import',
          message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
          current: rowCount,
          total: totalRows
        });
        break;
      }
      rowCount++;

      // Update progress every 100ms to avoid console flooding
      const now = Date.now();
      if (now - lastUpdate > 100) {
        updateProgress(rowCount, totalRows, 'Products import', startTime, added, updated, 0);
        lastUpdate = now;
      }

      batch.push(toProductRow(record));
      if (record.categories) {
        categoryUpdates.set(record.product_id, record.categories);
      }

      if (batch.length >= BATCH_SIZE) {
        await processBatch(batch, categoryUpdates);
        batch = [];
        categoryUpdates.clear();
      }
    }

    // Process any remaining records in the final batch
    await processBatch(batch, categoryUpdates);

    outputProgress({
      status: 'running',
      operation: 'Products import completed',
      current: rowCount,
      total: totalRows,
      added,
      updated,
      duration: formatDuration((Date.now() - startTime) / 1000),
      percentage: '100'
    });
  } finally {
    connection.release();
  }
}

/**
 * Maps an orders CSV record to the row object stored in the database.
 *
 * @param {object} record - CSV record keyed by header name.
 * @returns {object} Row keyed by ORDER_COLUMNS.
 */
function toOrderRow(record) {
  return {
    order_number: record.order_number,
    product_id: record.product_id,
    SKU: record.SKU,
    date: convertDate(record.date),
    price: Number.parseFloat(record.price) || 0,
    quantity: Number.parseInt(record.quantity, 10) || 0,
    discount: Number.parseFloat(record.discount) || 0,
    tax: Number.parseFloat(record.tax) || 0,
    tax_included: record.tax_included === '1',
    shipping: Number.parseFloat(record.shipping) || 0,
    customer: record.customer,
    canceled: record.canceled === '1'
  };
}

/**
 * Imports order lines from a CSV file in batches. Rows referencing unknown
 * products are skipped; a failing batch is logged and counted as skipped.
 *
 * @param {object} pool - mysql2 promise pool.
 * @param {string} filePath - Path of the orders CSV.
 * @throws On CSV read/parse errors or when no connection can be obtained.
 */
async function importOrders(pool, filePath) {
  const totalRows = await resolveTotalRows(filePath, ORDERS_TEST_LIMIT);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting orders import',
    current: 0,
    total: totalRows,
    testLimit: ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  // First, get all valid product IDs
  const validProductIds = await loadValidProductIds(pool);

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  // Upserts one batch of order rows on a pooled connection.
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      const values = records.map((row) => ORDER_COLUMNS.map((column) => row[column]));
      const placeholders = buildPlaceholders(records.length, ORDER_COLUMNS.length);
      const sql = `INSERT INTO orders (${ORDER_COLUMNS.join(', ')}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${buildUpdateClause(ORDER_UPDATE_COLUMNS)}`;

      const [result] = await connection.query(sql, values.flat());
      const { inserted, updated: changed } = tallyUpsert(result);
      added += inserted;
      updated += changed;
    } catch (error) {
      // Continue with next batch instead of failing completely
      logError(error, `Error processing batch of ${records.length} order records`);
      skipped += records.length;
    } finally {
      connection.release();
    }
  }

  for await (const record of openCsvParser(filePath)) {
    if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
      // Flush and reset so the post-loop flush does not repeat this batch.
      await processBatch(batch);
      batch = [];

      outputProgress({
        operation: 'Orders import',
        message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Orders import', startTime, added, updated, skipped);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }

    batch.push(toOrderRow(record));

    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  await processBatch(batch);

  outputProgress({
    status: 'running',
    operation: 'Orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });
}

/**
 * Maps a purchase-orders CSV record to the row object stored in the database.
 *
 * @param {object} record - CSV record keyed by header name.
 * @returns {object} Row keyed by PURCHASE_ORDER_COLUMNS.
 */
function toPurchaseOrderRow(record) {
  return {
    po_id: record.po_id,
    vendor: record.vendor,
    date: convertDate(record.date),
    expected_date: convertDate(record.expected_date),
    product_id: record.product_id,
    sku: record.sku,
    cost_price: Number.parseFloat(record.cost_price) || 0,
    status: record.status || 'pending',
    notes: record.notes,
    ordered: Number.parseInt(record.ordered, 10) || 0,
    received: Number.parseInt(record.received, 10) || 0,
    // NOTE(review): a missing received_date becomes today's date via
    // convertDate — confirm this is intended for not-yet-received orders.
    received_date: convertDate(record.received_date)
  };
}

/**
 * Imports purchase-order lines from a CSV file in batches. Rows referencing
 * unknown products are skipped; a failing batch is logged and counted as
 * skipped.
 *
 * @param {object} pool - mysql2 promise pool.
 * @param {string} filePath - Path of the purchase orders CSV.
 * @throws On CSV read/parse errors or when no connection can be obtained.
 */
async function importPurchaseOrders(pool, filePath) {
  const totalRows = await resolveTotalRows(filePath, PURCHASE_ORDERS_TEST_LIMIT);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting purchase orders import',
    current: 0,
    total: totalRows,
    testLimit: PURCHASE_ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  // First, get all valid product IDs
  const validProductIds = await loadValidProductIds(pool);

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  // Upserts one batch of purchase-order rows on a pooled connection.
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      const values = records.map((row) => PURCHASE_ORDER_COLUMNS.map((column) => row[column]));
      const placeholders = buildPlaceholders(records.length, PURCHASE_ORDER_COLUMNS.length);
      const sql = `INSERT INTO purchase_orders (${PURCHASE_ORDER_COLUMNS.join(', ')}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${buildUpdateClause(PURCHASE_ORDER_UPDATE_COLUMNS)}`;

      const [result] = await connection.query(sql, values.flat());
      const { inserted, updated: changed } = tallyUpsert(result);
      added += inserted;
      updated += changed;
    } catch (error) {
      // Continue with next batch instead of failing completely
      logError(error, `Error processing batch of ${records.length} purchase order records`);
      skipped += records.length;
    } finally {
      connection.release();
    }
  }

  for await (const record of openCsvParser(filePath)) {
    if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
      // Flush and reset so the post-loop flush does not repeat this batch.
      await processBatch(batch);
      batch = [];

      outputProgress({
        operation: 'Purchase orders import',
        message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Purchase orders import', startTime, added, updated, skipped);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }

    batch.push(toPurchaseOrderRow(record));

    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  await processBatch(batch);

  outputProgress({
    status: 'running',
    operation: 'Purchase orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });
}

/**
 * Creates the database schema from db/schema.sql when the products table
 * (used as a proxy for "schema initialised") does not exist.
 *
 * @param {object} pool - mysql2 promise pool.
 */
async function ensureSchema(pool) {
  outputProgress({
    operation: 'Checking database schema',
    message: 'Verifying tables exist...'
  });

  const connection = await pool.getConnection();
  try {
    const [tables] = await connection.query(
      'SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = ? AND table_name = ?',
      [dbConfig.database, 'products']
    );

    if (Number(tables[0].count) === 0) {
      outputProgress({
        operation: 'Creating database schema',
        message: 'Tables not found, creating schema...'
      });

      const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
      await connection.query(schemaSQL);
    }
  } finally {
    connection.release();
  }
}

/**
 * Entry point: ensures the schema exists, imports products, then imports
 * orders and purchase orders in parallel. Any failure is reported once and
 * results in a non-zero exit code; the pool is always closed.
 */
async function main() {
  const startTime = Date.now();
  let pool;

  try {
    outputProgress({
      operation: 'Starting import process',
      message: 'Creating connection pool...'
    });

    pool = mysql.createPool(dbConfig);

    await ensureSchema(pool);

    // Import products first since they're referenced by other tables
    await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));

    // Process orders and purchase orders in parallel
    outputProgress({
      operation: 'Starting parallel import',
      message: 'Processing orders and purchase orders simultaneously...'
    });

    // allSettled lets both imports finish before the pool is closed; the
    // first failure (if any) is then reported like any other fatal error.
    const outcomes = await Promise.allSettled([
      importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv')),
      importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
    ]);
    const failure = outcomes.find((outcome) => outcome.status === 'rejected');
    if (failure) {
      throw failure.reason;
    }

    outputProgress({
      status: 'complete',
      operation: 'Import process completed',
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    logError(error, 'Fatal error during import process');
    outputProgress({
      status: 'error',
      operation: 'Import process',
      error: error.message
    });
    // Set the exit code instead of calling process.exit() here, so the
    // `finally` block below still gets to close the pool.
    process.exitCode = 1;
  } finally {
    if (pool) {
      await pool.end();
    }
  }
}

// Run the import
main().catch((error) => {
  logError(error, 'Unhandled error in main process');
  process.exit(1);
});