Skip to content

Commit 2c22595

Browse files
michaelchuclaude
andcommitted
feat: add POST /tasks/pipeline endpoint for background pipeline execution
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b72ee28 commit 2c22595

File tree

2 files changed

+76
-3
lines changed

2 files changed

+76
-3
lines changed

src/server/handlers/tasks.rs

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use std::sync::atomic::Ordering;
1717
use std::sync::Arc;
1818

1919
use crate::application::error::{ApplicationError, ApplicationErrorKind};
20-
use crate::application::{backtests, sweeps, tasks as app_tasks, workflows};
20+
use crate::application::{backtests, pipeline, sweeps, tasks as app_tasks, workflows};
2121
use crate::engine::walk_forward::{WalkForwardParams, WfMode, WfObjective};
2222
use crate::scripting::engine::CachingDataLoader;
2323
use crate::server::state::AppState;
@@ -302,6 +302,67 @@ pub async fn submit_sweep(
302302
Ok(Json(SubmitResponse { task_id }))
303303
}
304304

305+
/// `POST /tasks/pipeline` — Submit a full pipeline task (sweep + gates + WF + MC).
306+
#[allow(clippy::unused_async)]
307+
pub async fn submit_pipeline(
308+
State(state): State<AppState>,
309+
Json(req): Json<SubmitSweepRequest>,
310+
) -> Result<Json<SubmitResponse>, (StatusCode, String)> {
311+
let symbol = req
312+
.params
313+
.get("symbol")
314+
.and_then(Value::as_str)
315+
.unwrap_or("pending")
316+
.to_owned();
317+
318+
let params_json =
319+
serde_json::to_value(&req.params).unwrap_or(Value::Object(serde_json::Map::default()));
320+
321+
let task = state.task_manager.register(
322+
TaskKind::Sweep,
323+
&req.strategy,
324+
&symbol,
325+
req.thread_id.clone(),
326+
params_json,
327+
);
328+
let task_id = task.id.clone();
329+
330+
let tm = Arc::clone(&state.task_manager);
331+
let server = state.server.clone();
332+
tokio::spawn(async move {
333+
Box::pin(app_tasks::execute_queued_task(
334+
tm,
335+
Arc::clone(&task),
336+
async move {
337+
let pipeline_req = pipeline::PipelineRequest {
338+
strategy: req.strategy,
339+
mode: req.mode,
340+
objective: req.objective,
341+
params: req.params,
342+
sweep_params: req.sweep_params,
343+
max_evaluations: req.max_evaluations,
344+
num_permutations: req.num_permutations,
345+
thread_id: req.thread_id,
346+
};
347+
348+
let result = pipeline::execute(&server, &pipeline_req, "manual")
349+
.await
350+
.map_err(|e| e.to_string())?;
351+
352+
let result_json = serde_json::to_value(&result).unwrap_or(Value::Null);
353+
354+
Ok(app_tasks::TaskCompletion {
355+
result_json,
356+
result_id: result.sweep_id,
357+
})
358+
},
359+
))
360+
.await;
361+
});
362+
363+
Ok(Json(SubmitResponse { task_id }))
364+
}
365+
305366
/// `POST /tasks/walk-forward` — Submit a walk-forward validation task.
306367
#[allow(clippy::unused_async, clippy::too_many_lines)]
307368
pub async fn submit_walk_forward(

src/server/router.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use axum::Router;
88
use tower_http::cors::CorsLayer;
99

1010
use crate::server::handlers::{
11-
backtests, chat as chat_handlers, forward_tests, pipeline, profiles, runs, strategies, sweeps,
12-
tasks,
11+
backtests, chat as chat_handlers, forward_tests, hypotheses, pipeline, profiles, runs,
12+
strategies, sweeps, tasks,
1313
};
1414
use crate::server::state::AppState;
1515

@@ -77,6 +77,13 @@ pub fn build_api_router(state: AppState) -> Router {
7777
)
7878
.with_state(state.clone());
7979

80+
let analysis_routes = Router::new()
81+
.route(
82+
"/hypotheses",
83+
axum::routing::post(hypotheses::generate_hypotheses),
84+
)
85+
.with_state(state.clone());
86+
8087
let misc_routes = Router::new()
8188
.route("/profiles", axum::routing::get(profiles::list_profiles))
8289
.route("/health", axum::routing::get(|| async { "ok" }))
@@ -137,6 +144,10 @@ pub fn build_api_router(state: AppState) -> Router {
137144
axum::routing::post(tasks::submit_backtest),
138145
)
139146
.route("/tasks/sweep", axum::routing::post(tasks::submit_sweep))
147+
.route(
148+
"/tasks/pipeline",
149+
axum::routing::post(tasks::submit_pipeline),
150+
)
140151
.route(
141152
"/tasks/walk-forward",
142153
axum::routing::post(tasks::submit_walk_forward),
@@ -180,6 +191,7 @@ pub fn build_api_router(state: AppState) -> Router {
180191
.merge(run_routes)
181192
.merge(task_routes)
182193
.merge(forward_test_routes)
194+
.merge(analysis_routes)
183195
.merge(misc_routes)
184196
.layer(CorsLayer::permissive())
185197
}

0 commit comments

Comments
 (0)