const mysql = require("mysql2/promise"); const { Client } = require("ssh2"); const dotenv = require("dotenv"); const path = require("path"); dotenv.config({ path: path.join(__dirname, "../.env") }); // Constants to control which imports run const IMPORT_CATEGORIES = true; const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; // SSH configuration const sshConfig = { host: process.env.PROD_SSH_HOST, port: process.env.PROD_SSH_PORT || 22, username: process.env.PROD_SSH_USER, privateKey: process.env.PROD_SSH_KEY_PATH ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined, }; // Production database configuration const prodDbConfig = { host: process.env.PROD_DB_HOST || "localhost", user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, database: process.env.PROD_DB_NAME, port: process.env.PROD_DB_PORT || 3306, }; // Local database configuration const localDbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, multipleStatements: true, waitForConnections: true, connectionLimit: 10, queueLimit: 0, namedPlaceholders: true, }; // Helper function to output progress function outputProgress(data) { process.stdout.write(JSON.stringify(data) + "\n"); } // Helper function to format duration function formatDuration(seconds) { const hours = Math.floor(seconds / 3600); const minutes = Math.floor((seconds % 3600) / 60); seconds = Math.floor(seconds % 60); const parts = []; if (hours > 0) parts.push(`${hours}h`); if (minutes > 0) parts.push(`${minutes}m`); if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`); return parts.join(" "); } // Helper function to update progress with time estimate function updateProgress(current, total, operation, startTime) { const elapsed = (Date.now() - startTime) / 1000; const rate = current / elapsed; const remaining = (total - current) / rate; outputProgress({ status: "running", operation, current, total, rate, elapsed: formatDuration(elapsed), remaining: formatDuration(remaining), percentage: ((current / total) * 100).toFixed(1), }); } let isImportCancelled = false; // Add cancel function function cancelImport() { isImportCancelled = true; outputProgress({ status: "cancelled", operation: "Import cancelled", }); } async function setupSshTunnel() { return new Promise((resolve, reject) => { const ssh = new Client(); ssh.on('error', (err) => { console.error('SSH connection error:', err); // Don't reject here, just log the error }); ssh.on('end', () => { console.log('SSH connection ended normally'); }); ssh.on('close', () => { console.log('SSH connection closed'); }); ssh .on("ready", () => { ssh.forwardOut( "127.0.0.1", 0, prodDbConfig.host, prodDbConfig.port, async (err, stream) => { if (err) reject(err); resolve({ ssh, stream }); } ); }) .connect(sshConfig); }); } async function importCategories(prodConnection, localConnection) { outputProgress({ operation: "Starting categories import", status: "running", }); const startTime = Date.now(); const typeOrder = [10, 20, 11, 21, 12, 13]; let totalInserted = 0; let skippedCategories = []; try { // Process each type in order with its own query for (const type of typeOrder) { const [categories] = await prodConnection.query( ` SELECT pc.cat_id, pc.name, pc.type, CASE WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent WHEN pc.master_cat_id IS NULL THEN NULL ELSE pc.master_cat_id END as parent_id, pc.combined_name as description FROM product_categories pc WHERE pc.type = ? ORDER BY pc.cat_id `, [type] ); if (categories.length === 0) continue; console.log(`\nProcessing ${categories.length} type ${type} categories`); if (type === 10) { console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); } // For types that can have parents (11, 21, 12, 13), verify parent existence let categoriesToInsert = categories; if (![10, 20].includes(type)) { // Get all parent IDs const parentIds = [ ...new Set( categories.map((c) => c.parent_id).filter((id) => id !== null) ), ]; // Check which parents exist const [existingParents] = await localConnection.query( "SELECT cat_id FROM categories WHERE cat_id IN (?)", [parentIds] ); const existingParentIds = new Set(existingParents.map((p) => p.cat_id)); // Filter categories and track skipped ones categoriesToInsert = categories.filter( (cat) => cat.parent_id === null || existingParentIds.has(cat.parent_id) ); const invalidCategories = categories.filter( (cat) => cat.parent_id !== null && !existingParentIds.has(cat.parent_id) ); if (invalidCategories.length > 0) { const skippedInfo = invalidCategories.map((c) => ({ id: c.cat_id, name: c.name, type: c.type, missing_parent: c.parent_id, })); skippedCategories.push(...skippedInfo); console.log( "\nSkipping categories with missing parents:", invalidCategories .map( (c) => `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})` ) .join("\n") ); } if (categoriesToInsert.length === 0) { console.log( `No valid categories of type ${type} to insert - all had missing parents` ); continue; } } console.log( `Inserting ${categoriesToInsert.length} type ${type} categories` ); const placeholders = categoriesToInsert .map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)") .join(","); const values = categoriesToInsert.flatMap((cat) => [ cat.cat_id, cat.name, cat.type, cat.parent_id, cat.description, "active", ]); // Insert categories and create relationships in one query to avoid race conditions await localConnection.query( ` INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at) VALUES ${placeholders} ON DUPLICATE KEY UPDATE name = VALUES(name), type = VALUES(type), parent_id = VALUES(parent_id), description = VALUES(description), status = VALUES(status), updated_at = CURRENT_TIMESTAMP `, values ); totalInserted += categoriesToInsert.length; updateProgress( totalInserted, totalInserted, "Categories import", startTime ); } // After all imports, if we skipped any categories, throw an error if (skippedCategories.length > 0) { const error = new Error( "Categories import completed with errors - some categories were skipped due to missing parents" ); error.skippedCategories = skippedCategories; throw error; } outputProgress({ status: "complete", operation: "Categories import completed", current: totalInserted, total: totalInserted, duration: formatDuration((Date.now() - startTime) / 1000), }); } catch (error) { console.error("Error importing categories:", error); if (error.skippedCategories) { console.error( "Skipped categories:", JSON.stringify(error.skippedCategories, null, 2) ); } throw error; } } async function importProducts(prodConnection, localConnection) { outputProgress({ operation: "Starting products import - Getting schema", status: "running", }); const startTime = Date.now(); try { // First get the column names from the table structure const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'products' ORDER BY ORDINAL_POSITION `); const columnNames = columns.map((col) => col.COLUMN_NAME); // Get total count first for progress indication outputProgress({ operation: "Starting products import - Getting total count", status: "running", }); const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM products p LEFT JOIN product_last_sold pls ON p.pid = pls.pid WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR pls.date_sold IS NULL `); const totalProducts = countResult[0].total; outputProgress({ operation: `Starting products import - Fetching ${totalProducts} products from production`, status: "running", }); // Get products from production with optimized query const [rows] = await prodConnection.query(` SELECT p.pid, p.description AS title, p.notes AS description, p.itemnumber AS SKU, p.date_created, p.datein AS first_received, p.location, COALESCE(si.available_local, 0) - COALESCE( (SELECT SUM(oi.qty_ordered - oi.qty_placed) FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.prod_pid = p.pid AND o.date_placed != '0000-00-00 00:00:00' AND o.date_shipped = '0000-00-00 00:00:00' AND oi.pick_finished = 0 AND oi.qty_back = 0 AND o.order_status != 15 AND o.order_status < 90 AND oi.qty_ordered >= oi.qty_placed AND oi.qty_ordered > 0), 0) AS stock_quantity, ci.onpreorder AS preorder_count, pnb.inventory AS notions_inv_count, COALESCE(pcp.price_each, 0) as price, COALESCE(p.sellingprice, 0) AS regular_price, COALESCE((SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND COUNT > 0), 0) AS cost_price, NULL AS landing_cost_price, p.upc AS barcode, p.harmonized_tariff_code, p.stamp AS updated_at, CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, s.companyname AS vendor, CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, sid.notions_itemnumber AS notions_reference, CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-t-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-175x175-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-o-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, pc1.name AS brand, pc2.name AS line, pc3.name AS subline, pc4.name AS artist, NULL AS options, NULL AS tags, COALESCE(CASE WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit ELSE sid.supplier_qty_per_unit END, sid.notions_qty_per_unit) AS moq, NULL AS uom, p.rating, p.rating_votes AS reviews, p.weight, p.length, p.width, p.height, (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, p.totalsold AS total_sold, p.country_of_origin, pls.date_sold as date_last_sold, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids FROM products p LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid LEFT JOIN product_category_index pci ON p.pid = pci.pid LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id AND pc.type IN (10, 20, 11, 21, 12, 13) AND pci.cat_id NOT IN (16, 17) LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN ( SELECT pid, MIN(price_each) as price_each FROM product_current_prices WHERE active = 1 GROUP BY pid ) pcp ON p.pid = pcp.pid WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) OR pls.date_sold IS NULL) GROUP BY p.pid `); // Debug log to check for specific product const debugProduct = rows.find((row) => row.pid === 620972); if (debugProduct) { console.log("Found product 620972:", debugProduct); } else { console.log("Product 620972 not found in query results"); // Debug query to check why it's missing const [debugResult] = await prodConnection.query( ` SELECT p.pid, p.itemnumber, p.date_created, p.datein, pls.date_sold, si.show, si.buyable, pcp.price_each FROM products p LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN ( SELECT pid, MIN(price_each) as price_each FROM product_current_prices WHERE active = 1 GROUP BY pid ) pcp ON p.pid = pcp.pid WHERE p.pid = ? `, [620972] ); console.log("Debug query result:", debugResult); } // Also check for the other missing products const missingPids = [ 208348, 317600, 370009, 429494, 466233, 471156, 474582, 476214, 484394, 484755, 484756, 493549, 620972, ]; const [missingProducts] = await prodConnection.query( ` SELECT p.pid, p.itemnumber, p.date_created, p.datein, pls.date_sold, si.show, si.buyable, pcp.price_each FROM products p LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN ( SELECT pid, MIN(price_each) as price_each FROM product_current_prices WHERE active = 1 GROUP BY pid ) pcp ON p.pid = pcp.pid WHERE p.pid IN (?) `, [missingPids] ); console.log("Debug results for missing products:", missingProducts); let current = 0; const total = rows.length; // Process products in batches const BATCH_SIZE = 100; for (let i = 0; i < rows.length; i += BATCH_SIZE) { let batch = rows.slice(i, i + BATCH_SIZE); // Prepare product values and category relationships in parallel const productValues = []; const categoryRelationships = []; batch.forEach((row) => { // Map values in the same order as columns const rowValues = columnNames.map((col) => { const val = row[col] ?? null; if (col === "managing_stock") return 1; if (typeof val === "number") return val || 0; return val; }); productValues.push(...rowValues); // Add category relationships if (row.category_ids) { const catIds = row.category_ids .split(",") .map((id) => id.trim()) .filter((id) => id) .map(Number); catIds.forEach((catId) => { if (catId) categoryRelationships.push([catId, row.pid]); }); } }); // Generate placeholders based on column count const placeholderGroup = `(${Array(columnNames.length) .fill("?") .join(",")})`; const productPlaceholders = Array(batch.length) .fill(placeholderGroup) .join(","); // Build the query dynamically const insertQuery = ` INSERT INTO products (${columnNames.join(",")}) VALUES ${productPlaceholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")} `; // First insert the products and wait for it to complete await localConnection.query(insertQuery, productValues); // Now that products are inserted, handle category relationships if (categoryRelationships.length > 0) { // Get unique category IDs to verify they exist const uniqueCatIds = [ ...new Set(categoryRelationships.map(([catId]) => catId)), ]; console.log("Checking categories:", uniqueCatIds); // Check which categories exist const [existingCats] = await localConnection.query( "SELECT cat_id FROM categories WHERE cat_id IN (?)", [uniqueCatIds] ); const existingCatIds = new Set(existingCats.map((c) => c.cat_id)); // Log missing categories const missingCatIds = uniqueCatIds.filter( (id) => !existingCatIds.has(id) ); if (missingCatIds.length > 0) { console.error("Missing categories:", missingCatIds); // Query production to see what these categories are const [missingCats] = await prodConnection.query( ` SELECT cat_id, name, type, master_cat_id, hidden FROM product_categories WHERE cat_id IN (?) `, [missingCatIds] ); console.error("Missing category details:", missingCats); console.warn( "Skipping invalid category relationships - continuing with import" ); continue; } // Verify products exist before inserting relationships const productIds = [ ...new Set(categoryRelationships.map(([_, pid]) => pid)), ]; const [existingProducts] = await localConnection.query( "SELECT pid FROM products WHERE pid IN (?)", [productIds] ); const existingProductIds = new Set(existingProducts.map((p) => p.pid)); // Filter relationships to only include existing products const validRelationships = categoryRelationships.filter(([_, pid]) => existingProductIds.has(pid) ); if (validRelationships.length > 0) { const catPlaceholders = validRelationships .map(() => "(?, ?)") .join(","); await localConnection.query( ` INSERT INTO product_categories (cat_id, pid) VALUES ${catPlaceholders} ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) `, validRelationships.flat() ); } } current += batch.length; updateProgress(current, total, "Products import", startTime); } outputProgress({ status: "complete", operation: "Products import completed", current: total, total, duration: formatDuration((Date.now() - startTime) / 1000), }); } catch (error) { console.error("Error importing products:", error); throw error; } } // Helper function to get date ranges for chunked queries async function getDateRanges( prodConnection, table, dateField, startYearsAgo = 2, chunkMonths = 3 ) { const ranges = []; const [result] = await prodConnection.query( ` SELECT DATE_SUB(CURRENT_DATE, INTERVAL ? YEAR) as start_date, CURRENT_DATE as end_date `, [startYearsAgo] ); let currentDate = new Date(result[0].end_date); const startDate = new Date(result[0].start_date); while (currentDate > startDate) { const rangeEnd = new Date(currentDate); currentDate.setMonth(currentDate.getMonth() - chunkMonths); const rangeStart = new Date(Math.max(currentDate, startDate)); ranges.push({ start: rangeStart.toISOString().split("T")[0], end: rangeEnd.toISOString().split("T")[0], }); } return ranges; } async function importMissingProducts(prodConnection, localConnection, missingPids) { // First get the column names from the table structure const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'products' ORDER BY ORDINAL_POSITION `); const columnNames = columns.map((col) => col.COLUMN_NAME); // Get the missing products from production const [products] = await prodConnection.query(` SELECT p.pid, p.description AS title, p.notes AS description, p.itemnumber AS SKU, p.date_created, p.datein AS first_received, p.location, COALESCE(si.available_local, 0) - COALESCE( (SELECT SUM(oi.qty_ordered - oi.qty_placed) FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.prod_pid = p.pid AND o.date_placed != '0000-00-00 00:00:00' AND o.date_shipped = '0000-00-00 00:00:00' AND oi.pick_finished = 0 AND oi.qty_back = 0 AND o.order_status != 15 AND o.order_status < 90 AND oi.qty_ordered >= oi.qty_placed AND oi.qty_ordered > 0), 0) AS stock_quantity, ci.onpreorder AS preorder_count, pnb.inventory AS notions_inv_count, COALESCE(pcp.price_each, 0) as price, COALESCE(p.sellingprice, 0) AS regular_price, COALESCE((SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND COUNT > 0), 0) AS cost_price, NULL AS landing_cost_price, p.upc AS barcode, p.harmonized_tariff_code, p.stamp AS updated_at, CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, s.companyname AS vendor, CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, sid.notions_itemnumber AS notions_reference, CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-t-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-175x175-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, (SELECT CONCAT('https://sbing.com/i/products/0000/', SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', p.pid, '-o-', MIN(PI.iid), '.jpg') FROM product_images PI WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, pc1.name AS brand, pc2.name AS line, pc3.name AS subline, pc4.name AS artist, NULL AS options, NULL AS tags, COALESCE(CASE WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit ELSE sid.supplier_qty_per_unit END, sid.notions_qty_per_unit) AS moq, NULL AS uom, p.rating, p.rating_votes AS reviews, p.weight, p.length, p.width, p.height, (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, p.totalsold AS total_sold, p.country_of_origin, pls.date_sold as date_last_sold, GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids FROM products p LEFT JOIN current_inventory ci ON p.pid = ci.pid LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 LEFT JOIN supplier_item_data sid ON p.pid = sid.pid LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid LEFT JOIN product_category_index pci ON p.pid = pci.pid LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id AND pc.type IN (10, 20, 11, 21, 12, 13) AND pci.cat_id NOT IN (16, 17) LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id LEFT JOIN product_last_sold pls ON p.pid = pls.pid LEFT JOIN ( SELECT pid, MIN(price_each) as price_each FROM product_current_prices WHERE active = 1 GROUP BY pid ) pcp ON p.pid = pcp.pid WHERE p.pid IN (?) GROUP BY p.pid `, [missingPids]); if (products.length > 0) { // Map values in the same order as columns const productValues = products.flatMap(product => columnNames.map(col => { const val = product[col] ?? null; if (col === "managing_stock") return 1; if (typeof val === "number") return val || 0; return val; }) ); // Generate placeholders for all products const placeholders = products .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); // Build and execute the query const query = ` INSERT INTO products (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")} `; await localConnection.query(query, productValues); // Verify products were inserted before proceeding with categories const [insertedProducts] = await localConnection.query( "SELECT pid FROM products WHERE pid IN (?)", [products.map(p => p.pid)] ); const insertedPids = new Set(insertedProducts.map(p => p.pid)); // Handle category relationships if any const categoryRelationships = []; products.forEach(product => { // Only add category relationships for products that were successfully inserted if (insertedPids.has(product.pid) && product.category_ids) { const catIds = product.category_ids .split(",") .map(id => id.trim()) .filter(id => id) .map(Number); catIds.forEach(catId => { if (catId) categoryRelationships.push([catId, product.pid]); }); } }); if (categoryRelationships.length > 0) { // Verify categories exist before inserting relationships const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; const [existingCats] = await localConnection.query( "SELECT cat_id FROM categories WHERE cat_id IN (?)", [uniqueCatIds] ); const existingCatIds = new Set(existingCats.map(c => c.cat_id)); // Filter relationships to only include existing categories const validRelationships = categoryRelationships.filter(([catId]) => existingCatIds.has(catId) ); if (validRelationships.length > 0) { const catPlaceholders = validRelationships .map(() => "(?, ?)") .join(","); await localConnection.query( ` INSERT INTO product_categories (cat_id, pid) VALUES ${catPlaceholders} ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) `, validRelationships.flat() ); } } } } async function importOrders(prodConnection, localConnection) { outputProgress({ operation: "Starting orders import - Getting total count", status: "running", }); const startTime = Date.now(); const skippedOrders = new Set(); // Store orders that need to be retried const missingProducts = new Set(); // Store products that need to be imported try { // First get the column names from the table structure const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'orders' ORDER BY ORDINAL_POSITION `); const columnNames = columns .map((col) => col.COLUMN_NAME) .filter((name) => name !== "id"); // Skip auto-increment ID // Get total count first for progress indication outputProgress({ operation: "Starting orders import - Getting total count", status: "running", }); const [countResult] = await prodConnection.query(` SELECT COUNT(*) as total FROM order_items oi FORCE INDEX (PRIMARY) JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) `); const totalOrders = countResult[0].total; outputProgress({ operation: `Starting orders import - Fetching ${totalOrders} orders from production`, status: "running", }); const total = countResult[0].total; let processed = 0; // Process in batches const batchSize = 1000; let offset = 0; while (offset < total) { const [orders] = await prodConnection.query(` SELECT oi.order_id as order_number, oi.prod_pid as pid, oi.prod_itemnumber as SKU, o.date_placed_onlydate as date, oi.prod_price_reg as price, oi.qty_ordered as quantity, (oi.prod_price_reg - oi.prod_price) as discount, ( SELECT otp.item_taxes_to_collect FROM order_tax_info oti JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id WHERE oti.order_id = o.order_id AND otp.pid = oi.prod_pid ORDER BY oti.stamp DESC LIMIT 1 ) as tax, 0 as tax_included, ROUND( ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 ) as shipping, o.order_cid as customer, CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, 'pending' as status, CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE o.order_status >= 15 AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) LIMIT ? OFFSET ? `, [batchSize, offset]); // Check if all products exist before inserting orders const orderProductPids = [...new Set(orders.map((o) => o.pid))]; const [existingProducts] = await localConnection.query( "SELECT pid FROM products WHERE pid IN (?)", [orderProductPids] ); const existingPids = new Set(existingProducts.map((p) => p.pid)); // Filter out orders with missing products and track them const validOrders = orders.filter((order) => { if (!existingPids.has(order.pid)) { missingProducts.add(order.pid); skippedOrders.add(order.order_number); return false; } return true; }); if (validOrders.length > 0) { const placeholders = validOrders .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); const updateClauses = columnNames .filter((col) => col !== "order_number") // Don't update primary key .map((col) => `${col} = VALUES(${col})`) .join(","); const query = ` INSERT INTO orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${updateClauses} `; await localConnection.query( query, validOrders.flatMap(order => columnNames.map(col => order[col])) ); } processed += orders.length; offset += batchSize; updateProgress( processed, total, "Orders import", startTime ); } // Now handle missing products and retry skipped orders if (missingProducts.size > 0) { outputProgress({ operation: `Found ${missingProducts.size} missing products, importing them now`, status: "running", }); await importMissingProducts(prodConnection, localConnection, [ ...missingProducts, ]); // Retry skipped orders if (skippedOrders.size > 0) { outputProgress({ operation: `Retrying ${skippedOrders.size} skipped orders`, status: "running", }); const [retryOrders] = await prodConnection.query(` SELECT oi.order_id as order_number, oi.prod_pid as pid, oi.prod_itemnumber as SKU, o.date_placed_onlydate as date, oi.prod_price_reg as price, oi.qty_ordered as quantity, (oi.prod_price_reg - oi.prod_price) as discount, ( SELECT otp.item_taxes_to_collect FROM order_tax_info oti JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id WHERE oti.order_id = o.order_id AND otp.pid = oi.prod_pid ORDER BY oti.stamp DESC LIMIT 1 ) as tax, 0 as tax_included, ROUND( ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 ) as shipping, o.order_cid as customer, CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, 'pending' as status, CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled FROM order_items oi JOIN _order o ON oi.order_id = o.order_id WHERE oi.order_id IN (?) `, [[...skippedOrders]]); const placeholders = retryOrders .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) .join(","); const updateClauses = columnNames .filter((col) => col !== "order_number") // Don't update primary key .map((col) => `${col} = VALUES(${col})`) .join(","); const query = ` INSERT INTO orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${updateClauses} `; await localConnection.query( query, retryOrders.flatMap(order => columnNames.map(col => order[col])) ); } } const endTime = Date.now(); outputProgress({ operation: `Orders import complete in ${Math.round( (endTime - startTime) / 1000 )}s`, status: "complete", }); } catch (error) { outputProgress({ operation: "Orders import failed", status: "error", error: error.message, }); throw error; } } async function importPurchaseOrders(prodConnection, localConnection) { outputProgress({ operation: "Starting purchase orders import - Initializing", status: "running", }); const startTime = Date.now(); try { // Get column names for the insert const [columns] = await localConnection.query(` SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'purchase_orders' ORDER BY ORDINAL_POSITION `); const columnNames = columns .map((col) => col.COLUMN_NAME) .filter((name) => name !== "id"); // First get all relevant PO IDs with basic info - this is much faster than the full join const [[{ total }]] = await prodConnection.query(` SELECT COUNT(*) as total FROM ( SELECT DISTINCT pop.po_id, pop.pid FROM po p FORCE INDEX (idx_date_created) JOIN po_products pop ON p.po_id = pop.po_id JOIN suppliers s ON p.supplier_id = s.supplierid WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) UNION SELECT DISTINCT r.receiving_id as po_id, rp.pid FROM receivings_products rp LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ) all_items `); const [poList] = await prodConnection.query(` SELECT DISTINCT COALESCE(p.po_id, r.receiving_id) as po_id, CASE WHEN p.po_id IS NOT NULL THEN s1.companyname WHEN r.supplier_id IS NOT NULL THEN s2.companyname ELSE 'No Supplier' END as vendor, CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, COALESCE(p.status, 50) as status, COALESCE(p.short_note, '') as notes, COALESCE(p.notes, '') as long_note FROM ( SELECT po_id FROM po WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) UNION SELECT DISTINCT r.receiving_id as po_id FROM receivings r JOIN receivings_products rp ON r.receiving_id = rp.receiving_id WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ) ids LEFT JOIN po p ON ids.po_id = p.po_id LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid LEFT JOIN receivings r ON ids.po_id = r.receiving_id LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid ORDER BY po_id `); const totalItems = total; let processed = 0; const BATCH_SIZE = 5000; const PROGRESS_INTERVAL = 500; let lastProgressUpdate = Date.now(); outputProgress({ operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, status: "running", }); for (let i = 0; i < poList.length; i += BATCH_SIZE) { const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); const poIds = batch.map(po => po.po_id); // Get all products for these POs in one query const [poProducts] = await prodConnection.query(` SELECT pop.po_id, pop.pid, pr.itemnumber as sku, pop.cost_each as cost_price, pop.qty_each as ordered FROM po_products pop FORCE INDEX (PRIMARY) JOIN products pr ON pop.pid = pr.pid WHERE pop.po_id IN (?) `, [poIds]); // Process PO products in smaller sub-batches to avoid packet size issues const SUB_BATCH_SIZE = 5000; for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); const productPids = [...new Set(productBatch.map(p => p.pid))]; const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; // Get receivings for this batch const [receivings] = await prodConnection.query(` SELECT r.po_id, rp.pid, rp.receiving_id, rp.qty_each, rp.cost_each, DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, rp.received_by, CASE WHEN r.po_id IS NULL THEN 2 -- No PO WHEN r.po_id IN (?) THEN 0 -- Original PO ELSE 1 -- Different PO END as is_alt_po FROM receivings_products rp LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id WHERE rp.pid IN (?) AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) ORDER BY r.po_id, rp.pid, rp.received_date `, [batchPoIds, productPids]); // Create maps for this sub-batch const poProductMap = new Map(); productBatch.forEach(product => { const key = `${product.po_id}-${product.pid}`; poProductMap.set(key, product); }); const receivingMap = new Map(); const altReceivingMap = new Map(); const noPOReceivingMap = new Map(); receivings.forEach(receiving => { const key = `${receiving.po_id}-${receiving.pid}`; if (receiving.is_alt_po === 2) { // No PO if (!noPOReceivingMap.has(receiving.pid)) { noPOReceivingMap.set(receiving.pid, []); } noPOReceivingMap.get(receiving.pid).push(receiving); } else if (receiving.is_alt_po === 1) { // Different PO if (!altReceivingMap.has(receiving.pid)) { altReceivingMap.set(receiving.pid, []); } altReceivingMap.get(receiving.pid).push(receiving); } else { // Original PO if (!receivingMap.has(key)) { receivingMap.set(key, []); } receivingMap.get(key).push(receiving); } }); // Verify PIDs exist const [existingPids] = await localConnection.query( 'SELECT pid FROM products WHERE pid IN (?)', [productPids] ); const validPids = new Set(existingPids.map(p => p.pid)); // Prepare values for this sub-batch const values = []; let batchProcessed = 0; for (const po of batch) { const poProducts = Array.from(poProductMap.values()) .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); for (const product of poProducts) { const key = `${po.po_id}-${product.pid}`; const receivingHistory = receivingMap.get(key) || []; const altReceivingHistory = altReceivingMap.get(product.pid) || []; const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); const totalReceived = received + altReceived + noPOReceived; const receiving_status = !totalReceived ? 1 : // created totalReceived < product.ordered ? 30 : // partial 40; // full const allReceivings = [...receivingHistory]; if (altReceivingHistory.length > 0) { allReceivings.push(...altReceivingHistory); } if (noPOReceivingHistory.length > 0) { allReceivings.push(...noPOReceivingHistory); } allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); const firstReceiving = allReceivings[0] || {}; const lastReceiving = allReceivings[allReceivings.length - 1] || {}; values.push(columnNames.map(col => { switch (col) { case 'po_id': return po.po_id; case 'vendor': return po.vendor; case 'date': return po.date; case 'expected_date': return po.expected_date; case 'pid': return product.pid; case 'sku': return product.sku; case 'cost_price': return product.cost_price; case 'status': return po.status; case 'notes': return po.notes; case 'long_note': return po.long_note; case 'ordered': return product.ordered; case 'received': return totalReceived; case 'received_date': return firstReceiving.received_date || null; case 'last_received_date': return lastReceiving.received_date || null; case 'received_by': return firstReceiving.received_by || null; case 'receiving_status': return receiving_status; case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({ receiving_id: r.receiving_id, qty: r.qty_each, cost: r.cost_each, date: r.received_date, received_by: r.received_by, alt_po: r.is_alt_po }))); default: return null; } })); batchProcessed++; } } if (values.length > 0) { const placeholders = values.map(() => `(${Array(columnNames.length).fill("?").join(",")})` ).join(","); const query = ` INSERT INTO purchase_orders (${columnNames.join(",")}) VALUES ${placeholders} ON DUPLICATE KEY UPDATE ${columnNames .filter((col) => col !== "po_id" && col !== "pid") .map((col) => `${col} = VALUES(${col})`) .join(",")}; `; await localConnection.query(query, values.flat()); } processed += batchProcessed; // Update progress based on time interval const now = Date.now(); if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { updateProgress(processed, totalItems, "Purchase orders import", startTime); lastProgressUpdate = now; } } } const endTime = Date.now(); outputProgress({ operation: `Purchase orders import complete`, status: "complete", processed_records: processed, total_records: totalItems, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) } }); } catch (error) { outputProgress({ operation: "Purchase orders import failed", status: "error", error: error.message, }); throw error; } } // Modify main function to handle cancellation and avoid process.exit async function main() { let ssh; let prodConnection; let localConnection; const startTime = Date.now(); try { outputProgress({ status: "running", operation: "Starting import process", message: "Setting up connections...", }); const tunnel = await setupSshTunnel(); ssh = tunnel.ssh; prodConnection = await mysql.createConnection({ ...prodDbConfig, stream: tunnel.stream, }); localConnection = await mysql.createPool({ ...localDbConfig, waitForConnections: true, connectionLimit: 10, queueLimit: 0 }); if (isImportCancelled) throw new Error("Import cancelled"); // Run each import based on constants if (IMPORT_CATEGORIES) { await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); } if (IMPORT_PRODUCTS) { await importProducts(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); } if (IMPORT_ORDERS) { await importOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); } if (IMPORT_PURCHASE_ORDERS) { await importPurchaseOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); } const endTime = Date.now(); outputProgress({ status: "complete", operation: "Import process completed", timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) } }); } catch (error) { const endTime = Date.now(); console.error("Error during import process:", error); outputProgress({ status: error.message === "Import cancelled" ? "cancelled" : "error", operation: "Import process", error: error.message, timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), elapsed_time: formatDuration((endTime - startTime) / 1000), elapsed_seconds: Math.round((endTime - startTime) / 1000) } }); throw error; } finally { try { // Close connections in order if (prodConnection) await prodConnection.end(); if (localConnection) await localConnection.end(); // Wait a bit for any pending data to be written before closing SSH await new Promise(resolve => setTimeout(resolve, 100)); if (ssh) { // Properly close the SSH connection ssh.on('close', () => { console.log('SSH connection closed cleanly'); }); ssh.end(); } } catch (err) { console.error('Error during cleanup:', err); } } } // Run the import only if this is the main module if (require.main === module) { main().catch((error) => { console.error("Unhandled error in main process:", error); process.exit(1); }); } // Export the functions needed by the route module.exports = { main, outputProgress, cancelImport, };