/** * This script updates the costeach values for existing orders from the original MySQL database * without needing to run the full import process. */ const dotenv = require("dotenv"); const path = require("path"); const fs = require("fs"); const { setupConnections, closeConnections } = require('../scripts/import/utils'); const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress'); dotenv.config({ path: path.join(__dirname, "../.env") }); // SSH configuration const sshConfig = { ssh: { host: process.env.PROD_SSH_HOST, port: process.env.PROD_SSH_PORT || 22, username: process.env.PROD_SSH_USER, privateKey: process.env.PROD_SSH_KEY_PATH ? fs.readFileSync(process.env.PROD_SSH_KEY_PATH) : undefined, compress: true, // Enable SSH compression }, prodDbConfig: { // MySQL config for production host: process.env.PROD_DB_HOST || "localhost", user: process.env.PROD_DB_USER, password: process.env.PROD_DB_PASSWORD, database: process.env.PROD_DB_NAME, port: process.env.PROD_DB_PORT || 3306, timezone: 'Z', }, localDbConfig: { // PostgreSQL config for local host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, port: process.env.DB_PORT || 5432, ssl: process.env.DB_SSL === 'true', connectionTimeoutMillis: 60000, idleTimeoutMillis: 30000, max: 10 // connection pool max size } }; async function updateOrderCosts() { const startTime = Date.now(); let connections; let updatedCount = 0; let errorCount = 0; try { outputProgress({ status: "running", operation: "Order costs update", message: "Initializing SSH tunnel..." }); connections = await setupConnections(sshConfig); const { prodConnection, localConnection } = connections; // 1. Get all orders from local database that need cost updates outputProgress({ status: "running", operation: "Order costs update", message: "Getting orders from local database..." }); const [orders] = await localConnection.query(` SELECT DISTINCT order_number, pid FROM orders WHERE costeach = 0 OR costeach IS NULL ORDER BY order_number `); if (!orders || !orders.rows || orders.rows.length === 0) { console.log("No orders found that need cost updates"); return { updatedCount: 0, errorCount: 0 }; } const totalOrders = orders.rows.length; console.log(`Found ${totalOrders} orders that need cost updates`); // Process in batches of 1000 orders const BATCH_SIZE = 500; for (let i = 0; i < orders.rows.length; i += BATCH_SIZE) { try { // Start transaction for this batch await localConnection.beginTransaction(); const batch = orders.rows.slice(i, i + BATCH_SIZE); const orderNumbers = [...new Set(batch.map(o => o.order_number))]; // 2. Fetch costs from production database for these orders outputProgress({ status: "running", operation: "Order costs update", message: `Fetching costs for orders ${i + 1} to ${Math.min(i + BATCH_SIZE, totalOrders)} of ${totalOrders}`, current: i, total: totalOrders, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); const [costs] = await prodConnection.query(` SELECT oc.orderid as order_number, oc.pid, oc.costeach FROM order_costs oc INNER JOIN ( SELECT orderid, pid, MAX(id) as max_id FROM order_costs WHERE orderid IN (?) AND pending = 0 GROUP BY orderid, pid ) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id `, [orderNumbers]); // Create a map of costs for easy lookup const costMap = {}; if (costs && costs.length) { costs.forEach(c => { costMap[`${c.order_number}-${c.pid}`] = c.costeach || 0; }); } // 3. Update costs in local database by batches // Using a more efficient update approach with a temporary table // Create a temporary table for each batch await localConnection.query(` DROP TABLE IF EXISTS temp_order_costs; CREATE TEMP TABLE temp_order_costs ( order_number VARCHAR(50) NOT NULL, pid BIGINT NOT NULL, costeach DECIMAL(10,3) NOT NULL, PRIMARY KEY (order_number, pid) ); `); // Insert cost data into the temporary table const costEntries = []; for (const order of batch) { const key = `${order.order_number}-${order.pid}`; if (key in costMap) { costEntries.push({ order_number: order.order_number, pid: order.pid, costeach: costMap[key] }); } } // Insert in sub-batches of 100 const DB_BATCH_SIZE = 50; for (let j = 0; j < costEntries.length; j += DB_BATCH_SIZE) { const subBatch = costEntries.slice(j, j + DB_BATCH_SIZE); if (subBatch.length === 0) continue; const placeholders = subBatch.map((_, idx) => `($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})` ).join(','); const values = subBatch.flatMap(item => [ item.order_number, item.pid, item.costeach ]); await localConnection.query(` INSERT INTO temp_order_costs (order_number, pid, costeach) VALUES ${placeholders} `, values); } // Perform bulk update from the temporary table const [updateResult] = await localConnection.query(` UPDATE orders o SET costeach = t.costeach FROM temp_order_costs t WHERE o.order_number = t.order_number AND o.pid = t.pid RETURNING o.id `); const batchUpdated = updateResult.rowCount || 0; updatedCount += batchUpdated; // Commit transaction for this batch await localConnection.commit(); outputProgress({ status: "running", operation: "Order costs update", message: `Updated ${updatedCount} orders with costs from production (batch: ${batchUpdated})`, current: i + batch.length, total: totalOrders, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); } catch (error) { // If a batch fails, roll back that batch's transaction and continue try { await localConnection.rollback(); } catch (rollbackError) { console.error("Error during batch rollback:", rollbackError); } console.error(`Error processing batch ${i}-${i + BATCH_SIZE}:`, error); errorCount++; } } // 4. For orders with no matching costs, set a default based on price outputProgress({ status: "running", operation: "Order costs update", message: "Setting default costs for remaining orders..." }); // Process remaining updates in smaller batches const DEFAULT_BATCH_SIZE = 10000; let totalDefaultUpdated = 0; try { // Start with a count query to determine how many records need the default update const [countResult] = await localConnection.query(` SELECT COUNT(*) as count FROM orders WHERE (costeach = 0 OR costeach IS NULL) `); const totalToUpdate = parseInt(countResult.rows[0]?.count || 0); if (totalToUpdate > 0) { console.log(`Applying default cost to ${totalToUpdate} orders`); // Apply the default in batches with separate transactions for (let i = 0; i < totalToUpdate; i += DEFAULT_BATCH_SIZE) { try { await localConnection.beginTransaction(); const [defaultUpdates] = await localConnection.query(` WITH orders_to_update AS ( SELECT id FROM orders WHERE (costeach = 0 OR costeach IS NULL) LIMIT ${DEFAULT_BATCH_SIZE} ) UPDATE orders o SET costeach = price * 0.5 FROM orders_to_update otu WHERE o.id = otu.id RETURNING o.id `); const batchDefaultUpdated = defaultUpdates.rowCount || 0; totalDefaultUpdated += batchDefaultUpdated; await localConnection.commit(); outputProgress({ status: "running", operation: "Order costs update", message: `Applied default costs to ${totalDefaultUpdated} of ${totalToUpdate} orders`, current: totalDefaultUpdated, total: totalToUpdate, elapsed: formatElapsedTime((Date.now() - startTime) / 1000) }); } catch (error) { try { await localConnection.rollback(); } catch (rollbackError) { console.error("Error during default update rollback:", rollbackError); } console.error(`Error applying default costs batch ${i}-${i + DEFAULT_BATCH_SIZE}:`, error); errorCount++; } } } } catch (error) { console.error("Error counting or updating remaining orders:", error); errorCount++; } updatedCount += totalDefaultUpdated; const endTime = Date.now(); const totalSeconds = (endTime - startTime) / 1000; outputProgress({ status: "complete", operation: "Order costs update", message: `Updated ${updatedCount} orders (${totalDefaultUpdated} with default values) in ${formatElapsedTime(totalSeconds)}`, elapsed: formatElapsedTime(totalSeconds) }); return { status: "complete", updatedCount, errorCount }; } catch (error) { console.error("Error during order costs update:", error); return { status: "error", error: error.message, updatedCount, errorCount }; } finally { if (connections) { await closeConnections(connections).catch(err => { console.error("Error closing connections:", err); }); } } } // Run the script only if this is the main module if (require.main === module) { updateOrderCosts().then((results) => { console.log('Cost update completed:', results); // Force exit after a small delay to ensure all logs are written setTimeout(() => process.exit(0), 500); }).catch((error) => { console.error("Unhandled error:", error); // Force exit with error code after a small delay setTimeout(() => process.exit(1), 500); }); } // Export the function for use in other scripts module.exports = updateOrderCosts;