Set up change tracking in core tables

This commit is contained in:
2025-02-02 15:06:20 -05:00
parent 12cab7473a
commit 22ad2d44db
9 changed files with 216 additions and 94 deletions

View File

@@ -102,6 +102,16 @@ async function main() {
if (isImportCancelled) throw new Error("Import cancelled");
// Initialize product_metric_status table for any missing products
await localConnection.query(`
INSERT IGNORE INTO product_metric_status (pid)
SELECT pid FROM products p
WHERE NOT EXISTS (
SELECT 1 FROM product_metric_status pms
WHERE pms.pid = p.pid
)
`);
// Clean up any previously running imports that weren't completed
await localConnection.query(`
UPDATE import_history

View File

@@ -136,6 +136,21 @@ async function importCategories(prodConnection, localConnection) {
total: totalInserted,
elapsed: formatElapsedTime((Date.now() - startTime) / 1000),
});
// Mark all products in these categories for recalculation
if (categoriesToInsert.length > 0) {
const affectedCatIds = categoriesToInsert.map(c => c.cat_id);
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
SELECT DISTINCT pc.pid, TRUE
FROM product_categories pc
WHERE pc.cat_id IN (?)
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, [affectedCatIds]);
}
}
// After all imports, if we skipped any categories, throw an error

View File

@@ -472,7 +472,19 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += processedOrderItems.size; // Count unique order items processed
importedCount += processedOrderItems.size;
// Mark affected products for recalculation
const affectedPids = [...new Set(validOrders.map(o => o.pid))];
if (affectedPids.length > 0) {
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
VALUES ${affectedPids.map(() => '(?, TRUE)').join(',')}
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, affectedPids);
}
}
// Update progress based on unique orders processed
@@ -585,6 +597,18 @@ async function importOrders(prodConnection, localConnection, incrementalUpdate =
recordsAdded += inserts;
recordsUpdated += updates;
importedCount += retryOrderItems.size;
// Mark affected products for recalculation
const affectedPids = [...new Set(validOrders.map(o => o.pid))];
if (affectedPids.length > 0) {
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
VALUES ${affectedPids.map(() => '(?, TRUE)').join(',')}
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, affectedPids);
}
}
} catch (error) {
console.warn('Warning: Failed to retry skipped orders:', error.message);

View File

@@ -468,6 +468,21 @@ async function importProducts(prodConnection, localConnection, incrementalUpdate
recordsUpdated += insertsAndUpdates.updates.length;
}
if (insertsAndUpdates.updates.length > 0 || insertsAndUpdates.inserts.length > 0) {
const affectedPids = [
...insertsAndUpdates.updates.map(p => p.pid),
...insertsAndUpdates.inserts.map(p => p.pid)
];
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
VALUES ${affectedPids.map(() => '(?, TRUE)').join(',')}
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, affectedPids);
}
// Process category relationships
if (batch.some(p => p.category_ids)) {
// First get all valid categories

View File

@@ -474,6 +474,18 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
recordsAdded += inserts;
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
// Mark affected products for recalculation
const affectedPids = [...new Set(productBatch.map(p => p.pid))];
if (affectedPids.length > 0) {
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
VALUES ${affectedPids.map(() => '(?, TRUE)').join(',')}
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, affectedPids);
}
}
// Handle updates - now we know these actually have changes
@@ -499,6 +511,18 @@ async function importPurchaseOrders(prodConnection, localConnection, incremental
recordsUpdated += Math.floor(updates); // Ensure we never have fractional updates
processed += batchProcessed;
// Mark affected products for recalculation
const affectedPids = [...new Set(productBatch.map(p => p.pid))];
if (affectedPids.length > 0) {
await localConnection.query(`
INSERT INTO product_metric_status (pid, needs_recalculation)
VALUES ${affectedPids.map(() => '(?, TRUE)').join(',')}
ON DUPLICATE KEY UPDATE
needs_recalculation = TRUE,
updated_at = CURRENT_TIMESTAMP
`, affectedPids);
}
}
// Update progress based on time interval

View File

@@ -41,7 +41,10 @@ const CONFIG_TABLES = [
'abc_classification_config',
'safety_stock_config',
'sales_seasonality',
'turnover_config'
'turnover_config',
'sync_status',
'metric_calculation_config',
'product_metric_status'
];
// Split SQL into individual statements
@@ -175,7 +178,7 @@ async function resetDatabase() {
DROP TABLE IF EXISTS
${tables[0].tables
.split(',')
.filter(table => table !== 'users')
.filter(table => !['users', 'import_history'].includes(table))
.map(table => '`' + table + '`')
.join(', ')}
`;
@@ -436,34 +439,7 @@ async function resetDatabase() {
}
}
// Verify config tables were created
const [showConfigTables] = await connection.query('SHOW TABLES');
const existingConfigTables = showConfigTables.map(t => Object.values(t)[0]);
outputProgress({
operation: 'Config tables verification',
message: {
found: existingConfigTables,
expected: CONFIG_TABLES
}
});
const missingConfigTables = CONFIG_TABLES.filter(
t => !existingConfigTables.includes(t)
);
if (missingConfigTables.length > 0) {
throw new Error(
`Failed to create config tables: ${missingConfigTables.join(', ')}`
);
}
outputProgress({
operation: 'Config tables created',
message: `Successfully created tables: ${CONFIG_TABLES.join(', ')}`
});
// Read and execute metrics schema (metrics tables)
// Read and execute metrics schema
outputProgress({
operation: 'Running metrics setup',
message: 'Creating metrics tables...'
@@ -525,6 +501,29 @@ async function resetDatabase() {
}
}
// Verify triggers exist
const [triggers] = await connection.query('SHOW TRIGGERS');
const expectedTriggers = [
'orders_after_insert_update',
'purchase_orders_after_insert_update',
'products_after_insert_update'
];
const missingTriggers = expectedTriggers.filter(
triggerName => !triggers.some(t => t.Trigger === triggerName)
);
if (missingTriggers.length > 0) {
throw new Error(
`Missing required triggers: ${missingTriggers.join(', ')}`
);
}
outputProgress({
operation: 'Triggers verified',
message: `Successfully verified triggers: ${expectedTriggers.join(', ')}`
});
outputProgress({
status: 'complete',
operation: 'Database reset complete',