Add campaigns component and services

This commit is contained in:
2024-12-27 01:32:48 -05:00
parent aaa95a5b9e
commit e8b5f8ce07
13 changed files with 883 additions and 24 deletions

View File

@@ -12,6 +12,7 @@
"dotenv": "^16.4.7",
"esm": "^3.2.25",
"express": "^4.18.2",
"express-rate-limit": "^7.5.0",
"ioredis": "^5.4.1",
"luxon": "^3.5.0",
"node-fetch": "^3.3.2",
@@ -711,6 +712,21 @@
"url": "https://opencollective.com/express"
}
},
"node_modules/express-rate-limit": {
"version": "7.5.0",
"resolved": "https://registry.npmjs.org/express-rate-limit/-/express-rate-limit-7.5.0.tgz",
"integrity": "sha512-eB5zbQh5h+VenMPM3fh+nw1YExi5nMr6HUCR62ELSP11huvxm/Uir1H1QEyTkk5QX6A58pX6NmaTMceKZ0Eodg==",
"license": "MIT",
"engines": {
"node": ">= 16"
},
"funding": {
"url": "https://github.com/sponsors/express-rate-limit"
},
"peerDependencies": {
"express": "^4.11 || 5 || ^5.0.0-beta.1"
}
},
"node_modules/fast-equals": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/fast-equals/-/fast-equals-5.0.1.tgz",

View File

@@ -13,6 +13,7 @@
"dotenv": "^16.4.7",
"esm": "^3.2.25",
"express": "^4.18.2",
"express-rate-limit": "^7.5.0",
"ioredis": "^5.4.1",
"luxon": "^3.5.0",
"node-fetch": "^3.3.2",

View File

@@ -0,0 +1,71 @@
import express from 'express';
import { CampaignsService } from '../services/campaigns.service.js';
import { TimeManager } from '../utils/time.utils.js';
export function createCampaignsRouter(apiKey, apiRevision) {
const router = express.Router();
const timeManager = new TimeManager();
const campaignsService = new CampaignsService(apiKey, apiRevision);
// Get campaigns with optional filtering
router.get('/', async (req, res) => {
try {
const params = {
pageSize: parseInt(req.query.pageSize) || 50,
sort: req.query.sort || '-send_time',
status: req.query.status,
startDate: req.query.startDate,
endDate: req.query.endDate,
pageCursor: req.query.pageCursor
};
console.log('[Campaigns Route] Fetching campaigns with params:', params);
const data = await campaignsService.getCampaigns(params);
console.log('[Campaigns Route] Success:', {
count: data.data?.length || 0
});
res.json(data);
} catch (error) {
console.error('[Campaigns Route] Error:', error);
res.status(500).json({
status: 'error',
message: error.message,
details: error.response?.data || null
});
}
});
// Get campaigns by time range
router.get('/:timeRange', async (req, res) => {
try {
const { timeRange } = req.params;
const { status } = req.query;
let result;
if (timeRange === 'custom') {
const { startDate, endDate } = req.query;
if (!startDate || !endDate) {
return res.status(400).json({ error: 'Custom range requires startDate and endDate' });
}
result = await campaignsService.getCampaigns({
startDate,
endDate,
status
});
} else {
result = await campaignsService.getCampaignsByTimeRange(
timeRange,
{ status }
);
}
res.json(result);
} catch (error) {
console.error("[Campaigns Route] Error:", error);
res.status(500).json({ error: error.message });
}
});
return router;
}

View File

@@ -1,15 +1,19 @@
import express from 'express';
import { createMetricsRoutes } from './metrics.routes.js';
import { createEventsRouter } from './events.routes.js';
import { createCampaignsRouter } from './campaigns.routes.js';
import { createReportingRouter } from './reporting.routes.js';
export function createApiRouter(apiKey, apiRevision) {
const router = express.Router();
// Mount metrics routes
router.use('/metrics', createMetricsRoutes(apiKey, apiRevision));
// Mount events routes
router.use('/events', createEventsRouter(apiKey, apiRevision));
// Mount campaigns routes
router.use('/campaigns', createCampaignsRouter(apiKey, apiRevision));
// Mount reporting routes
router.use('/reporting', createReportingRouter(apiKey, apiRevision));
return router;
}

View File

@@ -0,0 +1,72 @@
import express from 'express';
import { ReportingService } from '../services/reporting.service.js';
import { TimeManager } from '../utils/time.utils.js';
export function createReportingRouter(apiKey, apiRevision) {
const router = express.Router();
const timeManager = new TimeManager();
const reportingService = new ReportingService(apiKey, apiRevision);
// Get campaign reports with optional filtering
router.get('/campaigns', async (req, res) => {
try {
const { timeRange, startDate, endDate } = req.query;
let params = {};
if (timeRange === 'custom') {
if (!startDate || !endDate) {
return res.status(400).json({ error: 'Custom range requires startDate and endDate' });
}
params = { startDate, endDate };
} else {
params = { timeRange: timeRange || 'last30days' };
}
console.log('[Reporting Route] Fetching campaign reports with params:', params);
const data = await reportingService.getEnrichedCampaignReports(params);
console.log('[Reporting Route] Success:', {
count: data.data?.length || 0
});
res.json(data);
} catch (error) {
console.error('[Reporting Route] Error:', error);
res.status(500).json({
status: 'error',
message: error.message,
details: error.response?.data || null
});
}
});
// Get campaign reports by time range
router.get('/campaigns/:timeRange', async (req, res) => {
try {
const { timeRange } = req.params;
let result;
if (timeRange === 'custom') {
const { startDate, endDate } = req.query;
if (!startDate || !endDate) {
return res.status(400).json({ error: 'Custom range requires startDate and endDate' });
}
result = await reportingService.getEnrichedCampaignReports({
startDate,
endDate
});
} else {
result = await reportingService.getEnrichedCampaignReports({
timeRange
});
}
res.json(result);
} catch (error) {
console.error("[Reporting Route] Error:", error);
res.status(500).json({ error: error.message });
}
});
return router;
}

View File

@@ -1,6 +1,7 @@
import express from 'express';
import cors from 'cors';
import dotenv from 'dotenv';
import rateLimit from 'express-rate-limit';
import { createApiRouter } from './routes/index.js';
import path from 'path';
import { fileURLToPath } from 'url';
@@ -26,6 +27,21 @@ console.log('[Server] Environment variables loaded:', {
const app = express();
const port = process.env.KLAVIYO_PORT || 3004;
// Rate limiting for reporting endpoints
const reportingLimiter = rateLimit({
windowMs: 10 * 60 * 1000, // 10 minutes
max: 10, // limit each IP to 10 requests per windowMs
message: 'Too many requests to reporting endpoint, please try again later',
keyGenerator: (req) => {
// Use a combination of IP and endpoint for more granular control
return `${req.ip}-reporting`;
},
skip: (req) => {
// Only apply to campaign-values-reports endpoint
return !req.path.includes('campaign-values-reports');
}
});
// Middleware
app.use(cors());
app.use(express.json());
@@ -36,6 +52,9 @@ app.use((req, res, next) => {
next();
});
// Apply rate limiting to reporting endpoints
app.use('/api/klaviyo/reporting', reportingLimiter);
// Create and mount API routes
const apiRouter = createApiRouter(
process.env.KLAVIYO_API_KEY,

View File

@@ -0,0 +1,206 @@
import fetch from 'node-fetch';
import { TimeManager } from '../utils/time.utils.js';
import { RedisService } from './redis.service.js';
export class CampaignsService {
constructor(apiKey, apiRevision) {
this.apiKey = apiKey;
this.apiRevision = apiRevision;
this.baseUrl = 'https://a.klaviyo.com/api';
this.timeManager = new TimeManager();
this.redisService = new RedisService();
}
async getCampaigns(params = {}) {
try {
// Add request debouncing
const requestKey = JSON.stringify(params);
if (this._pendingRequests && this._pendingRequests[requestKey]) {
return this._pendingRequests[requestKey];
}
// Try to get from cache first
const cacheKey = this.redisService._getCacheKey('campaigns', params);
let cachedData = null;
try {
cachedData = await this.redisService.get(`${cacheKey}:raw`);
if (cachedData) {
return cachedData;
}
} catch (cacheError) {
console.warn('[CampaignsService] Cache error:', cacheError);
}
this._pendingRequests = this._pendingRequests || {};
this._pendingRequests[requestKey] = (async () => {
let allCampaigns = [];
let nextCursor = params.pageCursor;
let pageCount = 0;
const filter = params.filter || this._buildFilter(params);
do {
const queryParams = new URLSearchParams();
if (filter) {
queryParams.append('filter', filter);
}
queryParams.append('sort', params.sort || '-send_time');
if (nextCursor) {
queryParams.append('page[cursor]', nextCursor);
}
const url = `${this.baseUrl}/campaigns?${queryParams.toString()}`;
try {
const response = await fetch(url, {
method: 'GET',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': `Klaviyo-API-Key ${this.apiKey}`,
'revision': this.apiRevision
}
});
if (!response.ok) {
const errorData = await response.json();
console.error('[CampaignsService] API Error:', errorData);
throw new Error(`Klaviyo API error: ${response.status} ${response.statusText}`);
}
const responseData = await response.json();
allCampaigns = allCampaigns.concat(responseData.data || []);
pageCount++;
nextCursor = responseData.links?.next ?
new URL(responseData.links.next).searchParams.get('page[cursor]') : null;
if (nextCursor) {
await new Promise(resolve => setTimeout(resolve, 50));
}
} catch (fetchError) {
console.error('[CampaignsService] Fetch error:', fetchError);
throw fetchError;
}
} while (nextCursor);
const transformedCampaigns = this._transformCampaigns(allCampaigns);
const result = {
data: transformedCampaigns,
meta: {
total_count: transformedCampaigns.length,
page_count: pageCount
}
};
try {
const ttl = this.redisService._getTTL(params.timeRange);
await this.redisService.set(`${cacheKey}:raw`, result, ttl);
} catch (cacheError) {
console.warn('[CampaignsService] Cache set error:', cacheError);
}
delete this._pendingRequests[requestKey];
return result;
})();
return await this._pendingRequests[requestKey];
} catch (error) {
console.error('[CampaignsService] Error fetching campaigns:', error);
throw error;
}
}
_buildFilter(params) {
const filters = [];
if (params.startDate && params.endDate) {
const startUtc = this.timeManager.formatForAPI(params.startDate);
const endUtc = this.timeManager.formatForAPI(params.endDate);
filters.push(`greater-or-equal(send_time,${startUtc})`);
filters.push(`less-than(send_time,${endUtc})`);
}
if (params.status) {
filters.push(`equals(status,"${params.status}")`);
}
if (params.customFilters) {
filters.push(...params.customFilters);
}
return filters.length > 0 ? (filters.length > 1 ? `and(${filters.join(',')})` : filters[0]) : null;
}
async getCampaignsByTimeRange(timeRange, options = {}) {
const range = this.timeManager.getDateRange(timeRange);
if (!range) {
throw new Error('Invalid time range specified');
}
const params = {
timeRange,
startDate: range.start.toISO(),
endDate: range.end.toISO(),
...options
};
// Try to get from cache first
const cacheKey = this.redisService._getCacheKey('campaigns', params);
let cachedData = null;
try {
cachedData = await this.redisService.get(`${cacheKey}:raw`);
if (cachedData) {
return cachedData;
}
} catch (cacheError) {
console.warn('[CampaignsService] Cache error:', cacheError);
}
return this.getCampaigns(params);
}
_transformCampaigns(campaigns) {
if (!Array.isArray(campaigns)) {
console.warn('[CampaignsService] Campaigns is not an array:', campaigns);
return [];
}
return campaigns.map(campaign => {
try {
const stats = campaign.attributes?.campaign_message?.stats || {};
return {
id: campaign.id,
name: campaign.attributes?.name || "Unnamed Campaign",
subject: campaign.attributes?.campaign_message?.subject || "",
send_time: campaign.attributes?.send_time,
stats: {
delivery_rate: stats.delivery_rate || 0,
delivered: stats.delivered || 0,
recipients: stats.recipients || 0,
open_rate: stats.open_rate || 0,
opens_unique: stats.opens_unique || 0,
opens: stats.opens || 0,
clicks_unique: stats.clicks_unique || 0,
click_rate: stats.click_rate || 0,
click_to_open_rate: stats.click_to_open_rate || 0,
conversion_value: stats.conversion_value || 0,
conversion_uniques: stats.conversion_uniques || 0
}
};
} catch (error) {
console.error('[CampaignsService] Error transforming campaign:', error, campaign);
return {
id: campaign.id || 'unknown',
name: 'Error Processing Campaign',
stats: {}
};
}
});
}
}

View File

@@ -0,0 +1,237 @@
import fetch from 'node-fetch';
import { TimeManager } from '../utils/time.utils.js';
import { RedisService } from './redis.service.js';
const METRIC_IDS = {
PLACED_ORDER: 'Y8cqcF'
};
export class ReportingService {
constructor(apiKey, apiRevision) {
this.apiKey = apiKey;
this.apiRevision = apiRevision;
this.baseUrl = 'https://a.klaviyo.com/api';
this.timeManager = new TimeManager();
this.redisService = new RedisService();
this._pendingReportRequest = null;
}
async getCampaignReports(params = {}) {
try {
// Check if there's a pending request
if (this._pendingReportRequest) {
console.log('[ReportingService] Using pending campaign report request');
return this._pendingReportRequest;
}
// Try to get from cache first
const cacheKey = this.redisService._getCacheKey('campaign_reports', params);
let cachedData = null;
try {
cachedData = await this.redisService.get(`${cacheKey}:raw`);
if (cachedData) {
console.log('[ReportingService] Using cached campaign report data');
return cachedData;
}
} catch (cacheError) {
console.warn('[ReportingService] Cache error:', cacheError);
}
// Create new request promise
this._pendingReportRequest = (async () => {
console.log('[ReportingService] Fetching fresh campaign report data');
const range = this.timeManager.getDateRange(params.timeRange || 'last30days');
const payload = {
data: {
type: "campaign-values-report",
attributes: {
timeframe: {
start: range.start.toISO(),
end: range.end.toISO()
},
statistics: [
"delivery_rate",
"delivered",
"recipients",
"open_rate",
"opens_unique",
"opens",
"click_rate",
"clicks_unique",
"click_to_open_rate",
"conversion_value",
"conversion_uniques"
],
conversion_metric_id: METRIC_IDS.PLACED_ORDER,
filter: 'equals(send_channel,"email")'
}
}
};
const response = await fetch(`${this.baseUrl}/campaign-values-reports`, {
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': `Klaviyo-API-Key ${this.apiKey}`,
'revision': this.apiRevision
},
body: JSON.stringify(payload)
});
if (!response.ok) {
const errorData = await response.json();
console.error('[ReportingService] API Error:', errorData);
throw new Error(`Klaviyo API error: ${response.status} ${response.statusText}`);
}
const reportData = await response.json();
// Cache the response for 10 minutes
try {
await this.redisService.set(`${cacheKey}:raw`, reportData, 600);
} catch (cacheError) {
console.warn('[ReportingService] Cache set error:', cacheError);
}
return reportData;
})();
const result = await this._pendingReportRequest;
this._pendingReportRequest = null;
return result;
} catch (error) {
console.error('[ReportingService] Error fetching campaign reports:', error);
this._pendingReportRequest = null;
throw error;
}
}
async getCampaignDetails(campaignIds = []) {
try {
if (!Array.isArray(campaignIds) || campaignIds.length === 0) {
return [];
}
// Process in batches of 5 to avoid rate limits
const batchSize = 5;
const campaignDetails = [];
for (let i = 0; i < campaignIds.length; i += batchSize) {
const batch = campaignIds.slice(i, i + batchSize);
const batchPromises = batch.map(async (campaignId) => {
try {
// Try to get from cache first
const cacheKey = this.redisService._getCacheKey('campaign_details', { campaignId });
const cachedData = await this.redisService.get(`${cacheKey}:raw`);
if (cachedData) {
return cachedData;
}
const response = await fetch(
`${this.baseUrl}/campaigns/${campaignId}?include=campaign-messages`, {
method: 'GET',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': `Klaviyo-API-Key ${this.apiKey}`,
'revision': this.apiRevision
}
}
);
if (!response.ok) {
console.error(`Error fetching campaign ${campaignId}:`, response.statusText);
return null;
}
const campaignData = await response.json();
// Cache individual campaign data for 1 hour
await this.redisService.set(`${cacheKey}:raw`, campaignData, 3600);
return campaignData;
} catch (error) {
console.error(`Error processing campaign ${campaignId}:`, error);
return null;
}
});
const batchResults = await Promise.all(batchPromises);
campaignDetails.push(...batchResults.filter(Boolean));
// Add delay between batches if not the last batch
if (i + batchSize < campaignIds.length) {
await new Promise(resolve => setTimeout(resolve, 1000));
}
}
return campaignDetails;
} catch (error) {
console.error('[ReportingService] Error fetching campaign details:', error);
throw error;
}
}
async getEnrichedCampaignReports(params = {}) {
try {
// Get campaign reports first
const reportData = await this.getCampaignReports(params);
// Extract campaign IDs from the report
const campaignIds = reportData.data?.attributes?.results?.map(
result => result.groupings?.campaign_id
).filter(Boolean) || [];
// Get campaign details including messages
const campaignDetails = await this.getCampaignDetails(campaignIds);
// Merge report data with campaign details
const enrichedData = reportData.data?.attributes?.results?.map(report => {
const campaignId = report.groupings?.campaign_id;
const campaignDetail = campaignDetails.find(
detail => detail.data?.id === campaignId
);
const campaignMessages = campaignDetail?.included?.filter(
item => item.type === 'campaign-message'
) || [];
return {
id: campaignId,
name: campaignDetail?.data?.attributes?.name || "Unnamed Campaign",
subject: campaignMessages[0]?.attributes?.content?.subject || "",
send_time: report.groupings?.send_time,
stats: {
delivery_rate: report.statistics?.delivery_rate || 0,
delivered: report.statistics?.delivered || 0,
recipients: report.statistics?.recipients || 0,
open_rate: report.statistics?.open_rate || 0,
opens_unique: report.statistics?.opens_unique || 0,
opens: report.statistics?.opens || 0,
click_rate: report.statistics?.click_rate || 0,
clicks_unique: report.statistics?.clicks_unique || 0,
click_to_open_rate: report.statistics?.click_to_open_rate || 0,
conversion_value: report.statistics?.conversion_value || 0,
conversion_uniques: report.statistics?.conversion_uniques || 0
}
};
}).filter(Boolean) || [];
return {
data: enrichedData,
meta: {
total_count: enrichedData.length
}
};
} catch (error) {
console.error('[ReportingService] Error getting enriched campaign reports:', error);
throw error;
}
}
}