// inventory/inventory-server/scripts/import-csv.js
const fs = require('fs');
const path = require('path');
const csv = require('csv-parse');
const mysql = require('mysql2/promise');
const dotenv = require('dotenv');
// Load environment variables before reading them
dotenv.config({ path: path.join(__dirname, '../.env') });

// Get test limits from the environment (0 means no limit)
const PRODUCTS_TEST_LIMIT = parseInt(process.env.PRODUCTS_TEST_LIMIT || '0', 10);
const ORDERS_TEST_LIMIT = parseInt(process.env.ORDERS_TEST_LIMIT || '10000', 10);
const PURCHASE_ORDERS_TEST_LIMIT = parseInt(process.env.PURCHASE_ORDERS_TEST_LIMIT || '10000', 10);
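
// Pool configuration. multipleStatements is enabled so that schema.sql,
// which contains several statements, can be executed with a single
// query() call in main().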
const dbConfig = {
  host: process.env.DB_HOST,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
  database: process.env.DB_NAME,
  multipleStatements: true,
  waitForConnections: true,
  connectionLimit: 10,
  queueLimit: 0,
  namedPlaceholders: true
};
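
// Progress is emitted as newline-delimited JSON on stdout; a wrapping
// process (presumably whatever spawns this script) can stream-parse it.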
// Helper function to output progress in JSON format
function outputProgress(data) {
  if (!data.status) {
    data = { status: 'running', ...data };
  }
  console.log(JSON.stringify(data));
}

// Helper function to count total rows in a CSV file
async function countRows(filePath) {
  return new Promise((resolve, reject) => {
    let count = 0;
    fs.createReadStream(filePath)
      .pipe(csv.parse())
      .on('data', () => count++)
      .on('error', reject)
      .on('end', () => resolve(count - 1)); // Subtract 1 for the header row
  });
}

// Helper function to format a duration in seconds as "Xs" or "Xm Ys"
function formatDuration(seconds) {
  if (seconds < 60) return `${Math.round(seconds)}s`;
  const minutes = Math.floor(seconds / 60);
  seconds = Math.round(seconds % 60);
  return `${minutes}m ${seconds}s`;
}

// Helper function to report progress with a rate and time estimate
function updateProgress(current, total, operation, startTime) {
  const elapsed = (Date.now() - startTime) / 1000;
  const rate = elapsed > 0 ? current / elapsed : 0; // rows per second
  const remaining = rate > 0 ? (total - current) / rate : 0;
  outputProgress({
    status: 'running',
    operation,
    current,
    total,
    rate,
    elapsed: formatDuration(elapsed),
    remaining: formatDuration(remaining),
    percentage: ((current / total) * 100).toFixed(1)
  });
}

// Helper function to normalize a product's comma-separated category list
// into the categories / product_categories tables
async function handleCategories(connection, productId, categoriesStr) {
  // Remove existing category relationships for this product; they are
  // recreated below from the incoming list
  await connection.query(
    'DELETE FROM product_categories WHERE product_id = ?',
    [productId]
  );
  if (!categoriesStr) return;
  // Split the category string and drop empty entries
  const categories = categoriesStr.split(',')
    .map(cat => cat.trim())
    .filter(cat => cat.length > 0);
  // Insert categories and create relationships
  for (const category of categories) {
    // Insert the category if it doesn't exist
    await connection.query(
      'INSERT IGNORE INTO categories (name) VALUES (?)',
      [category]
    );
    // Look up the category ID and create the relationship in one statement
    // to avoid a read-then-write race
    await connection.query(
      `INSERT IGNORE INTO product_categories (product_id, category_id)
       SELECT ?, id FROM categories WHERE name = ?`,
      [productId, category]
    );
  }
}
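
// importProducts streams the products CSV and upserts rows in batches of
// 100, refreshing the category link tables for each batch inside a
// transaction. The added/updated stats are derived from MySQL's
// affected-row counts, so they are approximate for unchanged rows.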
async function importProducts(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = PRODUCTS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), PRODUCTS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting products import',
    current: 0,
    total: totalRows,
    testLimit: PRODUCTS_TEST_LIMIT,
    percentage: '0'
  });
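
  // CSV dates arrive as DD-MM-YYYY; MySQL DATE columns expect YYYY-MM-DD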
  function convertDate(dateStr) {
    if (!dateStr) return null;
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();
  // Batch processing variables
  const BATCH_SIZE = 100;
  let batch = [];
  const categoryUpdates = new Map(); // product_id -> raw categories string, per batch

  for await (const record of parser) {
    if (PRODUCTS_TEST_LIMIT > 0 && rowCount >= PRODUCTS_TEST_LIMIT) {
      // Process the remaining batch before stopping
      if (batch.length > 0) {
        await processBatch(batch, categoryUpdates);
      }
      outputProgress({
        operation: 'Products import',
        message: `Reached test limit of ${PRODUCTS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;
    // Update progress at most every 100ms to avoid flooding stdout
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Products import', startTime);
      lastUpdate = now;
    }
    // Add to batch. Note the coercions: `|| null` treats a cost of 0 (and
    // unparsable values) as missing, while `|| 0` treats unparsable values
    // as zero.
    batch.push({
      product_id: record.product_id,
      title: record.title,
      SKU: record.SKU,
      created_at: convertDate(record.created_at),
      stock_quantity: parseInt(record.stock_quantity, 10) || 0,
      price: parseFloat(record.price) || 0,
      regular_price: parseFloat(record.regular_price) || 0,
      cost_price: parseFloat(record.cost_price) || null,
      landing_cost_price: parseFloat(record.landing_cost_price) || null,
      barcode: record.barcode,
      updated_at: convertDate(record.updated_at),
      visible: record.visible === '1',
      managing_stock: record.managing_stock === '1',
      replenishable: record.replenishable === '1',
      vendor: record.vendor,
      vendor_reference: record.vendor_reference,
      permalink: record.permalink,
      categories: record.categories,
      image: record.image,
      brand: record.brand,
      options: record.options,
      tags: record.tags,
      moq: parseInt(record.moq, 10) || 1,
      uom: parseInt(record.uom, 10) || 1
    });
    // Store category updates for the batch
    if (record.categories) {
      categoryUpdates.set(record.product_id, record.categories);
    }
    // Flush the batch once it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch, categoryUpdates);
      batch = [];
      categoryUpdates.clear();
    }
  }
  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch, categoryUpdates);
  }
  outputProgress({
    status: 'running',
    operation: 'Products import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to upsert one batch of product rows plus their
  // category links inside a single transaction
  async function processBatch(records, categoryUpdates) {
    if (records.length === 0) return;
    const connection = await pool.getConnection();
    try {
      await connection.beginTransaction();
      try {
        // Build one multi-row INSERT ... ON DUPLICATE KEY UPDATE statement
        const values = records.map(r => [
          r.product_id, r.title, r.SKU, r.created_at, r.stock_quantity,
          r.price, r.regular_price, r.cost_price, r.landing_cost_price,
          r.barcode, r.updated_at, r.visible, r.managing_stock,
          r.replenishable, r.vendor, r.vendor_reference, r.permalink,
          r.categories, r.image, r.brand, r.options, r.tags, r.moq, r.uom
        ]);
        const placeholders = records.map(() =>
          '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
        ).join(',');
        // The column list is spelled out so the insert does not depend on
        // the table's column order
        const sql = `INSERT INTO products (product_id, title, SKU, created_at,
          stock_quantity, price, regular_price, cost_price, landing_cost_price,
          barcode, updated_at, visible, managing_stock, replenishable, vendor,
          vendor_reference, permalink, categories, image, brand, options, tags,
          moq, uom)
          VALUES ${placeholders}
          ON DUPLICATE KEY UPDATE
          title = VALUES(title),
          stock_quantity = VALUES(stock_quantity),
          price = VALUES(price),
          regular_price = VALUES(regular_price),
          cost_price = VALUES(cost_price),
          landing_cost_price = VALUES(landing_cost_price),
          barcode = VALUES(barcode),
          updated_at = VALUES(updated_at),
          visible = VALUES(visible),
          managing_stock = VALUES(managing_stock),
          replenishable = VALUES(replenishable),
          vendor = VALUES(vendor),
          vendor_reference = VALUES(vendor_reference),
          permalink = VALUES(permalink),
          categories = VALUES(categories),
          image = VALUES(image),
          brand = VALUES(brand),
          options = VALUES(options),
          tags = VALUES(tags),
          moq = VALUES(moq),
          uom = VALUES(uom)`;
        const [result] = await connection.query(sql, values.flat());
        // Update stats. With ON DUPLICATE KEY UPDATE, MySQL counts 1 affected
        // row per inserted row and 2 per updated row; result.insertId is a
        // row id, not a count, so it cannot be used here. Unchanged rows
        // count as 0 and are reported as added -- close enough for progress.
        const updatedRows = Math.max(result.affectedRows - records.length, 0);
        updated += updatedRows;
        added += records.length - updatedRows;
        // Process categories within the same transaction
        for (const [productId, categories] of categoryUpdates) {
          await handleCategories(connection, productId, categories);
        }
        await connection.commit();
      } catch (error) {
        await connection.rollback();
        throw error;
      }
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Continue with the next batch instead of failing the whole import
    } finally {
      connection.release();
    }
  }
}
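
// importOrders streams the orders CSV and upserts rows in batches of 500.
// Rows that reference a product_id not present in the products table are
// skipped (and counted) up front rather than left to fail on insert.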
async function importOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = ORDERS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), ORDERS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting orders import',
    current: 0,
    total: totalRows,
    testLimit: ORDERS_TEST_LIMIT,
    percentage: '0'
  });
  // Convert DD-MM-YYYY CSV dates to MySQL's YYYY-MM-DD
  function convertDate(dateStr) {
    if (!dateStr) return null;
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }
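
  // Cache the set of known product IDs once so each CSV row can be
  // validated in memory (assumes the product import above has already run)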
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }
  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();
  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (ORDERS_TEST_LIMIT > 0 && rowCount >= ORDERS_TEST_LIMIT) {
      // Process the remaining batch before stopping
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Orders import',
        message: `Reached test limit of ${ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;
    // Update progress at most every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Orders import', startTime);
      lastUpdate = now;
    }
    // Skip rows that reference an unknown product
    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }
    // Add to batch
    batch.push({
      order_number: record.order_number,
      product_id: record.product_id,
      SKU: record.SKU,
      date: convertDate(record.date),
      price: parseFloat(record.price) || 0,
      quantity: parseInt(record.quantity, 10) || 0,
      discount: parseFloat(record.discount) || 0,
      tax: parseFloat(record.tax) || 0,
      tax_included: record.tax_included === '1',
      shipping: parseFloat(record.shipping) || 0,
      customer: record.customer,
      canceled: record.canceled === '1'
    });
    // Flush the batch once it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }
  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }
  outputProgress({
    status: 'running',
    operation: 'Orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to upsert one batch of order rows
  async function processBatch(records) {
    if (records.length === 0) return;
    const connection = await pool.getConnection();
    try {
      // Build one multi-row INSERT ... ON DUPLICATE KEY UPDATE statement
      const values = records.map(r => [
        r.order_number, r.product_id, r.SKU, r.date, r.price,
        r.quantity, r.discount, r.tax, r.tax_included, r.shipping,
        r.customer, r.canceled
      ]);
      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');
      const sql = `INSERT INTO orders (order_number, product_id, SKU, date, price,
        quantity, discount, tax, tax_included, shipping, customer, canceled)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        price = VALUES(price),
        quantity = VALUES(quantity),
        discount = VALUES(discount),
        tax = VALUES(tax),
        tax_included = VALUES(tax_included),
        shipping = VALUES(shipping),
        canceled = VALUES(canceled)`;
      const [result] = await connection.query(sql, values.flat());
      // Update stats; see the note in importProducts on how these counts
      // are derived from affectedRows
      const updatedRows = Math.max(result.affectedRows - records.length, 0);
      updated += updatedRows;
      added += records.length - updatedRows;
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Count the whole failed batch as skipped and continue with the next
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}
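
// importPurchaseOrders mirrors importOrders: stream the CSV, validate
// product_id against the products table, and upsert in batches of 500.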
async function importPurchaseOrders(pool, filePath) {
  const parser = fs.createReadStream(filePath).pipe(csv.parse({ columns: true, trim: true }));
  const totalRows = PURCHASE_ORDERS_TEST_LIMIT > 0
    ? Math.min(await countRows(filePath), PURCHASE_ORDERS_TEST_LIMIT)
    : await countRows(filePath);
  const startTime = Date.now();
  outputProgress({
    operation: 'Starting purchase orders import',
    current: 0,
    total: totalRows,
    testLimit: PURCHASE_ORDERS_TEST_LIMIT,
    percentage: '0'
  });
  // Convert DD-MM-YYYY CSV dates to MySQL's YYYY-MM-DD
  function convertDate(dateStr) {
    if (!dateStr) return null;
    const [day, month, year] = dateStr.split('-');
    return `${year}-${month}-${day}`;
  }
  // First, get all valid product IDs
  const connection = await pool.getConnection();
  let validProductIds;
  try {
    const [rows] = await connection.query('SELECT product_id FROM products');
    validProductIds = new Set(rows.map(row => row.product_id.toString()));
  } finally {
    connection.release();
  }
  let skipped = 0;
  let updated = 0;
  let added = 0;
  let rowCount = 0;
  let lastUpdate = Date.now();
  // Batch processing variables
  const BATCH_SIZE = 500;
  let batch = [];

  for await (const record of parser) {
    if (PURCHASE_ORDERS_TEST_LIMIT > 0 && rowCount >= PURCHASE_ORDERS_TEST_LIMIT) {
      // Process the remaining batch before stopping
      if (batch.length > 0) {
        await processBatch(batch);
      }
      outputProgress({
        operation: 'Purchase orders import',
        message: `Reached test limit of ${PURCHASE_ORDERS_TEST_LIMIT.toLocaleString()} rows`,
        current: rowCount,
        total: totalRows
      });
      break;
    }
    rowCount++;
    // Update progress at most every 100ms
    const now = Date.now();
    if (now - lastUpdate > 100) {
      updateProgress(rowCount, totalRows, 'Purchase orders import', startTime);
      lastUpdate = now;
    }
    // Skip rows that reference an unknown product
    if (!validProductIds.has(record.product_id)) {
      skipped++;
      continue;
    }
    // Add to batch
    batch.push({
      po_id: record.po_id,
      vendor: record.vendor,
      date: convertDate(record.date),
      expected_date: convertDate(record.expected_date),
      product_id: record.product_id,
      sku: record.sku,
      cost_price: parseFloat(record.cost_price) || 0,
      status: record.status || 'pending',
      notes: record.notes,
      ordered: parseInt(record.ordered, 10) || 0,
      received: parseInt(record.received, 10) || 0,
      received_date: convertDate(record.received_date)
    });
    // Flush the batch once it reaches BATCH_SIZE
    if (batch.length >= BATCH_SIZE) {
      await processBatch(batch);
      batch = [];
    }
  }
  // Process any remaining records in the final batch
  if (batch.length > 0) {
    await processBatch(batch);
  }
  outputProgress({
    status: 'running',
    operation: 'Purchase orders import completed',
    current: rowCount,
    total: totalRows,
    added,
    updated,
    skipped,
    duration: formatDuration((Date.now() - startTime) / 1000),
    percentage: '100'
  });

  // Helper function to upsert one batch of purchase order rows
  async function processBatch(records) {
    if (records.length === 0) return;
    const connection = await pool.getConnection();
    try {
      // Build one multi-row INSERT ... ON DUPLICATE KEY UPDATE statement
      const values = records.map(r => [
        r.po_id, r.vendor, r.date, r.expected_date, r.product_id,
        r.sku, r.cost_price, r.status, r.notes, r.ordered,
        r.received, r.received_date
      ]);
      const placeholders = records.map(() =>
        '(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'
      ).join(',');
      const sql = `INSERT INTO purchase_orders (po_id, vendor, date, expected_date,
        product_id, sku, cost_price, status, notes, ordered, received, received_date)
        VALUES ${placeholders}
        ON DUPLICATE KEY UPDATE
        vendor = VALUES(vendor),
        expected_date = VALUES(expected_date),
        cost_price = VALUES(cost_price),
        status = VALUES(status),
        notes = VALUES(notes),
        ordered = VALUES(ordered),
        received = VALUES(received),
        received_date = VALUES(received_date)`;
      const [result] = await connection.query(sql, values.flat());
      // Update stats; see the note in importProducts on how these counts
      // are derived from affectedRows
      const updatedRows = Math.max(result.affectedRows - records.length, 0);
      updated += updatedRows;
      added += records.length - updatedRows;
    } catch (error) {
      console.error(`\nError processing batch:`, error.message);
      // Count the whole failed batch as skipped and continue with the next
      skipped += records.length;
    } finally {
      connection.release();
    }
  }
}
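
// main() wires everything together: ensure the schema exists, then import
// products first (orders and purchase orders reference them), then the two
// dependent tables.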
async function main() {
  outputProgress({
    operation: 'Starting import process',
    message: 'Creating connection pool...'
  });
  const startTime = Date.now();
  const pool = mysql.createPool(dbConfig);
  try {
    // Check if tables exist; create them if needed
    outputProgress({
      operation: 'Checking database schema',
      message: 'Creating tables if needed...'
    });
    const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
    await pool.query(schemaSQL);
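    // NOTE: this assumes schema.sql is idempotent (e.g. CREATE TABLE IF NOT
    // EXISTS); otherwise rerunning the import would fail at the query above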
    // Import products first since they're referenced by the other tables
    await importProducts(pool, path.join(__dirname, '../csv/39f2x83-products.csv'));
    await importOrders(pool, path.join(__dirname, '../csv/39f2x83-orders.csv'));
    await importPurchaseOrders(pool, path.join(__dirname, '../csv/39f2x83-purchase_orders.csv'));
    outputProgress({
      status: 'complete',
      operation: 'Import process completed',
      duration: formatDuration((Date.now() - startTime) / 1000)
    });
  } catch (error) {
    outputProgress({
      status: 'error',
      error: error.message
    });
    // Set the exit code rather than calling process.exit(), so the finally
    // block still gets to close the pool
    process.exitCode = 1;
  } finally {
    await pool.end();
  }
}
// Run the import
main();
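
// Example invocation (limits are optional; a limit of 0 disables capping):
//   ORDERS_TEST_LIMIT=5000 PURCHASE_ORDERS_TEST_LIMIT=5000 node scripts/import-csv.js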