Compare commits: d60b2d4fae...fb5bf4a144 (4 commits)

Commits: fb5bf4a144, 4d8a677c5b, 655c071960, d2a2dbc812
@@ -169,4 +169,27 @@ ORDER BY
ELSE 4
END,
c.name,
st.vendor;
st.vendor;

CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
);

CREATE TABLE IF NOT EXISTS import_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NULL,
duration_seconds INT,
records_added INT DEFAULT 0,
records_updated INT DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status ENUM('running', 'completed', 'failed', 'cancelled') DEFAULT 'running',
error_message TEXT,
additional_info JSON,
INDEX idx_table_time (table_name, start_time),
INDEX idx_status (status)
);
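The two tables above underpin the incremental mode wired up in the JavaScript below: each importer reads its watermark from sync_status, pulls only rows changed since then, and advances the watermark on success. A minimal sketch of that cycle, using the mysql2/promise style seen throughout this diff (pullSince is an illustrative placeholder, not a function in the commit):

async function incrementalPull(localConnection, table, pullSince) {
  // Read the watermark; fall back to the epoch on first run.
  const [rows] = await localConnection.query(
    "SELECT last_sync_timestamp FROM sync_status WHERE table_name = ?", [table]
  );
  const since = rows[0]?.last_sync_timestamp || '1970-01-01';
  const stats = await pullSince(since); // fetch + upsert rows changed after `since`
  // Advance the watermark only after the pull succeeded.
  await localConnection.query(`
    INSERT INTO sync_status (table_name, last_sync_timestamp)
    VALUES (?, NOW())
    ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
  `, [table]);
  return stats;
}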
@@ -12,10 +12,14 @@ dotenv.config({ path: path.join(__dirname, "../.env") });
// Constants to control which imports run
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = false;
const IMPORT_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = true;

// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE === 'true';

// SSH configuration
// In import-from-prod.js
const sshConfig = {
ssh: {
host: process.env.PROD_SSH_HOST,
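Note the strict string comparison: only the literal value true enables incremental mode. A hypothetical invocation:

// INCREMENTAL_UPDATE=true node import-from-prod.js
// Anything else (unset, "1", "TRUE") leaves the flag false and forces a full import.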
@@ -24,16 +28,16 @@ const sshConfig = {
privateKey: process.env.PROD_SSH_KEY_PATH
? require("fs").readFileSync(process.env.PROD_SSH_KEY_PATH)
: undefined,
compress: true, // Enable SSH compression
},
// Production database configuration
prodDbConfig: {
host: process.env.PROD_DB_HOST || "localhost",
user: process.env.PROD_DB_USER,
password: process.env.PROD_DB_PASSWORD,
database: process.env.PROD_DB_NAME,
port: process.env.PROD_DB_PORT || 3306,
timezone: 'Z',
},
// Local database configuration
localDbConfig: {
host: process.env.DB_HOST,
user: process.env.DB_USER,
@@ -44,6 +48,13 @@ const sshConfig = {
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true,
maxAllowedPacket: 64 * 1024 * 1024, // 64MB
connectTimeout: 60000,
enableKeepAlive: true,
keepAliveInitialDelay: 10000,
compress: true,
timezone: 'Z',
stringifyObjects: false,
}
};

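The hunk does not show how this config is consumed. A minimal sketch, assuming the ssh2 and mysql2 packages and mysql2's stream option for riding the forwarded connection (helper names are illustrative, and the stream option should be checked against the mysql2 version in use):

const { Client } = require("ssh2");
const mysql = require("mysql2/promise");

// Illustrative: open the tunnel, then hand the forwarded duplex stream to mysql2.
function connectProd({ ssh, prodDbConfig }) {
  return new Promise((resolve, reject) => {
    const tunnel = new Client();
    tunnel.on("ready", () => {
      tunnel.forwardOut("127.0.0.1", 0, prodDbConfig.host, prodDbConfig.port,
        async (err, stream) => {
          if (err) return reject(err);
          const conn = await mysql.createConnection({ ...prodDbConfig, stream });
          resolve({ conn, tunnel });
        });
    }).on("error", reject).connect(ssh);
  });
}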
@@ -68,6 +79,7 @@ async function main() {
const startTime = Date.now();
let connections;
let completedSteps = 0;
let importHistoryId;
const totalSteps = [
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
@@ -80,7 +92,7 @@ async function main() {
outputProgress({
status: "running",
operation: "Import process",
message: "Initializing SSH tunnel...",
message: `Initializing SSH tunnel for ${INCREMENTAL_UPDATE ? 'incremental' : 'full'} import...`,
current: completedSteps,
total: totalSteps,
elapsed: formatElapsedTime(startTime)
@@ -91,6 +103,39 @@ async function main() {

if (isImportCancelled) throw new Error("Import cancelled");

// Initialize sync_status table if it doesn't exist
await localConnection.query(`
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
);
`);

// Create import history record for the overall session
const [historyResult] = await localConnection.query(`
INSERT INTO import_history (
table_name,
start_time,
is_incremental,
status,
additional_info
) VALUES (
'all_tables',
NOW(),
?,
'running',
JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?
)
)
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
importHistoryId = historyResult.insertId;

const results = {
categories: null,
products: null,
@@ -98,37 +143,84 @@ async function main() {
purchaseOrders: null
};

let totalRecordsAdded = 0;
let totalRecordsUpdated = 0;

// Run each import based on constants
if (IMPORT_CATEGORIES) {
results.categories = await importCategories(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.categories.recordsAdded) totalRecordsAdded += results.categories.recordsAdded;
if (results.categories.recordsUpdated) totalRecordsUpdated += results.categories.recordsUpdated;
}

if (IMPORT_PRODUCTS) {
results.products = await importProducts(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.products.recordsAdded) totalRecordsAdded += results.products.recordsAdded;
if (results.products.recordsUpdated) totalRecordsUpdated += results.products.recordsUpdated;
}

if (IMPORT_ORDERS) {
results.orders = await importOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.orders.recordsAdded) totalRecordsAdded += results.orders.recordsAdded;
if (results.orders.recordsUpdated) totalRecordsUpdated += results.orders.recordsUpdated;
}

if (IMPORT_PURCHASE_ORDERS) {
results.purchaseOrders = await importPurchaseOrders(prodConnection, localConnection);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
if (results.purchaseOrders.recordsAdded) totalRecordsAdded += results.purchaseOrders.recordsAdded;
if (results.purchaseOrders.recordsUpdated) totalRecordsUpdated += results.purchaseOrders.recordsUpdated;
}

const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);

// Update import history with final stats
await localConnection.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
records_added = ?,
records_updated = ?,
status = 'completed',
additional_info = JSON_OBJECT(
'categories_enabled', ?,
'products_enabled', ?,
'orders_enabled', ?,
'purchase_orders_enabled', ?,
'categories_result', CAST(? AS JSON),
'products_result', CAST(? AS JSON),
'orders_result', CAST(? AS JSON),
'purchase_orders_result', CAST(? AS JSON)
)
WHERE id = ?
`, [
totalElapsedSeconds,
totalRecordsAdded,
totalRecordsUpdated,
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
JSON.stringify(results.categories),
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
importHistoryId
]);

outputProgress({
status: "complete",
operation: "Import process",
message: `All imports completed successfully in ${formatElapsedTime(totalElapsedSeconds)}`,
message: `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import completed successfully in ${formatElapsedTime(totalElapsedSeconds)}`,
current: completedSteps,
total: totalSteps,
elapsed: formatElapsedTime(startTime),
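With per-run stats recorded, recent sessions can be inspected straight from the new table; a hypothetical convenience query, not part of the diff:

// Illustrative: headline stats for the last ten runs, newest first.
const [recentRuns] = await localConnection.query(`
  SELECT table_name, start_time, duration_seconds,
         records_added, records_updated, status
  FROM import_history
  ORDER BY start_time DESC
  LIMIT 10
`);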
@@ -146,13 +238,27 @@ async function main() {
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);

// Update import history with error
if (importHistoryId) {
await connections?.localConnection?.query(`
UPDATE import_history
SET
end_time = NOW(),
duration_seconds = ?,
status = ?,
error_message = ?
WHERE id = ?
`, [totalElapsedSeconds, error.message === "Import cancelled" ? 'cancelled' : 'failed', error.message, importHistoryId]);
}

console.error("Error during import process:", error);
outputProgress({
status: error.message === "Import cancelled" ? "cancelled" : "error",
operation: "Import process",
message: error.message === "Import cancelled"
? `Import cancelled by user after ${formatElapsedTime(totalElapsedSeconds)}`
: `Import failed after ${formatElapsedTime(totalElapsedSeconds)}`,
? `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import cancelled by user after ${formatElapsedTime(totalElapsedSeconds)}`
: `${INCREMENTAL_UPDATE ? 'Incremental' : 'Full'} import failed after ${formatElapsedTime(totalElapsedSeconds)}`,
error: error.message,
current: completedSteps,
total: totalSteps,

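The isImportCancelled checks between steps are cooperative cancellation: something outside the loop flips a shared flag, each step polls it at a safe boundary and throws, and the catch block above turns that into a 'cancelled' import_history row. The flag's setter is not in this diff; one plausible wiring, purely illustrative:

let isImportCancelled = false;
// e.g. flipped by a signal handler, or by an API route behind a cancel button
process.on("SIGINT", () => { isImportCancelled = true; });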
@@ -12,6 +12,12 @@ async function importOrders(prodConnection, localConnection) {
const missingProducts = new Set(); // Store products that need to be imported

try {
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';

// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -24,14 +30,17 @@ async function importOrders(prodConnection, localConnection) {
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id"); // Skip auto-increment ID

// Get total count first for progress indication
// Get total count first for progress indication - modified for incremental
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM order_items oi FORCE INDEX (PRIMARY)
JOIN _order o FORCE INDEX (PRIMARY) ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
`);
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (o.date_placed_onlydate > ?
OR o.stamp > ?)
`, [lastSyncTime, lastSyncTime]);

const totalOrders = countResult[0].total;

outputProgress({
@@ -43,10 +52,11 @@ async function importOrders(prodConnection, localConnection) {
let processed = 0;

// Process in batches
const batchSize = 1000;
const batchSize = 20000; // Increased from 1000 since order records are small
let offset = 0;

while (offset < total) {
// First get orders without tax info
const [orders] = await prodConnection.query(`
SELECT
oi.order_id as order_number,
@@ -56,19 +66,7 @@ async function importOrders(prodConnection, localConnection) {
oi.prod_price_reg as price,
oi.qty_ordered as quantity,
(oi.prod_price_reg - oi.prod_price) as discount,
(
SELECT
otp.item_taxes_to_collect
FROM
order_tax_info oti
JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
WHERE
oti.order_id = o.order_id
AND otp.pid = oi.prod_pid
ORDER BY
oti.stamp DESC
LIMIT 1
) as tax,
0 as tax,
0 as tax_included,
ROUND(
((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
@@ -79,11 +77,46 @@ async function importOrders(prodConnection, localConnection) {
'pending' as status,
CASE WHEN o.order_status = 15 THEN 1 ELSE 0 END as canceled
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
FORCE INDEX (PRIMARY)
JOIN _order o USE INDEX (date_placed_onlydate, idx_status)
ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (o.date_placed_onlydate > ?
OR o.stamp > ?)
LIMIT ? OFFSET ?
`, [batchSize, offset]);
`, [lastSyncTime, lastSyncTime, batchSize, offset]);

// Then get tax info for these orders
if (orders.length > 0) {
const orderIds = [...new Set(orders.map(o => o.order_number))];
const [taxInfo] = await prodConnection.query(`
SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect
FROM (
SELECT order_id, MAX(stamp) as latest_stamp
FROM order_tax_info USE INDEX (order_id, stamp)
WHERE order_id IN (?)
GROUP BY order_id
) latest
JOIN order_tax_info oti USE INDEX (order_id, stamp)
ON oti.order_id = latest.order_id
AND oti.stamp = latest.latest_stamp
JOIN order_tax_info_products otp FORCE INDEX (PRIMARY)
ON oti.taxinfo_id = otp.taxinfo_id
`, [orderIds]);

// Create a map for quick tax lookup
const taxMap = new Map();
taxInfo.forEach(t => {
taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect);
});

// Add tax info to orders
orders.forEach(order => {
const taxKey = `${order.order_number}-${order.pid}`;
order.tax = taxMap.get(taxKey) || 0;
});
}

// Check if all products exist before inserting orders
const orderProductPids = [...new Set(orders.map((o) => o.pid))];
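The restructuring above is the heart of this hunk: instead of a correlated subquery per order row, one grouped query fetches the latest tax record per order and a Map stitches the values back in memory. Condensed to its shape, with the same names as the hunk:

// Illustrative: in-memory join keyed on a composite string.
const taxByKey = new Map(
  taxInfo.map(t => [`${t.order_id}-${t.pid}`, t.item_taxes_to_collect])
);
for (const order of orders) {
  order.tax = taxByKey.get(`${order.order_number}-${order.pid}`) ?? 0;
}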
@@ -165,19 +198,7 @@ async function importOrders(prodConnection, localConnection) {
oi.prod_price_reg as price,
oi.qty_ordered as quantity,
(oi.prod_price_reg - oi.prod_price) as discount,
(
SELECT
otp.item_taxes_to_collect
FROM
order_tax_info oti
JOIN order_tax_info_products otp ON oti.taxinfo_id = otp.taxinfo_id
WHERE
oti.order_id = o.order_id
AND otp.pid = oi.prod_pid
ORDER BY
oti.stamp DESC
LIMIT 1
) as tax,
0 as tax,
0 as tax_included,
ROUND(
((o.summary_shipping - COALESCE(o.summary_discount_shipping, 0)) *
@@ -192,6 +213,36 @@ async function importOrders(prodConnection, localConnection) {
WHERE oi.order_id IN (?)
`, [[...skippedOrders]]);

if (retryOrders.length > 0) {
const retryOrderIds = [...new Set(retryOrders.map(o => o.order_number))];
const [retryTaxInfo] = await prodConnection.query(`
SELECT oti.order_id, otp.pid, otp.item_taxes_to_collect
FROM (
SELECT order_id, MAX(stamp) as latest_stamp
FROM order_tax_info USE INDEX (order_id, stamp)
WHERE order_id IN (?)
GROUP BY order_id
) latest
JOIN order_tax_info oti USE INDEX (order_id, stamp)
ON oti.order_id = latest.order_id
AND oti.stamp = latest.latest_stamp
JOIN order_tax_info_products otp FORCE INDEX (PRIMARY)
ON oti.taxinfo_id = otp.taxinfo_id
`, [retryOrderIds]);

// Create a map for quick tax lookup
const taxMap = new Map();
retryTaxInfo.forEach(t => {
taxMap.set(`${t.order_id}-${t.pid}`, t.item_taxes_to_collect);
});

// Add tax info to orders
retryOrders.forEach(order => {
const taxKey = `${order.order_number}-${order.pid}`;
order.tax = taxMap.get(taxKey) || 0;
});
}

const placeholders = retryOrders
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
@@ -213,6 +264,16 @@ async function importOrders(prodConnection, localConnection) {
}
}

// After successful import, update the sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);

const endTime = Date.now();
const durationSeconds = Math.round((endTime - startTime) / 1000);

outputProgress({
status: "complete",
operation: "Orders import completed",
@@ -225,7 +286,9 @@ async function importOrders(prodConnection, localConnection) {
status: "complete",
totalImported: total,
missingProducts: missingProducts.size,
retriedOrders: skippedOrders.size
retriedOrders: skippedOrders.size,
incrementalUpdate: true,
lastSyncTime
};
} catch (error) {
outputProgress({

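One caveat on the sync_status bookkeeping above: stamping the watermark with NOW() after a long import can silently skip rows that changed on production while the batches were running. A safer variant (a sketch, not what this commit does) captures the watermark before reading:

// Illustrative: record when the pull started, not when it finished.
const syncStartedAt = new Date();
// ... run the batched pull against the previous watermark ...
await localConnection.query(`
  INSERT INTO sync_status (table_name, last_sync_timestamp)
  VALUES ('orders', ?)
  ON DUPLICATE KEY UPDATE last_sync_timestamp = VALUES(last_sync_timestamp)
`, [syncStartedAt]);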
@@ -1,211 +1,473 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

async function importMissingProducts(prodConnection, localConnection, missingPids) {
// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
// Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid) => {
const paddedPid = pid.toString().padStart(6, '0');
const basePath = `${imageUrlBase}${paddedPid.slice(0, 3)}/${pid}`;
return {
image: `${basePath}-t-`,
image_175: `${basePath}-175x175-`,
image_full: `${basePath}-o-`
};
};
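// Illustrative output of the helper above for pid 12345 (padded to '012345',
// so the shard directory is '012'):
//   image:      https://sbing.com/i/products/0000/012/12345-t-
//   image_175:  https://sbing.com/i/products/0000/012/12345-175x175-
//   image_full: https://sbing.com/i/products/0000/012/12345-o-
// These are URL prefixes; the SQL expressions being replaced also appended an
// image id and ".jpg".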

async function setupTemporaryTables(connection) {
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_categories (
cat_id INT PRIMARY KEY,
name VARCHAR(255)
) ENGINE=InnoDB;

CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_images (
pid INT,
iid INT,
image_type ENUM('thumbnail', '175', 'full'),
url VARCHAR(255),
PRIMARY KEY (pid, image_type)
) ENGINE=InnoDB;

CREATE TEMPORARY TABLE IF NOT EXISTS temp_inventory_status (
pid INT PRIMARY KEY,
stock_quantity INT,
pending_qty INT,
preorder_count INT,
notions_inv_count INT
) ENGINE=InnoDB;

CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_prices (
pid INT PRIMARY KEY,
price DECIMAL(10,2),
regular_price DECIMAL(10,2),
cost_price DECIMAL(10,5)
) ENGINE=InnoDB;

INSERT INTO temp_categories
SELECT cat_id, name FROM categories;

CREATE INDEX idx_temp_cat_id ON temp_categories(cat_id);
`);
}

const columnNames = columns.map((col) => col.COLUMN_NAME);
async function cleanupTemporaryTables(connection) {
await connection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_categories;
DROP TEMPORARY TABLE IF EXISTS temp_product_images;
DROP TEMPORARY TABLE IF EXISTS temp_inventory_status;
DROP TEMPORARY TABLE IF EXISTS temp_product_prices;
`);
}

// Get the missing products from production
const [products] = await prodConnection.query(`
async function materializeCalculations(prodConnection, localConnection) {
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching inventory and order data from production"
});

// Get all inventory and order data from production in one query
const [prodInventory] = await prodConnection.query(`
SELECT
p.pid,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(
(
SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
`);

outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodInventory.length} inventory records`
});

// Insert inventory data into local temp table in batches
for (let i = 0; i < prodInventory.length; i += 1000) {
const batch = prodInventory.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
Math.max(0, row.stock_quantity - row.pending_qty), // Calculate final stock quantity
row.pending_qty,
row.preorder_count,
row.notions_inv_count
]);

if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_inventory_status (pid, stock_quantity, pending_qty, preorder_count, notions_inv_count)
VALUES ?
ON DUPLICATE KEY UPDATE
stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count)
`, [values]);
}

outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodInventory.length)} of ${prodInventory.length} inventory records`,
current: i + batch.length,
total: prodInventory.length
});
}

outputProgress({
status: "running",
operation: "Products import",
message: "Fetching pricing data from production"
});

// Get prices from production
const [prodPrices] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0), 0) AS stock_quantity,
ci.onpreorder AS preorder_count,
pnb.inventory AS notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
COALESCE((SELECT ROUND(AVG(costeach), 5)
FROM product_inventory
WHERE pid = p.pid
AND COUNT > 0), 0) AS cost_price,
NULL AS landing_cost_price,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber ELSE sid.supplier_itemnumber END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-t-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0) AS image,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-175x175-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0 AND PI.width = 175) AS image_175,
(SELECT CONCAT('https://sbing.com/i/products/0000/',
SUBSTRING(LPAD(p.pid, 6, '0'), 1, 3), '/',
p.pid, '-o-', MIN(PI.iid), '.jpg')
FROM product_images PI
WHERE PI.pid = p.pid AND PI.hidden = 0) AS image_full,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
NULL AS options,
NULL AS tags,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
NULL AS uom,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
COALESCE(
(SELECT ROUND(AVG(costeach), 5)
FROM product_inventory
WHERE pid = p.pid
AND COUNT > 0), 0
) AS cost_price
FROM products p
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN (
SELECT pid, MIN(price_each) as price_each
FROM product_current_prices
WHERE active = 1
GROUP BY pid
) pcp ON p.pid = pcp.pid
WHERE p.pid IN (?)
GROUP BY p.pid
`, [missingPids]);
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid
WHERE pcp.active = 1
`);

if (products.length > 0) {
// Map values in the same order as columns
const productValues = products.flatMap(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
})
);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodPrices.length} price records`
});

// Generate placeholders for all products
const placeholders = products
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
// Insert prices into local temp table in batches
for (let i = 0; i < prodPrices.length; i += 1000) {
const batch = prodPrices.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
row.price,
row.regular_price,
row.cost_price
]);

// Build and execute the query
const query = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")}
`;

await localConnection.query(query, productValues);

// Verify products were inserted before proceeding with categories
const [insertedProducts] = await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[products.map(p => p.pid)]
);
const insertedPids = new Set(insertedProducts.map(p => p.pid));

// Handle category relationships if any
const categoryRelationships = [];
products.forEach(product => {
// Only add category relationships for products that were successfully inserted
if (insertedPids.has(product.pid) && product.category_ids) {
const catIds = product.category_ids
.split(",")
.map(id => id.trim())
.filter(id => id)
.map(Number);
catIds.forEach(catId => {
if (catId) categoryRelationships.push([catId, product.pid]);
});
}
});

if (categoryRelationships.length > 0) {
// Verify categories exist before inserting relationships
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map(c => c.cat_id));

// Filter relationships to only include existing categories
const validRelationships = categoryRelationships.filter(([catId]) =>
existingCatIds.has(catId)
);

if (validRelationships.length > 0) {
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`
INSERT INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
`,
validRelationships.flat()
);
}
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_product_prices (pid, price, regular_price, cost_price)
VALUES ?
ON DUPLICATE KEY UPDATE
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price)
`, [values]);
}

outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodPrices.length)} of ${prodPrices.length} price records`,
current: i + batch.length,
total: prodPrices.length
});
}

outputProgress({
status: "running",
operation: "Products import",
message: "Finished materializing calculations"
});
}

async function importProducts(prodConnection, localConnection) {
outputProgress({
operation: "Starting products import - Getting schema",
status: "running",
});

const startTime = Date.now();

try {
// Get column names first
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);

// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';

// Setup temporary tables
await setupTemporaryTables(localConnection);

// Materialize calculations
await materializeCalculations(prodConnection, localConnection);

// Optimized count query for changes since last sync
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
WHERE p.stamp > ?
OR EXISTS (
SELECT 1 FROM product_last_sold pls
WHERE p.pid = pls.pid
AND pls.date_sold > ?
)
OR p.date_created > ?
OR p.datein > ?
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);

const totalProducts = countResult[0].total;

// Main product query using materialized data - modified for incremental
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching product data from production"
});

// Create temporary table for production data
await localConnection.query(`
CREATE TEMPORARY TABLE temp_prod_data (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
date_created TIMESTAMP NULL,
first_received TIMESTAMP NULL,
location VARCHAR(50),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(20),
updated_at TIMESTAMP,
visible BOOLEAN,
replenishable BOOLEAN,
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
moq INT,
rating TINYINT UNSIGNED,
reviews INT UNSIGNED,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
total_sold INT UNSIGNED,
country_of_origin VARCHAR(5),
date_last_sold DATE,
category_ids TEXT,
PRIMARY KEY (pid)
) ENGINE=InnoDB
`);

// Get data from production and insert into temp table
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE WHEN p.reorder >= 0 THEN 1 ELSE 0 END AS replenishable,
s.companyname AS vendor,
CASE WHEN s.companyname = 'Notions'
THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT pci.cat_id) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE p.stamp > ?
OR pls.date_sold > ?
OR p.date_created > ?
OR p.datein > ?
GROUP BY p.pid
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);

// Insert production data in batches
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const placeholders = batch.map(() => "(?)").join(",");

await localConnection.query(`
INSERT INTO temp_prod_data VALUES ${placeholders}
`, batch.map(row => Object.values(row)));

outputProgress({
status: "running",
operation: "Products import",
message: `Loaded ${Math.min(i + 1000, prodData.length)} of ${prodData.length} products from production`,
current: i + batch.length,
total: prodData.length
});
}

// Now join with local temp tables
const [rows] = await localConnection.query(`
SELECT
p.*,
COALESCE(tis.stock_quantity, 0) as stock_quantity,
COALESCE(tis.preorder_count, 0) as preorder_count,
COALESCE(tis.notions_inv_count, 0) as notions_inv_count,
COALESCE(tpp.price, 0) as price,
COALESCE(tpp.regular_price, 0) as regular_price,
COALESCE(tpp.cost_price, 0) as cost_price
FROM temp_prod_data p
LEFT JOIN temp_inventory_status tis ON p.pid = tis.pid
LEFT JOIN temp_product_prices tpp ON p.pid = tpp.pid
`);

// Drop the temporary production data table
await localConnection.query("DROP TEMPORARY TABLE IF EXISTS temp_prod_data");

// Process products in batches
const BATCH_SIZE = 1000;
let processed = 0;
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
const batch = rows.slice(i, i + BATCH_SIZE);

// Add image URLs
batch.forEach(row => {
const urls = getImageUrls(row.pid);
row.image = urls.image;
row.image_175 = urls.image_175;
row.image_full = urls.image_full;
});

// Prepare product values - now using columnNames from above
const productValues = batch.flatMap(row =>
columnNames.map(col => {
const val = row[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
})
);

// MySQL 8.0 optimized insert
const placeholderGroup = `(${Array(columnNames.length).fill("?").join(",")})`;
const productPlaceholders = Array(batch.length).fill(placeholderGroup).join(",");

const insertQuery = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${productPlaceholders}
AS new_products
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== "pid")
.map(col => `${col} = new_products.${col}`)
.join(",")};
`;

await localConnection.query(insertQuery, productValues);

processed += batch.length;
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${processed} of ${rows.length} products`,
current: processed,
total: rows.length
});
}

// After successful import, update the sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('products', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);

return {
status: "complete",
totalImported: rows.length,
incrementalUpdate: true,
lastSyncTime
};
} catch (error) {
throw error;
} finally {
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
}
}

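// Compatibility note (not part of this change): the row-alias form used in
// importProducts above — INSERT ... VALUES ... AS new_products ON DUPLICATE KEY
// UPDATE col = new_products.col — requires MySQL 8.0.19+, while the VALUES(col)
// form still used elsewhere in this file is deprecated as of MySQL 8.0.20.
// Both spell the same upsert; mixing them ties the two code paths to different
// minimum server versions.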
async function importMissingProducts(prodConnection, localConnection, missingPids) {
try {
// Setup temporary tables
await setupTemporaryTables(localConnection);

// Materialize calculations for missing products
await localConnection.query(`
INSERT INTO temp_inventory_status
WITH product_stock AS (
SELECT oi.prod_pid,
SUM(oi.qty_ordered - oi.qty_placed) as pending_qty
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid IN (?)
AND [rest of conditions]
GROUP BY oi.prod_pid
)
SELECT [same as above]
WHERE p.pid IN (?)
`, [missingPids, missingPids]);

// First get the column names from the table structure
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -216,30 +478,8 @@ async function importProducts(prodConnection, localConnection) {

const columnNames = columns.map((col) => col.COLUMN_NAME);

// Get total count first for progress indication
outputProgress({
operation: "Starting products import - Getting total count",
status: "running",
});

const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM products p
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR pls.date_sold IS NULL
`);
const totalProducts = countResult[0].total;

outputProgress({
operation: `Starting products import - Fetching ${totalProducts} products from production`,
status: "running",
});

// Get products from production with optimized query
const [rows] = await prodConnection.query(`
// Get the missing products from production
const [products] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
@@ -338,120 +578,73 @@ async function importProducts(prodConnection, localConnection) {
WHERE active = 1
GROUP BY pid
) pcp ON p.pid = pcp.pid
WHERE (pls.date_sold >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.date_created >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR p.datein >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
OR pls.date_sold IS NULL)
WHERE p.pid IN (?)
GROUP BY p.pid
`);
`, [missingPids]);

let current = 0;
const total = rows.length;
const BATCH_SIZE = 1000;

// Process products in batches
for (let i = 0; i < rows.length; i += BATCH_SIZE) {
let batch = rows.slice(i, i + BATCH_SIZE);

// Prepare product values and category relationships in parallel
const productValues = [];
const categoryRelationships = [];

batch.forEach((row) => {
// Map values in the same order as columns
const rowValues = columnNames.map((col) => {
const val = row[col] ?? null;
if (products.length > 0) {
// Map values in the same order as columns
const productValues = products.flatMap(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
});
productValues.push(...rowValues);
})
);

// Add category relationships
if (row.category_ids) {
const catIds = row.category_ids
.split(",")
.map((id) => id.trim())
.filter((id) => id)
.map(Number);
catIds.forEach((catId) => {
if (catId) categoryRelationships.push([catId, row.pid]);
});
}
});

// Generate placeholders based on column count
const placeholderGroup = `(${Array(columnNames.length)
.fill("?")
.join(",")})`;
const productPlaceholders = Array(batch.length)
.fill(placeholderGroup)
// Generate placeholders for all products
const placeholders = products
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");

// Build the query dynamically
const insertQuery = `
// Build and execute the query
const query = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${productPlaceholders}
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")}
`;

// First insert the products and wait for it to complete
await localConnection.query(insertQuery, productValues);
await localConnection.query(query, productValues);

// Verify products were inserted before proceeding with categories
const [insertedProducts] = await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[products.map(p => p.pid)]
);
const insertedPids = new Set(insertedProducts.map(p => p.pid));

// Handle category relationships if any
const categoryRelationships = [];
products.forEach(product => {
// Only add category relationships for products that were successfully inserted
if (insertedPids.has(product.pid) && product.category_ids) {
const catIds = product.category_ids
.split(",")
.map(id => id.trim())
.filter(id => id)
.map(Number);
catIds.forEach(catId => {
if (catId) categoryRelationships.push([catId, product.pid]);
});
}
});

// Now that products are inserted, handle category relationships
if (categoryRelationships.length > 0) {
// Get unique category IDs to verify they exist
const uniqueCatIds = [
...new Set(categoryRelationships.map(([catId]) => catId)),
];

// Check which categories exist
// Verify categories exist before inserting relationships
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map((c) => c.cat_id));
const existingCatIds = new Set(existingCats.map(c => c.cat_id));

// Log missing categories
const missingCatIds = uniqueCatIds.filter(
(id) => !existingCatIds.has(id)
);
if (missingCatIds.length > 0) {
console.error("Missing categories:", missingCatIds);

// Query production to see what these categories are
const [missingCats] = await prodConnection.query(
`
SELECT cat_id, name, type, master_cat_id, hidden
FROM product_categories
WHERE cat_id IN (?)
`,
[missingCatIds]
);

console.error("Missing category details:", missingCats);
console.warn(
"Skipping invalid category relationships - continuing with import"
);
continue;
}

// Verify products exist before inserting relationships
const productIds = [
...new Set(categoryRelationships.map(([_, pid]) => pid)),
];
const [existingProducts] = await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[productIds]
);
const existingProductIds = new Set(existingProducts.map((p) => p.pid));

// Filter relationships to only include existing products
const validRelationships = categoryRelationships.filter(([_, pid]) =>
existingProductIds.has(pid)
// Filter relationships to only include existing categories
const validRelationships = categoryRelationships.filter(([catId]) =>
existingCatIds.has(catId)
);

if (validRelationships.length > 0) {
@@ -460,49 +653,25 @@ async function importProducts(prodConnection, localConnection) {
.join(",");
await localConnection.query(
`
INSERT INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
`,
INSERT INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}
ON DUPLICATE KEY UPDATE cat_id = VALUES(cat_id)
`,
validRelationships.flat()
);
}
}

current += batch.length;
outputProgress({
status: "running",
operation: "Products import",
current,
total,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
rate: calculateRate(startTime, current)
});
}

outputProgress({
status: "complete",
operation: "Products import completed",
current: total,
total,
duration: formatElapsedTime(Date.now() - startTime),
});

return {
status: "complete",
totalImported: total
totalImported: products.length
};
} catch (error) {
console.error("Error importing products:", error);

outputProgress({
status: "error",
operation: "Products import failed",
error: error.message
});

throw error;
} finally {
// Cleanup temporary tables
await cleanupTemporaryTables(localConnection);
}
}


@@ -1,14 +1,20 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');

async function importPurchaseOrders(prodConnection, localConnection) {
outputProgress({
operation: "Starting purchase orders import - Initializing",
status: "running",
});

const startTime = Date.now();

try {
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';

outputProgress({
operation: "Starting purchase orders import - Initializing",
status: "running",
});

// Get column names for the insert
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
@@ -20,7 +26,7 @@ async function importPurchaseOrders(prodConnection, localConnection) {
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id");

// First get all relevant PO IDs with basic info - this is much faster than the full join
// First get all relevant PO IDs with basic info - modified for incremental
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM (
@@ -29,14 +35,19 @@ async function importPurchaseOrders(prodConnection, localConnection) {
FORCE INDEX (idx_date_created)
JOIN po_products pop ON p.po_id = pop.po_id
JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (p.date_ordered > ?
OR p.stamp > ?
OR p.date_modified > ?)
UNION
SELECT DISTINCT r.receiving_id as po_id, rp.pid
FROM receivings_products rp
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (rp.received_date > ?
OR rp.stamp > ?)
) all_items
`);
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);

const [poList] = await prodConnection.query(`
SELECT DISTINCT
@@ -53,22 +64,29 @@ async function importPurchaseOrders(prodConnection, localConnection) {
COALESCE(p.notes, '') as long_note
FROM (
SELECT po_id FROM po
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (date_ordered > ?
OR stamp > ?
OR date_modified > ?)
UNION
SELECT DISTINCT r.receiving_id as po_id
FROM receivings r
JOIN receivings_products rp ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 2 YEAR)
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
AND (rp.received_date > ?
OR rp.stamp > ?)
) ids
LEFT JOIN po p ON ids.po_id = p.po_id
LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
LEFT JOIN receivings r ON ids.po_id = r.receiving_id
LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
ORDER BY po_id
`);
`, [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]);

const totalItems = total;
let processed = 0;
let recordsAdded = 0;
let recordsUpdated = 0;

const BATCH_SIZE = 5000;
const PROGRESS_INTERVAL = 500;
@@ -249,7 +267,9 @@ async function importPurchaseOrders(prodConnection, localConnection) {
.join(",")};
`;

await localConnection.query(query, values.flat());
const [result] = await localConnection.query(query, values.flat());
recordsAdded += result.affectedRows - result.changedRows;
recordsUpdated += result.changedRows;
}

processed += batchProcessed;
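The destructuring above matters: mysql2's promise API resolves to a [result, fields] pair, so without it affectedRows would be read off the array and come back undefined. Note also that under ON DUPLICATE KEY UPDATE (with default connection flags) MySQL reports affectedRows as 1 per inserted row and 2 per updated row, while changedRows counts only rows whose values actually changed, so a stricter accounting than the formula above would be:

// Illustrative: exact insert/update split under ON DUPLICATE KEY UPDATE.
const inserted = result.affectedRows - 2 * result.changedRows;
const updated = result.changedRows;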
@@ -271,18 +291,21 @@ async function importPurchaseOrders(prodConnection, localConnection) {
}
}

outputProgress({
status: "complete",
operation: "Purchase orders import completed",
current: totalItems,
total: totalItems,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
});
// After successful import, update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('purchase_orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);

return {
status: "complete",
totalImported: totalItems
totalImported: totalItems,
recordsAdded,
recordsUpdated,
incrementalUpdate: !!syncInfo?.[0]
};

} catch (error) {
outputProgress({
operation: "Purchase orders import failed",

@@ -156,7 +156,7 @@ async function resetDatabase() {
SELECT GROUP_CONCAT(table_name) as tables
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name != 'users'
AND table_name NOT IN ('users', 'import_history')
`);

if (!tables[0].tables) {

@@ -13,7 +13,7 @@ import {
AlertDialogTitle,
AlertDialogTrigger,
} from "@/components/ui/alert-dialog";
import { Loader2, RefreshCw, Upload, X, Database } from "lucide-react";
import { Loader2, RefreshCw, X, Database } from "lucide-react";
import config from '../../config';
import { toast } from "sonner";

@@ -36,11 +36,6 @@ interface ImportProgress {
duration?: string;
}

interface ImportLimits {
products: number;
orders: number;
purchaseOrders: number;
}

export function DataManagement() {
const [isImportingProd, setIsImportingProd] = useState(false);