Отмена асинхронных операций в Axum
Отмена асинхронных операций - важная часть разработки надежных веб-приложений. В этом разделе рассмотрим, как эффективно реализовать отмену операций в Axum, используя возможности экосистемы Tokio.
Содержание
- Основы отмены операций
- Использование токенов отмены
- Отмена при закрытии соединения клиентом
- Отмена по таймауту
- Распространение отмены через слои приложения
- Graceful Shutdown
- Лучшие практики
Основы отмены операций
В асинхронном программировании отмена операций обычно основана на механизме кооперативной отмены, когда задача периодически проверяет, не была ли она отменена. В Rust и Tokio основные способы отмены:
- Использование
tokio::select!
для выбора первого завершенного фьючерса - Использование токенов отмены (
CancellationToken
) - Отмена задач через
tokio::task::JoinHandle
- Использование таймаутов
Использование токенов отмены
Токены отмены - удобный способ сигнализировать о необходимости прекратить выполнение операции:
use axum::{
extract::State,
routing::post,
Json, Router,
};
use tokio::sync::oneshot;
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
// Структура для хранения информации о долгих задачах
#[derive(Clone)]
struct AppState {
tasks: Arc<Mutex<Vec<TaskInfo>>>,
}
struct TaskInfo {
id: String,
cancellation_sender: Option<oneshot::Sender<()>>,
}
#[derive(Deserialize)]
struct StartTaskRequest {
task_id: String,
}
#[derive(Serialize)]
struct TaskResponse {
task_id: String,
status: String,
}
// Обработчик для запуска долгой задачи
async fn start_task(
State(state): State<AppState>,
Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
// Создание канала для отмены
let (cancel_tx, cancel_rx) = oneshot::channel();
// Сохранение информации о задаче
let task_id = payload.task_id.clone();
{
let mut tasks = state.tasks.lock().unwrap();
tasks.push(TaskInfo {
id: task_id.clone(),
cancellation_sender: Some(cancel_tx),
});
}
// Запуск долгой задачи в отдельном потоке
let task_clone = task_id.clone();
tokio::spawn(async move {
if let Err(_) = process_long_task(task_clone, cancel_rx).await {
println!("Задача отменена или завершилась с ошибкой");
}
});
Json(TaskResponse {
task_id,
status: "started".to_string(),
})
}
// Обработчик для отмены задачи
async fn cancel_task(
State(state): State<AppState>,
Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
let task_id = payload.task_id;
let mut status = "not_found".to_string();
// Поиск и отмена задачи
{
let mut tasks = state.tasks.lock().unwrap();
if let Some(pos) = tasks.iter().position(|t| t.id == task_id) {
let task = tasks.remove(pos);
if let Some(cancel_sender) = task.cancellation_sender {
let _ = cancel_sender.send(());
status = "cancelled".to_string();
}
}
}
Json(TaskResponse {
task_id,
status,
})
}
// Функция имитирующая долгую задачу с возможностью отмены
async fn process_long_task(
task_id: String,
mut cancel_rx: oneshot::Receiver<()>,
) -> Result<(), String> {
// Имитация 10 шагов обработки
for i in 0..10 {
// Проверка сигнала отмены
if cancel_rx.try_recv().is_ok() {
return Err(format!("Задача {} отменена на шаге {}", task_id, i));
}
// Выполнение шага задачи
println!("Задача {}: выполнение шага {}", task_id, i);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
println!("Задача {} успешно завершена", task_id);
Ok(())
}
// Настройка маршрутов
let app_state = AppState {
tasks: Arc::new(Mutex::new(Vec::new())),
};
let app = Router::new()
.route("/tasks/start", post(start_task))
.route("/tasks/cancel", post(cancel_task))
.with_state(app_state);
Отмена при закрытии соединения клиентом
Важно отменять операции, когда клиент закрывает соединение. Это можно реализовать с помощью axum::extract::Extension
и специального middleware:
use axum::{
extract::Extension,
http::{Request, StatusCode},
middleware::{self, Next},
response::Response,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use futures::FutureExt;
use tower::ServiceBuilder;
// Токен отмены для каждого запроса
#[derive(Clone)]
struct RequestCancellation {
receiver: broadcast::Receiver<()>,
}
// Middleware для создания токена отмены для каждого запроса
async fn cancellation_middleware<B>(
mut request: Request<B>,
next: Next<B>,
) -> Response {
// Создание канала для сигнала отмены
let (cancel_tx, cancel_rx) = broadcast::channel(1);
// Добавление приемника в расширения запроса
request.extensions_mut().insert(RequestCancellation {
receiver: cancel_rx,
});
// Обертка для обнаружения отмены запроса
let response_future = next.run(request);
// Если клиент закрывает соединение, cancel_tx будет удален,
// что вызовет отправку сигнала всем получателям
let sender = Arc::new(cancel_tx);
// Выполнение обработчика и возврат ответа
response_future.await
}
// Обработчик с поддержкой отмены
async fn handler_with_cancellation(
Extension(cancel): Extension<RequestCancellation>,
) -> &'static str {
let mut rx = cancel.receiver.resubscribe();
// Выполнение долгой операции с возможностью отмены
match tokio::select! {
_ = long_operation() => Ok(()),
_ = rx.recv() => Err("operation cancelled"),
} {
Ok(_) => "Операция завершена",
Err(_) => "Операция отменена",
}
}
// Настройка приложения с middleware
let app = Router::new()
.route("/long-operation", get(handler_with_cancellation))
.layer(middleware::from_fn(cancellation_middleware));
Отмена по таймауту
Отмена операций по таймауту помогает избежать блокировки ресурсов:
use axum::{
extract::State,
routing::get,
Json, Router,
};
use std::time::Duration;
use tokio::time::timeout;
// Обработчик с таймаутом
async fn handler_with_timeout() -> Result<Json<Vec<String>>, StatusCode> {
// Установка таймаута 5 секунд для долгой операции
match timeout(Duration::from_secs(5), fetch_data()).await {
Ok(result) => match result {
Ok(data) => Ok(Json(data)),
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
},
Err(_) => {
// Превышен таймаут
Err(StatusCode::REQUEST_TIMEOUT)
}
}
}
// Долгая операция, которая может быть отменена по таймауту
async fn fetch_data() -> Result<Vec<String>, String> {
// Имитация долгой операции
tokio::time::sleep(Duration::from_secs(10)).await;
Ok(vec!["данные1".to_string(), "данные2".to_string()])
}
Для более сложных сценариев можно комбинировать таймаут с токенами отмены:
use tokio::time::error::Elapsed;
// Тип ошибки, который может быть ошибкой таймаута или другой ошибкой
enum OperationError {
Timeout,
Other(String),
}
// Операция с таймаутом и возможностью отмены
async fn operation_with_timeout_and_cancel(
cancel: broadcast::Receiver<()>,
) -> Result<Vec<String>, OperationError> {
let mut cancel_rx = cancel.resubscribe();
// Комбинирование таймаута и отмены
match tokio::select! {
result = timeout(Duration::from_secs(5), fetch_data()) => {
match result {
Ok(data) => Ok(data),
Err(e) if e.is::<Elapsed>() => Err(OperationError::Timeout),
Err(_) => Err(OperationError::Other("Ошибка получения данных".to_string())),
}
},
_ = cancel_rx.recv() => Err(OperationError::Other("Операция отменена пользователем".to_string())),
} {
Ok(result) => result,
Err(e) => Err(e),
}
}
Распространение отмены через слои приложения
Для распространения отмены через различные уровни приложения можно использовать контексты отмены:
use std::future::Future;
use std::pin::Pin;
use tokio::sync::broadcast;
// Структура контекста отмены
#[derive(Clone)]
struct CancellationContext {
receiver: broadcast::Receiver<()>,
}
impl CancellationContext {
// Создание нового контекста
fn new() -> (Self, broadcast::Sender<()>) {
let (tx, rx) = broadcast::channel(1);
(Self { receiver: rx }, tx)
}
// Создание дочернего контекста
fn child(&self) -> Self {
Self {
receiver: self.receiver.resubscribe(),
}
}
// Ожидание сигнала отмены
async fn cancelled(&self) -> () {
let mut rx = self.receiver.resubscribe();
let _ = rx.recv().await;
}
// Выполнение операции с возможностью отмены
async fn run<F, T>(&self, future: F) -> Result<T, &'static str>
where
F: Future<Output = T>,
{
let mut rx = self.receiver.resubscribe();
tokio::select! {
result = future => Ok(result),
_ = rx.recv() => Err("operation cancelled"),
}
}
}
// Использование контекста отмены в многоуровневом приложении
async fn business_logic(ctx: CancellationContext) -> Result<String, &'static str> {
// Первый уровень операций
let data = ctx.run(fetch_data_from_db()).await?;
// Второй уровень операций с дочерним контекстом
let child_ctx = ctx.child();
let processed_data = child_ctx.run(process_data(data)).await?;
// Третий уровень операций
let result = ctx.run(format_result(processed_data)).await?;
Ok(result)
}
// Использование в обработчике Axum
async fn handler(
Extension(ctx): Extension<CancellationContext>,
) -> Result<String, StatusCode> {
match business_logic(ctx).await {
Ok(result) => Ok(result),
Err(_) => Err(StatusCode::REQUEST_TIMEOUT),
}
}
Graceful Shutdown
Для корректного завершения сервера с возможностью завершения текущих запросов:
use axum::{extract::State, routing::get, Router};
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use tokio::signal;
// Состояние для отслеживания активных запросов
#[derive(Clone)]
struct AppState {
active_requests: Arc<AtomicUsize>,
}
// Middleware для подсчета активных запросов
async fn track_requests<B>(
State(state): State<AppState>,
request: Request<B>,
next: Next<B>,
) -> Response {
// Увеличиваем счетчик активных запросов
state.active_requests.fetch_add(1, Ordering::SeqCst);
// Обработка запроса
let response = next.run(request).await;
// Уменьшаем счетчик активных запросов
state.active_requests.fetch_sub(1, Ordering::SeqCst);
response
}
#[tokio::main]
async fn main() {
// Создание состояния приложения
let state = AppState {
active_requests: Arc::new(AtomicUsize::new(0)),
};
// Клонирование счетчика для использования в graceful shutdown
let active_requests = state.active_requests.clone();
// Настройка приложения с middleware для отслеживания запросов
let app = Router::new()
.route("/", get(handler))
.layer(middleware::from_fn_with_state(state.clone(), track_requests))
.with_state(state);
// Привязка к порту
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
// Запуск сервера в отдельном потоке для возможности завершения
let server = axum::serve(listener, app.into_make_service());
// Ожидание сигнала завершения
let graceful = server.with_graceful_shutdown(shutdown_signal(active_requests));
// Запуск сервера с graceful shutdown
if let Err(err) = graceful.await {
eprintln!("Ошибка сервера: {}", err);
}
println!("Сервер успешно завершил работу");
}
// Функция ожидания сигнала завершения
async fn shutdown_signal(active_requests: Arc<AtomicUsize>) {
// Ожидание сигнала CTRL+C
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
// Ожидание сигнала SIGTERM
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
// Ожидание любого из сигналов
tokio::select! {
_ = ctrl_c => {},
_ = terminate => {},
}
println!("Получен сигнал завершения, ожидание завершения текущих запросов...");
// Ожидание завершения всех активных запросов (с таймаутом)
const MAX_WAIT_SECS: u64 = 30;
let start = tokio::time::Instant::now();
while active_requests.load(Ordering::SeqCst) > 0 {
// Проверка превышения таймаута
if start.elapsed().as_secs() > MAX_WAIT_SECS {
println!("Превышено время ожидания завершения запросов, принудительное завершение");
break;
}
// Пауза перед следующей проверкой
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
println!("Все запросы завершены, выключение сервера");
}
Лучшие практики
Проектируйте асинхронные операции с учетом отмены
rust// Функция с поддержкой отмены async fn cancelable_operation( ctx: CancellationContext, params: Params, ) -> Result<Output, OperationError> { // Периодически проверяйте на отмену for step in 0..10 { // Проверка отмены с помощью select! tokio::select! { _ = ctx.cancelled() => return Err(OperationError::Cancelled), result = do_step(step, ¶ms) => { if let Err(e) = result { return Err(e); } } } } Ok(Output::default()) }
Используйте таймауты для внешних сервисов
rust// Добавление таймаутов при обращении к внешним системам async fn call_external_api(url: &str) -> Result<Response, ApiError> { match timeout(Duration::from_secs(5), reqwest::get(url)).await { Ok(result) => match result { Ok(response) => Ok(response), Err(err) => Err(ApiError::RequestFailed(err.to_string())), }, Err(_) => Err(ApiError::Timeout), } }
Распространяйте контекст отмены через слои приложения
rust// Использование единого контекста через слои async fn handler( ctx: CancellationContext, params: Params, ) -> Result<Output, Error> { // Слой бизнес-логики получает контекст let result = service.perform_operation(ctx.child(), params).await?; // Дальнейшая обработка Ok(result) }
Освобождайте ресурсы при отмене
rust// Используйте шаблон RAII для освобождения ресурсов struct ResourceGuard { resource_id: String, } impl Drop for ResourceGuard { fn drop(&mut self) { println!("Освобождение ресурса: {}", self.resource_id); // Код для освобождения ресурса } } async fn use_resource( ctx: CancellationContext, resource_id: String, ) -> Result<Output, Error> { // Захват ресурса let guard = ResourceGuard { resource_id: resource_id.clone() }; // При выходе из функции (в том числе при отмене) ресурс будет освобожден let result = ctx.run(process_with_resource(&resource_id)).await?; Ok(result) }
Используйте семантику отмены библиотеки futures
rustuse futures::{future::{Abortable, AbortHandle}, FutureExt}; // Создание отменяемого фьючерса async fn make_abortable_future<F, T>(future: F) -> (impl Future<Output = Option<T>>, AbortHandle) where F: Future<Output = T> + Unpin, { let (abort_handle, abort_registration) = AbortHandle::new_pair(); let future = Abortable::new(future, abort_registration); // Возвращаем фьючерс и ручку для отмены (future.map(|result| result.ok()), abort_handle) }
Внедряйте мониторинг и логирование отмен
rust// Отслеживание отмен для диагностики и метрик async fn monitored_operation( ctx: CancellationContext, operation_name: &str, ) -> Result<Output, Error> { let start_time = std::time::Instant::now(); let result = match ctx.run(perform_operation()).await { Ok(output) => { metrics::increment_counter(&format!("{}_success", operation_name)); Ok(output) }, Err(e) if e == "operation cancelled" => { metrics::increment_counter(&format!("{}_cancelled", operation_name)); log::info!( "Операция {} отменена после {} мс", operation_name, start_time.elapsed().as_millis() ); Err(Error::Cancelled) }, Err(e) => { metrics::increment_counter(&format!("{}_error", operation_name)); Err(Error::Other(e.to_string())) } }; result }
Отмена асинхронных операций - важный аспект создания надежных и отзывчивых веб-приложений. Правильная реализация механизмов отмены позволяет эффективно использовать ресурсы сервера и улучшает пользовательский опыт, особенно при работе с долгими операциями.