const { Client } = require('pg'); const path = require('path'); const fs = require('fs'); require('dotenv').config({ path: path.resolve(__dirname, '../.env') }); const dbConfig = { host: process.env.DB_HOST, user: process.env.DB_USER, password: process.env.DB_PASSWORD, database: process.env.DB_NAME, port: process.env.DB_PORT || 5432 }; function outputProgress(data) { if (!data.status) { data = { status: 'running', ...data }; } console.log(JSON.stringify(data)); } // Explicitly define all metrics-related tables in dependency order const METRICS_TABLES = [ 'brand_metrics', 'brand_time_metrics', 'category_forecasts', 'category_metrics', 'category_sales_metrics', 'category_time_metrics', 'product_metrics', 'product_time_aggregates', 'sales_forecasts', 'temp_purchase_metrics', 'temp_sales_metrics', 'vendor_metrics', 'vendor_time_metrics', 'vendor_details' ]; // Tables to always protect from being dropped const PROTECTED_TABLES = [ 'users', 'permissions', 'user_permissions', 'calculate_history', 'import_history', 'ai_prompts', 'ai_validation_performance', 'templates', 'reusable_images' ]; // Split SQL into individual statements function splitSQLStatements(sql) { sql = sql.replace(/\r\n/g, '\n'); let statements = []; let currentStatement = ''; let inString = false; let stringChar = ''; for (let i = 0; i < sql.length; i++) { const char = sql[i]; const nextChar = sql[i + 1] || ''; if ((char === "'" || char === '"') && sql[i - 1] !== '\\') { if (!inString) { inString = true; stringChar = char; } else if (char === stringChar) { inString = false; } } if (!inString && char === '-' && nextChar === '-') { while (i < sql.length && sql[i] !== '\n') i++; continue; } if (!inString && char === '/' && nextChar === '*') { i += 2; while (i < sql.length && (sql[i] !== '*' || sql[i + 1] !== '/')) i++; i++; continue; } if (!inString && char === ';') { if (currentStatement.trim()) { statements.push(currentStatement.trim()); } currentStatement = ''; } else { currentStatement += char; } } if (currentStatement.trim()) { statements.push(currentStatement.trim()); } return statements; } async function resetMetrics() { let client; try { outputProgress({ operation: 'Starting metrics reset', message: 'Connecting to database...' }); client = new Client(dbConfig); await client.connect(); // Explicitly begin a transaction await client.query('BEGIN'); // First verify current state const initialTables = await client.query(` SELECT tablename as name FROM pg_tables WHERE schemaname = 'public' AND tablename = ANY($1) AND tablename NOT IN (SELECT unnest($2::text[])) `, [METRICS_TABLES, PROTECTED_TABLES]); outputProgress({ operation: 'Initial state', message: `Found ${initialTables.rows.length} existing metrics tables: ${initialTables.rows.map(t => t.name).join(', ')}` }); // Disable foreign key checks at the start await client.query('SET session_replication_role = \'replica\''); // Drop all metrics tables in reverse order to handle dependencies outputProgress({ operation: 'Dropping metrics tables', message: 'Removing existing metrics tables...' }); for (const table of [...METRICS_TABLES].reverse()) { // Skip protected tables if (PROTECTED_TABLES.includes(table)) { outputProgress({ operation: 'Protected table', message: `Skipping protected table: ${table}` }); continue; } try { // Use NOWAIT to avoid hanging if there's a lock await client.query(`DROP TABLE IF EXISTS "${table}" CASCADE`); // Verify the table was actually dropped const checkDrop = await client.query(` SELECT COUNT(*) as count FROM pg_tables WHERE schemaname = 'public' AND tablename = $1 `, [table]); if (parseInt(checkDrop.rows[0].count) > 0) { throw new Error(`Failed to drop table ${table} - table still exists`); } outputProgress({ operation: 'Table dropped', message: `Successfully dropped table: ${table}` }); // Commit after each table drop to ensure locks are released await client.query('COMMIT'); // Start a new transaction for the next table await client.query('BEGIN'); // Re-disable foreign key constraints for the new transaction await client.query('SET session_replication_role = \'replica\''); } catch (err) { outputProgress({ status: 'error', operation: 'Drop table error', message: `Error dropping table ${table}: ${err.message}` }); await client.query('ROLLBACK'); // Re-start transaction for next table await client.query('BEGIN'); await client.query('SET session_replication_role = \'replica\''); } } // Verify all tables were dropped const afterDrop = await client.query(` SELECT tablename as name FROM pg_tables WHERE schemaname = 'public' AND tablename = ANY($1) `, [METRICS_TABLES]); if (afterDrop.rows.length > 0) { throw new Error(`Failed to drop all tables. Remaining tables: ${afterDrop.rows.map(t => t.name).join(', ')}`); } // Make sure we have a fresh transaction here await client.query('COMMIT'); await client.query('BEGIN'); await client.query('SET session_replication_role = \'replica\''); // Read metrics schema outputProgress({ operation: 'Reading schema', message: 'Loading metrics schema file...' }); const schemaPath = path.resolve(__dirname, '../db/metrics-schema.sql'); if (!fs.existsSync(schemaPath)) { throw new Error(`Schema file not found at: ${schemaPath}`); } const schemaSQL = fs.readFileSync(schemaPath, 'utf8'); const statements = splitSQLStatements(schemaSQL); outputProgress({ operation: 'Schema loaded', message: `Found ${statements.length} SQL statements to execute` }); // Execute schema statements for (let i = 0; i < statements.length; i++) { const stmt = statements[i]; try { const result = await client.query(stmt); // If this is a CREATE TABLE statement, verify the table was created if (stmt.trim().toLowerCase().startsWith('create table')) { const tableName = stmt.match(/create\s+table\s+(?:if\s+not\s+exists\s+)?["]?(\w+)["]?/i)?.[1]; if (tableName) { const checkCreate = await client.query(` SELECT tablename as name FROM pg_tables WHERE schemaname = 'public' AND tablename = $1 `, [tableName]); if (checkCreate.rows.length === 0) { throw new Error(`Failed to create table ${tableName} - table does not exist after CREATE statement`); } outputProgress({ operation: 'Table created', message: `Successfully created table: ${tableName}` }); } } outputProgress({ operation: 'SQL Progress', message: { statement: i + 1, total: statements.length, preview: stmt.substring(0, 100) + (stmt.length > 100 ? '...' : ''), rowCount: result.rowCount } }); // Commit every 10 statements to avoid long-running transactions if (i > 0 && i % 10 === 0) { await client.query('COMMIT'); await client.query('BEGIN'); await client.query('SET session_replication_role = \'replica\''); } } catch (sqlError) { outputProgress({ status: 'error', operation: 'SQL Error', message: { error: sqlError.message, statement: stmt, statementNumber: i + 1 } }); await client.query('ROLLBACK'); throw sqlError; } } // Final commit for any pending statements await client.query('COMMIT'); // Start new transaction for final checks await client.query('BEGIN'); // Re-enable foreign key checks after all tables are created await client.query('SET session_replication_role = \'origin\''); // Verify metrics tables were created outputProgress({ operation: 'Verifying metrics tables', message: 'Checking all metrics tables were created...' }); const metricsTablesResult = await client.query(` SELECT tablename as name FROM pg_tables WHERE schemaname = 'public' AND tablename = ANY($1) `, [METRICS_TABLES]); outputProgress({ operation: 'Tables found', message: `Found ${metricsTablesResult.rows.length} tables: ${metricsTablesResult.rows.map(t => t.name).join(', ')}` }); const existingMetricsTables = metricsTablesResult.rows.map(t => t.name); const missingMetricsTables = METRICS_TABLES.filter(t => !existingMetricsTables.includes(t)); if (missingMetricsTables.length > 0) { // Do one final check of the actual tables const finalCheck = await client.query(` SELECT tablename as name FROM pg_tables WHERE schemaname = 'public' `); outputProgress({ operation: 'Final table check', message: `All database tables: ${finalCheck.rows.map(t => t.name).join(', ')}` }); await client.query('ROLLBACK'); throw new Error(`Failed to create metrics tables: ${missingMetricsTables.join(', ')}`); } // Commit final transaction await client.query('COMMIT'); outputProgress({ status: 'complete', operation: 'Reset complete', message: 'All metrics tables have been reset successfully' }); } catch (error) { outputProgress({ status: 'error', operation: 'Reset failed', message: error.message, stack: error.stack }); if (client) { try { await client.query('ROLLBACK'); } catch (rollbackError) { console.error('Error during rollback:', rollbackError); } // Make sure to re-enable foreign key checks even if there's an error await client.query('SET session_replication_role = \'origin\'').catch(() => {}); } throw error; } finally { if (client) { // One final attempt to ensure foreign key checks are enabled await client.query('SET session_replication_role = \'origin\'').catch(() => {}); await client.end(); } } } // Export if required as a module if (typeof module !== 'undefined' && module.exports) { module.exports = resetMetrics; } // Run if called from command line if (require.main === module) { resetMetrics().catch(error => { console.error('Error:', error); process.exit(1); }); }