Skip to content

Commit 7b564a8

Browse files
committed
fix: improve alarm handling in SourceScraperDO and update model version in processArticles workflow
1 parent 067f7df commit 7b564a8

File tree

2 files changed

+161
-145
lines changed

2 files changed

+161
-145
lines changed

apps/backend/src/durable_objects/sourceScraperDO.ts

Lines changed: 160 additions & 144 deletions
Original file line number | Diff line number | Diff line change
@@ -192,159 +192,175 @@ export class SourceScraperDO extends DurableObject<Env> {
192192
* 6. Schedules the next alarm
193193
*/
194194
async alarm(): Promise<void> {
195-
const state = await this.ctx.storage.get<SourceState>('state');
196-
if (!state) {
197-
this.logger.error('State not found in alarm. Cannot proceed.');
198-
// Maybe schedule alarm far in the future or log an error to an external system
199-
// We cannot proceed without state.
200-
return;
201-
}
202-
203-
// Validate state to protect against corruption
204-
const validatedState = SourceStateSchema.safeParse(state);
205-
if (!validatedState.success) {
206-
const logger = this.logger.child({ operation: 'alarm', validation_error: validatedState.error.format() });
207-
logger.error('State validation failed. Cannot proceed with corrupted state.');
208-
// Schedule a far-future alarm to prevent continuous failed attempts
209-
await this.ctx.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
210-
return;
211-
}
195+
// Keep logger instance outside try block if possible,
196+
// but create child logger inside if needed after state is fetched.
197+
let alarmLogger = this.logger.child({ operation: 'alarm' }); // Initial logger
198+
199+
try {
200+
const state = await this.ctx.storage.get<SourceState>('state');
201+
if (state === undefined) {
202+
this.logger.error('State not found in alarm. Cannot proceed.');
203+
// Maybe schedule alarm far in the future or log an error to an external system
204+
// We cannot proceed without state.
205+
return;
206+
}
212207

213-
const { sourceId, url, scrapeFrequencyTier } = validatedState.data;
214-
const alarmLogger = this.logger.child({ operation: 'alarm', source_id: sourceId, url });
215-
alarmLogger.info('Alarm triggered');
216-
217-
const interval = tierIntervals[scrapeFrequencyTier] || DEFAULT_INTERVAL;
218-
219-
// --- Schedule the *next* regular alarm run immediately ---
220-
// This ensures that even if this current run fails completely after all retries,
221-
// the process will attempt again later according to its schedule.
222-
const nextScheduledAlarmTime = Date.now() + interval;
223-
await this.ctx.storage.setAlarm(nextScheduledAlarmTime);
224-
alarmLogger.info('Next regular alarm scheduled', { next_alarm: new Date(nextScheduledAlarmTime).toISOString() });
225-
226-
// --- Workflow Step 1: Fetch Feed with Retries ---
227-
const fetchLogger = alarmLogger.child({ step: 'Fetch' });
228-
const fetchResult = await attemptWithRetries(
229-
async () => {
230-
const respResult = await tryCatchAsync(
231-
fetch(url, {
232-
method: 'GET',
233-
headers: {
234-
'User-Agent': userAgents[Math.floor(Math.random() * userAgents.length)],
235-
Referer: 'https://www.google.com/',
236-
},
237-
})
238-
);
239-
if (respResult.isErr()) return err(respResult.error as Error);
240-
// Ensure response is OK before trying to read body
241-
if (respResult.value.ok === false) {
242-
return err(new Error(`Fetch failed with status: ${respResult.value.status} ${respResult.value.statusText}`));
243-
}
244-
// Read body - this can also fail
245-
const textResult = await tryCatchAsync(respResult.value.text());
246-
if (textResult.isErr()) return err(textResult.error as Error);
247-
return ok(textResult.value);
248-
},
249-
MAX_STEP_RETRIES,
250-
INITIAL_RETRY_DELAY_MS,
251-
fetchLogger
252-
);
253-
if (fetchResult.isErr()) {
254-
// Error already logged by attemptWithRetries
255-
return;
256-
}
257-
const feedText = fetchResult.value;
258-
259-
// --- Workflow Step 2: Parse Feed with Retries ---
260-
const parseLogger = alarmLogger.child({ step: 'Parse' });
261-
const parseResult = await attemptWithRetries(
262-
async () => parseRSSFeed(feedText),
263-
MAX_STEP_RETRIES,
264-
INITIAL_RETRY_DELAY_MS,
265-
parseLogger
266-
);
267-
if (parseResult.isErr()) {
268-
// Error already logged by attemptWithRetries
269-
return;
270-
}
271-
const articles = parseResult.value; // Type: ParsedArticle[]
208+
// Validate state to protect against corruption
209+
const validatedState = SourceStateSchema.safeParse(state);
210+
if (validatedState.success === false) {
211+
const logger = this.logger.child({ operation: 'alarm', validation_error: validatedState.error.format() });
212+
logger.error('State validation failed. Cannot proceed with corrupted state.');
213+
// Schedule a far-future alarm to prevent continuous failed attempts
214+
await this.ctx.storage.setAlarm(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
215+
return;
216+
}
272217

273-
// --- Filter Articles ---
274-
const now = Date.now();
218+
const { sourceId, url, scrapeFrequencyTier } = validatedState.data;
219+
const alarmLogger = this.logger.child({ operation: 'alarm', source_id: sourceId, url });
220+
alarmLogger.info('Alarm triggered');
221+
222+
const interval = tierIntervals[scrapeFrequencyTier] || DEFAULT_INTERVAL;
223+
224+
// --- Schedule the *next* regular alarm run immediately ---
225+
// This ensures that even if this current run fails completely after all retries,
226+
// the process will attempt again later according to its schedule.
227+
const nextScheduledAlarmTime = Date.now() + interval;
228+
await this.ctx.storage.setAlarm(nextScheduledAlarmTime);
229+
alarmLogger.info('Next regular alarm scheduled', { next_alarm: new Date(nextScheduledAlarmTime).toISOString() });
230+
231+
// --- Workflow Step 1: Fetch Feed with Retries ---
232+
const fetchLogger = alarmLogger.child({ step: 'Fetch' });
233+
const fetchResult = await attemptWithRetries(
234+
async () => {
235+
const respResult = await tryCatchAsync(
236+
fetch(url, {
237+
method: 'GET',
238+
headers: {
239+
'User-Agent': userAgents[Math.floor(Math.random() * userAgents.length)],
240+
Referer: 'https://www.google.com/',
241+
},
242+
})
243+
);
244+
if (respResult.isErr()) return err(respResult.error as Error);
245+
// Ensure response is OK before trying to read body
246+
if (respResult.value.ok === false) {
247+
return err(
248+
new Error(`Fetch failed with status: ${respResult.value.status} ${respResult.value.statusText}`)
249+
);
250+
}
251+
// Read body - this can also fail
252+
const textResult = await tryCatchAsync(respResult.value.text());
253+
if (textResult.isErr()) return err(textResult.error as Error);
254+
return ok(textResult.value);
255+
},
256+
MAX_STEP_RETRIES,
257+
INITIAL_RETRY_DELAY_MS,
258+
fetchLogger
259+
);
260+
if (fetchResult.isErr()) {
261+
// Error already logged by attemptWithRetries
262+
return;
263+
}
264+
const feedText = fetchResult.value;
265+
266+
// --- Workflow Step 2: Parse Feed with Retries ---
267+
const parseLogger = alarmLogger.child({ step: 'Parse' });
268+
const parseResult = await attemptWithRetries(
269+
async () => parseRSSFeed(feedText),
270+
MAX_STEP_RETRIES,
271+
INITIAL_RETRY_DELAY_MS,
272+
parseLogger
273+
);
274+
if (parseResult.isErr()) {
275+
// Error already logged by attemptWithRetries
276+
return;
277+
}
278+
const articles = parseResult.value; // Type: ParsedArticle[]
279+
280+
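// --- Map parsed articles to insert rows (nothing is filtered here; duplicates are skipped by onConflictDoNothing on insert) ---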
281+
const now = Date.now();
282+
283+
const newArticles = articles.map(article => ({
284+
sourceId: sourceId,
285+
url: article.link,
286+
title: article.title,
287+
publishDate: article.pubDate,
288+
}));
289+
290+
if (newArticles.length === 0) {
291+
alarmLogger.info('No new articles found worth inserting');
292+
293+
// Successfully processed, update lastChecked
294+
validatedState.data.lastChecked = now;
295+
await this.ctx.storage.put('state', validatedState.data);
296+
alarmLogger.info('Updated lastChecked', { timestamp: new Date(now).toISOString() });
297+
return;
298+
}
275299

276-
const newArticles = articles.map(article => ({
277-
sourceId: sourceId,
278-
url: article.link,
279-
title: article.title,
280-
publishDate: article.pubDate,
281-
}));
300+
alarmLogger.info('Found new articles to insert', { article_count: newArticles.length });
301+
302+
// --- Workflow Step 3: Insert Articles with Retries ---
303+
const dbLogger = alarmLogger.child({ step: 'DB Insert' });
304+
const insertResult = await attemptWithRetries(
305+
async () =>
306+
ResultAsync.fromPromise(
307+
getDb(this.env.DATABASE_URL)
308+
.insert($articles)
309+
.values(newArticles)
310+
.onConflictDoNothing({ target: $articles.url }) // Make sure 'url' is unique constraint name or column
311+
.returning({ insertedId: $articles.id }),
312+
e => (e instanceof Error ? e : new Error(`DB Insert failed: ${String(e)}`)) // Error mapper
313+
),
314+
MAX_STEP_RETRIES,
315+
INITIAL_RETRY_DELAY_MS,
316+
dbLogger
317+
);
318+
if (insertResult.isErr()) {
319+
// Error already logged by attemptWithRetries
320+
return;
321+
}
282322

283-
if (newArticles.length === 0) {
284-
alarmLogger.info('No new articles found worth inserting');
323+
const insertedRows = insertResult.value; // Type: { insertedId: number }[]
324+
dbLogger.info(`DB Insert completed`, { affected_rows: insertedRows.length });
325+
326+
// --- Send to Queue (No Retry here, relies on Queue's built-in retries/DLQ) ---
327+
if (insertedRows.length > 0 && this.env.ARTICLE_PROCESSING_QUEUE) {
328+
const insertedIds = insertedRows.map(r => r.insertedId);
329+
const BATCH_SIZE_LIMIT = 100; // Adjust as needed
330+
331+
const queueLogger = alarmLogger.child({ step: 'Queue', total_ids: insertedIds.length });
332+
queueLogger.info('Sending IDs to queue');
333+
334+
for (let i = 0; i < insertedIds.length; i += BATCH_SIZE_LIMIT) {
335+
const batch = insertedIds.slice(i, i + BATCH_SIZE_LIMIT);
336+
queueLogger.debug('Sending batch to queue', { batch_size: batch.length, batch_index: i / BATCH_SIZE_LIMIT });
337+
338+
this.ctx.waitUntil(
339+
this.env.ARTICLE_PROCESSING_QUEUE.send({ articles_id: batch }).catch(queueError => {
340+
queueLogger.error(
341+
'Failed to send batch to queue',
342+
{ batch_index: i / BATCH_SIZE_LIMIT, batch_size: batch.length },
343+
queueError instanceof Error ? queueError : new Error(String(queueError))
344+
);
345+
})
346+
);
347+
}
348+
}
285349

286-
// Successfully processed, update lastChecked
350+
// --- Final Step: Update lastChecked only on full success ---
351+
alarmLogger.info('All steps successful. Updating lastChecked');
287352
validatedState.data.lastChecked = now;
288353
await this.ctx.storage.put('state', validatedState.data);
289354
alarmLogger.info('Updated lastChecked', { timestamp: new Date(now).toISOString() });
290-
return;
291-
}
292-
293-
alarmLogger.info('Found new articles to insert', { article_count: newArticles.length });
294-
295-
// --- Workflow Step 3: Insert Articles with Retries ---
296-
const dbLogger = alarmLogger.child({ step: 'DB Insert' });
297-
const insertResult = await attemptWithRetries(
298-
async () =>
299-
ResultAsync.fromPromise(
300-
getDb(this.env.DATABASE_URL)
301-
.insert($articles)
302-
.values(newArticles)
303-
.onConflictDoNothing({ target: $articles.url }) // Make sure 'url' is unique constraint name or column
304-
.returning({ insertedId: $articles.id }),
305-
e => (e instanceof Error ? e : new Error(`DB Insert failed: ${String(e)}`)) // Error mapper
306-
),
307-
MAX_STEP_RETRIES,
308-
INITIAL_RETRY_DELAY_MS,
309-
dbLogger
310-
);
311-
if (insertResult.isErr()) {
312-
// Error already logged by attemptWithRetries
313-
return;
314-
}
315-
316-
const insertedRows = insertResult.value; // Type: { insertedId: number }[]
317-
dbLogger.info(`DB Insert completed`, { affected_rows: insertedRows.length });
318-
319-
// --- Send to Queue (No Retry here, relies on Queue's built-in retries/DLQ) ---
320-
if (insertedRows.length > 0 && this.env.ARTICLE_PROCESSING_QUEUE) {
321-
const insertedIds = insertedRows.map(r => r.insertedId);
322-
const BATCH_SIZE_LIMIT = 100; // Adjust as needed
323-
324-
const queueLogger = alarmLogger.child({ step: 'Queue', total_ids: insertedIds.length });
325-
queueLogger.info('Sending IDs to queue');
326-
327-
for (let i = 0; i < insertedIds.length; i += BATCH_SIZE_LIMIT) {
328-
const batch = insertedIds.slice(i, i + BATCH_SIZE_LIMIT);
329-
queueLogger.debug('Sending batch to queue', { batch_size: batch.length, batch_index: i / BATCH_SIZE_LIMIT });
330-
331-
this.ctx.waitUntil(
332-
this.env.ARTICLE_PROCESSING_QUEUE.send({ articles_id: batch }).catch(queueError => {
333-
queueLogger.error(
334-
'Failed to send batch to queue',
335-
{ batch_index: i / BATCH_SIZE_LIMIT, batch_size: batch.length },
336-
queueError instanceof Error ? queueError : new Error(String(queueError))
337-
);
338-
})
339-
);
340-
}
355+
} catch (error) {
356+
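// NOTE(review): this is always the base 'alarm' logger. The `const alarmLogger` declared inside the try block shadows the outer `let alarmLogger`, so the detailed logger (source_id, url) is never visible here; assign to the outer variable instead of redeclaring it to log with full context.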
357+
const errorLogger = alarmLogger || this.logger;
358+
errorLogger.error(
359+
'Unhandled exception occurred within alarm handler',
360+
{ error_name: error instanceof Error ? error.name : 'UnknownError' },
361+
error instanceof Error ? error : new Error(String(error)) // Log the error object/stack
362+
);
341363
}
342-
343-
// --- Final Step: Update lastChecked only on full success ---
344-
alarmLogger.info('All steps successful. Updating lastChecked');
345-
validatedState.data.lastChecked = now;
346-
await this.ctx.storage.put('state', validatedState.data);
347-
alarmLogger.info('Updated lastChecked', { timestamp: new Date(now).toISOString() });
348364
}
349365

350366
/**

apps/backend/src/workflows/processArticles.workflow.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -245,7 +245,7 @@ export class ProcessArticles extends WorkflowEntrypoint<Env, ProcessArticlesPara
245245
{ retries: { limit: 3, delay: '2 seconds', backoff: 'exponential' }, timeout: '1 minute' },
246246
async () => {
247247
const response = await generateObject({
248-
model: google('gemini-2.0-flash'),
248+
model: google('gemini-1.5-flash-8b-001'),
249249
temperature: 0,
250250
prompt: getArticleAnalysisPrompt(article.title, article.text),
251251
schema: articleAnalysisSchema,

0 commit comments

Comments
 (0)