Files
inventory/inventory-server/scripts/import-from-prod.js
2025-01-26 16:52:53 -05:00

1275 lines
41 KiB
JavaScript

const mysql = require("mysql2/promise");
const { Client } = require("ssh2");
const dotenv = require("dotenv");
const path = require("path");
dotenv.config({ path: path.join(__dirname, "../.env") });
// Constants to control which imports run. main() checks each flag and only
// calls the matching import function when the flag is true.
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = false;
const IMPORT_PURCHASE_ORDERS = true;
// SSH configuration, read from environment variables. This object is passed
// to the ssh2 client's connect() call in setupSshTunnel().
const sshConfig = {
host: process.env.PROD_SSH_HOST,
// Falls back to 22 when PROD_SSH_PORT is unset or empty.
port: process.env.PROD_SSH_PORT || 22,
username: process.env.PROD_SSH_USER,
// The key file is read synchronously at module load, and only when
// PROD_SSH_KEY_PATH is set; otherwise no private key is supplied.
privateKey: process.env.PROD_SSH_KEY_PATH
? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH)
: undefined,
};
// Production database configuration. host/port are used as the forwarding
// destination of the SSH tunnel, and the whole object is spread into the
// options for the production MySQL connection in main().
const prodDbConfig = {
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306,
};
// Local database configuration, passed to mysql.createPool() in main().
const localDbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
};
/**
 * Write one progress record to stdout as a single line of JSON.
 *
 * @param {*} data - Value to serialize with JSON.stringify.
 */
function outputProgress(data) {
  const line = `${JSON.stringify(data)}\n`;
  process.stdout.write(line);
}
/**
 * Render a duration given in seconds as a compact string such as "1h 2m 3s".
 *
 * Hours and minutes are shown only when greater than zero. Seconds are shown
 * when greater than zero, or when nothing else would be shown (so 0 -> "0s").
 *
 * @param {number} seconds - Duration in seconds; fractions are floored.
 * @returns {string} Space-separated duration parts.
 */
function formatDuration(seconds) {
  const wholeHours = Math.floor(seconds / 3600);
  const wholeMinutes = Math.floor((seconds % 3600) / 60);
  const wholeSeconds = Math.floor(seconds % 60);

  const pieces = [
    [wholeHours, "h"],
    [wholeMinutes, "m"],
  ]
    .filter(([amount]) => amount > 0)
    .map(([amount, unit]) => `${amount}${unit}`);

  const showSeconds = wholeSeconds > 0 || pieces.length === 0;
  if (showSeconds) {
    pieces.push(`${wholeSeconds}s`);
  }
  return pieces.join(" ");
}
/**
 * Emit a "running" progress record with throughput and a time estimate.
 *
 * The estimate is only computed when it is meaningful. If nothing has been
 * processed yet, or no time has elapsed, the rate is reported as 0 and
 * `remaining` as null instead of dividing by zero (which previously produced
 * values such as "NaNs" / "Infinityh" in the output). Likewise `percentage`
 * is "0.0" when `total` is not positive.
 *
 * @param {number} current - Items processed so far.
 * @param {number} total - Total items expected.
 * @param {string} operation - Label for the operation being reported.
 * @param {number} startTime - Start timestamp in ms (as from Date.now()).
 */
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  const rate = elapsed > 0 ? current / elapsed : 0;
  const hasEstimate = rate > 0 && Number.isFinite(rate);
  // Clamp at zero so an over-count (current > total) never shows negative time.
  const remainingSeconds = hasEstimate
    ? Math.max(0, (total - current) / rate)
    : null;
  const percent = total > 0 ? (current / total) * 100 : 0;

  outputProgress({
    status: "running",
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining:
      remainingSeconds === null ? null : formatDuration(remainingSeconds),
    percentage: percent.toFixed(1),
  });
}
// Cancellation flag; main() checks it after connecting and after each import stage.
let isImportCancelled = false;

/**
 * Request cancellation of the running import and report it on the progress
 * stream. The flag is only observed at the checkpoints in main().
 */
function cancelImport() {
  const notice = {
    status: "cancelled",
    operation: "Import cancelled",
  };
  isImportCancelled = true;
  outputProgress(notice);
}
/**
 * Open an SSH connection using `sshConfig` and forward a stream through it to
 * the production database host/port from `prodDbConfig`.
 *
 * Connection failures reject the returned promise (previously there was no
 * "error" listener, so a failed connect never settled the promise and raised
 * an unhandled "error" event). If forwarding fails after the connection is
 * ready, the SSH client is closed before rejecting, because the caller never
 * receives it and so could not close it.
 *
 * @returns {Promise<{ssh: object, stream: object}>} The connected SSH client
 *   and the forwarded stream.
 */
async function setupSshTunnel() {
  return new Promise((resolve, reject) => {
    const ssh = new Client();
    let settled = false;

    const fail = (err) => {
      if (settled) {
        // The promise is already settled, so the error can only be reported.
        console.error("SSH connection error after tunnel setup:", err);
        return;
      }
      settled = true;
      reject(err);
    };

    ssh
      .on("ready", () => {
        ssh.forwardOut(
          "127.0.0.1",
          0,
          prodDbConfig.host,
          prodDbConfig.port,
          (err, stream) => {
            if (err) {
              ssh.end();
              fail(err);
              return;
            }
            settled = true;
            resolve({ ssh, stream });
          }
        );
      })
      .on("error", fail)
      .connect(sshConfig);
  });
}
/**
 * Copy product categories from production into the local `categories` table.
 *
 * Categories are processed one type at a time in a fixed order so that the
 * parent-less types (10, 20) are written before the types that may reference
 * a parent. For the latter, a category whose parent is not present locally is
 * skipped and recorded; if anything was skipped the function throws at the
 * end with the skipped entries attached as `error.skippedCategories`.
 *
 * @param {object} prodConnection - Production connection exposing `query(sql, params)`.
 * @param {object} localConnection - Local connection/pool exposing `query(sql, params)`.
 * @returns {Promise<void>}
 * @throws {Error} On any query failure, or when categories were skipped.
 */
async function importCategories(prodConnection, localConnection) {
  outputProgress({
    operation: "Starting categories import",
    status: "running",
  });
  const startTime = Date.now();
  const typeOrder = [10, 20, 11, 21, 12, 13];
  const topLevelTypes = [10, 20];
  let totalInserted = 0;
  const skippedCategories = [];
  try {
    // Process each type in order with its own query
    for (const type of typeOrder) {
      const [categories] = await prodConnection.query(
        `
        SELECT
          pc.cat_id,
          pc.name,
          pc.type,
          CASE
            WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
            WHEN pc.master_cat_id IS NULL THEN NULL
            ELSE pc.master_cat_id
          END as parent_id,
          pc.combined_name as description
        FROM product_categories pc
        WHERE pc.type = ?
        ORDER BY pc.cat_id
        `,
        [type]
      );
      if (categories.length === 0) continue;
      console.log(`\nProcessing ${categories.length} type ${type} categories`);
      if (type === 10) {
        console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
      }

      // For types that can have parents (11, 21, 12, 13), verify parent existence
      let categoriesToInsert = categories;
      if (!topLevelTypes.includes(type)) {
        const parentIds = [
          ...new Set(
            categories.map((c) => c.parent_id).filter((id) => id !== null)
          ),
        ];

        // An empty array would be rendered as "IN ()", which is a SQL syntax
        // error, so the lookup only runs when there is something to look up.
        let existingParentIds = new Set();
        if (parentIds.length > 0) {
          const [existingParents] = await localConnection.query(
            "SELECT cat_id FROM categories WHERE cat_id IN (?)",
            [parentIds]
          );
          existingParentIds = new Set(existingParents.map((p) => p.cat_id));
        }

        const hasUsableParent = (cat) =>
          cat.parent_id === null || existingParentIds.has(cat.parent_id);
        categoriesToInsert = categories.filter(hasUsableParent);
        const invalidCategories = categories.filter(
          (cat) => !hasUsableParent(cat)
        );

        if (invalidCategories.length > 0) {
          skippedCategories.push(
            ...invalidCategories.map((c) => ({
              id: c.cat_id,
              name: c.name,
              type: c.type,
              missing_parent: c.parent_id,
            }))
          );
          console.log(
            "\nSkipping categories with missing parents:",
            invalidCategories
              .map(
                (c) =>
                  `${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})`
              )
              .join("\n")
          );
        }
        if (categoriesToInsert.length === 0) {
          console.log(
            `No valid categories of type ${type} to insert - all had missing parents`
          );
          continue;
        }
      }

      console.log(
        `Inserting ${categoriesToInsert.length} type ${type} categories`
      );
      const placeholders = categoriesToInsert
        .map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
        .join(",");
      const values = categoriesToInsert.flatMap((cat) => [
        cat.cat_id,
        cat.name,
        cat.type,
        cat.parent_id,
        cat.description,
        "active",
      ]);
      // Upsert the whole type in a single multi-row statement.
      await localConnection.query(
        `
        INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
          name = VALUES(name),
          type = VALUES(type),
          parent_id = VALUES(parent_id),
          description = VALUES(description),
          status = VALUES(status),
          updated_at = CURRENT_TIMESTAMP
        `,
        values
      );
      totalInserted += categoriesToInsert.length;
      // The overall total is not known up front, so progress is reported
      // against the running count.
      updateProgress(
        totalInserted,
        totalInserted,
        "Categories import",
        startTime
      );
    }

    // After all imports, if we skipped any categories, throw an error
    if (skippedCategories.length > 0) {
      const error = new Error(
        "Categories import completed with errors - some categories were skipped due to missing parents"
      );
      error.skippedCategories = skippedCategories;
      throw error;
    }
    outputProgress({
      status: "complete",
      operation: "Categories import completed",
      current: totalInserted,
      total: totalInserted,
      duration: formatDuration((Date.now() - startTime) / 1000),
    });
  } catch (error) {
    console.error("Error importing categories:", error);
    if (error.skippedCategories) {
      console.error(
        "Skipped categories:",
        JSON.stringify(error.skippedCategories, null, 2)
      );
    }
    throw error;
  }
}
/**
 * Import products (and their category links) from production into the local
 * `products` and `product_categories` tables.
 *
 * The local `products` column list is read from INFORMATION_SCHEMA (scoped to
 * the connection's current database) and each production row is mapped onto
 * those columns by name. Columns missing from the production row become NULL;
 * `managing_stock` is always written as 1; falsy numeric values are written
 * as 0. Rows are upserted in batches of 100.
 *
 * Changes from the previous version:
 * - Category links that point at a category missing locally are dropped
 *   individually. Before, one missing category discarded every link in the
 *   batch and skipped the progress update for that batch.
 * - The schema lookup is restricted to the current database, so a same-named
 *   table in another schema can no longer contribute columns.
 * - The up-front "total" query was removed: it counted order items, not
 *   products, so the reported number was wrong.
 * - Debug queries for hard-coded product ids were removed; they ran against
 *   production on every import.
 *
 * @param {object} prodConnection - Production connection exposing `query(sql, params)`.
 * @param {object} localConnection - Local connection/pool exposing `query(sql, params)`.
 * @returns {Promise<void>}
 * @throws {Error} Re-throws any query error after logging it.
 */
async function importProducts(prodConnection, localConnection) {
  outputProgress({
    operation: "Starting products import - Getting schema",
    status: "running",
  });
  const startTime = Date.now();
  try {
    // First get the column names from the table structure
    const [columns] = await localConnection.query(`
      SELECT COLUMN_NAME
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_SCHEMA = DATABASE()
        AND TABLE_NAME = 'products'
      ORDER BY ORDINAL_POSITION
    `);
    const columnNames = columns.map((col) => col.COLUMN_NAME);

    outputProgress({
      operation: "Starting products import - Fetching products from production",
      status: "running",
    });
    // Get products from production
    const [rows] = await prodConnection.query(`
      SELECT
        p.pid,
        p.description AS title,
        p.notes AS description,
        p.itemnumber AS SKU,
        p.date_created,
        p.datein AS first_received,
        p.location,
        COALESCE(si.available_local, 0) - COALESCE(
          (SELECT SUM(oi.qty_ordered - oi.qty_placed)
           FROM order_items oi
           JOIN _order o ON oi.order_id = o.order_id
           WHERE oi.prod_pid = p.pid
           AND o.date_placed != '0000-00-00 00:00:00'
           AND o.date_shipped = '0000-00-00 00:00:00'
           AND oi.pick_finished = 0
           AND oi.qty_back = 0
           AND o.order_status != 15
           AND o.order_status < 90
           AND oi.qty_ordered >= oi.qty_placed
           AND oi.qty_ordered > 0), 0) AS stock_quantity,
        ci.onpreorder AS preorder_count,
        pnb.inventory AS notions_inv_count,
        COALESCE(pcp.price_each, 0) as price,
        COALESCE(p.sellingprice, 0) AS regular_price,
        COALESCE((SELECT ROUND(AVG(costeach), 5)
                  FROM product_inventory
                  WHERE pid = p.pid
                  AND COUNT > 0), 0) AS cost_price,
        NULL AS landing_cost_price,
        p.upc AS barcode,
        p.harmonized_tariff_code,
        p.stamp AS updated_at,
        CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
        CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
        s.companyname AS vendor,
        CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
        sid.notions_itemnumber AS notions_reference,
        CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
        (SELECT CONCAT('https://sbing.com/i/products/0000/',
                SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
                p.pid, '-t-', MIN(PI.iid), '.jpg')
         FROM product_images PI
         WHERE PI.pid = p.pid AND PI.hidden = 0) AS image,
        (SELECT CONCAT('https://sbing.com/i/products/0000/',
                SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
                p.pid, '-175x175-', MIN(PI.iid), '.jpg')
         FROM product_images PI
         WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175,
        (SELECT CONCAT('https://sbing.com/i/products/0000/',
                SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
                p.pid, '-o-', MIN(PI.iid), '.jpg')
         FROM product_images PI
         WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full,
        pc1.name AS brand,
        pc2.name AS line,
        pc3.name AS subline,
        pc4.name AS artist,
        NULL AS options,
        NULL AS tags,
        COALESCE(CASE
          WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
          ELSE sid.supplier_qty_per_unit
        END, sid.notions_qty_per_unit) AS moq,
        NULL AS uom,
        p.rating,
        p.rating_votes AS reviews,
        p.weight,
        p.length,
        p.width,
        p.height,
        (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
        (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
        p.totalsold AS total_sold,
        p.country_of_origin,
        pls.date_sold as date_last_sold,
        GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
      FROM products p
      LEFT JOIN current_inventory ci ON p.pid = ci.pid
      LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
      LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
      LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
      LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
      LEFT JOIN product_category_index pci ON p.pid = pci.pid
      LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
        AND pc.type IN (10, 20, 11, 21, 12, 13)
        AND pci.cat_id NOT IN (16, 17)
      LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
      LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
      LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
      LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
      LEFT JOIN product_last_sold pls ON p.pid = pls.pid
      LEFT JOIN (
        SELECT pid, MIN(price_each) as price_each
        FROM product_current_prices
        WHERE active = 1
        GROUP BY pid
      ) pcp ON p.pid = pcp.pid
      WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
        OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
        OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
        OR pls.date_sold IS NULL)
      GROUP BY p.pid
    `);

    let current = 0;
    const total = rows.length;
    // Process products in batches
    const BATCH_SIZE = 100;

    // These pieces of the upsert do not change between batches.
    const placeholderGroup = `(${Array(columnNames.length)
      .fill("?")
      .join(",")})`;
    const updateClause = columnNames
      .filter((col) => col !== "pid")
      .map((col) => `${col} = VALUES(${col})`)
      .join(",");

    for (let i = 0; i < rows.length; i += BATCH_SIZE) {
      const batch = rows.slice(i, i + BATCH_SIZE);
      const productValues = [];
      const categoryRelationships = [];

      for (const row of batch) {
        // Map values in the same order as columns
        const rowValues = columnNames.map((col) => {
          const val = row[col] ?? null;
          if (col === "managing_stock") return 1;
          if (typeof val === "number") return val || 0;
          return val;
        });
        productValues.push(...rowValues);

        // category_ids is a comma-separated list produced by GROUP_CONCAT
        if (row.category_ids) {
          const catIds = row.category_ids
            .split(",")
            .map((id) => id.trim())
            .filter((id) => id)
            .map(Number);
          for (const catId of catIds) {
            if (catId) categoryRelationships.push([catId, row.pid]);
          }
        }
      }

      const productPlaceholders = Array(batch.length)
        .fill(placeholderGroup)
        .join(",");
      const insertQuery = `
        INSERT INTO products (${columnNames.join(",")})
        VALUES ${productPlaceholders}
        ON DUPLICATE KEY UPDATE ${updateClause}
      `;
      // First insert the products and wait for it to complete
      await localConnection.query(insertQuery, productValues);

      // Now that products are inserted, handle category relationships
      if (categoryRelationships.length > 0) {
        const uniqueCatIds = [
          ...new Set(categoryRelationships.map(([catId]) => catId)),
        ];
        console.log("Checking categories:", uniqueCatIds);
        // Check which categories exist
        const [existingCats] = await localConnection.query(
          "SELECT cat_id FROM categories WHERE cat_id IN (?)",
          [uniqueCatIds]
        );
        const existingCatIds = new Set(existingCats.map((c) => c.cat_id));

        // Log missing categories, with their production details for diagnosis
        const missingCatIds = uniqueCatIds.filter(
          (id) => !existingCatIds.has(id)
        );
        if (missingCatIds.length > 0) {
          console.error("Missing categories:", missingCatIds);
          const [missingCats] = await prodConnection.query(
            `
            SELECT cat_id, name, type, master_cat_id, hidden
            FROM product_categories
            WHERE cat_id IN (?)
            `,
            [missingCatIds]
          );
          console.error("Missing category details:", missingCats);
          console.warn(
            "Skipping invalid category relationships - continuing with import"
          );
        }

        // Only the links whose category is missing are dropped; the rest of
        // the batch is still written.
        const knownCategoryLinks = categoryRelationships.filter(([catId]) =>
          existingCatIds.has(catId)
        );
        if (knownCategoryLinks.length > 0) {
          // Verify products exist before inserting relationships
          const productIds = [
            ...new Set(knownCategoryLinks.map(([, pid]) => pid)),
          ];
          const [existingProducts] = await localConnection.query(
            "SELECT pid FROM products WHERE pid IN (?)",
            [productIds]
          );
          const existingProductIds = new Set(
            existingProducts.map((p) => p.pid)
          );
          const validRelationships = knownCategoryLinks.filter(([, pid]) =>
            existingProductIds.has(pid)
          );
          if (validRelationships.length > 0) {
            const catPlaceholders = validRelationships
              .map(() => "(?, ?)")
              .join(",");
            await localConnection.query(
              `
              INSERT INTO product_categories (cat_id, pid)
              VALUES ${catPlaceholders}
              ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
              `,
              validRelationships.flat()
            );
          }
        }
      }

      current += batch.length;
      updateProgress(current, total, "Products import", startTime);
    }

    outputProgress({
      status: "complete",
      operation: "Products import completed",
      current: total,
      total,
      duration: formatDuration((Date.now() - startTime) / 1000),
    });
  } catch (error) {
    console.error("Error importing products:", error);
    throw error;
  }
}
/**
 * Build a list of date ranges, newest first, that together span from
 * `startYearsAgo` years before the production server's CURRENT_DATE up to
 * CURRENT_DATE, in steps of `chunkMonths` months.
 *
 * Each range is `{ start, end }` with both values formatted as the date part
 * of `Date#toISOString()` (YYYY-MM-DD). The final range is clamped so it
 * does not start before the overall start date.
 *
 * @param {object} prodConnection - Connection exposing `query(sql, params)`.
 * @param {string} table - Not used by this function.
 * @param {string} dateField - Not used by this function.
 * @param {number} [startYearsAgo=2] - How many years back the span begins.
 * @param {number} [chunkMonths=3] - Length of each range in months.
 * @returns {Promise<Array<{start: string, end: string}>>}
 */
async function getDateRanges(
  prodConnection,
  table,
  dateField,
  startYearsAgo = 2,
  chunkMonths = 3
) {
  const [bounds] = await prodConnection.query(
    `
    SELECT
      DATE_SUB(CURRENT_DATE, INTERVAL ? YEAR) as start_date,
      CURRENT_DATE as end_date
    `,
    [startYearsAgo]
  );

  const isoDay = (date) => date.toISOString().split("T")[0];
  const cursor = new Date(bounds[0].end_date);
  const earliest = new Date(bounds[0].start_date);
  const ranges = [];

  // Walk backwards from the end date one chunk at a time.
  while (cursor > earliest) {
    const chunkEnd = new Date(cursor);
    cursor.setMonth(cursor.getMonth() - chunkMonths);
    const chunkStart = new Date(Math.max(cursor, earliest));
    ranges.push({
      start: isoDay(chunkStart),
      end: isoDay(chunkEnd),
    });
  }
  return ranges;
}
/**
 * Import specific products (by pid) from production into the local
 * `products` table, then link them to the categories that exist locally.
 *
 * Production rows are mapped onto the local `products` columns by name, the
 * same way the full product import does: missing columns become NULL,
 * `managing_stock` is always 1, and falsy numeric values become 0.
 *
 * Changes from the previous version:
 * - Returns immediately for an empty pid list, which would otherwise render
 *   as "IN ()" and fail with a SQL syntax error.
 * - The schema lookup is restricted to the connection's current database.
 *
 * @param {object} prodConnection - Production connection exposing `query(sql, params)`.
 * @param {object} localConnection - Local connection/pool exposing `query(sql, params)`.
 * @param {number[]} missingPids - Product ids to import.
 * @returns {Promise<void>}
 */
async function importMissingProducts(prodConnection, localConnection, missingPids) {
  if (!missingPids || missingPids.length === 0) return;

  // First get the column names from the table structure
  const [columns] = await localConnection.query(`
    SELECT COLUMN_NAME
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE TABLE_SCHEMA = DATABASE()
      AND TABLE_NAME = 'products'
    ORDER BY ORDINAL_POSITION
  `);
  const columnNames = columns.map((col) => col.COLUMN_NAME);

  // Get the missing products from production
  const [products] = await prodConnection.query(`
    SELECT
      p.pid,
      p.description AS title,
      p.notes AS description,
      p.itemnumber AS SKU,
      p.date_created,
      p.datein AS first_received,
      p.location,
      COALESCE(si.available_local, 0) - COALESCE(
        (SELECT SUM(oi.qty_ordered - oi.qty_placed)
         FROM order_items oi
         JOIN _order o ON oi.order_id = o.order_id
         WHERE oi.prod_pid = p.pid
         AND o.date_placed != '0000-00-00 00:00:00'
         AND o.date_shipped = '0000-00-00 00:00:00'
         AND oi.pick_finished = 0
         AND oi.qty_back = 0
         AND o.order_status != 15
         AND o.order_status < 90
         AND oi.qty_ordered >= oi.qty_placed
         AND oi.qty_ordered > 0), 0) AS stock_quantity,
      ci.onpreorder AS preorder_count,
      pnb.inventory AS notions_inv_count,
      COALESCE(pcp.price_each, 0) as price,
      COALESCE(p.sellingprice, 0) AS regular_price,
      COALESCE((SELECT ROUND(AVG(costeach), 5)
                FROM product_inventory
                WHERE pid = p.pid
                AND COUNT > 0), 0) AS cost_price,
      NULL AS landing_cost_price,
      p.upc AS barcode,
      p.harmonized_tariff_code,
      p.stamp AS updated_at,
      CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
      CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
      s.companyname AS vendor,
      CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
      sid.notions_itemnumber AS notions_reference,
      CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
      (SELECT CONCAT('https://sbing.com/i/products/0000/',
              SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
              p.pid, '-t-', MIN(PI.iid), '.jpg')
       FROM product_images PI
       WHERE PI.pid = p.pid AND PI.hidden = 0) AS image,
      (SELECT CONCAT('https://sbing.com/i/products/0000/',
              SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
              p.pid, '-175x175-', MIN(PI.iid), '.jpg')
       FROM product_images PI
       WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175,
      (SELECT CONCAT('https://sbing.com/i/products/0000/',
              SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
              p.pid, '-o-', MIN(PI.iid), '.jpg')
       FROM product_images PI
       WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full,
      pc1.name AS brand,
      pc2.name AS line,
      pc3.name AS subline,
      pc4.name AS artist,
      NULL AS options,
      NULL AS tags,
      COALESCE(CASE
        WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
        ELSE sid.supplier_qty_per_unit
      END, sid.notions_qty_per_unit) AS moq,
      NULL AS uom,
      p.rating,
      p.rating_votes AS reviews,
      p.weight,
      p.length,
      p.width,
      p.height,
      (SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
      (SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
      p.totalsold AS total_sold,
      p.country_of_origin,
      pls.date_sold as date_last_sold,
      GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
    FROM products p
    LEFT JOIN current_inventory ci ON p.pid = ci.pid
    LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
    LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
    LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
    LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
    LEFT JOIN product_category_index pci ON p.pid = pci.pid
    LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
      AND pc.type IN (10, 20, 11, 21, 12, 13)
      AND pci.cat_id NOT IN (16, 17)
    LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
    LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
    LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
    LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
    LEFT JOIN product_last_sold pls ON p.pid = pls.pid
    LEFT JOIN (
      SELECT pid, MIN(price_each) as price_each
      FROM product_current_prices
      WHERE active = 1
      GROUP BY pid
    ) pcp ON p.pid = pcp.pid
    WHERE p.pid IN (?)
    GROUP BY p.pid
  `, [missingPids]);

  if (products.length === 0) return;

  // Map values in the same order as columns
  const productValues = products.flatMap((product) =>
    columnNames.map((col) => {
      const val = product[col] ?? null;
      if (col === "managing_stock") return 1;
      if (typeof val === "number") return val || 0;
      return val;
    })
  );
  // Generate placeholders for all products
  const rowPlaceholder = `(${Array(columnNames.length).fill("?").join(",")})`;
  const placeholders = products.map(() => rowPlaceholder).join(",");
  // Build and execute the query
  const query = `
    INSERT INTO products (${columnNames.join(",")})
    VALUES ${placeholders}
    ON DUPLICATE KEY UPDATE ${columnNames
      .filter((col) => col !== "pid")
      .map((col) => `${col} = VALUES(${col})`)
      .join(",")}
  `;
  await localConnection.query(query, productValues);

  // Verify products were inserted before proceeding with categories
  const [insertedProducts] = await localConnection.query(
    "SELECT pid FROM products WHERE pid IN (?)",
    [products.map((p) => p.pid)]
  );
  const insertedPids = new Set(insertedProducts.map((p) => p.pid));

  // Collect [cat_id, pid] pairs, but only for products that now exist locally
  const categoryRelationships = [];
  for (const product of products) {
    if (!insertedPids.has(product.pid) || !product.category_ids) continue;
    const catIds = product.category_ids
      .split(",")
      .map((id) => id.trim())
      .filter((id) => id)
      .map(Number);
    for (const catId of catIds) {
      if (catId) categoryRelationships.push([catId, product.pid]);
    }
  }
  if (categoryRelationships.length === 0) return;

  // Verify categories exist before inserting relationships
  const uniqueCatIds = [
    ...new Set(categoryRelationships.map(([catId]) => catId)),
  ];
  const [existingCats] = await localConnection.query(
    "SELECT cat_id FROM categories WHERE cat_id IN (?)",
    [uniqueCatIds]
  );
  const existingCatIds = new Set(existingCats.map((c) => c.cat_id));
  // Filter relationships to only include existing categories
  const validRelationships = categoryRelationships.filter(([catId]) =>
    existingCatIds.has(catId)
  );
  if (validRelationships.length === 0) return;

  const catPlaceholders = validRelationships.map(() => "(?, ?)").join(",");
  await localConnection.query(
    `
    INSERT INTO product_categories (cat_id, pid)
    VALUES ${catPlaceholders}
    ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
    `,
    validRelationships.flat()
  );
}
/**
 * Import order items from the last two years from production into the local
 * `orders` table.
 *
 * Order items are fetched in pages of 1000 and upserted. Items whose product
 * does not exist locally are held back; once paging finishes, those products
 * are imported via importMissingProducts() and all items of the affected
 * orders are fetched again and upserted.
 *
 * Changes from the previous version:
 * - The paged query has an ORDER BY, because LIMIT/OFFSET without one gives
 *   no stable row order and pages could overlap or miss rows.
 * - An empty page ends paging instead of issuing a lookup with an empty
 *   "IN ()" list.
 * - The retry upsert is skipped when the retry query returns no rows, which
 *   would otherwise build an INSERT with no VALUES.
 * - The schema lookup is restricted to the connection's current database.
 *
 * @param {object} prodConnection - Production connection exposing `query(sql, params)`.
 * @param {object} localConnection - Local connection/pool exposing `query(sql, params)`.
 * @returns {Promise<void>}
 * @throws {Error} Re-throws any failure after reporting it on the progress stream.
 */
async function importOrders(prodConnection, localConnection) {
  outputProgress({
    operation: "Starting orders import - Getting total count",
    status: "running",
  });
  const startTime = Date.now();
  const skippedOrders = new Set(); // Store orders that need to be retried
  const missingProducts = new Set(); // Store products that need to be imported

  // SELECT list and joins shared by the paged query and the retry query.
  const orderItemSelect = `
    SELECT
      oi.order_id as order_number,
      oi.prod_pid as pid,
      oi.prod_itemnumber as SKU,
      o.date_placed_onlydate as date,
      oi.prod_price_reg as price,
      oi.qty_ordered as quantity,
      (oi.prod_price_reg - oi.prod_price) as discount,
      (
        SELECT
          otp.item_taxes_to_collect
        FROM
          order_tax_info oti
          JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
        WHERE
          oti.order_id = o.order_id
          AND otp.pid = oi.prod_pid
        ORDER BY
          oti.stamp DESC
        LIMIT 1
      ) as tax,
      0 as tax_included,
      ROUND(
        ((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
        (oi.prod_price * oi.qty_ordered) / NULLIF(o.summary_subtotal, 0)), 2
      ) as shipping,
      o.order_cid as customer,
      CONCAT(o.bill_firstname, ' ', o.bill_lastname) as customer_name,
      'pending' as status,
      CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled
    FROM order_items oi
    JOIN _order o ON oi.order_id = o.order_id
  `;

  try {
    // First get the column names from the table structure
    const [columns] = await localConnection.query(`
      SELECT COLUMN_NAME
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_SCHEMA = DATABASE()
        AND TABLE_NAME = 'orders'
      ORDER BY ORDINAL_POSITION
    `);
    const columnNames = columns
      .map((col) => col.COLUMN_NAME)
      .filter((name) => name !== "id"); // Skip auto-increment ID

    const rowPlaceholder = `(${Array(columnNames.length).fill("?").join(",")})`;
    const updateClauses = columnNames
      .filter((col) => col !== "order_number") // Don't update primary key
      .map((col) => `${col} = VALUES(${col})`)
      .join(",");

    // Upsert a list of order rows; a no-op for an empty list.
    const upsertOrders = async (orderRows) => {
      if (orderRows.length === 0) return;
      const placeholders = orderRows.map(() => rowPlaceholder).join(",");
      const query = `
        INSERT INTO orders (${columnNames.join(",")})
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE ${updateClauses}
      `;
      await localConnection.query(
        query,
        orderRows.flatMap((order) => columnNames.map((col) => order[col]))
      );
    };

    // Get total count first for progress indication
    outputProgress({
      operation: "Starting orders import - Getting total count",
      status: "running",
    });
    const [countResult] = await prodConnection.query(`
      SELECT COUNT(*) as total
      FROM order_items oi FORCE INDEX (PRIMARY)
      JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
      WHERE o.order_status >= 15
      AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
    `);
    const total = countResult[0].total;
    outputProgress({
      operation: `Starting orders import - Fetching ${total} orders from production`,
      status: "running",
    });

    let processed = 0;
    // Process in batches
    const batchSize = 1000;
    for (let offset = 0; offset < total; offset += batchSize) {
      // NOTE(review): (order_id, prod_pid) is assumed to give a sufficiently
      // stable order for paging - confirm against the order_items key.
      const [orders] = await prodConnection.query(
        `
        ${orderItemSelect}
        WHERE o.order_status >= 15
        AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
        ORDER BY oi.order_id, oi.prod_pid
        LIMIT ? OFFSET ?
        `,
        [batchSize, offset]
      );
      // Fewer rows than counted (e.g. data changed meanwhile): nothing left.
      if (orders.length === 0) break;

      // Check if all products exist before inserting orders
      const orderProductPids = [...new Set(orders.map((o) => o.pid))];
      const [existingProducts] = await localConnection.query(
        "SELECT pid FROM products WHERE pid IN (?)",
        [orderProductPids]
      );
      const existingPids = new Set(existingProducts.map((p) => p.pid));

      // Filter out orders with missing products and track them
      const validOrders = orders.filter((order) => {
        if (existingPids.has(order.pid)) return true;
        missingProducts.add(order.pid);
        skippedOrders.add(order.order_number);
        return false;
      });
      await upsertOrders(validOrders);

      processed += orders.length;
      updateProgress(processed, total, "Orders import", startTime);
    }

    // Now handle missing products and retry skipped orders
    if (missingProducts.size > 0) {
      outputProgress({
        operation: `Found ${missingProducts.size} missing products, importing them now`,
        status: "running",
      });
      await importMissingProducts(prodConnection, localConnection, [
        ...missingProducts,
      ]);

      // Retry skipped orders
      if (skippedOrders.size > 0) {
        outputProgress({
          operation: `Retrying ${skippedOrders.size} skipped orders`,
          status: "running",
        });
        const [retryOrders] = await prodConnection.query(
          `
          ${orderItemSelect}
          WHERE oi.order_id IN (?)
          `,
          [[...skippedOrders]]
        );
        await upsertOrders(retryOrders);
      }
    }

    const endTime = Date.now();
    outputProgress({
      operation: `Orders import complete in ${Math.round(
        (endTime - startTime) / 1000
      )}s`,
      status: "complete",
    });
  } catch (error) {
    outputProgress({
      operation: "Orders import failed",
      status: "error",
      error: error.message,
    });
    throw error;
  }
}
/**
 * Import purchase order lines from the last two years from production into
 * the local `purchase_orders` table.
 *
 * Production rows are mapped onto the local columns by name (every column
 * except `id`); a column with no matching field in the production row is
 * written as NULL.
 *
 * Changes from the previous version:
 * - Rows are upserted in batches of 500 instead of one statement holding
 *   every row, so the statement size stays bounded. Each batch is its own
 *   statement, so a failure part-way leaves earlier batches written;
 *   re-running is safe because every write is an upsert.
 * - The schema lookup is restricted to the connection's current database.
 *
 * @param {object} prodConnection - Production connection exposing `query(sql, params)`.
 * @param {object} localConnection - Local connection/pool exposing `query(sql, params)`.
 * @returns {Promise<void>}
 * @throws {Error} Re-throws any failure after reporting it on the progress stream.
 */
async function importPurchaseOrders(prodConnection, localConnection) {
  const INSERT_BATCH_SIZE = 500;
  outputProgress({
    operation: "Starting purchase orders import - Initializing",
    status: "running",
  });
  const startTime = Date.now();
  try {
    // First get the column names from the table structure
    const [columns] = await localConnection.query(`
      SELECT COLUMN_NAME
      FROM INFORMATION_SCHEMA.COLUMNS
      WHERE TABLE_SCHEMA = DATABASE()
        AND TABLE_NAME = 'purchase_orders'
      ORDER BY ORDINAL_POSITION
    `);
    const columnNames = columns
      .map((col) => col.COLUMN_NAME)
      .filter((name) => name !== "id"); // Skip auto-increment ID

    outputProgress({
      operation: "Starting purchase orders import - Fetching POs from production",
      status: "running",
    });
    const [purchaseOrders] = await prodConnection.query(`
      SELECT
        po.po_id as po_number,
        s.companyname as vendor,
        po.date_ordered as date,
        po.date_estin as expected_date,
        pop.pid as product_id,
        p.itemnumber as sku,
        COALESCE(rp.cost_each, pop.cost_each) as cost_price,
        po.status,
        CONCAT(COALESCE(po.short_note, ''), ' ', COALESCE(po.notes, '')) as notes,
        pop.qty_each as ordered,
        rp.received_by,
        rp.received_date
      FROM po
      JOIN po_products pop ON po.po_id = pop.po_id
      JOIN products p ON pop.pid = p.pid
      JOIN suppliers s ON po.supplier_id = s.supplierid
      LEFT JOIN receivings r ON po.po_id = r.po_id
      LEFT JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
        AND pop.pid = rp.pid
      WHERE po.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
      ORDER BY po.po_id
    `);

    if (purchaseOrders.length > 0) {
      const rowPlaceholder = `(${Array(columnNames.length)
        .fill("?")
        .join(",")})`;
      const updateClauses = columnNames
        .filter((col) => col !== "po_number") // Don't update primary key
        .map((col) => `${col} = VALUES(${col})`)
        .join(",");

      for (
        let start = 0;
        start < purchaseOrders.length;
        start += INSERT_BATCH_SIZE
      ) {
        const batch = purchaseOrders.slice(start, start + INSERT_BATCH_SIZE);
        const placeholders = batch.map(() => rowPlaceholder).join(",");
        const query = `
          INSERT INTO purchase_orders (${columnNames.join(",")})
          VALUES ${placeholders}
          ON DUPLICATE KEY UPDATE ${updateClauses}
        `;
        await localConnection.query(
          query,
          batch.flatMap((po) => columnNames.map((col) => po[col] ?? null))
        );
      }
      outputProgress({
        operation: `Imported ${purchaseOrders.length} purchase order items`,
        status: "running",
      });
    }

    const endTime = Date.now();
    outputProgress({
      operation: `Purchase orders import complete in ${Math.round(
        (endTime - startTime) / 1000
      )}s - Imported ${purchaseOrders.length} PO items`,
      status: "complete",
    });
  } catch (error) {
    outputProgress({
      operation: "Purchase orders import failed",
      status: "error",
      error: error.message,
    });
    throw error;
  }
}
/**
 * Run the import: open the SSH tunnel and both database connections, run each
 * import stage whose IMPORT_* flag is true, and always release the
 * connections afterwards.
 *
 * Cancellation is checked after the connections are set up and after each
 * stage; a cancelled run rejects with an Error whose message is
 * "Import cancelled" and reports status "cancelled".
 *
 * Changes from the previous version:
 * - The cancellation flag is cleared when a run starts. It was never reset,
 *   so after one cancelImport() every later run in the same process aborted
 *   at the first checkpoint.
 * - Each resource is closed independently. A failing close is logged rather
 *   than thrown, so it can neither leak the remaining resources nor replace
 *   the error that ended the import.
 *
 * @returns {Promise<void>}
 * @throws {Error} Re-throws whatever ended the import, after reporting it.
 */
async function main() {
  let ssh;
  let prodConnection;
  let localConnection;

  isImportCancelled = false;
  const throwIfCancelled = () => {
    if (isImportCancelled) throw new Error("Import cancelled");
  };

  try {
    outputProgress({
      status: "running",
      operation: "Starting import process",
      message: "Setting up connections...",
    });
    // Set up connections
    const tunnel = await setupSshTunnel();
    ssh = tunnel.ssh;
    prodConnection = await mysql.createConnection({
      ...prodDbConfig,
      stream: tunnel.stream,
    });
    localConnection = await mysql.createPool(localDbConfig);
    throwIfCancelled();

    // Run each import based on constants, in dependency order
    const stages = [
      [IMPORT_CATEGORIES, importCategories],
      [IMPORT_PRODUCTS, importProducts],
      [IMPORT_ORDERS, importOrders],
      [IMPORT_PURCHASE_ORDERS, importPurchaseOrders],
    ];
    for (const [enabled, runStage] of stages) {
      if (!enabled) continue;
      await runStage(prodConnection, localConnection);
      throwIfCancelled();
    }

    outputProgress({
      status: "complete",
      operation: "Import process completed",
    });
  } catch (error) {
    console.error("Error during import process:", error);
    outputProgress({
      status: error.message === "Import cancelled" ? "cancelled" : "error",
      operation: "Import process",
      error: error.message,
    });
    throw error;
  } finally {
    const closers = [
      ["production connection", () => prodConnection?.end()],
      ["local connection pool", () => localConnection?.end()],
      ["SSH tunnel", () => ssh?.end()],
    ];
    for (const [label, close] of closers) {
      try {
        await close();
      } catch (closeError) {
        console.error(`Error closing ${label}:`, closeError);
      }
    }
  }
}
// Run the import only if this is the main module, i.e. the file was executed
// directly rather than loaded through require().
if (require.main === module) {
main().catch((error) => {
// main() has already reported the failure; log it once more here and exit
// with a non-zero status code.
console.error("Unhandled error in main process:", error);
process.exit(1);
});
}
// Export the functions needed by the route
module.exports = {
main,
outputProgress,
cancelImport,
};