From 814d5d1a841e49af3a87ee9435435aaf2cdda260 Mon Sep 17 00:00:00 2001 From: Matt Date: Wed, 29 Jan 2025 00:18:04 -0500 Subject: [PATCH] Break up prod import script into pieces and move csv scripts into folder --- inventory-server/scripts/import-from-prod.js | 1442 +---------------- inventory-server/scripts/import/categories.js | 168 ++ inventory-server/scripts/import/orders.js | 235 +++ inventory-server/scripts/import/products.js | 561 +++++++ .../scripts/import/purchase-orders.js | 290 ++++ inventory-server/scripts/import/utils.js | 102 ++ .../scripts/{ => old_csv}/import-csv.js | 2 +- .../scripts/{ => old_csv}/update-csv.js | 2 +- 8 files changed, 1436 insertions(+), 1366 deletions(-) create mode 100644 inventory-server/scripts/import/categories.js create mode 100644 inventory-server/scripts/import/orders.js create mode 100644 inventory-server/scripts/import/products.js create mode 100644 inventory-server/scripts/import/purchase-orders.js create mode 100644 inventory-server/scripts/import/utils.js rename inventory-server/scripts/{ => old_csv}/import-csv.js (99%) rename inventory-server/scripts/{ => old_csv}/update-csv.js (98%) diff --git a/inventory-server/scripts/import-from-prod.js b/inventory-server/scripts/import-from-prod.js index a4226d3..94bb506 100644 --- a/inventory-server/scripts/import-from-prod.js +++ b/inventory-server/scripts/import-from-prod.js @@ -1,8 +1,11 @@ const mysql = require("mysql2/promise"); -const { Client } = require("ssh2"); const dotenv = require("dotenv"); const path = require("path"); -const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress'); +const { setupSshTunnel, outputProgress, formatElapsedTime, prodDbConfig, localDbConfig } = require('./import/utils'); +const importCategories = require('./import/categories'); +const { importProducts } = require('./import/products'); +const importOrders = require('./import/orders'); +const importPurchaseOrders = require('./import/purchase-orders'); dotenv.config({ path: path.join(__dirname, "../.env") }); @@ -12,42 +15,6 @@ const IMPORT_PRODUCTS = true; const IMPORT_ORDERS = true; const IMPORT_PURCHASE_ORDERS = true; -// SSH configuration -const sshConfig = { - host: process.env.PROD_SSH_HOST, - port: process.env.PROD_SSH_PORT || 22, - username: process.env.PROD_SSH_USER, - privateKey: process.env.PROD_SSH_KEY_PATH - ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) - : undefined, -}; - -// Production database configuration -const prodDbConfig = { - host: process.env.PROD_DB_HOST || "localhost", - user: process.env.PROD_DB_USER, - password: process.env.PROD_DB_PASSWORD, - database: process.env.PROD_DB_NAME, - port: process.env.PROD_DB_PORT || 3306, -}; - -// Local database configuration -const localDbConfig = { - host: process.env.DB_HOST, - user: process.env.DB_USER, - password: process.env.DB_PASSWORD, - database: process.env.DB_NAME, - multipleStatements: true, - waitForConnections: true, - connectionLimit: 10, - queueLimit: 0, - namedPlaceholders: true, -}; - -// Constants -const BATCH_SIZE = 1000; -const PROGRESS_INTERVAL = 1000; // Update progress every second - let isImportCancelled = false; // Add cancel function @@ -55,7 +22,8 @@ function cancelImport() { isImportCancelled = true; outputProgress({ status: 'cancelled', - operation: 'Import cancelled', + operation: 'Import process', + message: 'Import cancelled by user', current: 0, total: 0, elapsed: null, @@ -64,1329 +32,7 @@ function cancelImport() { }); } -// Helper function to update progress with time estimate -function updateProgress(current, total, operation, startTime) { - outputProgress({ - status: 'running', - operation, - current, - total, - rate: calculateRate(startTime, current), - elapsed: formatElapsedTime(startTime), - remaining: estimateRemaining(startTime, current, total), - percentage: ((current / total) * 100).toFixed(1) - }); -} - -async function setupSshTunnel() { - return new Promise((resolve, reject) => { - const ssh = new Client(); - - ssh.on('error', (err) => { - console.error('SSH connection error:', err); - // Don't reject here, just log the error - }); - - ssh.on('end', () => { - console.log('SSH connection ended normally'); - }); - - ssh.on('close', () => { - console.log('SSH connection closed'); - }); - - ssh - .on("ready", () => { - ssh.forwardOut( - "127.0.0.1", - 0, - prodDbConfig.host, - prodDbConfig.port, - async (err, stream) => { - if (err) reject(err); - resolve({ ssh, stream }); - } - ); - }) - .connect(sshConfig); - }); -} - -async function importCategories(prodConnection, localConnection) { - outputProgress({ - operation: "Starting categories import", - status: "running", - }); - - const startTime = Date.now(); - const typeOrder = [10, 20, 11, 21, 12, 13]; - let totalInserted = 0; - let skippedCategories = []; - - try { - // Process each type in order with its own query - for (const type of typeOrder) { - const [categories] = await prodConnection.query( - ` - SELECT - pc.cat_id, - pc.name, - pc.type, - CASE - WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent - WHEN pc.master_cat_id IS NULL THEN NULL - ELSE pc.master_cat_id - END as parent_id, - pc.combined_name as description - FROM product_categories pc - WHERE pc.type = ? - ORDER BY pc.cat_id - `, - [type] - ); - - if (categories.length === 0) continue; - - console.log(`\nProcessing ${categories.length} type ${type} categories`); - if (type === 10) { - console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); - } - - // For types that can have parents (11, 21, 12, 13), verify parent existence - let categoriesToInsert = categories; - if (![10, 20].includes(type)) { - // Get all parent IDs - const parentIds = [ - ...new Set( - categories.map((c) => c.parent_id).filter((id) => id !== null) - ), - ]; - - // Check which parents exist - const [existingParents] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [parentIds] - ); - const existingParentIds = new Set(existingParents.map((p) => p.cat_id)); - - // Filter categories and track skipped ones - categoriesToInsert = categories.filter( - (cat) => - cat.parent_id === null || existingParentIds.has(cat.parent_id) - ); - const invalidCategories = categories.filter( - (cat) => - cat.parent_id !== null && !existingParentIds.has(cat.parent_id) - ); - - if (invalidCategories.length > 0) { - const skippedInfo = invalidCategories.map((c) => ({ - id: c.cat_id, - name: c.name, - type: c.type, - missing_parent: c.parent_id, - })); - skippedCategories.push(...skippedInfo); - - console.log( - "\nSkipping categories with missing parents:", - invalidCategories - .map( - (c) => - `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})` - ) - .join("\n") - ); - } - - if (categoriesToInsert.length === 0) { - console.log( - `No valid categories of type ${type} to insert - all had missing parents` - ); - continue; - } - } - - console.log( - `Inserting ${categoriesToInsert.length} type ${type} categories` - ); - - const placeholders = categoriesToInsert - .map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)") - .join(","); - - const values = categoriesToInsert.flatMap((cat) => [ - cat.cat_id, - cat.name, - cat.type, - cat.parent_id, - cat.description, - "active", - ]); - - // Insert categories and create relationships in one query to avoid race conditions - await localConnection.query( - ` - INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE - name = VALUES(name), - type = VALUES(type), - parent_id = VALUES(parent_id), - description = VALUES(description), - status = VALUES(status), - updated_at = CURRENT_TIMESTAMP - `, - values - ); - - totalInserted += categoriesToInsert.length; - updateProgress( - totalInserted, - totalInserted, - "Categories import", - startTime - ); - } - - // After all imports, if we skipped any categories, throw an error - if (skippedCategories.length > 0) { - const error = new Error( - "Categories import completed with errors - some categories were skipped due to missing parents" - ); - error.skippedCategories = skippedCategories; - throw error; - } - - outputProgress({ - status: "complete", - operation: "Categories import completed", - current: totalInserted, - total: totalInserted, - duration: formatElapsedTime((Date.now() - startTime) / 1000), - }); - } catch (error) { - console.error("Error importing categories:", error); - if (error.skippedCategories) { - console.error( - "Skipped categories:", - JSON.stringify(error.skippedCategories, null, 2) - ); - } - throw error; - } -} - -async function importProducts(prodConnection, localConnection) { - outputProgress({ - operation: "Starting products import - Getting schema", - status: "running", - }); - - const startTime = Date.now(); - - try { - // First get the column names from the table structure - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'products' - ORDER BY ORDINAL_POSITION - `); - - const columnNames = columns.map((col) => col.COLUMN_NAME); - - // Get total count first for progress indication - outputProgress({ - operation: "Starting products import - Getting total count", - status: "running", - }); - - const [countResult] = await prodConnection.query(` - SELECT COUNT(*) as total - FROM products p - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR pls.date_sold IS NULL - `); - const totalProducts = countResult[0].total; - - outputProgress({ - operation: `Starting products import - Fetching ${totalProducts} products from production`, - status: "running", - }); - - // Get products from production with optimized query - const [rows] = await prodConnection.query(` - SELECT - p.pid, - p.description AS title, - p.notes AS description, - p.itemnumber AS SKU, - p.date_created, - p.datein AS first_received, - p.location, - COALESCE(si.available_local, 0) - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0), 0) AS stock_quantity, - ci.onpreorder AS preorder_count, - pnb.inventory AS notions_inv_count, - COALESCE(pcp.price_each, 0) as price, - COALESCE(p.sellingprice, 0) AS regular_price, - COALESCE((SELECT ROUND(AVG(costeach), 5) - FROM product_inventory - WHERE pid = p.pid - AND COUNT > 0), 0) AS cost_price, - NULL AS landing_cost_price, - p.upc AS barcode, - p.harmonized_tariff_code, - p.stamp AS updated_at, - CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, - CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, - s.companyname AS vendor, - CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-t-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-175x175-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-o-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - NULL AS options, - NULL AS tags, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - NULL AS uom, - p.rating, - p.rating_votes AS reviews, - p.weight, - p.length, - p.width, - p.height, - (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, - (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, - p.totalsold AS total_sold, - p.country_of_origin, - pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids - FROM products p - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid - LEFT JOIN product_category_index pci ON p.pid = pci.pid - LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id - AND pc.type IN (10, 20, 11, 21, 12, 13) - AND pci.cat_id NOT IN (16, 17) - LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id - LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id - LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id - LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - OR pls.date_sold IS NULL) - GROUP BY p.pid - `); - - // Debug log to check for specific product - const debugProduct = rows.find((row) => row.pid === 620972); - if (debugProduct) { - console.log("Found product 620972:", debugProduct); - } else { - console.log("Product 620972 not found in query results"); - - // Debug query to check why it's missing - const [debugResult] = await prodConnection.query( - ` - SELECT - p.pid, - p.itemnumber, - p.date_created, - p.datein, - pls.date_sold, - si.show, - si.buyable, - pcp.price_each - FROM products p - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE p.pid = ? - `, - [620972] - ); - - console.log("Debug query result:", debugResult); - } - - // Also check for the other missing products - const missingPids = [ - 208348, 317600, 370009, 429494, 466233, 471156, 474582, 476214, 484394, - 484755, 484756, 493549, 620972, - ]; - const [missingProducts] = await prodConnection.query( - ` - SELECT - p.pid, - p.itemnumber, - p.date_created, - p.datein, - pls.date_sold, - si.show, - si.buyable, - pcp.price_each - FROM products p - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE p.pid IN (?) - `, - [missingPids] - ); - - console.log("Debug results for missing products:", missingProducts); - - let current = 0; - const total = rows.length; - - // Process products in batches - for (let i = 0; i < rows.length; i += BATCH_SIZE) { - let batch = rows.slice(i, i + BATCH_SIZE); - - // Prepare product values and category relationships in parallel - const productValues = []; - const categoryRelationships = []; - - batch.forEach((row) => { - // Map values in the same order as columns - const rowValues = columnNames.map((col) => { - const val = row[col] ?? null; - if (col === "managing_stock") return 1; - if (typeof val === "number") return val || 0; - return val; - }); - productValues.push(...rowValues); - - // Add category relationships - if (row.category_ids) { - const catIds = row.category_ids - .split(",") - .map((id) => id.trim()) - .filter((id) => id) - .map(Number); - catIds.forEach((catId) => { - if (catId) categoryRelationships.push([catId, row.pid]); - }); - } - }); - - // Generate placeholders based on column count - const placeholderGroup = `(${Array(columnNames.length) - .fill("?") - .join(",")})`; - const productPlaceholders = Array(batch.length) - .fill(placeholderGroup) - .join(","); - - // Build the query dynamically - const insertQuery = ` - INSERT INTO products (${columnNames.join(",")}) - VALUES ${productPlaceholders} - ON DUPLICATE KEY UPDATE ${columnNames - .filter((col) => col !== "pid") - .map((col) => `${col} = VALUES(${col})`) - .join(",")} - `; - - // First insert the products and wait for it to complete - await localConnection.query(insertQuery, productValues); - - // Now that products are inserted, handle category relationships - if (categoryRelationships.length > 0) { - // Get unique category IDs to verify they exist - const uniqueCatIds = [ - ...new Set(categoryRelationships.map(([catId]) => catId)), - ]; - - console.log("Checking categories:", uniqueCatIds); - - // Check which categories exist - const [existingCats] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [uniqueCatIds] - ); - const existingCatIds = new Set(existingCats.map((c) => c.cat_id)); - - // Log missing categories - const missingCatIds = uniqueCatIds.filter( - (id) => !existingCatIds.has(id) - ); - if (missingCatIds.length > 0) { - console.error("Missing categories:", missingCatIds); - - // Query production to see what these categories are - const [missingCats] = await prodConnection.query( - ` - SELECT cat_id, name, type, master_cat_id, hidden - FROM product_categories - WHERE cat_id IN (?) - `, - [missingCatIds] - ); - - console.error("Missing category details:", missingCats); - console.warn( - "Skipping invalid category relationships - continuing with import" - ); - continue; - } - - // Verify products exist before inserting relationships - const productIds = [ - ...new Set(categoryRelationships.map(([_, pid]) => pid)), - ]; - const [existingProducts] = await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", - [productIds] - ); - const existingProductIds = new Set(existingProducts.map((p) => p.pid)); - - // Filter relationships to only include existing products - const validRelationships = categoryRelationships.filter(([_, pid]) => - existingProductIds.has(pid) - ); - - if (validRelationships.length > 0) { - const catPlaceholders = validRelationships - .map(() => "(?, ?)") - .join(","); - await localConnection.query( - ` - INSERT INTO product_categories (cat_id, pid) - VALUES ${catPlaceholders} - ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) - `, - validRelationships.flat() - ); - } - } - - current += batch.length; - updateProgress(current, total, "Products import", startTime); - } - - outputProgress({ - status: "complete", - operation: "Products import completed", - current: total, - total, - duration: formatElapsedTime((Date.now() - startTime) / 1000), - }); - } catch (error) { - console.error("Error importing products:", error); - throw error; - } -} - -// Helper function to get date ranges for chunked queries -async function getDateRanges( - prodConnection, - table, - dateField, - startYearsAgo = 2, - chunkMonths = 3 -) { - const ranges = []; - const [result] = await prodConnection.query( - ` - SELECT - DATE_SUB(CURRENT_DATE, INTERVAL ? YEAR) as start_date, - CURRENT_DATE as end_date - `, - [startYearsAgo] - ); - - let currentDate = new Date(result[0].end_date); - const startDate = new Date(result[0].start_date); - - while (currentDate > startDate) { - const rangeEnd = new Date(currentDate); - currentDate.setMonth(currentDate.getMonth() - chunkMonths); - const rangeStart = new Date(Math.max(currentDate, startDate)); - - ranges.push({ - start: rangeStart.toISOString().split("T")[0], - end: rangeEnd.toISOString().split("T")[0], - }); - } - - return ranges; -} - -async function importMissingProducts(prodConnection, localConnection, missingPids) { - // First get the column names from the table structure - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'products' - ORDER BY ORDINAL_POSITION - `); - - const columnNames = columns.map((col) => col.COLUMN_NAME); - - // Get the missing products from production - const [products] = await prodConnection.query(` - SELECT - p.pid, - p.description AS title, - p.notes AS description, - p.itemnumber AS SKU, - p.date_created, - p.datein AS first_received, - p.location, - COALESCE(si.available_local, 0) - COALESCE( - (SELECT SUM(oi.qty_ordered - oi.qty_placed) - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.prod_pid = p.pid - AND o.date_placed != '0000-00-00 00:00:00' - AND o.date_shipped = '0000-00-00 00:00:00' - AND oi.pick_finished = 0 - AND oi.qty_back = 0 - AND o.order_status != 15 - AND o.order_status < 90 - AND oi.qty_ordered >= oi.qty_placed - AND oi.qty_ordered > 0), 0) AS stock_quantity, - ci.onpreorder AS preorder_count, - pnb.inventory AS notions_inv_count, - COALESCE(pcp.price_each, 0) as price, - COALESCE(p.sellingprice, 0) AS regular_price, - COALESCE((SELECT ROUND(AVG(costeach), 5) - FROM product_inventory - WHERE pid = p.pid - AND COUNT > 0), 0) AS cost_price, - NULL AS landing_cost_price, - p.upc AS barcode, - p.harmonized_tariff_code, - p.stamp AS updated_at, - CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, - CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, - s.companyname AS vendor, - CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, - sid.notions_itemnumber AS notions_reference, - CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-t-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-175x175-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, - (SELECT CONCAT('https://sbing.com/i/products/0000/', - SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', - p.pid, '-o-', MIN(PI.iid), '.jpg') - FROM product_images PI - WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, - pc1.name AS brand, - pc2.name AS line, - pc3.name AS subline, - pc4.name AS artist, - NULL AS options, - NULL AS tags, - COALESCE(CASE - WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit - ELSE sid.supplier_qty_per_unit - END, sid.notions_qty_per_unit) AS moq, - NULL AS uom, - p.rating, - p.rating_votes AS reviews, - p.weight, - p.length, - p.width, - p.height, - (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, - (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, - p.totalsold AS total_sold, - p.country_of_origin, - pls.date_sold as date_last_sold, - GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids - FROM products p - LEFT JOIN current_inventory ci ON p.pid = ci.pid - LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid - LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 - LEFT JOIN supplier_item_data sid ON p.pid = sid.pid - LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid - LEFT JOIN product_category_index pci ON p.pid = pci.pid - LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id - AND pc.type IN (10, 20, 11, 21, 12, 13) - AND pci.cat_id NOT IN (16, 17) - LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id - LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id - LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id - LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id - LEFT JOIN product_last_sold pls ON p.pid = pls.pid - LEFT JOIN ( - SELECT pid, MIN(price_each) as price_each - FROM product_current_prices - WHERE active = 1 - GROUP BY pid - ) pcp ON p.pid = pcp.pid - WHERE p.pid IN (?) - GROUP BY p.pid - `, [missingPids]); - - if (products.length > 0) { - // Map values in the same order as columns - const productValues = products.flatMap(product => - columnNames.map(col => { - const val = product[col] ?? null; - if (col === "managing_stock") return 1; - if (typeof val === "number") return val || 0; - return val; - }) - ); - - // Generate placeholders for all products - const placeholders = products - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - - // Build and execute the query - const query = ` - INSERT INTO products (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${columnNames - .filter((col) => col !== "pid") - .map((col) => `${col} = VALUES(${col})`) - .join(",")} - `; - - await localConnection.query(query, productValues); - - // Verify products were inserted before proceeding with categories - const [insertedProducts] = await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", - [products.map(p => p.pid)] - ); - const insertedPids = new Set(insertedProducts.map(p => p.pid)); - - // Handle category relationships if any - const categoryRelationships = []; - products.forEach(product => { - // Only add category relationships for products that were successfully inserted - if (insertedPids.has(product.pid) && product.category_ids) { - const catIds = product.category_ids - .split(",") - .map(id => id.trim()) - .filter(id => id) - .map(Number); - catIds.forEach(catId => { - if (catId) categoryRelationships.push([catId, product.pid]); - }); - } - }); - - if (categoryRelationships.length > 0) { - // Verify categories exist before inserting relationships - const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; - const [existingCats] = await localConnection.query( - "SELECT cat_id FROM categories WHERE cat_id IN (?)", - [uniqueCatIds] - ); - const existingCatIds = new Set(existingCats.map(c => c.cat_id)); - - // Filter relationships to only include existing categories - const validRelationships = categoryRelationships.filter(([catId]) => - existingCatIds.has(catId) - ); - - if (validRelationships.length > 0) { - const catPlaceholders = validRelationships - .map(() => "(?, ?)") - .join(","); - await localConnection.query( - ` - INSERT INTO product_categories (cat_id, pid) - VALUES ${catPlaceholders} - ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) - `, - validRelationships.flat() - ); - } - } - } -} - -async function importOrders(prodConnection, localConnection) { - outputProgress({ - operation: "Starting orders import - Getting total count", - status: "running", - }); - - const startTime = Date.now(); - const skippedOrders = new Set(); // Store orders that need to be retried - const missingProducts = new Set(); // Store products that need to be imported - - try { - // First get the column names from the table structure - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'orders' - ORDER BY ORDINAL_POSITION - `); - - const columnNames = columns - .map((col) => col.COLUMN_NAME) - .filter((name) => name !== "id"); // Skip auto-increment ID - - // Get total count first for progress indication - outputProgress({ - operation: "Starting orders import - Getting total count", - status: "running", - }); - - const [countResult] = await prodConnection.query(` - SELECT COUNT(*) as total - FROM order_items oi FORCE INDEX (PRIMARY) - JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - `); - const totalOrders = countResult[0].total; - - outputProgress({ - operation: `Starting orders import - Fetching ${totalOrders} orders from production`, - status: "running", - }); - - const total = countResult[0].total; - let processed = 0; - - // Process in batches - const batchSize = 1000; - let offset = 0; - - while (offset < total) { - const [orders] = await prodConnection.query(` - SELECT - oi.order_id as order_number, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, - o.date_placed_onlydate as date, - oi.prod_price_reg as price, - oi.qty_ordered as quantity, - (oi.prod_price_reg - oi.prod_price) as discount, - ( - SELECT - otp.item_taxes_to_collect - FROM - order_tax_info oti - JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id - WHERE - oti.order_id = o.order_id - AND otp.pid = oi.prod_pid - ORDER BY - oti.stamp DESC - LIMIT 1 - ) as tax, - 0 as tax_included, - ROUND( - ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * - (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 - ) as shipping, - o.order_cid as customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, - 'pending' as status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE o.order_status >= 15 - AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - LIMIT ? OFFSET ? - `, [batchSize, offset]); - - // Check if all products exist before inserting orders - const orderProductPids = [...new Set(orders.map((o) => o.pid))]; - const [existingProducts] = await localConnection.query( - "SELECT pid FROM products WHERE pid IN (?)", - [orderProductPids] - ); - const existingPids = new Set(existingProducts.map((p) => p.pid)); - - // Filter out orders with missing products and track them - const validOrders = orders.filter((order) => { - if (!existingPids.has(order.pid)) { - missingProducts.add(order.pid); - skippedOrders.add(order.order_number); - return false; - } - return true; - }); - - if (validOrders.length > 0) { - const placeholders = validOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter((col) => col !== "order_number") // Don't update primary key - .map((col) => `${col} = VALUES(${col})`) - .join(","); - - const query = ` - INSERT INTO orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} - `; - - await localConnection.query( - query, - validOrders.flatMap(order => columnNames.map(col => order[col])) - ); - } - - processed += orders.length; - offset += batchSize; - - updateProgress( - processed, - total, - "Orders import", - startTime - ); - } - - // Now handle missing products and retry skipped orders - if (missingProducts.size > 0) { - outputProgress({ - operation: `Found ${missingProducts.size} missing products, importing them now`, - status: "running", - }); - - await importMissingProducts(prodConnection, localConnection, [ - ...missingProducts, - ]); - - // Retry skipped orders - if (skippedOrders.size > 0) { - outputProgress({ - operation: `Retrying ${skippedOrders.size} skipped orders`, - status: "running", - }); - - const [retryOrders] = await prodConnection.query(` - SELECT - oi.order_id as order_number, - oi.prod_pid as pid, - oi.prod_itemnumber as SKU, - o.date_placed_onlydate as date, - oi.prod_price_reg as price, - oi.qty_ordered as quantity, - (oi.prod_price_reg - oi.prod_price) as discount, - ( - SELECT - otp.item_taxes_to_collect - FROM - order_tax_info oti - JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id - WHERE - oti.order_id = o.order_id - AND otp.pid = oi.prod_pid - ORDER BY - oti.stamp DESC - LIMIT 1 - ) as tax, - 0 as tax_included, - ROUND( - ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * - (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 - ) as shipping, - o.order_cid as customer, - CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, - 'pending' as status, - CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled - FROM order_items oi - JOIN _order o ON oi.order_id = o.order_id - WHERE oi.order_id IN (?) - `, [[...skippedOrders]]); - - const placeholders = retryOrders - .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) - .join(","); - const updateClauses = columnNames - .filter((col) => col !== "order_number") // Don't update primary key - .map((col) => `${col} = VALUES(${col})`) - .join(","); - - const query = ` - INSERT INTO orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${updateClauses} - `; - - await localConnection.query( - query, - retryOrders.flatMap(order => columnNames.map(col => order[col])) - ); - } - } - - const endTime = Date.now(); - outputProgress({ - operation: `Orders import complete in ${Math.round( - (endTime - startTime) / 1000 - )}s`, - status: "complete", - }); - } catch (error) { - outputProgress({ - operation: "Orders import failed", - status: "error", - error: error.message, - }); - throw error; - } -} - -async function importPurchaseOrders(prodConnection, localConnection) { - outputProgress({ - operation: "Starting purchase orders import - Initializing", - status: "running", - }); - - const startTime = Date.now(); - - try { - // Get column names for the insert - const [columns] = await localConnection.query(` - SELECT COLUMN_NAME - FROM INFORMATION_SCHEMA.COLUMNS - WHERE TABLE_NAME = 'purchase_orders' - ORDER BY ORDINAL_POSITION - `); - const columnNames = columns - .map((col) => col.COLUMN_NAME) - .filter((name) => name !== "id"); - - // First get all relevant PO IDs with basic info - this is much faster than the full join - const [[{ total }]] = await prodConnection.query(` - SELECT COUNT(*) as total - FROM ( - SELECT DISTINCT pop.po_id, pop.pid - FROM po p - FORCE INDEX (idx_date_created) - JOIN po_products pop ON p.po_id = pop.po_id - JOIN suppliers s ON p.supplier_id = s.supplierid - WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - UNION - SELECT DISTINCT r.receiving_id as po_id, rp.pid - FROM receivings_products rp - LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id - WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - ) all_items - `); - - const [poList] = await prodConnection.query(` - SELECT DISTINCT - COALESCE(p.po_id, r.receiving_id) as po_id, - CASE - WHEN p.po_id IS NOT NULL THEN s1.companyname - WHEN r.supplier_id IS NOT NULL THEN s2.companyname - ELSE 'No Supplier' - END as vendor, - CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, - CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, - COALESCE(p.status, 50) as status, - COALESCE(p.short_note, '') as notes, - COALESCE(p.notes, '') as long_note - FROM ( - SELECT po_id FROM po - WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - UNION - SELECT DISTINCT r.receiving_id as po_id - FROM receivings r - JOIN receivings_products rp ON r.receiving_id = rp.receiving_id - WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - ) ids - LEFT JOIN po p ON ids.po_id = p.po_id - LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid - LEFT JOIN receivings r ON ids.po_id = r.receiving_id - LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid - ORDER BY po_id - `); - - const totalItems = total; - let processed = 0; - - const BATCH_SIZE = 5000; - const PROGRESS_INTERVAL = 500; - let lastProgressUpdate = Date.now(); - - outputProgress({ - operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, - status: "running", - }); - - for (let i = 0; i < poList.length; i += BATCH_SIZE) { - const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); - const poIds = batch.map(po => po.po_id); - - // Get all products for these POs in one query - const [poProducts] = await prodConnection.query(` - SELECT - pop.po_id, - pop.pid, - pr.itemnumber as sku, - pop.cost_each as cost_price, - pop.qty_each as ordered - FROM po_products pop - FORCE INDEX (PRIMARY) - JOIN products pr ON pop.pid = pr.pid - WHERE pop.po_id IN (?) - `, [poIds]); - - // Process PO products in smaller sub-batches to avoid packet size issues - const SUB_BATCH_SIZE = 5000; - for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { - const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); - const productPids = [...new Set(productBatch.map(p => p.pid))]; - const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; - - // Get receivings for this batch - const [receivings] = await prodConnection.query(` - SELECT - r.po_id, - rp.pid, - rp.receiving_id, - rp.qty_each, - rp.cost_each, - DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, - rp.received_by, - CASE - WHEN r.po_id IS NULL THEN 2 -- No PO - WHEN r.po_id IN (?) THEN 0 -- Original PO - ELSE 1 -- Different PO - END as is_alt_po - FROM receivings_products rp - LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id - WHERE rp.pid IN (?) - AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) - ORDER BY r.po_id, rp.pid, rp.received_date - `, [batchPoIds, productPids]); - - // Create maps for this sub-batch - const poProductMap = new Map(); - productBatch.forEach(product => { - const key = `${product.po_id}-${product.pid}`; - poProductMap.set(key, product); - }); - - const receivingMap = new Map(); - const altReceivingMap = new Map(); - const noPOReceivingMap = new Map(); - - receivings.forEach(receiving => { - const key = `${receiving.po_id}-${receiving.pid}`; - if (receiving.is_alt_po === 2) { - // No PO - if (!noPOReceivingMap.has(receiving.pid)) { - noPOReceivingMap.set(receiving.pid, []); - } - noPOReceivingMap.get(receiving.pid).push(receiving); - } else if (receiving.is_alt_po === 1) { - // Different PO - if (!altReceivingMap.has(receiving.pid)) { - altReceivingMap.set(receiving.pid, []); - } - altReceivingMap.get(receiving.pid).push(receiving); - } else { - // Original PO - if (!receivingMap.has(key)) { - receivingMap.set(key, []); - } - receivingMap.get(key).push(receiving); - } - }); - - // Verify PIDs exist - const [existingPids] = await localConnection.query( - 'SELECT pid FROM products WHERE pid IN (?)', - [productPids] - ); - const validPids = new Set(existingPids.map(p => p.pid)); - - // Prepare values for this sub-batch - const values = []; - let batchProcessed = 0; - - for (const po of batch) { - const poProducts = Array.from(poProductMap.values()) - .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); - - for (const product of poProducts) { - const key = `${po.po_id}-${product.pid}`; - const receivingHistory = receivingMap.get(key) || []; - const altReceivingHistory = altReceivingMap.get(product.pid) || []; - const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; - - const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0); - const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); - const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); - const totalReceived = received + altReceived + noPOReceived; - - const receiving_status = !totalReceived ? 1 : // created - totalReceived < product.ordered ? 30 : // partial - 40; // full - - const allReceivings = [...receivingHistory]; - if (altReceivingHistory.length > 0) { - allReceivings.push(...altReceivingHistory); - } - if (noPOReceivingHistory.length > 0) { - allReceivings.push(...noPOReceivingHistory); - } - allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); - - const firstReceiving = allReceivings[0] || {}; - const lastReceiving = allReceivings[allReceivings.length - 1] || {}; - - values.push(columnNames.map(col => { - switch (col) { - case 'po_id': return po.po_id; - case 'vendor': return po.vendor; - case 'date': return po.date; - case 'expected_date': return po.expected_date; - case 'pid': return product.pid; - case 'sku': return product.sku; - case 'cost_price': return product.cost_price; - case 'status': return po.status; - case 'notes': return po.notes; - case 'long_note': return po.long_note; - case 'ordered': return product.ordered; - case 'received': return totalReceived; - case 'received_date': return firstReceiving.received_date || null; - case 'last_received_date': return lastReceiving.received_date || null; - case 'received_by': return firstReceiving.received_by || null; - case 'receiving_status': return receiving_status; - case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({ - receiving_id: r.receiving_id, - qty: r.qty_each, - cost: r.cost_each, - date: r.received_date, - received_by: r.received_by, - alt_po: r.is_alt_po - }))); - default: return null; - } - })); - batchProcessed++; - } - } - - if (values.length > 0) { - const placeholders = values.map(() => - `(${Array(columnNames.length).fill("?").join(",")})` - ).join(","); - - const query = ` - INSERT INTO purchase_orders (${columnNames.join(",")}) - VALUES ${placeholders} - ON DUPLICATE KEY UPDATE ${columnNames - .filter((col) => col !== "po_id" && col !== "pid") - .map((col) => `${col} = VALUES(${col})`) - .join(",")}; - `; - - await localConnection.query(query, values.flat()); - } - - processed += batchProcessed; - - // Update progress based on time interval - const now = Date.now(); - if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { - updateProgress(processed, totalItems, "Purchase orders import", startTime); - lastProgressUpdate = now; - } - } - } - - const endTime = Date.now(); - outputProgress({ - operation: `Purchase orders import complete`, - status: "complete", - processed_records: processed, - total_records: totalItems, - timing: { - start_time: new Date(startTime).toISOString(), - end_time: new Date(endTime).toISOString(), - elapsed_time: formatElapsedTime((endTime - startTime) / 1000), - elapsed_seconds: Math.round((endTime - startTime) / 1000) - } - }); - - } catch (error) { - outputProgress({ - operation: "Purchase orders import failed", - status: "error", - error: error.message, - }); - throw error; - } -} - // Modify main function to handle cancellation and avoid process.exit - async function main() { let ssh; let prodConnection; @@ -1394,20 +40,42 @@ async function main() { const startTime = Date.now(); try { + // Initial progress update outputProgress({ status: "running", - operation: "Starting import process", - message: "Setting up connections...", + operation: "Import process", + message: "Initializing SSH tunnel...", + current: 0, + total: 4, // Total number of major steps + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); const tunnel = await setupSshTunnel(); ssh = tunnel.ssh; + outputProgress({ + status: "running", + operation: "Import process", + message: "Connecting to production database...", + current: 0, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); + prodConnection = await mysql.createConnection({ ...prodDbConfig, stream: tunnel.stream, }); + outputProgress({ + status: "running", + operation: "Import process", + message: "Connecting to local database...", + current: 0, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); + localConnection = await mysql.createPool({ ...localDbConfig, waitForConnections: true, @@ -1417,31 +85,73 @@ async function main() { if (isImportCancelled) throw new Error("Import cancelled"); + let currentStep = 0; + // Run each import based on constants if (IMPORT_CATEGORIES) { + outputProgress({ + status: "running", + operation: "Import process", + message: "Starting categories import...", + current: currentStep, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); await importCategories(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); + currentStep++; } if (IMPORT_PRODUCTS) { + outputProgress({ + status: "running", + operation: "Import process", + message: "Starting products import...", + current: currentStep, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); await importProducts(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); + currentStep++; } if (IMPORT_ORDERS) { + outputProgress({ + status: "running", + operation: "Import process", + message: "Starting orders import...", + current: currentStep, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); await importOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); + currentStep++; } if (IMPORT_PURCHASE_ORDERS) { + outputProgress({ + status: "running", + operation: "Import process", + message: "Starting purchase orders import...", + current: currentStep, + total: 4, + elapsed: formatElapsedTime((Date.now() - startTime) / 1000) + }); await importPurchaseOrders(prodConnection, localConnection); if (isImportCancelled) throw new Error("Import cancelled"); + currentStep++; } const endTime = Date.now(); outputProgress({ status: "complete", - operation: "Import process completed", + operation: "Import process", + message: "All imports completed successfully", + current: 4, + total: 4, + elapsed: formatElapsedTime((endTime - startTime) / 1000), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), @@ -1455,7 +165,11 @@ async function main() { outputProgress({ status: error.message === "Import cancelled" ? "cancelled" : "error", operation: "Import process", + message: error.message === "Import cancelled" ? "Import cancelled by user" : "Import failed", error: error.message, + current: 0, + total: 4, + elapsed: formatElapsedTime((endTime - startTime) / 1000), timing: { start_time: new Date(startTime).toISOString(), end_time: new Date(endTime).toISOString(), diff --git a/inventory-server/scripts/import/categories.js b/inventory-server/scripts/import/categories.js new file mode 100644 index 0000000..34f7c61 --- /dev/null +++ b/inventory-server/scripts/import/categories.js @@ -0,0 +1,168 @@ +const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); + +async function importCategories(prodConnection, localConnection) { + outputProgress({ + operation: "Starting categories import", + status: "running", + }); + + const startTime = Date.now(); + const typeOrder = [10, 20, 11, 21, 12, 13]; + let totalInserted = 0; + let skippedCategories = []; + + try { + // Process each type in order with its own query + for (const type of typeOrder) { + const [categories] = await prodConnection.query( + ` + SELECT + pc.cat_id, + pc.name, + pc.type, + CASE + WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent + WHEN pc.master_cat_id IS NULL THEN NULL + ELSE pc.master_cat_id + END as parent_id, + pc.combined_name as description + FROM product_categories pc + WHERE pc.type = ? + ORDER BY pc.cat_id + `, + [type] + ); + + if (categories.length === 0) continue; + + console.log(`\nProcessing ${categories.length} type ${type} categories`); + if (type === 10) { + console.log("Type 10 categories:", JSON.stringify(categories, null, 2)); + } + + // For types that can have parents (11, 21, 12, 13), verify parent existence + let categoriesToInsert = categories; + if (![10, 20].includes(type)) { + // Get all parent IDs + const parentIds = [ + ...new Set( + categories.map((c) => c.parent_id).filter((id) => id !== null) + ), + ]; + + // Check which parents exist + const [existingParents] = await localConnection.query( + "SELECT cat_id FROM categories WHERE cat_id IN (?)", + [parentIds] + ); + const existingParentIds = new Set(existingParents.map((p) => p.cat_id)); + + // Filter categories and track skipped ones + categoriesToInsert = categories.filter( + (cat) => + cat.parent_id === null || existingParentIds.has(cat.parent_id) + ); + const invalidCategories = categories.filter( + (cat) => + cat.parent_id !== null && !existingParentIds.has(cat.parent_id) + ); + + if (invalidCategories.length > 0) { + const skippedInfo = invalidCategories.map((c) => ({ + id: c.cat_id, + name: c.name, + type: c.type, + missing_parent: c.parent_id, + })); + skippedCategories.push(...skippedInfo); + + console.log( + "\nSkipping categories with missing parents:", + invalidCategories + .map( + (c) => + `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})` + ) + .join("\n") + ); + } + + if (categoriesToInsert.length === 0) { + console.log( + `No valid categories of type ${type} to insert - all had missing parents` + ); + continue; + } + } + + console.log( + `Inserting ${categoriesToInsert.length} type ${type} categories` + ); + + const placeholders = categoriesToInsert + .map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)") + .join(","); + + const values = categoriesToInsert.flatMap((cat) => [ + cat.cat_id, + cat.name, + cat.type, + cat.parent_id, + cat.description, + "active", + ]); + + // Insert categories and create relationships in one query to avoid race conditions + await localConnection.query( + ` + INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE + name = VALUES(name), + type = VALUES(type), + parent_id = VALUES(parent_id), + description = VALUES(description), + status = VALUES(status), + updated_at = CURRENT_TIMESTAMP + `, + values + ); + + totalInserted += categoriesToInsert.length; + updateProgress( + totalInserted, + totalInserted, + "Categories import", + startTime + ); + } + + // After all imports, if we skipped any categories, throw an error + if (skippedCategories.length > 0) { + const error = new Error( + "Categories import completed with errors - some categories were skipped due to missing parents" + ); + error.skippedCategories = skippedCategories; + throw error; + } + + outputProgress({ + status: "complete", + operation: "Categories import completed", + current: totalInserted, + total: totalInserted, + duration: formatElapsedTime((Date.now() - startTime) / 1000), + }); + } catch (error) { + console.error("Error importing categories:", error); + if (error.skippedCategories) { + console.error( + "Skipped categories:", + JSON.stringify(error.skippedCategories, null, 2) + ); + } + throw error; + } +} + +module.exports = importCategories; \ No newline at end of file diff --git a/inventory-server/scripts/import/orders.js b/inventory-server/scripts/import/orders.js new file mode 100644 index 0000000..1427d0a --- /dev/null +++ b/inventory-server/scripts/import/orders.js @@ -0,0 +1,235 @@ +const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); +const { importMissingProducts } = require('./products'); + +async function importOrders(prodConnection, localConnection) { + outputProgress({ + operation: "Starting orders import - Getting total count", + status: "running", + }); + + const startTime = Date.now(); + const skippedOrders = new Set(); // Store orders that need to be retried + const missingProducts = new Set(); // Store products that need to be imported + + try { + // First get the column names from the table structure + const [columns] = await localConnection.query(` + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'orders' + ORDER BY ORDINAL_POSITION + `); + + const columnNames = columns + .map((col) => col.COLUMN_NAME) + .filter((name) => name !== "id"); // Skip auto-increment ID + + // Get total count first for progress indication + outputProgress({ + operation: "Starting orders import - Getting total count", + status: "running", + }); + + const [countResult] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM order_items oi FORCE INDEX (PRIMARY) + JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + `); + const totalOrders = countResult[0].total; + + outputProgress({ + operation: `Starting orders import - Fetching ${totalOrders} orders from production`, + status: "running", + }); + + const total = countResult[0].total; + let processed = 0; + + // Process in batches + const batchSize = 1000; + let offset = 0; + + while (offset < total) { + const [orders] = await prodConnection.query(` + SELECT + oi.order_id as order_number, + oi.prod_pid as pid, + oi.prod_itemnumber as SKU, + o.date_placed_onlydate as date, + oi.prod_price_reg as price, + oi.qty_ordered as quantity, + (oi.prod_price_reg - oi.prod_price) as discount, + ( + SELECT + otp.item_taxes_to_collect + FROM + order_tax_info oti + JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id + WHERE + oti.order_id = o.order_id + AND otp.pid = oi.prod_pid + ORDER BY + oti.stamp DESC + LIMIT 1 + ) as tax, + 0 as tax_included, + ROUND( + ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * + (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 + ) as shipping, + o.order_cid as customer, + CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, + 'pending' as status, + CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE o.order_status >= 15 + AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + LIMIT ? OFFSET ? + `, [batchSize, offset]); + + // Check if all products exist before inserting orders + const orderProductPids = [...new Set(orders.map((o) => o.pid))]; + const [existingProducts] = await localConnection.query( + "SELECT pid FROM products WHERE pid IN (?)", + [orderProductPids] + ); + const existingPids = new Set(existingProducts.map((p) => p.pid)); + + // Filter out orders with missing products and track them + const validOrders = orders.filter((order) => { + if (!existingPids.has(order.pid)) { + missingProducts.add(order.pid); + skippedOrders.add(order.order_number); + return false; + } + return true; + }); + + if (validOrders.length > 0) { + const placeholders = validOrders + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); + const updateClauses = columnNames + .filter((col) => col !== "order_number") // Don't update primary key + .map((col) => `${col} = VALUES(${col})`) + .join(","); + + const query = ` + INSERT INTO orders (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${updateClauses} + `; + + await localConnection.query( + query, + validOrders.flatMap(order => columnNames.map(col => order[col])) + ); + } + + processed += orders.length; + offset += batchSize; + + updateProgress( + processed, + total, + "Orders import", + startTime + ); + } + + // Now handle missing products and retry skipped orders + if (missingProducts.size > 0) { + outputProgress({ + operation: `Found ${missingProducts.size} missing products, importing them now`, + status: "running", + }); + + await importMissingProducts(prodConnection, localConnection, [ + ...missingProducts, + ]); + + // Retry skipped orders + if (skippedOrders.size > 0) { + outputProgress({ + operation: `Retrying ${skippedOrders.size} skipped orders`, + status: "running", + }); + + const [retryOrders] = await prodConnection.query(` + SELECT + oi.order_id as order_number, + oi.prod_pid as pid, + oi.prod_itemnumber as SKU, + o.date_placed_onlydate as date, + oi.prod_price_reg as price, + oi.qty_ordered as quantity, + (oi.prod_price_reg - oi.prod_price) as discount, + ( + SELECT + otp.item_taxes_to_collect + FROM + order_tax_info oti + JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id + WHERE + oti.order_id = o.order_id + AND otp.pid = oi.prod_pid + ORDER BY + oti.stamp DESC + LIMIT 1 + ) as tax, + 0 as tax_included, + ROUND( + ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) * + (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2 + ) as shipping, + o.order_cid as customer, + CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name, + 'pending' as status, + CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.order_id IN (?) + `, [[...skippedOrders]]); + + const placeholders = retryOrders + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); + const updateClauses = columnNames + .filter((col) => col !== "order_number") // Don't update primary key + .map((col) => `${col} = VALUES(${col})`) + .join(","); + + const query = ` + INSERT INTO orders (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${updateClauses} + `; + + await localConnection.query( + query, + retryOrders.flatMap(order => columnNames.map(col => order[col])) + ); + } + } + + const endTime = Date.now(); + outputProgress({ + operation: `Orders import complete in ${Math.round( + (endTime - startTime) / 1000 + )}s`, + status: "complete", + }); + } catch (error) { + outputProgress({ + operation: "Orders import failed", + status: "error", + error: error.message, + }); + throw error; + } +} + +module.exports = importOrders; \ No newline at end of file diff --git a/inventory-server/scripts/import/products.js b/inventory-server/scripts/import/products.js new file mode 100644 index 0000000..70be5c5 --- /dev/null +++ b/inventory-server/scripts/import/products.js @@ -0,0 +1,561 @@ +const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); + +async function importMissingProducts(prodConnection, localConnection, missingPids) { + // First get the column names from the table structure + const [columns] = await localConnection.query(` + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'products' + ORDER BY ORDINAL_POSITION + `); + + const columnNames = columns.map((col) => col.COLUMN_NAME); + + // Get the missing products from production + const [products] = await prodConnection.query(` + SELECT + p.pid, + p.description AS title, + p.notes AS description, + p.itemnumber AS SKU, + p.date_created, + p.datein AS first_received, + p.location, + COALESCE(si.available_local, 0) - COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0), 0) AS stock_quantity, + ci.onpreorder AS preorder_count, + pnb.inventory AS notions_inv_count, + COALESCE(pcp.price_each, 0) as price, + COALESCE(p.sellingprice, 0) AS regular_price, + COALESCE((SELECT ROUND(AVG(costeach), 5) + FROM product_inventory + WHERE pid = p.pid + AND COUNT > 0), 0) AS cost_price, + NULL AS landing_cost_price, + p.upc AS barcode, + p.harmonized_tariff_code, + p.stamp AS updated_at, + CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, + CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, + s.companyname AS vendor, + CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-t-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-175x175-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-o-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + NULL AS options, + NULL AS tags, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, + NULL AS uom, + p.rating, + p.rating_votes AS reviews, + p.weight, + p.length, + p.width, + p.height, + (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, + (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, + p.totalsold AS total_sold, + p.country_of_origin, + pls.date_sold as date_last_sold, + GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids + FROM products p + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + AND pc.type IN (10, 20, 11, 21, 12, 13) + AND pci.cat_id NOT IN (16, 17) + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + LEFT JOIN ( + SELECT pid, MIN(price_each) as price_each + FROM product_current_prices + WHERE active = 1 + GROUP BY pid + ) pcp ON p.pid = pcp.pid + WHERE p.pid IN (?) + GROUP BY p.pid + `, [missingPids]); + + if (products.length > 0) { + // Map values in the same order as columns + const productValues = products.flatMap(product => + columnNames.map(col => { + const val = product[col] ?? null; + if (col === "managing_stock") return 1; + if (typeof val === "number") return val || 0; + return val; + }) + ); + + // Generate placeholders for all products + const placeholders = products + .map(() => `(${Array(columnNames.length).fill("?").join(",")})`) + .join(","); + + // Build and execute the query + const query = ` + INSERT INTO products (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${columnNames + .filter((col) => col !== "pid") + .map((col) => `${col} = VALUES(${col})`) + .join(",")} + `; + + await localConnection.query(query, productValues); + + // Verify products were inserted before proceeding with categories + const [insertedProducts] = await localConnection.query( + "SELECT pid FROM products WHERE pid IN (?)", + [products.map(p => p.pid)] + ); + const insertedPids = new Set(insertedProducts.map(p => p.pid)); + + // Handle category relationships if any + const categoryRelationships = []; + products.forEach(product => { + // Only add category relationships for products that were successfully inserted + if (insertedPids.has(product.pid) && product.category_ids) { + const catIds = product.category_ids + .split(",") + .map(id => id.trim()) + .filter(id => id) + .map(Number); + catIds.forEach(catId => { + if (catId) categoryRelationships.push([catId, product.pid]); + }); + } + }); + + if (categoryRelationships.length > 0) { + // Verify categories exist before inserting relationships + const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))]; + const [existingCats] = await localConnection.query( + "SELECT cat_id FROM categories WHERE cat_id IN (?)", + [uniqueCatIds] + ); + const existingCatIds = new Set(existingCats.map(c => c.cat_id)); + + // Filter relationships to only include existing categories + const validRelationships = categoryRelationships.filter(([catId]) => + existingCatIds.has(catId) + ); + + if (validRelationships.length > 0) { + const catPlaceholders = validRelationships + .map(() => "(?, ?)") + .join(","); + await localConnection.query( + ` + INSERT INTO product_categories (cat_id, pid) + VALUES ${catPlaceholders} + ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) + `, + validRelationships.flat() + ); + } + } + } +} + +async function importProducts(prodConnection, localConnection) { + outputProgress({ + operation: "Starting products import - Getting schema", + status: "running", + }); + + const startTime = Date.now(); + + try { + // First get the column names from the table structure + const [columns] = await localConnection.query(` + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'products' + ORDER BY ORDINAL_POSITION + `); + + const columnNames = columns.map((col) => col.COLUMN_NAME); + + // Get total count first for progress indication + outputProgress({ + operation: "Starting products import - Getting total count", + status: "running", + }); + + const [countResult] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM products p + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR pls.date_sold IS NULL + `); + const totalProducts = countResult[0].total; + + outputProgress({ + operation: `Starting products import - Fetching ${totalProducts} products from production`, + status: "running", + }); + + // Get products from production with optimized query + const [rows] = await prodConnection.query(` + SELECT + p.pid, + p.description AS title, + p.notes AS description, + p.itemnumber AS SKU, + p.date_created, + p.datein AS first_received, + p.location, + COALESCE(si.available_local, 0) - COALESCE( + (SELECT SUM(oi.qty_ordered - oi.qty_placed) + FROM order_items oi + JOIN _order o ON oi.order_id = o.order_id + WHERE oi.prod_pid = p.pid + AND o.date_placed != '0000-00-00 00:00:00' + AND o.date_shipped = '0000-00-00 00:00:00' + AND oi.pick_finished = 0 + AND oi.qty_back = 0 + AND o.order_status != 15 + AND o.order_status < 90 + AND oi.qty_ordered >= oi.qty_placed + AND oi.qty_ordered > 0), 0) AS stock_quantity, + ci.onpreorder AS preorder_count, + pnb.inventory AS notions_inv_count, + COALESCE(pcp.price_each, 0) as price, + COALESCE(p.sellingprice, 0) AS regular_price, + COALESCE((SELECT ROUND(AVG(costeach), 5) + FROM product_inventory + WHERE pid = p.pid + AND COUNT > 0), 0) AS cost_price, + NULL AS landing_cost_price, + p.upc AS barcode, + p.harmonized_tariff_code, + p.stamp AS updated_at, + CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible, + CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable, + s.companyname AS vendor, + CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference, + sid.notions_itemnumber AS notions_reference, + CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-t-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0) AS image, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-175x175-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175, + (SELECT CONCAT('https://sbing.com/i/products/0000/', + SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/', + p.pid, '-o-', MIN(PI.iid), '.jpg') + FROM product_images PI + WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full, + pc1.name AS brand, + pc2.name AS line, + pc3.name AS subline, + pc4.name AS artist, + NULL AS options, + NULL AS tags, + COALESCE(CASE + WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit + ELSE sid.supplier_qty_per_unit + END, sid.notions_qty_per_unit) AS moq, + NULL AS uom, + p.rating, + p.rating_votes AS reviews, + p.weight, + p.length, + p.width, + p.height, + (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets, + (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies, + p.totalsold AS total_sold, + p.country_of_origin, + pls.date_sold as date_last_sold, + GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids + FROM products p + LEFT JOIN current_inventory ci ON p.pid = ci.pid + LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN supplier_item_data sid ON p.pid = sid.pid + LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid + LEFT JOIN product_category_index pci ON p.pid = pci.pid + LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id + AND pc.type IN (10, 20, 11, 21, 12, 13) + AND pci.cat_id NOT IN (16, 17) + LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id + LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id + LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id + LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + LEFT JOIN ( + SELECT pid, MIN(price_each) as price_each + FROM product_current_prices + WHERE active = 1 + GROUP BY pid + ) pcp ON p.pid = pcp.pid + WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + OR pls.date_sold IS NULL) + GROUP BY p.pid + `); + + // Debug log to check for specific product + const debugProduct = rows.find((row) => row.pid === 620972); + if (debugProduct) { + console.log("Found product 620972:", debugProduct); + } else { + console.log("Product 620972 not found in query results"); + + // Debug query to check why it's missing + const [debugResult] = await prodConnection.query( + ` + SELECT + p.pid, + p.itemnumber, + p.date_created, + p.datein, + pls.date_sold, + si.show, + si.buyable, + pcp.price_each + FROM products p + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN ( + SELECT pid, MIN(price_each) as price_each + FROM product_current_prices + WHERE active = 1 + GROUP BY pid + ) pcp ON p.pid = pcp.pid + WHERE p.pid = ? + `, + [620972] + ); + + console.log("Debug query result:", debugResult); + } + + // Also check for the other missing products + const missingPids = [ + 208348, 317600, 370009, 429494, 466233, 471156, 474582, 476214, 484394, + 484755, 484756, 493549, 620972, + ]; + const [missingProducts] = await prodConnection.query( + ` + SELECT + p.pid, + p.itemnumber, + p.date_created, + p.datein, + pls.date_sold, + si.show, + si.buyable, + pcp.price_each + FROM products p + LEFT JOIN product_last_sold pls ON p.pid = pls.pid + LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0 + LEFT JOIN ( + SELECT pid, MIN(price_each) as price_each + FROM product_current_prices + WHERE active = 1 + GROUP BY pid + ) pcp ON p.pid = pcp.pid + WHERE p.pid IN (?) + `, + [missingPids] + ); + + console.log("Debug results for missing products:", missingProducts); + + let current = 0; + const total = rows.length; + + // Process products in batches + for (let i = 0; i < rows.length; i += BATCH_SIZE) { + let batch = rows.slice(i, i + BATCH_SIZE); + + // Prepare product values and category relationships in parallel + const productValues = []; + const categoryRelationships = []; + + batch.forEach((row) => { + // Map values in the same order as columns + const rowValues = columnNames.map((col) => { + const val = row[col] ?? null; + if (col === "managing_stock") return 1; + if (typeof val === "number") return val || 0; + return val; + }); + productValues.push(...rowValues); + + // Add category relationships + if (row.category_ids) { + const catIds = row.category_ids + .split(",") + .map((id) => id.trim()) + .filter((id) => id) + .map(Number); + catIds.forEach((catId) => { + if (catId) categoryRelationships.push([catId, row.pid]); + }); + } + }); + + // Generate placeholders based on column count + const placeholderGroup = `(${Array(columnNames.length) + .fill("?") + .join(",")})`; + const productPlaceholders = Array(batch.length) + .fill(placeholderGroup) + .join(","); + + // Build the query dynamically + const insertQuery = ` + INSERT INTO products (${columnNames.join(",")}) + VALUES ${productPlaceholders} + ON DUPLICATE KEY UPDATE ${columnNames + .filter((col) => col !== "pid") + .map((col) => `${col} = VALUES(${col})`) + .join(",")} + `; + + // First insert the products and wait for it to complete + await localConnection.query(insertQuery, productValues); + + // Now that products are inserted, handle category relationships + if (categoryRelationships.length > 0) { + // Get unique category IDs to verify they exist + const uniqueCatIds = [ + ...new Set(categoryRelationships.map(([catId]) => catId)), + ]; + + console.log("Checking categories:", uniqueCatIds); + + // Check which categories exist + const [existingCats] = await localConnection.query( + "SELECT cat_id FROM categories WHERE cat_id IN (?)", + [uniqueCatIds] + ); + const existingCatIds = new Set(existingCats.map((c) => c.cat_id)); + + // Log missing categories + const missingCatIds = uniqueCatIds.filter( + (id) => !existingCatIds.has(id) + ); + if (missingCatIds.length > 0) { + console.error("Missing categories:", missingCatIds); + + // Query production to see what these categories are + const [missingCats] = await prodConnection.query( + ` + SELECT cat_id, name, type, master_cat_id, hidden + FROM product_categories + WHERE cat_id IN (?) + `, + [missingCatIds] + ); + + console.error("Missing category details:", missingCats); + console.warn( + "Skipping invalid category relationships - continuing with import" + ); + continue; + } + + // Verify products exist before inserting relationships + const productIds = [ + ...new Set(categoryRelationships.map(([_, pid]) => pid)), + ]; + const [existingProducts] = await localConnection.query( + "SELECT pid FROM products WHERE pid IN (?)", + [productIds] + ); + const existingProductIds = new Set(existingProducts.map((p) => p.pid)); + + // Filter relationships to only include existing products + const validRelationships = categoryRelationships.filter(([_, pid]) => + existingProductIds.has(pid) + ); + + if (validRelationships.length > 0) { + const catPlaceholders = validRelationships + .map(() => "(?, ?)") + .join(","); + await localConnection.query( + ` + INSERT INTO product_categories (cat_id, pid) + VALUES ${catPlaceholders} + ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id) + `, + validRelationships.flat() + ); + } + } + + current += batch.length; + updateProgress(current, total, "Products import", startTime); + } + + outputProgress({ + status: "complete", + operation: "Products import completed", + current: total, + total, + duration: formatElapsedTime((Date.now() - startTime) / 1000), + }); + } catch (error) { + console.error("Error importing products:", error); + throw error; + } +} + +module.exports = { + importProducts, + importMissingProducts +}; \ No newline at end of file diff --git a/inventory-server/scripts/import/purchase-orders.js b/inventory-server/scripts/import/purchase-orders.js new file mode 100644 index 0000000..323e894 --- /dev/null +++ b/inventory-server/scripts/import/purchase-orders.js @@ -0,0 +1,290 @@ +const { updateProgress, outputProgress, formatElapsedTime } = require('./utils'); + +async function importPurchaseOrders(prodConnection, localConnection) { + outputProgress({ + operation: "Starting purchase orders import - Initializing", + status: "running", + }); + + const startTime = Date.now(); + + try { + // Get column names for the insert + const [columns] = await localConnection.query(` + SELECT COLUMN_NAME + FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME = 'purchase_orders' + ORDER BY ORDINAL_POSITION + `); + const columnNames = columns + .map((col) => col.COLUMN_NAME) + .filter((name) => name !== "id"); + + // First get all relevant PO IDs with basic info - this is much faster than the full join + const [[{ total }]] = await prodConnection.query(` + SELECT COUNT(*) as total + FROM ( + SELECT DISTINCT pop.po_id, pop.pid + FROM po p + FORCE INDEX (idx_date_created) + JOIN po_products pop ON p.po_id = pop.po_id + JOIN suppliers s ON p.supplier_id = s.supplierid + WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + UNION + SELECT DISTINCT r.receiving_id as po_id, rp.pid + FROM receivings_products rp + LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id + WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ) all_items + `); + + const [poList] = await prodConnection.query(` + SELECT DISTINCT + COALESCE(p.po_id, r.receiving_id) as po_id, + CASE + WHEN p.po_id IS NOT NULL THEN s1.companyname + WHEN r.supplier_id IS NOT NULL THEN s2.companyname + ELSE 'No Supplier' + END as vendor, + CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_ordered) END as date, + CASE WHEN p.po_id IS NOT NULL THEN DATE(p.date_estin) END as expected_date, + COALESCE(p.status, 50) as status, + COALESCE(p.short_note, '') as notes, + COALESCE(p.notes, '') as long_note + FROM ( + SELECT po_id FROM po + WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + UNION + SELECT DISTINCT r.receiving_id as po_id + FROM receivings r + JOIN receivings_products rp ON r.receiving_id = rp.receiving_id + WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ) ids + LEFT JOIN po p ON ids.po_id = p.po_id + LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid + LEFT JOIN receivings r ON ids.po_id = r.receiving_id + LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid + ORDER BY po_id + `); + + const totalItems = total; + let processed = 0; + + const BATCH_SIZE = 5000; + const PROGRESS_INTERVAL = 500; + let lastProgressUpdate = Date.now(); + + outputProgress({ + operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`, + status: "running", + }); + + for (let i = 0; i < poList.length; i += BATCH_SIZE) { + const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length)); + const poIds = batch.map(po => po.po_id); + + // Get all products for these POs in one query + const [poProducts] = await prodConnection.query(` + SELECT + pop.po_id, + pop.pid, + pr.itemnumber as sku, + pop.cost_each as cost_price, + pop.qty_each as ordered + FROM po_products pop + FORCE INDEX (PRIMARY) + JOIN products pr ON pop.pid = pr.pid + WHERE pop.po_id IN (?) + `, [poIds]); + + // Process PO products in smaller sub-batches to avoid packet size issues + const SUB_BATCH_SIZE = 5000; + for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) { + const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE); + const productPids = [...new Set(productBatch.map(p => p.pid))]; + const batchPoIds = [...new Set(productBatch.map(p => p.po_id))]; + + // Get receivings for this batch + const [receivings] = await prodConnection.query(` + SELECT + r.po_id, + rp.pid, + rp.receiving_id, + rp.qty_each, + rp.cost_each, + DATE(NULLIF(rp.received_date, '0000-00-00 00:00:00')) as received_date, + rp.received_by, + CASE + WHEN r.po_id IS NULL THEN 2 -- No PO + WHEN r.po_id IN (?) THEN 0 -- Original PO + ELSE 1 -- Different PO + END as is_alt_po + FROM receivings_products rp + LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id + WHERE rp.pid IN (?) + AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR) + ORDER BY r.po_id, rp.pid, rp.received_date + `, [batchPoIds, productPids]); + + // Create maps for this sub-batch + const poProductMap = new Map(); + productBatch.forEach(product => { + const key = `${product.po_id}-${product.pid}`; + poProductMap.set(key, product); + }); + + const receivingMap = new Map(); + const altReceivingMap = new Map(); + const noPOReceivingMap = new Map(); + + receivings.forEach(receiving => { + const key = `${receiving.po_id}-${receiving.pid}`; + if (receiving.is_alt_po === 2) { + // No PO + if (!noPOReceivingMap.has(receiving.pid)) { + noPOReceivingMap.set(receiving.pid, []); + } + noPOReceivingMap.get(receiving.pid).push(receiving); + } else if (receiving.is_alt_po === 1) { + // Different PO + if (!altReceivingMap.has(receiving.pid)) { + altReceivingMap.set(receiving.pid, []); + } + altReceivingMap.get(receiving.pid).push(receiving); + } else { + // Original PO + if (!receivingMap.has(key)) { + receivingMap.set(key, []); + } + receivingMap.get(key).push(receiving); + } + }); + + // Verify PIDs exist + const [existingPids] = await localConnection.query( + 'SELECT pid FROM products WHERE pid IN (?)', + [productPids] + ); + const validPids = new Set(existingPids.map(p => p.pid)); + + // Prepare values for this sub-batch + const values = []; + let batchProcessed = 0; + + for (const po of batch) { + const poProducts = Array.from(poProductMap.values()) + .filter(p => p.po_id === po.po_id && validPids.has(p.pid)); + + for (const product of poProducts) { + const key = `${po.po_id}-${product.pid}`; + const receivingHistory = receivingMap.get(key) || []; + const altReceivingHistory = altReceivingMap.get(product.pid) || []; + const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || []; + + const received = receivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const altReceived = altReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const noPOReceived = noPOReceivingHistory.reduce((sum, r) => sum + r.qty_each, 0); + const totalReceived = received + altReceived + noPOReceived; + + const receiving_status = !totalReceived ? 1 : // created + totalReceived < product.ordered ? 30 : // partial + 40; // full + + const allReceivings = [...receivingHistory]; + if (altReceivingHistory.length > 0) { + allReceivings.push(...altReceivingHistory); + } + if (noPOReceivingHistory.length > 0) { + allReceivings.push(...noPOReceivingHistory); + } + allReceivings.sort((a, b) => new Date(a.received_date) - new Date(b.received_date)); + + const firstReceiving = allReceivings[0] || {}; + const lastReceiving = allReceivings[allReceivings.length - 1] || {}; + + values.push(columnNames.map(col => { + switch (col) { + case 'po_id': return po.po_id; + case 'vendor': return po.vendor; + case 'date': return po.date; + case 'expected_date': return po.expected_date; + case 'pid': return product.pid; + case 'sku': return product.sku; + case 'cost_price': return product.cost_price; + case 'status': return po.status; + case 'notes': return po.notes; + case 'long_note': return po.long_note; + case 'ordered': return product.ordered; + case 'received': return totalReceived; + case 'received_date': return firstReceiving.received_date || null; + case 'last_received_date': return lastReceiving.received_date || null; + case 'received_by': return firstReceiving.received_by || null; + case 'receiving_status': return receiving_status; + case 'receiving_history': return JSON.stringify(allReceivings.map(r => ({ + receiving_id: r.receiving_id, + qty: r.qty_each, + cost: r.cost_each, + date: r.received_date, + received_by: r.received_by, + alt_po: r.is_alt_po + }))); + default: return null; + } + })); + batchProcessed++; + } + } + + if (values.length > 0) { + const placeholders = values.map(() => + `(${Array(columnNames.length).fill("?").join(",")})` + ).join(","); + + const query = ` + INSERT INTO purchase_orders (${columnNames.join(",")}) + VALUES ${placeholders} + ON DUPLICATE KEY UPDATE ${columnNames + .filter((col) => col !== "po_id" && col !== "pid") + .map((col) => `${col} = VALUES(${col})`) + .join(",")}; + `; + + await localConnection.query(query, values.flat()); + } + + processed += batchProcessed; + + // Update progress based on time interval + const now = Date.now(); + if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) { + updateProgress(processed, totalItems, "Purchase orders import", startTime); + lastProgressUpdate = now; + } + } + } + + const endTime = Date.now(); + outputProgress({ + operation: `Purchase orders import complete`, + status: "complete", + processed_records: processed, + total_records: totalItems, + timing: { + start_time: new Date(startTime).toISOString(), + end_time: new Date(endTime).toISOString(), + elapsed_time: formatElapsedTime((endTime - startTime) / 1000), + elapsed_seconds: Math.round((endTime - startTime) / 1000) + } + }); + + } catch (error) { + outputProgress({ + operation: "Purchase orders import failed", + status: "error", + error: error.message, + }); + throw error; + } +} + +module.exports = importPurchaseOrders; \ No newline at end of file diff --git a/inventory-server/scripts/import/utils.js b/inventory-server/scripts/import/utils.js new file mode 100644 index 0000000..3f71b9c --- /dev/null +++ b/inventory-server/scripts/import/utils.js @@ -0,0 +1,102 @@ +const mysql = require("mysql2/promise"); +const { Client } = require("ssh2"); +const dotenv = require("dotenv"); +const path = require("path"); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); + +dotenv.config({ path: path.join(__dirname, "../../.env") }); + +// SSH configuration +const sshConfig = { + host: process.env.PROD_SSH_HOST, + port: process.env.PROD_SSH_PORT || 22, + username: process.env.PROD_SSH_USER, + privateKey: process.env.PROD_SSH_KEY_PATH + ? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH) + : undefined, +}; + +// Production database configuration +const prodDbConfig = { + host: process.env.PROD_DB_HOST || "localhost", + user: process.env.PROD_DB_USER, + password: process.env.PROD_DB_PASSWORD, + database: process.env.PROD_DB_NAME, + port: process.env.PROD_DB_PORT || 3306, +}; + +// Local database configuration +const localDbConfig = { + host: process.env.DB_HOST, + user: process.env.DB_USER, + password: process.env.DB_PASSWORD, + database: process.env.DB_NAME, + multipleStatements: true, + waitForConnections: true, + connectionLimit: 10, + queueLimit: 0, + namedPlaceholders: true, +}; + +// Constants +const BATCH_SIZE = 1000; +const PROGRESS_INTERVAL = 1000; // Update progress every second + +async function setupSshTunnel() { + return new Promise((resolve, reject) => { + const ssh = new Client(); + + ssh.on('error', (err) => { + console.error('SSH connection error:', err); + // Don't reject here, just log the error + }); + + ssh.on('end', () => { + console.log('SSH connection ended normally'); + }); + + ssh.on('close', () => { + console.log('SSH connection closed'); + }); + + ssh + .on("ready", () => { + ssh.forwardOut( + "127.0.0.1", + 0, + prodDbConfig.host, + prodDbConfig.port, + async (err, stream) => { + if (err) reject(err); + resolve({ ssh, stream }); + } + ); + }) + .connect(sshConfig); + }); +} + +// Helper function to update progress with time estimate +function updateProgress(current, total, operation, startTime) { + outputProgress({ + status: 'running', + operation, + current, + total, + rate: calculateRate(startTime, current), + elapsed: formatElapsedTime(startTime), + remaining: estimateRemaining(startTime, current, total), + percentage: ((current / total) * 100).toFixed(1) + }); +} + +module.exports = { + setupSshTunnel, + updateProgress, + prodDbConfig, + localDbConfig, + BATCH_SIZE, + PROGRESS_INTERVAL, + outputProgress, + formatElapsedTime +}; \ No newline at end of file diff --git a/inventory-server/scripts/import-csv.js b/inventory-server/scripts/old_csv/import-csv.js similarity index 99% rename from inventory-server/scripts/import-csv.js rename to inventory-server/scripts/old_csv/import-csv.js index 04ab8ef..33fe6fa 100644 --- a/inventory-server/scripts/import-csv.js +++ b/inventory-server/scripts/old_csv/import-csv.js @@ -3,7 +3,7 @@ const path = require('path'); const csv = require('csv-parse'); const mysql = require('mysql2/promise'); const dotenv = require('dotenv'); -const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress'); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); // Get test limits from environment variables const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0'); diff --git a/inventory-server/scripts/update-csv.js b/inventory-server/scripts/old_csv/update-csv.js similarity index 98% rename from inventory-server/scripts/update-csv.js rename to inventory-server/scripts/old_csv/update-csv.js index 26e5556..4f49fcb 100644 --- a/inventory-server/scripts/update-csv.js +++ b/inventory-server/scripts/old_csv/update-csv.js @@ -1,7 +1,7 @@ const path = require('path'); const fs = require('fs'); const axios = require('axios'); -const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./metrics/utils/progress'); +const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); // Change working directory to script directory process.chdir(path.dirname(__filename));