const fs = require('fs'); const path = require('path'); const csv = require('csv-parse'); const mysql = require('mysql2/promise'); const dotenv = require('dotenv'); // For testing purposes, limit the number of rows to import (0 = no limit) const PRODUCTS_TEST_LIMIT = 0; const ORDERS_TEST_LIMIT = 10000; const PURCHASE_ORDERS_TEST_LIMIT = 10000; dotenv.config({ path: path.join(__dirname, '../.env') }); const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, multipleStatements: true }; // Helper function to count total rows in a CSV file async function countRows(filePath) { return new Promise((resolve, reject) => { let count = 0; fs.createReadStream(filePath) .pipe(csv.parse()) .on('data', () => count++) .on('error', reject) .on('end', () => resolve(count - 1)); // Subtract 1 for header row }); } // Helper function to format time duration function formatDuration(seconds) { if (seconds < 60) return `${Math.round(seconds)}s`; const minutes = Math.floor(seconds / 60); seconds = Math.round(seconds % 60); return `${minutes}m ${seconds}s`; } // Helper function to update progress with time estimate function updateProgress(current, total, operation, startTime) { const percentage = ((current / total) * 100).toFixed(1); const elapsed = (Date.now() - startTime) / 1000; const rate = current / elapsed; // rows per second const remaining = (total - current) / rate; process.stdout.write( `\r${operation}: ${current.toLocaleString()}/${total.toLocaleString()} rows ` + `(${percentage}%) - Rate: ${Math.round(rate)}/s - ` + `Elapsed: ${formatDuration(elapsed)} - ` + `Est. remaining: ${formatDuration(remaining)}` ); } async function importProducts(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); console.log(`\nStarting products import (${totalRows.toLocaleString()} total rows${PRODUCTS_TEST_LIMIT > 0 ? ` - limited to ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows` : ''})`); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) { console.log(`\nReached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`); break; } rowCount++; // Update progress every 100ms to avoid console flooding const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Products', startTime); lastUpdate = now; } // Check if product exists const [existing] = await connection.query('SELECT product_id FROM products WHERE product_id = ?', [record.product_id]); try { await connection.query('INSERT INTO products VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE title = VALUES(title), stock_quantity = VALUES(stock_quantity), price = VALUES(price), regular_price = VALUES(regular_price), cost_price = VALUES(cost_price), landing_cost_price = VALUES(landing_cost_price), barcode = VALUES(barcode), updated_at = VALUES(updated_at), visible = VALUES(visible), managing_stock = VALUES(managing_stock), replenishable = VALUES(replenishable), vendor = VALUES(vendor), vendor_reference = VALUES(vendor_reference), permalink = VALUES(permalink), categories = VALUES(categories), image = VALUES(image), brand = VALUES(brand), options = VALUES(options), tags = VALUES(tags), moq = VALUES(moq), uom = VALUES(uom)', [ record.product_id, record.title, record.SKU, convertDate(record.created_at), parseInt(record.stock_quantity) || 0, parseFloat(record.price) || 0, parseFloat(record.regular_price) || 0, parseFloat(record.cost_price) || null, parseFloat(record.landing_cost_price) || null, record.barcode, convertDate(record.updated_at), record.visible === '1', record.managing_stock === '1', record.replenishable === '1', record.vendor, record.vendor_reference, record.permalink, record.categories, record.image, record.brand, record.options, record.tags, parseInt(record.moq) || 1, parseInt(record.uom) || 1 ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing product ${record.product_id}:`, error.message); } } const duration = ((Date.now() - startTime) / 1000).toFixed(1); console.log(`\nProducts import completed in ${duration}s: ${added.toLocaleString()} added, ${updated.toLocaleString()} updated (processed ${rowCount.toLocaleString()} rows)`); } async function importOrders(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); console.log(`\nStarting orders import (${totalRows.toLocaleString()} total rows${ORDERS_TEST_LIMIT > 0 ? ` - limited to ${ORDERS_TEST_LIMIT.toLocaleString()} rows` : ''})`); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } // First, get all valid product IDs const [rows] = await connection.query('SELECT product_id FROM products'); const validProductIds = new Set(rows.map(row => row.product_id.toString())); let skipped = 0; let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) { console.log(`\nReached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`); break; } rowCount++; // Update progress every 100ms const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Orders', startTime); lastUpdate = now; } if (!validProductIds.has(record.product_id)) { skipped++; continue; } try { // Check if order exists const [existing] = await connection.query( 'SELECT id FROM orders WHERE order_number = ? AND product_id = ?', [record.order_number, record.product_id] ); await connection.query('INSERT INTO orders (order_number, product_id, SKU, date, price, quantity, discount, tax, tax_included, shipping, customer, canceled) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE price = VALUES(price), quantity = VALUES(quantity), discount = VALUES(discount), tax = VALUES(tax), tax_included = VALUES(tax_included), shipping = VALUES(shipping), canceled = VALUES(canceled)', [ record.order_number, record.product_id, record.SKU, convertDate(record.date), parseFloat(record.price) || 0, parseInt(record.quantity) || 0, parseFloat(record.discount) || 0, parseFloat(record.tax) || 0, record.tax_included === '1', parseFloat(record.shipping) || 0, record.customer, record.canceled === '1' ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing order ${record.order_number}, product ${record.product_id}:`, error.message); skipped++; } } const duration = ((Date.now() - startTime) / 1000).toFixed(1); console.log(`\nOrders import completed in ${duration}s: ${added.toLocaleString()} added, ${updated.toLocaleString()} updated, ${skipped.toLocaleString()} skipped (processed ${rowCount.toLocaleString()} rows)`); } async function importPurchaseOrders(connection, filePath) { const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true })); const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT) : await countRows(filePath); const startTime = Date.now(); console.log(`\nStarting purchase orders import (${totalRows.toLocaleString()} total rows${PURCHASE_ORDERS_TEST_LIMIT > 0 ? ` - limited to ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows` : ''})`); function convertDate(dateStr) { if (!dateStr) return null; const [day, month, year] = dateStr.split('-'); return `${year}-${month}-${day}`; } // First, get all valid product IDs const [rows] = await connection.query('SELECT product_id FROM products'); const validProductIds = new Set(rows.map(row => row.product_id.toString())); let skipped = 0; let updated = 0; let added = 0; let rowCount = 0; let lastUpdate = Date.now(); for await (const record of parser) { if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) { console.log(`\nReached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`); break; } rowCount++; // Update progress every 100ms const now = Date.now(); if (now - lastUpdate > 100) { updateProgress(rowCount, totalRows, 'Purchase Orders', startTime); lastUpdate = now; } if (!validProductIds.has(record.product_id)) { skipped++; continue; } try { // Check if PO exists const [existing] = await connection.query( 'SELECT id FROM purchase_orders WHERE po_id = ? AND product_id = ?', [record.po_id, record.product_id] ); await connection.query('INSERT INTO purchase_orders (po_id, vendor, date, expected_date, product_id, sku, cost_price, status, notes, ordered, received, received_date) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE vendor = VALUES(vendor), expected_date = VALUES(expected_date), cost_price = VALUES(cost_price), status = VALUES(status), notes = VALUES(notes), ordered = VALUES(ordered), received = VALUES(received), received_date = VALUES(received_date)', [ record.po_id, record.vendor, convertDate(record.date), convertDate(record.expected_date), record.product_id, record.sku, parseFloat(record.cost_price) || 0, record.status || 'pending', record.notes, parseInt(record.ordered) || 0, parseInt(record.received) || 0, convertDate(record.received_date) ]); existing.length ? updated++ : added++; } catch (error) { console.error(`\nError importing PO ${record.po_id}, product ${record.product_id}:`, error.message); skipped++; } } const duration = ((Date.now() - startTime) / 1000).toFixed(1); console.log(`\nPurchase orders import completed in ${duration}s: ${added.toLocaleString()} added, ${updated.toLocaleString()} updated, ${skipped.toLocaleString()} skipped (processed ${rowCount.toLocaleString()} rows)`); } async function main() { console.log('Starting import process...'); const startTime = Date.now(); const connection = await mysql.createConnection(dbConfig); try { // Check if tables exist, if not create them console.log('Checking database schema...'); const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8'); await connection.query(schemaSQL); // Import products first since they're referenced by other tables await importProducts(connection, path.join(__dirname, '../csv/39f2x83-products.csv')); await importOrders(connection, path.join(__dirname, '../csv/39f2x83-orders.csv')); await importPurchaseOrders(connection, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv')); const duration = ((Date.now() - startTime) / 1000).toFixed(1); console.log(`\nAll imports completed successfully in ${duration} seconds`); } catch (error) { console.error('\nError during import:', error); process.exit(1); } finally { await connection.end(); } } main();