inventory/inventory-server/scripts/import-csv.js

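// One-shot importer for the inventory database: reads the product, order, and
// purchase-order CSV exports from ../csv, bulk-upserts them into MySQL, then
// recalculates product and vendor metrics in a second pass. Progress is printed
// to stdout as JSON lines and mirrored to ../logs/import.log; errors are
// appended to ../logs/import-errors.log.
//
// Row limits for test runs come from environment variables (0 = no limit),
// e.g. (assuming the script is run from the inventory-server directory):
//   PRODUCTS_TEST_LIMIT=0 ORDERS_TEST_LIMIT=10000 node scripts/import-csv.js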
const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');
// Load environment variables from the server's .env before reading any of them
dotenv.config({ path: path.join(__dirname, '../.env') });
// Test limits for partial imports (0 = import everything)
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0', 10);
const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000', 10);
const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000', 10);
const dbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true
};
// Set up logging
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');
// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context = '') {
const timestamp = new Date().toISOString();
const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;
// Log to error file
fs.appendFileSync(ERROR_LOG, errorMessage);
// Also log to console
console.error(`\n${context}\nError: ${error.message}`);
}
// Helper function to log import progress
function logImport(message) {
const timestamp = new Date().toISOString();
const logMessage = `[${timestamp}] ${message}\n`;
// Log to import file
fs.appendFileSync(IMPORT_LOG, logMessage);
}
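// Progress is emitted as one JSON object per line on stdout (and duplicated to
// the import log), so whatever spawns this script can parse updates as they stream in.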
// Helper function to output progress in JSON format
function outputProgress(data) {
if (!data.status) {
data = {
status: 'running',
...data
};
}
// Log progress to import log
logImport(JSON.stringify(data));
// Output to console
console.log(JSON.stringify(data));
}
// Helper function to count total rows in a CSV file
async function countRows(filePath) {
return new Promise((resolve, reject) => {
let count = 0;
fs.createReadStream(filePath)
.pipe(csv.parse())
.on('data', () => count++)
.on('error', reject)
.on('end', () => resolve(count - 1)); // Subtract 1 for header row
});
}
// Helper function to format time duration
function formatDuration(seconds) {
if (seconds < 60) return `${Math.round(seconds)}s`;
const minutes = Math.floor(seconds / 60);
seconds = Math.round(seconds % 60);
return `${minutes}m ${seconds}s`;
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime) {
const elapsed = (Date.now() - startTime) / 1000;
const rate = elapsed > 0 ? current / elapsed : 0; // rows per second
const remaining = rate > 0 ? (total - current) / rate : 0;
outputProgress({
status: 'running',
operation,
current,
total,
rate,
elapsed: formatDuration(elapsed),
remaining: formatDuration(remaining),
percentage: ((current / total) * 100).toFixed(1)
});
}
// Helper function to handle category normalization
async function handleCategories(connection, productId, categoriesStr) {
if (!categoriesStr) {
// If no categories, remove all existing relationships
await connection.query(
'DELETE FROM product_categories WHERE product_id = ?',
[productId]
);
return;
}
// Special cases that should not be split
const specialCategories = [
'Paint, Dyes & Chalk',
'Fabric Paint, Markers, and Dye',
'Crystals, Gems & Rhinestones',
'Pens, Pencils & Markers'
];
// Split categories and clean them, preserving special cases
const categories = [];
let remainingStr = categoriesStr;
// First check for special categories
for (const special of specialCategories) {
if (remainingStr.includes(special)) {
categories.push(special);
// Remove the special category from the string
remainingStr = remainingStr.replace(special, '');
}
}
// Then process any remaining regular categories
remainingStr.split(',')
.map(cat => cat.trim())
.filter(cat => cat.length > 0)
.forEach(cat => {
if (!categories.includes(cat)) {
categories.push(cat);
}
});
// Remove existing category relationships for this product
await connection.query(
'DELETE FROM product_categories WHERE product_id = ?',
[productId]
);
// Insert categories and create relationships
for (const category of categories) {
// Insert category if it doesn't exist
await connection.query(
'INSERT IGNORE INTO categories (name) VALUES (?)',
[category]
);
// Get category ID and create relationship in one query to avoid race conditions
await connection.query(`
INSERT IGNORE INTO product_categories (product_id, category_id)
SELECT ?, id FROM categories WHERE name = ?`,
[productId, category]
);
}
}
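// The per-product helpers below (calculateSalesVelocity through calculateABCClass
// and calculateTimeAggregates) are not invoked anywhere in this script; the import
// flow uses the set-based calculateMetricsInBatch / calculateVendorMetrics instead.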
// Helper function to calculate sales velocity metrics
async function calculateSalesVelocity(connection, productId) {
const [rows] = await connection.query(`
SELECT
COALESCE(COUNT(*) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0), 0) as daily_sales_avg,
COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 7, 0) as weekly_sales_avg,
COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 30, 0) as monthly_sales_avg
FROM orders
WHERE product_id = ? AND canceled = false
GROUP BY product_id
`, [productId]);
return rows[0] || { daily_sales_avg: 0, weekly_sales_avg: 0, monthly_sales_avg: 0 };
}
// Helper function to calculate stock metrics
async function calculateStockMetrics(connection, productId, dailySalesAvg) {
const [product] = await connection.query(
'SELECT stock_quantity FROM products WHERE product_id = ?',
[productId]
);
if (!product[0]) return null;
const stockQty = product[0].stock_quantity;
const daysOfInventory = dailySalesAvg > 0 ? Math.floor(stockQty / dailySalesAvg) : 999;
const weeksOfInventory = Math.floor(daysOfInventory / 7);
// Calculate safety stock (2 weeks of average sales)
const safetyStock = Math.ceil(dailySalesAvg * 14);
// Calculate reorder point (safety stock + 1 week of sales)
const reorderPoint = Math.ceil(safetyStock + (dailySalesAvg * 7));
return {
days_of_inventory: daysOfInventory,
weeks_of_inventory: weeksOfInventory,
safety_stock: safetyStock,
reorder_point: reorderPoint
};
}
// Helper function to calculate financial metrics
async function calculateFinancialMetrics(connection, productId) {
const [rows] = await connection.query(`
SELECT
SUM(o.price * o.quantity) as total_revenue,
AVG((o.price - p.cost_price) / NULLIF(o.price, 0) * 100) as avg_margin_percent
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.product_id = ? AND o.canceled = false
GROUP BY o.product_id
`, [productId]);
return rows[0] || { total_revenue: 0, avg_margin_percent: 0 };
}
// Helper function to calculate purchase metrics
async function calculatePurchaseMetrics(connection, productId) {
const [rows] = await connection.query(`
SELECT
AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
MAX(date) as last_purchase_date,
MAX(received_date) as last_received_date
FROM purchase_orders
WHERE product_id = ? AND status = 'closed'
GROUP BY product_id
`, [productId]);
return rows[0] || {
avg_lead_time_days: 0,
last_purchase_date: null,
last_received_date: null
};
}
// Helper function to calculate ABC classification
async function calculateABCClass(connection, productId) {
// Get total revenue for this product
const [productRevenue] = await connection.query(`
SELECT SUM(price * quantity) as revenue
FROM orders
WHERE product_id = ? AND canceled = false
`, [productId]);
// Get total revenue across all products
const [totalRevenue] = await connection.query(`
SELECT SUM(price * quantity) as total
FROM orders
WHERE canceled = false
`);
const revenue = productRevenue[0]?.revenue || 0;
const total = totalRevenue[0]?.total || 0;
if (total === 0) return 'C';
const percentage = (revenue / total) * 100;
// Classify by this product's share of total revenue:
// A: at least 20% of total revenue, B: at least 5%, C: everything else
if (percentage >= 20) return 'A';
if (percentage >= 5) return 'B';
return 'C';
}
// Helper function to calculate time-based aggregates
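// Note: the month-level LEFT JOIN to purchase_orders fans out when a product has
// multiple order rows and purchase-order rows in the same month, which can inflate
// the SUM() aggregates on both sides of the join.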
async function calculateTimeAggregates(connection, productId) {
await connection.query(`
INSERT INTO product_time_aggregates (
product_id, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin
)
SELECT
o.product_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM(o.price * o.quantity) as total_revenue,
SUM(p.cost_price * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
COALESCE(SUM(po.received), 0) as stock_received,
COALESCE(SUM(po.ordered), 0) as stock_ordered,
AVG(o.price) as avg_price,
CASE
WHEN SUM(o.price * o.quantity) = 0 THEN 0
ELSE ((SUM(o.price * o.quantity) - COALESCE(SUM(p.cost_price * o.quantity), 0)) /
NULLIF(SUM(o.price * o.quantity), 0) * 100)
END as profit_margin
FROM orders o
JOIN products p ON o.product_id = p.product_id
LEFT JOIN purchase_orders po ON o.product_id = po.product_id
AND YEAR(o.date) = YEAR(po.date)
AND MONTH(o.date) = MONTH(po.date)
WHERE o.product_id = ? AND o.canceled = false
GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin)
`, [productId]);
}
// Helper function to calculate vendor metrics
async function calculateVendorMetrics(connection) {
try {
// Get list of vendors
const [vendors] = await connection.query('SELECT DISTINCT vendor FROM products WHERE vendor IS NOT NULL');
const startTime = Date.now();
let current = 0;
const total = vendors.length;
outputProgress({
operation: 'Calculating vendor metrics',
current: 0,
total,
percentage: '0.0'
});
for (const { vendor } of vendors) {
// Calculate average lead time
const [leadTimeResult] = await connection.query(`
SELECT
AVG(DATEDIFF(received_date, date)) as avg_lead_time,
COUNT(*) as total_orders,
SUM(CASE WHEN ordered = received THEN 1 ELSE 0 END) as fulfilled_orders
FROM purchase_orders
WHERE vendor = ? AND status = 'closed'
GROUP BY vendor
`, [vendor]);
const metrics = leadTimeResult[0] || {
avg_lead_time: 0,
total_orders: 0,
fulfilled_orders: 0
};
// Calculate fill rate
const fillRate = metrics.total_orders > 0 ?
(metrics.fulfilled_orders / metrics.total_orders * 100) : 0;
// Update vendor metrics
await connection.query(`
INSERT INTO vendor_metrics (
vendor,
avg_lead_time_days,
total_orders,
fulfilled_orders,
fill_rate
) VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
avg_lead_time_days = VALUES(avg_lead_time_days),
total_orders = VALUES(total_orders),
fulfilled_orders = VALUES(fulfilled_orders),
fill_rate = VALUES(fill_rate)
`, [
vendor,
metrics.avg_lead_time || 0,
metrics.total_orders,
metrics.fulfilled_orders,
fillRate
]);
current++;
updateProgress(current, total, 'Calculating vendor metrics', startTime);
}
outputProgress({
status: 'complete',
operation: 'Vendor metrics calculation completed',
current: total,
total,
percentage: '100.0'
});
} catch (error) {
logError(error, 'Error calculating vendor metrics');
throw error;
}
}
// Helper function to calculate metrics in batches
async function calculateMetricsInBatch(connection) {
try {
// Clear temporary tables
await connection.query('TRUNCATE TABLE temp_sales_metrics');
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
// Calculate sales metrics for all products in one go
await connection.query(`
INSERT INTO temp_sales_metrics
SELECT
o.product_id,
COUNT(*) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) as daily_sales_avg,
SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 7 as weekly_sales_avg,
SUM(o.quantity) / NULLIF(DATEDIFF(MAX(o.date), MIN(o.date)), 0) * 30 as monthly_sales_avg,
SUM(o.price * o.quantity) as total_revenue,
AVG((o.price - p.cost_price) / NULLIF(o.price, 0) * 100) as avg_margin_percent,
MIN(o.date) as first_sale_date,
MAX(o.date) as last_sale_date
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.canceled = false
GROUP BY o.product_id
`);
// Calculate purchase metrics for all products in one go
await connection.query(`
INSERT INTO temp_purchase_metrics
SELECT
product_id,
AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
MAX(date) as last_purchase_date,
MAX(received_date) as last_received_date
FROM purchase_orders
WHERE status = 'closed'
GROUP BY product_id
`);
// Update product_metrics table with all metrics at once
await connection.query(`
INSERT INTO product_metrics (
product_id, daily_sales_avg, weekly_sales_avg, monthly_sales_avg,
days_of_inventory, weeks_of_inventory, safety_stock, reorder_point,
avg_margin_percent, total_revenue, avg_lead_time_days,
last_purchase_date, last_received_date
)
SELECT
p.product_id,
COALESCE(s.daily_sales_avg, 0),
COALESCE(s.weekly_sales_avg, 0),
COALESCE(s.monthly_sales_avg, 0),
CASE
WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg)
ELSE 999
END as days_of_inventory,
CASE
WHEN s.daily_sales_avg > 0 THEN FLOOR(p.stock_quantity / s.daily_sales_avg / 7)
ELSE 999
END as weeks_of_inventory,
CEIL(COALESCE(s.daily_sales_avg, 0) * 14) as safety_stock,
CEIL(COALESCE(s.daily_sales_avg, 0) * 21) as reorder_point,
COALESCE(s.avg_margin_percent, 0),
COALESCE(s.total_revenue, 0),
COALESCE(pm.avg_lead_time_days, 0),
pm.last_purchase_date,
pm.last_received_date
FROM products p
LEFT JOIN temp_sales_metrics s ON p.product_id = s.product_id
LEFT JOIN temp_purchase_metrics pm ON p.product_id = pm.product_id
ON DUPLICATE KEY UPDATE
daily_sales_avg = VALUES(daily_sales_avg),
weekly_sales_avg = VALUES(weekly_sales_avg),
monthly_sales_avg = VALUES(monthly_sales_avg),
days_of_inventory = VALUES(days_of_inventory),
weeks_of_inventory = VALUES(weeks_of_inventory),
safety_stock = VALUES(safety_stock),
reorder_point = VALUES(reorder_point),
avg_margin_percent = VALUES(avg_margin_percent),
total_revenue = VALUES(total_revenue),
avg_lead_time_days = VALUES(avg_lead_time_days),
last_purchase_date = VALUES(last_purchase_date),
last_received_date = VALUES(last_received_date),
last_calculated_at = CURRENT_TIMESTAMP
`);
// Calculate ABC classification in one go
await connection.query(`
WITH revenue_ranks AS (
SELECT
product_id,
total_revenue,
total_revenue / SUM(total_revenue) OVER () * 100 as revenue_percent,
ROW_NUMBER() OVER (ORDER BY total_revenue DESC) as revenue_rank -- RANK is a reserved word in MySQL 8
FROM product_metrics
WHERE total_revenue > 0
)
UPDATE product_metrics pm
JOIN revenue_ranks r ON pm.product_id = r.product_id
SET abc_class =
CASE
WHEN r.revenue_percent >= 20 THEN 'A'
WHEN r.revenue_percent >= 5 THEN 'B'
ELSE 'C'
END
`);
} catch (error) {
logError(error, 'Error in batch metrics calculation');
throw error;
}
}
async function importProducts(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting products import',
current: 0,
total: totalRows,
testLimit: PRODUCTS_TEST_LIMIT,
percentage: '0'
});
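// Convert a CSV date (expected as DD-MM-YYYY) to MySQL's YYYY-MM-DD format;
// rows with a missing date fall back to today's date.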
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 100;
let batch = [];
let categoryUpdates = new Map(); // Store category updates for batch processing
// Get a connection from the pool that we'll reuse
const connection = await pool.getConnection();
try {
for await (const record of parser) {
if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch, categoryUpdates);
}
outputProgress({
operation: 'Products import',
message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms to avoid console flooding
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Products import', startTime);
lastUpdate = now;
}
// Add to batch
batch.push({
product_id: record.product_id,
title: record.title,
SKU: record.SKU,
created_at: convertDate(record.created_at),
stock_quantity: parseInt(record.stock_quantity) || 0,
price: parseFloat(record.price) || 0,
regular_price: parseFloat(record.regular_price) || 0,
cost_price: parseFloat(record.cost_price) || null,
landing_cost_price: parseFloat(record.landing_cost_price) || null,
barcode: record.barcode,
updated_at: convertDate(record.updated_at),
visible: record.visible === '1',
managing_stock: record.managing_stock === '1',
replenishable: record.replenishable === '1',
vendor: record.vendor,
vendor_reference: record.vendor_reference,
permalink: record.permalink,
categories: record.categories,
image: record.image,
brand: record.brand,
options: record.options,
tags: record.tags,
moq: parseInt(record.moq) || 1,
uom: parseInt(record.uom) || 1
});
if (record.categories) {
categoryUpdates.set(record.product_id, record.categories);
}
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch, categoryUpdates);
batch = [];
categoryUpdates.clear();
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch, categoryUpdates);
}
outputProgress({
status: 'running',
operation: 'Products import completed',
current: rowCount,
total: totalRows,
added,
updated,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
} catch (error) {
console.error('Error during products import:', error);
throw error;
} finally {
if (connection) {
connection.release();
}
}
// Helper function to process a batch of records
async function processBatch(records, categoryUpdates) {
if (records.length === 0) return;
try {
await connection.beginTransaction();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.product_id, r.title, r.SKU, r.created_at, r.stock_quantity,
r.price, r.regular_price, r.cost_price, r.landing_cost_price,
r.barcode, r.updated_at, r.visible, r.managing_stock,
r.replenishable, r.vendor, r.vendor_reference, r.permalink,
r.categories, r.image, r.brand, r.options, r.tags, r.moq, r.uom
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
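// NOTE: inserting without an explicit column list relies on the products table
// having exactly these 24 columns in this order.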
const sql = `INSERT INTO products VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
title = VALUES(title),
stock_quantity = VALUES(stock_quantity),
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price),
landing_cost_price = VALUES(landing_cost_price),
barcode = VALUES(barcode),
updated_at = VALUES(updated_at),
visible = VALUES(visible),
managing_stock = VALUES(managing_stock),
replenishable = VALUES(replenishable),
vendor = VALUES(vendor),
vendor_reference = VALUES(vendor_reference),
permalink = VALUES(permalink),
categories = VALUES(categories),
image = VALUES(image),
brand = VALUES(brand),
options = VALUES(options),
tags = VALUES(tags),
moq = VALUES(moq),
uom = VALUES(uom)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
// Process categories within the same transaction
for (const [productId, categories] of categoryUpdates) {
await handleCategories(connection, productId, categories);
}
await connection.commit();
} catch (error) {
await connection.rollback();
logError(error, `Error processing batch of ${records.length} records`);
throw error;
}
} catch (error) {
logError(error, `Error in batch processing:\nFirst record: ${JSON.stringify(records[0])}`);
// Continue with next batch instead of failing completely
}
}
}
async function importOrders(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting orders import',
current: 0,
total: totalRows,
testLimit: ORDERS_TEST_LIMIT,
percentage: '0'
});
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
// First, get all valid product IDs
const connection = await pool.getConnection();
let validProductIds;
try {
const [rows] = await connection.query('SELECT product_id FROM products');
validProductIds = new Set(rows.map(row => row.product_id.toString()));
} finally {
connection.release();
}
let skipped = 0;
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 500;
let batch = [];
for await (const record of parser) {
if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
operation: 'Orders import',
message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Orders import', startTime);
lastUpdate = now;
}
if (!validProductIds.has(record.product_id)) {
skipped++;
continue;
}
// Add to batch
batch.push({
order_number: record.order_number,
product_id: record.product_id,
SKU: record.SKU,
date: convertDate(record.date),
price: parseFloat(record.price) || 0,
quantity: parseInt(record.quantity) || 0,
discount: parseFloat(record.discount) || 0,
tax: parseFloat(record.tax) || 0,
tax_included: record.tax_included === '1',
shipping: parseFloat(record.shipping) || 0,
customer: record.customer,
canceled: record.canceled === '1'
});
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch);
batch = [];
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
status: 'running',
operation: 'Orders import completed',
current: rowCount,
total: totalRows,
added,
updated,
skipped,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
// Helper function to process a batch of records
async function processBatch(records) {
if (records.length === 0) return;
const connection = await pool.getConnection();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.order_number, r.product_id, r.SKU, r.date, r.price,
r.quantity, r.discount, r.tax, r.tax_included, r.shipping,
r.customer, r.canceled
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
const sql = `INSERT INTO orders (order_number, product_id, SKU, date, price,
quantity, discount, tax, tax_included, shipping, customer, canceled)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
canceled = VALUES(canceled)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
// For INSERT ... ON DUPLICATE KEY UPDATE, MySQL counts 1 affected row per
// newly inserted row and 2 per row that was updated, so the insert/update
// split below is an approximation derived from affectedRows and changedRows.
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
} catch (error) {
console.error(`\nError processing batch:`, error.message);
// Continue with next batch instead of failing completely
skipped += records.length;
} finally {
connection.release();
}
}
}
async function importPurchaseOrders(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting purchase orders import',
current: 0,
total: totalRows,
testLimit: PURCHASE_ORDERS_TEST_LIMIT,
percentage: '0'
});
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
// First, get all valid product IDs
const connection = await pool.getConnection();
let validProductIds;
try {
const [rows] = await connection.query('SELECT product_id FROM products');
validProductIds = new Set(rows.map(row => row.product_id.toString()));
} finally {
connection.release();
}
let skipped = 0;
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 500;
let batch = [];
for await (const record of parser) {
if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
operation: 'Purchase orders import',
message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Purchase orders import', startTime);
lastUpdate = now;
}
if (!validProductIds.has(record.product_id)) {
skipped++;
continue;
}
// Add to batch
batch.push({
po_id: record.po_id,
vendor: record.vendor,
date: convertDate(record.date),
expected_date: convertDate(record.expected_date),
product_id: record.product_id,
sku: record.sku,
cost_price: parseFloat(record.cost_price) || 0,
status: record.status || 'pending',
notes: record.notes,
ordered: parseInt(record.ordered) || 0,
received: parseInt(record.received) || 0,
received_date: convertDate(record.received_date)
});
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch);
batch = [];
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
status: 'running',
operation: 'Purchase orders import completed',
current: rowCount,
total: totalRows,
added,
updated,
skipped,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
// Helper function to process a batch of records
async function processBatch(records) {
if (records.length === 0) return;
const connection = await pool.getConnection();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.po_id, r.vendor, r.date, r.expected_date, r.product_id,
r.sku, r.cost_price, r.status, r.notes, r.ordered,
r.received, r.received_date
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
const sql = `INSERT INTO purchase_orders (po_id, vendor, date, expected_date,
product_id, sku, cost_price, status, notes, ordered, received, received_date)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
vendor = VALUES(vendor),
expected_date = VALUES(expected_date),
cost_price = VALUES(cost_price),
status = VALUES(status),
notes = VALUES(notes),
ordered = VALUES(ordered),
received = VALUES(received),
received_date = VALUES(received_date)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
// For INSERT ... ON DUPLICATE KEY UPDATE, MySQL counts 1 affected row per
// newly inserted row and 2 per row that was updated, so the insert/update
// split below is an approximation derived from affectedRows and changedRows.
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
} catch (error) {
console.error(`\nError processing batch:`, error.message);
// Continue with next batch instead of failing completely
skipped += records.length;
} finally {
connection.release();
}
}
}
async function main() {
outputProgress({
operation: 'Starting import process',
message: 'Creating connection pool...'
});
const startTime = Date.now();
let pool;
try {
pool = mysql.createPool(dbConfig);
// Check if tables exist, if not create them
outputProgress({
operation: 'Checking database schema',
message: 'Creating tables if needed...'
});
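// schema.sql may contain many statements; running it through a single query()
// call depends on multipleStatements: true in dbConfig above.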
const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
await pool.query(schemaSQL);
// Step 1: Import all data first
try {
// Import products first since they're referenced by other tables
await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));
// Process orders and purchase orders in parallel
outputProgress({
operation: 'Starting parallel import',
message: 'Processing orders and purchase orders simultaneously...'
});
await Promise.all([
importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv')),
importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
]);
// Step 2: Calculate all metrics after imports are complete
outputProgress({
operation: 'Starting metrics calculation',
message: 'Calculating metrics for all products and vendors...'
});
const connection = await pool.getConnection();
try {
// Calculate metrics in batches
await calculateMetricsInBatch(connection);
// Calculate vendor metrics
await calculateVendorMetrics(connection);
} finally {
connection.release();
}
} catch (error) {
logError(error, 'Error during import/metrics calculation');
throw error;
}
outputProgress({
status: 'complete',
operation: 'Import process completed',
duration: formatDuration((Date.now() - startTime) / 1000)
});
} catch (error) {
logError(error, 'Fatal error during import process');
outputProgress({
status: 'error',
error: error.message
});
process.exit(1);
} finally {
if (pool) {
await pool.end();
}
}
}
// Run the import
main().catch(error => {
logError(error, 'Unhandled error in main process');
process.exit(1);
});