#!/usr/bin/env python3
"""
MongoDB to PostgreSQL Converter for Rocket.Chat
Converts MongoDB BSON export files to a PostgreSQL database.

Usage:
    python3 mongo_to_postgres_converter.py \
        --mongo-path db/database/62df06d44234d20001289144 \
        --pg-database rocketchat_converted \
        --pg-user rocketchat_user \
        --pg-password your_password \
        --debug
"""

import json
import os
import re
import subprocess
import sys
import struct
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional
import argparse
import traceback

# Auto-install dependencies if needed
try:
    import bson
    import psycopg2
except ImportError:
    print("Installing required packages...")
    subprocess.check_call([sys.executable, "-m", "pip", "install", "pymongo", "psycopg2-binary"])
    import bson
    import psycopg2


class MongoToPostgresConverter:
    def __init__(self, mongo_db_path: str, postgres_config: Dict[str, str],
                 debug_mode: bool = False, debug_collections: List[str] = None):
        self.mongo_db_path = Path(mongo_db_path)
        self.postgres_config = postgres_config
        self.debug_mode = debug_mode
        self.debug_collections = debug_collections or []
        self.collections = {}
        self.schema_info = {}
        self.error_log = {}

    def log_debug(self, message: str, collection: str = None):
        """Log debug messages if debug mode is enabled and collection is in debug list"""
        if self.debug_mode and (not self.debug_collections or collection in self.debug_collections):
            print(f"DEBUG: {message}")

    def log_error(self, collection: str, error_type: str, details: str):
        """Log detailed error information"""
        if collection not in self.error_log:
            self.error_log[collection] = []
        self.error_log[collection].append({
            'type': error_type,
            'details': details,
            'timestamp': datetime.now().isoformat()
        })

    def sample_documents(self, collection_name: str, max_samples: int = 3) -> List[Dict]:
        """Sample documents from a collection for debugging"""
        if not self.debug_mode or (self.debug_collections and collection_name not in self.debug_collections):
            return []

        print(f"\nšŸ” Sampling documents from {collection_name}:")
        bson_file = self.collections[collection_name]['bson_file']

        if bson_file.stat().st_size == 0:
            print("  Collection is empty")
            return []

        samples = []
        try:
            with open(bson_file, 'rb') as f:
                sample_count = 0
                while sample_count < max_samples:
                    try:
                        doc_size = int.from_bytes(f.read(4), byteorder='little')
                        if doc_size <= 0:
                            break
                        f.seek(-4, 1)
                        doc_bytes = f.read(doc_size)
                        if len(doc_bytes) != doc_size:
                            break

                        doc = bson.decode(doc_bytes)
                        samples.append(doc)
                        sample_count += 1

                        print(f"  Sample {sample_count} - Keys: {list(doc.keys())}")
                        # Show a few key fields with their types and truncated values
                        for key, value in list(doc.items())[:3]:
                            value_preview = str(value)[:50] + "..." if len(str(value)) > 50 else str(value)
                            print(f"    {key}: {type(value).__name__} = {value_preview}")
                        if len(doc) > 3:
                            print(f"    ... and {len(doc) - 3} more fields")
                        print()
                    except (bson.InvalidBSON, struct.error, OSError) as e:
                        self.log_error(collection_name, 'document_parsing', str(e))
                        break
        except Exception as e:
            self.log_error(collection_name, 'file_reading', str(e))
            print(f"  Error reading collection: {e}")

        return samples

    def discover_collections(self):
        """Discover all BSON files and their metadata"""
        print("Discovering MongoDB collections...")

        for bson_file in self.mongo_db_path.glob("*.bson"):
            collection_name = bson_file.stem
            metadata_file = bson_file.with_suffix(".metadata.json")

            # Read metadata if available
            metadata = {}
            if metadata_file.exists():
                try:
                    with open(metadata_file, 'r', encoding='utf-8') as f:
                        metadata = json.load(f)
                except (UnicodeDecodeError, json.JSONDecodeError) as e:
                    print(f"Warning: Could not read metadata for {collection_name}: {e}")
                    metadata = {}

            # Get file size and document count estimate
            file_size = bson_file.stat().st_size
            doc_count = self._estimate_document_count(bson_file)

            self.collections[collection_name] = {
                'bson_file': bson_file,
                'metadata': metadata,
                'file_size': file_size,
                'estimated_docs': doc_count
            }

        print(f"Found {len(self.collections)} collections")
        for name, info in self.collections.items():
            print(f"  - {name}: {info['file_size']/1024/1024:.1f}MB (~{info['estimated_docs']} docs)")

    def _estimate_document_count(self, bson_file: Path) -> int:
        """Estimate document count by reading first few documents"""
        if bson_file.stat().st_size == 0:
            return 0

        try:
            with open(bson_file, 'rb') as f:
                docs_sampled = 0
                bytes_sampled = 0
                max_sample_size = min(1024 * 1024, bson_file.stat().st_size)  # 1MB or file size

                while bytes_sampled < max_sample_size:
                    try:
                        doc_size = int.from_bytes(f.read(4), byteorder='little')
                        if doc_size <= 0 or doc_size > 16 * 1024 * 1024:  # MongoDB doc size limit
                            break
                        f.seek(-4, 1)  # Go back
                        doc_bytes = f.read(doc_size)
                        if len(doc_bytes) != doc_size:
                            break

                        bson.decode(doc_bytes)  # Validate it's a valid BSON document
                        docs_sampled += 1
                        bytes_sampled += doc_size
                    except (bson.InvalidBSON, struct.error, OSError):
                        break

                if docs_sampled > 0 and bytes_sampled > 0:
                    avg_doc_size = bytes_sampled / docs_sampled
                    return int(bson_file.stat().st_size / avg_doc_size)
        except Exception:
            pass

        return 0

    def analyze_schema(self, collection_name: str, sample_size: int = 100) -> Dict[str, Any]:
        """Analyze collection schema by sampling documents"""
        print(f"Analyzing schema for {collection_name}...")

        bson_file = self.collections[collection_name]['bson_file']
        if bson_file.stat().st_size == 0:
            return {}

        schema = {}
        docs_analyzed = 0

        try:
            with open(bson_file, 'rb') as f:
                while docs_analyzed < sample_size:
                    try:
                        doc_size = int.from_bytes(f.read(4), byteorder='little')
                        if doc_size <= 0:
                            break
                        f.seek(-4, 1)
                        doc_bytes = f.read(doc_size)
                        if len(doc_bytes) != doc_size:
                            break

                        doc = bson.decode(doc_bytes)
                        self._analyze_document_schema(doc, schema)
                        docs_analyzed += 1
                    except (bson.InvalidBSON, struct.error, OSError):
                        break
        except Exception as e:
            print(f"Error analyzing {collection_name}: {e}")

        self.schema_info[collection_name] = schema
        return schema

    def _analyze_document_schema(self, doc: Dict[str, Any], schema: Dict[str, Any], prefix: str = ""):
        """Recursively analyze document structure"""
        for key, value in doc.items():
            full_key = f"{prefix}.{key}" if prefix else key

            if full_key not in schema:
                schema[full_key] = {
                    'types': set(),
                    'null_count': 0,
                    'total_count': 0,
                    'is_array': False,
                    'nested_schema': {}
                }

            schema[full_key]['total_count'] += 1

            if value is None:
                schema[full_key]['null_count'] += 1
                schema[full_key]['types'].add('null')
            elif isinstance(value, dict):
                schema[full_key]['types'].add('object')
                if 'nested_schema' not in schema[full_key]:
                    schema[full_key]['nested_schema'] = {}
                self._analyze_document_schema(value, schema[full_key]['nested_schema'])
            elif isinstance(value, list):
                schema[full_key]['types'].add('array')
                schema[full_key]['is_array'] = True
                if value and isinstance(value[0], dict):
                    if 'array_item_schema' not in schema[full_key]:
                        schema[full_key]['array_item_schema'] = {}
                    for item in value[:5]:  # Sample first 5 items
                        if isinstance(item, dict):
                            self._analyze_document_schema(item, schema[full_key]['array_item_schema'])
            else:
                schema[full_key]['types'].add(type(value).__name__)

    def generate_postgres_schema(self) -> Dict[str, str]:
        """Generate PostgreSQL CREATE TABLE statements"""
        print("Generating PostgreSQL schema...")

        table_definitions = {}

        for collection_name, schema in self.schema_info.items():
            if not schema:  # Empty collection
                continue

            table_name = self._sanitize_table_name(collection_name)
            columns = []

            # Always add an id column (PostgreSQL doesn't use _id like MongoDB)
            columns.append("id SERIAL PRIMARY KEY")

            for field_name, field_info in schema.items():
                if field_name == '_id':
                    columns.append("mongo_id TEXT")  # Always allow NULL for mongo_id
                    continue

                col_name = self._sanitize_column_name(field_name)
                # Handle conflicts with PostgreSQL auto-generated columns
                if col_name in ['id', 'mongo_id', 'created_at', 'updated_at']:
                    col_name = f"field_{col_name}"

                col_type = self._determine_postgres_type(field_info)
                # Make all fields nullable by default to avoid constraint violations
                columns.append(f"{col_name} {col_type}")

            # Add metadata columns
            columns.extend([
                "created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP",
                "updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"
            ])

            column_definitions = ',\n    '.join(columns)
            table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
    {column_definitions}
);

-- Create indexes based on MongoDB indexes
"""

            # Get list of actual columns that will exist in the table
            existing_columns = set(['id', 'mongo_id', 'created_at', 'updated_at'])
            for field_name in schema.keys():
                if field_name != '_id':
                    col_name = self._sanitize_column_name(field_name)
                    # Handle conflicts with PostgreSQL auto-generated columns
                    if col_name in ['id', 'mongo_id', 'created_at', 'updated_at']:
                        col_name = f"field_{col_name}"
                    existing_columns.add(col_name)

            # Add indexes from MongoDB metadata
            metadata = self.collections[collection_name].get('metadata', {})
            indexes = metadata.get('indexes', [])

            for index in indexes:
                if index['name'] != '_id_':  # Skip the default _id index
                    # Sanitize index name - remove special characters
                    sanitized_index_name = re.sub(r'[^a-zA-Z0-9_]', '_', index['name'])
                    index_name = f"idx_{table_name}_{sanitized_index_name}"
                    index_keys = list(index['key'].keys())
                    if index_keys:
                        sanitized_keys = []
                        for key in index_keys:
                            if key != '_id':
                                sanitized_key = self._sanitize_column_name(key)
                                # Handle conflicts with PostgreSQL auto-generated columns
                                if sanitized_key in ['id', 'mongo_id', 'created_at', 'updated_at']:
                                    sanitized_key = f"field_{sanitized_key}"
                                # Only add if the column actually exists in our table
                                if sanitized_key in existing_columns:
                                    sanitized_keys.append(sanitized_key)
                        if sanitized_keys:
                            table_sql += f"CREATE INDEX IF NOT EXISTS {index_name} ON {table_name} ({', '.join(sanitized_keys)});\n"

            table_definitions[collection_name] = table_sql

        return table_definitions

    def _sanitize_table_name(self, name: str) -> str:
        """Convert MongoDB collection name to PostgreSQL table name"""
        # Remove rocketchat_ prefix if present
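        # e.g. "rocketchat_message" becomes "message"; collection names without the
        # prefix (such as "users") pass through unchanged (illustrative examples only)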
        if name.startswith('rocketchat_'):
            name = name[11:]

        # Replace special characters with underscores
        name = re.sub(r'[^a-zA-Z0-9_]', '_', name)

        # Ensure it starts with a letter
        if name and name[0].isdigit():
            name = 'table_' + name

        return name.lower()

    def _sanitize_column_name(self, name: str) -> str:
        """Convert MongoDB field name to PostgreSQL column name"""
        # Handle nested field names (convert dots to underscores)
        name = name.replace('.', '_')

        # Replace special characters with underscores
        name = re.sub(r'[^a-zA-Z0-9_]', '_', name)

        # Ensure it starts with a letter or underscore
        if name and name[0].isdigit():
            name = 'col_' + name

        # Handle PostgreSQL reserved words
        reserved = {
            'user', 'order', 'group', 'table', 'index', 'key', 'value', 'date', 'time',
            'timestamp', 'default', 'select', 'from', 'where', 'insert', 'update', 'delete',
            'create', 'drop', 'alter', 'grant', 'revoke', 'commit', 'rollback', 'begin',
            'end', 'case', 'when', 'then', 'else', 'if', 'null', 'not', 'and', 'or', 'in',
            'exists', 'between', 'like', 'limit', 'offset', 'union', 'join', 'inner',
            'outer', 'left', 'right', 'full', 'cross', 'natural', 'on', 'using', 'distinct',
            'all', 'any', 'some', 'desc', 'asc', 'primary', 'foreign', 'references',
            'constraint', 'unique', 'check', 'cascade', 'restrict', 'action', 'match',
            'partial'
        }
        if name.lower() in reserved:
            name = name + '_col'

        return name.lower()

    def _determine_postgres_type(self, field_info: Dict[str, Any]) -> str:
        """Determine PostgreSQL column type from MongoDB field analysis with improved logic"""
        types = field_info['types']

        # Convert set to list for easier checking
        type_list = list(types)

        # If there's only one type (excluding null), use specific typing
        non_null_types = [t for t in type_list if t != 'null']

        if len(non_null_types) == 1:
            single_type = non_null_types[0]
            if single_type == 'bool':
                return 'BOOLEAN'
            elif single_type == 'int':
                return 'INTEGER'
            elif single_type == 'float':
                return 'NUMERIC'
            elif single_type == 'str':
                return 'TEXT'
            elif single_type == 'datetime':
                return 'TIMESTAMP'
            elif single_type == 'ObjectId':
                return 'TEXT'

        # Handle mixed types more conservatively
        if 'array' in types or field_info.get('is_array', False):
            return 'JSONB'  # Arrays always go to JSONB
        elif 'object' in types:
            return 'JSONB'  # Objects always go to JSONB
        elif len(non_null_types) > 1:
            # Multiple non-null types - check for common combinations
            if set(non_null_types) <= {'int', 'float'}:
                return 'NUMERIC'  # Can handle both int and float
            elif set(non_null_types) <= {'bool', 'str'}:
                return 'TEXT'  # Convert everything to text
            elif set(non_null_types) <= {'str', 'ObjectId'}:
                return 'TEXT'  # Both are string-like
            else:
                return 'JSONB'  # Complex mixed types go to JSONB
        elif 'ObjectId' in types:
            return 'TEXT'
        elif 'datetime' in types:
            return 'TIMESTAMP'
        elif 'bool' in types:
            return 'BOOLEAN'
        elif 'int' in types:
            return 'INTEGER'
        elif 'float' in types:
            return 'NUMERIC'
        elif 'str' in types:
            return 'TEXT'
        else:
            return 'TEXT'  # Default fallback

    def create_postgres_database(self, table_definitions: Dict[str, str]):
        """Create PostgreSQL database and tables"""
        print("Creating PostgreSQL database schema...")

        try:
            # Connect to PostgreSQL
            conn = psycopg2.connect(**self.postgres_config)
            conn.autocommit = True
            cursor = conn.cursor()

            # Create tables
            for collection_name, table_sql in table_definitions.items():
                print(f"Creating table for {collection_name}...")
                cursor.execute(table_sql)

            cursor.close()
            conn.close()
            print("Database schema created successfully!")

        except Exception as e:
            print(f"Error creating database schema: {e}")
            raise

    def convert_and_insert_data(self, batch_size: int = 1000):
        """Convert BSON data and insert into PostgreSQL"""
        print("Converting and inserting data...")

        try:
            conn = psycopg2.connect(**self.postgres_config)
            conn.autocommit = False

            for collection_name in self.collections:
                print(f"Processing {collection_name}...")
                self._convert_collection(conn, collection_name, batch_size)

            conn.close()
            print("Data conversion completed successfully!")

        except Exception as e:
            print(f"Error converting data: {e}")
            raise

    def _convert_collection(self, conn, collection_name: str, batch_size: int):
        """Convert a single collection"""
        bson_file = self.collections[collection_name]['bson_file']

        if bson_file.stat().st_size == 0:
            print(f"  Skipping empty collection {collection_name}")
            return

        table_name = self._sanitize_table_name(collection_name)
        cursor = conn.cursor()

        batch = []
        total_inserted = 0
        errors = 0

        try:
            with open(bson_file, 'rb') as f:
                while True:
                    try:
                        doc_size = int.from_bytes(f.read(4), byteorder='little')
                        if doc_size <= 0:
                            break
                        f.seek(-4, 1)
                        doc_bytes = f.read(doc_size)
                        if len(doc_bytes) != doc_size:
                            break

                        doc = bson.decode(doc_bytes)
                        batch.append(doc)

                        if len(batch) >= batch_size:
                            inserted, batch_errors = self._insert_batch(cursor, table_name, batch, collection_name)
                            total_inserted += inserted
                            errors += batch_errors
                            batch = []
                            conn.commit()

                            if total_inserted % 5000 == 0:  # Less frequent progress updates
                                print(f"  Inserted {total_inserted} documents...")

                    except (bson.InvalidBSON, struct.error, OSError):
                        break

            # Insert remaining documents
            if batch:
                inserted, batch_errors = self._insert_batch(cursor, table_name, batch, collection_name)
                total_inserted += inserted
                errors += batch_errors
                conn.commit()

            if errors > 0:
                print(f"  Completed {collection_name}: {total_inserted} documents inserted ({errors} errors)")
            else:
                print(f"  Completed {collection_name}: {total_inserted} documents inserted")

        except Exception as e:
            print(f"  Error processing {collection_name}: {e}")
            conn.rollback()
        finally:
            cursor.close()

    def _insert_batch(self, cursor, table_name: str, documents: List[Dict], collection_name: str):
        """Insert a batch of documents with proper transaction handling"""
        if not documents:
            return 0, 0

        # Get schema info for this collection
        schema = self.schema_info.get(collection_name, {})

        # Build column list
        columns = ['mongo_id']
        for field_name in schema.keys():
            if field_name != '_id':
                col_name = self._sanitize_column_name(field_name)
                # Handle conflicts with PostgreSQL auto-generated columns
                if col_name in ['id', 'mongo_id', 'created_at', 'updated_at']:
                    col_name = f"field_{col_name}"
                columns.append(col_name)

        # Build INSERT statement
        placeholders = ', '.join(['%s'] * len(columns))
        sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"

        self.log_debug(f"SQL: {sql}", collection_name)

        # Convert documents to tuples
        rows = []
        errors = 0

        for doc_idx, doc in enumerate(documents):
            try:
                row = []

                # Add mongo_id
                row.append(str(doc.get('_id', '')))

                # Add other fields
                for field_name in schema.keys():
                    if field_name != '_id':
                        try:
                            value = self._get_nested_value(doc, field_name)
                            converted_value = self._convert_value_for_postgres(value, field_name, schema)
                            row.append(converted_value)
                        except Exception as e:
                            self.log_error(collection_name, 'field_conversion',
                                           f"Field '{field_name}' in doc {doc_idx}: {str(e)}")
                            # Only show debug for collections we're focusing on
                            if collection_name in self.debug_collections:
                                print(f"    āš ļø Error converting field '{field_name}': {e}")
'{field_name}': {e}") row.append(None) # Use NULL for problematic fields rows.append(tuple(row)) except Exception as e: self.log_error(collection_name, 'document_conversion', f"Document {doc_idx}: {str(e)}") errors += 1 continue # Execute batch insert if rows: try: cursor.executemany(sql, rows) return len(rows), errors except Exception as batch_error: self.log_error(collection_name, 'batch_insert', str(batch_error)) # Only show detailed debugging for targeted collections if collection_name in self.debug_collections: print(f" šŸ”“ Batch insert failed for {collection_name}: {batch_error}") print(" Trying individual inserts with rollback handling...") # Rollback the failed transaction cursor.connection.rollback() # Try inserting one by one in individual transactions success_count = 0 for row_idx, row in enumerate(rows): try: cursor.execute(sql, row) cursor.connection.commit() # Commit each successful insert success_count += 1 except Exception as row_error: cursor.connection.rollback() # Rollback failed insert self.log_error(collection_name, 'row_insert', f"Row {row_idx}: {str(row_error)}") # Show detailed error only for the first few failures and only for targeted collections if collection_name in self.debug_collections and errors < 3: print(f" Row {row_idx} failed: {row_error}") print(f" Row data: {len(row)} values, expected {len(columns)} columns") errors += 1 continue return success_count, errors return 0, errors def _get_nested_value(self, doc: Dict, field_path: str): """Get value from nested document using dot notation""" keys = field_path.split('.') value = doc for key in keys: if isinstance(value, dict) and key in value: value = value[key] else: return None return value def _convert_value_for_postgres(self, value, field_name: str = None, schema: Dict = None): """Convert MongoDB value to PostgreSQL compatible value with schema-aware conversion""" if value is None: return None # Get the expected PostgreSQL type for this field if available expected_type = None if schema and field_name and field_name in schema: field_info = schema[field_name] expected_type = self._determine_postgres_type(field_info) # Handle conversion based on expected type if expected_type == 'BOOLEAN': if isinstance(value, bool): return value elif isinstance(value, str): return value.lower() in ('true', '1', 'yes', 'on') elif isinstance(value, (int, float)): return bool(value) else: return None elif expected_type == 'INTEGER': if isinstance(value, int): return value elif isinstance(value, float): return int(value) elif isinstance(value, str) and value.isdigit(): return int(value) elif isinstance(value, bool): return int(value) else: return None elif expected_type == 'NUMERIC': if isinstance(value, (int, float)): return value elif isinstance(value, str): try: return float(value) except ValueError: return None elif isinstance(value, bool): return float(value) else: return None elif expected_type == 'TEXT': if isinstance(value, str): return value elif value is not None: str_value = str(value) # Handle very long strings if len(str_value) > 65535: return str_value[:65535] return str_value else: return None elif expected_type == 'TIMESTAMP': if hasattr(value, 'isoformat'): return value.isoformat() elif isinstance(value, str): return value else: return str(value) if value is not None else None elif expected_type == 'JSONB': if isinstance(value, (dict, list)): return json.dumps(value, default=self._json_serializer) elif isinstance(value, str): # Check if it's already valid JSON try: json.loads(value) return value except 
                    # Not valid JSON, wrap it
                    return json.dumps(value)
            else:
                return json.dumps(value, default=self._json_serializer)

        # Fallback to original logic if no expected type or type not recognized
        if isinstance(value, bool):
            return value
        elif isinstance(value, (int, float)):
            return value
        elif isinstance(value, str):
            return value
        elif isinstance(value, (dict, list)):
            return json.dumps(value, default=self._json_serializer)
        elif hasattr(value, 'isoformat'):  # datetime
            return value.isoformat()
        elif hasattr(value, '__str__'):
            str_value = str(value)
            if len(str_value) > 65535:
                return str_value[:65535]
            return str_value
        else:
            return str(value)

    def _json_serializer(self, obj):
        """Custom JSON serializer for complex objects with better error handling"""
        try:
            if hasattr(obj, 'isoformat'):  # datetime
                return obj.isoformat()
            elif hasattr(obj, '__str__'):
                return str(obj)
            else:
                return None
        except Exception as e:
            self.log_debug(f"JSON serialization error: {e}")
            return str(obj)

    def run_conversion(self, sample_size: int = 100, batch_size: int = 1000):
        """Run the full conversion process with focused debugging"""
        print("Starting MongoDB to PostgreSQL conversion...")
        print("This will convert your Rocket.Chat database from MongoDB to PostgreSQL")

        if self.debug_mode:
            if self.debug_collections:
                print(f"šŸ› DEBUG MODE: Focusing on collections: {', '.join(self.debug_collections)}")
            else:
                print("šŸ› DEBUG MODE: All collections")

        print("=" * 70)

        # Step 1: Discover collections
        self.discover_collections()

        # Step 2: Analyze schemas
        print("\nAnalyzing collection schemas...")
        for collection_name in self.collections:
            self.analyze_schema(collection_name, sample_size)

        # Sample problematic collections if debugging
        if self.debug_mode and self.debug_collections:
            for coll in self.debug_collections:
                if coll in self.collections:
                    self.sample_documents(coll, 2)

        # Step 3: Generate PostgreSQL schema
        table_definitions = self.generate_postgres_schema()

        # Step 4: Create database schema
        self.create_postgres_database(table_definitions)

        # Step 5: Convert and insert data
        self.convert_and_insert_data(batch_size)

        # Step 6: Show error summary
        self._print_error_summary()

        print("=" * 70)
        print("āœ… Conversion completed!")
        print(f"   Database: {self.postgres_config['database']}")
        print(f"   Tables created: {len(table_definitions)}")

    def _print_error_summary(self):
        """Print a focused summary of errors"""
        if not self.error_log:
            print("\nāœ… No errors encountered during conversion!")
            return

        print("\nāš ļø ERROR SUMMARY:")
        print("=" * 50)

        # Sort by error count descending
        sorted_collections = sorted(self.error_log.items(), key=lambda x: len(x[1]), reverse=True)

        for collection, errors in sorted_collections:
            error_types = {}
            for error in errors:
                error_type = error['type']
                if error_type not in error_types:
                    error_types[error_type] = []
                error_types[error_type].append(error['details'])

            print(f"\nšŸ”“ {collection} ({len(errors)} total errors):")
            for error_type, details_list in error_types.items():
                print(f"  {error_type}: {len(details_list)} errors")
                # Show sample errors for critical collections
                if collection in ['rocketchat_settings', 'rocketchat_room'] and len(details_list) > 0:
                    print(f"    Sample: {details_list[0][:100]}...")


def main():
    parser = argparse.ArgumentParser(
        description='Convert MongoDB BSON export to PostgreSQL',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic usage
  python3 mongo_to_postgres_converter.py \\
      --mongo-path db/database/62df06d44234d20001289144 \\
      --pg-database rocketchat_converted \\
      --pg-user rocketchat_user \\
      --pg-password mypassword

  # Debug specific failing collections
  python3 mongo_to_postgres_converter.py \\
      --mongo-path db/database/62df06d44234d20001289144 \\
      --pg-database rocketchat_converted \\
      --pg-user rocketchat_user \\
      --pg-password mypassword \\
      --debug-collections rocketchat_settings rocketchat_room

Before running this script:
  1. Run: sudo -u postgres psql -f reset_database.sql
  2. Update the password in reset_database.sql
"""
    )

    parser.add_argument('--mongo-path', required=True,
                        help='Path to MongoDB export directory')
    parser.add_argument('--pg-host', default='localhost',
                        help='PostgreSQL host (default: localhost)')
    parser.add_argument('--pg-port', default='5432',
                        help='PostgreSQL port (default: 5432)')
    parser.add_argument('--pg-database', required=True,
                        help='PostgreSQL database name')
    parser.add_argument('--pg-user', required=True,
                        help='PostgreSQL username')
    parser.add_argument('--pg-password', required=True,
                        help='PostgreSQL password')
    parser.add_argument('--sample-size', type=int, default=100,
                        help='Number of documents to sample for schema analysis (default: 100)')
    parser.add_argument('--batch-size', type=int, default=1000,
                        help='Batch size for data insertion (default: 1000)')
    parser.add_argument('--debug', action='store_true',
                        help='Enable debug mode with detailed error logging')
    parser.add_argument('--debug-collections', nargs='*',
                        help='Specific collections to debug (e.g., rocketchat_settings rocketchat_room)')

    args = parser.parse_args()

    postgres_config = {
        'host': args.pg_host,
        'port': args.pg_port,
        'database': args.pg_database,
        'user': args.pg_user,
        'password': args.pg_password
    }

    # Enable debug mode if debug collections are specified
    debug_mode = args.debug or (args.debug_collections is not None)

    converter = MongoToPostgresConverter(args.mongo_path, postgres_config, debug_mode, args.debug_collections)
    converter.run_conversion(args.sample_size, args.batch_size)


if __name__ == '__main__':
    main()