Интеграция с Tokio
Axum построен на основе Tokio - популярной асинхронной рантайм-библиотеки для Rust. В этом разделе рассмотрим, как эффективно использовать возможности Tokio в приложениях на Axum.
Содержание
- Основы Tokio в Axum
- Управление задачами
- Работа с каналами
- Таймеры и интервалы
- Работа с файловой системой
- Блокирующие операции
- Конфигурация рантайма
- Лучшие практики
Основы Tokio в Axum
Tokio предоставляет асинхронный рантайм для выполнения задач, а Axum строится на этой основе для создания веб-серверов. Для использования всех возможностей Tokio в приложении на Axum необходимо сначала добавить соответствующие зависимости:
[dependencies]
axum = "0.7.2"
tokio = { version = "1.32.0", features = ["full"] }
Базовое приложение Axum с Tokio:
use axum::{
routing::get,
Router,
};
// Асинхронный обработчик
async fn hello_world() -> &'static str {
"Привет, мир!"
}
#[tokio::main]
async fn main() {
// Инициализация логирования (опционально)
tracing_subscriber::fmt::init();
// Создание маршрутизатора
let app = Router::new()
.route("/", get(hello_world));
// Привязка к сетевому адресу
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
.await
.unwrap();
println!("Сервер запущен на http://localhost:3000");
// Запуск сервера с использованием Tokio
axum::serve(listener, app).await.unwrap();
}
Управление задачами
Tokio позволяет запускать несколько асинхронных задач параллельно:
use axum::{
extract::State,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use tokio::task::JoinHandle;
// Состояние приложения
#[derive(Clone)]
struct AppState {
tasks: Arc<Mutex<Vec<TaskInfo>>>,
}
struct TaskInfo {
id: String,
handle: JoinHandle<Result<String, String>>,
}
#[derive(Deserialize)]
struct StartTaskRequest {
id: String,
duration_secs: u64,
}
#[derive(Serialize)]
struct TaskResponse {
id: String,
status: String,
}
// Обработчик для запуска задачи
async fn start_task(
State(state): State<AppState>,
Json(payload): Json<StartTaskRequest>,
) -> Json<TaskResponse> {
let task_id = payload.id.clone();
// Запуск асинхронной задачи с помощью tokio::spawn
let handle = tokio::spawn(async move {
// Имитация долгой задачи
tokio::time::sleep(tokio::time::Duration::from_secs(payload.duration_secs)).await;
// Результат выполнения
Ok(format!("Задача {} выполнена успешно", task_id))
});
// Сохранение информации о задаче
{
let mut tasks = state.tasks.lock().unwrap();
tasks.push(TaskInfo {
id: task_id.clone(),
handle,
});
}
Json(TaskResponse {
id: task_id,
status: "started".to_string(),
})
}
// Обработчик для проверки статуса задачи
async fn check_task(
State(state): State<AppState>,
Json(payload): Json<TaskIdRequest>,
) -> Json<TaskResponse> {
let task_id = payload.id;
let mut status = "not_found".to_string();
// Проверка статуса задачи
{
let tasks = state.tasks.lock().unwrap();
if let Some(task) = tasks.iter().find(|t| t.id == task_id) {
status = if task.handle.is_finished() {
"completed".to_string()
} else {
"running".to_string()
};
}
}
Json(TaskResponse {
id: task_id,
status,
})
}
// Настройка маршрутов
let app_state = AppState {
tasks: Arc::new(Mutex::new(Vec::new())),
};
let app = Router::new()
.route("/tasks/start", post(start_task))
.route("/tasks/status", post(check_task))
.with_state(app_state);
Работа с каналами
Tokio предоставляет несколько типов каналов для коммуникации между задачами:
use axum::{
extract::State,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, broadcast};
// Структура сообщения
#[derive(Clone)]
struct Message {
topic: String,
content: String,
}
// Состояние приложения с различными типами каналов
#[derive(Clone)]
struct AppState {
// Канал MPSC (множество отправителей, один получатель)
message_sender: mpsc::Sender<Message>,
// Широковещательный канал (множество отправителей, множество получателей)
broadcaster: Arc<broadcast::Sender<Message>>,
}
#[derive(Deserialize)]
struct PublishRequest {
topic: String,
content: String,
}
#[derive(Serialize)]
struct PublishResponse {
status: String,
}
// Обработчик для публикации сообщения
async fn publish_message(
State(state): State<AppState>,
Json(payload): Json<PublishRequest>,
) -> Json<PublishResponse> {
let message = Message {
topic: payload.topic.clone(),
content: payload.content,
};
// Отправка в канал MPSC для обработки
match state.message_sender.send(message.clone()).await {
Ok(_) => {
// Широковещательная рассылка всем подписчикам
let _ = state.broadcaster.send(message);
Json(PublishResponse {
status: "published".to_string(),
})
},
Err(_) => Json(PublishResponse {
status: "failed".to_string(),
}),
}
}
// Функция обработки сообщений в фоновом режиме
async fn message_processor(mut receiver: mpsc::Receiver<Message>) {
while let Some(message) = receiver.recv().await {
println!(
"Обработка сообщения: тема={}, содержание={}",
message.topic, message.content
);
// Имитация обработки
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
}
// Настройка приложения с каналами
#[tokio::main]
async fn main() {
// Создание канала MPSC
let (tx, rx) = mpsc::channel::<Message>(100);
// Создание широковещательного канала
let (broadcast_tx, _) = broadcast::channel::<Message>(100);
let broadcast_tx = Arc::new(broadcast_tx);
// Запуск обработчика сообщений в фоновом режиме
tokio::spawn(message_processor(rx));
// Создание состояния приложения
let app_state = AppState {
message_sender: tx,
broadcaster: broadcast_tx,
};
// Настройка маршрутов
let app = Router::new()
.route("/publish", post(publish_message))
.with_state(app_state);
// Запуск сервера
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Использование oneshot каналов для запросов/ответов
use tokio::sync::oneshot;
// Структура запроса с каналом для ответа
struct Request {
query: String,
response_channel: oneshot::Sender<String>,
}
// Обработчик с использованием oneshot каналов
async fn query_handler(
State(state): State<AppState>,
Json(payload): Json<QueryRequest>,
) -> Json<QueryResponse> {
// Создание канала для ответа
let (tx, rx) = oneshot::channel();
// Формирование запроса с каналом для ответа
let request = Request {
query: payload.query.clone(),
response_channel: tx,
};
// Отправка запроса обработчику
if let Err(_) = state.request_sender.send(request).await {
return Json(QueryResponse {
status: "error".to_string(),
result: None,
});
}
// Ожидание ответа
match rx.await {
Ok(result) => Json(QueryResponse {
status: "success".to_string(),
result: Some(result),
}),
Err(_) => Json(QueryResponse {
status: "error".to_string(),
result: None,
}),
}
}
// Фоновый обработчик запросов
async fn request_processor(mut receiver: mpsc::Receiver<Request>) {
while let Some(request) = receiver.recv().await {
// Обработка запроса
let result = process_query(&request.query).await;
// Отправка результата через канал ответа
let _ = request.response_channel.send(result);
}
}
Таймеры и интервалы
Tokio предоставляет инструменты для работы с таймерами и интервалами:
use axum::{
extract::State,
routing::get,
Router,
};
use std::sync::{Arc, RwLock};
use tokio::time::{self, Duration, Instant};
// Состояние приложения
#[derive(Clone)]
struct AppState {
start_time: Instant,
stats: Arc<RwLock<ServerStats>>,
}
struct ServerStats {
requests_total: usize,
last_update: Instant,
}
// Обработчик для получения статистики
async fn get_stats(State(state): State<AppState>) -> String {
let stats = state.stats.read().unwrap();
let uptime = state.start_time.elapsed();
format!(
"Сервер работает: {:?}, Всего запросов: {}, Последнее обновление: {:?} назад",
uptime,
stats.requests_total,
stats.last_update.elapsed()
)
}
// Функция для периодического обновления статистики
async fn periodic_stats_update(stats: Arc<RwLock<ServerStats>>) {
// Создание интервала 10 секунд
let mut interval = time::interval(Duration::from_secs(10));
loop {
// Ожидание следующего тика интервала
interval.tick().await;
// Обновление статистики
let mut stats = stats.write().unwrap();
stats.last_update = Instant::now();
println!("Статистика обновлена, всего запросов: {}", stats.requests_total);
}
}
// Инициализация приложения с периодическими задачами
#[tokio::main]
async fn main() {
// Инициализация состояния
let stats = Arc::new(RwLock::new(ServerStats {
requests_total: 0,
last_update: Instant::now(),
}));
let app_state = AppState {
start_time: Instant::now(),
stats: Arc::clone(&stats),
};
// Запуск фоновой задачи с интервалом
tokio::spawn(periodic_stats_update(Arc::clone(&stats)));
// Настройка маршрутов
let app = Router::new()
.route("/stats", get(get_stats))
.with_state(app_state);
// Запуск сервера
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
Использование таймаутов
use axum::{
http::StatusCode,
routing::get,
Router,
};
use tokio::time::{timeout, Duration};
// Обработчик с таймаутом
async fn with_timeout() -> Result<String, StatusCode> {
// Установка таймаута в 1 секунду
match timeout(Duration::from_secs(1), slow_operation()).await {
Ok(result) => Ok(result),
Err(_) => Err(StatusCode::REQUEST_TIMEOUT),
}
}
// Медленная операция
async fn slow_operation() -> String {
// Имитация долгой обработки
tokio::time::sleep(Duration::from_secs(2)).await;
"Операция завершена".to_string()
}
Работа с файловой системой
Tokio предоставляет асинхронные API для работы с файловой системой:
use axum::{
extract::{Path, Multipart},
routing::{get, post},
http::{header, StatusCode},
response::{IntoResponse, Response},
Router,
};
use tokio::{fs, io::AsyncWriteExt};
// Обработчик для загрузки файла
async fn upload_file(mut multipart: Multipart) -> Result<String, StatusCode> {
// Создание директории, если не существует
fs::create_dir_all("./uploads").await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
while let Some(field) = multipart.next_field().await
.map_err(|_| StatusCode::BAD_REQUEST)? {
let file_name = match field.file_name() {
Some(name) => name.to_string(),
None => continue,
};
let data = field.bytes().await
.map_err(|_| StatusCode::BAD_REQUEST)?;
// Асинхронная запись файла
let path = format!("./uploads/{}", file_name);
let mut file = fs::File::create(&path).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
file.write_all(&data).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
}
Ok("Файлы успешно загружены".to_string())
}
// Обработчик для скачивания файла
async fn download_file(Path(filename): Path<String>) -> Result<impl IntoResponse, StatusCode> {
let path = format!("./uploads/{}", filename);
// Проверка существования файла
if fs::metadata(&path).await.is_err() {
return Err(StatusCode::NOT_FOUND);
}
// Асинхронное чтение файла
let contents = fs::read(&path).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Определение MIME-типа (упрощенно)
let mime_type = if filename.ends_with(".pdf") {
"application/pdf"
} else if filename.ends_with(".jpg") || filename.ends_with(".jpeg") {
"image/jpeg"
} else if filename.ends_with(".png") {
"image/png"
} else {
"application/octet-stream"
};
// Формирование ответа
let response = Response::builder()
.header(header::CONTENT_TYPE, mime_type)
.header(header::CONTENT_DISPOSITION, format!("attachment; filename=\"{}\"", filename))
.body(contents)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(response)
}
Блокирующие операции
Для CPU-интенсивных или блокирующих операций Tokio предоставляет spawn_blocking
:
use axum::{
extract::Json,
routing::post,
http::StatusCode,
Router,
};
use serde::{Deserialize, Serialize};
use argon2::{
password_hash::{rand_core::OsRng, PasswordHasher, SaltString},
Argon2,
};
#[derive(Deserialize)]
struct HashRequest {
password: String,
}
#[derive(Serialize)]
struct HashResponse {
hash: String,
}
// Обработчик для хеширования пароля
async fn hash_password(
Json(payload): Json<HashRequest>,
) -> Result<Json<HashResponse>, StatusCode> {
// Выполнение CPU-интенсивной операции в отдельном потоке
let hash = tokio::task::spawn_blocking(move || {
// Генерация случайной соли
let salt = SaltString::generate(&mut OsRng);
// Хеширование пароля (CPU-интенсивная операция)
Argon2::default()
.hash_password(payload.password.as_bytes(), &salt)
.map(|hash| hash.to_string())
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match hash {
Ok(hash) => Ok(Json(HashResponse { hash })),
Err(status) => Err(status),
}
}
// Обработчик с блокирующей операцией ввода-вывода
async fn process_large_file(Path(filename): Path<String>) -> Result<String, StatusCode> {
// Выполнение блокирующей операции в отдельном потоке
let result = tokio::task::spawn_blocking(move || {
// Блокирующее чтение большого файла
match std::fs::read_to_string(&filename) {
Ok(content) => {
// Обработка содержимого файла
let line_count = content.lines().count();
Ok(format!("Файл {} содержит {} строк", filename, line_count))
},
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
result
}
Конфигурация рантайма
Можно настроить рантайм Tokio для оптимальной производительности:
use axum::Router;
use tokio::runtime::Builder;
use std::time::Duration;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Ручная настройка рантайма Tokio
let runtime = Builder::new_multi_thread()
// Установка числа рабочих потоков
.worker_threads(8)
// Настройка интервала для очистки бездействующих потоков
.thread_keep_alive(Duration::from_secs(60))
// Установка имени для потоков (полезно для отладки)
.thread_name("axum-worker")
// Включение всех возможностей
.enable_all()
// Построение рантайма
.build()?;
// Запуск приложения в настроенном рантайме
runtime.block_on(async {
let app = Router::new()
.route("/", axum::routing::get(|| async { "Привет, мир!" }));
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
Ok::<_, Box<dyn std::error::Error>>(())
})?;
Ok(())
}
Лучшие практики
Избегайте блокирования асинхронного потока
rust// Плохо - блокирует асинхронный контекст async fn bad_handler() -> String { std::thread::sleep(std::time::Duration::from_secs(1)); "Завершено".to_string() } // Хорошо - использует асинхронный сон async fn good_handler() -> String { tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; "Завершено".to_string() }
Используйте
spawn_blocking
для CPU-интенсивных операцийrust// Изображение, требующее CPU-интенсивной обработки async fn process_image(image_data: Vec<u8>) -> Result<Vec<u8>, StatusCode> { // Выполнение в отдельном потоке tokio::task::spawn_blocking(move || { // CPU-интенсивная обработка изображения let processed_image = expensive_image_processing(image_data); processed_image }) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR) }
Правильно используйте каналы
rust// Выбор правильного типа канала для задачи // MPSC: множество отправителей, один получатель // Хорошо для обработки множества сообщений в одном месте let (tx, rx) = tokio::sync::mpsc::channel(100); // oneshot: один отправитель, один получатель, одно сообщение // Хорошо для запросов/ответов let (tx, rx) = tokio::sync::oneshot::channel(); // broadcast: множество отправителей, множество получателей // Хорошо для рассылки событий многим подписчикам let (tx, rx) = tokio::sync::broadcast::channel(100); // watch: один отправитель, множество получателей, с хранением последнего значения // Хорошо для распространения обновлений состояния let (tx, rx) = tokio::sync::watch::channel(initial_value);
Используйте
join!
иselect!
эффективноrust// join!: выполнение нескольких задач параллельно и ожидание всех результатов async fn parallel_tasks() -> (Result<A, Error>, Result<B, Error>) { tokio::join!(task_a(), task_b()) } // select!: выполнение нескольких задач и выбор первой завершенной async fn first_completed() -> Result<(), Error> { tokio::select! { result_a = task_a() => result_a, result_b = task_b() => result_b, _ = tokio::time::sleep(Duration::from_secs(5)) => Err(Error::Timeout), } }
Разделение долгих операций на мелкие части
rust// Разделение большой задачи на части для избежания блокировки потока async fn process_large_dataset(dataset: Vec<Data>) -> Result<Stats, Error> { let chunk_size = 1000; let mut results = Vec::new(); // Обработка по частям for chunk in dataset.chunks(chunk_size) { // Периодически уступаем контроль другим задачам tokio::task::yield_now().await; // Обработка части данных let result = process_chunk(chunk).await?; results.push(result); } // Объединение результатов Ok(combine_results(results)) }
Мониторинг и управление ресурсами
rust// Ограничение числа параллельных задач use tokio::sync::Semaphore; async fn limited_parallel_processing(tasks: Vec<Task>) -> Vec<Result<Output, Error>> { // Ограничение до 10 параллельных задач let semaphore = Arc::new(Semaphore::new(10)); let mut handles = Vec::new(); for task in tasks { let permit = semaphore.clone().acquire_owned().await.unwrap(); let handle = tokio::spawn(async move { // При выходе из этой области permit будет освобожден let _permit = permit; // Обработка задачи process_task(task).await }); handles.push(handle); } // Сбор результатов let mut results = Vec::new(); for handle in handles { results.push(handle.await.unwrap()); } results }
Избегайте нагрузки на сборщик мусора
rust// Повторное использование буферов вместо постоянного создания новых use bytes::BytesMut; async fn efficient_buffer_handling(stream: &mut TcpStream) -> Result<Vec<Message>, Error> { let mut buffer = BytesMut::with_capacity(8192); let mut messages = Vec::new(); loop { // Чтение данных в существующий буфер stream.read_buf(&mut buffer).await?; // Обработка сообщений из буфера while let Some(message) = extract_message(&mut buffer)? { messages.push(message); } if messages.len() >= 10 || buffer.is_empty() { break; } } Ok(messages) }
Tokio предоставляет мощный набор инструментов для создания высокопроизводительных асинхронных приложений. Правильное использование этих инструментов в контексте Axum позволяет создавать эффективные, масштабируемые веб-серверы, способные обрабатывать большое количество параллельных запросов.