Skip to content

Интеграция с Tokio

Axum построен на основе Tokio - популярной асинхронной рантайм-библиотеки для Rust. В этом разделе рассмотрим, как эффективно использовать возможности Tokio в приложениях на Axum.

Содержание

Основы Tokio в Axum

Tokio предоставляет асинхронный рантайм для выполнения задач, а Axum строится на этой основе для создания веб-серверов. Для использования всех возможностей Tokio в приложении на Axum необходимо сначала добавить соответствующие зависимости:

toml
[dependencies]
axum = "0.7.2"
tokio = { version = "1.32.0", features = ["full"] }

Базовое приложение Axum с Tokio:

rust
use axum::{
    routing::get,
    Router,
};

// Асинхронный обработчик
async fn hello_world() -> &'static str {
    "Привет, мир!"
}

#[tokio::main]
async fn main() {
    // Инициализация логирования (опционально)
    tracing_subscriber::fmt::init();
    
    // Создание маршрутизатора
    let app = Router::new()
        .route("/", get(hello_world));
    
    // Привязка к сетевому адресу
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .unwrap();
        
    println!("Сервер запущен на http://localhost:3000");
    
    // Запуск сервера с использованием Tokio
    axum::serve(listener, app).await.unwrap();
}

Управление задачами

Tokio позволяет запускать несколько асинхронных задач параллельно:

rust
use axum::{
    extract::State,
    routing::{get, post},
    Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;

// Состояние приложения
#[derive(Clone)]
struct AppState {
    tasks: Arc<Mutex<Vec<TaskInfo>>>,
}

struct TaskInfo {
    id: String,
    handle: JoinHandle<Result<String, String>>,
}

#[derive(Deserialize)]
struct StartTaskRequest {
    id: String,
    duration_secs: u64,
}

#[derive(Serialize)]
struct TaskResponse {
    id: String,
    status: String,
}

// Обработчик для запуска задачи
async fn start_task(
    State(state): State<AppState>,
    Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
    let task_id = payload.id.clone();
    
    // Запуск асинхронной задачи с помощью tokio::spawn
    let handle = tokio::spawn(async move {
        // Имитация долгой задачи
        tokio::time::sleep(tokio::time::Duration::from_secs(payload.duration_secs)).await;
        
        // Результат выполнения
        Ok(format!("Задача {} выполнена успешно", task_id))
    });
    
    // Сохранение информации о задаче
    {
        let mut tasks = state.tasks.lock().unwrap();
        tasks.push(TaskInfo {
            id: task_id.clone(),
            handle,
        });
    }
    
    Json(TaskResponse {
        id: task_id,
        status: "started".to_string(),
    })
}

// Обработчик для проверки статуса задачи
async fn check_task(
    State(state): State<AppState>,
    Json(payload): Json<TaskIdRequest>,
) -> Json<TaskResponse> {
    let task_id = payload.id;
    let mut status = "not_found".to_string();
    
    // Проверка статуса задачи
    {
        let tasks = state.tasks.lock().unwrap();
        if let Some(task) = tasks.iter().find(|t| t.id == task_id) {
            status = if task.handle.is_finished() {
                "completed".to_string()
            } else {
                "running".to_string()
            };
        }
    }
    
    Json(TaskResponse {
        id: task_id,
        status,
    })
}

// Настройка маршрутов
let app_state = AppState {
    tasks: Arc::new(Mutex::new(Vec::new())),
};

let app = Router::new()
    .route("/tasks/start", post(start_task))
    .route("/tasks/status", post(check_task))
    .with_state(app_state);

Работа с каналами

Tokio предоставляет несколько типов каналов для коммуникации между задачами:

rust
use axum::{
    extract::State,
    routing::{get, post},
    Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, broadcast};

// Структура сообщения
#[derive(Clone)]
struct Message {
    topic: String,
    content: String,
}

// Состояние приложения с различными типами каналов
#[derive(Clone)]
struct AppState {
    // Канал MPSC (множество отправителей, один получатель)
    message_sender: mpsc::Sender<Message>,
    
    // Широковещательный канал (множество отправителей, множество получателей)
    broadcaster: Arc<broadcast::Sender<Message>>,
}

#[derive(Deserialize)]
struct PublishRequest {
    topic: String,
    content: String,
}

#[derive(Serialize)]
struct PublishResponse {
    status: String,
}

// Обработчик для публикации сообщения
async fn publish_message(
    State(state): State<AppState>,
    Json(payload): Json<PublishRequest>,
) -> Json<PublishResponse> {
    let message = Message {
        topic: payload.topic.clone(),
        content: payload.content,
    };
    
    // Отправка в канал MPSC для обработки
    match state.message_sender.send(message.clone()).await {
        Ok(_) => {
            // Широковещательная рассылка всем подписчикам
            let _ = state.broadcaster.send(message);
            
            Json(PublishResponse {
                status: "published".to_string(),
            })
        },
        Err(_) => Json(PublishResponse {
            status: "failed".to_string(),
        }),
    }
}

// Функция обработки сообщений в фоновом режиме
async fn message_processor(mut receiver: mpsc::Receiver<Message>) {
    while let Some(message) = receiver.recv().await {
        println!(
            "Обработка сообщения: тема={}, содержание={}",
            message.topic, message.content
        );
        
        // Имитация обработки
        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
    }
}

// Настройка приложения с каналами
#[tokio::main]
async fn main() {
    // Создание канала MPSC
    let (tx, rx) = mpsc::channel::<Message>(100);
    
    // Создание широковещательного канала
    let (broadcast_tx, _) = broadcast::channel::<Message>(100);
    let broadcast_tx = Arc::new(broadcast_tx);
    
    // Запуск обработчика сообщений в фоновом режиме
    tokio::spawn(message_processor(rx));
    
    // Создание состояния приложения
    let app_state = AppState {
        message_sender: tx,
        broadcaster: broadcast_tx,
    };
    
    // Настройка маршрутов
    let app = Router::new()
        .route("/publish", post(publish_message))
        .with_state(app_state);
    
    // Запуск сервера
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Использование oneshot каналов для запросов/ответов

rust
use tokio::sync::oneshot;

// Структура запроса с каналом для ответа
struct Request {
    query: String,
    response_channel: oneshot::Sender<String>,
}

// Обработчик с использованием oneshot каналов
async fn query_handler(
    State(state): State<AppState>,
    Json(payload): Json<QueryRequest>,
) -> Json<QueryResponse> {
    // Создание канала для ответа
    let (tx, rx) = oneshot::channel();
    
    // Формирование запроса с каналом для ответа
    let request = Request {
        query: payload.query.clone(),
        response_channel: tx,
    };
    
    // Отправка запроса обработчику
    if let Err(_) = state.request_sender.send(request).await {
        return Json(QueryResponse {
            status: "error".to_string(),
            result: None,
        });
    }
    
    // Ожидание ответа
    match rx.await {
        Ok(result) => Json(QueryResponse {
            status: "success".to_string(),
            result: Some(result),
        }),
        Err(_) => Json(QueryResponse {
            status: "error".to_string(),
            result: None,
        }),
    }
}

// Фоновый обработчик запросов
async fn request_processor(mut receiver: mpsc::Receiver<Request>) {
    while let Some(request) = receiver.recv().await {
        // Обработка запроса
        let result = process_query(&request.query).await;
        
        // Отправка результата через канал ответа
        let _ = request.response_channel.send(result);
    }
}

Таймеры и интервалы

Tokio предоставляет инструменты для работы с таймерами и интервалами:

rust
use axum::{
    extract::State,
    routing::get,
    Router,
};
use std::sync::{Arc, RwLock};
use tokio::time::{self, Duration, Instant};

// Состояние приложения
#[derive(Clone)]
struct AppState {
    start_time: Instant,
    stats: Arc<RwLock<ServerStats>>,
}

struct ServerStats {
    requests_total: usize,
    last_update: Instant,
}

// Обработчик для получения статистики
async fn get_stats(State(state): State<AppState>) -> String {
    let stats = state.stats.read().unwrap();
    let uptime = state.start_time.elapsed();
    
    format!(
        "Сервер работает: {:?}, Всего запросов: {}, Последнее обновление: {:?} назад",
        uptime,
        stats.requests_total,
        stats.last_update.elapsed()
    )
}

// Функция для периодического обновления статистики
async fn periodic_stats_update(stats: Arc<RwLock<ServerStats>>) {
    // Создание интервала 10 секунд
    let mut interval = time::interval(Duration::from_secs(10));
    
    loop {
        // Ожидание следующего тика интервала
        interval.tick().await;
        
        // Обновление статистики
        let mut stats = stats.write().unwrap();
        stats.last_update = Instant::now();
        println!("Статистика обновлена, всего запросов: {}", stats.requests_total);
    }
}

// Инициализация приложения с периодическими задачами
#[tokio::main]
async fn main() {
    // Инициализация состояния
    let stats = Arc::new(RwLock::new(ServerStats {
        requests_total: 0,
        last_update: Instant::now(),
    }));
    
    let app_state = AppState {
        start_time: Instant::now(),
        stats: Arc::clone(&stats),
    };
    
    // Запуск фоновой задачи с интервалом
    tokio::spawn(periodic_stats_update(Arc::clone(&stats)));
    
    // Настройка маршрутов
    let app = Router::new()
        .route("/stats", get(get_stats))
        .with_state(app_state);
    
    // Запуск сервера
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Использование таймаутов

rust
use axum::{
    http::StatusCode,
    routing::get,
    Router,
};
use tokio::time::{timeout, Duration};

// Обработчик с таймаутом
async fn with_timeout() -> Result<String, StatusCode> {
    // Установка таймаута в 1 секунду
    match timeout(Duration::from_secs(1), slow_operation()).await {
        Ok(result) => Ok(result),
        Err(_) => Err(StatusCode::REQUEST_TIMEOUT),
    }
}

// Медленная операция
async fn slow_operation() -> String {
    // Имитация долгой обработки
    tokio::time::sleep(Duration::from_secs(2)).await;
    "Операция завершена".to_string()
}

Работа с файловой системой

Tokio предоставляет асинхронные API для работы с файловой системой:

rust
use axum::{
    extract::{Path, Multipart},
    routing::{get, post},
    http::{header, StatusCode},
    response::{IntoResponse, Response},
    Router,
};
use tokio::{fs, io::AsyncWriteExt};

// Обработчик для загрузки файла
async fn upload_file(mut multipart: Multipart) -> Result<String, StatusCode> {
    // Создание директории, если не существует
    fs::create_dir_all("./uploads").await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    while let Some(field) = multipart.next_field().await
        .map_err(|_| StatusCode::BAD_REQUEST)? {
            
        let file_name = match field.file_name() {
            Some(name) => name.to_string(),
            None => continue,
        };
        
        let data = field.bytes().await
            .map_err(|_| StatusCode::BAD_REQUEST)?;
        
        // Асинхронная запись файла
        let path = format!("./uploads/{}", file_name);
        let mut file = fs::File::create(&path).await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
        
        file.write_all(&data).await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    }
    
    Ok("Файлы успешно загружены".to_string())
}

// Обработчик для скачивания файла
async fn download_file(Path(filename): Path<String>) -> Result<impl IntoResponse, StatusCode> {
    let path = format!("./uploads/{}", filename);
    
    // Проверка существования файла
    if fs::metadata(&path).await.is_err() {
        return Err(StatusCode::NOT_FOUND);
    }
    
    // Асинхронное чтение файла
    let contents = fs::read(&path).await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    // Определение MIME-типа (упрощенно)
    let mime_type = if filename.ends_with(".pdf") {
        "application/pdf"
    } else if filename.ends_with(".jpg") || filename.ends_with(".jpeg") {
        "image/jpeg"
    } else if filename.ends_with(".png") {
        "image/png"
    } else {
        "application/octet-stream"
    };
    
    // Формирование ответа
    let response = Response::builder()
        .header(header::CONTENT_TYPE, mime_type)
        .header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}\"", filename))
        .body(contents)
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    Ok(response)
}

Блокирующие операции

Для CPU-интенсивных или блокирующих операций Tokio предоставляет spawn_blocking:

rust
use axum::{
    extract::Json,
    routing::post,
    http::StatusCode,
    Router,
};
use serde::{Deserialize, Serialize};
use argon2::{
    password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
    Argon2,
};

#[derive(Deserialize)]
struct HashRequest {
    password: String,
}

#[derive(Serialize)]
struct HashResponse {
    hash: String,
}

// Обработчик для хеширования пароля
async fn hash_password(
    Json(payload): Json<HashRequest>,
) -> Result<Json<HashResponse>, StatusCode> {
    // Выполнение CPU-интенсивной операции в отдельном потоке
    let hash = tokio::task::spawn_blocking(move || {
        // Генерация случайной соли
        let salt = SaltString::generate(&mut OsRng);
        
        // Хеширование пароля (CPU-интенсивная операция)
        Argon2::default()
            .hash_password(payload.password.as_bytes(), &salt)
            .map(|hash| hash.to_string())
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
    }).await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    match hash {
        Ok(hash) => Ok(Json(HashResponse { hash })),
        Err(status) => Err(status),
    }
}

// Обработчик с блокирующей операцией ввода-вывода
async fn process_large_file(Path(filename): Path<String>) -> Result<String, StatusCode> {
    // Выполнение блокирующей операции в отдельном потоке
    let result = tokio::task::spawn_blocking(move || {
        // Блокирующее чтение большого файла
        match std::fs::read_to_string(&filename) {
            Ok(content) => {
                // Обработка содержимого файла
                let line_count = content.lines().count();
                Ok(format!("Файл {} содержит {} строк", filename, line_count))
            },
            Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
        }
    }).await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    result
}

Конфигурация рантайма

Можно настроить рантайм Tokio для оптимальной производительности:

rust
use axum::Router;
use tokio::runtime::Builder;
use std::time::Duration;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Ручная настройка рантайма Tokio
    let runtime = Builder::new_multi_thread()
        // Установка числа рабочих потоков
        .worker_threads(8)
        // Настройка интервала для очистки бездействующих потоков
        .thread_keep_alive(Duration::from_secs(60))
        // Установка имени для потоков (полезно для отладки)
        .thread_name("axum-worker")
        // Включение всех возможностей
        .enable_all()
        // Построение рантайма
        .build()?;
    
    // Запуск приложения в настроенном рантайме
    runtime.block_on(async {
        let app = Router::new()
            .route("/", axum::routing::get(|| async { "Привет, мир!" }));
        
        let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
        axum::serve(listener, app).await?;
        
        Ok::<_, Box<dyn std::error::Error>>(())
    })?;
    
    Ok(())
}

Лучшие практики

  1. Избегайте блокирования асинхронного потока

    rust
    // Плохо - блокирует асинхронный контекст
    async fn bad_handler() -> String {
        std::thread::sleep(std::time::Duration::from_secs(1));
        "Завершено".to_string()
    }
    
    // Хорошо - использует асинхронный сон
    async fn good_handler() -> String {
        tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
        "Завершено".to_string()
    }
  2. Используйте spawn_blocking для CPU-интенсивных операций

    rust
    // Изображение, требующее CPU-интенсивной обработки
    async fn process_image(image_data: Vec<u8>) -> Result<Vec<u8>, StatusCode> {
        // Выполнение в отдельном потоке
        tokio::task::spawn_blocking(move || {
            // CPU-интенсивная обработка изображения
            let processed_image = expensive_image_processing(image_data);
            processed_image
        })
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
    }
  3. Правильно используйте каналы

    rust
    // Выбор правильного типа канала для задачи
    
    // MPSC: множество отправителей, один получатель
    // Хорошо для обработки множества сообщений в одном месте
    let (tx, rx) = tokio::sync::mpsc::channel(100);
    
    // oneshot: один отправитель, один получатель, одно сообщение
    // Хорошо для запросов/ответов
    let (tx, rx) = tokio::sync::oneshot::channel();
    
    // broadcast: множество отправителей, множество получателей
    // Хорошо для рассылки событий многим подписчикам
    let (tx, rx) = tokio::sync::broadcast::channel(100);
    
    // watch: один отправитель, множество получателей, с хранением последнего значения
    // Хорошо для распространения обновлений состояния
    let (tx, rx) = tokio::sync::watch::channel(initial_value);
  4. Используйте join! и select! эффективно

    rust
    // join!: выполнение нескольких задач параллельно и ожидание всех результатов
    async fn parallel_tasks() -> (Result<A, Error>, Result<B, Error>) {
        tokio::join!(task_a(), task_b())
    }
    
    // select!: выполнение нескольких задач и выбор первой завершенной
    async fn first_completed() -> Result<(), Error> {
        tokio::select! {
            result_a = task_a() => result_a,
            result_b = task_b() => result_b,
            _ = tokio::time::sleep(Duration::from_secs(5)) => Err(Error::Timeout),
        }
    }
  5. Разделение долгих операций на мелкие части

    rust
    // Разделение большой задачи на части для избежания блокировки потока
    async fn process_large_dataset(dataset: Vec<Data>) -> Result<Stats, Error> {
        let chunk_size = 1000;
        let mut results = Vec::new();
        
        // Обработка по частям
        for chunk in dataset.chunks(chunk_size) {
            // Периодически уступаем контроль другим задачам
            tokio::task::yield_now().await;
            
            // Обработка части данных
            let result = process_chunk(chunk).await?;
            results.push(result);
        }
        
        // Объединение результатов
        Ok(combine_results(results))
    }
  6. Мониторинг и управление ресурсами

    rust
    // Ограничение числа параллельных задач
    use tokio::sync::Semaphore;
    
    async fn limited_parallel_processing(tasks: Vec<Task>) -> Vec<Result<Output, Error>> {
        // Ограничение до 10 параллельных задач
        let semaphore = Arc::new(Semaphore::new(10));
        let mut handles = Vec::new();
        
        for task in tasks {
            let permit = semaphore.clone().acquire_owned().await.unwrap();
            
            let handle = tokio::spawn(async move {
                // При выходе из этой области permit будет освобожден
                let _permit = permit;
                
                // Обработка задачи
                process_task(task).await
            });
            
            handles.push(handle);
        }
        
        // Сбор результатов
        let mut results = Vec::new();
        for handle in handles {
            results.push(handle.await.unwrap());
        }
        
        results
    }
  7. Избегайте нагрузки на сборщик мусора

    rust
    // Повторное использование буферов вместо постоянного создания новых
    use bytes::BytesMut;
    
    async fn efficient_buffer_handling(stream: &mut TcpStream) -> Result<Vec<Message>, Error> {
        let mut buffer = BytesMut::with_capacity(8192);
        let mut messages = Vec::new();
        
        loop {
            // Чтение данных в существующий буфер
            stream.read_buf(&mut buffer).await?;
            
            // Обработка сообщений из буфера
            while let Some(message) = extract_message(&mut buffer)? {
                messages.push(message);
            }
            
            if messages.len() >= 10 || buffer.is_empty() {
                break;
            }
        }
        
        Ok(messages)
    }

Tokio предоставляет мощный набор инструментов для создания высокопроизводительных асинхронных приложений. Правильное использование этих инструментов в контексте Axum позволяет создавать эффективные, масштабируемые веб-серверы, способные обрабатывать большое количество параллельных запросов.