const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');

// Load environment variables before reading any of them
dotenv.config({ path: path.join(__dirname, '../.env') });

// Get test limits from environment variables
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0', 10);
const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000', 10);
const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000', 10);

const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};

// Helper function to output progress in JSON format
function outputProgress(data) {
  if (!data.status) {
    data = { status: 'running', ...data };
  }
  console.log(JSON.stringify(data));
}

// Helper function to count total rows in a CSV file
async function countRows(filePath) {
  return new Promise((resolve, reject) => {
    let count = 0;
    fs.createReadStream(filePath)
      .pipe(csv.parse())
      .on('data', () => count++)
      .on('error', reject)
      .on('end', () => resolve(count - 1)); // Subtract 1 for header row
  });
}

// Helper function to format time duration
function formatDuration(seconds) {
  if (seconds < 60) return `${Math.round(seconds)}s`;
  const minutes = Math.floor(seconds / 60);
  seconds = Math.round(seconds % 60);
  return `${minutes}m ${seconds}s`;
}

// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  const rate = current / elapsed; // rows per second
  const remaining = (total - current) / rate;
  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining),
    percentage: ((current / total) * 100).toFixed(1)
  });
}

// Helper function to handle category normalization
async function handleCategories(connection, productId, categoriesStr) {
  if (!categoriesStr) {
    // If no categories, remove all existing relationships
    await connection.query(
      'DELETE FROM product_categories WHERE product_id = ?',
      [productId]
    );
    return;
  }

  // Special cases that should not be split
  const specialCategories = [
    'Paint, Dyes & Chalk',
    'Fabric Paint, Markers, and Dye',
    'Crystals, Gems & Rhinestones',
    'Pens, Pencils & Markers'
  ];

  // Split categories and clean them, preserving special cases
  const categories = [];
  let remainingStr = categoriesStr;

  // First check for special categories
  for (const special of specialCategories) {
    if (remainingStr.includes(special)) {
      categories.push(special);
      // Remove the special category from the string
      remainingStr = remainingStr.replace(special, '');
    }
  }

  // Then process any remaining regular categories
  remainingStr.split(',')
    .map(cat => cat.trim())
    .filter(cat => cat.length > 0)
    .forEach(cat => {
      if (!categories.includes(cat)) {
        categories.push(cat);
      }
    });

  // Remove existing category relationships for this product
  await connection.query(
    'DELETE FROM product_categories WHERE product_id = ?',
    [productId]
  );

  // Insert categories and create relationships
  for (const category of categories) {
    // Insert category if it doesn't exist
    await connection.query(
      'INSERT IGNORE INTO categories (name) VALUES (?)',
      [category]
    );

    // Get category ID and create relationship in one query to avoid race conditions
    await connection.query(
      `INSERT IGNORE INTO product_categories (product_id, category_id)
       SELECT ?, id FROM categories WHERE name = ?`,
      [productId, category]
    );
  }
}
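// Illustrative example of the splitting above (values taken from the special
// list): the raw string 'Paint, Dyes & Chalk,Brushes' yields
// ['Paint, Dyes & Chalk', 'Brushes'], whereas a naive split on every comma
// would produce three fragments and corrupt the special category.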
async function importProducts(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({
    columns: true,
    trim: true
  }));

  const totalRows = PRODUCTS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting products import',
    current: 0,
    total: totalRows,
    testLimit: PRODUCTS_TEST_LIMIT,
    percentage: '0'
  });

  // Convert DD-MM-YYYY dates to the YYYY-MM-DD format MySQL expects
  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }
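  // For example, convertDate('25-12-2023') returns '2023-12-25'; the input is
  // assumed to be DD-MM-YYYY, and malformed values are not validated here.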
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 100;
  let batch = [];
  let categoryUpdates = new Map(); // Store category updates for batch processing

  for await (const record of parser) {
    if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
      // Process remaining batch
      if (batch.length > 0) {
        await processBatch(batch, categoryUpdates);
      }
      outputProgress({
        operation: 'Products import',
        message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms to avoid console flooding
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Products import', startTime);
      lastUpdate = now;
    }

    // Add to batch
    batch.push({
      product_id: record.product_id,
      title: record.title,
      SKU: record.SKU,
      created_at: convertDate(record.created_at),
      stock_quantity: parseInt(record.stock_quantity, 10) || 0,
      price: parseFloat(record.price) || 0,
      regular_price: parseFloat(record.regular_price) || 0,
      cost_price: parseFloat(record.cost_price) || null,
      landing_cost_price: parseFloat(record.landing_cost_price) || null,
      barcode: record.barcode,
      updated_at: convertDate(record.updated_at),
      visible: record.visible === '1',
      managing_stock: record.managing_stock === '1',
      replenishable: record.replenishable === '1',
      vendor: record.vendor,
      vendor_reference: record.vendor_reference,
      permalink: record.permalink,
      categories: record.categories,
      image: record.image,
      brand: record.brand,
      options: record.options,
      tags: record.tags,
      moq: parseInt(record.moq, 10) || 1,
      uom: parseInt(record.uom, 10) || 1
    });

    // Store category updates for later
    if (record.categories) {
      categoryUpdates.set(record.product_id, record.categories);
    }

    // Process batch if it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch, categoryUpdates);
      batch = [];
      categoryUpdates.clear();
    }
  }

  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch, categoryUpdates);
  }

  outputProgress({
    status: 'running',
    operation: 'Products import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to process a batch of records
  async function processBatch(records, categoryUpdates) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      await connection.beginTransaction();
      try {
        // Create the batch insert/update query
        const values = records.map(r => [
          r.product_id, r.title, r.SKU, r.created_at, r.stock_quantity,
          r.price, r.regular_price, r.cost_price, r.landing_cost_price,
          r.barcode, r.updated_at, r.visible, r.managing_stock,
          r.replenishable, r.vendor, r.vendor_reference, r.permalink,
          r.categories, r.image, r.brand, r.options, r.tags, r.moq, r.uom
        ]);

        const placeholders = records.map(() =>
          '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
        ).join(',');

        // Note: this statement omits the column list, so the value arrays above
        // must match the products table's column order exactly
        const sql = `INSERT INTO products VALUES ${placeholders}
          ON DUPLICATE KEY UPDATE
          title = VALUES(title),
          stock_quantity = VALUES(stock_quantity),
          price = VALUES(price),
          regular_price = VALUES(regular_price),
          cost_price = VALUES(cost_price),
          landing_cost_price = VALUES(landing_cost_price),
          barcode = VALUES(barcode),
          updated_at = VALUES(updated_at),
          visible = VALUES(visible),
          managing_stock = VALUES(managing_stock),
          replenishable = VALUES(replenishable),
          vendor = VALUES(vendor),
          vendor_reference = VALUES(vendor_reference),
          permalink = VALUES(permalink),
          categories = VALUES(categories),
          image = VALUES(image),
          brand = VALUES(brand),
          options = VALUES(options),
          tags = VALUES(tags),
          moq = VALUES(moq),
          uom = VALUES(uom)`;

        const [result] = await connection.query(sql, values.flat());
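        // Worked example of the accounting below: if a batch of 5 rows results
        // in 3 inserts and 2 updates, MySQL reports affectedRows = 3*1 + 2*2 = 7
        // and changedRows = 2, giving inserts = 7 - 2*2 = 3 and updates = 2.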
        // Update stats. For INSERT ... ON DUPLICATE KEY UPDATE, MySQL counts
        // each inserted row as 1 affected row, each updated row as 2, and each
        // duplicate left unchanged as 0; changedRows counts only the updated
        // rows. So: updates = changedRows, inserts = affectedRows - 2 * changedRows.
        if (result.affectedRows > 0) {
          const updateCount = result.changedRows;
          const insertCount = result.affectedRows - 2 * result.changedRows;
          added += insertCount;
          updated += updateCount;
        }

        // Process categories within the same transaction
        for (const [productId, categories] of categoryUpdates) {
          await handleCategories(connection, productId, categories);
        }

        await connection.commit();
      } catch (error) {
        await connection.rollback();
        throw error;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with next batch instead of failing completely
    } finally {
      connection.release();
    }
  }
}

async function importOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({
    columns: true,
    trim: true
  }));

  const totalRows = ORDERS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting orders import',
    current: 0,
    total: totalRows,
    testLimit: ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  // Convert DD-MM-YYYY dates to the YYYY-MM-DD format MySQL expects
  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }

  // First, get all valid product IDs so rows referencing unknown products can be skipped
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
      // Process remaining batch
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Orders import',
        message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Orders import', startTime);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }

    // Add to batch
    batch.push({
      order_number: record.order_number,
      product_id: record.product_id,
      SKU: record.SKU,
      date: convertDate(record.date),
      price: parseFloat(record.price) || 0,
      quantity: parseInt(record.quantity, 10) || 0,
      discount: parseFloat(record.discount) || 0,
      tax: parseFloat(record.tax) || 0,
      tax_included: record.tax_included === '1',
      shipping: parseFloat(record.shipping) || 0,
      customer: record.customer,
      canceled: record.canceled === '1'
    });

    // Process batch if it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }

  outputProgress({
    status: 'running',
    operation: 'Orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to process a batch of records
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      // Create the batch insert/update query
      const values = records.map(r => [
        r.order_number, r.product_id, r.SKU, r.date, r.price, r.quantity,
        r.discount, r.tax, r.tax_included, r.shipping, r.customer, r.canceled
      ]);

      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');
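      // Unlike the products statement, this INSERT names its columns
      // explicitly, so it stays valid even if the orders table gains columns.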
      const sql = `INSERT INTO orders
        (order_number, product_id, SKU, date, price, quantity, discount,
         tax, tax_included, shipping, customer, canceled)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        price = VALUES(price),
        quantity = VALUES(quantity),
        discount = VALUES(discount),
        tax = VALUES(tax),
        tax_included = VALUES(tax_included),
        shipping = VALUES(shipping),
        canceled = VALUES(canceled)`;

      const [result] = await connection.query(sql, values.flat());

      // Update stats. As with products: inserted rows count 1 toward
      // affectedRows, updated rows count 2, and changedRows counts only the
      // updated rows, so inserts = affectedRows - 2 * changedRows.
      if (result.affectedRows > 0) {
        const updateCount = result.changedRows;
        const insertCount = result.affectedRows - 2 * result.changedRows;
        added += insertCount;
        updated += updateCount;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with next batch instead of failing completely
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}

async function importPurchaseOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({
    columns: true,
    trim: true
  }));

  const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();

  outputProgress({
    operation: 'Starting purchase orders import',
    current: 0,
    total: totalRows,
    testLimit: PURCHASE_ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  // Convert DD-MM-YYYY dates to the YYYY-MM-DD format MySQL expects
  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }

  // First, get all valid product IDs so rows referencing unknown products can be skipped
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
      // Process remaining batch
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Purchase orders import',
        message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Purchase orders import', startTime);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }
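    // Note: convertDate falls back to today's date, so a blank received_date
    // is stored as the current date rather than NULL; adjust if open purchase
    // orders must keep received_date empty.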
    // Add to batch
    batch.push({
      po_id: record.po_id,
      vendor: record.vendor,
      date: convertDate(record.date),
      expected_date: convertDate(record.expected_date),
      product_id: record.product_id,
      sku: record.sku,
      cost_price: parseFloat(record.cost_price) || 0,
      status: record.status || 'pending',
      notes: record.notes,
      ordered: parseInt(record.ordered, 10) || 0,
      received: parseInt(record.received, 10) || 0,
      received_date: convertDate(record.received_date)
    });

    // Process batch if it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }

  outputProgress({
    status: 'running',
    operation: 'Purchase orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to process a batch of records
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      // Create the batch insert/update query
      const values = records.map(r => [
        r.po_id, r.vendor, r.date, r.expected_date, r.product_id, r.sku,
        r.cost_price, r.status, r.notes, r.ordered, r.received, r.received_date
      ]);

      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');

      const sql = `INSERT INTO purchase_orders
        (po_id, vendor, date, expected_date, product_id, sku, cost_price,
         status, notes, ordered, received, received_date)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        vendor = VALUES(vendor),
        expected_date = VALUES(expected_date),
        cost_price = VALUES(cost_price),
        status = VALUES(status),
        notes = VALUES(notes),
        ordered = VALUES(ordered),
        received = VALUES(received),
        received_date = VALUES(received_date)`;

      const [result] = await connection.query(sql, values.flat());

      // Update stats. As with products: inserted rows count 1 toward
      // affectedRows, updated rows count 2, and changedRows counts only the
      // updated rows, so inserts = affectedRows - 2 * changedRows.
      if (result.affectedRows > 0) {
        const updateCount = result.changedRows;
        const insertCount = result.affectedRows - 2 * result.changedRows;
        added += insertCount;
        updated += updateCount;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with next batch instead of failing completely
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}

async function main() {
  outputProgress({
    operation: 'Starting import process',
    message: 'Creating connection pool...'
  });

  const startTime = Date.now();
  const pool = mysql.createPool(dbConfig);

  try {
    // Check if tables exist, if not create them
    outputProgress({
      operation: 'Checking database schema',
      message: 'Creating tables if needed...'
    });
    const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
    await pool.query(schemaSQL);

    // Import products first since they're referenced by other tables
    await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));

    // Process orders and purchase orders in parallel
    outputProgress({
      operation: 'Starting parallel import',
      message: 'Processing orders and purchase orders simultaneously...'
    });
    await Promise.all([
      importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv')),
      importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
    ]);

    outputProgress({
      status: 'complete',
      operation: 'Import process completed',
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    outputProgress({
      status: 'error',
      error: error.message
    });
    process.exit(1);
  } finally {
    await pool.end();
  }
}

// Run the import
main();
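// Example invocation (the script filename below is hypothetical; the .env
// file, schema, and CSV paths are resolved relative to this file as above):
//   PRODUCTS_TEST_LIMIT=1000 ORDERS_TEST_LIMIT=5000 node scripts/import.js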