428 lines
14 KiB
Bash
428 lines
14 KiB
Bash
#!/bin/bash
|
|
|
|
# Simple script to import CSV to PostgreSQL using psql
|
|
# Usage: ./psql-csv-import.sh <csv-file> <table-name> [start-batch]
|
|
|
|
# Exit on error
|
|
set -e
|
|
|
|
# Get arguments
|
|
CSV_FILE=$1
|
|
TABLE_NAME=$2
|
|
BATCH_SIZE=500000 # Process 500,000 rows at a time
|
|
START_BATCH=${3:-1} # Optional third parameter to start from a specific batch
|
|
|
|
if [ -z "$CSV_FILE" ] || [ -z "$TABLE_NAME" ]; then
|
|
echo "Usage: ./psql-csv-import.sh <csv-file> <table-name> [start-batch]"
|
|
exit 1
|
|
fi
|
|
|
|
# Check if file exists (only needed for batch 1)
|
|
if [ "$START_BATCH" -eq 1 ] && [ ! -f "$CSV_FILE" ]; then
|
|
echo "Error: CSV file '$CSV_FILE' not found"
|
|
exit 1
|
|
fi
|
|
|
|
# Load environment variables
|
|
if [ -f "../.env" ]; then
|
|
source "../.env"
|
|
else
|
|
echo "Warning: .env file not found, using default connection parameters"
|
|
fi
|
|
|
|
# Set default connection parameters if not from .env
|
|
DB_HOST=${DB_HOST:-localhost}
|
|
DB_PORT=${DB_PORT:-5432}
|
|
DB_NAME=${DB_NAME:-inventory_db}
|
|
DB_USER=${DB_USER:-postgres}
|
|
export PGPASSWORD=${DB_PASSWORD:-} # Export password for psql
|
|
|
|
# Common psql parameters
|
|
PSQL_OPTS="-h $DB_HOST -p $DB_PORT -U $DB_USER -d $DB_NAME"
|
|
|
|
# Function to clean up database state
|
|
cleanup_and_optimize() {
|
|
echo "Cleaning up and optimizing database state..."
|
|
|
|
# Analyze the target table to update statistics
|
|
psql $PSQL_OPTS -c "ANALYZE $TABLE_NAME;"
|
|
|
|
# Perform vacuum to reclaim space and update stats
|
|
psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;"
|
|
|
|
# Reset connection pool
|
|
psql $PSQL_OPTS -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = current_database() AND pid <> pg_backend_pid();"
|
|
|
|
# Clean up shared memory
|
|
psql $PSQL_OPTS -c "DISCARD ALL;"
|
|
|
|
echo "Optimization complete."
|
|
}
|
|
|
|
# Show connection info
|
|
echo "Importing $CSV_FILE into $TABLE_NAME"
|
|
echo "Database: $DB_NAME on $DB_HOST:$DB_PORT with batch size: $BATCH_SIZE starting at batch $START_BATCH"
|
|
|
|
# Start timer
|
|
START_TIME=$(date +%s)
|
|
|
|
# Create progress tracking file
|
|
PROGRESS_FILE="/tmp/import_progress_${TABLE_NAME}.txt"
|
|
touch "$PROGRESS_FILE"
|
|
echo "Starting import at $(date), batch $START_BATCH" >> "$PROGRESS_FILE"
|
|
|
|
# If we're resuming, run cleanup first
|
|
if [ "$START_BATCH" -gt 1 ]; then
|
|
cleanup_and_optimize
|
|
fi
|
|
|
|
# For imported_product_stat_history, use optimized approach with hardcoded column names
|
|
if [ "$TABLE_NAME" = "imported_product_stat_history" ]; then
|
|
echo "Using optimized import for $TABLE_NAME"
|
|
|
|
# Only drop constraints/indexes and create staging table for batch 1
|
|
if [ "$START_BATCH" -eq 1 ]; then
|
|
# Extract CSV header
|
|
CSV_HEADER=$(head -n 1 "$CSV_FILE")
|
|
echo "CSV header: $CSV_HEADER"
|
|
|
|
# Step 1: Drop constraints and indexes
|
|
echo "Dropping constraints and indexes..."
|
|
psql $PSQL_OPTS -c "
|
|
DO \$\$
|
|
DECLARE
|
|
constraint_name TEXT;
|
|
BEGIN
|
|
-- Drop primary key constraint if exists
|
|
SELECT conname INTO constraint_name
|
|
FROM pg_constraint
|
|
WHERE conrelid = '$TABLE_NAME'::regclass AND contype = 'p';
|
|
|
|
IF FOUND THEN
|
|
EXECUTE 'ALTER TABLE $TABLE_NAME DROP CONSTRAINT IF EXISTS ' || constraint_name;
|
|
RAISE NOTICE 'Dropped primary key constraint: %', constraint_name;
|
|
END IF;
|
|
END \$\$;
|
|
"
|
|
|
|
# Drop all indexes on the table
|
|
psql $PSQL_OPTS -c "
|
|
DO \$\$
|
|
DECLARE
|
|
index_name TEXT;
|
|
index_record RECORD;
|
|
BEGIN
|
|
FOR index_record IN
|
|
SELECT indexname
|
|
FROM pg_indexes
|
|
WHERE tablename = '$TABLE_NAME'
|
|
LOOP
|
|
EXECUTE 'DROP INDEX IF EXISTS ' || index_record.indexname;
|
|
RAISE NOTICE 'Dropped index: %', index_record.indexname;
|
|
END LOOP;
|
|
END \$\$;
|
|
"
|
|
|
|
# Step 2: Set maintenance_work_mem and disable triggers
|
|
echo "Setting maintenance_work_mem and disabling triggers..."
|
|
psql $PSQL_OPTS -c "
|
|
SET maintenance_work_mem = '1GB';
|
|
ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;
|
|
"
|
|
|
|
# Step 3: Create staging table
|
|
echo "Creating staging table..."
|
|
psql $PSQL_OPTS -c "
|
|
DROP TABLE IF EXISTS staging_import;
|
|
CREATE UNLOGGED TABLE staging_import (
|
|
pid TEXT,
|
|
date TEXT,
|
|
score TEXT,
|
|
score2 TEXT,
|
|
qty_in_baskets TEXT,
|
|
qty_sold TEXT,
|
|
notifies_set TEXT,
|
|
visibility_score TEXT,
|
|
health_score TEXT,
|
|
sold_view_score TEXT
|
|
);
|
|
|
|
-- Create an index on staging_import to improve OFFSET performance
|
|
CREATE INDEX ON staging_import (pid);
|
|
"
|
|
|
|
# Step 4: Import CSV into staging table
|
|
echo "Importing CSV into staging table..."
|
|
psql $PSQL_OPTS -c "\copy staging_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','"
|
|
else
|
|
echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import"
|
|
|
|
# Check if staging table exists
|
|
STAGING_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='staging_import');" | tr -d '[:space:]')
|
|
|
|
if [ "$STAGING_EXISTS" != "t" ]; then
|
|
echo "Error: Staging table 'staging_import' does not exist. Run without batch parameter first."
|
|
exit 1
|
|
fi
|
|
|
|
# Ensure triggers are disabled
|
|
psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;"
|
|
|
|
# Optimize PostgreSQL for better performance
|
|
psql $PSQL_OPTS -c "
|
|
-- Increase work mem for this session
|
|
SET work_mem = '256MB';
|
|
SET maintenance_work_mem = '1GB';
|
|
"
|
|
fi
|
|
|
|
# Step 5: Get total row count
|
|
TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM staging_import;" | tr -d '[:space:]')
|
|
echo "Total rows to import: $TOTAL_ROWS"
|
|
|
|
# Calculate starting point
|
|
PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE ))
|
|
if [ $PROCESSED -ge $TOTAL_ROWS ]; then
|
|
echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)"
|
|
exit 1
|
|
fi
|
|
|
|
# Step 6: Process in batches with shell loop
|
|
BATCH_NUM=$(( $START_BATCH - 1 ))
|
|
|
|
# We'll process batches in chunks of 10 before cleaning up
|
|
CHUNKS_SINCE_CLEANUP=0
|
|
|
|
while [ $PROCESSED -lt $TOTAL_ROWS ]; do
|
|
BATCH_NUM=$(( $BATCH_NUM + 1 ))
|
|
BATCH_START=$(date +%s)
|
|
MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE ))
|
|
if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then
|
|
MAX_ROWS=$TOTAL_ROWS
|
|
fi
|
|
|
|
echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..."
|
|
|
|
# Optimize query buffer for this batch
|
|
psql $PSQL_OPTS -c "SET work_mem = '256MB';"
|
|
|
|
# Insert batch with type casts
|
|
psql $PSQL_OPTS -c "
|
|
INSERT INTO $TABLE_NAME (
|
|
pid, date, score, score2, qty_in_baskets, qty_sold,
|
|
notifies_set, visibility_score, health_score, sold_view_score
|
|
)
|
|
SELECT
|
|
pid::bigint,
|
|
date::date,
|
|
score::numeric,
|
|
score2::numeric,
|
|
qty_in_baskets::smallint,
|
|
qty_sold::smallint,
|
|
notifies_set::smallint,
|
|
visibility_score::numeric,
|
|
health_score::varchar,
|
|
sold_view_score::numeric
|
|
FROM staging_import
|
|
LIMIT $BATCH_SIZE
|
|
OFFSET $PROCESSED;
|
|
"
|
|
|
|
# Update progress
|
|
BATCH_END=$(date +%s)
|
|
BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START ))
|
|
PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc)
|
|
|
|
echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE"
|
|
|
|
# Increment counter
|
|
PROCESSED=$(( $PROCESSED + $BATCH_SIZE ))
|
|
CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 ))
|
|
|
|
# Check current row count every 10 batches
|
|
if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then
|
|
CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]')
|
|
echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE"
|
|
|
|
# Every 10 batches, run an intermediate cleanup
|
|
if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then
|
|
echo "Running intermediate cleanup and optimization..."
|
|
psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;"
|
|
CHUNKS_SINCE_CLEANUP=0
|
|
fi
|
|
fi
|
|
|
|
# Optional - write a checkpoint file to know where to restart
|
|
echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt"
|
|
done
|
|
|
|
# Only recreate indexes if we've completed the import
|
|
if [ $PROCESSED -ge $TOTAL_ROWS ]; then
|
|
# Step 7: Re-enable triggers and recreate primary key
|
|
echo "Re-enabling triggers and recreating primary key..."
|
|
psql $PSQL_OPTS -c "
|
|
ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL;
|
|
ALTER TABLE $TABLE_NAME ADD PRIMARY KEY (pid, date);
|
|
"
|
|
|
|
# Step 8: Clean up and get final count
|
|
echo "Cleaning up and getting final count..."
|
|
psql $PSQL_OPTS -c "
|
|
DROP TABLE staging_import;
|
|
VACUUM ANALYZE $TABLE_NAME;
|
|
SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME;
|
|
"
|
|
else
|
|
echo "Import interrupted at batch $BATCH_NUM. To resume, run:"
|
|
echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM"
|
|
fi
|
|
|
|
else
|
|
# Generic approach for other tables
|
|
if [ "$START_BATCH" -eq 1 ]; then
|
|
# Extract CSV header
|
|
CSV_HEADER=$(head -n 1 "$CSV_FILE")
|
|
echo "CSV header: $CSV_HEADER"
|
|
|
|
# Extract CSV header and format it for SQL
|
|
CSV_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/^/"/;s/$/"/' | tr '\n' ',' | sed 's/,$//')
|
|
TEMP_COLUMNS=$(echo "$CSV_HEADER" | tr ',' '\n' | sed 's/$/ TEXT/' | tr '\n' ',' | sed 's/,$//')
|
|
|
|
echo "Importing columns: $CSV_COLUMNS"
|
|
|
|
# Step 1: Set maintenance_work_mem and disable triggers
|
|
echo "Setting maintenance_work_mem and disabling triggers..."
|
|
psql $PSQL_OPTS -c "
|
|
SET maintenance_work_mem = '1GB';
|
|
ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;
|
|
"
|
|
|
|
# Step 2: Create temp table
|
|
echo "Creating temporary table..."
|
|
psql $PSQL_OPTS -c "
|
|
DROP TABLE IF EXISTS temp_import;
|
|
CREATE UNLOGGED TABLE temp_import ($TEMP_COLUMNS);
|
|
|
|
-- Create an index on temp_import to improve OFFSET performance
|
|
CREATE INDEX ON temp_import ((1)); -- Index on first column
|
|
"
|
|
|
|
# Step 3: Import CSV into temp table
|
|
echo "Importing CSV into temporary table..."
|
|
psql $PSQL_OPTS -c "\copy temp_import FROM '$CSV_FILE' WITH CSV HEADER DELIMITER ','"
|
|
else
|
|
echo "Resuming import from batch $START_BATCH - skipping table creation and CSV import"
|
|
|
|
# Check if temp table exists
|
|
TEMP_EXISTS=$(psql $PSQL_OPTS -t -c "SELECT EXISTS(SELECT 1 FROM pg_tables WHERE tablename='temp_import');" | tr -d '[:space:]')
|
|
|
|
if [ "$TEMP_EXISTS" != "t" ]; then
|
|
echo "Error: Temporary table 'temp_import' does not exist. Run without batch parameter first."
|
|
exit 1
|
|
fi
|
|
|
|
# Ensure triggers are disabled
|
|
psql $PSQL_OPTS -c "ALTER TABLE $TABLE_NAME DISABLE TRIGGER ALL;"
|
|
|
|
# Optimize PostgreSQL for better performance
|
|
psql $PSQL_OPTS -c "
|
|
-- Increase work mem for this session
|
|
SET work_mem = '256MB';
|
|
SET maintenance_work_mem = '1GB';
|
|
"
|
|
|
|
# Hard-code columns since we know them
|
|
CSV_COLUMNS='"pid","date","score","score2","qty_in_baskets","qty_sold","notifies_set","visibility_score","health_score","sold_view_score"'
|
|
|
|
echo "Using standard columns: $CSV_COLUMNS"
|
|
fi
|
|
|
|
# Step 4: Get total row count
|
|
TOTAL_ROWS=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM temp_import;" | tr -d '[:space:]')
|
|
echo "Total rows to import: $TOTAL_ROWS"
|
|
|
|
# Calculate starting point
|
|
PROCESSED=$(( ($START_BATCH - 1) * $BATCH_SIZE ))
|
|
if [ $PROCESSED -ge $TOTAL_ROWS ]; then
|
|
echo "Error: Start batch $START_BATCH is beyond the available rows ($TOTAL_ROWS)"
|
|
exit 1
|
|
fi
|
|
|
|
# Step 5: Process in batches with shell loop
|
|
BATCH_NUM=$(( $START_BATCH - 1 ))
|
|
|
|
# We'll process batches in chunks of 10 before cleaning up
|
|
CHUNKS_SINCE_CLEANUP=0
|
|
|
|
while [ $PROCESSED -lt $TOTAL_ROWS ]; do
|
|
BATCH_NUM=$(( $BATCH_NUM + 1 ))
|
|
BATCH_START=$(date +%s)
|
|
MAX_ROWS=$(( $PROCESSED + $BATCH_SIZE ))
|
|
if [ $MAX_ROWS -gt $TOTAL_ROWS ]; then
|
|
MAX_ROWS=$TOTAL_ROWS
|
|
fi
|
|
|
|
echo "Processing batch $BATCH_NUM (rows $PROCESSED to $MAX_ROWS)..."
|
|
|
|
# Optimize query buffer for this batch
|
|
psql $PSQL_OPTS -c "SET work_mem = '256MB';"
|
|
|
|
# Insert batch
|
|
psql $PSQL_OPTS -c "
|
|
INSERT INTO $TABLE_NAME ($CSV_COLUMNS)
|
|
SELECT $CSV_COLUMNS
|
|
FROM temp_import
|
|
LIMIT $BATCH_SIZE
|
|
OFFSET $PROCESSED;
|
|
"
|
|
|
|
# Update progress
|
|
BATCH_END=$(date +%s)
|
|
BATCH_ELAPSED=$(( $BATCH_END - $BATCH_START ))
|
|
PROGRESS_PCT=$(echo "scale=2; $MAX_ROWS * 100 / $TOTAL_ROWS" | bc)
|
|
|
|
echo "Batch $BATCH_NUM committed in ${BATCH_ELAPSED}s, $MAX_ROWS of $TOTAL_ROWS rows processed ($PROGRESS_PCT%)" | tee -a "$PROGRESS_FILE"
|
|
|
|
# Increment counter
|
|
PROCESSED=$(( $PROCESSED + $BATCH_SIZE ))
|
|
CHUNKS_SINCE_CLEANUP=$(( $CHUNKS_SINCE_CLEANUP + 1 ))
|
|
|
|
# Check current row count every 10 batches
|
|
if [ $(( $BATCH_NUM % 10 )) -eq 0 ]; then
|
|
CURRENT_COUNT=$(psql $PSQL_OPTS -t -c "SELECT COUNT(*) FROM $TABLE_NAME;" | tr -d '[:space:]')
|
|
echo "Current row count in $TABLE_NAME: $CURRENT_COUNT" | tee -a "$PROGRESS_FILE"
|
|
|
|
# Every 10 batches, run an intermediate cleanup
|
|
if [ $CHUNKS_SINCE_CLEANUP -ge 10 ]; then
|
|
echo "Running intermediate cleanup and optimization..."
|
|
psql $PSQL_OPTS -c "VACUUM $TABLE_NAME;"
|
|
CHUNKS_SINCE_CLEANUP=0
|
|
fi
|
|
fi
|
|
|
|
# Optional - write a checkpoint file to know where to restart
|
|
echo "$BATCH_NUM" > "/tmp/import_last_batch_${TABLE_NAME}.txt"
|
|
done
|
|
|
|
# Only clean up if we've completed the import
|
|
if [ $PROCESSED -ge $TOTAL_ROWS ]; then
|
|
# Step 6: Re-enable triggers and clean up
|
|
echo "Re-enabling triggers and cleaning up..."
|
|
psql $PSQL_OPTS -c "
|
|
ALTER TABLE $TABLE_NAME ENABLE TRIGGER ALL;
|
|
DROP TABLE temp_import;
|
|
VACUUM ANALYZE $TABLE_NAME;
|
|
SELECT COUNT(*) AS \"Total rows in $TABLE_NAME\" FROM $TABLE_NAME;
|
|
"
|
|
else
|
|
echo "Import interrupted at batch $BATCH_NUM. To resume, run:"
|
|
echo "./psql-csv-import.sh $CSV_FILE $TABLE_NAME $BATCH_NUM"
|
|
fi
|
|
fi
|
|
|
|
# Calculate elapsed time
|
|
END_TIME=$(date +%s)
|
|
ELAPSED=$((END_TIME - START_TIME))
|
|
|
|
echo "Import completed successfully in ${ELAPSED}s ($(($ELAPSED / 60)) minutes)"
|
|
echo "Progress log saved to $PROGRESS_FILE" |