Add incremental import support and tracking for database synchronization

This commit is contained in:
2025-01-29 16:22:00 -05:00
parent d60b2d4fae
commit d2a2dbc812
7 changed files with 760 additions and 379 deletions

View File

@@ -170,3 +170,26 @@ ORDER BY
END,
c.name,
st.vendor;
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
);
CREATE TABLE IF NOT EXISTS import_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NULL,
duration_seconds INT,
records_added INT DEFAULT 0,
records_updated INT DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status ENUM('running', 'completed', 'failed', 'cancelled') DEFAULT 'running',
error_message TEXT,
additional_info JSON,
INDEX idx_table_time (table_name, start_time),
INDEX idx_status (status)
);

View File

@@ -10,12 +10,16 @@ const importPurchaseOrders = require('./import/purchase-orders');
dotenv.config({ path: path.join(__dirname, "../.env") });
// Constants to control which imports run
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = false;
const IMPORT_CATEGORIES = true;
const IMPORT_PRODUCTS = true;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;
// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true';
// SSH configuration
// In import-from-prod.js
const sshConfig = {
ssh: {
host: process.env.PROD_SSH_HOST,
@@ -24,16 +28,16 @@ const sshConfig = {
privateKey: process.env.PROD_SSH_KEY_PATH
? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH)
: undefined,
compress: true, // Enable SSH compression
},
// Production database configuration
prodDbConfig: {
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306,
timezone: 'Z',
},
// Local database configuration
localDbConfig: {
host: process.env.DB_HOST,
user: process.env.DB_USER,
@@ -44,6 +48,13 @@ const sshConfig = {
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
maxAllowedPacket: 64 * 1024 * 1024, // 64MB
connectTimeout: 60000,
enableKeepAlive: true,
keepAliveInitialDelay: 10000,
compress: true,
timezone: 'Z',
stringifyObjects: false,
}
};
@@ -68,6 +79,7 @@ async function main() {
const startTime = Date.now();
let connections;
let completedSteps = 0;
let importHistoryId;
const totalSteps = [
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
@@ -80,7 +92,7 @@ async function main() {
outputProgress({
status: "running",
operation: "Import process",
message: "Initializing SSH tunnel...",
message: `Initializing SSH tunnel for ${INCREMENTAL_UPDATE ? 'incremental' : 'full'} import...`,
current: completedSteps,
total: totalSteps,
elapsed: formatElapsedTime(startTime)
@@ -91,6 +103,39 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
// Initialize sync_status table if it doesn't exist
await localConnection.query(`
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
);
`);
// Create import history record for the overall session
const [historyResult] = await localConnection.query(`
INSERT INTO import_history (
table_name,
start_time,
is_incremental,
status,
additional_info
) VALUES (
'all_tables',
NOW(),
?,
'running',
JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?
)
)
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.insertId;
const results = {
categories: null,
products: null,
@@ -98,37 +143,84 @@ async function main() {
purchaseOrders: null
};
let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;
// Run each import based on constants
if (IMPORT_CATEGORIES) {
results.categories = await importCategories(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.categories.recordsAdded) totalRecordsAdded += results.categories.recordsAdded;
if (results.categories.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated;
}
if (IMPORT_PRODUCTS) {
results.products = await importProducts(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded;
if (results.products.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated;
}
if (IMPORT_ORDERS) {
results.orders = await importOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded;
if (results.orders.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated;
}
if (IMPORT_PURCHASE_ORDERS) {
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded;
if (results.purchaseOrders.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated;
}
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update import history with final stats
await localConnection.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
records_added = ?,
records_updated = ?,
status = 'completed',
additional_info = JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?,
'categories_result', CAST(? AS JSON),
'products_result', CAST(? AS JSON),
'orders_result', CAST(? AS JSON),
'purchase_orders_result', CAST(? AS JSON)
)
WHERE id = ?
`, [
totalElapsedSeconds,
totalRecordsAdded,
totalRecordsUpdated,
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
JSON.stringify(results.categories),
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
importHistoryId
]);
outputProgress({
status: "complete",
operation: "Import process",
message: `All imports completed successfully in ${formatElapsedTime(totalElapsedSeconds)}`,
message: `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import completed successfully in ${formatElapsedTime(totalElapsedSeconds)}`,
current: completedSteps,
total: totalSteps,
elapsed: formatElapsedTime(startTime),
@@ -146,13 +238,27 @@ async function main() {
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update import history with error
if (importHistoryId) {
await connections?.localConnection?.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
status = ?,
error_message = ?
WHERE id = ?
`, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]);
}
console.error("Error during import process:", error);
outputProgress({
status: error.message === "Import cancelled" ? "cancelled" : "error",
operation: "Import process",
message: error.message === "Import cancelled"
? `Import cancelled by user after ${formatElapsedTime(totalElapsedSeconds)}`
: `Import failed after ${formatElapsedTime(totalElapsedSeconds)}`,
? `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import cancelled by user after ${formatElapsedTime(totalElapsedSeconds)}`
: `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import failed after ${formatElapsedTime(totalElapsedSeconds)}`,
error: error.message,
current: completedSteps,
total: totalSteps,

View File

@@ -12,6 +12,12 @@ async function importOrders(prodConnection, localConnection) {
const missingProducts = new Set(); // Store products that need to be imported
try {
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -24,14 +30,16 @@ async function importOrders(prodConnection, localConnection) {
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id"); // Skip auto-increment ID
// Get total count first for progress indication
// Get total count first for progress indication - modified for incremental
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM order_items oi FORCE INDEX (PRIMARY)
JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
`);
AND (o.date_placed_onlydate > ?
OR o.stamp > ?)
`, [lastSyncTime, lastSyncTime]);
const totalOrders = countResult[0].total;
outputProgress({
@@ -81,9 +89,10 @@ async function importOrders(prodConnection, localConnection) {
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
AND (o.date_placed_onlydate > ?
OR o.stamp > ?)
LIMIT ? OFFSET ?
`, [batchSize, offset]);
`, [lastSyncTime, lastSyncTime, batchSize, offset]);
// Check if all products exist before inserting orders
const orderProductPids = [...new Set(orders.map((o) => o.pid))];
@@ -213,6 +222,13 @@ async function importOrders(prodConnection, localConnection) {
}
}
// After successful import, update the sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
outputProgress({
status: "complete",
operation: "Orders import completed",
@@ -225,7 +241,9 @@ async function importOrders(prodConnection, localConnection) {
status: "complete",
totalImported: total,
missingProducts: missingProducts.size,
retriedOrders: skippedOrders.size
retriedOrders: skippedOrders.size,
incrementalUpdate: true,
lastSyncTime
};
} catch (error) {
outputProgress({

View File

@@ -1,6 +1,473 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
// Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid) => {
const paddedPid = pid.toString().padStart(6, '0');
const basePath = `${imageUrlBase}${paddedPid.slice(0, 3)}/${pid}`;
return {
image: `${basePath}-t-`,
image_175: `${basePath}-175x175-`,
image_full: `${basePath}-o-`
};
};
async function setupTemporaryTables(connection) {
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories (
cat_id INT PRIMARY KEY,
name VARCHAR(255)
) ENGINE=InnoDB;
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images (
pid INT,
iid INT,
image_type ENUM('thumbnail', '175', 'full'),
url VARCHAR(255),
PRIMARY KEY (pid, image_type)
) ENGINE=InnoDB;
CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status (
pid INT PRIMARY KEY,
stock_quantity INT,
pending_qty INT,
preorder_count INT,
notions_inv_count INT
) ENGINE=InnoDB;
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices (
pid INT PRIMARY KEY,
price DECIMAL(10,2),
regular_price DECIMAL(10,2),
cost_price DECIMAL(10,5)
) ENGINE=InnoDB;
INSERT INTO temp_categories
SELECT cat_id, name FROM categories;
CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);
`);
}
async function cleanupTemporaryTables(connection) {
await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_categories;
DROP TEMPORARY TABLE IF EXISTS temp_product_images;
DROP TEMPORARY TABLE IF EXISTS temp_inventory_status;
DROP TEMPORARY TABLE IF EXISTS temp_product_prices;
`);
}
async function materializeCalculations(prodConnection, localConnection) {
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching inventory and order data from production"
});
// Get all inventory and order data from production in one query
const [prodInventory] = await prodConnection.query(`
SELECT
p.pid,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(
(
SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodInventory.length} inventory records`
});
// Insert inventory data into local temp table in batches
for (let i = 0; i < prodInventory.length; i += 1000) {
const batch = prodInventory.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity
row.pending_qty,
row.preorder_count,
row.notions_inv_count
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count)
VALUES ?
ON DUPLICATE KEY UPDATE
stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count)
`, [values]);
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodInventory.length)} of ${prodInventory.length} inventory records`,
current: i + batch.length,
total: prodInventory.length
});
}
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching pricing data from production"
});
// Get prices from production
const [prodPrices] = await prodConnection.query(`
SELECT
p.pid,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
COALESCE(
(SELECT ROUND(AVG(costeach), 5)
FROM product_inventory
WHERE pid = p.pid
AND COUNT > 0), 0
) AS cost_price
FROM products p
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid
WHERE pcp.active = 1
`);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodPrices.length} price records`
});
// Insert prices into local temp table in batches
for (let i = 0; i < prodPrices.length; i += 1000) {
const batch = prodPrices.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
row.price,
row.regular_price,
row.cost_price
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_product_prices (pid, price, regular_price, cost_price)
VALUES ?
ON DUPLICATE KEY UPDATE
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price)
`, [values]);
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodPrices.length)} of ${prodPrices.length} price records`,
current: i + batch.length,
total: prodPrices.length
});
}
outputProgress({
status: "running",
operation: "Products import",
message: "Finished materializing calculations"
});
}
async function importProducts(prodConnection, localConnection) {
const startTime = Date.now();
try {
// Get column names first
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
// Setup temporary tables
await setupTemporaryTables(localConnection);
// Materialize calculations
await materializeCalculations(prodConnection, localConnection);
// Optimized count query for changes since last sync
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
WHERE p.stamp > ?
OR EXISTS (
SELECT 1 FROM product_last_sold pls
WHERE p.pid = pls.pid
AND pls.date_sold > ?
)
OR p.date_created > ?
OR p.datein > ?
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
const totalProducts = countResult[0].total;
// Main product query using materialized data - modified for incremental
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching product data from production"
});
// Create temporary table for production data
await localConnection.query(`
CREATE TEMPORARY TABLE temp_prod_data (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
date_created TIMESTAMP NULL,
first_received TIMESTAMP NULL,
location VARCHAR(50),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(20),
updated_at TIMESTAMP,
visible BOOLEAN,
replenishable BOOLEAN,
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
moq INT,
rating TINYINT UNSIGNED,
reviews INT UNSIGNED,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
total_sold INT UNSIGNED,
country_of_origin VARCHAR(5),
date_last_sold DATE,
category_ids TEXT,
PRIMARY KEY (pid)
) ENGINE=InnoDB
`);
// Get data from production and insert into temp table
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions'
THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE p.stamp > ?
OR pls.date_sold > ?
OR p.date_created > ?
OR p.datein > ?
GROUP BY p.pid
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
// Insert production data in batches
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => "(?)").join(",");
await localConnection.query(`
INSERT INTO temp_prod_data VALUES ${placeholders}
`, batch.map(row => Object.values(row)));
outputProgress({
status: "running",
operation: "Products import",
message: `Loaded ${Math.min(i + 1000, prodData.length)} of ${prodData.length} products from production`,
current: i + batch.length,
total: prodData.length
});
}
// Now join with local temp tables
const [rows] = await localConnection.query(`
SELECT
p.*,
COALESCE(tis.stock_quantity, 0) as stock_quantity,
COALESCE(tis.preorder_count, 0) as preorder_count,
COALESCE(tis.notions_inv_count, 0) as notions_inv_count,
COALESCE(tpp.price, 0) as price,
COALESCE(tpp.regular_price, 0) as regular_price,
COALESCE(tpp.cost_price, 0) as cost_price
FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
`);
// Drop the temporary production data table
await localConnection.query("DROP TEMPORARY TABLE IF EXISTS temp_prod_data");
// Process products in batches
const BATCH_SIZE = 1000;
let processed = 0;
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
const batch = rows.slice(i, i + BATCH_SIZE);
// Add image URLs
batch.forEach(row => {
const urls = getImageUrls(row.pid);
row.image = urls.image;
row.image_175 = urls.image_175;
row.image_full = urls.image_full;
});
// Prepare product values - now using columnNames from above
const productValues = batch.flatMap(row =>
columnNames.map(col => {
const val = row[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
})
);
// MySQL 8.0 optimized insert
const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`;
const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(",");
const insertQuery = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${productPlaceholders}
AS new_products
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== "pid")
.map(col => `${col} = new_products.${col}`)
.join(",")};
`;
await localConnection.query(insertQuery, productValues);
processed += batch.length;
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${processed} of ${rows.length} products`,
current: processed,
total: rows.length
});
}
// After successful import, update the sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('products', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
return {
status: "complete",
totalImported: rows.length,
incrementalUpdate: true,
lastSyncTime
};
} catch (error) {
throw error;
} finally {
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
}
}
async function importMissingProducts(prodConnection, localConnection, missingPids) {
try {
// Setup temporary tables
await setupTemporaryTables(localConnection);
// Materialize calculations for missing products
await localConnection.query(`
INSERT INTO temp_inventory_status
WITH product_stock AS (
SELECT oi.prod_pid,
SUM(oi.qty_ordered - oi.qty_placed) as pending_qty
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid IN (?)
AND [rest of conditions]
GROUP BY oi.prod_pid
)
SELECT [same as above]
WHERE p.pid IN (?)
`, [missingPids, missingPids]);
// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -195,314 +662,16 @@ async function importMissingProducts(prodConnection, localConnection, missingPid
}
}
}
}
async function importProducts(prodConnection, localConnection) {
outputProgress({
operation: "Starting products import - Getting schema",
status: "running",
});
const startTime = Date.now();
try {
// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map((col) => col.COLUMN_NAME);
// Get total count first for progress indication
outputProgress({
operation: "Starting products import - Getting total count",
status: "running",
});
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR pls.date_sold IS NULL
`);
const totalProducts = countResult[0].total;
outputProgress({
operation: `Starting products import - Fetching ${totalProducts} products from production`,
status: "running",
});
// Get products from production with optimized query
const [rows] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0), 0) AS stock_quantity,
ci.onpreorder AS preorder_count,
pnb.inventory AS notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
COALESCE((SELECT ROUND(AVG(costeach), 5)
FROM product_inventory
WHERE pid = p.pid
AND COUNT > 0), 0) AS cost_price,
NULL AS landing_cost_price,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-t-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0) AS image,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-175x175-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-o-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
NULL AS options,
NULL AS tags,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
NULL AS uom,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
FROM products p
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN (
SELECT pid, MIN(price_each) as price_each
FROM product_current_prices
WHERE active = 1
GROUP BY pid
) pcp ON p.pid = pcp.pid
WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR pls.date_sold IS NULL)
GROUP BY p.pid
`);
let current = 0;
const total = rows.length;
const BATCH_SIZE = 1000;
// Process products in batches
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
let batch = rows.slice(i, i + BATCH_SIZE);
// Prepare product values and category relationships in parallel
const productValues = [];
const categoryRelationships = [];
batch.forEach((row) => {
// Map values in the same order as columns
const rowValues = columnNames.map((col) => {
const val = row[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
});
productValues.push(...rowValues);
// Add category relationships
if (row.category_ids) {
const catIds = row.category_ids
.split(",")
.map((id) => id.trim())
.filter((id) => id)
.map(Number);
catIds.forEach((catId) => {
if (catId) categoryRelationships.push([catId, row.pid]);
});
}
});
// Generate placeholders based on column count
const placeholderGroup = `(${Array(columnNames.length)
.fill("?")
.join(",")})`;
const productPlaceholders = Array(batch.length)
.fill(placeholderGroup)
.join(",");
// Build the query dynamically
const insertQuery = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${productPlaceholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")}
`;
// First insert the products and wait for it to complete
await localConnection.query(insertQuery, productValues);
// Now that products are inserted, handle category relationships
if (categoryRelationships.length > 0) {
// Get unique category IDs to verify they exist
const uniqueCatIds = [
...new Set(categoryRelationships.map(([catId]) => catId)),
];
// Check which categories exist
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map((c) => c.cat_id));
// Log missing categories
const missingCatIds = uniqueCatIds.filter(
(id) => !existingCatIds.has(id)
);
if (missingCatIds.length > 0) {
console.error("Missing categories:", missingCatIds);
// Query production to see what these categories are
const [missingCats] = await prodConnection.query(
`
SELECT cat_id, name, type, master_cat_id, hidden
FROM product_categories
WHERE cat_id IN (?)
`,
[missingCatIds]
);
console.error("Missing category details:", missingCats);
console.warn(
"Skipping invalid category relationships - continuing with import"
);
continue;
}
// Verify products exist before inserting relationships
const productIds = [
...new Set(categoryRelationships.map(([_, pid]) => pid)),
];
const [existingProducts] = await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[productIds]
);
const existingProductIds = new Set(existingProducts.map((p) => p.pid));
// Filter relationships to only include existing products
const validRelationships = categoryRelationships.filter(([_, pid]) =>
existingProductIds.has(pid)
);
if (validRelationships.length > 0) {
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`
INSERT INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
`,
validRelationships.flat()
);
}
}
current += batch.length;
outputProgress({
status: "running",
operation: "Products import",
current,
total,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
rate: calculateRate(startTime, current)
});
}
outputProgress({
status: "complete",
operation: "Products import completed",
current: total,
total,
duration: formatElapsedTime(Date.now() - startTime),
});
return {
status: "complete",
totalImported: total
totalImported: products.length
};
} catch (error) {
console.error("Error importing products:", error);
outputProgress({
status: "error",
operation: "Products import failed",
error: error.message
});
throw error;
} finally {
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
}
}

View File

@@ -1,14 +1,37 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
async function importPurchaseOrders(prodConnection, localConnection) {
const startTime = Date.now();
let importHistoryId;
try {
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
// Create import history record
const [historyResult] = await localConnection.query(`
INSERT INTO import_history (
table_name,
start_time,
is_incremental,
status
) VALUES (
'purchase_orders',
NOW(),
?,
'running'
)
`, [!!syncInfo?.[0]]);
importHistoryId = historyResult.insertId;
outputProgress({
operation: "Starting purchase orders import - Initializing",
status: "running",
});
const startTime = Date.now();
try {
// Get column names for the insert
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -20,7 +43,7 @@ async function importPurchaseOrders(prodConnection, localConnection) {
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id");
// First get all relevant PO IDs with basic info - this is much faster than the full join
// First get all relevant PO IDs with basic info - modified for incremental
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM (
@@ -29,14 +52,17 @@ async function importPurchaseOrders(prodConnection, localConnection) {
FORCE INDEX (idx_date_created)
JOIN po_products pop ON p.po_id = pop.po_id
JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE p.date_ordered > ?
OR p.stamp > ?
OR p.date_modified > ?
UNION
SELECT DISTINCT r.receiving_id as po_id, rp.pid
FROM receivings_products rp
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE rp.received_date > ?
OR rp.stamp > ?
) all_items
`);
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
const [poList] = await prodConnection.query(`
SELECT DISTINCT
@@ -53,22 +79,27 @@ async function importPurchaseOrders(prodConnection, localConnection) {
COALESCE(p.notes, '') as long_note
FROM (
SELECT po_id FROM po
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE date_ordered > ?
OR stamp > ?
OR date_modified > ?
UNION
SELECT DISTINCT r.receiving_id as po_id
FROM receivings r
JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE rp.received_date > ?
OR rp.stamp > ?
) ids
LEFT JOIN po p ON ids.po_id = p.po_id
LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
LEFT JOIN receivings r ON ids.po_id = r.receiving_id
LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
ORDER BY po_id
`);
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);
const totalItems = total;
let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;
const BATCH_SIZE = 5000;
const PROGRESS_INTERVAL = 500;
@@ -249,7 +280,9 @@ async function importPurchaseOrders(prodConnection, localConnection) {
.join(",")};
`;
await localConnection.query(query, values.flat());
const result = await localConnection.query(query, values.flat());
recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows;
}
processed += batchProcessed;
@@ -271,19 +304,56 @@ async function importPurchaseOrders(prodConnection, localConnection) {
}
}
outputProgress({
status: "complete",
operation: "Purchase orders import completed",
current: totalItems,
total: totalItems,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
});
// After successful import, update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('purchase_orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
// Update import history with final stats
const endTime = Date.now();
const durationSeconds = Math.round((endTime - startTime) / 1000);
await localConnection.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
records_added = ?,
records_updated = ?,
status = 'completed',
additional_info = JSON_OBJECT(
'total_processed', ?,
'last_sync_time', ?,
'next_sync_time', NOW()
)
WHERE id = ?
`, [durationSeconds, recordsAdded, recordsUpdated, totalItems, lastSyncTime, importHistoryId]);
return {
status: "complete",
totalImported: totalItems
totalImported: totalItems,
recordsAdded,
recordsUpdated,
durationSeconds,
incrementalUpdate: !!syncInfo?.[0]
};
} catch (error) {
// Update import history with error
if (importHistoryId) {
await localConnection.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
status = 'failed',
error_message = ?
WHERE id = ?
`, [Math.round((Date.now() - startTime) / 1000), error.message, importHistoryId]);
}
outputProgress({
operation: "Purchase orders import failed",
status: "error",

View File

@@ -156,7 +156,7 @@ async function resetDatabase() {
SELECT GROUP_CONCAT(table_name) as tables
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name != 'users'
AND table_name NOT IN ('users', 'import_history')
`);
if (!tables[0].tables) {

View File

@@ -13,7 +13,7 @@ import {
AlertDialogTitle,
AlertDialogTrigger,
} from "@/components/ui/alert-dialog";
import { Loader2, RefreshCw, Upload, X, Database } from "lucide-react";
import { Loader2, RefreshCw, X, Database } from "lucide-react";
import config from '../../config';
import { toast } from "sonner";
@@ -36,11 +36,6 @@ interface ImportProgress {
duration?: string;
}
interface ImportLimits {
products: number;
orders: number;
purchaseOrders: number;
}
export function DataManagement() {
const [isImportingProd, setIsImportingProd] = useState(false);