# Meridian Backend Worker (`@meridian/backend`)

This Cloudflare Worker application forms the core data ingestion, processing, and API layer for the Meridian project. It handles fetching news sources, orchestrating article content scraping, performing AI analysis, managing data persistence, and providing API endpoints.

It leverages several Cloudflare platform features for resilience and scalability:

- **Workers:** Runs the Hono API server, queue consumer logic, Workflow triggers, and Durable Object interactions.
- **Durable Objects (`SourceScraperDO`):** Manages the state and scheduled fetching for individual news sources via Alarms.
- **Queues (`article-processing-queue`):** Decouples the initial lightweight source check from the more intensive article processing.
- **Workflows (`ProcessArticles`):** Provides durable, multi-step execution for scraping, analyzing, and storing article content, handling retries automatically.
- **R2:** Stores full article text content.

## Key Components

1. **Hono API Server (`app.ts`):**

   - Provides HTTP endpoints for:
     - Managing reports (`/reports`).
     - Managing sources (`/sources` - e.g., deletion).
     - Generating OpenGraph images (`/openGraph`).
     - Internal/Admin operations (DO initialization `POST /do/admin/initialize-dos`).
     - Health check (`/ping`).
   - Handles routing requests to specific `SourceScraperDO` instances (`GET /do/source/:sourceId/*`).
   - Uses Bearer Token authentication (`API_TOKEN`) for protected routes (`hasValidAuthToken`).

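   A minimal sketch of how the routing and the bearer-token guard might fit together in `app.ts` (the `Env` shape and handler bodies are illustrative assumptions; route paths are those listed above):

   ```ts
   import { Hono } from 'hono';

   // Illustrative bindings only; the real Env also carries DO, queue, R2, and workflow bindings.
   type Env = { API_TOKEN: string };

   const app = new Hono<{ Bindings: Env }>();

   // Unauthenticated health check.
   app.get('/ping', c => c.json({ pong: true }));

   // Bearer-token guard, mirroring what hasValidAuthToken is described to do.
   app.use('/reports/*', async (c, next) => {
     if (c.req.header('Authorization') !== `Bearer ${c.env.API_TOKEN}`) {
       return c.json({ error: 'unauthorized' }, 401);
     }
     await next();
   });

   app.get('/reports', async c => {
     // ...query processed reports from the database here...
     return c.json({ reports: [] });
   });

   export default app;
   ```
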
2. **Source Scraper Durable Object (`SourceScraperDO`):**

   - One instance per news source URL (`idFromName(url)`).
   - Uses Cloudflare Alarms (`ctx.storage.setAlarm`) for scheduled, periodic RSS feed checks based on frequency tiers.
   - Fetches and parses RSS feeds (`parseRSSFeed`), handling various formats and cleaning URLs/content.
   - Uses `ON CONFLICT DO NOTHING` to efficiently insert only new article metadata (URL, title, source ID, publish date) into the database.
   - Sends batches of newly inserted article database IDs to the `ARTICLE_PROCESSING_QUEUE`.
   - Implements robust retries with exponential backoff (`attemptWithRetries`) for fetching, parsing, and DB insertion.
   - Validates and stores its state (`SourceState`: sourceId, url, frequency, lastChecked) in Durable Object storage, protecting against corruption.
   - Includes a `destroy` method for cleanup.

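   A heavily condensed sketch of the alarm-driven loop (the state shape, `Env` bindings, and re-arm interval are simplified assumptions; the real DO validates its stored state and wraps each step in `attemptWithRetries`):

   ```ts
   // Assumed shapes for illustration; the real SourceState tracks a frequency tier.
   type Env = { ARTICLE_PROCESSING_QUEUE: Queue; DATABASE_URL: string };
   type SourceState = { sourceId: number; url: string; checkIntervalMinutes: number; lastChecked: number | null };

   export class SourceScraperDO implements DurableObject {
     constructor(
       private ctx: DurableObjectState,
       private env: Env
     ) {}

     async fetch(_request: Request): Promise<Response> {
       return new Response('ok'); // the real handler exposes the per-source routes described above
     }

     async alarm(): Promise<void> {
       const state = await this.ctx.storage.get<SourceState>('state');
       if (!state) return;

       // 1) fetch + parseRSSFeed, 2) INSERT ... ON CONFLICT DO NOTHING,
       // 3) send newly inserted article IDs to this.env.ARTICLE_PROCESSING_QUEUE.

       await this.ctx.storage.put('state', { ...state, lastChecked: Date.now() });

       // Re-arm the alarm so the next check happens on schedule.
       await this.ctx.storage.setAlarm(Date.now() + state.checkIntervalMinutes * 60_000);
     }
   }
   ```
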
3. **Article Processing Queue (`article-processing-queue`):**

   - Receives messages containing batches of article IDs needing full processing.
   - The queue consumer (`queue` handler in `index.ts`) aggregates IDs from the batch.
   - Triggers the `ProcessArticles` Workflow to handle the actual processing for the aggregated IDs.
   - Configured with settings like `max_batch_size`, `max_retries`, and a Dead Letter Queue (`article-processing-dlq`).

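   A sketch of the consumer side (the message body shape and Workflow parameter names are assumptions):

   ```ts
   // Assumed bindings and message shape for illustration.
   type Env = { PROCESS_ARTICLES: Workflow };
   type ArticleBatchMessage = { articleIds: number[] };

   export default {
     async queue(batch: MessageBatch<ArticleBatchMessage>, env: Env): Promise<void> {
       // Aggregate article IDs across every message in the batch.
       const articleIds = batch.messages.flatMap(m => m.body.articleIds);
       if (articleIds.length === 0) return;

       // Hand the aggregated IDs to a single ProcessArticles Workflow instance.
       await env.PROCESS_ARTICLES.create({ params: { articleIds } });
       batch.ackAll();
     },
   };
   ```
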
4. **Process Articles Workflow (`ProcessArticles`):**

   - Receives a list of article IDs from the queue consumer trigger.
   - Fetches necessary article details (URL, title, publish date) from the database, filtering for recent, unprocessed, non-failed articles.
   - Uses a `DomainRateLimiter` to manage scraping politeness and concurrency across different source domains.
   - Scrapes full article content using `step.do` for durable, retried execution:
     - Attempts direct `fetch` (`getArticleWithFetch`) first.
     - Falls back to the Cloudflare Browser Rendering API (`getArticleWithBrowser`) for tricky domains (`TRICKY_DOMAINS`) or initial fetch failures. This involves executing JavaScript snippets via `addScriptTag` to bypass cookie consent prompts and paywalls and to clean the DOM before extraction.
   - Uses `@mozilla/readability` (`parseArticle`) to extract the core article text from the scraped HTML.
   - Handles PDF links by marking them as `SKIPPED_PDF` in the database.
   - Sends the extracted title and text to Google Gemini (`gemini-2.0-flash`) via `@ai-sdk/google` (`generateObject`) for structured analysis based on `articleAnalysis.prompt.ts` and `articleAnalysisSchema`.
   - Generates embeddings for the processed content using an external ML service (`createEmbeddings`).
   - Uploads the extracted article text to R2 (`ARTICLES_BUCKET`).
   - Updates the corresponding articles in the database with the analysis results (language, keywords, entities, summary, etc.), embedding vector, R2 key, and final status (`PROCESSED`), or marks them with a specific failure status (`FETCH_FAILED`, `RENDER_FAILED`, `AI_ANALYSIS_FAILED`, `EMBEDDING_FAILED`, `R2_UPLOAD_FAILED`) and `failReason`.
   - Leverages Workflow steps (`step.do`, `step.sleep`) for automatic retries, durability, and managing execution state.

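   A heavily condensed sketch of the Workflow's shape (parameter names, retry settings, and step bodies are illustrative assumptions; the real implementation adds per-domain rate limiting, the browser-rendering fallback, AI analysis, embeddings, R2 upload, and status updates):

   ```ts
   import { WorkflowEntrypoint, WorkflowEvent, WorkflowStep } from 'cloudflare:workers';

   type Env = { ARTICLES_BUCKET: R2Bucket; DATABASE_URL: string };
   type Params = { articleIds: number[] };

   export class ProcessArticles extends WorkflowEntrypoint<Env, Params> {
     async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
       // Each step.do call is durably checkpointed and retried independently.
       const articles = await step.do('load article rows', async () => {
         // In the real Workflow: SELECT id, url, title, publishDate ... WHERE id IN event.payload.articleIds,
         // filtered to recent, unprocessed, non-failed rows.
         return [] as { id: number; url: string }[];
       });

       for (const article of articles) {
         const html = await step.do(
           `scrape article ${article.id}`,
           { retries: { limit: 3, delay: '5 seconds', backoff: 'exponential' } },
           async () => {
             // Try getArticleWithFetch first; fall back to getArticleWithBrowser for tricky domains.
             return '<html>...</html>';
           }
         );

         // parseArticle(html) -> Readability text -> Gemini analysis -> embeddings -> R2 upload -> DB update.
       }
     }
   }
   ```
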
5. **Core Libraries & Utilities (`src/lib`):**

   - `articleFetchers.ts`: Contains logic for `getArticleWithFetch` and `getArticleWithBrowser`, including the browser rendering script definitions.
   - `embeddings.ts`: Interface for generating embeddings via the external ML service.
   - `logger.ts`: Simple structured JSON logger class for Cloudflare Logs.
   - `parsers.ts`: Includes `parseRSSFeed` and `parseArticle` (using `Readability` and `linkedom`).
   - `rateLimiter.ts`: Implements the `DomainRateLimiter`.
   - `tryCatchAsync.ts`: Utility for converting promise rejections to `neverthrow` Results.
   - `utils.ts`: Helper functions like `getDb` (with `prepare: false`), `hasValidAuthToken`, `generateSearchText`.

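   For example, the `getDb` helper is roughly this shape (a sketch rather than the exact source; the `prepare: false` option is the detail that matters for pooler compatibility):

   ```ts
   import { drizzle } from 'drizzle-orm/postgres-js';
   import postgres from 'postgres';

   // prepare: false disables prepared statements so postgres.js works behind
   // transaction-mode poolers (e.g. a pooled Supabase connection string).
   export function getDb(databaseUrl: string) {
     const client = postgres(databaseUrl, { prepare: false });
     return drizzle(client);
   }
   ```
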
6. **Integrations:**
   - **Database (`@meridian/database`):** Uses Drizzle ORM and `postgres.js` to interact with the PostgreSQL database. Configured with `prepare: false` for pooler compatibility.
   - **Cloudflare Browser Rendering API:** Used as a fallback mechanism for robust scraping.
   - **Google AI (Gemini):** Used for core article analysis.
   - **External ML Service:** Used via `ML_SERVICE_URL` for generating embeddings.

## How It Works (High-Level Flow)

1. **Initialization:** (If needed) The `/do/admin/initialize-dos` endpoint is called to create/update `SourceScraperDO` instances based on sources in the database.
2. **Scheduled Fetch:** A `SourceScraperDO` instance's alarm triggers.
3. **RSS Processing:** The DO fetches its RSS feed, parses it, and inserts basic metadata for new articles using `ON CONFLICT DO NOTHING`.
4. **Queueing:** The DO sends the database IDs of newly inserted articles to the `ARTICLE_PROCESSING_QUEUE`.
5. **Queue Consumption:** The Worker's `queue` handler receives a batch of article IDs.
6. **Workflow Trigger:** The `queue` handler triggers the `ProcessArticles` Workflow with the batch of IDs.
7. **Content Scraping & Parsing:** The Workflow fetches article details, scrapes the full content using the rate limiter and appropriate method (fetch/browser), and parses it using Readability.
8. **AI Analysis & Embeddings:** The Workflow sends content to Gemini for analysis and generates embeddings via the ML service.
9. **Storage:** The Workflow uploads article text to R2.
10. **DB Update:** The Workflow updates the articles in the database with the analysis results, R2 key, embedding, and final status.
11. **API Access:** The Hono API server allows querying processed data or performing management actions.

## Configuration

Configuration relies on `wrangler.jsonc` for infrastructure bindings and environment variables/secrets for credentials and runtime parameters.

### `wrangler.jsonc` Highlights

Ensure the following bindings and configurations are correctly set up:

- **`durable_objects`:** Binding `SOURCE_SCRAPER` to `SourceScraperDO` class, plus `migrations` definition.
- **`queues`:** Producer binding `ARTICLE_PROCESSING_QUEUE` and Consumer config for `article-processing-queue` (pointing to the Worker).
- **`r2_buckets`:** Binding `ARTICLES_BUCKET` to your R2 bucket name.
- **`workflows`:** Binding `PROCESS_ARTICLES` to `ProcessArticles` class.
- **`compatibility_date` / `compatibility_flags`:** Set appropriately (e.g., `nodejs_compat`).
- **`observability`:** Enabled for better monitoring.

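For orientation, a trimmed `wrangler.jsonc` excerpt might look roughly like this (binding names follow the list above; queue limits, bucket name, and migration tag are illustrative assumptions):

```jsonc
{
  "durable_objects": {
    "bindings": [{ "name": "SOURCE_SCRAPER", "class_name": "SourceScraperDO" }]
  },
  "migrations": [{ "tag": "v1", "new_classes": ["SourceScraperDO"] }],
  "queues": {
    "producers": [{ "binding": "ARTICLE_PROCESSING_QUEUE", "queue": "article-processing-queue" }],
    "consumers": [
      {
        "queue": "article-processing-queue",
        "max_batch_size": 10,
        "max_retries": 3,
        "dead_letter_queue": "article-processing-dlq"
      }
    ]
  },
  "r2_buckets": [{ "binding": "ARTICLES_BUCKET", "bucket_name": "meridian-articles" }],
  "workflows": [{ "binding": "PROCESS_ARTICLES", "name": "process-articles", "class_name": "ProcessArticles" }],
  "compatibility_flags": ["nodejs_compat"],
  "observability": { "enabled": true }
}
```
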
### Environment Variables & Secrets

The following are required (use `.dev.vars` locally, Cloudflare Secrets in production):

- `DATABASE_URL`: PostgreSQL connection string.
- `API_TOKEN`: Secret Bearer token for protecting API endpoints.
- `CLOUDFLARE_ACCOUNT_ID`: Your Cloudflare account ID.
- `CLOUDFLARE_BROWSER_RENDERING_API_TOKEN`: API token with Browser Rendering permissions.
- `GEMINI_API_KEY`: API key for Google AI (Gemini).
- `GEMINI_BASE_URL`: (Optional) Custom base URL for Google AI API.
- `ML_SERVICE_URL`: URL for the external embeddings service.
- `ML_SERVICE_API_TOKEN`: API token for the external embeddings service.

## Running Locally

1. Ensure Node.js (v22+), pnpm (v9.15+), Docker (for Postgres+pgvector), and Wrangler are installed.
2. Navigate to the monorepo root (`meridian/`).
3. Install dependencies: `pnpm install`.
4. Start a local PostgreSQL database with the pgvector extension (see `@meridian/database/README.MD` or use Supabase local dev).
5. Configure and run database migrations:
   - Set `DATABASE_URL` in `packages/database/.env`.
   - Run `pnpm --filter @meridian/database migrate`.
   - (Optional) Seed initial sources: `pnpm --filter @meridian/database seed`.
6. Create a `.dev.vars` file in `apps/backend/` and populate the required environment variables listed above.
7. Start the local development server: `pnpm --filter @meridian/backend run dev`.
   - This uses `wrangler dev` with local emulation.
   - Local emulation for DOs, Queues, and Workflows has limitations.
8. **Initialize Durable Objects:** Manually trigger the DO initialization endpoint once after starting the server and seeding sources:
   ```bash
   curl -X POST -H "Authorization: Bearer YOUR_API_TOKEN" http://localhost:8787/do/admin/initialize-dos
   ```
   Replace `YOUR_API_TOKEN` with the value from your `.dev.vars`.

## Testing

Unit tests for core utilities and parsers live in the `test/` directory and can be run with `pnpm --filter @meridian/backend run test`. Integration and end-to-end tests that exercise Workers, DOs, and Workflows locally are harder to set up, but would be valuable future additions.

## Deployment

Deployment is handled via Cloudflare Wrangler:

1. Ensure `wrangler.jsonc` is correct for the target environment.
2. Set production secrets using `npx wrangler secret put <SECRET_NAME>`.
3. Deploy using `npx wrangler deploy` from within the `apps/backend` directory or via configured CI/CD.

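For example, from `apps/backend/`:

```bash
# Set each required secret once per environment (repeat for the other secrets listed above).
npx wrangler secret put DATABASE_URL
npx wrangler secret put API_TOKEN

# Deploy the Worker together with its DO, queue, and Workflow configuration.
npx wrangler deploy
```
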
## Key Libraries & Technologies

- **Hono:** Web framework for the API server.
- **Drizzle ORM:** TypeScript ORM for database interactions.
- **postgres.js:** PostgreSQL client library.
- **Neverthrow:** Functional error handling.
- **Zod:** Schema validation.
- **@ai-sdk/google:** SDK for interacting with Google Gemini.
- **@mozilla/readability:** Core library for article content extraction.
- **linkedom:** DOM parser used with Readability.
- **fast-xml-parser:** For parsing RSS feeds.
- **Vitest:** Unit testing framework.
- **Cloudflare Workers, Durable Objects, Queues, Workflows, R2:** The platform primitives the service runs on.