const fs = require('fs'); const path = require('path'); const csv = require('csv-parse'); const mysql = require('mysql2/promise'); const dotenv = require('dotenv'); // Get test limits from environment variables const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0'); const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000'); const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000'); dotenv.config({ path: path.join(__dirname, '../.env') }); const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, multipleStatements: true }; // Helper function to output progress in JSON format function outputProgress(data) { if (!data.status) { data = { status: 'running', ...data }; } console.log(JSON.stringify(data)); } // Helper function to count total rows in a CSV file async function countRows(filePath) { return new Promise((resolve, reject) => { let count = 0; fs.createReadStream(filePath) .pipe(csv.parse()) .on('data', () => count++) .on('error', reject) .on('end', () => resolve(count - 1)); // Subtract 1 for header row }); } // Helper function to format time duration function formatDuration(seconds) { if (seconds < 60) return `${Math.round(seconds)}s`; const minutes = Math.floor(seconds / 60); seconds = Math.round(seconds % 60); return `${minutes}m ${seconds}s`; } // Helper function to update progress with time estimate function updateProgress(current, total, operation, startTime) { const elapsed = (Date.now() - startTime) / 1000; const rate = current / elapsed; // rows per second const remaining = (total - current) / rate; outputProgress({ status: 'running', operation, current, total, rate, elapsed: formatDuration(elapsed), remaining: formatDuration(remaining), percentage: ((current / total) * 100).toFixed(1) }); } async function importProducts(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); outputProgress({ operation: 'Starting products import', current: 0, total: totalRows, testLimit: PRODUCTS_TEST_LIMIT, percentage: '0' }); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) { outputProgress({ operation: 'Products import', message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`, current: rowCount, total: totalRows }); break; } rowCount++; // Update progress every 100ms to avoid console flooding const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Products import', startTime); lastUpdate = now; } // Check if product exists const [existing] = await connection.query('SELECT product_id FROM products WHERE product_id = ?', [record.product_id]); try { await connection.query('INSERT INTO products VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE title = VALUES(title), stock_quantity = VALUES(stock_quantity), price = VALUES(price), regular_price = VALUES(regular_price), cost_price = VALUES(cost_price), landing_cost_price = VALUES(landing_cost_price), barcode = VALUES(barcode), updated_at = VALUES(updated_at), visible = VALUES(visible), managing_stock = VALUES(managing_stock), replenishable = VALUES(replenishable), vendor = VALUES(vendor), vendor_reference = VALUES(vendor_reference), permalink = VALUES(permalink), categories = VALUES(categories), image = VALUES(image), brand = VALUES(brand), options = VALUES(options), tags = VALUES(tags), moq = VALUES(moq), uom = VALUES(uom)', [ record.product_id, record.title, record.SKU, convertDate(record.created_at), parseInt(record.stock_quantity) || 0, parseFloat(record.price) || 0, parseFloat(record.regular_price) || 0, parseFloat(record.cost_price) || null, parseFloat(record.landing_cost_price) || null, record.barcode, convertDate(record.updated_at), record.visible === '1', record.managing_stock === '1', record.replenishable === '1', record.vendor, record.vendor_reference, record.permalink, record.categories, record.image, record.brand, record.options, record.tags, parseInt(record.moq) || 1, parseInt(record.uom) || 1 ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing product ${record.product_id}:`, error.message); } } outputProgress({ status: 'running', operation: 'Products import completed', current: rowCount, total: totalRows, added, updated, duration: formatDuration((Date.now() - startTime) / 1000), percentage: '100' }); } async function importOrders(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); outputProgress({ operation: 'Starting orders import', current: 0, total: totalRows, testLimit: ORDERS_TEST_LIMIT, percentage: '0' }); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } // First, get all valid product IDs const [rows] = await connection.query('SELECT product_id FROM products'); const validProductIds = new Set(rows.map(row => row.product_id.toString())); let skipped = 0; let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) { outputProgress({ operation: 'Orders import', message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`, current: rowCount, total: totalRows }); break; } rowCount++; // Update progress every 100ms const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Orders import', startTime); lastUpdate = now; } if (!validProductIds.has(record.product_id)) { skipped++; continue; } try { // Check if order exists const [existing] = await connection.query( 'SELECT id FROM orders WHERE order_number = ? AND product_id = ?', [record.order_number, record.product_id] ); await connection.query('INSERT INTO orders (order_number, product_id, SKU, date, price, quantity, discount, tax, tax_included, shipping, customer, canceled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE price = VALUES(price), quantity = VALUES(quantity), discount = VALUES(discount), tax = VALUES(tax), tax_included = VALUES(tax_included), shipping = VALUES(shipping), canceled = VALUES(canceled)', [ record.order_number, record.product_id, record.SKU, convertDate(record.date), parseFloat(record.price) || 0, parseInt(record.quantity) || 0, parseFloat(record.discount) || 0, parseFloat(record.tax) || 0, record.tax_included === '1', parseFloat(record.shipping) || 0, record.customer, record.canceled === '1' ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing order ${record.order_number}, product ${record.product_id}:`, error.message); skipped++; } } outputProgress({ status: 'running', operation: 'Orders import completed', current: rowCount, total: totalRows, added, updated, skipped, duration: formatDuration((Date.now() - startTime) / 1000), percentage: '100' }); } async function importPurchaseOrders(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); outputProgress({ operation: 'Starting purchase orders import', current: 0, total: totalRows, testLimit: PURCHASE_ORDERS_TEST_LIMIT, percentage: '0' }); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } // First, get all valid product IDs const [rows] = await connection.query('SELECT product_id FROM products'); const validProductIds = new Set(rows.map(row => row.product_id.toString())); let skipped = 0; let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) { outputProgress({ operation: 'Purchase orders import', message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`, current: rowCount, total: totalRows }); break; } rowCount++; // Update progress every 100ms const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Purchase orders import', startTime); lastUpdate = now; } if (!validProductIds.has(record.product_id)) { skipped++; continue; } try { // Check if PO exists const [existing] = await connection.query( 'SELECT id FROM purchase_orders WHERE po_id = ? AND product_id = ?', [record.po_id, record.product_id] ); await connection.query('INSERT INTO purchase_orders (po_id, vendor, date, expected_date, product_id, sku, cost_price, status, notes, ordered, received, received_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE vendor = VALUES(vendor), expected_date = VALUES(expected_date), cost_price = VALUES(cost_price), status = VALUES(status), notes = VALUES(notes), ordered = VALUES(ordered), received = VALUES(received), received_date = VALUES(received_date)', [ record.po_id, record.vendor, convertDate(record.date), convertDate(record.expected_date), record.product_id, record.sku, parseFloat(record.cost_price) || 0, record.status || 'pending', record.notes, parseInt(record.ordered) || 0, parseInt(record.received) || 0, convertDate(record.received_date) ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing PO ${record.po_id}, product ${record.product_id}:`, error.message); skipped++; } } outputProgress({ status: 'running', operation: 'Purchase orders import completed', current: rowCount, total: totalRows, added, updated, skipped, duration: formatDuration((Date.now() - startTime) / 1000), percentage: '100' }); } async function main() { outputProgress({ operation: 'Starting import process', message: 'Connecting to database...' }); const startTime = Date.now(); const connection = await mysql.createConnection(dbConfig); try { // Check if tables exist, if not create them outputProgress({ operation: 'Checking database schema', message: 'Creating tables if needed...' }); const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8'); await connection.query(schemaSQL); // Import products first since they're referenced by other tables await importProducts(connection, path.join(__dirname, '../csv/39f2x83-products.csv')); await importOrders(connection, path.join(__dirname, '../csv/39f2x83-orders.csv')); await importPurchaseOrders(connection, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv')); outputProgress({ status: 'complete', operation: 'Import process completed', duration: formatDuration((Date.now() - startTime) / 1000) }); } catch (error) { outputProgress({ status: 'error', error: error.message }); process.exit(1); } finally { await connection.end(); } } // Run the import main();