inventory/inventory-server/old/old_csv/import-csv.js

const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');
const { formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress'); // outputProgress is defined locally below
// Load environment variables before reading the optional test limits
dotenv.config({ path: path.join(__dirname, '../.env') });
// Row limits for test runs (0 = no limit)
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0', 10);
const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000', 10);
const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000', 10);
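// MySQL pool settings; multipleStatements must stay enabled so the schema.sql
// file can be executed with a single query() call when the tables are missing.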
const dbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true
};
// Set up logging
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');
// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context = '') {
const timestamp = new Date().toISOString();
const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;
// Log to error file
fs.appendFileSync(ERROR_LOG, errorMessage);
// Also log to console
console.error(`\n${context}\nError: ${error.message}`);
}
// Helper function to log import progress
function logImport(message, isSignificant = false) {
// Only write to disk if it's a significant event
if (isSignificant) {
const timestamp = new Date().toISOString();
const logMessage = `[${timestamp}] ${message}\n`;
fs.appendFileSync(IMPORT_LOG, logMessage);
}
}
// Helper function to format duration
function formatDuration(seconds) {
const hours = Math.floor(seconds / 3600);
const minutes = Math.floor((seconds % 3600) / 60);
seconds = Math.floor(seconds % 60);
const parts = [];
if (hours > 0) parts.push(`${hours}h`);
if (minutes > 0) parts.push(`${minutes}m`);
if (seconds > 0 || parts.length === 0) parts.push(`${seconds}s`);
return parts.join(' ');
}
// Helper function to output progress
function outputProgress(data) {
// Always send to stdout for frontend
process.stdout.write(JSON.stringify(data) + '\n');
// Log significant events to disk
const isSignificant =
// Operation starts
(data.operation && !data.current) ||
// Operation completions and errors
data.status === 'complete' ||
data.status === 'error' ||
// Test limits reached
data.message?.includes('test limit') ||
// Schema changes
data.operation?.includes('Creating database schema') ||
// Parallel import starts
data.message?.includes('Processing orders and purchase orders simultaneously');
if (isSignificant) {
logImport(`${data.operation || 'Operation'}${data.message ? ': ' + data.message : ''}${data.error ? ' Error: ' + data.error : ''}${data.status ? ' Status: ' + data.status : ''}`, true);
}
}
// Helper function to count total rows in a CSV file
async function countRows(filePath) {
return new Promise((resolve, reject) => {
let count = 0;
fs.createReadStream(filePath)
.pipe(csv.parse())
.on('data', () => count++)
.on('error', reject)
.on('end', () => resolve(count - 1)); // Subtract 1 for header row
});
}
// Helper function to update progress with time estimate
function updateProgress(current, total, operation, startTime, added = 0, updated = 0, skipped = 0) {
outputProgress({
status: 'running',
operation,
current,
total,
rate: calculateRate(startTime, current),
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, current, total),
percentage: ((current / total) * 100).toFixed(1),
added,
updated,
skipped
});
}
// Helper function to handle category normalization
async function handleCategories(connection, productId, categoriesStr) {
if (!categoriesStr) {
// If no categories, remove all existing relationships
await connection.query(
'DELETE FROM product_categories WHERE product_id = ?',
[productId]
);
return;
}
// Special cases that should not be split
const specialCategories = [
'Paint, Dyes & Chalk',
'Fabric Paint, Markers, and Dye',
'Crystals, Gems & Rhinestones',
'Pens, Pencils & Markers'
];
// Split categories and clean them, preserving special cases
const categories = [];
let remainingStr = categoriesStr;
// First check for special categories
for (const special of specialCategories) {
if (remainingStr.includes(special)) {
categories.push(special);
// Remove the special category from the string
remainingStr = remainingStr.replace(special, '');
}
}
// Then process any remaining regular categories
remainingStr.split(',')
.map(cat => cat.trim())
.filter(cat => cat.length > 0)
.forEach(cat => {
if (!categories.includes(cat)) {
categories.push(cat);
}
});
// Remove existing category relationships for this product
await connection.query(
'DELETE FROM product_categories WHERE product_id = ?',
[productId]
);
// Insert categories and create relationships
for (const category of categories) {
// Insert category if it doesn't exist
await connection.query(
'INSERT IGNORE INTO categories (name) VALUES (?)',
[category]
);
// Get category ID and create relationship in one query to avoid race conditions
await connection.query(`
INSERT IGNORE INTO product_categories (product_id, category_id)
SELECT ?, id FROM categories WHERE name = ?`,
[productId, category]
);
}
}
// Helper function to calculate sales velocity metrics
async function calculateSalesVelocity(connection, productId) {
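// Average sales per day/week/month over the span between the first and last
// non-canceled order; NULLIF guards against division by zero when every order
// falls on the same date.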
const [rows] = await connection.query(`
SELECT
COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0), 0) as daily_sales_avg,
COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 7, 0) as weekly_sales_avg,
COALESCE(SUM(quantity) / NULLIF(DATEDIFF(MAX(date), MIN(date)), 0) * 30, 0) as monthly_sales_avg
FROM orders
WHERE product_id = ? AND canceled = false
GROUP BY product_id
`, [productId]);
return rows[0] || { daily_sales_avg: 0, weekly_sales_avg: 0, monthly_sales_avg: 0 };
}
// Helper function to calculate stock metrics
async function calculateStockMetrics(connection, productId, dailySalesAvg) {
const [product] = await connection.query(
'SELECT stock_quantity FROM products WHERE product_id = ?',
[productId]
);
if (!product[0]) return null;
const stockQty = product[0].stock_quantity;
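// 999 acts as a sentinel for "effectively unlimited" days of inventory when there are no sales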
const daysOfInventory = dailySalesAvg > 0 ? Math.floor(stockQty / dailySalesAvg) : 999;
const weeksOfInventory = Math.floor(daysOfInventory / 7);
// Calculate safety stock (2 weeks of average sales)
const safetyStock = Math.ceil(dailySalesAvg * 14);
// Calculate reorder point (safety stock + 1 week of sales)
const reorderPoint = Math.ceil(safetyStock + (dailySalesAvg * 7));
return {
days_of_inventory: daysOfInventory,
weeks_of_inventory: weeksOfInventory,
safety_stock: safetyStock,
reorder_point: reorderPoint
};
}
// Helper function to calculate financial metrics
async function calculateFinancialMetrics(connection, productId) {
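// Lifetime revenue and average gross margin percentage across non-canceled orders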
const [rows] = await connection.query(`
SELECT
SUM(o.price * o.quantity) as total_revenue,
AVG((o.price - p.cost_price) / NULLIF(o.price, 0) * 100) as avg_margin_percent
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE o.product_id = ? AND o.canceled = false
GROUP BY o.product_id
`, [productId]);
return rows[0] || { total_revenue: 0, avg_margin_percent: 0 };
}
// Helper function to calculate purchase metrics
async function calculatePurchaseMetrics(connection, productId) {
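// Average vendor lead time (order date to received date) and most recent
// order/receipt dates, computed from closed purchase orders only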
const [rows] = await connection.query(`
SELECT
AVG(DATEDIFF(received_date, date)) as avg_lead_time_days,
MAX(date) as last_purchase_date,
MAX(received_date) as last_received_date
FROM purchase_orders
WHERE product_id = ? AND status = 'closed'
GROUP BY product_id
`, [productId]);
return rows[0] || {
avg_lead_time_days: 0,
last_purchase_date: null,
last_received_date: null
};
}
// Helper function to calculate ABC classification
async function calculateABCClass(connection, productId) {
// Get total revenue for this product
const [productRevenue] = await connection.query(`
SELECT SUM(price * quantity) as revenue
FROM orders
WHERE product_id = ? AND canceled = false
`, [productId]);
// Get total revenue across all products
const [totalRevenue] = await connection.query(`
SELECT SUM(price * quantity) as total
FROM orders
WHERE canceled = false
`);
const revenue = productRevenue[0]?.revenue || 0;
const total = totalRevenue[0]?.total || 0;
if (total === 0) return 'C';
const percentage = (revenue / total) * 100;
// Classify by this product's share of total revenue:
// A: 20% or more of total revenue
// B: between 5% and 20%
// C: below 5%
if (percentage >= 20) return 'A';
if (percentage >= 5) return 'B';
return 'C';
}
// Helper function to calculate time-based aggregates
async function calculateTimeAggregates(connection, productId) {
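// Upsert per-product monthly rollups: sales quantity, revenue, cost and margin from
// orders, plus purchase order quantities matched to the same calendar month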
await connection.query(`
INSERT INTO product_time_aggregates (
product_id, year, month,
total_quantity_sold, total_revenue, total_cost,
order_count, stock_received, stock_ordered,
avg_price, profit_margin
)
SELECT
o.product_id,
YEAR(o.date) as year,
MONTH(o.date) as month,
SUM(o.quantity) as total_quantity_sold,
SUM(o.price * o.quantity) as total_revenue,
SUM(p.cost_price * o.quantity) as total_cost,
COUNT(DISTINCT o.order_number) as order_count,
COALESCE(SUM(po.received), 0) as stock_received,
COALESCE(SUM(po.ordered), 0) as stock_ordered,
AVG(o.price) as avg_price,
CASE
WHEN SUM(o.price * o.quantity) = 0 THEN 0
ELSE ((SUM(o.price * o.quantity) - COALESCE(SUM(p.cost_price * o.quantity), 0)) /
NULLIF(SUM(o.price * o.quantity), 0) * 100)
END as profit_margin
FROM orders o
JOIN products p ON o.product_id = p.product_id
LEFT JOIN purchase_orders po ON o.product_id = po.product_id
AND YEAR(o.date) = YEAR(po.date)
AND MONTH(o.date) = MONTH(po.date)
WHERE o.product_id = ? AND o.canceled = false
GROUP BY o.product_id, YEAR(o.date), MONTH(o.date)
ON DUPLICATE KEY UPDATE
total_quantity_sold = VALUES(total_quantity_sold),
total_revenue = VALUES(total_revenue),
total_cost = VALUES(total_cost),
order_count = VALUES(order_count),
stock_received = VALUES(stock_received),
stock_ordered = VALUES(stock_ordered),
avg_price = VALUES(avg_price),
profit_margin = VALUES(profit_margin)
`, [productId]);
}
// Helper function to calculate vendor metrics
async function calculateVendorMetrics(connection) {
try {
// Get list of vendors
const [vendors] = await connection.query('SELECT DISTINCT vendor FROM products WHERE vendor IS NOT NULL');
const startTime = Date.now();
let current = 0;
const total = vendors.length;
outputProgress({
operation: 'Calculating vendor metrics',
current: 0,
total,
percentage: '0.0'
});
for (const { vendor } of vendors) {
// Calculate average lead time
const [leadTimeResult] = await connection.query(`
SELECT
AVG(DATEDIFF(received_date, date)) as avg_lead_time,
COUNT(*) as total_orders,
SUM(CASE WHEN ordered = received THEN 1 ELSE 0 END) as fulfilled_orders
FROM purchase_orders
WHERE vendor = ? AND status = 'closed'
GROUP BY vendor
`, [vendor]);
const metrics = leadTimeResult[0] || {
avg_lead_time: 0,
total_orders: 0,
fulfilled_orders: 0
};
// Calculate fill rate
const fillRate = metrics.total_orders > 0 ?
(metrics.fulfilled_orders / metrics.total_orders * 100) : 0;
// Update vendor metrics
await connection.query(`
INSERT INTO vendor_metrics (
vendor,
avg_lead_time_days,
total_orders,
fulfilled_orders,
fill_rate
) VALUES (?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
avg_lead_time_days = VALUES(avg_lead_time_days),
total_orders = VALUES(total_orders),
fulfilled_orders = VALUES(fulfilled_orders),
fill_rate = VALUES(fill_rate)
`, [
vendor,
metrics.avg_lead_time || 0,
metrics.total_orders,
metrics.fulfilled_orders,
fillRate
]);
current++;
updateProgress(current, total, 'Calculating vendor metrics', startTime);
}
outputProgress({
status: 'complete',
operation: 'Vendor metrics calculation completed',
current: total,
total,
percentage: '100.0'
});
} catch (error) {
logError(error, 'Error calculating vendor metrics');
throw error;
}
}
async function importProducts(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = PRODUCTS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting products import',
current: 0,
total: totalRows,
testLimit: PRODUCTS_TEST_LIMIT,
percentage: '0'
});
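// CSV dates arrive as DD-MM-YYYY; convert to MySQL's YYYY-MM-DD format,
// defaulting missing values to today's date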
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 100;
let batch = [];
let categoryUpdates = new Map(); // Store category updates for batch processing
// Get a connection from the pool that we'll reuse
const connection = await pool.getConnection();
try {
for await (const record of parser) {
if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch, categoryUpdates);
}
outputProgress({
operation: 'Products import',
message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms to avoid console flooding
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Products import', startTime, added, updated, 0);
lastUpdate = now;
}
// Add to batch
batch.push({
product_id: record.product_id,
title: record.title,
SKU: record.SKU,
created_at: convertDate(record.created_at),
stock_quantity: parseInt(record.stock_quantity) || 0,
price: parseFloat(record.price) || 0,
regular_price: parseFloat(record.regular_price) || 0,
cost_price: parseFloat(record.cost_price) || null,
landing_cost_price: parseFloat(record.landing_cost_price) || null,
barcode: record.barcode,
updated_at: convertDate(record.updated_at),
visible: record.visible === '1',
managing_stock: record.managing_stock === '1',
replenishable: record.replenishable === '1',
vendor: record.vendor,
vendor_reference: record.vendor_reference,
permalink: record.permalink,
categories: record.categories,
image: record.image,
brand: record.brand,
options: record.options,
tags: record.tags,
moq: parseInt(record.moq) || 1,
uom: parseInt(record.uom) || 1
});
if (record.categories) {
categoryUpdates.set(record.product_id, record.categories);
}
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch, categoryUpdates);
batch = [];
categoryUpdates.clear();
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch, categoryUpdates);
}
outputProgress({
status: 'running',
operation: 'Products import completed',
current: rowCount,
total: totalRows,
added,
updated,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
} catch (error) {
console.error('Error during products import:', error);
throw error;
} finally {
if (connection) {
connection.release();
}
}
// Helper function to process a batch of records
async function processBatch(records, categoryUpdates) {
if (records.length === 0) return;
try {
await connection.beginTransaction();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.product_id, r.title, r.SKU, r.created_at, r.stock_quantity,
r.price, r.regular_price, r.cost_price, r.landing_cost_price,
r.barcode, r.updated_at, r.visible, r.managing_stock,
r.replenishable, r.vendor, r.vendor_reference, r.permalink,
r.categories, r.image, r.brand, r.options, r.tags, r.moq, r.uom
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
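// Note: inserting without an explicit column list relies on the 24 values above
// matching the column order defined for the products table in schema.sql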
const sql = `INSERT INTO products VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
title = VALUES(title),
stock_quantity = VALUES(stock_quantity),
price = VALUES(price),
regular_price = VALUES(regular_price),
cost_price = VALUES(cost_price),
landing_cost_price = VALUES(landing_cost_price),
barcode = VALUES(barcode),
updated_at = VALUES(updated_at),
visible = VALUES(visible),
managing_stock = VALUES(managing_stock),
replenishable = VALUES(replenishable),
vendor = VALUES(vendor),
vendor_reference = VALUES(vendor_reference),
permalink = VALUES(permalink),
categories = VALUES(categories),
image = VALUES(image),
brand = VALUES(brand),
options = VALUES(options),
tags = VALUES(tags),
moq = VALUES(moq),
uom = VALUES(uom)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
// Process categories within the same transaction
for (const [productId, categories] of categoryUpdates) {
await handleCategories(connection, productId, categories);
}
await connection.commit();
} catch (error) {
await connection.rollback();
logError(error, `Error processing batch of ${records.length} records`);
throw error;
}
} catch (error) {
logError(error, `Error in batch processing:\nFirst record: ${JSON.stringify(records[0])}`);
// Continue with next batch instead of failing completely
}
}
}
async function importOrders(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting orders import',
current: 0,
total: totalRows,
testLimit: ORDERS_TEST_LIMIT,
percentage: '0'
});
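// Same DD-MM-YYYY to YYYY-MM-DD conversion as in importProducts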
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
// First, get all valid product IDs
const connection = await pool.getConnection();
let validProductIds;
try {
const [rows] = await connection.query('SELECT product_id FROM products');
validProductIds = new Set(rows.map(row => row.product_id.toString()));
} finally {
connection.release();
}
let skipped = 0;
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 500;
let batch = [];
for await (const record of parser) {
if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
operation: 'Orders import',
message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Orders import', startTime, added, updated, skipped);
lastUpdate = now;
}
if (!validProductIds.has(record.product_id)) {
skipped++;
continue;
}
// Add to batch
batch.push({
order_number: record.order_number,
product_id: record.product_id,
SKU: record.SKU,
date: convertDate(record.date),
price: parseFloat(record.price) || 0,
quantity: parseInt(record.quantity) || 0,
discount: parseFloat(record.discount) || 0,
tax: parseFloat(record.tax) || 0,
tax_included: record.tax_included === '1',
shipping: parseFloat(record.shipping) || 0,
customer: record.customer,
canceled: record.canceled === '1'
});
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch);
batch = [];
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
status: 'running',
operation: 'Orders import completed',
current: rowCount,
total: totalRows,
added,
updated,
skipped,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
// Helper function to process a batch of records
async function processBatch(records) {
if (records.length === 0) return;
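// Unlike the products importer, each batch here checks out its own pooled connection
// and runs as a single multi-row statement without an explicit transaction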
const connection = await pool.getConnection();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.order_number, r.product_id, r.SKU, r.date, r.price,
r.quantity, r.discount, r.tax, r.tax_included, r.shipping,
r.customer, r.canceled
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
const sql = `INSERT INTO orders (order_number, product_id, SKU, date, price,
quantity, discount, tax, tax_included, shipping, customer, canceled)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
price = VALUES(price),
quantity = VALUES(quantity),
discount = VALUES(discount),
tax = VALUES(tax),
tax_included = VALUES(tax_included),
shipping = VALUES(shipping),
canceled = VALUES(canceled)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
// With INSERT ... ON DUPLICATE KEY UPDATE, MySQL counts each inserted row as 1
// affected row and each updated row as 2, so the added/updated totals derived
// from affectedRows and changedRows below are approximations.
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
} catch (error) {
console.error(`\nError processing batch:`, error.message);
// Continue with next batch instead of failing completely
skipped += records.length;
} finally {
connection.release();
}
}
}
async function importPurchaseOrders(pool, filePath) {
const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0 ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT) : await countRows(filePath);
const startTime = Date.now();
outputProgress({
operation: 'Starting purchase orders import',
current: 0,
total: totalRows,
testLimit: PURCHASE_ORDERS_TEST_LIMIT,
percentage: '0'
});
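// Same DD-MM-YYYY to YYYY-MM-DD conversion as in importProducts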
function convertDate(dateStr) {
if (!dateStr) {
// Default to current date for missing dates
const now = new Date();
const year = now.getFullYear();
const month = String(now.getMonth() + 1).padStart(2, '0');
const day = String(now.getDate()).padStart(2, '0');
return `${year}-${month}-${day}`;
}
const [day, month, year] = dateStr.split('-');
return `${year}-${month}-${day}`;
}
// First, get all valid product IDs
const connection = await pool.getConnection();
let validProductIds;
try {
const [rows] = await connection.query('SELECT product_id FROM products');
validProductIds = new Set(rows.map(row => row.product_id.toString()));
} finally {
connection.release();
}
let skipped = 0;
let updated = 0;
let added = 0;
let rowCount = 0;
let lastUpdate = Date.now();
// Batch processing variables
const BATCH_SIZE = 500;
let batch = [];
for await (const record of parser) {
if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
// Process remaining batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
operation: 'Purchase orders import',
message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
current: rowCount,
total: totalRows
});
break;
}
rowCount++;
// Update progress every 100ms
const now = Date.now();
if (now - lastUpdate > 100) {
updateProgress(rowCount, totalRows, 'Purchase orders import', startTime, added, updated, skipped);
lastUpdate = now;
}
if (!validProductIds.has(record.product_id)) {
skipped++;
continue;
}
// Add to batch
batch.push({
po_id: record.po_id,
vendor: record.vendor,
date: convertDate(record.date),
expected_date: convertDate(record.expected_date),
product_id: record.product_id,
sku: record.sku,
cost_price: parseFloat(record.cost_price) || 0,
status: record.status || 'pending',
notes: record.notes,
ordered: parseInt(record.ordered) || 0,
received: parseInt(record.received) || 0,
received_date: convertDate(record.received_date)
});
// Process batch if it reaches BATCH_SIZE
if (batch.length >= BATCH_SIZE) {
await processBatch(batch);
batch = [];
}
}
// Process any remaining records in the final batch
if (batch.length > 0) {
await processBatch(batch);
}
outputProgress({
status: 'running',
operation: 'Purchase orders import completed',
current: rowCount,
total: totalRows,
added,
updated,
skipped,
duration: formatDuration((Date.now() - startTime) / 1000),
percentage: '100'
});
// Helper function to process a batch of records
async function processBatch(records) {
if (records.length === 0) return;
const connection = await pool.getConnection();
try {
// Create the batch insert/update query
const values = records.map(r => [
r.po_id, r.vendor, r.date, r.expected_date, r.product_id,
r.sku, r.cost_price, r.status, r.notes, r.ordered,
r.received, r.received_date
]);
const placeholders = records.map(() =>
'(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
).join(',');
const sql = `INSERT INTO purchase_orders (po_id, vendor, date, expected_date,
product_id, sku, cost_price, status, notes, ordered, received, received_date)
VALUES ${placeholders}
ON DUPLICATE KEY UPDATE
vendor = VALUES(vendor),
expected_date = VALUES(expected_date),
cost_price = VALUES(cost_price),
status = VALUES(status),
notes = VALUES(notes),
ordered = VALUES(ordered),
received = VALUES(received),
received_date = VALUES(received_date)`;
const [result] = await connection.query(sql, values.flat());
// Update stats
if (result.affectedRows > 0) {
// With INSERT ... ON DUPLICATE KEY UPDATE, MySQL counts each inserted row as 1
// affected row and each updated row as 2, so the added/updated totals derived
// from affectedRows and changedRows below are approximations.
const insertCount = result.affectedRows - result.changedRows;
const updateCount = result.changedRows;
added += insertCount;
updated += updateCount;
}
} catch (error) {
console.error(`\nError processing batch:`, error.message);
// Continue with next batch instead of failing completely
skipped += records.length;
} finally {
connection.release();
}
}
}
async function main() {
const startTime = Date.now();
let pool;
let importInProgress = false;
try {
outputProgress({
operation: 'Starting import process',
message: 'Creating connection pool...'
});
pool = mysql.createPool(dbConfig);
// Check if tables exist, if not create them
outputProgress({
operation: 'Checking database schema',
message: 'Verifying tables exist...'
});
const connection = await pool.getConnection();
try {
// Check if products table exists as a proxy for schema being initialized
const [tables] = await connection.query(
'SELECT COUNT(*) as count FROM information_schema.tables WHERE table_schema = ? AND table_name = ?',
[dbConfig.database, 'products']
);
if (tables[0].count === 0) {
outputProgress({
operation: 'Creating database schema',
message: 'Tables not found, creating schema...'
});
const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
await connection.query(schemaSQL);
}
} finally {
connection.release();
}
// Import all data
try {
importInProgress = true;
// Import products first since they're referenced by other tables
await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));
// Process orders and purchase orders in parallel
outputProgress({
operation: 'Starting parallel import',
message: 'Processing orders and purchase orders simultaneously...'
});
await Promise.all([
importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv')),
importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'))
]);
// Only output completion if we haven't encountered an error
if (importInProgress) {
outputProgress({
status: 'complete',
operation: 'Import process completed',
duration: formatDuration((Date.now() - startTime) / 1000)
});
}
} catch (error) {
importInProgress = false;
logError(error, 'Error during import');
outputProgress({
status: 'error',
operation: 'Import process',
error: error.message
});
throw error;
}
} catch (error) {
importInProgress = false;
logError(error, 'Fatal error during import process');
outputProgress({
status: 'error',
operation: 'Import process',
error: error.message
});
process.exit(1);
} finally {
if (pool) {
await pool.end();
}
}
}
// Run the import
main().catch(error => {
logError(error, 'Unhandled error in main process');
process.exit(1);
});