/**
 * This script updates the costeach values for existing orders from the original MySQL database
 * without needing to run the full import process.
 */
|
|
const dotenv = require("dotenv");
|
|
const path = require("path");
|
|
const fs = require("fs");
|
|
const { setupConnections, closeConnections } = require('./import/utils');
|
|
const { outputProgress, formatElapsedTime } = require('./metrics/utils/progress');
|
|
|
|
dotenv.config({ path: path.join(__dirname, "../.env") });
|
|
|
|
// SSH configuration
|
|
const sshConfig = {
|
|
ssh: {
|
|
host: process.env.PROD_SSH_HOST,
|
|
port: process.env.PROD_SSH_PORT || 22,
|
|
username: process.env.PROD_SSH_USER,
|
|
privateKey: process.env.PROD_SSH_KEY_PATH
|
|
? fs.readFileSync(process.env.PROD_SSH_KEY_PATH)
|
|
: undefined,
|
|
compress: true, // Enable SSH compression
|
|
},
|
|
prodDbConfig: {
|
|
// MySQL config for production
|
|
host: process.env.PROD_DB_HOST || "localhost",
|
|
user: process.env.PROD_DB_USER,
|
|
password: process.env.PROD_DB_PASSWORD,
|
|
database: process.env.PROD_DB_NAME,
|
|
port: process.env.PROD_DB_PORT || 3306,
|
|
timezone: 'Z',
|
|
},
|
|
localDbConfig: {
|
|
// PostgreSQL config for local
|
|
host: process.env.DB_HOST,
|
|
user: process.env.DB_USER,
|
|
password: process.env.DB_PASSWORD,
|
|
database: process.env.DB_NAME,
|
|
port: process.env.DB_PORT || 5432,
|
|
ssl: process.env.DB_SSL === 'true',
|
|
connectionTimeoutMillis: 60000,
|
|
idleTimeoutMillis: 30000,
|
|
max: 10 // connection pool max size
|
|
}
|
|
};
|
|
|
|
async function updateOrderCosts() {
|
|
const startTime = Date.now();
|
|
let connections;
|
|
let updatedCount = 0;
|
|
let errorCount = 0;
|
|
|
|
try {
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: "Initializing SSH tunnel..."
|
|
});
|
|
|
|
connections = await setupConnections(sshConfig);
|
|
const { prodConnection, localConnection } = connections;
|
|
|
|
// 1. Get all orders from local database that need cost updates
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: "Getting orders from local database..."
|
|
});
|
|
|
|
const [orders] = await localConnection.query(`
|
|
SELECT DISTINCT order_number, pid
|
|
FROM orders
|
|
WHERE costeach = 0 OR costeach IS NULL
|
|
ORDER BY order_number
|
|
`);
|
|
|
|
if (!orders || !orders.rows || orders.rows.length === 0) {
|
|
console.log("No orders found that need cost updates");
|
|
return { updatedCount: 0, errorCount: 0 };
|
|
}
|
|
|
|
const totalOrders = orders.rows.length;
|
|
console.log(`Found ${totalOrders} orders that need cost updates`);
|
|
|
|
// Process in batches of 1000 orders
|
|
const BATCH_SIZE = 500;
|
|
for (let i = 0; i < orders.rows.length; i += BATCH_SIZE) {
|
|
try {
|
|
// Start transaction for this batch
|
|
await localConnection.beginTransaction();
|
|
|
|
const batch = orders.rows.slice(i, i + BATCH_SIZE);
|
|
|
|
const orderNumbers = [...new Set(batch.map(o => o.order_number))];
|
|
|
|
// 2. Fetch costs from production database for these orders
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: `Fetching costs for orders ${i + 1} to ${Math.min(i + BATCH_SIZE, totalOrders)} of ${totalOrders}`,
|
|
current: i,
|
|
total: totalOrders,
|
|
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
|
|
});
|
|
|
|
const [costs] = await prodConnection.query(`
|
|
SELECT
|
|
oc.orderid as order_number,
|
|
oc.pid,
|
|
oc.costeach
|
|
FROM order_costs oc
|
|
INNER JOIN (
|
|
SELECT
|
|
orderid,
|
|
pid,
|
|
MAX(id) as max_id
|
|
FROM order_costs
|
|
WHERE orderid IN (?)
|
|
AND pending = 0
|
|
GROUP BY orderid, pid
|
|
) latest ON oc.orderid = latest.orderid AND oc.pid = latest.pid AND oc.id = latest.max_id
|
|
`, [orderNumbers]);
|
|
|
|
// Create a map of costs for easy lookup
|
|
const costMap = {};
|
|
if (costs && costs.length) {
|
|
costs.forEach(c => {
|
|
costMap[`${c.order_number}-${c.pid}`] = c.costeach || 0;
|
|
});
|
|
}
|
|
|
|
// 3. Update costs in local database by batches
|
|
// Using a more efficient update approach with a temporary table
|
|
|
|
// Create a temporary table for each batch
|
|
await localConnection.query(`
|
|
DROP TABLE IF EXISTS temp_order_costs;
|
|
CREATE TEMP TABLE temp_order_costs (
|
|
order_number VARCHAR(50) NOT NULL,
|
|
pid BIGINT NOT NULL,
|
|
costeach DECIMAL(10,3) NOT NULL,
|
|
PRIMARY KEY (order_number, pid)
|
|
);
|
|
`);
|
|
|
|
// Insert cost data into the temporary table
|
|
const costEntries = [];
|
|
for (const order of batch) {
|
|
const key = `${order.order_number}-${order.pid}`;
|
|
if (key in costMap) {
|
|
costEntries.push({
|
|
order_number: order.order_number,
|
|
pid: order.pid,
|
|
costeach: costMap[key]
|
|
});
|
|
}
|
|
}
|
|
|
|
// Insert in sub-batches of 100
|
|
const DB_BATCH_SIZE = 50;
|
|
for (let j = 0; j < costEntries.length; j += DB_BATCH_SIZE) {
|
|
const subBatch = costEntries.slice(j, j + DB_BATCH_SIZE);
|
|
if (subBatch.length === 0) continue;
|
|
|
|
const placeholders = subBatch.map((_, idx) =>
|
|
`($${idx * 3 + 1}, $${idx * 3 + 2}, $${idx * 3 + 3})`
|
|
).join(',');
|
|
|
|
const values = subBatch.flatMap(item => [
|
|
item.order_number,
|
|
item.pid,
|
|
item.costeach
|
|
]);
|
|
|
|
await localConnection.query(`
|
|
INSERT INTO temp_order_costs (order_number, pid, costeach)
|
|
VALUES ${placeholders}
|
|
`, values);
|
|
}
|
|
|
|
// Perform bulk update from the temporary table
|
|
const [updateResult] = await localConnection.query(`
|
|
UPDATE orders o
|
|
SET costeach = t.costeach
|
|
FROM temp_order_costs t
|
|
WHERE o.order_number = t.order_number AND o.pid = t.pid
|
|
RETURNING o.id
|
|
`);
|
|
|
|
const batchUpdated = updateResult.rowCount || 0;
|
|
updatedCount += batchUpdated;
|
|
|
|
// Commit transaction for this batch
|
|
await localConnection.commit();
|
|
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: `Updated ${updatedCount} orders with costs from production (batch: ${batchUpdated})`,
|
|
current: i + batch.length,
|
|
total: totalOrders,
|
|
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
|
|
});
|
|
} catch (error) {
|
|
// If a batch fails, roll back that batch's transaction and continue
|
|
try {
|
|
await localConnection.rollback();
|
|
} catch (rollbackError) {
|
|
console.error("Error during batch rollback:", rollbackError);
|
|
}
|
|
|
|
console.error(`Error processing batch ${i}-${i + BATCH_SIZE}:`, error);
|
|
errorCount++;
|
|
}
|
|
}
|
|
|
|
// 4. For orders with no matching costs, set a default based on price
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: "Setting default costs for remaining orders..."
|
|
});
|
|
|
|
// Process remaining updates in smaller batches
|
|
const DEFAULT_BATCH_SIZE = 10000;
|
|
let totalDefaultUpdated = 0;
|
|
|
|
try {
|
|
// Start with a count query to determine how many records need the default update
|
|
const [countResult] = await localConnection.query(`
|
|
SELECT COUNT(*) as count FROM orders
|
|
WHERE (costeach = 0 OR costeach IS NULL)
|
|
`);
|
|
|
|
const totalToUpdate = parseInt(countResult.rows[0]?.count || 0);
|
|
|
|
if (totalToUpdate > 0) {
|
|
console.log(`Applying default cost to ${totalToUpdate} orders`);
|
|
|
|
// Apply the default in batches with separate transactions
|
|
for (let i = 0; i < totalToUpdate; i += DEFAULT_BATCH_SIZE) {
|
|
try {
|
|
await localConnection.beginTransaction();
|
|
|
|
const [defaultUpdates] = await localConnection.query(`
|
|
WITH orders_to_update AS (
|
|
SELECT id FROM orders
|
|
WHERE (costeach = 0 OR costeach IS NULL)
|
|
LIMIT ${DEFAULT_BATCH_SIZE}
|
|
)
|
|
UPDATE orders o
|
|
SET costeach = price * 0.5
|
|
FROM orders_to_update otu
|
|
WHERE o.id = otu.id
|
|
RETURNING o.id
|
|
`);
|
|
|
|
const batchDefaultUpdated = defaultUpdates.rowCount || 0;
|
|
totalDefaultUpdated += batchDefaultUpdated;
|
|
|
|
await localConnection.commit();
|
|
|
|
outputProgress({
|
|
status: "running",
|
|
operation: "Order costs update",
|
|
message: `Applied default costs to ${totalDefaultUpdated} of ${totalToUpdate} orders`,
|
|
current: totalDefaultUpdated,
|
|
total: totalToUpdate,
|
|
elapsed: formatElapsedTime((Date.now() - startTime) / 1000)
|
|
});
|
|
} catch (error) {
|
|
try {
|
|
await localConnection.rollback();
|
|
} catch (rollbackError) {
|
|
console.error("Error during default update rollback:", rollbackError);
|
|
}
|
|
|
|
console.error(`Error applying default costs batch ${i}-${i + DEFAULT_BATCH_SIZE}:`, error);
|
|
errorCount++;
|
|
}
|
|
}
|
|
}
|
|
} catch (error) {
|
|
console.error("Error counting or updating remaining orders:", error);
|
|
errorCount++;
|
|
}
|
|
|
|
updatedCount += totalDefaultUpdated;
|
|
|
|
const endTime = Date.now();
|
|
const totalSeconds = (endTime - startTime) / 1000;
|
|
|
|
outputProgress({
|
|
status: "complete",
|
|
operation: "Order costs update",
|
|
message: `Updated ${updatedCount} orders (${totalDefaultUpdated} with default values) in ${formatElapsedTime(totalSeconds)}`,
|
|
elapsed: formatElapsedTime(totalSeconds)
|
|
});
|
|
|
|
return {
|
|
status: "complete",
|
|
updatedCount,
|
|
errorCount
|
|
};
|
|
} catch (error) {
|
|
console.error("Error during order costs update:", error);
|
|
|
|
return {
|
|
status: "error",
|
|
error: error.message,
|
|
updatedCount,
|
|
errorCount
|
|
};
|
|
} finally {
|
|
if (connections) {
|
|
await closeConnections(connections).catch(err => {
|
|
console.error("Error closing connections:", err);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
// Run the script only if this is the main module
|
|
if (require.main === module) {
|
|
updateOrderCosts().then((results) => {
|
|
console.log('Cost update completed:', results);
|
|
// Force exit after a small delay to ensure all logs are written
|
|
setTimeout(() => process.exit(0), 500);
|
|
}).catch((error) => {
|
|
console.error("Unhandled error:", error);
|
|
// Force exit with error code after a small delay
|
|
setTimeout(() => process.exit(1), 500);
|
|
});
|
|
}
|
|
|
|
// Export the function for use in other scripts
|
|
module.exports = updateOrderCosts;
|