const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');

// Load environment variables before any of them are read
dotenv.config({ path: path.join(__dirname, '../.env') });

// Get test limits from environment variables (0 = no limit)
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0', 10);
const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000', 10);
const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000', 10);
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};
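// Illustrative .env contents (variable names match the lookups above; values are placeholders):
//   DB_HOST=localhost
//   DB_USER=importer
//   DB_PASSWORD=secret
//   DB_NAME=inventory
//   PRODUCTS_TEST_LIMIT=0
//   ORDERS_TEST_LIMIT=10000
//   PURCHASE_ORDERS_TEST_LIMIT=10000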

// Set up logging
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');

// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
  fs.mkdirSync(LOG_DIR, { recursive: true });
}

// Helper function to log errors
function logError(error, context = '') {
  const timestamp = new Date().toISOString();
  const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;

  // Log to error file
  fs.appendFileSync(ERROR_LOG, errorMessage);

  // Also log to console
  console.error(`\n${context}\nError: ${error.message}`);
}

// Helper function to log import progress
function logImport(message) {
  const timestamp = new Date().toISOString();
  const logMessage = `[${timestamp}] ${message}\n`;

  // Log to import file
  fs.appendFileSync(IMPORT_LOG, logMessage);
}

// Helper function to output progress in JSON format
function outputProgress(data) {
  if (!data.status) {
    data = {
      status: 'running',
      ...data
    };
  }

  // Log progress to import log
  logImport(JSON.stringify(data));

  // Output to console
  console.log(JSON.stringify(data));
}
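// Example of a progress line as emitted above (field values are illustrative):
//   {"status":"running","operation":"Products import","current":1200,"total":5000,
//    "elapsed":"12s","remaining":"38s","percentage":"24.0"}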

// Helper function to count total rows in a CSV file
async function countRows(filePath) {
  return new Promise((resolve, reject) => {
    let count = 0;
    fs.createReadStream(filePath)
      .pipe(csv.parse())
      .on('data', () => count++)
      .on('error', reject)
      .on('end', () => resolve(count - 1)); // Subtract 1 for header row
  });
}

// Helper function to format time duration
function formatDuration(seconds) {
  if (seconds < 60) return `${Math.round(seconds)}s`;
  const minutes = Math.floor(seconds / 60);
  seconds = Math.round(seconds % 60);
  return `${minutes}m ${seconds}s`;
}
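// e.g. formatDuration(42.3) -> "42s", formatDuration(95) -> "1m 35s"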

// Helper function to update progress with a time estimate
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  const rate = current / elapsed; // rows per second
  const remaining = rate > 0 ? (total - current) / rate : 0;

  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining),
    percentage: ((current / total) * 100).toFixed(1)
  });
}

// Helper function to handle category normalization
async function handleCategories(connection, productId, categoriesStr) {
  if (!categoriesStr) {
    // If no categories, remove all existing relationships
    await connection.query(
      'DELETE FROM product_categories WHERE product_id = ?',
      [productId]
    );
    return;
  }

  // Special cases that should not be split
  const specialCategories = [
    'Paint, Dyes & Chalk',
    'Fabric Paint, Markers, and Dye',
    'Crystals, Gems & Rhinestones',
    'Pens, Pencils & Markers'
  ];

  // Split categories and clean them, preserving special cases
  const categories = [];
  let remainingStr = categoriesStr;

  // First check for special categories
  for (const special of specialCategories) {
    if (remainingStr.includes(special)) {
      categories.push(special);
      // Remove the special category from the string
      remainingStr = remainingStr.replace(special, '');
    }
  }

  // Then process any remaining regular categories
  remainingStr.split(',')
    .map(cat => cat.trim())
    .filter(cat => cat.length > 0)
    .forEach(cat => {
      if (!categories.includes(cat)) {
        categories.push(cat);
      }
    });

  // Remove existing category relationships for this product
  await connection.query(
    'DELETE FROM product_categories WHERE product_id = ?',
    [productId]
  );

  // Insert categories and create relationships
  for (const category of categories) {
    // Insert category if it doesn't exist
    await connection.query(
      'INSERT IGNORE INTO categories (name) VALUES (?)',
      [category]
    );

    // Get category ID and create relationship in one query to avoid race conditions
    await connection.query(`
      INSERT IGNORE INTO product_categories (product_id, category_id)
      SELECT ?, id FROM categories WHERE name = ?`,
      [productId, category]
    );
  }
}
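// Illustrative example: categoriesStr = 'Beads, Paint, Dyes & Chalk, Glue'
// resolves to ['Paint, Dyes & Chalk', 'Beads', 'Glue']; the special multi-word
// category is pulled out first so the comma split does not break it apart.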

// Helper function to calculate sales velocity metrics
// (all figures are units sold per day / week / month, which is what the stock
// calculations below expect)
async function calculateSalesVelocity(connection, productId) {
  const [rows] = await connection.query(`
    SELECT
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0), 0) as daily_sales_avg,
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 7, 0) as weekly_sales_avg,
      COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 30, 0) as monthly_sales_avg
    FROM orders
    WHERE product_id = ? AND canceled = false
    GROUP BY product_id
  `, [productId]);

  return rows[0] || { daily_sales_avg: 0, weekly_sales_avg: 0, monthly_sales_avg: 0 };
}

// Helper function to calculate stock metrics
async function calculateStockMetrics(connection, productId, dailySalesAvg) {
  const [product] = await connection.query(
    'SELECT stock_quantity FROM products WHERE product_id = ?',
    [productId]
  );

  if (!product[0]) return null;

  const stockQty = product[0].stock_quantity;
  const daysOfInventory = dailySalesAvg > 0 ? Math.floor(stockQty / dailySalesAvg) : 999;
  const weeksOfInventory = Math.floor(daysOfInventory / 7);

  // Calculate safety stock (2 weeks of average sales)
  const safetyStock = Math.ceil(dailySalesAvg * 14);

  // Calculate reorder point (safety stock + 1 week of sales)
  const reorderPoint = Math.ceil(safetyStock + (dailySalesAvg * 7));

  return {
    days_of_inventory: daysOfInventory,
    weeks_of_inventory: weeksOfInventory,
    safety_stock: safetyStock,
    reorder_point: reorderPoint
  };
}
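// Worked example: dailySalesAvg = 2 units/day and stock_quantity = 60 gives
// days_of_inventory = 30, weeks_of_inventory = 4, safety_stock = 28 (two weeks)
// and reorder_point = 42 (safety stock plus one further week of sales).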

// Helper function to calculate financial metrics
async function calculateFinancialMetrics(connection, productId) {
  const [rows] = await connection.query(`
    SELECT
      SUM(o.price * o.quantity) as total_revenue,
      AVG((o.price - p.cost_price) / o.price * 100) as avg_margin_percent
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    WHERE o.product_id = ? AND o.canceled = false
    GROUP BY o.product_id
  `, [productId]);

  return rows[0] || { total_revenue: 0, avg_margin_percent: 0 };
}

// Helper function to calculate purchase metrics
async function calculatePurchaseMetrics(connection, productId) {
  const [rows] = await connection.query(`
    SELECT
      AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
      MAX(date) as last_purchase_date,
      MAX(received_date) as last_received_date
    FROM purchase_orders
    WHERE product_id = ? AND status = 'closed'
    GROUP BY product_id
  `, [productId]);

  return rows[0] || {
    avg_lead_time_days: 0,
    last_purchase_date: null,
    last_received_date: null
  };
}

// Helper function to calculate ABC classification
async function calculateABCClass(connection, productId) {
  // Get total revenue for this product
  const [productRevenue] = await connection.query(`
    SELECT SUM(price * quantity) as revenue
    FROM orders
    WHERE product_id = ? AND canceled = false
  `, [productId]);

  // Get total revenue across all products
  const [totalRevenue] = await connection.query(`
    SELECT SUM(price * quantity) as total
    FROM orders
    WHERE canceled = false
  `);

  const revenue = productRevenue[0]?.revenue || 0;
  const total = totalRevenue[0]?.total || 0;

  if (total === 0) return 'C';

  const percentage = (revenue / total) * 100;

  // Classify by the product's share of total revenue:
  // A: 20% or more, B: 5% or more, C: everything else
  if (percentage >= 20) return 'A';
  if (percentage >= 5) return 'B';
  return 'C';
}
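// Illustrative example: a product with 30,000 in revenue out of 100,000 total has a
// 30% share and is classed 'A'; a 6% share is 'B'; anything below 5% is 'C'.
// Note: this helper is defined but not currently invoked from the import flow below.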

// Helper function to calculate time-based aggregates
async function calculateTimeAggregates(connection, productId) {
  await connection.query(`
    INSERT INTO product_time_aggregates (
      product_id, year, month,
      total_quantity_sold, total_revenue, total_cost,
      order_count, stock_received, stock_ordered,
      avg_price, profit_margin
    )
    SELECT
      o.product_id,
      YEAR(o.date) as year,
      MONTH(o.date) as month,
      SUM(o.quantity) as total_quantity_sold,
      SUM(o.price * o.quantity) as total_revenue,
      SUM(p.cost_price * o.quantity) as total_cost,
      COUNT(DISTINCT o.order_number) as order_count,
      COALESCE(SUM(po.received), 0) as stock_received,
      COALESCE(SUM(po.ordered), 0) as stock_ordered,
      AVG(o.price) as avg_price,
      CASE
        WHEN SUM(o.price * o.quantity) = 0 THEN 0
        ELSE ((SUM(o.price * o.quantity) - COALESCE(SUM(p.cost_price * o.quantity), 0)) /
              NULLIF(SUM(o.price * o.quantity), 0) * 100)
      END as profit_margin
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    LEFT JOIN purchase_orders po ON o.product_id = po.product_id
      AND YEAR(o.date) = YEAR(po.date)
      AND MONTH(o.date) = MONTH(po.date)
    WHERE o.product_id = ? AND o.canceled = false
    GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
    ON DUPLICATE KEY UPDATE
      total_quantity_sold = VALUES(total_quantity_sold),
      total_revenue = VALUES(total_revenue),
      total_cost = VALUES(total_cost),
      order_count = VALUES(order_count),
      stock_received = VALUES(stock_received),
      stock_ordered = VALUES(stock_ordered),
      avg_price = VALUES(avg_price),
      profit_margin = VALUES(profit_margin)
  `, [productId]);
}
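// Note: the upsert above assumes product_time_aggregates has a unique key on
// (product_id, year, month) in db/schema.sql; without it, ON DUPLICATE KEY UPDATE never
// fires and re-runs would insert duplicate month rows. Like calculateABCClass, this
// helper is defined but not currently called from the import flow below.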

// Helper function to calculate vendor metrics
async function calculateVendorMetrics(connection) {
  try {
    // Get list of vendors
    const [vendors] = await connection.query('SELECT DISTINCT vendor FROM products WHERE vendor IS NOT NULL');
    const startTime = Date.now();
    let current = 0;
    const total = vendors.length;

    outputProgress({
      operation: 'Calculating vendor metrics',
      current: 0,
      total,
      percentage: '0.0'
    });

    for (const { vendor } of vendors) {
      // Calculate average lead time
      const [leadTimeResult] = await connection.query(`
        SELECT
          AVG(DATEDIFF(received_date, date)) as avg_lead_time,
          COUNT(*) as total_orders,
          SUM(CASE WHEN ordered = received THEN 1 ELSE 0 END) as fulfilled_orders
        FROM purchase_orders
        WHERE vendor = ? AND status = 'closed'
        GROUP BY vendor
      `, [vendor]);

      const metrics = leadTimeResult[0] || {
        avg_lead_time: 0,
        total_orders: 0,
        fulfilled_orders: 0
      };

      // Calculate fill rate
      const fillRate = metrics.total_orders > 0 ?
        (metrics.fulfilled_orders / metrics.total_orders * 100) : 0;

      // Update vendor metrics
      await connection.query(`
        INSERT INTO vendor_metrics (
          vendor,
          avg_lead_time_days,
          total_orders,
          fulfilled_orders,
          fill_rate
        ) VALUES (?, ?, ?, ?, ?)
        ON DUPLICATE KEY UPDATE
          avg_lead_time_days = VALUES(avg_lead_time_days),
          total_orders = VALUES(total_orders),
          fulfilled_orders = VALUES(fulfilled_orders),
          fill_rate = VALUES(fill_rate)
      `, [
        vendor,
        metrics.avg_lead_time || 0,
        metrics.total_orders,
        metrics.fulfilled_orders,
        fillRate
      ]);

      current++;
      updateProgress(current, total, 'Calculating vendor metrics', startTime);
    }

    outputProgress({
      status: 'complete',
      operation: 'Vendor metrics calculation completed',
      current: total,
      total,
      percentage: '100.0'
    });
  } catch (error) {
    logError(error, 'Error calculating vendor metrics');
    throw error;
  }
}
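// Illustrative example: a vendor with 50 closed purchase orders, 45 of which were
// received in full (ordered = received), gets a fill_rate of 90.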

// Helper function to update product metrics
async function updateProductMetrics(connection, productId, startTime, current, total) {
  try {
    // Calculate sales velocity metrics
    const velocityMetrics = await calculateSalesVelocity(connection, productId);

    // Calculate stock metrics
    const stockMetrics = await calculateStockMetrics(connection, productId, velocityMetrics.daily_sales_avg);

    // Calculate financial metrics
    const financialMetrics = await calculateFinancialMetrics(connection, productId);

    // Calculate purchase metrics
    const purchaseMetrics = await calculatePurchaseMetrics(connection, productId);

    // Update metrics in database
    await connection.query(`
      INSERT INTO product_metrics (
        product_id,
        daily_sales_avg,
        weekly_sales_avg,
        monthly_sales_avg,
        days_of_inventory,
        weeks_of_inventory,
        safety_stock,
        reorder_point,
        total_revenue,
        avg_margin_percent,
        avg_lead_time_days,
        last_purchase_date,
        last_received_date
      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
      ON DUPLICATE KEY UPDATE
        daily_sales_avg = VALUES(daily_sales_avg),
        weekly_sales_avg = VALUES(weekly_sales_avg),
        monthly_sales_avg = VALUES(monthly_sales_avg),
        days_of_inventory = VALUES(days_of_inventory),
        weeks_of_inventory = VALUES(weeks_of_inventory),
        safety_stock = VALUES(safety_stock),
        reorder_point = VALUES(reorder_point),
        total_revenue = VALUES(total_revenue),
        avg_margin_percent = VALUES(avg_margin_percent),
        avg_lead_time_days = VALUES(avg_lead_time_days),
        last_purchase_date = VALUES(last_purchase_date),
        last_received_date = VALUES(last_received_date)
    `, [
      productId,
      velocityMetrics.daily_sales_avg,
      velocityMetrics.weekly_sales_avg,
      velocityMetrics.monthly_sales_avg,
      stockMetrics?.days_of_inventory || 0,
      stockMetrics?.weeks_of_inventory || 0,
      stockMetrics?.safety_stock || 0,
      stockMetrics?.reorder_point || 0,
      financialMetrics.total_revenue,
      financialMetrics.avg_margin_percent,
      purchaseMetrics.avg_lead_time_days,
      purchaseMetrics.last_purchase_date,
      purchaseMetrics.last_received_date
    ]);

    // Output progress every 5 products or every second
    if (current % 5 === 0 || Date.now() - startTime > 1000) {
      updateProgress(current, total, 'Calculating product metrics', startTime);
      startTime = Date.now();
    }
  } catch (error) {
    logError(error, `Error updating metrics for product ${productId}`);
    throw error;
  }
}

async function importProducts(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting products import',
    current: 0,
    total: totalRows,
    testLimit: PRODUCTS_TEST_LIMIT,
    percentage: '0'
  });

  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }
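  // Assumes CSV dates arrive as DD-MM-YYYY, e.g. convertDate('25-03-2024') -> '2024-03-25'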

  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 100;
  let batch = [];
  let categoryUpdates = new Map(); // Store category updates for batch processing

  // Get a connection from the pool that we'll reuse
  const connection = await pool.getConnection();

  try {
    for await (const record of parser) {
      if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
        // Process remaining batch
        if (batch.length > 0) {
          await processBatch(batch, categoryUpdates);
        }
        outputProgress({
          operation: 'Products import',
          message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
          current: rowCount,
          total: totalRows
        });
        break;
      }
      rowCount++;

      // Update progress every 100ms to avoid console flooding
      const now = Date.now();
      if (now - lastUpdate > 100) {
        updateProgress(rowCount, totalRows, 'Products import', startTime);
        lastUpdate = now;
      }

      // Add to batch
      batch.push({
        product_id: record.product_id,
        title: record.title,
        SKU: record.SKU,
        created_at: convertDate(record.created_at),
        stock_quantity: parseInt(record.stock_quantity) || 0,
        price: parseFloat(record.price) || 0,
        regular_price: parseFloat(record.regular_price) || 0,
        cost_price: parseFloat(record.cost_price) || null,
        landing_cost_price: parseFloat(record.landing_cost_price) || null,
        barcode: record.barcode,
        updated_at: convertDate(record.updated_at),
        visible: record.visible === '1',
        managing_stock: record.managing_stock === '1',
        replenishable: record.replenishable === '1',
        vendor: record.vendor,
        vendor_reference: record.vendor_reference,
        permalink: record.permalink,
        categories: record.categories,
        image: record.image,
        brand: record.brand,
        options: record.options,
        tags: record.tags,
        moq: parseInt(record.moq) || 1,
        uom: parseInt(record.uom) || 1
      });

      if (record.categories) {
        categoryUpdates.set(record.product_id, record.categories);
      }

      // Process batch if it reaches BATCH_SIZE
      if (batch.length >= BATCH_SIZE) {
        await processBatch(batch, categoryUpdates);
        batch = [];
        categoryUpdates.clear();
      }
    }

    // Process any remaining records in the final batch
    if (batch.length > 0) {
      await processBatch(batch, categoryUpdates);
    }

    outputProgress({
      status: 'running',
      operation: 'Products import completed',
      current: rowCount,
      total: totalRows,
      added,
      updated,
      duration: formatDuration((Date.now() - startTime) / 1000),
      percentage: '100'
    });
  } catch (error) {
    console.error('Error during products import:', error);
    throw error;
  } finally {
    if (connection) {
      connection.release();
    }
  }

  // Helper function to process a batch of records
  async function processBatch(records, categoryUpdates) {
    if (records.length === 0) return;

    try {
      await connection.beginTransaction();
      try {
        // Create the batch insert/update query
        const values = records.map(r => [
          r.product_id, r.title, r.SKU, r.created_at, r.stock_quantity,
          r.price, r.regular_price, r.cost_price, r.landing_cost_price,
          r.barcode, r.updated_at, r.visible, r.managing_stock,
          r.replenishable, r.vendor, r.vendor_reference, r.permalink,
          r.categories, r.image, r.brand, r.options, r.tags, r.moq, r.uom
        ]);

        const placeholders = records.map(() =>
          '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
        ).join(',');

        const sql = `INSERT INTO products VALUES ${placeholders}
          ON DUPLICATE KEY UPDATE
          title = VALUES(title),
          stock_quantity = VALUES(stock_quantity),
          price = VALUES(price),
          regular_price = VALUES(regular_price),
          cost_price = VALUES(cost_price),
          landing_cost_price = VALUES(landing_cost_price),
          barcode = VALUES(barcode),
          updated_at = VALUES(updated_at),
          visible = VALUES(visible),
          managing_stock = VALUES(managing_stock),
          replenishable = VALUES(replenishable),
          vendor = VALUES(vendor),
          vendor_reference = VALUES(vendor_reference),
          permalink = VALUES(permalink),
          categories = VALUES(categories),
          image = VALUES(image),
          brand = VALUES(brand),
          options = VALUES(options),
          tags = VALUES(tags),
          moq = VALUES(moq),
          uom = VALUES(uom)`;
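        // Note: the INSERT above omits a column list, so the 24 values per row are
        // assumed to line up exactly with the column order of the products table in
        // db/schema.sql; reordering columns there would silently misalign this import.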

        const [result] = await connection.query(sql, values.flat());

        // Update stats
        if (result.affectedRows > 0) {
          const insertCount = result.affectedRows - result.changedRows;
          const updateCount = result.changedRows;
          added += insertCount;
          updated += updateCount;
        }

        // Process categories within the same transaction
        for (const [productId, categories] of categoryUpdates) {
          await handleCategories(connection, productId, categories);
        }

        await connection.commit();
      } catch (error) {
        await connection.rollback();
        logError(error, `Error processing batch of ${records.length} records`);
        throw error;
      }
    } catch (error) {
      logError(error, `Error in batch processing:\nFirst record: ${JSON.stringify(records[0])}`);
      // Continue with next batch instead of failing completely
    }
  }
}

async function importOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT) : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting orders import',
    current: 0,
    total: totalRows,
    testLimit: ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }

  // First, get all valid product IDs
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
      // Process remaining batch
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Orders import',
        message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Orders import', startTime);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }

    // Add to batch
    batch.push({
      order_number: record.order_number,
      product_id: record.product_id,
      SKU: record.SKU,
      date: convertDate(record.date),
      price: parseFloat(record.price) || 0,
      quantity: parseInt(record.quantity) || 0,
      discount: parseFloat(record.discount) || 0,
      tax: parseFloat(record.tax) || 0,
      tax_included: record.tax_included === '1',
      shipping: parseFloat(record.shipping) || 0,
      customer: record.customer,
      canceled: record.canceled === '1'
    });

    // Process batch if it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }

  outputProgress({
    status: 'running',
    operation: 'Orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to process a batch of records
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      // Create the batch insert/update query
      const values = records.map(r => [
        r.order_number, r.product_id, r.SKU, r.date, r.price,
        r.quantity, r.discount, r.tax, r.tax_included, r.shipping,
        r.customer, r.canceled
      ]);

      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');

      const sql = `INSERT INTO orders (order_number, product_id, SKU, date, price,
        quantity, discount, tax, tax_included, shipping, customer, canceled)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        price = VALUES(price),
        quantity = VALUES(quantity),
        discount = VALUES(discount),
        tax = VALUES(tax),
        tax_included = VALUES(tax_included),
        shipping = VALUES(shipping),
        canceled = VALUES(canceled)`;

      const [result] = await connection.query(sql, values.flat());

      // Update stats. For INSERT ... ON DUPLICATE KEY UPDATE, affectedRows counts
      // 1 for each inserted row and 2 for each updated row; changedRows is used here
      // to estimate how many of the affected rows were updates rather than inserts.
      if (result.affectedRows > 0) {
        const insertCount = result.affectedRows - result.changedRows;
        const updateCount = result.changedRows;
        added += insertCount;
        updated += updateCount;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with next batch instead of failing completely
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}

async function importPurchaseOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT) : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting purchase orders import',
    current: 0,
    total: totalRows,
    testLimit: PURCHASE_ORDERS_TEST_LIMIT,
    percentage: '0'
  });

  function convertDate(dateStr) {
    if (!dateStr) {
      // Default to current date for missing dates
      const now = new Date();
      const year = now.getFullYear();
      const month = String(now.getMonth() + 1).padStart(2, '0');
      const day = String(now.getDate()).padStart(2, '0');
      return `${year}-${month}-${day}`;
    }
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }

  // First, get all valid product IDs
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }

  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();

  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
      // Process remaining batch
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Purchase orders import',
        message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;

    // Update progress every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Purchase orders import', startTime);
      lastUpdate = now;
    }

    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }

    // Add to batch
    batch.push({
      po_id: record.po_id,
      vendor: record.vendor,
      date: convertDate(record.date),
      expected_date: convertDate(record.expected_date),
      product_id: record.product_id,
      sku: record.sku,
      cost_price: parseFloat(record.cost_price) || 0,
      status: record.status || 'pending',
      notes: record.notes,
      ordered: parseInt(record.ordered) || 0,
      received: parseInt(record.received) || 0,
      received_date: convertDate(record.received_date)
    });

    // Process batch if it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }

  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }

  outputProgress({
    status: 'running',
    operation: 'Purchase orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to process a batch of records
  async function processBatch(records) {
    if (records.length === 0) return;

    const connection = await pool.getConnection();
    try {
      // Create the batch insert/update query
      const values = records.map(r => [
        r.po_id, r.vendor, r.date, r.expected_date, r.product_id,
        r.sku, r.cost_price, r.status, r.notes, r.ordered,
        r.received, r.received_date
      ]);

      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');

      const sql = `INSERT INTO purchase_orders (po_id, vendor, date, expected_date,
        product_id, sku, cost_price, status, notes, ordered, received, received_date)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        vendor = VALUES(vendor),
        expected_date = VALUES(expected_date),
        cost_price = VALUES(cost_price),
        status = VALUES(status),
        notes = VALUES(notes),
        ordered = VALUES(ordered),
        received = VALUES(received),
        received_date = VALUES(received_date)`;

      const [result] = await connection.query(sql, values.flat());

      // Update stats. For INSERT ... ON DUPLICATE KEY UPDATE, affectedRows counts
      // 1 for each inserted row and 2 for each updated row; changedRows is used here
      // to estimate how many of the affected rows were updates rather than inserts.
      if (result.affectedRows > 0) {
        const insertCount = result.affectedRows - result.changedRows;
        const updateCount = result.changedRows;
        added += insertCount;
        updated += updateCount;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with next batch instead of failing completely
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}

async function main() {
  outputProgress({
    operation: 'Starting import process',
    message: 'Creating connection pool...'
  });

  const startTime = Date.now();
  let pool;

  try {
    pool = mysql.createPool(dbConfig);

    // Check if tables exist, if not create them
    outputProgress({
      operation: 'Checking database schema',
      message: 'Creating tables if needed...'
    });

    const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
    await pool.query(schemaSQL);
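    // Running the whole schema file in one query call relies on multipleStatements: true
    // in dbConfig above, and assumes db/schema.sql only creates tables that do not
    // already exist, so re-running the import is safe.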

    // Step 1: Import all data first
    try {
      // Import products first since they're referenced by other tables
      await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));

      // Process orders and purchase orders in parallel
      outputProgress({
        operation: 'Starting parallel import',
        message: 'Processing orders and purchase orders simultaneously...'
      });

      await Promise.all([
        importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv')),
        importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
      ]);

      // Step 2: Calculate all metrics after imports are complete
      outputProgress({
        operation: 'Starting metrics calculation',
        message: 'Calculating metrics for all products and vendors...'
      });

      const connection = await pool.getConnection();
      try {
        // Calculate product metrics
        const [products] = await connection.query('SELECT DISTINCT product_id FROM products');
        const totalProducts = products.length;
        let processedProducts = 0;
        let lastUpdate = Date.now();
        const metricsStartTime = Date.now();

        outputProgress({
          operation: 'Starting product metrics calculation',
          message: `Calculating metrics for ${totalProducts} products...`,
          current: 0,
          total: totalProducts,
          percentage: '0'
        });

        for (const product of products) {
          try {
            // Update progress every 5 products or 1 second
            if (processedProducts % 5 === 0 || (Date.now() - lastUpdate) > 1000) {
              updateProgress(processedProducts, totalProducts, 'Calculating product metrics', metricsStartTime);
              lastUpdate = Date.now();
            }

            await updateProductMetrics(connection, product.product_id, metricsStartTime, processedProducts, totalProducts);
            processedProducts++;
          } catch (error) {
            logError(error, `Error calculating metrics for product ${product.product_id}`);
            // Continue with next product instead of failing completely
          }
        }

        outputProgress({
          operation: 'Product metrics calculation completed',
          current: processedProducts,
          total: totalProducts,
          duration: formatDuration((Date.now() - metricsStartTime) / 1000),
          percentage: '100'
        });

        // Calculate vendor metrics
        await calculateVendorMetrics(connection);

      } finally {
        connection.release();
      }

    } catch (error) {
      logError(error, 'Error during import/metrics calculation');
      throw error;
    }

    outputProgress({
      status: 'complete',
      operation: 'Import process completed',
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    logError(error, 'Fatal error during import process');
    outputProgress({
      status: 'error',
      error: error.message
    });
    process.exit(1);
  } finally {
    if (pool) {
      await pool.end();
    }
  }
}

// Run the import
main().catch(error => {
  logError(error, 'Unhandled error in main process');
  process.exit(1);
});