Merge branch 'Improve-data-import'

This commit is contained in:
2025-02-01 14:09:34 -05:00
27 changed files with 3751 additions and 2053 deletions

1
.gitignore vendored
View File

@@ -58,3 +58,4 @@ csv/**/*
**/csv/**/*
!csv/.gitkeep
inventory/tsconfig.tsbuildinfo
inventory-server/scripts/.fuse_hidden00000fa20000000a

View File

@@ -88,6 +88,16 @@ CREATE TABLE IF NOT EXISTS turnover_config (
UNIQUE KEY unique_category_vendor (category_id, vendor)
);
-- Create table for sales seasonality factors
CREATE TABLE IF NOT EXISTS sales_seasonality (
month INT NOT NULL,
seasonality_factor DECIMAL(5,3) DEFAULT 0,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (month),
CHECK (month BETWEEN 1 AND 12),
CHECK (seasonality_factor BETWEEN -1.0 AND 1.0)
);
-- Insert default global thresholds if not exists
INSERT INTO stock_thresholds (id, category_id, vendor, critical_days, reorder_days, overstock_days)
VALUES (1, NULL, NULL, 7, 14, 90)
@@ -129,6 +139,13 @@ ON DUPLICATE KEY UPDATE
calculation_period_days = VALUES(calculation_period_days),
target_rate = VALUES(target_rate);
-- Insert default seasonality factors (neutral)
INSERT INTO sales_seasonality (month, seasonality_factor)
VALUES
(1, 0), (2, 0), (3, 0), (4, 0), (5, 0), (6, 0),
(7, 0), (8, 0), (9, 0), (10, 0), (11, 0), (12, 0)
ON DUPLICATE KEY UPDATE last_updated = CURRENT_TIMESTAMP;
-- View to show thresholds with category names
CREATE OR REPLACE VIEW stock_thresholds_view AS
SELECT
@@ -153,3 +170,27 @@ ORDER BY
END,
c.name,
st.vendor;
CREATE TABLE IF NOT EXISTS sync_status (
table_name VARCHAR(50) PRIMARY KEY,
last_sync_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT,
INDEX idx_last_sync (last_sync_timestamp)
);
CREATE TABLE IF NOT EXISTS import_history (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(50) NOT NULL,
start_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_time TIMESTAMP NULL,
duration_seconds INT,
duration_minutes DECIMAL(10,2) GENERATED ALWAYS AS (duration_seconds / 60.0) STORED,
records_added INT DEFAULT 0,
records_updated INT DEFAULT 0,
is_incremental BOOLEAN DEFAULT FALSE,
status ENUM('running', 'completed', 'failed', 'cancelled') DEFAULT 'running',
error_message TEXT,
additional_info JSON,
INDEX idx_table_time (table_name, start_time),
INDEX idx_status (status)
);

View File

@@ -287,26 +287,6 @@ CREATE TABLE IF NOT EXISTS category_forecasts (
INDEX idx_category_forecast_last_calculated (last_calculated_at)
);
-- Create table for sales seasonality factors
CREATE TABLE IF NOT EXISTS sales_seasonality (
month INT NOT NULL,
seasonality_factor DECIMAL(5,3) DEFAULT 0,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (month),
CHECK (month BETWEEN 1 AND 12),
CHECK (seasonality_factor BETWEEN -1.0 AND 1.0)
);
-- Insert default seasonality factors (neutral)
INSERT INTO sales_seasonality (month, seasonality_factor)
VALUES
(1, 0), (2, 0), (3, 0), (4, 0), (5, 0), (6, 0),
(7, 0), (8, 0), (9, 0), (10, 0), (11, 0), (12, 0)
ON DUPLICATE KEY UPDATE last_updated = CURRENT_TIMESTAMP;
-- Re-enable foreign key checks
SET FOREIGN_KEY_CHECKS = 1;
-- Create view for inventory health
CREATE OR REPLACE VIEW inventory_health AS
WITH product_thresholds AS (
@@ -428,3 +408,23 @@ LEFT JOIN
categories p ON c.parent_id = p.cat_id
LEFT JOIN
category_metrics cm ON c.cat_id = cm.category_id;
-- Re-enable foreign key checks
SET FOREIGN_KEY_CHECKS = 1;
-- Create table for sales seasonality factors
CREATE TABLE IF NOT EXISTS sales_seasonality (
month INT NOT NULL,
seasonality_factor DECIMAL(5,3) DEFAULT 0,
last_updated TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (month),
CHECK (month BETWEEN 1 AND 12),
CHECK (seasonality_factor BETWEEN -1.0 AND 1.0)
);
-- Insert default seasonality factors (neutral)
INSERT INTO sales_seasonality (month, seasonality_factor)
VALUES
(1, 0), (2, 0), (3, 0), (4, 0), (5, 0), (6, 0),
(7, 0), (8, 0), (9, 0), (10, 0), (11, 0), (12, 0)
ON DUPLICATE KEY UPDATE last_updated = CURRENT_TIMESTAMP;

View File

@@ -39,7 +39,7 @@ CREATE TABLE products (
tags TEXT,
moq INT DEFAULT 1,
uom INT DEFAULT 1,
rating TINYINT UNSIGNED DEFAULT 0,
rating DECIMAL(10,2) DEFAULT 0.00,
reviews INT UNSIGNED DEFAULT 0,
weight DECIMAL(10,3),
length DECIMAL(10,3),
@@ -52,7 +52,7 @@ CREATE TABLE products (
notifies INT UNSIGNED DEFAULT 0,
date_last_sold DATE,
PRIMARY KEY (pid),
UNIQUE KEY unique_sku (SKU),
INDEX idx_sku (SKU),
INDEX idx_vendor (vendor),
INDEX idx_brand (brand),
INDEX idx_location (location),
@@ -113,11 +113,13 @@ CREATE TABLE IF NOT EXISTS orders (
tax DECIMAL(10,3) DEFAULT 0.000,
tax_included TINYINT(1) DEFAULT 0,
shipping DECIMAL(10,3) DEFAULT 0.000,
costeach DECIMAL(10,3) DEFAULT 0.000,
customer VARCHAR(50) NOT NULL,
customer_name VARCHAR(100),
status VARCHAR(20) DEFAULT 'pending',
canceled TINYINT(1) DEFAULT 0,
PRIMARY KEY (id),
UNIQUE KEY unique_order_line (order_number, pid),
KEY order_number (order_number),
KEY pid (pid),
KEY customer (customer),
@@ -135,7 +137,9 @@ CREATE TABLE purchase_orders (
expected_date DATE,
pid BIGINT NOT NULL,
sku VARCHAR(50) NOT NULL,
name VARCHAR(100) NOT NULL COMMENT 'Product name from products.description',
cost_price DECIMAL(10, 3) NOT NULL,
po_cost_price DECIMAL(10, 3) NOT NULL COMMENT 'Original cost from PO, before receiving adjustments',
status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,10=electronically_ready_send,11=ordered,12=preordered,13=electronically_sent,15=receiving_started,50=done',
receiving_status TINYINT UNSIGNED DEFAULT 1 COMMENT '0=canceled,1=created,30=partial_received,40=full_received,50=paid',
notes TEXT,
@@ -147,7 +151,6 @@ CREATE TABLE purchase_orders (
received_by INT,
receiving_history JSON COMMENT 'Array of receiving records with qty, date, cost, receiving_id, and alt_po flag',
FOREIGN KEY (pid) REFERENCES products(pid),
FOREIGN KEY (sku) REFERENCES products(SKU),
INDEX idx_po_id (po_id),
INDEX idx_vendor (vendor),
INDEX idx_status (status),

View File

@@ -186,6 +186,19 @@ async function calculateMetrics() {
}
// Calculate ABC classification
outputProgress({
status: 'running',
operation: 'Starting ABC classification',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
const [abcConfig] = await connection.query('SELECT a_threshold, b_threshold FROM abc_classification_config WHERE id = 1');
const abcThresholds = abcConfig[0] || { a_threshold: 20, b_threshold: 50 };
@@ -202,6 +215,19 @@ async function calculateMetrics() {
) ENGINE=MEMORY
`);
outputProgress({
status: 'running',
operation: 'Creating revenue rankings',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
await connection.query(`
INSERT INTO temp_revenue_ranks
SELECT
@@ -222,11 +248,26 @@ async function calculateMetrics() {
const [rankingCount] = await connection.query('SELECT MAX(rank_num) as total_count FROM temp_revenue_ranks');
const totalCount = rankingCount[0].total_count || 1;
outputProgress({
status: 'running',
operation: 'Updating ABC classifications',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Process updates in batches
let abcProcessedCount = 0;
const batchSize = 5000;
while (true) {
if (isCancelled) return processedCount;
// First get a batch of PIDs that need updating
const [pids] = await connection.query(`
SELECT pm.pid
@@ -267,6 +308,18 @@ async function calculateMetrics() {
pids.map(row => row.pid)]);
abcProcessedCount += result.affectedRows;
processedCount = Math.floor(totalProducts * (0.99 + (abcProcessedCount / totalCount) * 0.01));
outputProgress({
status: 'running',
operation: 'ABC classification progress',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Small delay between batches to allow other transactions
await new Promise(resolve => setTimeout(resolve, 100));
@@ -276,14 +329,14 @@ async function calculateMetrics() {
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_revenue_ranks');
// Final success message
global.outputProgress({
outputProgress({
status: 'complete',
operation: 'Metrics calculation complete',
current: totalProducts,
total: totalProducts,
elapsed: global.formatElapsedTime(startTime),
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: global.calculateRate(startTime, totalProducts),
rate: calculateRate(startTime, totalProducts),
percentage: '100'
});

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,182 @@
const { outputProgress, formatElapsedTime } = require('../metrics/utils/progress');
async function importCategories(prodConnection, localConnection) {
outputProgress({
operation: "Starting categories import",
status: "running",
});
const startTime = Date.now();
const typeOrder = [10, 20, 11, 21, 12, 13];
let totalInserted = 0;
let skippedCategories = [];
try {
// Process each type in order with its own query
for (const type of typeOrder) {
const [categories] = await prodConnection.query(
`
SELECT
pc.cat_id,
pc.name,
pc.type,
CASE
WHEN pc.type IN (10, 20) THEN NULL -- Top level categories should have no parent
WHEN pc.master_cat_id IS NULL THEN NULL
ELSE pc.master_cat_id
END as parent_id,
pc.combined_name as description
FROM product_categories pc
WHERE pc.type = ?
ORDER BY pc.cat_id
`,
[type]
);
if (categories.length === 0) continue;
console.log(`\nProcessing ${categories.length} type ${type} categories`);
if (type === 10) {
console.log("Type 10 categories:", JSON.stringify(categories, null, 2));
}
// For types that can have parents (11, 21, 12, 13), verify parent existence
let categoriesToInsert = categories;
if (![10, 20].includes(type)) {
// Get all parent IDs
const parentIds = [
...new Set(
categories.map((c) => c.parent_id).filter((id) => id !== null)
),
];
// Check which parents exist
const [existingParents] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[parentIds]
);
const existingParentIds = new Set(existingParents.map((p) => p.cat_id));
// Filter categories and track skipped ones
categoriesToInsert = categories.filter(
(cat) =>
cat.parent_id === null || existingParentIds.has(cat.parent_id)
);
const invalidCategories = categories.filter(
(cat) =>
cat.parent_id !== null && !existingParentIds.has(cat.parent_id)
);
if (invalidCategories.length > 0) {
const skippedInfo = invalidCategories.map((c) => ({
id: c.cat_id,
name: c.name,
type: c.type,
missing_parent: c.parent_id,
}));
skippedCategories.push(...skippedInfo);
console.log(
"\nSkipping categories with missing parents:",
invalidCategories
.map(
(c) =>
`${c.cat_id} - ${c.name} (missing parent: ${c.parent_id})`
)
.join("\n")
);
}
if (categoriesToInsert.length === 0) {
console.log(
`No valid categories of type ${type} to insert - all had missing parents`
);
continue;
}
}
console.log(
`Inserting ${categoriesToInsert.length} type ${type} categories`
);
const placeholders = categoriesToInsert
.map(() => "(?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)")
.join(",");
const values = categoriesToInsert.flatMap((cat) => [
cat.cat_id,
cat.name,
cat.type,
cat.parent_id,
cat.description,
"active",
]);
// Insert categories and create relationships in one query to avoid race conditions
await localConnection.query(
`
INSERT INTO categories (cat_id, name, type, parent_id, description, status, created_at, updated_at)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
name = VALUES(name),
type = VALUES(type),
parent_id = VALUES(parent_id),
description = VALUES(description),
status = VALUES(status),
updated_at = CURRENT_TIMESTAMP
`,
values
);
totalInserted += categoriesToInsert.length;
outputProgress({
status: "running",
operation: "Categories import",
current: totalInserted,
total: totalInserted,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
}
// After all imports, if we skipped any categories, throw an error
if (skippedCategories.length > 0) {
const error = new Error(
"Categories import completed with errors - some categories were skipped due to missing parents"
);
error.skippedCategories = skippedCategories;
throw error;
}
outputProgress({
status: "complete",
operation: "Categories import completed",
current: totalInserted,
total: totalInserted,
duration: formatElapsedTime((Date.now() - startTime) / 1000),
});
return {
status: "complete",
totalImported: totalInserted
};
} catch (error) {
console.error("Error importing categories:", error);
if (error.skippedCategories) {
console.error(
"Skipped categories:",
JSON.stringify(error.skippedCategories, null, 2)
);
}
outputProgress({
status: "error",
operation: "Categories import failed",
error: error.message,
skippedCategories: error.skippedCategories
});
throw error;
}
}
module.exports = importCategories;

View File

@@ -0,0 +1,568 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
const { importMissingProducts, setupTemporaryTables, cleanupTemporaryTables, materializeCalculations } = require('./products');
/**
* Imports orders from a production MySQL database to a local MySQL database.
* It can run in two modes:
* 1. Incremental update mode (default): Only fetch orders that have changed since the last sync time.
* 2. Full update mode: Fetch all eligible orders within the last 5 years regardless of timestamp.
*
* @param {object} prodConnection - A MySQL connection to production DB (MySQL 5.7).
* @param {object} localConnection - A MySQL connection to local DB (MySQL 8.0).
* @param {boolean} incrementalUpdate - Set to false for a full sync; true for incremental.
*
* @returns {object} Information about the sync operation.
*/
async function importOrders(prodConnection, localConnection, incrementalUpdate = true) {
const startTime = Date.now();
const skippedOrders = new Set();
const missingProducts = new Set();
let recordsAdded = 0;
let recordsUpdated = 0;
let processedCount = 0;
let importedCount = 0;
let totalOrderItems = 0;
let totalUniqueOrders = 0;
// Add a cumulative counter for processed orders before the loop
let cumulativeProcessedOrders = 0;
try {
// Insert temporary table creation queries
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_items (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
SKU VARCHAR(50) NOT NULL,
price DECIMAL(10,2) NOT NULL,
quantity INT NOT NULL,
base_discount DECIMAL(10,2) DEFAULT 0,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_meta (
order_id INT UNSIGNED NOT NULL,
date DATE NOT NULL,
customer VARCHAR(100) NOT NULL,
customer_name VARCHAR(150) NOT NULL,
status INT,
canceled TINYINT(1),
PRIMARY KEY (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_discounts (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
discount DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_taxes (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
tax DECIMAL(10,2) NOT NULL,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_order_costs (
order_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
costeach DECIMAL(10,3) DEFAULT 0.000,
PRIMARY KEY (order_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
// Get column names from the local table
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'orders'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Orders: Using last sync time:', lastSyncTime);
// First get count of order items
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed_onlydate IS NOT NULL
${incrementalUpdate ? `
AND (
o.stamp > ?
OR oi.stamp > ?
OR EXISTS (
SELECT 1 FROM order_discount_items odi
WHERE odi.order_id = o.order_id
AND odi.pid = oi.prod_pid
)
OR EXISTS (
SELECT 1 FROM order_tax_info oti
JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
WHERE oti.order_id = o.order_id
AND otip.pid = oi.prod_pid
AND oti.stamp > ?
)
)
` : ''}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
totalOrderItems = total;
console.log('Orders: Found changes:', totalOrderItems);
// Get order items in batches
const [orderItems] = await prodConnection.query(`
SELECT
oi.order_id,
oi.prod_pid as pid,
oi.prod_itemnumber as SKU,
oi.prod_price as price,
oi.qty_ordered as quantity,
COALESCE(oi.prod_price_reg - oi.prod_price, 0) as base_discount,
oi.stamp as last_modified
FROM order_items oi
USE INDEX (PRIMARY)
JOIN _order o ON oi.order_id = o.order_id
WHERE o.order_status >= 15
AND o.date_placed_onlydate >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
AND o.date_placed_onlydate IS NOT NULL
${incrementalUpdate ? `
AND (
o.stamp > ?
OR oi.stamp > ?
OR EXISTS (
SELECT 1 FROM order_discount_items odi
WHERE odi.order_id = o.order_id
AND odi.pid = oi.prod_pid
)
OR EXISTS (
SELECT 1 FROM order_tax_info oti
JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
WHERE oti.order_id = o.order_id
AND otip.pid = oi.prod_pid
AND oti.stamp > ?
)
)
` : ''}
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime] : []);
console.log('Orders: Processing', orderItems.length, 'order items');
// Insert order items in batches
for (let i = 0; i < orderItems.length; i += 5000) {
const batch = orderItems.slice(i, Math.min(i + 5000, orderItems.length));
const placeholders = batch.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
const values = batch.flatMap(item => [
item.order_id, item.pid, item.SKU, item.price, item.quantity, item.base_discount
]);
await localConnection.query(`
INSERT INTO temp_order_items (order_id, pid, SKU, price, quantity, base_discount)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
price = VALUES(price),
quantity = VALUES(quantity),
base_discount = VALUES(base_discount)
`, values);
processedCount = i + batch.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Loading order items: ${processedCount} of ${totalOrderItems}`,
current: processedCount,
total: totalOrderItems
});
}
// Get unique order IDs
const orderIds = [...new Set(orderItems.map(item => item.order_id))];
totalUniqueOrders = orderIds.length;
console.log('Total unique order IDs:', totalUniqueOrders);
// Reset processed count for order processing phase
processedCount = 0;
// Get order metadata in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
console.log(`Processing batch ${i/5000 + 1}, size: ${batchIds.length}`);
console.log('Sample of batch IDs:', batchIds.slice(0, 5));
const [orders] = await prodConnection.query(`
SELECT
o.order_id,
o.date_placed_onlydate as date,
o.order_cid as customer,
CONCAT(COALESCE(u.firstname, ''), ' ', COALESCE(u.lastname, '')) as customer_name,
o.order_status as status,
CASE WHEN o.date_cancelled != '0000-00-00 00:00:00' THEN 1 ELSE 0 END as canceled
FROM _order o
LEFT JOIN users u ON o.order_cid = u.cid
WHERE o.order_id IN (?)
`, [batchIds]);
console.log(`Retrieved ${orders.length} orders for ${batchIds.length} IDs`);
const duplicates = orders.filter((order, index, self) =>
self.findIndex(o => o.order_id === order.order_id) !== index
);
if (duplicates.length > 0) {
console.log('Found duplicates:', duplicates);
}
const placeholders = orders.map(() => "(?, ?, ?, ?, ?, ?)").join(",");
const values = orders.flatMap(order => [
order.order_id, order.date, order.customer, order.customer_name, order.status, order.canceled
]);
await localConnection.query(`
INSERT INTO temp_order_meta VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
date = VALUES(date),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled)
`, values);
processedCount = i + orders.length;
outputProgress({
status: "running",
operation: "Orders import",
message: `Loading order metadata: ${processedCount} of ${totalUniqueOrders}`,
current: processedCount,
total: totalUniqueOrders
});
}
// Reset processed count for final phase
processedCount = 0;
// Get promotional discounts in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [discounts] = await prodConnection.query(`
SELECT order_id, pid, SUM(amount) as discount
FROM order_discount_items
WHERE order_id IN (?)
GROUP BY order_id, pid
`, [batchIds]);
if (discounts.length > 0) {
const placeholders = discounts.map(() => "(?, ?, ?)").join(",");
const values = discounts.flatMap(d => [d.order_id, d.pid, d.discount]);
await localConnection.query(`
INSERT INTO temp_order_discounts VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
discount = VALUES(discount)
`, values);
}
}
// Get tax information in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [taxes] = await prodConnection.query(`
SELECT DISTINCT
oti.order_id,
otip.pid,
otip.item_taxes_to_collect as tax
FROM order_tax_info oti
JOIN (
SELECT order_id, MAX(stamp) as max_stamp
FROM order_tax_info
WHERE order_id IN (?)
GROUP BY order_id
) latest ON oti.order_id = latest.order_id AND oti.stamp = latest.max_stamp
JOIN order_tax_info_products otip ON oti.taxinfo_id = otip.taxinfo_id
`, [batchIds]);
if (taxes.length > 0) {
// Remove any duplicates before inserting
const uniqueTaxes = new Map();
taxes.forEach(t => {
const key = `${t.order_id}-${t.pid}`;
uniqueTaxes.set(key, t);
});
const values = Array.from(uniqueTaxes.values()).flatMap(t => [t.order_id, t.pid, t.tax]);
if (values.length > 0) {
const placeholders = Array(uniqueTaxes.size).fill("(?, ?, ?)").join(",");
await localConnection.query(`
INSERT INTO temp_order_taxes VALUES ${placeholders}
ON DUPLICATE KEY UPDATE tax = VALUES(tax)
`, values);
}
}
}
// Get costeach values in batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
const [costs] = await prodConnection.query(`
SELECT orderid as order_id, pid, costeach
FROM order_costs
WHERE orderid IN (?)
`, [batchIds]);
if (costs.length > 0) {
const placeholders = costs.map(() => '(?, ?, ?)').join(",");
const values = costs.flatMap(c => [c.order_id, c.pid, c.costeach]);
await localConnection.query(`
INSERT INTO temp_order_costs (order_id, pid, costeach)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE costeach = VALUES(costeach)
`, values);
}
}
// Now combine all the data and insert into orders table
// Pre-check all products at once instead of per batch
const allOrderPids = [...new Set(orderItems.map(item => item.pid))];
const [existingProducts] = allOrderPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[allOrderPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
// Process in larger batches
for (let i = 0; i < orderIds.length; i += 5000) {
const batchIds = orderIds.slice(i, i + 5000);
// Get combined data for this batch
const [orders] = await localConnection.query(`
SELECT
oi.order_id as order_number,
oi.pid,
oi.SKU,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [batchIds]);
// Filter orders and track missing products - do this in a single pass
const validOrders = [];
const values = [];
const processedOrderItems = new Set(); // Track unique order items
const processedOrders = new Set(); // Track unique orders
for (const order of orders) {
if (!existingPids.has(order.pid)) {
missingProducts.add(order.pid);
skippedOrders.add(order.order_number);
continue;
}
validOrders.push(order);
values.push(...columnNames.map(col => order[col] ?? null));
processedOrderItems.add(`${order.order_number}-${order.pid}`);
processedOrders.add(order.order_number);
}
if (validOrders.length > 0) {
// Pre-compute the placeholders string once
const singlePlaceholder = `(${columnNames.map(() => "?").join(",")})`;
const placeholders = Array(validOrders.length).fill(singlePlaceholder).join(",");
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(",")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat());
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += processedOrderItems.size; // Count unique order items processed
}
// Update progress based on unique orders processed
cumulativeProcessedOrders += processedOrders.size;
outputProgress({
status: "running",
operation: "Orders import",
message: `Imported ${importedCount} order items (${cumulativeProcessedOrders} of ${totalUniqueOrders} orders processed)`,
current: cumulativeProcessedOrders,
total: totalUniqueOrders,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, cumulativeProcessedOrders, totalUniqueOrders),
rate: calculateRate(startTime, cumulativeProcessedOrders)
});
}
// Now try to import any orders that were skipped due to missing products
if (skippedOrders.size > 0) {
try {
outputProgress({
status: "running",
operation: "Orders import",
message: `Retrying import of ${skippedOrders.size} orders with previously missing products`,
});
// Get the orders that were skipped
const [skippedProdOrders] = await localConnection.query(`
SELECT DISTINCT
oi.order_id as order_number,
oi.pid,
oi.SKU,
om.date,
oi.price,
oi.quantity,
oi.base_discount + COALESCE(od.discount, 0) as discount,
COALESCE(ot.tax, 0) as tax,
0 as tax_included,
0 as shipping,
om.customer,
om.customer_name,
om.status,
om.canceled,
COALESCE(tc.costeach, 0) as costeach
FROM temp_order_items oi
JOIN temp_order_meta om ON oi.order_id = om.order_id
LEFT JOIN temp_order_discounts od ON oi.order_id = od.order_id AND oi.pid = od.pid
LEFT JOIN temp_order_taxes ot ON oi.order_id = ot.order_id AND oi.pid = ot.pid
LEFT JOIN temp_order_costs tc ON oi.order_id = tc.order_id AND oi.pid = tc.pid
WHERE oi.order_id IN (?)
`, [Array.from(skippedOrders)]);
// Check which products exist now
const skippedPids = [...new Set(skippedProdOrders.map(o => o.pid))];
const [existingProducts] = skippedPids.length > 0 ? await localConnection.query(
"SELECT pid FROM products WHERE pid IN (?)",
[skippedPids]
) : [[]];
const existingPids = new Set(existingProducts.map(p => p.pid));
// Filter orders that can now be imported
const validOrders = skippedProdOrders.filter(order => existingPids.has(order.pid));
const retryOrderItems = new Set(); // Track unique order items in retry
if (validOrders.length > 0) {
const placeholders = validOrders.map(() => `(${columnNames.map(() => "?").join(", ")})`).join(",");
const values = validOrders.map(o => columnNames.map(col => o[col] ?? null)).flat();
const result = await localConnection.query(`
INSERT INTO orders (${columnNames.join(", ")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
SKU = VALUES(SKU),
date = VALUES(date),
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
customer = VALUES(customer),
customer_name = VALUES(customer_name),
status = VALUES(status),
canceled = VALUES(canceled),
costeach = VALUES(costeach)
`, values);
const affectedRows = result[0].affectedRows;
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
// Track unique order items
validOrders.forEach(order => {
retryOrderItems.add(`${order.order_number}-${order.pid}`);
});
outputProgress({
status: "running",
operation: "Orders import",
message: `Successfully imported ${retryOrderItems.size} previously skipped order items`,
});
// Update the main counters
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += retryOrderItems.size;
}
} catch (error) {
console.warn('Warning: Failed to retry skipped orders:', error.message);
console.warn(`Skipped ${skippedOrders.size} orders due to ${missingProducts.size} missing products`);
}
}
// Clean up temporary tables after ALL processing is complete
await localConnection.query(`
DROP TEMPORARY TABLE IF EXISTS temp_order_items;
DROP TEMPORARY TABLE IF EXISTS temp_order_meta;
DROP TEMPORARY TABLE IF EXISTS temp_order_discounts;
DROP TEMPORARY TABLE IF EXISTS temp_order_taxes;
DROP TEMPORARY TABLE IF EXISTS temp_order_costs;
`);
// Only update sync status if we get here (no errors thrown)
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('orders', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
return {
status: "complete",
totalImported: Math.floor(importedCount),
recordsAdded: recordsAdded || 0,
recordsUpdated: Math.floor(recordsUpdated),
totalSkipped: skippedOrders.size,
missingProducts: missingProducts.size,
incrementalUpdate,
lastSyncTime
};
} catch (error) {
console.error("Error during orders import:", error);
throw error;
}
}
module.exports = importOrders;

View File

@@ -0,0 +1,739 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
// Utility functions
const imageUrlBase = 'https://sbing.com/i/products/0000/';
const getImageUrls = (pid, iid = 1) => {
const paddedPid = pid.toString().padStart(6, '0');
// Use padded PID only for the first 3 digits
const prefix = paddedPid.slice(0, 3);
// Use the actual pid for the rest of the URL
const basePath = `${imageUrlBase}${prefix}/${pid}`;
return {
image: `${basePath}-t-${iid}.jpg`,
image_175: `${basePath}-175x175-${iid}.jpg`,
image_full: `${basePath}-o-${iid}.jpg`
};
};
async function setupAndCleanupTempTables(connection, operation = 'setup') {
if (operation === 'setup') {
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_products (
pid BIGINT NOT NULL,
title VARCHAR(255),
description TEXT,
SKU VARCHAR(50),
stock_quantity INT DEFAULT 0,
pending_qty INT DEFAULT 0,
preorder_count INT DEFAULT 0,
notions_inv_count INT DEFAULT 0,
price DECIMAL(10,3) NOT NULL DEFAULT 0,
regular_price DECIMAL(10,3) NOT NULL DEFAULT 0,
cost_price DECIMAL(10,3),
vendor VARCHAR(100),
vendor_reference VARCHAR(100),
notions_reference VARCHAR(100),
brand VARCHAR(100),
line VARCHAR(100),
subline VARCHAR(100),
artist VARCHAR(100),
category_ids TEXT,
created_at DATETIME,
first_received DATETIME,
landing_cost_price DECIMAL(10,3),
barcode VARCHAR(50),
harmonized_tariff_code VARCHAR(50),
updated_at DATETIME,
visible BOOLEAN,
replenishable BOOLEAN,
permalink VARCHAR(255),
moq DECIMAL(10,3),
rating DECIMAL(10,2),
reviews INT,
weight DECIMAL(10,3),
length DECIMAL(10,3),
width DECIMAL(10,3),
height DECIMAL(10,3),
country_of_origin VARCHAR(100),
location VARCHAR(100),
total_sold INT,
baskets INT,
notifies INT,
date_last_sold DATETIME,
needs_update BOOLEAN DEFAULT TRUE,
PRIMARY KEY (pid),
INDEX idx_needs_update (needs_update)
) ENGINE=InnoDB;
`);
} else {
await connection.query('DROP TEMPORARY TABLE IF EXISTS temp_products;');
}
}
async function materializeCalculations(prodConnection, localConnection, incrementalUpdate = true, lastSyncTime = '1970-01-01') {
outputProgress({
status: "running",
operation: "Products import",
message: "Fetching product data from production"
});
// Get all product data in a single optimized query
const [prodData] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) - COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL as landing_cost_price,
s.companyname AS vendor,
CASE
WHEN s.companyname = 'Notions' THEN sid.notions_itemnumber
ELSE sid.supplier_itemnumber
END AS vendor_reference,
sid.notions_itemnumber AS notions_reference,
CONCAT('https://www.acherryontop.com/shop/product/', p.pid) AS permalink,
pc1.name AS brand,
pc2.name AS line,
pc3.name AS subline,
pc4.name AS artist,
COALESCE(CASE
WHEN sid.supplier_id = 92 THEN sid.notions_qty_per_unit
ELSE sid.supplier_qty_per_unit
END, sid.notions_qty_per_unit) AS moq,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
p.country_of_origin,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE
WHEN pc.cat_id IS NOT NULL
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
THEN pci.cat_id
END) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
WHERE ${incrementalUpdate ? `
p.stamp > ? OR
ci.stamp > ? OR
pcp.date_deactive > ? OR
pcp.date_active > ? OR
pnb.date_updated > ?
` : 'TRUE'}
GROUP BY p.pid
`, incrementalUpdate ? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime] : []);
outputProgress({
status: "running",
operation: "Products import",
message: `Processing ${prodData.length} product records`
});
// Insert all product data into temp table in batches
for (let i = 0; i < prodData.length; i += 1000) {
const batch = prodData.slice(i, i + 1000);
const values = batch.map(row => [
row.pid,
row.title,
row.description,
row.SKU,
// Set stock quantity to 0 if it's over 5000
row.stock_quantity > 5000 ? 0 : Math.max(0, row.stock_quantity),
row.pending_qty,
row.preorder_count,
row.notions_inv_count,
row.price,
row.regular_price,
row.cost_price,
row.vendor,
row.vendor_reference,
row.notions_reference,
row.brand,
row.line,
row.subline,
row.artist,
row.category_ids,
row.date_created, // map to created_at
row.first_received,
row.landing_cost_price,
row.barcode,
row.harmonized_tariff_code,
row.updated_at,
row.visible,
row.replenishable,
row.permalink,
row.moq,
row.rating ? Number(row.rating).toFixed(2) : null,
row.reviews,
row.weight,
row.length,
row.width,
row.height,
row.country_of_origin,
row.location,
row.total_sold,
row.baskets,
row.notifies,
row.date_last_sold,
true // Mark as needing update
]);
if (values.length > 0) {
await localConnection.query(`
INSERT INTO temp_products (
pid, title, description, SKU,
stock_quantity, pending_qty, preorder_count, notions_inv_count,
price, regular_price, cost_price,
vendor, vendor_reference, notions_reference,
brand, line, subline, artist,
category_ids, created_at, first_received,
landing_cost_price, barcode, harmonized_tariff_code,
updated_at, visible, replenishable, permalink,
moq, rating, reviews, weight, length, width,
height, country_of_origin, location, total_sold,
baskets, notifies, date_last_sold, needs_update
)
VALUES ?
ON DUPLICATE KEY UPDATE
title = VALUES(title),
description = VALUES(description),
SKU = VALUES(SKU),
stock_quantity = VALUES(stock_quantity),
pending_qty = VALUES(pending_qty),
preorder_count = VALUES(preorder_count),
notions_inv_count = VALUES(notions_inv_count),
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price),
vendor = VALUES(vendor),
vendor_reference = VALUES(vendor_reference),
notions_reference = VALUES(notions_reference),
brand = VALUES(brand),
line = VALUES(line),
subline = VALUES(subline),
artist = VALUES(artist),
category_ids = VALUES(category_ids),
created_at = VALUES(created_at),
first_received = VALUES(first_received),
landing_cost_price = VALUES(landing_cost_price),
barcode = VALUES(barcode),
harmonized_tariff_code = VALUES(harmonized_tariff_code),
updated_at = VALUES(updated_at),
visible = VALUES(visible),
replenishable = VALUES(replenishable),
permalink = VALUES(permalink),
moq = VALUES(moq),
rating = VALUES(rating),
reviews = VALUES(reviews),
weight = VALUES(weight),
length = VALUES(length),
width = VALUES(width),
height = VALUES(height),
country_of_origin = VALUES(country_of_origin),
location = VALUES(location),
total_sold = VALUES(total_sold),
baskets = VALUES(baskets),
notifies = VALUES(notifies),
date_last_sold = VALUES(date_last_sold),
needs_update = TRUE
`, [values]);
}
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${Math.min(i + 1000, prodData.length)} of ${prodData.length} product records`,
current: i + batch.length,
total: prodData.length
});
}
outputProgress({
status: "running",
operation: "Products import",
message: "Finished materializing calculations"
});
}
async function importProducts(prodConnection, localConnection, incrementalUpdate = true) {
const startTime = Date.now();
let recordsAdded = 0;
let recordsUpdated = 0;
try {
// Get column names first
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map(col => col.COLUMN_NAME);
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'products'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Products: Using last sync time:', lastSyncTime);
// Setup temporary tables
await setupAndCleanupTempTables(localConnection, 'setup');
// Materialize calculations - this will populate temp_products
await materializeCalculations(prodConnection, localConnection, incrementalUpdate, lastSyncTime);
// Get actual count from temp table - only count products that need updates
const [[{ actualTotal }]] = await localConnection.query(`
SELECT COUNT(DISTINCT pid) as actualTotal
FROM temp_products
WHERE needs_update = 1
`);
console.log('Products: Found changes:', actualTotal);
// Process in batches
const BATCH_SIZE = 5000;
let processed = 0;
while (processed < actualTotal) {
const [batch] = await localConnection.query(`
SELECT * FROM temp_products
WHERE needs_update = 1
LIMIT ? OFFSET ?
`, [BATCH_SIZE, processed]);
if (!batch || batch.length === 0) break;
// Add image URLs
batch.forEach(row => {
const urls = getImageUrls(row.pid);
row.image = urls.image;
row.image_175 = urls.image_175;
row.image_full = urls.image_full;
});
if (batch.length > 0) {
// Get existing products in one query
const [existingProducts] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM products WHERE pid IN (?)`,
[batch.map(p => p.pid)]
);
const existingPidsMap = new Map(existingProducts.map(p => [p.pid, p]));
// Split into inserts and updates
const insertsAndUpdates = batch.reduce((acc, product) => {
if (existingPidsMap.has(product.pid)) {
const existing = existingPidsMap.get(product.pid);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = product[col] ?? null;
const oldVal = existing[col] ?? null;
if (col === "managing_stock") return false; // Skip this as it's always 1
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001;
}
return newVal !== oldVal;
});
if (hasChanges) {
acc.updates.push(product);
}
} else {
acc.inserts.push(product);
}
return acc;
}, { inserts: [], updates: [] });
// Process inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertValues = insertsAndUpdates.inserts.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const insertResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(',')})
VALUES ${insertPlaceholders}
`, insertValues.flat());
recordsAdded += insertResult[0].affectedRows;
}
// Process updates
if (insertsAndUpdates.updates.length > 0) {
const updateValues = insertsAndUpdates.updates.map(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
return val;
})
);
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill('?').join(',')})`)
.join(',');
const updateResult = await localConnection.query(`
INSERT INTO products (${columnNames.join(',')})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== 'pid')
.map(col => `${col} = VALUES(${col})`)
.join(',')};
`, updateValues.flat());
recordsUpdated += insertsAndUpdates.updates.length;
}
// Process category relationships
if (batch.some(p => p.category_ids)) {
const categoryRelationships = batch
.filter(p => p.category_ids)
.flatMap(product =>
product.category_ids
.split(',')
.map(id => id.trim())
.filter(id => id)
.map(Number)
.filter(id => !isNaN(id))
.map(catId => [catId, product.pid])
);
if (categoryRelationships.length > 0) {
// Verify categories exist before inserting relationships
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map(c => c.cat_id));
// Filter relationships to only include existing categories
const validRelationships = categoryRelationships.filter(([catId]) =>
existingCatIds.has(catId)
);
if (validRelationships.length > 0) {
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`INSERT IGNORE INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}`,
validRelationships.flat()
);
}
}
}
}
processed += batch.length;
outputProgress({
status: "running",
operation: "Products import",
message: `Processed ${processed} of ${actualTotal} products`,
current: processed,
total: actualTotal,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, actualTotal),
rate: calculateRate(startTime, processed)
});
}
// Drop temporary tables
await setupAndCleanupTempTables(localConnection, 'cleanup');
// Only update sync status if we get here (no errors thrown)
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('products', NOW())
ON DUPLICATE KEY UPDATE last_sync_timestamp = NOW()
`);
return {
status: "complete",
totalImported: actualTotal,
recordsAdded: recordsAdded || 0,
recordsUpdated: recordsUpdated || 0,
incrementalUpdate,
lastSyncTime
};
} catch (error) {
throw error;
}
}
async function importMissingProducts(prodConnection, localConnection, missingPids) {
try {
// Get column names first
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'products'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns.map((col) => col.COLUMN_NAME);
// Get the missing products with all their data in one optimized query
const [products] = await prodConnection.query(`
SELECT
p.pid,
p.description AS title,
p.notes AS description,
p.itemnumber AS SKU,
p.date_created,
p.datein AS first_received,
p.location,
p.upc AS barcode,
p.harmonized_tariff_code,
p.stamp AS updated_at,
CASE WHEN si.show + si.buyable > 0 THEN 1 ELSE 0 END AS visible,
CASE
WHEN p.reorder < 0 THEN 0
WHEN (
(IFNULL(pls.date_sold, '0000-00-00') = '0000-00-00' OR pls.date_sold <= DATE_SUB(CURDATE(), INTERVAL 5 YEAR))
OR (p.datein = '0000-00-00 00:00:00' OR p.datein <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
OR (p.date_refill = '0000-00-00 00:00:00' OR p.date_refill <= DATE_SUB(NOW(), INTERVAL 5 YEAR))
) THEN 0
ELSE 1
END AS replenishable,
COALESCE(si.available_local, 0) as stock_quantity,
COALESCE(
(SELECT SUM(oi.qty_ordered - oi.qty_placed)
FROM order_items oi
JOIN _order o ON oi.order_id = o.order_id
WHERE oi.prod_pid = p.pid
AND o.date_placed != '0000-00-00 00:00:00'
AND o.date_shipped = '0000-00-00 00:00:00'
AND oi.pick_finished = 0
AND oi.qty_back = 0
AND o.order_status != 15
AND o.order_status < 90
AND oi.qty_ordered >= oi.qty_placed
AND oi.qty_ordered > 0
), 0
) as pending_qty,
COALESCE(ci.onpreorder, 0) as preorder_count,
COALESCE(pnb.inventory, 0) as notions_inv_count,
COALESCE(pcp.price_each, 0) as price,
COALESCE(p.sellingprice, 0) AS regular_price,
CASE
WHEN EXISTS (SELECT 1 FROM product_inventory WHERE pid = p.pid AND count > 0)
THEN (SELECT ROUND(AVG(costeach), 5) FROM product_inventory WHERE pid = p.pid AND count > 0)
ELSE (SELECT costeach FROM product_inventory WHERE pid = p.pid ORDER BY daterec DESC LIMIT 1)
END AS cost_price,
NULL AS landing_cost_price,
p.rating,
p.rating_votes AS reviews,
p.weight,
p.length,
p.width,
p.height,
(SELECT COUNT(*) FROM mybasket mb WHERE mb.item = p.pid AND mb.qty > 0) AS baskets,
(SELECT COUNT(*) FROM product_notify pn WHERE pn.pid = p.pid) AS notifies,
p.totalsold AS total_sold,
p.country_of_origin,
pls.date_sold as date_last_sold,
GROUP_CONCAT(DISTINCT CASE WHEN pc.cat_id IS NOT NULL THEN pci.cat_id END) as category_ids
FROM products p
LEFT JOIN shop_inventory si ON p.pid = si.pid AND si.store = 0
LEFT JOIN supplier_item_data sid ON p.pid = sid.pid
LEFT JOIN suppliers s ON sid.supplier_id = s.supplierid
LEFT JOIN product_category_index pci ON p.pid = pci.pid
LEFT JOIN product_categories pc ON pci.cat_id = pc.cat_id
AND pc.type IN (10, 20, 11, 21, 12, 13)
AND pci.cat_id NOT IN (16, 17)
LEFT JOIN product_categories pc1 ON p.company = pc1.cat_id
LEFT JOIN product_categories pc2 ON p.line = pc2.cat_id
LEFT JOIN product_categories pc3 ON p.subline = pc3.cat_id
LEFT JOIN product_categories pc4 ON p.artist = pc4.cat_id
LEFT JOIN product_last_sold pls ON p.pid = pls.pid
LEFT JOIN current_inventory ci ON p.pid = ci.pid
LEFT JOIN product_current_prices pcp ON p.pid = pcp.pid AND pcp.active = 1
LEFT JOIN product_notions_b2b pnb ON p.pid = pnb.pid
WHERE p.pid IN (?)
GROUP BY p.pid
`, [missingPids]);
// Add image URLs
products.forEach(product => {
const urls = getImageUrls(product.pid);
product.image = urls.image;
product.image_175 = urls.image_175;
product.image_full = urls.image_full;
});
let recordsAdded = 0;
let recordsUpdated = 0;
if (products.length > 0) {
// Map values in the same order as columns
const productValues = products.flatMap(product =>
columnNames.map(col => {
const val = product[col] ?? null;
if (col === "managing_stock") return 1;
if (typeof val === "number") return val || 0;
return val;
})
);
// Generate placeholders for all products
const placeholders = products
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
// Build and execute the query
const query = `
INSERT INTO products (${columnNames.join(",")})
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")};
`;
const result = await localConnection.query(query, productValues);
recordsAdded = result.affectedRows - result.changedRows;
recordsUpdated = result.changedRows;
// Handle category relationships if any
const categoryRelationships = [];
products.forEach(product => {
if (product.category_ids) {
const catIds = product.category_ids
.split(",")
.map(id => id.trim())
.filter(id => id)
.map(Number);
catIds.forEach(catId => {
if (catId) categoryRelationships.push([catId, product.pid]);
});
}
});
if (categoryRelationships.length > 0) {
// Verify categories exist before inserting relationships
const uniqueCatIds = [...new Set(categoryRelationships.map(([catId]) => catId))];
const [existingCats] = await localConnection.query(
"SELECT cat_id FROM categories WHERE cat_id IN (?)",
[uniqueCatIds]
);
const existingCatIds = new Set(existingCats.map(c => c.cat_id));
// Filter relationships to only include existing categories
const validRelationships = categoryRelationships.filter(([catId]) =>
existingCatIds.has(catId)
);
if (validRelationships.length > 0) {
const catPlaceholders = validRelationships
.map(() => "(?, ?)")
.join(",");
await localConnection.query(
`INSERT IGNORE INTO product_categories (cat_id, pid)
VALUES ${catPlaceholders}`,
validRelationships.flat()
);
}
}
}
return {
status: "complete",
totalImported: products.length,
recordsAdded,
recordsUpdated
};
} catch (error) {
throw error;
}
}
module.exports = {
importProducts,
importMissingProducts
};

View File

@@ -0,0 +1,543 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
async function importPurchaseOrders(prodConnection, localConnection, incrementalUpdate = true) {
const startTime = Date.now();
let recordsAdded = 0;
let recordsUpdated = 0;
try {
// Get last sync info
const [syncInfo] = await localConnection.query(
"SELECT last_sync_timestamp FROM sync_status WHERE table_name = 'purchase_orders'"
);
const lastSyncTime = syncInfo?.[0]?.last_sync_timestamp || '1970-01-01';
console.log('Purchase Orders: Using last sync time:', lastSyncTime);
// Insert temporary table creation query for purchase orders
await localConnection.query(`
CREATE TABLE IF NOT EXISTS temp_purchase_orders (
po_id INT UNSIGNED NOT NULL,
pid INT UNSIGNED NOT NULL,
vendor VARCHAR(255),
date DATE,
expected_date DATE,
status INT,
notes TEXT,
PRIMARY KEY (po_id, pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
`);
outputProgress({
operation: `Starting ${incrementalUpdate ? 'incremental' : 'full'} purchase orders import`,
status: "running",
});
// Get column names for the insert
const [columns] = await localConnection.query(`
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = 'purchase_orders'
ORDER BY ORDINAL_POSITION
`);
const columnNames = columns
.map((col) => col.COLUMN_NAME)
.filter((name) => name !== "id");
// Build incremental conditions
const incrementalWhereClause = incrementalUpdate
? `AND (
p.date_updated > ?
OR p.date_ordered > ?
OR p.date_estin > ?
OR r.date_updated > ?
OR r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)`
: "";
const incrementalParams = incrementalUpdate
? [lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime]
: [];
// First get all relevant PO IDs with basic info
const [[{ total }]] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM (
SELECT DISTINCT pop.po_id, pop.pid
FROM po p
USE INDEX (idx_date_created)
JOIN po_products pop ON p.po_id = pop.po_id
JOIN suppliers s ON p.supplier_id = s.supplierid
WHERE p.date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
p.date_updated > ?
OR p.date_ordered > ?
OR p.date_estin > ?
)
` : ''}
UNION
SELECT DISTINCT r.receiving_id as po_id, rp.pid
FROM receivings_products rp
USE INDEX (received_date)
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)
` : ''}
) all_items
`, incrementalUpdate ? [
lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
] : []);
console.log('Purchase Orders: Found changes:', total);
const [poList] = await prodConnection.query(`
SELECT DISTINCT
COALESCE(p.po_id, r.receiving_id) as po_id,
COALESCE(
NULLIF(s1.companyname, ''),
NULLIF(s2.companyname, ''),
'Unknown Vendor'
) as vendor,
CASE
WHEN p.po_id IS NOT NULL THEN
DATE(COALESCE(
NULLIF(p.date_ordered, '0000-00-00 00:00:00'),
p.date_created
))
WHEN r.receiving_id IS NOT NULL THEN
DATE(r.date_created)
END as date,
CASE
WHEN p.date_estin = '0000-00-00' THEN NULL
WHEN p.date_estin IS NULL THEN NULL
WHEN p.date_estin NOT REGEXP '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN NULL
ELSE p.date_estin
END as expected_date,
COALESCE(p.status, 50) as status,
p.short_note as notes,
p.notes as long_note
FROM (
SELECT po_id FROM po
USE INDEX (idx_date_created)
WHERE date_ordered >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
date_ordered > ?
OR date_updated > ?
OR date_estin > ?
)
` : ''}
UNION
SELECT DISTINCT r.receiving_id as po_id
FROM receivings r
JOIN receivings_products rp USE INDEX (received_date) ON r.receiving_id = rp.receiving_id
WHERE rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL ${incrementalUpdate ? '1' : '5'} YEAR)
${incrementalUpdate ? `
AND (
r.date_created > ?
OR r.date_checked > ?
OR rp.stamp > ?
OR rp.received_date > ?
)
` : ''}
) ids
LEFT JOIN po p ON ids.po_id = p.po_id
LEFT JOIN suppliers s1 ON p.supplier_id = s1.supplierid
LEFT JOIN receivings r ON ids.po_id = r.receiving_id
LEFT JOIN suppliers s2 ON r.supplier_id = s2.supplierid
ORDER BY po_id
`, incrementalUpdate ? [
lastSyncTime, lastSyncTime, lastSyncTime, // PO conditions
lastSyncTime, lastSyncTime, lastSyncTime, lastSyncTime // Receiving conditions
] : []);
console.log('Sample PO dates:', poList.slice(0, 5).map(po => ({
po_id: po.po_id,
raw_date_ordered: po.raw_date_ordered,
raw_date_created: po.raw_date_created,
raw_date_estin: po.raw_date_estin,
computed_date: po.date,
expected_date: po.expected_date
})));
const totalItems = total;
let processed = 0;
const BATCH_SIZE = 5000;
const PROGRESS_INTERVAL = 500;
let lastProgressUpdate = Date.now();
outputProgress({
operation: `Starting purchase orders import - Processing ${totalItems} purchase order items`,
status: "running",
});
for (let i = 0; i < poList.length; i += BATCH_SIZE) {
const batch = poList.slice(i, Math.min(i + BATCH_SIZE, poList.length));
const poIds = batch.map(po => po.po_id);
// Get all products for these POs in one query
const [poProducts] = await prodConnection.query(`
SELECT
pop.po_id,
pop.pid,
pr.itemnumber as sku,
pr.description as name,
pop.cost_each,
pop.qty_each as ordered
FROM po_products pop
USE INDEX (PRIMARY)
JOIN products pr ON pop.pid = pr.pid
WHERE pop.po_id IN (?)
`, [poIds]);
// Process PO products in smaller sub-batches to avoid packet size issues
const SUB_BATCH_SIZE = 5000;
for (let j = 0; j < poProducts.length; j += SUB_BATCH_SIZE) {
const productBatch = poProducts.slice(j, j + SUB_BATCH_SIZE);
const productPids = [...new Set(productBatch.map(p => p.pid))];
const batchPoIds = [...new Set(productBatch.map(p => p.po_id))];
// Get receivings for this batch with employee names
const [receivings] = await prodConnection.query(`
SELECT
r.po_id,
rp.pid,
rp.receiving_id,
rp.qty_each,
rp.cost_each,
COALESCE(rp.received_date, r.date_created) as received_date,
rp.received_by,
CONCAT(e.firstname, ' ', e.lastname) as received_by_name,
CASE
WHEN r.po_id IS NULL THEN 2 -- No PO
WHEN r.po_id IN (?) THEN 0 -- Original PO
ELSE 1 -- Different PO
END as is_alt_po
FROM receivings_products rp
USE INDEX (received_date)
LEFT JOIN receivings r ON r.receiving_id = rp.receiving_id
LEFT JOIN employees e ON rp.received_by = e.employeeid
WHERE rp.pid IN (?)
AND rp.received_date >= DATE_SUB(CURRENT_DATE, INTERVAL 5 YEAR)
ORDER BY r.po_id, rp.pid, rp.received_date
`, [batchPoIds, productPids]);
// Create maps for this sub-batch
const poProductMap = new Map();
productBatch.forEach(product => {
const key = `${product.po_id}-${product.pid}`;
poProductMap.set(key, product);
});
const receivingMap = new Map();
const altReceivingMap = new Map();
const noPOReceivingMap = new Map();
receivings.forEach(receiving => {
const key = `${receiving.po_id}-${receiving.pid}`;
if (receiving.is_alt_po === 2) {
// No PO
if (!noPOReceivingMap.has(receiving.pid)) {
noPOReceivingMap.set(receiving.pid, []);
}
noPOReceivingMap.get(receiving.pid).push(receiving);
} else if (receiving.is_alt_po === 1) {
// Different PO
if (!altReceivingMap.has(receiving.pid)) {
altReceivingMap.set(receiving.pid, []);
}
altReceivingMap.get(receiving.pid).push(receiving);
} else {
// Original PO
if (!receivingMap.has(key)) {
receivingMap.set(key, []);
}
receivingMap.get(key).push(receiving);
}
});
// Verify PIDs exist
const [existingPids] = await localConnection.query(
'SELECT pid FROM products WHERE pid IN (?)',
[productPids]
);
const validPids = new Set(existingPids.map(p => p.pid));
// First check which PO lines already exist and get their current values
const poLines = Array.from(poProductMap.values())
.filter(p => validPids.has(p.pid))
.map(p => [p.po_id, p.pid]);
const [existingPOs] = await localConnection.query(
`SELECT ${columnNames.join(',')} FROM purchase_orders WHERE (po_id, pid) IN (${poLines.map(() => "(?,?)").join(",")})`,
poLines.flat()
);
const existingPOMap = new Map(
existingPOs.map(po => [`${po.po_id}-${po.pid}`, po])
);
// Split into inserts and updates
const insertsAndUpdates = { inserts: [], updates: [] };
let batchProcessed = 0;
for (const po of batch) {
const poProducts = Array.from(poProductMap.values())
.filter(p => p.po_id === po.po_id && validPids.has(p.pid));
for (const product of poProducts) {
const key = `${po.po_id}-${product.pid}`;
const receivingHistory = receivingMap.get(key) || [];
const altReceivingHistory = altReceivingMap.get(product.pid) || [];
const noPOReceivingHistory = noPOReceivingMap.get(product.pid) || [];
// Combine all receivings and sort by date
const allReceivings = [
...receivingHistory.map(r => ({ ...r, type: 'original' })),
...altReceivingHistory.map(r => ({ ...r, type: 'alternate' })),
...noPOReceivingHistory.map(r => ({ ...r, type: 'no_po' }))
].sort((a, b) => new Date(a.received_date || '9999-12-31') - new Date(b.received_date || '9999-12-31'));
// Split receivings into original PO and others
const originalPOReceivings = allReceivings.filter(r => r.type === 'original');
const otherReceivings = allReceivings.filter(r => r.type !== 'original');
// Track FIFO fulfillment
let remainingToFulfill = product.ordered;
const fulfillmentTracking = [];
let totalReceived = 0;
let actualCost = null; // Will store the cost of the first receiving that fulfills this PO
let firstFulfillmentReceiving = null;
let lastFulfillmentReceiving = null;
for (const receiving of allReceivings) {
const qtyToApply = Math.min(remainingToFulfill, receiving.qty_each);
if (qtyToApply > 0) {
// If this is the first receiving being applied, use its cost
if (actualCost === null) {
actualCost = receiving.cost_each;
firstFulfillmentReceiving = receiving;
}
lastFulfillmentReceiving = receiving;
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: qtyToApply,
qty_total: receiving.qty_each,
cost: receiving.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
remaining_qty: receiving.qty_each - qtyToApply
});
remainingToFulfill -= qtyToApply;
} else {
// Track excess receivings
fulfillmentTracking.push({
receiving_id: receiving.receiving_id,
qty_applied: 0,
qty_total: receiving.qty_each,
cost: receiving.cost_each,
date: receiving.received_date,
received_by: receiving.received_by,
received_by_name: receiving.received_by_name || 'Unknown',
type: receiving.type,
is_excess: true
});
}
totalReceived += receiving.qty_each;
}
const receiving_status = !totalReceived ? 1 : // created
remainingToFulfill > 0 ? 30 : // partial
40; // full
function formatDate(dateStr) {
if (!dateStr) return null;
if (dateStr === '0000-00-00' || dateStr === '0000-00-00 00:00:00') return null;
if (typeof dateStr === 'string' && !dateStr.match(/^\d{4}-\d{2}-\d{2}/)) return null;
try {
const date = new Date(dateStr);
if (isNaN(date.getTime())) return null;
if (date.getFullYear() < 1900 || date.getFullYear() > 2100) return null;
return date.toISOString().split('T')[0];
} catch (e) {
return null;
}
}
const rowValues = columnNames.map(col => {
switch (col) {
case 'po_id': return po.po_id;
case 'vendor': return po.vendor;
case 'date': return formatDate(po.date);
case 'expected_date': return formatDate(po.expected_date);
case 'pid': return product.pid;
case 'sku': return product.sku;
case 'name': return product.name;
case 'cost_price': return actualCost || product.cost_each;
case 'po_cost_price': return product.cost_each;
case 'status': return po.status;
case 'notes': return po.notes;
case 'long_note': return po.long_note;
case 'ordered': return product.ordered;
case 'received': return totalReceived;
case 'unfulfilled': return remainingToFulfill;
case 'excess_received': return Math.max(0, totalReceived - product.ordered);
case 'received_date': return formatDate(firstFulfillmentReceiving?.received_date);
case 'last_received_date': return formatDate(lastFulfillmentReceiving?.received_date);
case 'received_by': return firstFulfillmentReceiving?.received_by_name || null;
case 'receiving_status': return receiving_status;
case 'receiving_history': return JSON.stringify({
fulfillment: fulfillmentTracking,
ordered_qty: product.ordered,
total_received: totalReceived,
remaining_unfulfilled: remainingToFulfill,
excess_received: Math.max(0, totalReceived - product.ordered),
po_cost: product.cost_each,
actual_cost: actualCost || product.cost_each
});
default: return null;
}
});
if (existingPOMap.has(key)) {
const existing = existingPOMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = rowValues[columnNames.indexOf(col)];
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
// Special handling for receiving_history - parse and compare
if (col === 'receiving_history') {
const newHistory = JSON.parse(newVal || '{}');
const oldHistory = JSON.parse(oldVal || '{}');
return JSON.stringify(newHistory) !== JSON.stringify(oldHistory);
}
return newVal !== oldVal;
});
if (hasChanges) {
insertsAndUpdates.updates.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
} else {
insertsAndUpdates.inserts.push({
po_id: po.po_id,
pid: product.pid,
values: rowValues
});
}
batchProcessed++;
}
}
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = insertsAndUpdates.inserts
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const insertResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
const affectedRows = insertResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsAdded += inserts;
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Handle updates - now we know these actually have changes
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = insertsAndUpdates.updates
.map(() => `(${Array(columnNames.length).fill("?").join(",")})`)
.join(",");
const updateResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE ${columnNames
.filter((col) => col !== "po_id" && col !== "pid")
.map((col) => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
const affectedRows = updateResult[0].affectedRows;
// For an upsert, MySQL counts rows twice for updates
// So if affectedRows is odd, we have (updates * 2 + inserts)
const updates = Math.floor(affectedRows / 2);
const inserts = affectedRows - (updates * 2);
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
}
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL || processed === totalItems) {
outputProgress({
status: "running",
operation: "Purchase orders import",
current: processed,
total: totalItems,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
remaining: estimateRemaining(startTime, processed, totalItems),
rate: calculateRate(startTime, processed)
});
lastProgressUpdate = now;
}
}
}
// Only update sync status if we get here (no errors thrown)
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('purchase_orders', NOW())
ON DUPLICATE KEY UPDATE
last_sync_timestamp = NOW(),
last_sync_id = LAST_INSERT_ID(last_sync_id)
`);
return {
status: "complete",
totalImported: totalItems,
recordsAdded: recordsAdded || 0,
recordsUpdated: recordsUpdated || 0,
incrementalUpdate,
lastSyncTime
};
} catch (error) {
outputProgress({
operation: `${incrementalUpdate ? 'Incremental' : 'Full'} purchase orders import failed`,
status: "error",
error: error.message,
});
throw error;
}
}
module.exports = importPurchaseOrders;

View File

@@ -0,0 +1,82 @@
// Split into inserts and updates
const insertsAndUpdates = batch.reduce((acc, po) => {
const key = `${po.po_id}-${po.pid}`;
if (existingPOMap.has(key)) {
const existing = existingPOMap.get(key);
// Check if any values are different
const hasChanges = columnNames.some(col => {
const newVal = po[col] ?? null;
const oldVal = existing[col] ?? null;
// Special handling for numbers to avoid type coercion issues
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001; // Allow for tiny floating point differences
}
// Special handling for receiving_history JSON
if (col === 'receiving_history') {
return JSON.stringify(newVal) !== JSON.stringify(oldVal);
}
return newVal !== oldVal;
});
if (hasChanges) {
console.log(`PO line changed: ${key}`, {
po_id: po.po_id,
pid: po.pid,
changes: columnNames.filter(col => {
const newVal = po[col] ?? null;
const oldVal = existing[col] ?? null;
if (typeof newVal === 'number' && typeof oldVal === 'number') {
return Math.abs(newVal - oldVal) > 0.00001;
}
if (col === 'receiving_history') {
return JSON.stringify(newVal) !== JSON.stringify(oldVal);
}
return newVal !== oldVal;
})
});
acc.updates.push({
po_id: po.po_id,
pid: po.pid,
values: columnNames.map(col => po[col] ?? null)
});
}
} else {
console.log(`New PO line: ${key}`);
acc.inserts.push({
po_id: po.po_id,
pid: po.pid,
values: columnNames.map(col => po[col] ?? null)
});
}
return acc;
}, { inserts: [], updates: [] });
// Handle inserts
if (insertsAndUpdates.inserts.length > 0) {
const insertPlaceholders = Array(insertsAndUpdates.inserts.length).fill(placeholderGroup).join(",");
const insertResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${insertPlaceholders}
`, insertsAndUpdates.inserts.map(i => i.values).flat());
recordsAdded += insertResult[0].affectedRows;
}
// Handle updates
if (insertsAndUpdates.updates.length > 0) {
const updatePlaceholders = Array(insertsAndUpdates.updates.length).fill(placeholderGroup).join(",");
const updateResult = await localConnection.query(`
INSERT INTO purchase_orders (${columnNames.join(",")})
VALUES ${updatePlaceholders}
ON DUPLICATE KEY UPDATE
${columnNames
.filter(col => col !== "po_id" && col !== "pid")
.map(col => `${col} = VALUES(${col})`)
.join(",")};
`, insertsAndUpdates.updates.map(u => u.values).flat());
// Each update affects 2 rows in affectedRows, so we divide by 2 to get actual count
recordsUpdated += insertsAndUpdates.updates.length;
}

View File

@@ -0,0 +1,88 @@
const mysql = require("mysql2/promise");
const { Client } = require("ssh2");
const dotenv = require("dotenv");
const path = require("path");
// Helper function to setup SSH tunnel
async function setupSshTunnel(sshConfig) {
return new Promise((resolve, reject) => {
const ssh = new Client();
ssh.on('error', (err) => {
console.error('SSH connection error:', err);
});
ssh.on('end', () => {
console.log('SSH connection ended normally');
});
ssh.on('close', () => {
console.log('SSH connection closed');
});
ssh
.on("ready", () => {
ssh.forwardOut(
"127.0.0.1",
0,
sshConfig.prodDbConfig.host,
sshConfig.prodDbConfig.port,
async (err, stream) => {
if (err) reject(err);
resolve({ ssh, stream });
}
);
})
.connect(sshConfig.ssh);
});
}
// Helper function to setup database connections
async function setupConnections(sshConfig) {
const tunnel = await setupSshTunnel(sshConfig);
const prodConnection = await mysql.createConnection({
...sshConfig.prodDbConfig,
stream: tunnel.stream,
});
const localConnection = await mysql.createPool({
...sshConfig.localDbConfig,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0
});
return {
ssh: tunnel.ssh,
prodConnection,
localConnection
};
}
// Helper function to close connections
async function closeConnections(connections) {
const { ssh, prodConnection, localConnection } = connections;
try {
if (prodConnection) await prodConnection.end();
if (localConnection) await localConnection.end();
// Wait a bit for any pending data to be written before closing SSH
await new Promise(resolve => setTimeout(resolve, 100));
if (ssh) {
ssh.on('close', () => {
console.log('SSH connection closed cleanly');
});
ssh.end();
}
} catch (err) {
console.error('Error during cleanup:', err);
}
}
module.exports = {
setupConnections,
closeConnections
};

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateBrandMetrics(startTime, totalProducts, processedCount) {
async function calculateBrandMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Calculating brand metrics',
current: Math.floor(totalProducts * 0.95),
status: 'cancelled',
operation: 'Brand metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)),
percentage: '95'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting brand metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Calculate brand metrics with optimized queries
@@ -111,6 +125,20 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount) {
last_calculated_at = CURRENT_TIMESTAMP
`);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Brand metrics calculated, starting time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate brand time-based metrics with optimized query
await connection.query(`
INSERT INTO brand_time_metrics (
@@ -170,10 +198,27 @@ async function calculateBrandMetrics(startTime, totalProducts, processedCount) {
avg_margin = VALUES(avg_margin)
`);
return Math.floor(totalProducts * 0.98);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Brand time-based metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating brand metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateBrandMetrics;

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateCategoryMetrics(startTime, totalProducts, processedCount) {
async function calculateCategoryMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Calculating category metrics',
current: Math.floor(totalProducts * 0.85),
status: 'cancelled',
operation: 'Category metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.85), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.85)),
percentage: '85'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting category metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// First, calculate base category metrics
@@ -44,6 +58,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
last_calculated_at = VALUES(last_calculated_at)
`);
processedCount = Math.floor(totalProducts * 0.90);
outputProgress({
status: 'running',
operation: 'Base category metrics calculated, updating with margin data',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Then update with margin and turnover data
await connection.query(`
WITH category_sales AS (
@@ -68,6 +96,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
cm.last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.95);
outputProgress({
status: 'running',
operation: 'Margin data updated, calculating growth rates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Finally update growth rates
await connection.query(`
WITH current_period AS (
@@ -112,6 +154,20 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
WHERE cp.cat_id IS NOT NULL OR pp.cat_id IS NOT NULL
`);
processedCount = Math.floor(totalProducts * 0.97);
outputProgress({
status: 'running',
operation: 'Growth rates calculated, updating time-based metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate time-based metrics
await connection.query(`
INSERT INTO category_time_metrics (
@@ -157,50 +213,27 @@ async function calculateCategoryMetrics(startTime, totalProducts, processedCount
turnover_rate = VALUES(turnover_rate)
`);
// Calculate sales metrics for different time periods
const periods = [30, 90, 180, 365];
for (const days of periods) {
await connection.query(`
INSERT INTO category_sales_metrics (
category_id,
brand,
period_start,
period_end,
avg_daily_sales,
total_sold,
num_products,
avg_price,
last_calculated_at
)
SELECT
pc.cat_id as category_id,
COALESCE(p.brand, 'Unbranded') as brand,
DATE_SUB(CURDATE(), INTERVAL ? DAY) as period_start,
CURDATE() as period_end,
COALESCE(SUM(o.quantity), 0) / ? as avg_daily_sales,
COALESCE(SUM(o.quantity), 0) as total_sold,
COUNT(DISTINCT p.pid) as num_products,
COALESCE(AVG(o.price), 0) as avg_price,
NOW() as last_calculated_at
FROM product_categories pc
JOIN products p ON pc.pid = p.pid
LEFT JOIN orders o ON p.pid = o.pid
AND o.date >= DATE_SUB(CURDATE(), INTERVAL ? DAY)
AND o.canceled = false
GROUP BY pc.cat_id, p.brand
ON DUPLICATE KEY UPDATE
avg_daily_sales = VALUES(avg_daily_sales),
total_sold = VALUES(total_sold),
num_products = VALUES(num_products),
avg_price = VALUES(avg_price),
last_calculated_at = NOW()
`, [days, days, days]);
}
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Time-based metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return Math.floor(totalProducts * 0.9);
return processedCount;
} catch (error) {
logError(error, 'Error calculating category metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateCategoryMetrics;

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateFinancialMetrics(startTime, totalProducts, processedCount) {
async function calculateFinancialMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Calculating financial metrics',
current: Math.floor(totalProducts * 0.6),
status: 'cancelled',
operation: 'Financial metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.6), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.6)),
percentage: '60'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting financial metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Calculate financial metrics with optimized query
@@ -48,6 +62,20 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
END
`);
processedCount = Math.floor(totalProducts * 0.65);
outputProgress({
status: 'running',
operation: 'Base financial metrics calculated, updating time aggregates',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Update time-based aggregates with optimized query
await connection.query(`
WITH monthly_financials AS (
@@ -78,10 +106,27 @@ async function calculateFinancialMetrics(startTime, totalProducts, processedCoun
END
`);
return Math.floor(totalProducts * 0.7);
processedCount = Math.floor(totalProducts * 0.70);
outputProgress({
status: 'running',
operation: 'Time-based aggregates updated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating financial metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateFinancialMetrics;

View File

@@ -1,4 +1,4 @@
const { outputProgress, logError } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
// Helper function to handle NaN and undefined values
@@ -9,24 +9,38 @@ function sanitizeValue(value) {
return value;
}
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0) {
async function calculateProductMetrics(startTime, totalProducts, processedCount = 0, isCancelled = false) {
const connection = await getConnection();
try {
// Skip flags are inherited from the parent scope
const SKIP_PRODUCT_BASE_METRICS = 0;
const SKIP_PRODUCT_TIME_AGGREGATES =0;
const SKIP_PRODUCT_TIME_AGGREGATES = 0;
if (isCancelled) {
outputProgress({
status: 'cancelled',
operation: 'Product metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
// Calculate base product metrics
if (!SKIP_PRODUCT_BASE_METRICS) {
outputProgress({
status: 'running',
operation: 'Calculating base product metrics',
current: Math.floor(totalProducts * 0.2),
operation: 'Starting base product metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.2), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.2)),
percentage: '20'
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Calculate base metrics
@@ -72,8 +86,17 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
`);
processedCount = Math.floor(totalProducts * 0.4);
outputProgress({
status: 'running',
operation: 'Base product metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
} else {
console.log('Skipping base product metrics calculation');
processedCount = Math.floor(totalProducts * 0.4);
outputProgress({
status: 'running',
@@ -83,21 +106,23 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: '40'
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
}
if (isCancelled) return processedCount;
// Calculate product time aggregates
if (!SKIP_PRODUCT_TIME_AGGREGATES) {
outputProgress({
status: 'running',
operation: 'Calculating product time aggregates',
current: Math.floor(totalProducts * 0.4),
operation: 'Starting product time aggregates calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.4), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.4)),
percentage: '40'
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Calculate time-based aggregates
@@ -151,8 +176,17 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
`);
processedCount = Math.floor(totalProducts * 0.6);
outputProgress({
status: 'running',
operation: 'Product time aggregates calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
} else {
console.log('Skipping product time aggregates calculation');
processedCount = Math.floor(totalProducts * 0.6);
outputProgress({
status: 'running',
@@ -162,11 +196,14 @@ async function calculateProductMetrics(startTime, totalProducts, processedCount
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: '60'
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
}
return processedCount;
} catch (error) {
logError(error, 'Error calculating product metrics');
throw error;
} finally {
if (connection) {
connection.release();

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateSalesForecasts(startTime, totalProducts, processedCount) {
async function calculateSalesForecasts(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Calculating sales forecasts',
current: Math.floor(totalProducts * 0.98),
status: 'cancelled',
operation: 'Sales forecasts calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.98), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.98)),
percentage: '98'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting sales forecasts calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// First, create a temporary table for forecast dates
@@ -42,6 +56,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
) numbers
`);
processedCount = Math.floor(totalProducts * 0.92);
outputProgress({
status: 'running',
operation: 'Forecast dates prepared, calculating daily sales stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for daily sales stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_daily_sales AS
@@ -57,6 +85,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY o.pid, DAYOFWEEK(o.date)
`);
processedCount = Math.floor(totalProducts * 0.94);
outputProgress({
status: 'running',
operation: 'Daily sales stats calculated, preparing product stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for product stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_product_stats AS
@@ -68,6 +110,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY pid
`);
processedCount = Math.floor(totalProducts * 0.96);
outputProgress({
status: 'running',
operation: 'Product stats prepared, calculating product-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate product-level forecasts
await connection.query(`
INSERT INTO sales_forecasts (
@@ -116,6 +172,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
last_calculated_at = NOW()
`);
processedCount = Math.floor(totalProducts * 0.98);
outputProgress({
status: 'running',
operation: 'Product forecasts calculated, preparing category stats',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Create temporary table for category stats
await connection.query(`
CREATE TEMPORARY TABLE IF NOT EXISTS temp_category_sales AS
@@ -142,6 +212,20 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
GROUP BY cat_id
`);
processedCount = Math.floor(totalProducts * 0.99);
outputProgress({
status: 'running',
operation: 'Category stats prepared, calculating category-level forecasts',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Calculate category-level forecasts
await connection.query(`
INSERT INTO category_forecasts (
@@ -199,10 +283,27 @@ async function calculateSalesForecasts(startTime, totalProducts, processedCount)
DROP TEMPORARY TABLE IF EXISTS temp_category_stats;
`);
return Math.floor(totalProducts * 1.0);
processedCount = Math.floor(totalProducts * 1.0);
outputProgress({
status: 'running',
operation: 'Category forecasts calculated and temporary tables cleaned up',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating sales forecasts');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateSalesForecasts;

View File

@@ -1,18 +1,32 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateTimeAggregates(startTime, totalProducts, processedCount) {
async function calculateTimeAggregates(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Calculating time aggregates',
current: Math.floor(totalProducts * 0.95),
status: 'cancelled',
operation: 'Time aggregates calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.95), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.95)),
percentage: '95'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting time aggregates calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// Initial insert of time-based aggregates
@@ -109,6 +123,20 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount)
profit_margin = VALUES(profit_margin)
`);
processedCount = Math.floor(totalProducts * 0.60);
outputProgress({
status: 'running',
operation: 'Base time aggregates calculated, updating financial metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Update with financial metrics
await connection.query(`
UPDATE product_time_aggregates pta
@@ -136,7 +164,22 @@ async function calculateTimeAggregates(startTime, totalProducts, processedCount)
END
`);
return Math.floor(totalProducts * 0.65);
processedCount = Math.floor(totalProducts * 0.65);
outputProgress({
status: 'running',
operation: 'Financial metrics updated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating time aggregates');
throw error;
} finally {
if (connection) {
connection.release();

View File

@@ -2,8 +2,15 @@ const fs = require('fs');
const path = require('path');
// Helper function to format elapsed time
function formatElapsedTime(startTime) {
const elapsed = Date.now() - startTime;
function formatElapsedTime(elapsed) {
// If elapsed is a timestamp, convert to elapsed milliseconds
if (elapsed instanceof Date || elapsed > 1000000000000) {
elapsed = Date.now() - elapsed;
} else {
// If elapsed is in seconds, convert to milliseconds
elapsed = elapsed * 1000;
}
const seconds = Math.floor(elapsed / 1000);
const minutes = Math.floor(seconds / 60);
const hours = Math.floor(minutes / 60);

View File

@@ -1,18 +1,32 @@
const { outputProgress } = require('./utils/progress');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate, logError } = require('./utils/progress');
const { getConnection } = require('./utils/db');
async function calculateVendorMetrics(startTime, totalProducts, processedCount) {
async function calculateVendorMetrics(startTime, totalProducts, processedCount, isCancelled = false) {
const connection = await getConnection();
try {
if (isCancelled) {
outputProgress({
status: 'running',
operation: 'Ensuring vendors exist in vendor_details',
current: Math.floor(totalProducts * 0.7),
status: 'cancelled',
operation: 'Vendor metrics calculation cancelled',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.7), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.7)),
percentage: '70'
remaining: null,
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
}
outputProgress({
status: 'running',
operation: 'Starting vendor metrics calculation',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
// First ensure all vendors exist in vendor_details
@@ -27,17 +41,20 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount)
WHERE vendor IS NOT NULL
`);
processedCount = Math.floor(totalProducts * 0.8);
outputProgress({
status: 'running',
operation: 'Calculating vendor metrics',
current: Math.floor(totalProducts * 0.8),
operation: 'Vendor details updated, calculating metrics',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, Math.floor(totalProducts * 0.8), totalProducts),
rate: calculateRate(startTime, Math.floor(totalProducts * 0.8)),
percentage: '80'
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
if (isCancelled) return processedCount;
// Now calculate vendor metrics
await connection.query(`
INSERT INTO vendor_metrics (
@@ -130,10 +147,27 @@ async function calculateVendorMetrics(startTime, totalProducts, processedCount)
last_calculated_at = VALUES(last_calculated_at)
`);
return Math.floor(totalProducts * 0.9);
processedCount = Math.floor(totalProducts * 0.9);
outputProgress({
status: 'running',
operation: 'Vendor metrics calculated',
current: processedCount,
total: totalProducts,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, processedCount, totalProducts),
rate: calculateRate(startTime, processedCount),
percentage: ((processedCount / totalProducts) * 100).toFixed(1)
});
return processedCount;
} catch (error) {
logError(error, 'Error calculating vendor metrics');
throw error;
} finally {
if (connection) {
connection.release();
}
}
}
module.exports = calculateVendorMetrics;

View File

@@ -3,6 +3,7 @@ const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
// Get test limits from environment variables
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0');
@@ -106,20 +107,19 @@ async function countRows(filePath) {
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
const elapsed = (Date.now() - startTime) / 1000;
const rate = current / elapsed; // rows per second
const remaining = (total - current) / rate;
function updateProgress(current, total, operation, startTime, added = 0, updated = 0, skipped = 0) {
outputProgress({
status: 'running',
operation,
current,
total,
rate,
elapsed: formatDuration(elapsed),
remaining: formatDuration(remaining),
percentage: ((current / total) * 100).toFixed(1)
rate: calculateRate(startTime, current),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
percentage: ((current / total) * 100).toFixed(1),
added,
updated,
skipped
});
}
@@ -474,7 +474,7 @@ async function importProducts(pool, filePath) {
// Update progress every 100ms to avoid console flooding
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Products import', startTime);
updateProgress(rowCount, totalRows, 'Products import', startTime, added, updated, 0);
lastUpdate = now;
}
@@ -678,7 +678,7 @@ async function importOrders(pool, filePath) {
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Orders import', startTime);
updateProgress(rowCount, totalRows, 'Orders import', startTime, added, updated, skipped);
lastUpdate = now;
}
@@ -845,7 +845,7 @@ async function importPurchaseOrders(pool, filePath) {
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Purchase orders import', startTime);
updateProgress(rowCount, totalRows, 'Purchase orders import', startTime, added, updated, skipped);
lastUpdate = now;
}

View File

@@ -0,0 +1,180 @@
const path = require('path');
const fs = require('fs');
const axios = require('axios');
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
require('dotenv').config({ path: path.resolve(__dirname, '..', '.env') });
const FILES = [
{
name: '39f2x83-products.csv',
url: process.env.PRODUCTS_CSV_URL
},
{
name: '39f2x83-orders.csv',
url: process.env.ORDERS_CSV_URL
},
{
name: '39f2x83-purchase_orders.csv',
url: process.env.PURCHASE_ORDERS_CSV_URL
}
];
let isCancelled = false;
function cancelUpdate() {
isCancelled = true;
outputProgress({
status: 'cancelled',
operation: 'CSV update cancelled',
current: 0,
total: FILES.length,
elapsed: null,
remaining: null,
rate: 0
});
}
async function downloadFile(file, index, startTime) {
if (isCancelled) return;
const csvDir = path.join(__dirname, '../csv');
if (!fs.existsSync(csvDir)) {
fs.mkdirSync(csvDir, { recursive: true });
}
const writer = fs.createWriteStream(path.join(csvDir, file.name));
try {
const response = await axios({
url: file.url,
method: 'GET',
responseType: 'stream'
});
const totalLength = response.headers['content-length'];
let downloadedLength = 0;
let lastProgressUpdate = Date.now();
const PROGRESS_INTERVAL = 1000; // Update progress every second
response.data.on('data', (chunk) => {
if (isCancelled) {
writer.end();
return;
}
downloadedLength += chunk.length;
// Update progress based on time interval
const now = Date.now();
if (now - lastProgressUpdate >= PROGRESS_INTERVAL) {
const progress = (downloadedLength / totalLength) * 100;
outputProgress({
status: 'running',
operation: `Downloading ${file.name}`,
current: index + (downloadedLength / totalLength),
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, index + (downloadedLength / totalLength), FILES.length),
rate: calculateRate(startTime, index + (downloadedLength / totalLength)),
percentage: progress.toFixed(1),
file_progress: {
name: file.name,
downloaded: downloadedLength,
total: totalLength,
percentage: progress.toFixed(1)
}
});
lastProgressUpdate = now;
}
});
response.data.pipe(writer);
return new Promise((resolve, reject) => {
writer.on('finish', resolve);
writer.on('error', reject);
});
} catch (error) {
fs.unlinkSync(path.join(csvDir, file.name));
throw error;
}
}
// Main function to update all files
async function updateFiles() {
const startTime = Date.now();
outputProgress({
status: 'running',
operation: 'Starting CSV update',
current: 0,
total: FILES.length,
elapsed: '0s',
remaining: null,
rate: 0,
percentage: '0'
});
try {
for (let i = 0; i < FILES.length; i++) {
if (isCancelled) {
return;
}
const file = FILES[i];
await downloadFile(file, i, startTime);
outputProgress({
status: 'running',
operation: 'CSV update in progress',
current: i + 1,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, i + 1, FILES.length),
rate: calculateRate(startTime, i + 1),
percentage: (((i + 1) / FILES.length) * 100).toFixed(1)
});
}
outputProgress({
status: 'complete',
operation: 'CSV update complete',
current: FILES.length,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: '0s',
rate: calculateRate(startTime, FILES.length),
percentage: '100'
});
} catch (error) {
outputProgress({
status: 'error',
operation: 'CSV update failed',
error: error.message,
current: 0,
total: FILES.length,
elapsed: formatElapsedTime(startTime),
remaining: null,
rate: 0
});
throw error;
}
}
// Run the update only if this is the main module
if (require.main === module) {
updateFiles().catch((error) => {
console.error('Error updating CSV files:', error);
process.exit(1);
});
}
// Export the functions needed by the route
module.exports = {
updateFiles,
cancelUpdate
};

View File

@@ -40,6 +40,7 @@ const CONFIG_TABLES = [
'sales_velocity_config',
'abc_classification_config',
'safety_stock_config',
'sales_seasonality',
'turnover_config'
];
@@ -155,7 +156,7 @@ async function resetDatabase() {
SELECT GROUP_CONCAT(table_name) as tables
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name != 'users'
AND table_name NOT IN ('users', 'import_history')
`);
if (!tables[0].tables) {

View File

@@ -12,10 +12,16 @@ const dbConfig = {
};
function outputProgress(data) {
if (!data.status) {
data = {
status: 'running',
...data
};
}
console.log(JSON.stringify(data));
}
// Explicitly define all metrics-related tables
// Explicitly define all metrics-related tables in dependency order
const METRICS_TABLES = [
'brand_metrics',
'brand_time_metrics',
@@ -26,7 +32,6 @@ const METRICS_TABLES = [
'product_metrics',
'product_time_aggregates',
'sales_forecasts',
'sales_seasonality',
'temp_purchase_metrics',
'temp_sales_metrics',
'vendor_metrics', //before vendor_details for foreign key
@@ -34,56 +39,279 @@ const METRICS_TABLES = [
'vendor_details'
];
// Config tables that must exist
const CONFIG_TABLES = [
'stock_thresholds',
'lead_time_thresholds',
'sales_velocity_config',
'abc_classification_config',
'safety_stock_config',
'turnover_config'
];
// Split SQL into individual statements
function splitSQLStatements(sql) {
sql = sql.replace(/\r\n/g, '\n');
let statements = [];
let currentStatement = '';
let inString = false;
let stringChar = '';
// Core tables that must exist
const REQUIRED_CORE_TABLES = [
'products',
'orders',
'purchase_orders'
];
for (let i = 0; i < sql.length; i++) {
const char = sql[i];
const nextChar = sql[i + 1] || '';
if ((char === "'" || char === '"') && sql[i - 1] !== '\\') {
if (!inString) {
inString = true;
stringChar = char;
} else if (char === stringChar) {
inString = false;
}
}
if (!inString && char === '-' && nextChar === '-') {
while (i < sql.length && sql[i] !== '\n') i++;
continue;
}
if (!inString && char === '/' && nextChar === '*') {
i += 2;
while (i < sql.length && (sql[i] !== '*' || sql[i + 1] !== '/')) i++;
i++;
continue;
}
if (!inString && char === ';') {
if (currentStatement.trim()) {
statements.push(currentStatement.trim());
}
currentStatement = '';
} else {
currentStatement += char;
}
}
if (currentStatement.trim()) {
statements.push(currentStatement.trim());
}
return statements;
}
async function resetMetrics() {
let connection;
try {
outputProgress({
operation: 'Starting metrics reset',
message: 'Connecting to database...'
});
connection = await mysql.createConnection(dbConfig);
await connection.beginTransaction();
// Drop all metrics tables
for (const table of METRICS_TABLES) {
console.log(`Dropping table: ${table}`);
// First verify current state
const [initialTables] = await connection.query(`
SELECT TABLE_NAME as name
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
`, [METRICS_TABLES]);
outputProgress({
operation: 'Initial state',
message: `Found ${initialTables.length} existing metrics tables: ${initialTables.map(t => t.name).join(', ')}`
});
// Disable foreign key checks at the start
await connection.query('SET FOREIGN_KEY_CHECKS = 0');
// Drop all metrics tables in reverse order to handle dependencies
outputProgress({
operation: 'Dropping metrics tables',
message: 'Removing existing metrics tables...'
});
for (const table of [...METRICS_TABLES].reverse()) {
try {
await connection.query(`DROP TABLE IF EXISTS ${table}`);
console.log(`Successfully dropped: ${table}`);
// Verify the table was actually dropped
const [checkDrop] = await connection.query(`
SELECT COUNT(*) as count
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
`, [table]);
if (checkDrop[0].count > 0) {
throw new Error(`Failed to drop table ${table} - table still exists`);
}
outputProgress({
operation: 'Table dropped',
message: `Successfully dropped table: ${table}`
});
} catch (err) {
console.error(`Error dropping ${table}:`, err.message);
outputProgress({
status: 'error',
operation: 'Drop table error',
message: `Error dropping table ${table}: ${err.message}`
});
throw err;
}
}
// Recreate all metrics tables from schema
const schemaSQL = fs.readFileSync(path.resolve(__dirname, '../db/metrics-schema.sql'), 'utf8');
await connection.query(schemaSQL);
console.log('All metrics tables recreated successfully');
// Verify all tables were dropped
const [afterDrop] = await connection.query(`
SELECT TABLE_NAME as name
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
`, [METRICS_TABLES]);
if (afterDrop.length > 0) {
throw new Error(`Failed to drop all tables. Remaining tables: ${afterDrop.map(t => t.name).join(', ')}`);
}
// Read metrics schema
outputProgress({
operation: 'Reading schema',
message: 'Loading metrics schema file...'
});
const schemaPath = path.resolve(__dirname, '../db/metrics-schema.sql');
if (!fs.existsSync(schemaPath)) {
throw new Error(`Schema file not found at: ${schemaPath}`);
}
const schemaSQL = fs.readFileSync(schemaPath, 'utf8');
const statements = splitSQLStatements(schemaSQL);
outputProgress({
operation: 'Schema loaded',
message: `Found ${statements.length} SQL statements to execute`
});
// Execute schema statements
for (let i = 0; i < statements.length; i++) {
const stmt = statements[i];
try {
await connection.query(stmt);
// Check for warnings
const [warnings] = await connection.query('SHOW WARNINGS');
if (warnings && warnings.length > 0) {
outputProgress({
status: 'warning',
operation: 'SQL Warning',
message: {
statement: i + 1,
warnings: warnings
}
});
}
// If this is a CREATE TABLE statement, verify the table was created
if (stmt.trim().toLowerCase().startsWith('create table')) {
const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?`?(\w+)`?/i)?.[1];
if (tableName) {
const [checkCreate] = await connection.query(`
SELECT TABLE_NAME as name, CREATE_TIME as created
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = ?
`, [tableName]);
if (checkCreate.length === 0) {
throw new Error(`Failed to create table ${tableName} - table does not exist after CREATE statement`);
}
outputProgress({
operation: 'Table created',
message: `Successfully created table: ${tableName} at ${checkCreate[0].created}`
});
}
}
outputProgress({
operation: 'SQL Progress',
message: {
statement: i + 1,
total: statements.length,
preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : '')
}
});
} catch (sqlError) {
outputProgress({
status: 'error',
operation: 'SQL Error',
message: {
error: sqlError.message,
sqlState: sqlError.sqlState,
errno: sqlError.errno,
statement: stmt,
statementNumber: i + 1
}
});
throw sqlError;
}
}
// Re-enable foreign key checks after all tables are created
await connection.query('SET FOREIGN_KEY_CHECKS = 1');
// Verify metrics tables were created
outputProgress({
operation: 'Verifying metrics tables',
message: 'Checking all metrics tables were created...'
});
const [metricsTablesResult] = await connection.query(`
SELECT
TABLE_NAME as name,
TABLE_ROWS as \`rows\`,
CREATE_TIME as created
FROM information_schema.tables
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME IN (?)
`, [METRICS_TABLES]);
outputProgress({
operation: 'Tables found',
message: `Found ${metricsTablesResult.length} tables: ${metricsTablesResult.map(t =>
`${t.name} (created: ${t.created})`
).join(', ')}`
});
const existingMetricsTables = metricsTablesResult.map(t => t.name);
const missingMetricsTables = METRICS_TABLES.filter(t => !existingMetricsTables.includes(t));
if (missingMetricsTables.length > 0) {
// Do one final check of the actual tables
const [finalCheck] = await connection.query('SHOW TABLES');
outputProgress({
operation: 'Final table check',
message: `All database tables: ${finalCheck.map(t => Object.values(t)[0]).join(', ')}`
});
throw new Error(`Failed to create metrics tables: ${missingMetricsTables.join(', ')}`);
}
await connection.commit();
console.log('All metrics tables reset successfully');
outputProgress({
status: 'complete',
operation: 'Reset complete',
message: 'All metrics tables have been reset successfully'
});
} catch (error) {
outputProgress({
status: 'error',
operation: 'Reset failed',
message: error.message,
stack: error.stack
});
if (connection) {
await connection.rollback();
// Make sure to re-enable foreign key checks even if there's an error
await connection.query('SET FOREIGN_KEY_CHECKS = 1').catch(() => {});
}
console.error('Error resetting metrics:', error);
throw error;
} finally {
if (connection) {
// One final attempt to ensure foreign key checks are enabled
await connection.query('SET FOREIGN_KEY_CHECKS = 1').catch(() => {});
await connection.end();
}
}

View File

@@ -0,0 +1,180 @@
const readline = require('readline');
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
const question = (query) => new Promise((resolve) => rl.question(query, resolve));
async function loadScript(name) {
try {
return await require(name);
} catch (error) {
console.error(`Failed to load script ${name}:`, error);
return null;
}
}
async function runWithTimeout(fn) {
return new Promise((resolve, reject) => {
// Create a child process for the script
const child = require('child_process').fork(fn.toString(), [], {
stdio: 'inherit'
});
child.on('exit', (code) => {
if (code === 0) {
resolve();
} else {
reject(new Error(`Script exited with code ${code}`));
}
});
child.on('error', (err) => {
reject(err);
});
});
}
function clearScreen() {
process.stdout.write('\x1Bc');
}
const scripts = {
'Import Scripts': {
'1': { name: 'Full Import From Production', path: './import-from-prod' },
'2': { name: 'Individual Import Scripts ▸', submenu: {
'1': { name: 'Import Orders', path: './import/orders', key: 'importOrders' },
'2': { name: 'Import Products', path: './import/products', key: 'importProducts' },
'3': { name: 'Import Purchase Orders', path: './import/purchase-orders' },
'4': { name: 'Import Categories', path: './import/categories' },
'b': { name: 'Back to Main Menu' }
}}
},
'Metrics': {
'3': { name: 'Calculate All Metrics', path: './calculate-metrics' },
'4': { name: 'Individual Metric Scripts ▸', submenu: {
'1': { name: 'Brand Metrics', path: './metrics/brand-metrics' },
'2': { name: 'Category Metrics', path: './metrics/category-metrics' },
'3': { name: 'Financial Metrics', path: './metrics/financial-metrics' },
'4': { name: 'Product Metrics', path: './metrics/product-metrics' },
'5': { name: 'Sales Forecasts', path: './metrics/sales-forecasts' },
'6': { name: 'Time Aggregates', path: './metrics/time-aggregates' },
'7': { name: 'Vendor Metrics', path: './metrics/vendor-metrics' },
'b': { name: 'Back to Main Menu' }
}}
},
'Database Management': {
'5': { name: 'Test Production Connection', path: './test-prod-connection' }
},
'Reset Scripts': {
'6': { name: 'Reset Database', path: './reset-db' },
'7': { name: 'Reset Metrics', path: './reset-metrics' }
}
};
let lastRun = null;
async function displayMenu(menuItems, title = 'Inventory Management Script Runner') {
clearScreen();
console.log(`\n${title}\n`);
for (const [category, items] of Object.entries(menuItems)) {
console.log(`\n${category}:`);
Object.entries(items).forEach(([key, script]) => {
console.log(`${key}. ${script.name}`);
});
}
if (lastRun) {
console.log('\nQuick Access:');
console.log(`r. Repeat Last Script (${lastRun.name})`);
}
console.log('\nq. Quit\n');
}
async function handleSubmenu(submenu, title) {
while (true) {
await displayMenu({"Individual Scripts": submenu}, title);
const choice = await question('Select an option (or b to go back): ');
if (choice.toLowerCase() === 'b') {
return null;
}
if (submenu[choice]) {
return submenu[choice];
}
console.log('Invalid selection. Please try again.');
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
async function runScript(script) {
console.log(`\nRunning: ${script.name}`);
try {
const scriptPath = require.resolve(script.path);
await runWithTimeout(scriptPath);
console.log('\nScript completed successfully');
lastRun = script;
} catch (error) {
console.error('\nError running script:', error);
}
await question('\nPress Enter to continue...');
}
async function main() {
while (true) {
await displayMenu(scripts);
const choice = await question('Select an option: ');
if (choice.toLowerCase() === 'q') {
break;
}
if (choice.toLowerCase() === 'r' && lastRun) {
await runScript(lastRun);
continue;
}
let selectedScript = null;
for (const category of Object.values(scripts)) {
if (category[choice]) {
selectedScript = category[choice];
break;
}
}
if (!selectedScript) {
console.log('Invalid selection. Please try again.');
await new Promise(resolve => setTimeout(resolve, 1000));
continue;
}
if (selectedScript.submenu) {
const submenuChoice = await handleSubmenu(
selectedScript.submenu,
selectedScript.name
);
if (submenuChoice && submenuChoice.path) {
await runScript(submenuChoice);
}
} else if (selectedScript.path) {
await runScript(selectedScript);
}
}
rl.close();
process.exit(0);
}
if (require.main === module) {
main().catch(error => {
console.error('Fatal error:', error);
process.exit(1);
});
}

View File

@@ -1,167 +0,0 @@
const fs = require('fs');
const path = require('path');
const https = require('https');
// Configuration
const FILES = [
{
name: '39f2x83-products.csv',
url: 'https://feeds.acherryontop.com/39f2x83-products.csv'
},
{
name: '39f2x83-orders.csv',
url: 'https://feeds.acherryontop.com/39f2x83-orders.csv'
},
{
name: '39f2x83-purchase_orders.csv',
url: 'https://feeds.acherryontop.com/39f2x83-purchase_orders.csv'
}
];
const CSV_DIR = path.join(__dirname, '..', 'csv');
// Ensure CSV directory exists
if (!fs.existsSync(CSV_DIR)) {
fs.mkdirSync(CSV_DIR, { recursive: true });
}
// Function to download a file
function downloadFile(url, filePath) {
return new Promise((resolve, reject) => {
const file = fs.createWriteStream(filePath);
https.get(url, response => {
if (response.statusCode !== 200) {
reject(new Error(`Failed to download: ${response.statusCode} ${response.statusMessage}`));
return;
}
const totalSize = parseInt(response.headers['content-length'], 10);
let downloadedSize = 0;
let lastProgressUpdate = Date.now();
const startTime = Date.now();
response.on('data', chunk => {
downloadedSize += chunk.length;
const now = Date.now();
// Update progress at most every 100ms to avoid console flooding
if (now - lastProgressUpdate > 100) {
const elapsed = (now - startTime) / 1000;
const rate = downloadedSize / elapsed;
const remaining = (totalSize - downloadedSize) / rate;
console.log(JSON.stringify({
status: 'running',
operation: `Downloading ${path.basename(filePath)}`,
current: downloadedSize,
total: totalSize,
rate: (rate / 1024 / 1024).toFixed(2), // MB/s
elapsed: formatDuration(elapsed),
remaining: formatDuration(remaining),
percentage: ((downloadedSize / totalSize) * 100).toFixed(1)
}));
lastProgressUpdate = now;
}
});
response.pipe(file);
file.on('finish', () => {
console.log(JSON.stringify({
status: 'running',
operation: `Completed ${path.basename(filePath)}`,
current: totalSize,
total: totalSize,
percentage: '100'
}));
file.close();
resolve();
});
}).on('error', error => {
fs.unlink(filePath, () => {}); // Delete the file if download failed
reject(error);
});
file.on('error', error => {
fs.unlink(filePath, () => {}); // Delete the file if there was an error
reject(error);
});
});
}
// Helper function to format duration
function formatDuration(seconds) {
if (seconds < 60) return `${Math.round(seconds)}s`;
const minutes = Math.floor(seconds / 60);
seconds = Math.round(seconds % 60);
return `${minutes}m ${seconds}s`;
}
// Main function to update all files
async function updateFiles() {
console.log(JSON.stringify({
status: 'running',
operation: 'Starting CSV file updates',
total: FILES.length,
current: 0
}));
for (let i = 0; i < FILES.length; i++) {
const file = FILES[i];
const filePath = path.join(CSV_DIR, file.name);
try {
// Delete existing file if it exists
if (fs.existsSync(filePath)) {
console.log(JSON.stringify({
status: 'running',
operation: `Removing existing file: ${file.name}`,
current: i,
total: FILES.length,
percentage: ((i / FILES.length) * 100).toFixed(1)
}));
fs.unlinkSync(filePath);
}
// Download new file
console.log(JSON.stringify({
status: 'running',
operation: `Starting download: ${file.name}`,
current: i,
total: FILES.length,
percentage: ((i / FILES.length) * 100).toFixed(1)
}));
await downloadFile(file.url, filePath);
console.log(JSON.stringify({
status: 'running',
operation: `Successfully updated ${file.name}`,
current: i + 1,
total: FILES.length,
percentage: (((i + 1) / FILES.length) * 100).toFixed(1)
}));
} catch (error) {
console.error(JSON.stringify({
status: 'error',
operation: `Error updating ${file.name}`,
error: error.message
}));
throw error;
}
}
console.log(JSON.stringify({
status: 'complete',
operation: 'CSV file update complete',
current: FILES.length,
total: FILES.length,
percentage: '100'
}));
}
// Run the update
updateFiles().catch(error => {
console.error(JSON.stringify({
error: `Update failed: ${error.message}`
}));
process.exit(1);
});

View File

@@ -13,7 +13,7 @@ import {
AlertDialogTitle,
AlertDialogTrigger,
} from "@/components/ui/alert-dialog";
import { Loader2, RefreshCw, Upload, X, Database } from "lucide-react";
import { Loader2, RefreshCw, X, Database } from "lucide-react";
import config from '../../config';
import { toast } from "sonner";
@@ -36,34 +36,20 @@ interface ImportProgress {
duration?: string;
}
interface ImportLimits {
products: number;
orders: number;
purchaseOrders: number;
}
export function DataManagement() {
const [isUpdating, setIsUpdating] = useState(false);
const [isImportingCSV, setIsImportingCSV] = useState(false);
const [isImportingProd, setIsImportingProd] = useState(false);
const [isResetting, setIsResetting] = useState(false);
const [updateProgress, setUpdateProgress] = useState<ImportProgress | null>(null);
const [importProgress, setImportProgress] = useState<ImportProgress | null>(null);
const [purchaseOrdersProgress, setPurchaseOrdersProgress] = useState<ImportProgress | null>(null);
const [resetProgress, setResetProgress] = useState<ImportProgress | null>(null);
const [eventSource, setEventSource] = useState<EventSource | null>(null);
const [] = useState<ImportLimits>({
products: 0,
orders: 0,
purchaseOrders: 0
});
const [isResettingMetrics, setIsResettingMetrics] = useState(false);
const [resetMetricsProgress, setResetMetricsProgress] = useState<ImportProgress | null>(null);
const [isCalculatingMetrics, setIsCalculatingMetrics] = useState(false);
const [metricsProgress, setMetricsProgress] = useState<ImportProgress | null>(null);
// Add states for completed operations
const [lastUpdateStatus, setLastUpdateStatus] = useState<ImportProgress | null>(null);
const [lastImportStatus, setLastImportStatus] = useState<ImportProgress | null>(null);
const [lastResetStatus, setLastResetStatus] = useState<ImportProgress | null>(null);
const [lastMetricsStatus, setLastMetricsStatus] = useState<ImportProgress | null>(null);
@@ -77,7 +63,7 @@ export function DataManagement() {
// Helper to check if any operation is running
const isAnyOperationRunning = () => {
return isUpdating || isImportingCSV || isImportingProd || isTestingConnection || isResetting || isCalculatingMetrics;
return isImportingProd || isTestingConnection || isResetting || isCalculatingMetrics || isResettingMetrics;
};
// Helper function to get progress bar color based on status
@@ -132,7 +118,7 @@ export function DataManagement() {
};
// Helper function to render progress
const renderProgress = (progress: any, operationType: 'update' | 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics') => {
const renderProgress = (progress: any, operationType: 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics') => {
if (!progress) return null;
const status = progress.status?.toLowerCase();
@@ -218,7 +204,7 @@ export function DataManagement() {
};
// Helper to connect to event source
const connectToEventSource = (type: 'update' | 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics') => {
const connectToEventSource = (type: 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics') => {
console.log(`Setting up EventSource for ${type}...`);
// Clean up existing connection first
@@ -257,8 +243,7 @@ export function DataManagement() {
// Try to reconnect via status check if the operation might still be running
if (
(type === 'calculate-metrics' && isCalculatingMetrics) ||
(type === 'import' && isImportingCSV) ||
(type === 'update' && isUpdating) ||
(type === 'import' && isImportingProd) ||
(type === 'reset' && isResetting) ||
(type === 'reset-metrics' && isResettingMetrics)
) {
@@ -295,7 +280,7 @@ export function DataManagement() {
};
const handleProgressUpdate = (
type: 'update' | 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics',
type: 'import' | 'reset' | 'reset-metrics' | 'calculate-metrics',
progressData: any,
source: EventSource
) => {
@@ -342,7 +327,6 @@ export function DataManagement() {
if (!otherProgress || otherProgress.status === 'complete' || otherProgress.status === 'error' || otherProgress.status === 'cancelled') {
source.close();
setEventSource(null);
setIsImportingCSV(false);
setIsImportingProd(false);
// Show appropriate toast based on final status
@@ -374,12 +358,6 @@ export function DataManagement() {
let operationName;
switch (type) {
case 'update':
setProgress = setUpdateProgress;
setLastStatus = setLastUpdateStatus;
setIsRunning = setIsUpdating;
operationName = 'Update';
break;
case 'reset':
setProgress = setResetProgress;
setLastStatus = setLastResetStatus;
@@ -435,7 +413,7 @@ export function DataManagement() {
}
};
const handleCancel = async (operation: 'update' | 'import' | 'reset' | 'calculate-metrics') => {
const handleCancel = async (operation: 'import' | 'reset' | 'calculate-metrics') => {
try {
const response = await fetch(`${config.apiUrl}/csv/cancel?operation=${operation}`, {
method: 'POST',
@@ -448,13 +426,9 @@ export function DataManagement() {
// Reset the appropriate state
if (operation === 'import') {
setIsImportingCSV(false);
setIsImportingProd(false);
setImportProgress(null);
setPurchaseOrdersProgress(null);
} else if (operation === 'update') {
setIsUpdating(false);
setUpdateProgress(null);
}
// ... other operation states ...
} catch (error) {
@@ -511,7 +485,7 @@ export function DataManagement() {
if (operation.includes('import')) {
console.log('Import is running');
setIsImportingCSV(true);
setIsImportingProd(true);
if (operation.includes('purchase orders')) {
setPurchaseOrdersProgress(importData.progress || importData);
} else {
@@ -520,13 +494,6 @@ export function DataManagement() {
if (!eventSource) {
connectToEventSource('import');
}
} else if (operation.includes('update')) {
console.log('Update is running');
setIsUpdating(true);
setUpdateProgress(importData.progress || importData);
if (!eventSource) {
connectToEventSource('update');
}
} else if (operation.includes('reset')) {
if (operation.includes('metrics')) {
console.log('Reset metrics is running');
@@ -549,8 +516,6 @@ export function DataManagement() {
const operation = (importData.lastStatus?.operation || '').toLowerCase();
if (operation.includes('import')) {
setLastImportStatus(importData.lastStatus);
} else if (operation.includes('update')) {
setLastUpdateStatus(importData.lastStatus);
} else if (operation.includes('reset')) {
if (operation.includes('metrics')) {
setLastResetMetricsStatus(importData.lastStatus);
@@ -569,39 +534,30 @@ export function DataManagement() {
checkStatus();
}, []);
const handleUpdateCSV = async () => {
setIsUpdating(true);
setUpdateProgress({ status: 'running', operation: 'Starting CSV update' });
const handleTestConnection = async () => {
setIsTestingConnection(true);
try {
connectToEventSource('update');
const response = await fetch(`${config.apiUrl}/csv/update`, {
method: 'POST',
const response = await fetch(`${config.apiUrl}/test-prod-connection`, {
credentials: 'include'
});
if (!response.ok) {
const data = await response.json().catch(() => ({}));
if (data.error === 'Import already in progress') {
return;
}
throw new Error(data.error || `Failed to update CSV files: ${response.status} ${response.statusText}`);
const data = await response.json();
if (response.ok) {
toast.success(`Successfully connected to production database. Found ${data.productCount.toLocaleString()} products.`);
} else {
throw new Error(data.error || 'Failed to connect to production database');
}
} catch (error) {
if (eventSource) {
eventSource.close();
setEventSource(null);
}
setIsUpdating(false);
setUpdateProgress(null);
toast.error(`CSV update failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
toast.error(`Connection test failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
} finally {
setIsTestingConnection(false);
}
};
const handleImportCSV = async () => {
setIsImportingCSV(true);
setImportProgress({ status: 'running', operation: 'Starting CSV import' });
const handleImportFromProd = async () => {
setIsImportingProd(true);
setImportProgress({ status: 'running', operation: 'Starting import from production' });
try {
connectToEventSource('import');
@@ -620,20 +576,93 @@ export function DataManagement() {
}
// Start new import
const response = await fetch(`${config.apiUrl}/csv/import`, {
const response = await fetch(`${config.apiUrl}/csv/import-from-prod`, {
method: 'POST',
credentials: 'include'
}).catch(error => {
console.log('Import request error (may be timeout):', error);
return null;
});
const data = await response.json();
if (!response.ok) {
throw new Error(data.error || 'Failed to start CSV import');
// If we got no response but have progress, assume it's still running
if (!response && (importProgress?.current || purchaseOrdersProgress?.current)) {
console.log('No response but import appears to be running, continuing...');
return;
}
// If we got a response, check if it indicates an actual error
if (response) {
const data = await response.json().catch(() => null);
if (!response.ok && data?.error && !data.error.includes('already in progress')) {
throw new Error(data.error || 'Failed to start production import');
}
}
} catch (error) {
toast.error(`CSV import failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
setIsImportingCSV(false);
// Only handle actual errors, not timeouts or connection issues
if (error instanceof Error && !error.message.includes('NetworkError') && !error.message.includes('Failed to fetch')) {
toast.error(`Production import failed: ${error.message}`);
setIsImportingProd(false);
setImportProgress(null);
setPurchaseOrdersProgress(null);
} else {
console.log('Ignoring network error, import may still be running:', error);
}
}
};
const handleCalculateMetrics = async () => {
setIsCalculatingMetrics(true);
setMetricsProgress({ status: 'running', operation: 'Starting metrics calculation' });
try {
connectToEventSource('calculate-metrics');
// First check if metrics calculation is already running
const statusResponse = await fetch(`${config.apiUrl}/csv/calculate-metrics/status`, {
credentials: 'include'
}).catch(() => null);
if (statusResponse) {
const statusData = await statusResponse.json().catch(() => null);
if (statusData?.active && statusData?.progress) {
console.log('Metrics calculation already running, connecting to existing process');
setMetricsProgress(statusData.progress);
return;
}
}
// Start new metrics calculation
const response = await fetch(`${config.apiUrl}/csv/calculate-metrics`, {
method: 'POST',
credentials: 'include'
}).catch(error => {
// Ignore network errors as the calculation might still be running
console.log('Metrics calculation request error (may be timeout):', error);
return null;
});
// If we got no response but have progress, assume it's still running
if (!response && metricsProgress?.current) {
console.log('No response but metrics calculation appears to be running, continuing...');
return;
}
// If we got a response, check if it indicates an actual error
if (response) {
const data = await response.json().catch(() => null);
if (!response.ok && data?.error && !data.error.includes('already in progress')) {
throw new Error(data.error || 'Failed to calculate metrics');
}
}
} catch (error) {
// Only handle actual errors, not timeouts or connection issues
if (error instanceof Error && !error.message.includes('NetworkError') && !error.message.includes('Failed to fetch')) {
toast.error(`Metrics calculation failed: ${error.message}`);
setIsCalculatingMetrics(false);
setMetricsProgress(null);
} else {
console.log('Ignoring network error, metrics calculation may still be running:', error);
}
}
};
@@ -726,138 +755,6 @@ export function DataManagement() {
}
};
const handleCalculateMetrics = async () => {
setIsCalculatingMetrics(true);
setMetricsProgress({ status: 'running', operation: 'Starting metrics calculation' });
try {
connectToEventSource('calculate-metrics');
// First check if metrics calculation is already running
const statusResponse = await fetch(`${config.apiUrl}/csv/calculate-metrics/status`, {
credentials: 'include'
}).catch(() => null);
if (statusResponse) {
const statusData = await statusResponse.json().catch(() => null);
if (statusData?.active && statusData?.progress) {
console.log('Metrics calculation already running, connecting to existing process');
setMetricsProgress(statusData.progress);
return;
}
}
// Start new metrics calculation
const response = await fetch(`${config.apiUrl}/csv/calculate-metrics`, {
method: 'POST',
credentials: 'include'
}).catch(error => {
// Ignore network errors as the calculation might still be running
console.log('Metrics calculation request error (may be timeout):', error);
return null;
});
// If we got no response but have progress, assume it's still running
if (!response && metricsProgress?.current) {
console.log('No response but metrics calculation appears to be running, continuing...');
return;
}
// If we got a response, check if it indicates an actual error
if (response) {
const data = await response.json().catch(() => null);
if (!response.ok && data?.error && !data.error.includes('already in progress')) {
throw new Error(data.error || 'Failed to calculate metrics');
}
}
} catch (error) {
// Only handle actual errors, not timeouts or connection issues
if (error instanceof Error && !error.message.includes('NetworkError') && !error.message.includes('Failed to fetch')) {
toast.error(`Metrics calculation failed: ${error.message}`);
setIsCalculatingMetrics(false);
setMetricsProgress(null);
} else {
console.log('Ignoring network error, metrics calculation may still be running:', error);
}
}
};
const handleTestConnection = async () => {
setIsTestingConnection(true);
try {
const response = await fetch(`${config.apiUrl}/test-prod-connection`, {
credentials: 'include'
});
const data = await response.json();
if (response.ok) {
toast.success(`Successfully connected to production database. Found ${data.productCount.toLocaleString()} products.`);
} else {
throw new Error(data.error || 'Failed to connect to production database');
}
} catch (error) {
toast.error(`Connection test failed: ${error instanceof Error ? error.message : 'Unknown error'}`);
} finally {
setIsTestingConnection(false);
}
};
const handleImportFromProd = async () => {
setIsImportingProd(true);
setImportProgress({ status: 'running', operation: 'Starting import from production' });
try {
connectToEventSource('import');
// First check if import is already running
const statusResponse = await fetch(`${config.apiUrl}/csv/status`, {
credentials: 'include'
}).catch(() => null);
if (statusResponse) {
const statusData = await statusResponse.json().catch(() => null);
if (statusData?.active && statusData?.progress) {
console.log('Import already running, connecting to existing process');
return;
}
}
// Start new import
const response = await fetch(`${config.apiUrl}/csv/import-from-prod`, {
method: 'POST',
credentials: 'include'
}).catch(error => {
console.log('Import request error (may be timeout):', error);
return null;
});
// If we got no response but have progress, assume it's still running
if (!response && (importProgress?.current || purchaseOrdersProgress?.current)) {
console.log('No response but import appears to be running, continuing...');
return;
}
// If we got a response, check if it indicates an actual error
if (response) {
const data = await response.json().catch(() => null);
if (!response.ok && data?.error && !data.error.includes('already in progress')) {
throw new Error(data.error || 'Failed to start production import');
}
}
} catch (error) {
// Only handle actual errors, not timeouts or connection issues
if (error instanceof Error && !error.message.includes('NetworkError') && !error.message.includes('Failed to fetch')) {
toast.error(`Production import failed: ${error.message}`);
setIsImportingProd(false);
setImportProgress(null);
setPurchaseOrdersProgress(null);
} else {
console.log('Ignoring network error, import may still be running:', error);
}
}
};
return (
<div className="max-w-[400px] space-y-4">
{/* Test Production Connection Card */}
@@ -887,91 +784,33 @@ export function DataManagement() {
</CardContent>
</Card>
{/* Update CSV Card */}
<Card>
<CardHeader>
<CardTitle>Update CSV Files</CardTitle>
<CardDescription>Download the latest CSV data files</CardDescription>
</CardHeader>
<CardContent>
<div className="flex gap-2">
<Button
className="flex-1"
onClick={handleUpdateCSV}
disabled={isAnyOperationRunning()}
>
{isUpdating ? (
<>
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
Updating CSV Files...
</>
) : (
<>
<RefreshCw className="mr-2 h-4 w-4" />
Update CSV Files
</>
)}
</Button>
{isUpdating && (
<Button
variant="destructive"
onClick={() => handleCancel('update')}
>
<X className="h-4 w-4" />
</Button>
)}
</div>
{(isUpdating || lastUpdateStatus) && renderProgress(updateProgress || lastUpdateStatus, 'update')}
</CardContent>
</Card>
{/* Import Data Card */}
<Card>
<CardHeader>
<CardTitle>Import Data</CardTitle>
<CardDescription>Import data from CSV files or production database</CardDescription>
<CardDescription>Import data from production database</CardDescription>
</CardHeader>
<CardContent className="space-y-6">
<div className="flex gap-2">
<Button
className="flex-1 min-w-0"
onClick={handleImportCSV}
disabled={isAnyOperationRunning()}
>
{isImportingCSV ? (
<div className="flex items-center justify-center">
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
<span className="truncate">Importing CSV...</span>
</div>
) : (
<div className="flex items-center justify-center">
<Upload className="mr-2 h-4 w-4" />
<span>Import from CSV</span>
</div>
)}
</Button>
<Button
className="flex-1 min-w-0"
className="w-full"
onClick={handleImportFromProd}
disabled={isAnyOperationRunning()}
>
{isImportingProd ? (
<div className="flex items-center justify-center">
<Loader2 className="mr-2 h-4 w-4 animate-spin" />
<span className="truncate">Importing Prod...</span>
<span className="truncate">Importing from Production...</span>
</div>
) : (
<div className="flex items-center justify-center">
<Database className="mr-2 h-4 w-4" />
<span>Import from Prod</span>
<span>Import from Production</span>
</div>
)}
</Button>
{(isImportingCSV || isImportingProd) && (
{isImportingProd && (
<Button
variant="destructive"
onClick={() => handleCancel('import')}
@@ -981,7 +820,7 @@ export function DataManagement() {
)}
</div>
{(isImportingCSV || isImportingProd || lastImportStatus) && (
{(isImportingProd || lastImportStatus) && (
<div className="space-y-4">
{renderProgress(importProgress || lastImportStatus, 'import')}
{renderProgress(purchaseOrdersProgress, 'import')}