Add new calculate scripts, add in historical data import

This commit is contained in:
2025-03-30 10:30:13 -04:00
parent 4c4359908c
commit 047122a620
9 changed files with 1990 additions and 28 deletions

View File

@@ -211,4 +211,57 @@ CREATE INDEX idx_po_updated ON purchase_orders(updated);
SET session_replication_role = 'origin'; -- Re-enable foreign key checks
-- Create views for common calculations
-- product_sales_trends view moved to metrics-schema.sql
-- product_sales_trends view moved to metrics-schema.sql
-- Historical data tables imported from production
CREATE TABLE imported_product_current_prices (
price_id BIGSERIAL PRIMARY KEY,
pid BIGINT NOT NULL,
qty_buy SMALLINT NOT NULL,
is_min_qty_buy BOOLEAN NOT NULL,
price_each NUMERIC(10,3) NOT NULL,
qty_limit SMALLINT NOT NULL,
no_promo BOOLEAN NOT NULL,
checkout_offer BOOLEAN NOT NULL,
active BOOLEAN NOT NULL,
date_active TIMESTAMP WITH TIME ZONE,
date_deactive TIMESTAMP WITH TIME ZONE,
updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_imported_product_current_prices_pid ON imported_product_current_prices(pid, active, qty_buy);
CREATE INDEX idx_imported_product_current_prices_checkout ON imported_product_current_prices(checkout_offer, active);
CREATE INDEX idx_imported_product_current_prices_deactive ON imported_product_current_prices(date_deactive, active);
CREATE INDEX idx_imported_product_current_prices_active ON imported_product_current_prices(date_active, active);
CREATE TABLE imported_daily_inventory (
date DATE NOT NULL,
pid BIGINT NOT NULL,
amountsold SMALLINT NOT NULL DEFAULT 0,
times_sold SMALLINT NOT NULL DEFAULT 0,
qtyreceived SMALLINT NOT NULL DEFAULT 0,
price NUMERIC(7,2) NOT NULL DEFAULT 0,
costeach NUMERIC(7,2) NOT NULL DEFAULT 0,
stamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (date, pid)
);
CREATE INDEX idx_imported_daily_inventory_pid ON imported_daily_inventory(pid);
CREATE TABLE imported_product_stat_history (
pid BIGINT NOT NULL,
date DATE NOT NULL,
score NUMERIC(10,2) NOT NULL,
score2 NUMERIC(10,2) NOT NULL,
qty_in_baskets SMALLINT NOT NULL,
qty_sold SMALLINT NOT NULL,
notifies_set SMALLINT NOT NULL,
visibility_score NUMERIC(10,2) NOT NULL,
health_score VARCHAR(5) NOT NULL,
sold_view_score NUMERIC(6,3) NOT NULL,
updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (pid, date)
);
CREATE INDEX idx_imported_product_stat_history_date ON imported_product_stat_history(date);

View File

@@ -6,6 +6,7 @@ const importCategories = require('./import/categories');
const { importProducts } = require('./import/products');
const importOrders = require('./import/orders');
const importPurchaseOrders = require('./import/purchase-orders');
const importHistoricalData = require('./import/historical-data');
dotenv.config({ path: path.join(__dirname, "../.env") });
@@ -13,7 +14,8 @@ dotenv.config({ path: path.join(__dirname, "../.env") });
const IMPORT_CATEGORIES = false;
const IMPORT_PRODUCTS = false;
const IMPORT_ORDERS = false;
const IMPORT_PURCHASE_ORDERS = true;
const IMPORT_PURCHASE_ORDERS = false;
const IMPORT_HISTORICAL_DATA = true;
// Add flag for incremental updates
const INCREMENTAL_UPDATE = process.env.INCREMENTAL_UPDATE !== 'false'; // Default to true unless explicitly set to false
@@ -78,7 +80,8 @@ async function main() {
IMPORT_CATEGORIES,
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS
IMPORT_PURCHASE_ORDERS,
IMPORT_HISTORICAL_DATA
].filter(Boolean).length;
try {
@@ -108,17 +111,6 @@ async function main() {
WHERE status = 'running'
`);
// Initialize sync_status table if it doesn't exist
await localConnection.query(`
CREATE TABLE IF NOT EXISTS sync_status (
table_name TEXT PRIMARY KEY,
last_sync_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP,
last_sync_id BIGINT
);
CREATE INDEX IF NOT EXISTS idx_last_sync ON sync_status (last_sync_timestamp);
`);
// Create import history record for the overall session
try {
const [historyResult] = await localConnection.query(`
@@ -137,10 +129,11 @@ async function main() {
'categories_enabled', $2::boolean,
'products_enabled', $3::boolean,
'orders_enabled', $4::boolean,
'purchase_orders_enabled', $5::boolean
'purchase_orders_enabled', $5::boolean,
'historical_data_enabled', $6::boolean
)
) RETURNING id
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS]);
`, [INCREMENTAL_UPDATE, IMPORT_CATEGORIES, IMPORT_PRODUCTS, IMPORT_ORDERS, IMPORT_PURCHASE_ORDERS, IMPORT_HISTORICAL_DATA]);
importHistoryId = historyResult.rows[0].id;
} catch (error) {
console.error("Error creating import history record:", error);
@@ -157,7 +150,8 @@ async function main() {
categories: null,
products: null,
orders: null,
purchaseOrders: null
purchaseOrders: null,
historicalData: null
};
let totalRecordsAdded = 0;
@@ -217,6 +211,32 @@ async function main() {
}
}
if (IMPORT_HISTORICAL_DATA) {
try {
results.historicalData = await importHistoricalData(prodConnection, localConnection, INCREMENTAL_UPDATE);
if (isImportCancelled) throw new Error("Import cancelled");
completedSteps++;
console.log('Historical data import result:', results.historicalData);
// Handle potential error status
if (results.historicalData?.status === 'error') {
console.error('Historical data import had an error:', results.historicalData.error);
} else {
totalRecordsAdded += parseInt(results.historicalData?.recordsAdded || 0);
totalRecordsUpdated += parseInt(results.historicalData?.recordsUpdated || 0);
}
} catch (error) {
console.error('Error during historical data import:', error);
// Continue with other imports, don't fail the whole process
results.historicalData = {
status: 'error',
error: error.message,
recordsAdded: 0,
recordsUpdated: 0
};
}
}
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
@@ -234,12 +254,14 @@ async function main() {
'products_enabled', $5::boolean,
'orders_enabled', $6::boolean,
'purchase_orders_enabled', $7::boolean,
'categories_result', COALESCE($8::jsonb, 'null'::jsonb),
'products_result', COALESCE($9::jsonb, 'null'::jsonb),
'orders_result', COALESCE($10::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($11::jsonb, 'null'::jsonb)
'historical_data_enabled', $8::boolean,
'categories_result', COALESCE($9::jsonb, 'null'::jsonb),
'products_result', COALESCE($10::jsonb, 'null'::jsonb),
'orders_result', COALESCE($11::jsonb, 'null'::jsonb),
'purchase_orders_result', COALESCE($12::jsonb, 'null'::jsonb),
'historical_data_result', COALESCE($13::jsonb, 'null'::jsonb)
)
WHERE id = $12
WHERE id = $14
`, [
totalElapsedSeconds,
parseInt(totalRecordsAdded),
@@ -248,10 +270,12 @@ async function main() {
IMPORT_PRODUCTS,
IMPORT_ORDERS,
IMPORT_PURCHASE_ORDERS,
IMPORT_HISTORICAL_DATA,
JSON.stringify(results.categories),
JSON.stringify(results.products),
JSON.stringify(results.orders),
JSON.stringify(results.purchaseOrders),
JSON.stringify(results.historicalData),
importHistoryId
]);

View File

@@ -0,0 +1,961 @@
const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics/utils/progress');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream');
const { promisify } = require('util');
// Configuration constants to control which tables get imported
const IMPORT_PRODUCT_CURRENT_PRICES = false;
const IMPORT_DAILY_INVENTORY = false;
const IMPORT_PRODUCT_STAT_HISTORY = true;
// For product stat history, limit to more recent data for faster initial import
const USE_RECENT_MONTHS = 12; // Just use the most recent months for product_stat_history
/**
* Validates a date from MySQL before inserting it into PostgreSQL
* @param {string|Date|null} mysqlDate - Date string or object from MySQL
* @returns {string|null} Valid date string or null if invalid
*/
function validateDate(mysqlDate) {
// Handle null, undefined, or empty values
if (!mysqlDate) {
return null;
}
// Convert to string if it's not already
const dateStr = String(mysqlDate);
// Handle MySQL zero dates and empty values
if (dateStr === '0000-00-00' ||
dateStr === '0000-00-00 00:00:00' ||
dateStr.indexOf('0000-00-00') !== -1 ||
dateStr === '') {
return null;
}
// Check if the date is valid
const date = new Date(mysqlDate);
// If the date is invalid or suspiciously old (pre-1970), return null
if (isNaN(date.getTime()) || date.getFullYear() < 1970) {
return null;
}
return mysqlDate;
}
/**
* Imports historical data from MySQL to PostgreSQL
*/
async function importHistoricalData(
prodConnection,
localConnection,
options = {}
) {
const {
incrementalUpdate = true,
oneYearAgo = new Date(new Date().setFullYear(new Date().getFullYear() - 1))
} = options;
const oneYearAgoStr = oneYearAgo.toISOString().split('T')[0];
const startTime = Date.now();
// Use larger batch sizes to improve performance
const BATCH_SIZE = 5000; // For fetching from small tables
const INSERT_BATCH_SIZE = 500; // For inserting to small tables
const LARGE_BATCH_SIZE = 10000; // For fetching from large tables
const LARGE_INSERT_BATCH_SIZE = 1000; // For inserting to large tables
// Calculate date for recent data
const recentDateStr = new Date(
new Date().setMonth(new Date().getMonth() - USE_RECENT_MONTHS)
).toISOString().split('T')[0];
console.log(`Starting import with:
- One year ago date: ${oneYearAgoStr}
- Recent months date: ${recentDateStr} (for product_stat_history)
- Incremental update: ${incrementalUpdate}
- Standard batch size: ${BATCH_SIZE}
- Standard insert batch size: ${INSERT_BATCH_SIZE}
- Large table batch size: ${LARGE_BATCH_SIZE}
- Large table insert batch size: ${LARGE_INSERT_BATCH_SIZE}
- Import product_current_prices: ${IMPORT_PRODUCT_CURRENT_PRICES}
- Import daily_inventory: ${IMPORT_DAILY_INVENTORY}
- Import product_stat_history: ${IMPORT_PRODUCT_STAT_HISTORY}`);
try {
// Get last sync time for incremental updates
const lastSyncTimes = {};
if (incrementalUpdate) {
try {
const syncResult = await localConnection.query(`
SELECT table_name, last_sync_timestamp
FROM sync_status
WHERE table_name IN (
'imported_product_current_prices',
'imported_daily_inventory',
'imported_product_stat_history'
)
`);
// Add check for rows existence and type
if (syncResult && Array.isArray(syncResult.rows)) {
for (const row of syncResult.rows) {
lastSyncTimes[row.table_name] = row.last_sync_timestamp;
console.log(`Last sync time for ${row.table_name}: ${row.last_sync_timestamp}`);
}
} else {
console.warn('Sync status query did not return expected rows. Proceeding without last sync times.');
}
} catch (error) {
console.error('Error fetching sync status:', error);
}
}
// Determine how many tables will be imported
const tablesCount = [
IMPORT_PRODUCT_CURRENT_PRICES,
IMPORT_DAILY_INVENTORY,
IMPORT_PRODUCT_STAT_HISTORY
].filter(Boolean).length;
// Run all imports sequentially for better reliability
console.log(`Starting sequential imports for ${tablesCount} tables...`);
outputProgress({
status: "running",
operation: "Historical data import",
message: `Starting sequential imports for ${tablesCount} tables...`,
current: 0,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
let progressCount = 0;
let productCurrentPricesResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
let dailyInventoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
let productStatHistoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
// Import product current prices
if (IMPORT_PRODUCT_CURRENT_PRICES) {
console.log('Importing product current prices...');
productCurrentPricesResult = await importProductCurrentPrices(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTimes['imported_product_current_prices'],
BATCH_SIZE,
INSERT_BATCH_SIZE,
incrementalUpdate,
startTime
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Import daily inventory
if (IMPORT_DAILY_INVENTORY) {
console.log('Importing daily inventory...');
dailyInventoryResult = await importDailyInventory(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTimes['imported_daily_inventory'],
BATCH_SIZE,
INSERT_BATCH_SIZE,
incrementalUpdate,
startTime
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Import product stat history - using optimized approach
if (IMPORT_PRODUCT_STAT_HISTORY) {
console.log('Importing product stat history...');
productStatHistoryResult = await importProductStatHistory(
prodConnection,
localConnection,
recentDateStr, // Use more recent date for this massive table
lastSyncTimes['imported_product_stat_history'],
LARGE_BATCH_SIZE,
LARGE_INSERT_BATCH_SIZE,
incrementalUpdate,
startTime,
USE_RECENT_MONTHS // Pass the recent months constant
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Aggregate results
const totalRecordsAdded =
productCurrentPricesResult.recordsAdded +
dailyInventoryResult.recordsAdded +
productStatHistoryResult.recordsAdded;
const totalRecordsUpdated =
productCurrentPricesResult.recordsUpdated +
dailyInventoryResult.recordsUpdated +
productStatHistoryResult.recordsUpdated;
const totalProcessed =
productCurrentPricesResult.totalProcessed +
dailyInventoryResult.totalProcessed +
productStatHistoryResult.totalProcessed;
const allErrors = [
...productCurrentPricesResult.errors,
...dailyInventoryResult.errors,
...productStatHistoryResult.errors
];
// Log import summary
console.log(`
Historical data import complete:
-------------------------------
Records added: ${totalRecordsAdded}
Records updated: ${totalRecordsUpdated}
Total processed: ${totalProcessed}
Errors: ${allErrors.length}
Time taken: ${formatElapsedTime(startTime)}
`);
// Final progress update
outputProgress({
status: "complete",
operation: "Historical data import",
message: `Import complete. Added: ${totalRecordsAdded}, Updated: ${totalRecordsUpdated}, Errors: ${allErrors.length}`,
current: tablesCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
// Log any errors
if (allErrors.length > 0) {
console.log('Errors encountered during import:');
console.log(JSON.stringify(allErrors, null, 2));
}
// Calculate duration
const endTime = Date.now();
const durationSeconds = Math.round((endTime - startTime) / 1000);
const finalStatus = allErrors.length === 0 ? 'complete' : 'failed';
const errorMessage = allErrors.length > 0 ? JSON.stringify(allErrors) : null;
// Update import history
await localConnection.query(`
INSERT INTO import_history (
table_name,
end_time,
duration_seconds,
records_added,
records_updated,
is_incremental,
status,
error_message,
additional_info
)
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7, $8)
`, [
'historical_data_combined',
durationSeconds,
totalRecordsAdded,
totalRecordsUpdated,
incrementalUpdate,
finalStatus,
errorMessage,
JSON.stringify({
totalProcessed,
tablesImported: {
imported_product_current_prices: IMPORT_PRODUCT_CURRENT_PRICES,
imported_daily_inventory: IMPORT_DAILY_INVENTORY,
imported_product_stat_history: IMPORT_PRODUCT_STAT_HISTORY
}
})
]);
// Return summary
return {
recordsAdded: totalRecordsAdded,
recordsUpdated: totalRecordsUpdated,
totalProcessed,
errors: allErrors,
timeTaken: formatElapsedTime(startTime)
};
} catch (error) {
console.error('Error importing historical data:', error);
// Final progress update on error
outputProgress({
status: "failed",
operation: "Historical data import",
message: `Import failed: ${error.message}`,
elapsed: formatElapsedTime(startTime)
});
throw error;
}
}
/**
* Imports product_current_prices data from MySQL to PostgreSQL
*/
async function importProductCurrentPrices(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM product_current_prices
WHERE (date_active >= ? OR date_deactive >= ?)
${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''}
`, [oneYearAgoStr, oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
outputProgress({
status: "running",
operation: "Historical data import - Product Current Prices",
message: `Found ${totalCount} records to process`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production
const [rows] = await prodConnection.query(`
SELECT
price_id,
pid,
qty_buy,
is_min_qty_buy,
price_each,
qty_limit,
no_promo,
checkout_offer,
active,
date_active,
date_deactive
FROM product_current_prices
WHERE (date_active >= ? OR date_deactive >= ?)
${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''}
ORDER BY price_id
LIMIT ? OFFSET ?
`, [
oneYearAgoStr,
oneYearAgoStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
row.price_id,
row.pid,
row.qty_buy,
row.is_min_qty_buy ? true : false,
row.price_each,
row.qty_limit, // PostgreSQL will handle null values properly
row.no_promo ? true : false,
row.checkout_offer ? true : false,
row.active ? true : false,
validateDate(row.date_active),
validateDate(row.date_deactive)
);
}
// Execute batch insert
const result = await localConnection.query(`
WITH ins AS (
INSERT INTO imported_product_current_prices (
price_id, pid, qty_buy, is_min_qty_buy, price_each, qty_limit,
no_promo, checkout_offer, active, date_active, date_deactive
)
VALUES ${placeholders.join(',\n')}
ON CONFLICT (price_id) DO UPDATE SET
pid = EXCLUDED.pid,
qty_buy = EXCLUDED.qty_buy,
is_min_qty_buy = EXCLUDED.is_min_qty_buy,
price_each = EXCLUDED.price_each,
qty_limit = EXCLUDED.qty_limit,
no_promo = EXCLUDED.no_promo,
checkout_offer = EXCLUDED.checkout_offer,
active = EXCLUDED.active,
date_active = EXCLUDED.date_active,
date_deactive = EXCLUDED.date_deactive,
updated = CURRENT_TIMESTAMP
RETURNING (xmax = 0) AS inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) AS inserted_count,
COUNT(*) FILTER (WHERE NOT inserted) AS updated_count
FROM ins
`, values);
// Safely update counts based on the result
if (result && result.rows && result.rows.length > 0) {
const insertedCount = parseInt(result.rows[0].inserted_count || 0);
const updatedCount = parseInt(result.rows[0].updated_count || 0);
recordsAdded += insertedCount;
recordsUpdated += updatedCount;
}
} catch (error) {
console.error(`Error in batch import of product_current_prices at offset ${i}:`, error);
errors.push({
table: 'imported_product_current_prices',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Product Current Prices",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of product_current_prices:', error);
errors.push({
table: 'imported_product_current_prices',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_product_current_prices', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in product current prices import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_product_current_prices',
error: error.message
}]
};
}
}
/**
* Imports daily_inventory data from MySQL to PostgreSQL
*/
async function importDailyInventory(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM daily_inventory
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''}
`, [oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
outputProgress({
status: "running",
operation: "Historical data import - Daily Inventory",
message: `Found ${totalCount} records to process`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production
const [rows] = await prodConnection.query(`
SELECT
date,
pid,
amountsold,
times_sold,
qtyreceived,
price,
costeach,
stamp
FROM daily_inventory
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''}
ORDER BY date, pid
LIMIT ? OFFSET ?
`, [
oneYearAgoStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
validateDate(row.date),
row.pid,
row.amountsold || 0,
row.times_sold || 0,
row.qtyreceived || 0,
row.price || 0,
row.costeach || 0,
validateDate(row.stamp)
);
}
// Execute batch insert
const result = await localConnection.query(`
WITH ins AS (
INSERT INTO imported_daily_inventory (
date, pid, amountsold, times_sold, qtyreceived, price, costeach, stamp
)
VALUES ${placeholders.join(',\n')}
ON CONFLICT (date, pid) DO UPDATE SET
amountsold = EXCLUDED.amountsold,
times_sold = EXCLUDED.times_sold,
qtyreceived = EXCLUDED.qtyreceived,
price = EXCLUDED.price,
costeach = EXCLUDED.costeach,
stamp = EXCLUDED.stamp,
updated = CURRENT_TIMESTAMP
RETURNING (xmax = 0) AS inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) AS inserted_count,
COUNT(*) FILTER (WHERE NOT inserted) AS updated_count
FROM ins
`, values);
// Safely update counts based on the result
if (result && result.rows && result.rows.length > 0) {
const insertedCount = parseInt(result.rows[0].inserted_count || 0);
const updatedCount = parseInt(result.rows[0].updated_count || 0);
recordsAdded += insertedCount;
recordsUpdated += updatedCount;
}
} catch (error) {
console.error(`Error in batch import of daily_inventory at offset ${i}:`, error);
errors.push({
table: 'imported_daily_inventory',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Daily Inventory",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of daily_inventory:', error);
errors.push({
table: 'imported_daily_inventory',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_daily_inventory', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in daily inventory import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_daily_inventory',
error: error.message
}]
};
}
}
/**
* Imports product_stat_history data from MySQL to PostgreSQL
* Using fast direct inserts without conflict checking
*/
async function importProductStatHistory(
prodConnection,
localConnection,
recentDateStr, // Use more recent date instead of one year ago
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime,
recentMonths // Add parameter for recent months
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
let lastRateCheck = Date.now();
let lastProcessed = 0;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM product_stat_history
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''}
`, [recentDateStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
console.log(`Found ${totalCount} records to process in product_stat_history (using recent date: ${recentDateStr})`);
// Progress indicator
outputProgress({
status: "running",
operation: "Historical data import - Product Stat History",
message: `Found ${totalCount} records to process (last ${recentMonths} months only)`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// If not incremental, truncate the table first for better performance
if (!incrementalUpdate) {
console.log('Truncating imported_product_stat_history for full import...');
await localConnection.query('TRUNCATE TABLE imported_product_stat_history');
} else if (lastSyncTime) {
// For incremental updates, delete records that will be reimported
console.log(`Deleting records from imported_product_stat_history since ${lastSyncTime}...`);
await localConnection.query('DELETE FROM imported_product_stat_history WHERE date > $1', [lastSyncTime]);
}
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production with minimal filtering and no sorting
const [rows] = await prodConnection.query(`
SELECT
pid,
date,
COALESCE(score, 0) as score,
COALESCE(score2, 0) as score2,
COALESCE(qty_in_baskets, 0) as qty_in_baskets,
COALESCE(qty_sold, 0) as qty_sold,
COALESCE(notifies_set, 0) as notifies_set,
COALESCE(visibility_score, 0) as visibility_score,
COALESCE(health_score, 0) as health_score,
COALESCE(sold_view_score, 0) as sold_view_score
FROM product_stat_history
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''}
LIMIT ? OFFSET ?
`, [
recentDateStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
row.pid,
validateDate(row.date),
row.score,
row.score2,
row.qty_in_baskets,
row.qty_sold,
row.notifies_set,
row.visibility_score,
row.health_score,
row.sold_view_score
);
}
// Execute direct batch insert without conflict checking
await localConnection.query(`
INSERT INTO imported_product_stat_history (
pid, date, score, score2, qty_in_baskets, qty_sold, notifies_set,
visibility_score, health_score, sold_view_score
)
VALUES ${placeholders.join(',\n')}
`, values);
// All inserts are new records when using this approach
recordsAdded += batch.length;
} catch (error) {
console.error(`Error in batch insert of product_stat_history at offset ${i}:`, error);
errors.push({
table: 'imported_product_stat_history',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Calculate current rate every 10 seconds or 100,000 records
const now = Date.now();
if (now - lastRateCheck > 10000 || totalProcessed - lastProcessed > 100000) {
const timeElapsed = (now - lastRateCheck) / 1000; // seconds
const recordsProcessed = totalProcessed - lastProcessed;
const currentRate = Math.round(recordsProcessed / timeElapsed);
console.log(`Current import rate: ${currentRate} records/second`);
lastRateCheck = now;
lastProcessed = totalProcessed;
}
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Product Stat History",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of product_stat_history:', error);
errors.push({
table: 'imported_product_stat_history',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_product_stat_history', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in product stat history import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_product_stat_history',
error: error.message
}]
};
}
}
module.exports = importHistoricalData;

View File

@@ -0,0 +1,306 @@
const path = require('path');
const fs = require('fs');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
// Load environment variables
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });
// Add error handler for uncaught exceptions
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
process.exit(1);
});
// Add error handler for unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});
// Load progress module
const progress = require('./utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
const { getConnection, closePool } = require('./utils/db');
// Add cancel handler
let isCancelled = false;
function cancelCalculation() {
isCancelled = true;
console.log('Calculation has been cancelled by user');
// Force-terminate any query that's been running for more than 5 seconds
try {
const connection = getConnection();
connection.then(async (conn) => {
try {
// Identify and terminate long-running queries from our application
await conn.query(`
SELECT pg_cancel_backend(pid)
FROM pg_stat_activity
WHERE query_start < now() - interval '5 seconds'
AND application_name LIKE '%node%'
AND query NOT LIKE '%pg_cancel_backend%'
`);
// Release connection
conn.release();
} catch (err) {
console.error('Error during force cancellation:', err);
conn.release();
}
}).catch(err => {
console.error('Could not get connection for cancellation:', err);
});
} catch (err) {
console.error('Failed to terminate running queries:', err);
}
return {
success: true,
message: 'Calculation has been cancelled'
};
}
// Handle SIGTERM signal for cancellation
process.on('SIGTERM', cancelCalculation);
async function updateDailySnapshots() {
let connection;
const startTime = Date.now();
let calculateHistoryId;
// Set a maximum execution time (30 minutes)
const MAX_EXECUTION_TIME = 30 * 60 * 1000;
const timeout = setTimeout(() => {
console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`);
// Call cancel and force exit
cancelCalculation();
process.exit(1);
}, MAX_EXECUTION_TIME);
try {
// Read the SQL file
const sqlFilePath = path.resolve(__dirname, 'update_daily_snapshots.sql');
const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8');
// Clean up any previously running calculations
connection = await getConnection();
// Ensure the calculate_status table exists and has the correct structure
await connection.query(`
CREATE TABLE IF NOT EXISTS calculate_status (
module_name TEXT PRIMARY KEY,
last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`);
await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running' AND additional_info->>'type' = 'daily_snapshots'
`);
// Create history record for this calculation
const historyResult = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
additional_info
) VALUES (
NOW(),
'running',
jsonb_build_object(
'type', 'daily_snapshots',
'sql_file', 'update_daily_snapshots.sql'
)
) RETURNING id
`);
calculateHistoryId = historyResult.rows[0].id;
// Initialize progress
global.outputProgress({
status: 'running',
operation: 'Starting daily snapshots calculation',
current: 0,
total: 100,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
percentage: '0',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Execute the SQL query
global.outputProgress({
status: 'running',
operation: 'Executing daily snapshots SQL query',
current: 25,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: 'Calculating...',
rate: 0,
percentage: '25',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
await connection.query(sqlQuery);
// Update calculate_status table
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ($1, $2)
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp
`, ['daily_snapshots', new Date()]);
// Update progress to 100%
global.outputProgress({
status: 'complete',
operation: 'Daily snapshots calculation complete',
current: 100,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: '0s',
rate: 0,
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Update history with completion
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = 'completed'
WHERE id = $2
`, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);
// Clear progress file on successful completion
global.clearProgress();
return {
success: true,
message: 'Daily snapshots calculation completed successfully',
duration: Math.round((Date.now() - startTime) / 1000)
};
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with error
if (connection && calculateHistoryId) {
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = $2,
error_message = $3
WHERE id = $4
`, [
totalElapsedSeconds,
isCancelled ? 'cancelled' : 'failed',
error.message,
calculateHistoryId
]);
}
if (isCancelled) {
global.outputProgress({
status: 'cancelled',
operation: 'Calculation cancelled',
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
} else {
global.outputProgress({
status: 'error',
operation: 'Error: ' + error.message,
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
throw error;
} finally {
// Clear the timeout to prevent forced termination
clearTimeout(timeout);
// Always release connection
if (connection) {
try {
connection.release();
} catch (err) {
console.error('Error in final cleanup:', err);
}
}
}
}
// Export as a module with all necessary functions
module.exports = {
updateDailySnapshots,
cancelCalculation,
getProgress: global.getProgress
};
// Run directly if called from command line
if (require.main === module) {
updateDailySnapshots().then(() => {
closePool().then(() => {
process.exit(0);
});
}).catch(error => {
console.error('Error:', error);
closePool().then(() => {
process.exit(1);
});
});
}

View File

@@ -0,0 +1,306 @@
const path = require('path');
const fs = require('fs');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
// Load environment variables
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });
// Add error handler for uncaught exceptions
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
process.exit(1);
});
// Add error handler for unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});
// Load progress module
const progress = require('./utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
const { getConnection, closePool } = require('./utils/db');
// Add cancel handler
let isCancelled = false;
function cancelCalculation() {
isCancelled = true;
console.log('Calculation has been cancelled by user');
// Force-terminate any query that's been running for more than 5 seconds
try {
const connection = getConnection();
connection.then(async (conn) => {
try {
// Identify and terminate long-running queries from our application
await conn.query(`
SELECT pg_cancel_backend(pid)
FROM pg_stat_activity
WHERE query_start < now() - interval '5 seconds'
AND application_name LIKE '%node%'
AND query NOT LIKE '%pg_cancel_backend%'
`);
// Release connection
conn.release();
} catch (err) {
console.error('Error during force cancellation:', err);
conn.release();
}
}).catch(err => {
console.error('Could not get connection for cancellation:', err);
});
} catch (err) {
console.error('Failed to terminate running queries:', err);
}
return {
success: true,
message: 'Calculation has been cancelled'
};
}
// Handle SIGTERM signal for cancellation
process.on('SIGTERM', cancelCalculation);
async function updatePeriodicMetrics() {
let connection;
const startTime = Date.now();
let calculateHistoryId;
// Set a maximum execution time (30 minutes)
const MAX_EXECUTION_TIME = 30 * 60 * 1000;
const timeout = setTimeout(() => {
console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`);
// Call cancel and force exit
cancelCalculation();
process.exit(1);
}, MAX_EXECUTION_TIME);
try {
// Read the SQL file
const sqlFilePath = path.resolve(__dirname, 'update_periodic_metrics.sql');
const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8');
// Clean up any previously running calculations
connection = await getConnection();
// Ensure the calculate_status table exists and has the correct structure
await connection.query(`
CREATE TABLE IF NOT EXISTS calculate_status (
module_name TEXT PRIMARY KEY,
last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`);
await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running' AND additional_info->>'type' = 'periodic_metrics'
`);
// Create history record for this calculation
const historyResult = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
additional_info
) VALUES (
NOW(),
'running',
jsonb_build_object(
'type', 'periodic_metrics',
'sql_file', 'update_periodic_metrics.sql'
)
) RETURNING id
`);
calculateHistoryId = historyResult.rows[0].id;
// Initialize progress
global.outputProgress({
status: 'running',
operation: 'Starting periodic metrics calculation',
current: 0,
total: 100,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
percentage: '0',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Execute the SQL query
global.outputProgress({
status: 'running',
operation: 'Executing periodic metrics SQL query',
current: 25,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: 'Calculating...',
rate: 0,
percentage: '25',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
await connection.query(sqlQuery);
// Update calculate_status table
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ($1, $2)
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp
`, ['periodic_metrics', new Date()]);
// Update progress to 100%
global.outputProgress({
status: 'complete',
operation: 'Periodic metrics calculation complete',
current: 100,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: '0s',
rate: 0,
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Update history with completion
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = 'completed'
WHERE id = $2
`, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);
// Clear progress file on successful completion
global.clearProgress();
return {
success: true,
message: 'Periodic metrics calculation completed successfully',
duration: Math.round((Date.now() - startTime) / 1000)
};
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with error
if (connection && calculateHistoryId) {
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = $2,
error_message = $3
WHERE id = $4
`, [
totalElapsedSeconds,
isCancelled ? 'cancelled' : 'failed',
error.message,
calculateHistoryId
]);
}
if (isCancelled) {
global.outputProgress({
status: 'cancelled',
operation: 'Calculation cancelled',
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
} else {
global.outputProgress({
status: 'error',
operation: 'Error: ' + error.message,
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
throw error;
} finally {
// Clear the timeout to prevent forced termination
clearTimeout(timeout);
// Always release connection
if (connection) {
try {
connection.release();
} catch (err) {
console.error('Error in final cleanup:', err);
}
}
}
}
// Export as a module with all necessary functions
module.exports = {
updatePeriodicMetrics,
cancelCalculation,
getProgress: global.getProgress
};
// Run directly if called from command line
if (require.main === module) {
updatePeriodicMetrics().then(() => {
closePool().then(() => {
process.exit(0);
});
}).catch(error => {
console.error('Error:', error);
closePool().then(() => {
process.exit(1);
});
});
}

View File

@@ -0,0 +1,306 @@
const path = require('path');
const fs = require('fs');
// Change working directory to script directory
process.chdir(path.dirname(__filename));
// Load environment variables
require('dotenv').config({ path: path.resolve(__dirname, '../..', '.env') });
// Add error handler for uncaught exceptions
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
process.exit(1);
});
// Add error handler for unhandled promise rejections
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});
// Load progress module
const progress = require('./utils/progress');
// Store progress functions in global scope to ensure availability
global.formatElapsedTime = progress.formatElapsedTime;
global.estimateRemaining = progress.estimateRemaining;
global.calculateRate = progress.calculateRate;
global.outputProgress = progress.outputProgress;
global.clearProgress = progress.clearProgress;
global.getProgress = progress.getProgress;
global.logError = progress.logError;
// Load database module
const { getConnection, closePool } = require('./utils/db');
// Add cancel handler
let isCancelled = false;
function cancelCalculation() {
isCancelled = true;
console.log('Calculation has been cancelled by user');
// Force-terminate any query that's been running for more than 5 seconds
try {
const connection = getConnection();
connection.then(async (conn) => {
try {
// Identify and terminate long-running queries from our application
await conn.query(`
SELECT pg_cancel_backend(pid)
FROM pg_stat_activity
WHERE query_start < now() - interval '5 seconds'
AND application_name LIKE '%node%'
AND query NOT LIKE '%pg_cancel_backend%'
`);
// Release connection
conn.release();
} catch (err) {
console.error('Error during force cancellation:', err);
conn.release();
}
}).catch(err => {
console.error('Could not get connection for cancellation:', err);
});
} catch (err) {
console.error('Failed to terminate running queries:', err);
}
return {
success: true,
message: 'Calculation has been cancelled'
};
}
// Handle SIGTERM signal for cancellation
process.on('SIGTERM', cancelCalculation);
async function updateProductMetrics() {
let connection;
const startTime = Date.now();
let calculateHistoryId;
// Set a maximum execution time (30 minutes)
const MAX_EXECUTION_TIME = 30 * 60 * 1000;
const timeout = setTimeout(() => {
console.error(`Calculation timed out after ${MAX_EXECUTION_TIME/1000} seconds, forcing termination`);
// Call cancel and force exit
cancelCalculation();
process.exit(1);
}, MAX_EXECUTION_TIME);
try {
// Read the SQL file
const sqlFilePath = path.resolve(__dirname, 'update_product_metrics.sql');
const sqlQuery = fs.readFileSync(sqlFilePath, 'utf8');
// Clean up any previously running calculations
connection = await getConnection();
// Ensure the calculate_status table exists and has the correct structure
await connection.query(`
CREATE TABLE IF NOT EXISTS calculate_status (
module_name TEXT PRIMARY KEY,
last_calculation_timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT CURRENT_TIMESTAMP
)
`);
await connection.query(`
UPDATE calculate_history
SET
status = 'cancelled',
end_time = NOW(),
duration_seconds = EXTRACT(EPOCH FROM (NOW() - start_time))::INTEGER,
error_message = 'Previous calculation was not completed properly'
WHERE status = 'running' AND additional_info->>'type' = 'product_metrics'
`);
// Create history record for this calculation
const historyResult = await connection.query(`
INSERT INTO calculate_history (
start_time,
status,
additional_info
) VALUES (
NOW(),
'running',
jsonb_build_object(
'type', 'product_metrics',
'sql_file', 'update_product_metrics.sql'
)
) RETURNING id
`);
calculateHistoryId = historyResult.rows[0].id;
// Initialize progress
global.outputProgress({
status: 'running',
operation: 'Starting product metrics calculation',
current: 0,
total: 100,
elapsed: '0s',
remaining: 'Calculating...',
rate: 0,
percentage: '0',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Execute the SQL query
global.outputProgress({
status: 'running',
operation: 'Executing product metrics SQL query',
current: 25,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: 'Calculating...',
rate: 0,
percentage: '25',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
await connection.query(sqlQuery);
// Update calculate_status table
await connection.query(`
INSERT INTO calculate_status (module_name, last_calculation_timestamp)
VALUES ($1, $2)
ON CONFLICT (module_name) DO UPDATE
SET last_calculation_timestamp = EXCLUDED.last_calculation_timestamp
`, ['product_metrics', new Date()]);
// Update progress to 100%
global.outputProgress({
status: 'complete',
operation: 'Product metrics calculation complete',
current: 100,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: '0s',
rate: 0,
percentage: '100',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
// Update history with completion
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = 'completed'
WHERE id = $2
`, [Math.round((Date.now() - startTime) / 1000), calculateHistoryId]);
// Clear progress file on successful completion
global.clearProgress();
return {
success: true,
message: 'Product metrics calculation completed successfully',
duration: Math.round((Date.now() - startTime) / 1000)
};
} catch (error) {
const endTime = Date.now();
const totalElapsedSeconds = Math.round((endTime - startTime) / 1000);
// Update history with error
if (connection && calculateHistoryId) {
await connection.query(`
UPDATE calculate_history
SET
end_time = NOW(),
duration_seconds = $1,
status = $2,
error_message = $3
WHERE id = $4
`, [
totalElapsedSeconds,
isCancelled ? 'cancelled' : 'failed',
error.message,
calculateHistoryId
]);
}
if (isCancelled) {
global.outputProgress({
status: 'cancelled',
operation: 'Calculation cancelled',
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
} else {
global.outputProgress({
status: 'error',
operation: 'Error: ' + error.message,
current: 50,
total: 100,
elapsed: global.formatElapsedTime(startTime),
remaining: null,
rate: 0,
percentage: '50',
timing: {
start_time: new Date(startTime).toISOString(),
end_time: new Date().toISOString(),
elapsed_seconds: Math.round((Date.now() - startTime) / 1000)
}
});
}
throw error;
} finally {
// Clear the timeout to prevent forced termination
clearTimeout(timeout);
// Always release connection
if (connection) {
try {
connection.release();
} catch (err) {
console.error('Error in final cleanup:', err);
}
}
}
}
// Export as a module with all necessary functions
module.exports = {
updateProductMetrics,
cancelCalculation,
getProgress: global.getProgress
};
// Run directly if called from command line
if (require.main === module) {
updateProductMetrics().then(() => {
closePool().then(() => {
process.exit(0);
});
}).catch(error => {
console.error('Error:', error);
closePool().then(() => {
process.exit(1);
});
});
}

View File

@@ -5,7 +5,7 @@
DO $$
DECLARE
_module_name VARCHAR := 'daily_snapshots';
_module_name TEXT := 'daily_snapshots';
_start_time TIMESTAMPTZ := clock_timestamp(); -- Time execution started
_last_calc_time TIMESTAMPTZ;
_target_date DATE := CURRENT_DATE; -- Always recalculate today for simplicity with hourly runs

View File

@@ -5,7 +5,7 @@
DO $$
DECLARE
_module_name VARCHAR := 'periodic_metrics';
_module_name TEXT := 'periodic_metrics';
_start_time TIMESTAMPTZ := clock_timestamp();
_last_calc_time TIMESTAMPTZ;
_abc_basis VARCHAR;
@@ -25,7 +25,7 @@ BEGIN
WITH LeadTimes AS (
SELECT
pid,
AVG(GREATEST(1, DATE_PART('day', last_received_date - date))) AS avg_days -- Use GREATEST(1,...) to avoid 0 or negative days
AVG(GREATEST(1, (last_received_date::date - date::date))) AS avg_days -- Use GREATEST(1,...) to avoid 0 or negative days
FROM public.purchase_orders
WHERE status = 'received' -- Or potentially 'full_received' if using that status
AND last_received_date IS NOT NULL

View File

@@ -5,7 +5,7 @@
DO $$
DECLARE
_module_name VARCHAR := 'product_metrics';
_module_name TEXT := 'product_metrics';
_start_time TIMESTAMPTZ := clock_timestamp();
_last_calc_time TIMESTAMPTZ;
_current_date DATE := CURRENT_DATE;
@@ -179,7 +179,13 @@ BEGIN
ci.current_price, ci.current_regular_price, ci.current_cost_price, ci.current_effective_cost,
ci.current_stock, ci.current_stock * ci.current_effective_cost, ci.current_stock * ci.current_price, ci.current_stock * ci.current_regular_price,
COALESCE(ooi.on_order_qty, 0), COALESCE(ooi.on_order_cost, 0.00), COALESCE(ooi.on_order_qty, 0) * ci.current_price, ooi.earliest_expected_date,
ci.created_at::date, COALESCE(ci.first_received::date, hd.date_first_received_calc), hd.date_last_received_calc, hd.date_first_sold, COALESCE(ci.date_last_sold, hd.max_order_date), DATE_PART('day', _current_date - LEAST(ci.created_at::date, hd.date_first_sold)),
ci.created_at::date, COALESCE(ci.first_received::date, hd.date_first_received_calc), hd.date_last_received_calc, hd.date_first_sold, COALESCE(ci.date_last_sold, hd.max_order_date),
CASE
WHEN ci.created_at IS NULL AND hd.date_first_sold IS NULL THEN 0
WHEN ci.created_at IS NULL THEN (_current_date - hd.date_first_sold)::integer
WHEN hd.date_first_sold IS NULL THEN (_current_date - ci.created_at::date)::integer
ELSE (_current_date - LEAST(ci.created_at::date, hd.date_first_sold))::integer
END AS age_days,
sa.sales_7d, sa.revenue_7d, sa.sales_14d, sa.revenue_14d, sa.sales_30d, sa.revenue_30d, sa.cogs_30d, sa.profit_30d,
sa.returns_units_30d, sa.returns_revenue_30d, sa.discounts_30d, sa.gross_revenue_30d, sa.gross_regular_revenue_30d,
sa.stockout_days_30d, sa.sales_365d, sa.revenue_365d,
@@ -287,7 +293,7 @@ BEGIN
sales_velocity_daily = EXCLUDED.sales_velocity_daily, config_lead_time = EXCLUDED.config_lead_time, config_days_of_stock = EXCLUDED.config_days_of_stock, config_safety_stock = EXCLUDED.config_safety_stock,
planning_period_days = EXCLUDED.planning_period_days, lead_time_forecast_units = EXCLUDED.lead_time_forecast_units, days_of_stock_forecast_units = EXCLUDED.days_of_stock_forecast_units,
planning_period_forecast_units = EXCLUDED.planning_period_forecast_units, lead_time_closing_stock = EXCLUDED.lead_time_closing_stock, days_of_stock_closing_stock = EXCLUDED.days_of_stock_closing_stock,
replenishment_needed_raw = EXCLUDED.replenishment_needed_raw, replenishment_units = EXCLUDED.replenishment_units, replenishment_cost = EXCLUDED.replenishment_cost, replenishment_retail = EXcluded.replenishment_retail, replenishment_profit = EXCLUDED.replenishment_profit,
replenishment_needed_raw = EXCLUDED.replenishment_needed_raw, replenishment_units = EXCLUDED.replenishment_units, replenishment_cost = EXCLUDED.replenishment_cost, replenishment_retail = EXCLUDED.replenishment_retail, replenishment_profit = EXCLUDED.replenishment_profit,
to_order_units = EXCLUDED.to_order_units, forecast_lost_sales_units = EXCLUDED.forecast_lost_sales_units, forecast_lost_revenue = EXCLUDED.forecast_lost_revenue,
stock_cover_in_days = EXCLUDED.stock_cover_in_days, po_cover_in_days = EXCLUDED.po_cover_in_days, sells_out_in_days = EXCLUDED.sells_out_in_days, replenish_date = EXCLUDED.replenish_date,
overstocked_units = EXCLUDED.overstocked_units, overstocked_cost = EXCLUDED.overstocked_cost, overstocked_retail = EXCLUDED.overstocked_retail, is_old_stock = EXCLUDED.is_old_stock,