#!/bin/bash
# Import a CSV file into a PostgreSQL table in batches using psql.
# Usage: ./psql-csv-import.sh <csv-file> <table-name> [start-batch]

# Exit on error
set -e

# Get arguments
CSV_FILE=$1
TABLE_NAME=$2
BATCH_SIZE=500000      # Process 500,000 rows at a time
START_BATCH=${3:-1}    # Optional third parameter to start from a specific batch

if [ -z "$CSV_FILE" ] || [ -z "$TABLE_NAME" ]; then
  echo "Usage: ./psql-csv-import.sh <csv-file> <table-name> [start-batch]"
  exit 1
fi

# Check if file exists (only needed for batch 1)
if [ "$START_BATCH" -eq 1 ] && [ ! -f "$CSV_FILE" ]; then
  echo "Error: CSV file '$CSV_FILE' not found"
  exit 1
fi

# Load environment variables
if [ -f "../.env" ]; then
  source "../.env"
else
  echo "Warning: .env file not found, using default connection parameters"
fi

# Set default connection parameters if not provided by .env
DB_HOST=${DB_HOST:-localhost}
DB_PORT=${DB_PORT:-5432}
DB_NAME=${DB_NAME:-inventory_db}
DB_USER=${DB_USER:-postgres}
export PGPASSWORD=${DB_PASSWORD:-}   # Export password for psql

# Common psql parameters
PSQL_OPTS="-h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME"

# Function to clean up database state before resuming an import
cleanup_and_optimize() {
  echo "Cleaning up and optimizing database state..."

  # Analyze the target table to update planner statistics
  psql $PSQL_OPTS -c "ANALYZE $TABLE_NAME;"

  # Vacuum to reclaim space left by any aborted batches
  psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;"

  # Terminate other connections to this database so they do not hold locks or resources
  psql $PSQL_OPTS -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = current_database() AND pid <> pg_backend_pid();"

  # Discard session state (only affects this psql session)
  psql $PSQL_OPTS -c "DISCARD ALL;"

  echo "Optimization complete."
}

# Show connection info
echo "Importing $CSV_FILE into $TABLE_NAME"
echo "Database: $DB_NAME on $DB_HOST:$DB_PORT with batch size: $BATCH_SIZE starting at batch $START_BATCH"

# Start timer
START_TIME=$(date +%s)

# Create progress tracking file
PROGRESS_FILE="/tmp/import_progress_${TABLE_NAME}.txt"
touch "$PROGRESS_FILE"
echo "Starting import at $(date), batch $START_BATCH" >> "$PROGRESS_FILE"

# If we're resuming, run cleanup first
if [ "$START_BATCH" -gt 1 ]; then
  cleanup_and_optimize
fi

# For imported_product_stat_history, use an optimized approach with hardcoded column names
if [ "$TABLE_NAME" = "imported_product_stat_history" ]; then
  echo "Using optimized import for $TABLE_NAME"

  # Only drop constraints/indexes and create the staging table for batch 1
  if [ "$START_BATCH" -eq 1 ]; then
    # Extract CSV header
    CSV_HEADER=$(head -n 1 "$CSV_FILE")
    echo "CSV header: $CSV_HEADER"

    # Step 1: Drop constraints and indexes so bulk inserts are not slowed down
    echo "Dropping constraints and indexes..."
    psql $PSQL_OPTS -c "
      DO \$\$
      DECLARE
        constraint_name TEXT;
      BEGIN
        -- Drop primary key constraint if it exists
        SELECT conname INTO constraint_name
        FROM pg_constraint
        WHERE conrelid = '$TABLE_NAME'::regclass AND contype = 'p';

        IF FOUND THEN
          EXECUTE 'ALTER TABLE $TABLE_NAME DROP CONSTRAINT IF EXISTS ' || constraint_name;
          RAISE NOTICE 'Dropped primary key constraint: %', constraint_name;
        END IF;
      END \$\$;
    "

    # Drop all indexes on the table
    psql $PSQL_OPTS -c "
      DO \$\$
      DECLARE
        index_record RECORD;
      BEGIN
        FOR index_record IN
          SELECT indexname FROM pg_indexes WHERE tablename = '$TABLE_NAME'
        LOOP
          EXECUTE 'DROP INDEX IF EXISTS ' || index_record.indexname;
          RAISE NOTICE 'Dropped index: %', index_record.indexname;
        END LOOP;
      END \$\$;
    "

    # Step 2: Raise maintenance_work_mem (session-scoped) and disable triggers
    echo "Setting maintenance_work_mem and disabling triggers..."
psql $PSQL_OPTS -c " SET maintenance_work_mem = '1GB'; ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL; " # Step 3: Create staging table echo "Creating staging table..." psql $PSQL_OPTS -c " DROP TABLE IF EXISTS staging_import; CREATE UNLOGGED TABLE staging_import ( pid TEXT, date TEXT, score TEXT, score2 TEXT, qty_in_baskets TEXT, qty_sold TEXT, notifies_set TEXT, visibility_score TEXT, health_score TEXT, sold_view_score TEXT ); -- Create an index on staging_import to improve OFFSET performance CREATE INDEX ON staging_import (pid); " # Step 4: Import CSV into staging table echo "Importing CSV into staging table..." psql $PSQL_OPTS -c "\copy staging_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','" else echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import" # Check if staging table exists STAGING_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='staging_import');" | tr -d '[:space:]') if [ "$STAGING_EXISTS" != "t" ]; then echo "Error: Staging table 'staging_import' does not exist. Run without batch parameter first." exit 1 fi # Ensure triggers are disabled psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;" # Optimize PostgreSQL for better performance psql $PSQL_OPTS -c " -- Increase work mem for this session SET work_mem = '256MB'; SET maintenance_work_mem = '1GB'; " fi # Step 5: Get total row count TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM staging_import;" | tr -d '[:space:]') echo "Total rows to import: $TOTAL_ROWS" # Calculate starting point PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE )) if [ $PROCESSED -ge $TOTAL_ROWS ]; then echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)" exit 1 fi # Step 6: Process in batches with shell loop BATCH_NUM=$(( $START_BATCH - 1 )) # We'll process batches in chunks of 10 before cleaning up CHUNKS_SINCE_CLEANUP=0 while [ $PROCESSED -lt $TOTAL_ROWS ]; do BATCH_NUM=$(( $BATCH_NUM + 1 )) BATCH_START=$(date +%s) MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE )) if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then MAX_ROWS=$TOTAL_ROWS fi echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..." # Optimize query buffer for this batch psql $PSQL_OPTS -c "SET work_mem = '256MB';" # Insert batch with type casts psql $PSQL_OPTS -c " INSERT INTO $TABLE_NAME ( pid, date, score, score2, qty_in_baskets, qty_sold, notifies_set, visibility_score, health_score, sold_view_score ) SELECT pid::bigint, date::date, score::numeric, score2::numeric, qty_in_baskets::smallint, qty_sold::smallint, notifies_set::smallint, visibility_score::numeric, health_score::varchar, sold_view_score::numeric FROM staging_import LIMIT $BATCH_SIZE OFFSET $PROCESSED; " # Update progress BATCH_END=$(date +%s) BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START )) PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc) echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE" # Increment counter PROCESSED=$(( $PROCESSED + $BATCH_SIZE )) CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 )) # Check current row count every 10 batches if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]') echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE" # Every 10 batches, run an intermediate cleanup if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then echo "Running intermediate cleanup and optimization..." 
psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;" CHUNKS_SINCE_CLEANUP=0 fi fi # Optional - write a checkpoint file to know where to restart echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt" done # Only recreate indexes if we've completed the import if [ $PROCESSED -ge $TOTAL_ROWS ]; then # Step 7: Re-enable triggers and recreate primary key echo "Re-enabling triggers and recreating primary key..." psql $PSQL_OPTS -c " ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL; ALTER TABLE $TABLE_NAME ADD PRIMARY KEY (pid, date); " # Step 8: Clean up and get final count echo "Cleaning up and getting final count..." psql $PSQL_OPTS -c " DROP TABLE staging_import; VACUUM ANALYZE $TABLE_NAME; SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME; " else echo "Import interrupted at batch $BATCH_NUM. To resume, run:" echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM" fi else # Generic approach for other tables if [ "$START_BATCH" -eq 1 ]; then # Extract CSV header CSV_HEADER=$(head -n 1 "$CSV_FILE") echo "CSV header: $CSV_HEADER" # Extract CSV header and format it for SQL CSV_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/^/"/;s/$/"/' | tr '\n' ',' | sed 's/,$//') TEMP_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/$/ TEXT/' | tr '\n' ',' | sed 's/,$//') echo "Importing columns: $CSV_COLUMNS" # Step 1: Set maintenance_work_mem and disable triggers echo "Setting maintenance_work_mem and disabling triggers..." psql $PSQL_OPTS -c " SET maintenance_work_mem = '1GB'; ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL; " # Step 2: Create temp table echo "Creating temporary table..." psql $PSQL_OPTS -c " DROP TABLE IF EXISTS temp_import; CREATE UNLOGGED TABLE temp_import ($TEMP_COLUMNS); -- Create an index on temp_import to improve OFFSET performance CREATE INDEX ON temp_import ((1)); -- Index on first column " # Step 3: Import CSV into temp table echo "Importing CSV into temporary table..." psql $PSQL_OPTS -c "\copy temp_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','" else echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import" # Check if temp table exists TEMP_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='temp_import');" | tr -d '[:space:]') if [ "$TEMP_EXISTS" != "t" ]; then echo "Error: Temporary table 'temp_import' does not exist. Run without batch parameter first." 
      exit 1
    fi

    # Ensure triggers are disabled
    psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;"

    # Session defaults (note: SET only lasts for this one psql invocation;
    # each batch below sets work_mem again in its own session)
    psql $PSQL_OPTS -c "
      SET work_mem = '256MB';
      SET maintenance_work_mem = '1GB';
    "

    # Rebuild the quoted column list from the staging table itself so that resume works for any table
    CSV_COLUMNS=$(psql $PSQL_OPTS -t -A -c "SELECT string_agg(quote_ident(column_name), ',' ORDER BY ordinal_position) FROM information_schema.columns WHERE table_name = 'temp_import';")
    echo "Using columns: $CSV_COLUMNS"
  fi

  # Step 4: Get total row count
  TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM temp_import;" | tr -d '[:space:]')
  echo "Total rows to import: $TOTAL_ROWS"

  # Calculate starting point
  PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE ))
  if [ $PROCESSED -ge $TOTAL_ROWS ]; then
    echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)"
    exit 1
  fi

  # Step 5: Process in batches with a shell loop
  BATCH_NUM=$(( $START_BATCH - 1 ))
  # Run an intermediate cleanup after every 10 batches
  CHUNKS_SINCE_CLEANUP=0

  while [ $PROCESSED -lt $TOTAL_ROWS ]; do
    BATCH_NUM=$(( $BATCH_NUM + 1 ))
    BATCH_START=$(date +%s)

    MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE ))
    if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then
      MAX_ROWS=$TOTAL_ROWS
    fi

    echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..."

    # Insert batch.
    # work_mem is set in the same session as the INSERT, and synchronize_seqscans is disabled
    # so each OFFSET scan starts from the beginning of the table and batches neither skip nor repeat rows.
    psql $PSQL_OPTS -c "
      SET work_mem = '256MB';
      SET synchronize_seqscans = off;
      INSERT INTO $TABLE_NAME ($CSV_COLUMNS)
      SELECT $CSV_COLUMNS
      FROM temp_import
      LIMIT $BATCH_SIZE OFFSET $PROCESSED;
    "

    # Update progress
    BATCH_END=$(date +%s)
    BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START ))
    PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc)
    echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE"

    # Increment counters
    PROCESSED=$(( $PROCESSED + $BATCH_SIZE ))
    CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 ))

    # Check current row count every 10 batches
    if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then
      CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]')
      echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE"

      # Every 10 batches, run an intermediate cleanup
      if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then
        echo "Running intermediate cleanup and optimization..."
        psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;"
        CHUNKS_SINCE_CLEANUP=0
      fi
    fi

    # Write a checkpoint file with the last completed batch (resume with this value + 1)
    echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt"
  done

  # Only clean up if we've completed the import
  if [ $PROCESSED -ge $TOTAL_ROWS ]; then
    # Step 6: Re-enable triggers and clean up
    echo "Re-enabling triggers and cleaning up..."
    psql $PSQL_OPTS -c "
      ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL;
      DROP TABLE temp_import;
      VACUUM ANALYZE $TABLE_NAME;
      SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME;
    "
  else
    echo "Import interrupted at batch $BATCH_NUM. To resume, run:"
    echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM"
  fi
fi

# Calculate elapsed time
END_TIME=$(date +%s)
ELAPSED=$((END_TIME - START_TIME))

echo "Import completed successfully in ${ELAPSED}s ($(($ELAPSED / 60)) minutes)"
echo "Progress log saved to $PROGRESS_FILE"
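
# ---------------------------------------------------------------------------
# Example usage (illustrative sketch only: the file path, batch number, and
# .env values below are placeholders, not values this script requires).
#
#   # Initial import (creates the staging table, loads the CSV, then inserts in batches)
#   ./psql-csv-import.sh /data/exports/product_stats.csv imported_product_stat_history
#
#   # Resume after an interruption: /tmp/import_last_batch_<table>.txt holds the
#   # last completed batch, so pass that value + 1 as the third argument
#   ./psql-csv-import.sh /data/exports/product_stats.csv imported_product_stat_history 13
#
# Expected ../.env contents (only these variable names are read; values are examples):
#   DB_HOST=localhost
#   DB_PORT=5432
#   DB_NAME=inventory_db
#   DB_USER=postgres
#   DB_PASSWORD=change-me
# ---------------------------------------------------------------------------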