Debug splitting normal and metrics tables

This commit is contained in:
2025-01-11 17:53:08 -05:00
parent 50ead64356
commit 1eccfe0b2c
7 changed files with 325 additions and 337 deletions

View File

@@ -41,6 +41,7 @@ async function calculateMetrics() {
percentage: '0'
});
// Create and truncate tables one at a time
await connection.query(`
CREATE TABLE IF NOT EXISTS temp_sales_metrics (
product_id INT PRIMARY KEY,
@@ -49,8 +50,10 @@ async function calculateMetrics() {
average_price DECIMAL(10,2) DEFAULT 0.00,
last_sale_date DATE,
sales_rank INT
);
)
`);
await connection.query(`
CREATE TABLE IF NOT EXISTS temp_purchase_metrics (
product_id INT PRIMARY KEY,
total_quantity_purchased INT DEFAULT 0,
@@ -58,12 +61,12 @@ async function calculateMetrics() {
average_cost DECIMAL(10,2) DEFAULT 0.00,
last_purchase_date DATE,
purchase_rank INT
);
TRUNCATE TABLE temp_sales_metrics;
TRUNCATE TABLE temp_purchase_metrics;
)
`);
await connection.query('TRUNCATE TABLE temp_sales_metrics');
await connection.query('TRUNCATE TABLE temp_purchase_metrics');
// Calculate sales metrics
outputProgress({
status: 'running',
@@ -71,6 +74,7 @@ async function calculateMetrics() {
percentage: '20'
});
// First insert sales metrics
await connection.query(`
INSERT INTO temp_sales_metrics (
product_id,
@@ -87,21 +91,26 @@ async function calculateMetrics() {
MAX(date) as last_sale_date
FROM orders
WHERE canceled = 0
GROUP BY product_id;
UPDATE temp_sales_metrics
SET sales_rank = (
SELECT rank
FROM (
SELECT
product_id,
RANK() OVER (ORDER BY total_revenue DESC) as rank
FROM temp_sales_metrics
) rankings
WHERE rankings.product_id = temp_sales_metrics.product_id
);
GROUP BY product_id
`);
// Then update sales rank using a temporary table
await connection.query(`
CREATE TEMPORARY TABLE sales_rankings AS
SELECT
product_id,
RANK() OVER (ORDER BY total_revenue DESC) as rank
FROM temp_sales_metrics
`);
await connection.query(`
UPDATE temp_sales_metrics t
JOIN sales_rankings r ON t.product_id = r.product_id
SET t.sales_rank = r.rank
`);
await connection.query(`DROP TEMPORARY TABLE sales_rankings`);
// Calculate purchase metrics
outputProgress({
status: 'running',
@@ -109,6 +118,7 @@ async function calculateMetrics() {
percentage: '40'
});
// First insert purchase metrics
await connection.query(`
INSERT INTO temp_purchase_metrics (
product_id,
@@ -125,21 +135,26 @@ async function calculateMetrics() {
MAX(received_date) as last_purchase_date
FROM purchase_orders
WHERE status = 'closed' AND received > 0
GROUP BY product_id;
UPDATE temp_purchase_metrics
SET purchase_rank = (
SELECT rank
FROM (
SELECT
product_id,
RANK() OVER (ORDER BY total_cost DESC) as rank
FROM temp_purchase_metrics
) rankings
WHERE rankings.product_id = temp_purchase_metrics.product_id
);
GROUP BY product_id
`);
// Then update purchase rank using a temporary table
await connection.query(`
CREATE TEMPORARY TABLE purchase_rankings AS
SELECT
product_id,
RANK() OVER (ORDER BY total_cost DESC) as rank
FROM temp_purchase_metrics
`);
await connection.query(`
UPDATE temp_purchase_metrics t
JOIN purchase_rankings r ON t.product_id = r.product_id
SET t.purchase_rank = r.rank
`);
await connection.query(`DROP TEMPORARY TABLE purchase_rankings`);
// Update product metrics
outputProgress({
status: 'running',

View File

@@ -67,19 +67,28 @@ async function resetDatabase() {
await connection.query('SET FOREIGN_KEY_CHECKS = 1');
}
// Read and execute schema directly instead of spawning a process
// Read and execute main schema
outputProgress({
operation: 'Running database setup',
message: 'Creating new tables...'
message: 'Creating core tables...'
});
const schemaSQL = fs.readFileSync(path.join(__dirname, '../db/schema.sql'), 'utf8');
await connection.query(schemaSQL);
// Read and execute metrics schema
outputProgress({
operation: 'Running metrics setup',
message: 'Creating metrics tables...'
});
const metricsSchemaSQL = fs.readFileSync(path.join(__dirname, '../db/metrics-schema.sql'), 'utf8');
await connection.query(metricsSchemaSQL);
outputProgress({
status: 'complete',
operation: 'Database reset complete',
message: 'Database has been reset and tables recreated'
message: 'Database has been reset and all tables recreated'
});
} catch (error) {
outputProgress({

View File

@@ -1,170 +1,154 @@
const mysql = require('mysql2/promise');
const path = require('path');
const dotenv = require('dotenv');
const fs = require('fs');
dotenv.config({ path: path.join(__dirname, '../.env') });
require('dotenv').config({ path: path.resolve(__dirname, '../.env') });
const dbConfig = {
host: process.env.DB_HOST,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
database: process.env.DB_NAME,
multipleStatements: true,
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
namedPlaceholders: true
multipleStatements: true
};
// Set up logging
const LOG_DIR = path.join(__dirname, '../logs');
const ERROR_LOG = path.join(LOG_DIR, 'import-errors.log');
const IMPORT_LOG = path.join(LOG_DIR, 'import.log');
// Ensure log directory exists
if (!fs.existsSync(LOG_DIR)) {
fs.mkdirSync(LOG_DIR, { recursive: true });
}
// Helper function to log errors
function logError(error, context = '') {
const timestamp = new Date().toISOString();
const errorMessage = `[${timestamp}] ${context}\nError: ${error.message}\nStack: ${error.stack}\n\n`;
fs.appendFileSync(ERROR_LOG, errorMessage);
console.error(`\n${context}\nError: ${error.message}`);
}
// Helper function to log progress
function outputProgress(data) {
const timestamp = new Date().toISOString();
const logMessage = `[${timestamp}] ${JSON.stringify(data)}\n`;
fs.appendFileSync(IMPORT_LOG, logMessage);
console.log(JSON.stringify(data));
}
async function resetMetrics() {
let pool;
try {
pool = mysql.createPool(dbConfig);
const connection = await pool.getConnection();
function getMetricsTablesFromSchema() {
const schemaPath = path.join(__dirname, '../db/metrics-schema.sql');
const schemaSQL = fs.readFileSync(schemaPath, 'utf8');
// Extract table names from CREATE TABLE statements
const createTableRegex = /CREATE TABLE.*?`(\w+)`/g;
const tables = [];
let match;
while ((match = createTableRegex.exec(schemaSQL)) !== null) {
tables.push(match[1]);
}
return tables;
}
try {
outputProgress({
status: 'running',
operation: 'Starting metrics reset',
message: 'Creating/resetting metrics tables...',
percentage: '0'
});
async function checkIndexExists(connection, tableName, indexName) {
const [rows] = await connection.query(`
SELECT COUNT(*) as count
FROM information_schema.statistics
WHERE table_schema = DATABASE()
AND table_name = ?
AND index_name = ?`,
[tableName, indexName]
);
return rows[0].count > 0;
}
// Create tables if they don't exist and then truncate them
await connection.query(`
CREATE TABLE IF NOT EXISTS temp_sales_metrics (
product_id INT PRIMARY KEY,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,2) DEFAULT 0.00,
average_price DECIMAL(10,2) DEFAULT 0.00,
last_sale_date DATE,
sales_rank INT
);
async function createMetricsIndexes(connection) {
// Check and create orders index
const ordersIndexExists = await checkIndexExists(connection, 'orders', 'idx_orders_metrics');
if (!ordersIndexExists) {
await connection.query('CREATE INDEX idx_orders_metrics ON orders (product_id, date, canceled, quantity, price)');
}
CREATE TABLE IF NOT EXISTS temp_purchase_metrics (
product_id INT PRIMARY KEY,
total_quantity_purchased INT DEFAULT 0,
total_cost DECIMAL(10,2) DEFAULT 0.00,
average_cost DECIMAL(10,2) DEFAULT 0.00,
last_purchase_date DATE,
purchase_rank INT
);
CREATE TABLE IF NOT EXISTS product_metrics (
product_id INT PRIMARY KEY,
total_quantity_sold INT DEFAULT 0,
total_revenue DECIMAL(10,2) DEFAULT 0.00,
average_price DECIMAL(10,2) DEFAULT 0.00,
total_quantity_purchased INT DEFAULT 0,
total_cost DECIMAL(10,2) DEFAULT 0.00,
average_cost DECIMAL(10,2) DEFAULT 0.00,
profit_margin DECIMAL(5,2) DEFAULT 0.00,
turnover_rate DECIMAL(5,2) DEFAULT 0.00,
abc_class CHAR(1),
last_calculated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
last_sale_date DATE,
last_purchase_date DATE,
sales_rank INT,
purchase_rank INT
);
CREATE TABLE IF NOT EXISTS product_time_aggregates (
product_id INT,
period_type ENUM('daily', 'weekly', 'monthly', 'quarterly', 'yearly'),
period_start DATE,
quantity_sold INT DEFAULT 0,
revenue DECIMAL(10,2) DEFAULT 0.00,
quantity_purchased INT DEFAULT 0,
purchase_cost DECIMAL(10,2) DEFAULT 0.00,
PRIMARY KEY (product_id, period_type, period_start)
);
CREATE TABLE IF NOT EXISTS vendor_metrics (
vendor VARCHAR(255) PRIMARY KEY,
total_orders INT DEFAULT 0,
total_items_ordered INT DEFAULT 0,
total_items_received INT DEFAULT 0,
total_spend DECIMAL(10,2) DEFAULT 0.00,
average_order_value DECIMAL(10,2) DEFAULT 0.00,
fulfillment_rate DECIMAL(5,2) DEFAULT 0.00,
average_delivery_days DECIMAL(5,1),
last_order_date DATE,
last_delivery_date DATE
);
TRUNCATE TABLE temp_sales_metrics;
TRUNCATE TABLE temp_purchase_metrics;
TRUNCATE TABLE product_metrics;
TRUNCATE TABLE product_time_aggregates;
TRUNCATE TABLE vendor_metrics;
`);
outputProgress({
status: 'complete',
operation: 'Metrics reset completed',
message: 'All metrics tables have been created/cleared',
percentage: '100'
});
} catch (error) {
logError(error, 'Error resetting metrics tables');
outputProgress({
status: 'error',
error: error.message,
operation: 'Failed to reset metrics'
});
throw error;
} finally {
connection.release();
}
} catch (error) {
logError(error, 'Fatal error during metrics reset');
outputProgress({
status: 'error',
error: error.message,
operation: 'Failed to reset metrics'
});
throw error;
} finally {
if (pool) {
await pool.end();
}
// Check and create purchase_orders index
const poIndexExists = await checkIndexExists(connection, 'purchase_orders', 'idx_purchase_orders_metrics');
if (!poIndexExists) {
await connection.query('CREATE INDEX idx_purchase_orders_metrics ON purchase_orders (product_id, date, status, ordered, received)');
}
}
// Run if called directly
if (require.main === module) {
resetMetrics().catch(error => {
logError(error, 'Unhandled error in main process');
process.exit(1);
async function resetMetrics() {
outputProgress({
status: 'running',
operation: 'Starting metrics reset',
percentage: '0'
});
const connection = await mysql.createConnection(dbConfig);
try {
// Get list of metrics tables from schema
const metricsTables = getMetricsTablesFromSchema();
// Disable foreign key checks first
await connection.query('SET FOREIGN_KEY_CHECKS = 0');
// Get list of existing metrics tables
if (metricsTables.length > 0) {
const [tables] = await connection.query(`
SELECT GROUP_CONCAT(table_name) as tables
FROM information_schema.tables
WHERE table_schema = DATABASE()
AND table_name IN (${metricsTables.map(table => `'${table}'`).join(',')})`
);
if (tables[0].tables) {
outputProgress({
status: 'running',
operation: 'Dropping metrics tables',
percentage: '40'
});
// Drop all metrics tables in one query
const dropQuery = `DROP TABLE IF EXISTS ${tables[0].tables.split(',').map(table => '`' + table + '`').join(', ')}`;
await connection.query(dropQuery);
}
}
// Read and execute metrics schema (without the index creation)
outputProgress({
status: 'running',
operation: 'Creating metrics tables',
percentage: '60'
});
const schemaPath = path.join(__dirname, '../db/metrics-schema.sql');
let schemaSQL = fs.readFileSync(schemaPath, 'utf8');
// Remove the index creation statements from the schema
schemaSQL = schemaSQL.split('-- Create optimized indexes')[0];
await connection.query(schemaSQL);
// Create indexes if they don't exist
outputProgress({
status: 'running',
operation: 'Checking and creating indexes',
percentage: '80'
});
await createMetricsIndexes(connection);
// Re-enable foreign key checks
await connection.query('SET FOREIGN_KEY_CHECKS = 1');
outputProgress({
status: 'complete',
operation: 'Metrics tables have been reset',
percentage: '100'
});
return { success: true };
} catch (error) {
console.error('Error resetting metrics:', error);
outputProgress({
status: 'error',
operation: 'Failed to reset metrics',
error: error.message
});
throw error;
} finally {
await connection.end();
}
}
module.exports = resetMetrics;
// Export the function if being required as a module
if (typeof module !== 'undefined' && module.exports) {
module.exports = resetMetrics;
}
// Run directly if called from command line
if (require.main === module) {
resetMetrics().catch(error => {
console.error('Error:', error);
process.exit(1);
});
}