Skip to content

Отмена асинхронных операций в Axum

Отмена асинхронных операций - важная часть разработки надежных веб-приложений. В этом разделе рассмотрим, как эффективно реализовать отмену операций в Axum, используя возможности экосистемы Tokio.

Содержание

Основы отмены операций

В асинхронном программировании отмена операций обычно основана на механизме кооперативной отмены, когда задача периодически проверяет, не была ли она отменена. В Rust и Tokio основные способы отмены:

  1. Использование tokio::select! для выбора первого завершенного фьючерса
  2. Использование токенов отмены (CancellationToken)
  3. Отмена задач через tokio::task::JoinHandle
  4. Использование таймаутов

Использование токенов отмены

Токены отмены - удобный способ сигнализировать о необходимости прекратить выполнение операции:

rust
use axum::{
    extract::State,
    routing::post,
    Json, Router,
};
use tokio::sync::oneshot;
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};

// Структура для хранения информации о долгих задачах
#[derive(Clone)]
struct AppState {
    tasks: Arc<Mutex<Vec<TaskInfo>>>,
}

struct TaskInfo {
    id: String,
    cancellation_sender: Option<oneshot::Sender<()>>,
}

#[derive(Deserialize)]
struct StartTaskRequest {
    task_id: String,
}

#[derive(Serialize)]
struct TaskResponse {
    task_id: String,
    status: String,
}

// Обработчик для запуска долгой задачи
async fn start_task(
    State(state): State<AppState>,
    Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
    // Создание канала для отмены
    let (cancel_tx, cancel_rx) = oneshot::channel();
    
    // Сохранение информации о задаче
    let task_id = payload.task_id.clone();
    {
        let mut tasks = state.tasks.lock().unwrap();
        tasks.push(TaskInfo {
            id: task_id.clone(),
            cancellation_sender: Some(cancel_tx),
        });
    }
    
    // Запуск долгой задачи в отдельном потоке
    let task_clone = task_id.clone();
    tokio::spawn(async move {
        if let Err(_) = process_long_task(task_clone, cancel_rx).await {
            println!("Задача отменена или завершилась с ошибкой");
        }
    });
    
    Json(TaskResponse {
        task_id,
        status: "started".to_string(),
    })
}

// Обработчик для отмены задачи
async fn cancel_task(
    State(state): State<AppState>,
    Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
    let task_id = payload.task_id;
    let mut status = "not_found".to_string();
    
    // Поиск и отмена задачи
    {
        let mut tasks = state.tasks.lock().unwrap();
        if let Some(pos) = tasks.iter().position(|t| t.id == task_id) {
            let task = tasks.remove(pos);
            if let Some(cancel_sender) = task.cancellation_sender {
                let _ = cancel_sender.send(());
                status = "cancelled".to_string();
            }
        }
    }
    
    Json(TaskResponse {
        task_id,
        status,
    })
}

// Функция имитирующая долгую задачу с возможностью отмены
async fn process_long_task(
    task_id: String,
    mut cancel_rx: oneshot::Receiver<()>,
) -> Result<(), String> {
    // Имитация 10 шагов обработки
    for i in 0..10 {
        // Проверка сигнала отмены
        if cancel_rx.try_recv().is_ok() {
            return Err(format!("Задача {} отменена на шаге {}", task_id, i));
        }
        
        // Выполнение шага задачи
        println!("Задача {}: выполнение шага {}", task_id, i);
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
    }
    
    println!("Задача {} успешно завершена", task_id);
    Ok(())
}

// Настройка маршрутов
let app_state = AppState {
    tasks: Arc::new(Mutex::new(Vec::new())),
};

let app = Router::new()
    .route("/tasks/start", post(start_task))
    .route("/tasks/cancel", post(cancel_task))
    .with_state(app_state);

Отмена при закрытии соединения клиентом

Важно отменять операции, когда клиент закрывает соединение. Это можно реализовать с помощью axum::extract::Extension и специального middleware:

rust
use axum::{
    extract::Extension,
    http::{Request, StatusCode},
    middleware::{self, Next},
    response::Response,
};
use std::sync::Arc;
use tokio::sync::broadcast;
use futures::FutureExt;
use tower::ServiceBuilder;

// Токен отмены для каждого запроса
#[derive(Clone)]
struct RequestCancellation {
    receiver: broadcast::Receiver<()>,
}

// Middleware для создания токена отмены для каждого запроса
async fn cancellation_middleware<B>(
    mut request: Request<B>,
    next: Next<B>,
) -> Response {
    // Создание канала для сигнала отмены
    let (cancel_tx, cancel_rx) = broadcast::channel(1);
    
    // Добавление приемника в расширения запроса
    request.extensions_mut().insert(RequestCancellation { 
        receiver: cancel_rx,
    });
    
    // Обертка для обнаружения отмены запроса
    let response_future = next.run(request);
    
    // Если клиент закрывает соединение, cancel_tx будет удален,
    // что вызовет отправку сигнала всем получателям
    let sender = Arc::new(cancel_tx);
    
    // Выполнение обработчика и возврат ответа
    response_future.await
}

// Обработчик с поддержкой отмены
async fn handler_with_cancellation(
    Extension(cancel): Extension<RequestCancellation>,
) -> &'static str {
    let mut rx = cancel.receiver.resubscribe();
    
    // Выполнение долгой операции с возможностью отмены
    match tokio::select! {
        _ = long_operation() => Ok(()),
        _ = rx.recv() => Err("operation cancelled"),
    } {
        Ok(_) => "Операция завершена",
        Err(_) => "Операция отменена",
    }
}

// Настройка приложения с middleware
let app = Router::new()
    .route("/long-operation", get(handler_with_cancellation))
    .layer(middleware::from_fn(cancellation_middleware));

Отмена по таймауту

Отмена операций по таймауту помогает избежать блокировки ресурсов:

rust
use axum::{
    extract::State,
    routing::get,
    Json, Router,
};
use std::time::Duration;
use tokio::time::timeout;

// Обработчик с таймаутом
async fn handler_with_timeout() -> Result<Json<Vec<String>>, StatusCode> {
    // Установка таймаута 5 секунд для долгой операции
    match timeout(Duration::from_secs(5), fetch_data()).await {
        Ok(result) => match result {
            Ok(data) => Ok(Json(data)),
            Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
        },
        Err(_) => {
            // Превышен таймаут
            Err(StatusCode::REQUEST_TIMEOUT)
        }
    }
}

// Долгая операция, которая может быть отменена по таймауту
async fn fetch_data() -> Result<Vec<String>, String> {
    // Имитация долгой операции
    tokio::time::sleep(Duration::from_secs(10)).await;
    
    Ok(vec!["данные1".to_string(), "данные2".to_string()])
}

Для более сложных сценариев можно комбинировать таймаут с токенами отмены:

rust
use tokio::time::error::Elapsed;

// Тип ошибки, который может быть ошибкой таймаута или другой ошибкой
enum OperationError {
    Timeout,
    Other(String),
}

// Операция с таймаутом и возможностью отмены
async fn operation_with_timeout_and_cancel(
    cancel: broadcast::Receiver<()>,
) -> Result<Vec<String>, OperationError> {
    let mut cancel_rx = cancel.resubscribe();
    
    // Комбинирование таймаута и отмены
    match tokio::select! {
        result = timeout(Duration::from_secs(5), fetch_data()) => {
            match result {
                Ok(data) => Ok(data),
                Err(e) if e.is::<Elapsed>() => Err(OperationError::Timeout),
                Err(_) => Err(OperationError::Other("Ошибка получения данных".to_string())),
            }
        },
        _ = cancel_rx.recv() => Err(OperationError::Other("Операция отменена пользователем".to_string())),
    } {
        Ok(result) => result,
        Err(e) => Err(e),
    }
}

Распространение отмены через слои приложения

Для распространения отмены через различные уровни приложения можно использовать контексты отмены:

rust
use std::future::Future;
use std::pin::Pin;
use tokio::sync::broadcast;

// Структура контекста отмены
#[derive(Clone)]
struct CancellationContext {
    receiver: broadcast::Receiver<()>,
}

impl CancellationContext {
    // Создание нового контекста
    fn new() -> (Self, broadcast::Sender<()>) {
        let (tx, rx) = broadcast::channel(1);
        (Self { receiver: rx }, tx)
    }
    
    // Создание дочернего контекста
    fn child(&self) -> Self {
        Self {
            receiver: self.receiver.resubscribe(),
        }
    }
    
    // Ожидание сигнала отмены
    async fn cancelled(&self) -> () {
        let mut rx = self.receiver.resubscribe();
        let _ = rx.recv().await;
    }
    
    // Выполнение операции с возможностью отмены
    async fn run<F, T>(&self, future: F) -> Result<T, &'static str>
    where
        F: Future<Output = T>,
    {
        let mut rx = self.receiver.resubscribe();
        
        tokio::select! {
            result = future => Ok(result),
            _ = rx.recv() => Err("operation cancelled"),
        }
    }
}

// Использование контекста отмены в многоуровневом приложении
async fn business_logic(ctx: CancellationContext) -> Result<String, &'static str> {
    // Первый уровень операций
    let data = ctx.run(fetch_data_from_db()).await?;
    
    // Второй уровень операций с дочерним контекстом
    let child_ctx = ctx.child();
    let processed_data = child_ctx.run(process_data(data)).await?;
    
    // Третий уровень операций
    let result = ctx.run(format_result(processed_data)).await?;
    
    Ok(result)
}

// Использование в обработчике Axum
async fn handler(
    Extension(ctx): Extension<CancellationContext>,
) -> Result<String, StatusCode> {
    match business_logic(ctx).await {
        Ok(result) => Ok(result),
        Err(_) => Err(StatusCode::REQUEST_TIMEOUT),
    }
}

Graceful Shutdown

Для корректного завершения сервера с возможностью завершения текущих запросов:

rust
use axum::{extract::State, routing::get, Router};
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use tokio::signal;

// Состояние для отслеживания активных запросов
#[derive(Clone)]
struct AppState {
    active_requests: Arc<AtomicUsize>,
}

// Middleware для подсчета активных запросов
async fn track_requests<B>(
    State(state): State<AppState>,
    request: Request<B>,
    next: Next<B>,
) -> Response {
    // Увеличиваем счетчик активных запросов
    state.active_requests.fetch_add(1, Ordering::SeqCst);
    
    // Обработка запроса
    let response = next.run(request).await;
    
    // Уменьшаем счетчик активных запросов
    state.active_requests.fetch_sub(1, Ordering::SeqCst);
    
    response
}

#[tokio::main]
async fn main() {
    // Создание состояния приложения
    let state = AppState {
        active_requests: Arc::new(AtomicUsize::new(0)),
    };
    
    // Клонирование счетчика для использования в graceful shutdown
    let active_requests = state.active_requests.clone();
    
    // Настройка приложения с middleware для отслеживания запросов
    let app = Router::new()
        .route("/", get(handler))
        .layer(middleware::from_fn_with_state(state.clone(), track_requests))
        .with_state(state);
    
    // Привязка к порту
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    
    // Запуск сервера в отдельном потоке для возможности завершения
    let server = axum::serve(listener, app.into_make_service());
    
    // Ожидание сигнала завершения
    let graceful = server.with_graceful_shutdown(shutdown_signal(active_requests));
    
    // Запуск сервера с graceful shutdown
    if let Err(err) = graceful.await {
        eprintln!("Ошибка сервера: {}", err);
    }
    
    println!("Сервер успешно завершил работу");
}

// Функция ожидания сигнала завершения
async fn shutdown_signal(active_requests: Arc<AtomicUsize>) {
    // Ожидание сигнала CTRL+C
    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    // Ожидание сигнала SIGTERM
    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Ожидание любого из сигналов
    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    println!("Получен сигнал завершения, ожидание завершения текущих запросов...");
    
    // Ожидание завершения всех активных запросов (с таймаутом)
    const MAX_WAIT_SECS: u64 = 30;
    let start = tokio::time::Instant::now();
    
    while active_requests.load(Ordering::SeqCst) > 0 {
        // Проверка превышения таймаута
        if start.elapsed().as_secs() > MAX_WAIT_SECS {
            println!("Превышено время ожидания завершения запросов, принудительное завершение");
            break;
        }
        
        // Пауза перед следующей проверкой
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
    
    println!("Все запросы завершены, выключение сервера");
}

Лучшие практики

  1. Проектируйте асинхронные операции с учетом отмены

    rust
    // Функция с поддержкой отмены
    async fn cancelable_operation(
        ctx: CancellationContext,
        params: Params,
    ) -> Result<Output, OperationError> {
        // Периодически проверяйте на отмену
        for step in 0..10 {
            // Проверка отмены с помощью select!
            tokio::select! {
                _ = ctx.cancelled() => return Err(OperationError::Cancelled),
                result = do_step(step, &params) => {
                    if let Err(e) = result {
                        return Err(e);
                    }
                }
            }
        }
        
        Ok(Output::default())
    }
  2. Используйте таймауты для внешних сервисов

    rust
    // Добавление таймаутов при обращении к внешним системам
    async fn call_external_api(url: &str) -> Result<Response, ApiError> {
        match timeout(Duration::from_secs(5), reqwest::get(url)).await {
            Ok(result) => match result {
                Ok(response) => Ok(response),
                Err(err) => Err(ApiError::RequestFailed(err.to_string())),
            },
            Err(_) => Err(ApiError::Timeout),
        }
    }
  3. Распространяйте контекст отмены через слои приложения

    rust
    // Использование единого контекста через слои
    async fn handler(
        ctx: CancellationContext,
        params: Params,
    ) -> Result<Output, Error> {
        // Слой бизнес-логики получает контекст
        let result = service.perform_operation(ctx.child(), params).await?;
        
        // Дальнейшая обработка
        Ok(result)
    }
  4. Освобождайте ресурсы при отмене

    rust
    // Используйте шаблон RAII для освобождения ресурсов
    struct ResourceGuard {
        resource_id: String,
    }
    
    impl Drop for ResourceGuard {
        fn drop(&mut self) {
            println!("Освобождение ресурса: {}", self.resource_id);
            // Код для освобождения ресурса
        }
    }
    
    async fn use_resource(
        ctx: CancellationContext,
        resource_id: String,
    ) -> Result<Output, Error> {
        // Захват ресурса
        let guard = ResourceGuard { resource_id: resource_id.clone() };
        
        // При выходе из функции (в том числе при отмене) ресурс будет освобожден
        let result = ctx.run(process_with_resource(&resource_id)).await?;
        
        Ok(result)
    }
  5. Используйте семантику отмены библиотеки futures

    rust
    use futures::{future::{Abortable, AbortHandle}, FutureExt};
    
    // Создание отменяемого фьючерса
    async fn make_abortable_future<F, T>(future: F) -> (impl Future<Output = Option<T>>, AbortHandle)
    where
        F: Future<Output = T> + Unpin,
    {
        let (abort_handle, abort_registration) = AbortHandle::new_pair();
        let future = Abortable::new(future, abort_registration);
        
        // Возвращаем фьючерс и ручку для отмены
        (future.map(|result| result.ok()), abort_handle)
    }
  6. Внедряйте мониторинг и логирование отмен

    rust
    // Отслеживание отмен для диагностики и метрик
    async fn monitored_operation(
        ctx: CancellationContext,
        operation_name: &str,
    ) -> Result<Output, Error> {
        let start_time = std::time::Instant::now();
        
        let result = match ctx.run(perform_operation()).await {
            Ok(output) => {
                metrics::increment_counter(&format!("{}_success", operation_name));
                Ok(output)
            },
            Err(e) if e == "operation cancelled" => {
                metrics::increment_counter(&format!("{}_cancelled", operation_name));
                log::info!(
                    "Операция {} отменена после {} мс",
                    operation_name,
                    start_time.elapsed().as_millis()
                );
                Err(Error::Cancelled)
            },
            Err(e) => {
                metrics::increment_counter(&format!("{}_error", operation_name));
                Err(Error::Other(e.to_string()))
            }
        };
        
        result
    }

Отмена асинхронных операций - важный аспект создания надежных и отзывчивых веб-приложений. Правильная реализация механизмов отмены позволяет эффективно использовать ресурсы сервера и улучшает пользовательский опыт, особенно при работе с долгими операциями.