Files
inventory/inventory-server/scripts/import/historical-data.js

961 lines
31 KiB
JavaScript

const { outputProgress, formatElapsedTime, estimateRemaining, calculateRate } = require('../metrics-new/utils/progress');
const fs = require('fs');
const path = require('path');
const { pipeline } = require('stream');
const { promisify } = require('util');
// Configuration constants to control which tables get imported
const IMPORT_PRODUCT_CURRENT_PRICES = false;
const IMPORT_DAILY_INVENTORY = false;
const IMPORT_PRODUCT_STAT_HISTORY = true;
// For product stat history, limit to more recent data for faster initial import
const USE_RECENT_MONTHS = 12; // Just use the most recent months for product_stat_history
/**
* Validates a date from MySQL before inserting it into PostgreSQL
* @param {string|Date|null} mysqlDate - Date string or object from MySQL
* @returns {string|null} Valid date string or null if invalid
*/
function validateDate(mysqlDate) {
// Handle null, undefined, or empty values
if (!mysqlDate) {
return null;
}
// Convert to string if it's not already
const dateStr = String(mysqlDate);
// Handle MySQL zero dates and empty values
if (dateStr === '0000-00-00' ||
dateStr === '0000-00-00 00:00:00' ||
dateStr.indexOf('0000-00-00') !== -1 ||
dateStr === '') {
return null;
}
// Check if the date is valid
const date = new Date(mysqlDate);
// If the date is invalid or suspiciously old (pre-1970), return null
if (isNaN(date.getTime()) || date.getFullYear() < 1970) {
return null;
}
return mysqlDate;
}
/**
* Imports historical data from MySQL to PostgreSQL
*/
async function importHistoricalData(
prodConnection,
localConnection,
options = {}
) {
const {
incrementalUpdate = true,
oneYearAgo = new Date(new Date().setFullYear(new Date().getFullYear() - 1))
} = options;
const oneYearAgoStr = oneYearAgo.toISOString().split('T')[0];
const startTime = Date.now();
// Use larger batch sizes to improve performance
const BATCH_SIZE = 5000; // For fetching from small tables
const INSERT_BATCH_SIZE = 500; // For inserting to small tables
const LARGE_BATCH_SIZE = 10000; // For fetching from large tables
const LARGE_INSERT_BATCH_SIZE = 1000; // For inserting to large tables
// Calculate date for recent data
const recentDateStr = new Date(
new Date().setMonth(new Date().getMonth() - USE_RECENT_MONTHS)
).toISOString().split('T')[0];
console.log(`Starting import with:
- One year ago date: ${oneYearAgoStr}
- Recent months date: ${recentDateStr} (for product_stat_history)
- Incremental update: ${incrementalUpdate}
- Standard batch size: ${BATCH_SIZE}
- Standard insert batch size: ${INSERT_BATCH_SIZE}
- Large table batch size: ${LARGE_BATCH_SIZE}
- Large table insert batch size: ${LARGE_INSERT_BATCH_SIZE}
- Import product_current_prices: ${IMPORT_PRODUCT_CURRENT_PRICES}
- Import daily_inventory: ${IMPORT_DAILY_INVENTORY}
- Import product_stat_history: ${IMPORT_PRODUCT_STAT_HISTORY}`);
try {
// Get last sync time for incremental updates
const lastSyncTimes = {};
if (incrementalUpdate) {
try {
const syncResult = await localConnection.query(`
SELECT table_name, last_sync_timestamp
FROM sync_status
WHERE table_name IN (
'imported_product_current_prices',
'imported_daily_inventory',
'imported_product_stat_history'
)
`);
// Add check for rows existence and type
if (syncResult && Array.isArray(syncResult.rows)) {
for (const row of syncResult.rows) {
lastSyncTimes[row.table_name] = row.last_sync_timestamp;
console.log(`Last sync time for ${row.table_name}: ${row.last_sync_timestamp}`);
}
} else {
console.warn('Sync status query did not return expected rows. Proceeding without last sync times.');
}
} catch (error) {
console.error('Error fetching sync status:', error);
}
}
// Determine how many tables will be imported
const tablesCount = [
IMPORT_PRODUCT_CURRENT_PRICES,
IMPORT_DAILY_INVENTORY,
IMPORT_PRODUCT_STAT_HISTORY
].filter(Boolean).length;
// Run all imports sequentially for better reliability
console.log(`Starting sequential imports for ${tablesCount} tables...`);
outputProgress({
status: "running",
operation: "Historical data import",
message: `Starting sequential imports for ${tablesCount} tables...`,
current: 0,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
let progressCount = 0;
let productCurrentPricesResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
let dailyInventoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
let productStatHistoryResult = { recordsAdded: 0, recordsUpdated: 0, totalProcessed: 0, errors: [] };
// Import product current prices
if (IMPORT_PRODUCT_CURRENT_PRICES) {
console.log('Importing product current prices...');
productCurrentPricesResult = await importProductCurrentPrices(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTimes['imported_product_current_prices'],
BATCH_SIZE,
INSERT_BATCH_SIZE,
incrementalUpdate,
startTime
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Import daily inventory
if (IMPORT_DAILY_INVENTORY) {
console.log('Importing daily inventory...');
dailyInventoryResult = await importDailyInventory(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTimes['imported_daily_inventory'],
BATCH_SIZE,
INSERT_BATCH_SIZE,
incrementalUpdate,
startTime
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Import product stat history - using optimized approach
if (IMPORT_PRODUCT_STAT_HISTORY) {
console.log('Importing product stat history...');
productStatHistoryResult = await importProductStatHistory(
prodConnection,
localConnection,
recentDateStr, // Use more recent date for this massive table
lastSyncTimes['imported_product_stat_history'],
LARGE_BATCH_SIZE,
LARGE_INSERT_BATCH_SIZE,
incrementalUpdate,
startTime,
USE_RECENT_MONTHS // Pass the recent months constant
);
progressCount++;
outputProgress({
status: "running",
operation: "Historical data import",
message: `Completed import ${progressCount} of ${tablesCount}`,
current: progressCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
}
// Aggregate results
const totalRecordsAdded =
productCurrentPricesResult.recordsAdded +
dailyInventoryResult.recordsAdded +
productStatHistoryResult.recordsAdded;
const totalRecordsUpdated =
productCurrentPricesResult.recordsUpdated +
dailyInventoryResult.recordsUpdated +
productStatHistoryResult.recordsUpdated;
const totalProcessed =
productCurrentPricesResult.totalProcessed +
dailyInventoryResult.totalProcessed +
productStatHistoryResult.totalProcessed;
const allErrors = [
...productCurrentPricesResult.errors,
...dailyInventoryResult.errors,
...productStatHistoryResult.errors
];
// Log import summary
console.log(`
Historical data import complete:
-------------------------------
Records added: ${totalRecordsAdded}
Records updated: ${totalRecordsUpdated}
Total processed: ${totalProcessed}
Errors: ${allErrors.length}
Time taken: ${formatElapsedTime(startTime)}
`);
// Final progress update
outputProgress({
status: "complete",
operation: "Historical data import",
message: `Import complete. Added: ${totalRecordsAdded}, Updated: ${totalRecordsUpdated}, Errors: ${allErrors.length}`,
current: tablesCount,
total: tablesCount,
elapsed: formatElapsedTime(startTime)
});
// Log any errors
if (allErrors.length > 0) {
console.log('Errors encountered during import:');
console.log(JSON.stringify(allErrors, null, 2));
}
// Calculate duration
const endTime = Date.now();
const durationSeconds = Math.round((endTime - startTime) / 1000);
const finalStatus = allErrors.length === 0 ? 'complete' : 'failed';
const errorMessage = allErrors.length > 0 ? JSON.stringify(allErrors) : null;
// Update import history
await localConnection.query(`
INSERT INTO import_history (
table_name,
end_time,
duration_seconds,
records_added,
records_updated,
is_incremental,
status,
error_message,
additional_info
)
VALUES ($1, NOW(), $2, $3, $4, $5, $6, $7, $8)
`, [
'historical_data_combined',
durationSeconds,
totalRecordsAdded,
totalRecordsUpdated,
incrementalUpdate,
finalStatus,
errorMessage,
JSON.stringify({
totalProcessed,
tablesImported: {
imported_product_current_prices: IMPORT_PRODUCT_CURRENT_PRICES,
imported_daily_inventory: IMPORT_DAILY_INVENTORY,
imported_product_stat_history: IMPORT_PRODUCT_STAT_HISTORY
}
})
]);
// Return summary
return {
recordsAdded: totalRecordsAdded,
recordsUpdated: totalRecordsUpdated,
totalProcessed,
errors: allErrors,
timeTaken: formatElapsedTime(startTime)
};
} catch (error) {
console.error('Error importing historical data:', error);
// Final progress update on error
outputProgress({
status: "failed",
operation: "Historical data import",
message: `Import failed: ${error.message}`,
elapsed: formatElapsedTime(startTime)
});
throw error;
}
}
/**
* Imports product_current_prices data from MySQL to PostgreSQL
*/
async function importProductCurrentPrices(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM product_current_prices
WHERE (date_active >= ? OR date_deactive >= ?)
${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''}
`, [oneYearAgoStr, oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
outputProgress({
status: "running",
operation: "Historical data import - Product Current Prices",
message: `Found ${totalCount} records to process`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production
const [rows] = await prodConnection.query(`
SELECT
price_id,
pid,
qty_buy,
is_min_qty_buy,
price_each,
qty_limit,
no_promo,
checkout_offer,
active,
date_active,
date_deactive
FROM product_current_prices
WHERE (date_active >= ? OR date_deactive >= ?)
${incrementalUpdate && lastSyncTime ? `AND date_deactive > ?` : ''}
ORDER BY price_id
LIMIT ? OFFSET ?
`, [
oneYearAgoStr,
oneYearAgoStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
row.price_id,
row.pid,
row.qty_buy,
row.is_min_qty_buy ? true : false,
row.price_each,
row.qty_limit, // PostgreSQL will handle null values properly
row.no_promo ? true : false,
row.checkout_offer ? true : false,
row.active ? true : false,
validateDate(row.date_active),
validateDate(row.date_deactive)
);
}
// Execute batch insert
const result = await localConnection.query(`
WITH ins AS (
INSERT INTO imported_product_current_prices (
price_id, pid, qty_buy, is_min_qty_buy, price_each, qty_limit,
no_promo, checkout_offer, active, date_active, date_deactive
)
VALUES ${placeholders.join(',\n')}
ON CONFLICT (price_id) DO UPDATE SET
pid = EXCLUDED.pid,
qty_buy = EXCLUDED.qty_buy,
is_min_qty_buy = EXCLUDED.is_min_qty_buy,
price_each = EXCLUDED.price_each,
qty_limit = EXCLUDED.qty_limit,
no_promo = EXCLUDED.no_promo,
checkout_offer = EXCLUDED.checkout_offer,
active = EXCLUDED.active,
date_active = EXCLUDED.date_active,
date_deactive = EXCLUDED.date_deactive,
updated = CURRENT_TIMESTAMP
RETURNING (xmax = 0) AS inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) AS inserted_count,
COUNT(*) FILTER (WHERE NOT inserted) AS updated_count
FROM ins
`, values);
// Safely update counts based on the result
if (result && result.rows && result.rows.length > 0) {
const insertedCount = parseInt(result.rows[0].inserted_count || 0);
const updatedCount = parseInt(result.rows[0].updated_count || 0);
recordsAdded += insertedCount;
recordsUpdated += updatedCount;
}
} catch (error) {
console.error(`Error in batch import of product_current_prices at offset ${i}:`, error);
errors.push({
table: 'imported_product_current_prices',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Product Current Prices",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of product_current_prices:', error);
errors.push({
table: 'imported_product_current_prices',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_product_current_prices', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in product current prices import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_product_current_prices',
error: error.message
}]
};
}
}
/**
* Imports daily_inventory data from MySQL to PostgreSQL
*/
async function importDailyInventory(
prodConnection,
localConnection,
oneYearAgoStr,
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM daily_inventory
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''}
`, [oneYearAgoStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
outputProgress({
status: "running",
operation: "Historical data import - Daily Inventory",
message: `Found ${totalCount} records to process`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production
const [rows] = await prodConnection.query(`
SELECT
date,
pid,
amountsold,
times_sold,
qtyreceived,
price,
costeach,
stamp
FROM daily_inventory
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND stamp > ?` : ''}
ORDER BY date, pid
LIMIT ? OFFSET ?
`, [
oneYearAgoStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
validateDate(row.date),
row.pid,
row.amountsold || 0,
row.times_sold || 0,
row.qtyreceived || 0,
row.price || 0,
row.costeach || 0,
validateDate(row.stamp)
);
}
// Execute batch insert
const result = await localConnection.query(`
WITH ins AS (
INSERT INTO imported_daily_inventory (
date, pid, amountsold, times_sold, qtyreceived, price, costeach, stamp
)
VALUES ${placeholders.join(',\n')}
ON CONFLICT (date, pid) DO UPDATE SET
amountsold = EXCLUDED.amountsold,
times_sold = EXCLUDED.times_sold,
qtyreceived = EXCLUDED.qtyreceived,
price = EXCLUDED.price,
costeach = EXCLUDED.costeach,
stamp = EXCLUDED.stamp,
updated = CURRENT_TIMESTAMP
RETURNING (xmax = 0) AS inserted
)
SELECT
COUNT(*) FILTER (WHERE inserted) AS inserted_count,
COUNT(*) FILTER (WHERE NOT inserted) AS updated_count
FROM ins
`, values);
// Safely update counts based on the result
if (result && result.rows && result.rows.length > 0) {
const insertedCount = parseInt(result.rows[0].inserted_count || 0);
const updatedCount = parseInt(result.rows[0].updated_count || 0);
recordsAdded += insertedCount;
recordsUpdated += updatedCount;
}
} catch (error) {
console.error(`Error in batch import of daily_inventory at offset ${i}:`, error);
errors.push({
table: 'imported_daily_inventory',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Daily Inventory",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of daily_inventory:', error);
errors.push({
table: 'imported_daily_inventory',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_daily_inventory', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in daily inventory import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_daily_inventory',
error: error.message
}]
};
}
}
/**
* Imports product_stat_history data from MySQL to PostgreSQL
* Using fast direct inserts without conflict checking
*/
async function importProductStatHistory(
prodConnection,
localConnection,
recentDateStr, // Use more recent date instead of one year ago
lastSyncTime,
batchSize,
insertBatchSize,
incrementalUpdate,
startTime,
recentMonths // Add parameter for recent months
) {
let recordsAdded = 0;
let recordsUpdated = 0;
let totalProcessed = 0;
let errors = [];
let offset = 0;
let allProcessed = false;
let lastRateCheck = Date.now();
let lastProcessed = 0;
try {
// Get total count for progress reporting
const [countResult] = await prodConnection.query(`
SELECT COUNT(*) as total
FROM product_stat_history
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''}
`, [recentDateStr, ...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : [])]);
const totalCount = countResult[0].total;
console.log(`Found ${totalCount} records to process in product_stat_history (using recent date: ${recentDateStr})`);
// Progress indicator
outputProgress({
status: "running",
operation: "Historical data import - Product Stat History",
message: `Found ${totalCount} records to process (last ${recentMonths} months only)`,
current: 0,
total: totalCount,
elapsed: formatElapsedTime(startTime)
});
// If not incremental, truncate the table first for better performance
if (!incrementalUpdate) {
console.log('Truncating imported_product_stat_history for full import...');
await localConnection.query('TRUNCATE TABLE imported_product_stat_history');
} else if (lastSyncTime) {
// For incremental updates, delete records that will be reimported
console.log(`Deleting records from imported_product_stat_history since ${lastSyncTime}...`);
await localConnection.query('DELETE FROM imported_product_stat_history WHERE date > $1', [lastSyncTime]);
}
// Process in batches for better performance
while (!allProcessed) {
try {
// Fetch batch from production with minimal filtering and no sorting
const [rows] = await prodConnection.query(`
SELECT
pid,
date,
COALESCE(score, 0) as score,
COALESCE(score2, 0) as score2,
COALESCE(qty_in_baskets, 0) as qty_in_baskets,
COALESCE(qty_sold, 0) as qty_sold,
COALESCE(notifies_set, 0) as notifies_set,
COALESCE(visibility_score, 0) as visibility_score,
COALESCE(health_score, 0) as health_score,
COALESCE(sold_view_score, 0) as sold_view_score
FROM product_stat_history
WHERE date >= ?
${incrementalUpdate && lastSyncTime ? `AND date > ?` : ''}
LIMIT ? OFFSET ?
`, [
recentDateStr,
...(incrementalUpdate && lastSyncTime ? [lastSyncTime] : []),
batchSize,
offset
]);
if (rows.length === 0) {
allProcessed = true;
break;
}
// Process rows in smaller batches for better performance
for (let i = 0; i < rows.length; i += insertBatchSize) {
const batch = rows.slice(i, i + insertBatchSize);
if (batch.length === 0) continue;
try {
// Build parameterized query to handle NULL values properly
const values = [];
const placeholders = [];
let placeholderIndex = 1;
for (const row of batch) {
const rowPlaceholders = [
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`,
`$${placeholderIndex++}`
];
placeholders.push(`(${rowPlaceholders.join(', ')})`);
values.push(
row.pid,
validateDate(row.date),
row.score,
row.score2,
row.qty_in_baskets,
row.qty_sold,
row.notifies_set,
row.visibility_score,
row.health_score,
row.sold_view_score
);
}
// Execute direct batch insert without conflict checking
await localConnection.query(`
INSERT INTO imported_product_stat_history (
pid, date, score, score2, qty_in_baskets, qty_sold, notifies_set,
visibility_score, health_score, sold_view_score
)
VALUES ${placeholders.join(',\n')}
`, values);
// All inserts are new records when using this approach
recordsAdded += batch.length;
} catch (error) {
console.error(`Error in batch insert of product_stat_history at offset ${i}:`, error);
errors.push({
table: 'imported_product_stat_history',
batchOffset: i,
batchSize: batch.length,
error: error.message
});
}
}
totalProcessed += rows.length;
offset += rows.length;
// Calculate current rate every 10 seconds or 100,000 records
const now = Date.now();
if (now - lastRateCheck > 10000 || totalProcessed - lastProcessed > 100000) {
const timeElapsed = (now - lastRateCheck) / 1000; // seconds
const recordsProcessed = totalProcessed - lastProcessed;
const currentRate = Math.round(recordsProcessed / timeElapsed);
console.log(`Current import rate: ${currentRate} records/second`);
lastRateCheck = now;
lastProcessed = totalProcessed;
}
// Update progress
outputProgress({
status: "running",
operation: "Historical data import - Product Stat History",
message: `Processed ${totalProcessed} of ${totalCount} records`,
current: totalProcessed,
total: totalCount,
elapsed: formatElapsedTime(startTime),
remaining: estimateRemaining(startTime, totalProcessed, totalCount),
rate: calculateRate(startTime, totalProcessed)
});
} catch (error) {
console.error('Error in batch import of product_stat_history:', error);
errors.push({
table: 'imported_product_stat_history',
error: error.message,
offset: offset,
batchSize: batchSize
});
// Try to continue with next batch
offset += batchSize;
}
}
// Update sync status
await localConnection.query(`
INSERT INTO sync_status (table_name, last_sync_timestamp)
VALUES ('imported_product_stat_history', NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_sync_timestamp = NOW()
`);
return { recordsAdded, recordsUpdated, totalProcessed, errors };
} catch (error) {
console.error('Error in product stat history import:', error);
return {
recordsAdded,
recordsUpdated,
totalProcessed,
errors: [...errors, {
table: 'imported_product_stat_history',
error: error.message
}]
};
}
}
module.exports = importHistoricalData;