Состояние приложения
Управление состоянием в веб-приложениях — критически важная задача, особенно в контексте многопоточных и асинхронных платформ. Axum предоставляет мощные и типобезопасные механизмы для хранения и доступа к состоянию приложения. В этом разделе мы рассмотрим, как эффективно организовать и использовать состояние в Axum.
Основные концепции
В Axum "состояние приложения" относится к данным, которые:
- Создаются при запуске приложения
- Доступны для всех обработчиков
- Живут на протяжении всего времени работы приложения
- Могут быть безопасно разделены между несколькими параллельными запросами
Типичные примеры состояния:
- Соединения с базами данных
- Конфигурация приложения
- Общие кэши
- HTTP-клиенты
- Очереди сообщений
Использование состояния в Axum
Определение типа состояния
Для начала определим тип, представляющий состояние нашего приложения:
use std::sync::Arc;
use sqlx::PgPool;
use reqwest::Client;
// Определение структуры состояния
#[derive(Clone)]
struct AppState {
db: PgPool,
http_client: Client,
api_key: String,
cache: Arc<SomeCache>,
}
Добавление состояния к роутеру
Состояние добавляется к роутеру с помощью метода .with_state()
:
use axum::{Router, routing::get};
async fn main() {
// Создание состояния
let state = AppState {
db: create_db_pool().await,
http_client: Client::new(),
api_key: std::env::var("API_KEY").unwrap_or_default(),
cache: Arc::new(SomeCache::new()),
};
// Создание роутера с состоянием
let app = Router::new()
.route("/users", get(list_users))
.route("/users/:id", get(get_user))
.with_state(state);
}
Доступ к состоянию в обработчиках
Для доступа к состоянию в обработчиках используется экстрактор State
:
use axum::{extract::State, Json};
use serde::Serialize;
#[derive(Serialize)]
struct User {
id: i64,
name: String,
email: String,
}
async fn list_users(
State(state): State<AppState>,
) -> Json<Vec<User>> {
// Используем соединение с БД из состояния
let users = sqlx::query_as!(
User,
"SELECT id, name, email FROM users LIMIT 100"
)
.fetch_all(&state.db)
.await
.unwrap_or_default();
Json(users)
}
async fn get_user(
State(state): State<AppState>,
Path(user_id): Path<i64>,
) -> Result<Json<User>, StatusCode> {
let user = sqlx::query_as!(
User,
"SELECT id, name, email FROM users WHERE id = $1",
user_id
)
.fetch_optional(&state.db)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
user
.map(Json)
.ok_or(StatusCode::NOT_FOUND)
}
Частичное извлечение состояния
Если вам нужен доступ только к части состояния, можно экстрагировать отдельные поля с помощью деструктуризации:
async fn fetch_external_data(
// Извлекаем только http_client и api_key
State(AppState { http_client, api_key, .. }): State<AppState>,
) -> impl IntoResponse {
let response = http_client
.get("https://api.example.com/data")
.header("Authorization", format!("Bearer {}", api_key))
.send()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// ...
}
Шаблоны управления состоянием
Создание состояния
Часто бывает полезно вынести создание состояния в отдельную функцию:
async fn create_app_state() -> AppState {
// Загрузка конфигурации
let config = load_config().expect("Failed to load config");
// Создание пула соединений с БД
let db_pool = PgPoolOptions::new()
.max_connections(config.db.max_connections)
.connect(&config.db.url)
.await
.expect("Failed to connect to database");
// Создание HTTP-клиента
let http_client = Client::builder()
.timeout(Duration::from_secs(30))
.build()
.expect("Failed to create HTTP client");
// Создание кэша
let cache = Arc::new(SomeCache::new());
AppState {
db: db_pool,
http_client,
api_key: config.api_key,
cache,
}
}
async fn main() {
let app = Router::new()
.route("/", get(handler))
.with_state(create_app_state().await);
}
Разделение состояния для разных маршрутов
Иногда разным группам маршрутов требуются разные части состояния:
// Состояние для публичных маршрутов
#[derive(Clone)]
struct PublicState {
db: PgPool,
config: Arc<Config>,
}
// Состояние для административных маршрутов
#[derive(Clone)]
struct AdminState {
db: PgPool,
config: Arc<Config>,
admin_tools: AdminTools,
}
let config = Arc::new(load_config());
let db_pool = create_db_pool().await;
// Маршруты для публичного API
let public_api = Router::new()
.route("/users", get(list_users))
.with_state(PublicState {
db: db_pool.clone(),
config: config.clone(),
});
// Маршруты для администраторов
let admin_api = Router::new()
.route("/stats", get(get_stats))
.route("/logs", get(get_logs))
.with_state(AdminState {
db: db_pool.clone(),
config: config.clone(),
admin_tools: AdminTools::new(),
});
// Объединение всех маршрутов
let app = Router::new()
.nest("/api/v1", public_api)
.nest("/admin", admin_api);
Конкурентный доступ и синхронизация
Thread Safety
Axum запускает обработчики конкурентно, поэтому состояние должно быть потокобезопасным. Обычно это достигается с помощью Arc
(Atomic Reference Counting) в сочетании с синхронизационными примитивами, где необходимо:
use std::sync::{Arc, Mutex, RwLock};
// Состояние только для чтения
#[derive(Clone)]
struct ReadOnlyState {
config: Arc<Config>, // Immutable, не требует дополнительной синхронизации
}
// Состояние с изменяемыми данными
#[derive(Clone)]
struct MutableState {
cache: Arc<Mutex<HashMap<String, CachedValue>>>, // Требуется синхронизация для изменения
stats: Arc<RwLock<Stats>>, // Поддержка множества читателей или одного писателя
}
Выбор правильного синхронизационного примитива
- Arc — для неизменяемых данных или данных с внутренней синхронизацией
- Mutex — когда требуется эксклюзивный доступ к данным
- RwLock — когда чтение происходит чаще, чем запись
- Atomic типы — для простых числовых счетчиков и флагов
- Channels — для обмена данными между асинхронными задачами
Пример со статистикой запросов:
use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
#[derive(Clone, Default)]
struct AppStats {
total_requests: Arc<AtomicUsize>,
successful_requests: Arc<AtomicUsize>,
failed_requests: Arc<AtomicUsize>,
}
impl AppStats {
fn increment_total(&self) {
self.total_requests.fetch_add(1, Ordering::SeqCst);
}
fn increment_successful(&self) {
self.successful_requests.fetch_add(1, Ordering::SeqCst);
}
fn increment_failed(&self) {
self.failed_requests.fetch_add(1, Ordering::SeqCst);
}
fn get_stats(&self) -> (usize, usize, usize) {
(
self.total_requests.load(Ordering::SeqCst),
self.successful_requests.load(Ordering::SeqCst),
self.failed_requests.load(Ordering::SeqCst),
)
}
}
// Middleware для подсчета статистики
async fn stats_middleware<B>(
State(stats): State<AppStats>,
request: Request<B>,
next: Next<B>,
) -> Response {
stats.increment_total();
let response = next.run(request).await;
if response.status().is_success() {
stats.increment_successful();
} else {
stats.increment_failed();
}
response
}
Асинхронное состояние
Tokio для асинхронной синхронизации
Для сценариев, требующих асинхронной синхронизации, можно использовать примитивы из Tokio:
use tokio::sync::{Mutex, RwLock, Semaphore};
#[derive(Clone)]
struct AsyncState {
// Мьютекс, который не блокирует поток
cache: Arc<Mutex<HashMap<String, String>>>,
// Блокировка чтения-записи, которая работает асинхронно
config: Arc<RwLock<Config>>,
// Семафор для ограничения одновременных операций
rate_limiter: Arc<Semaphore>,
}
async fn update_cache(
State(state): State<AsyncState>,
Json(payload): Json<CacheUpdate>,
) -> impl IntoResponse {
// Асинхронное получение блокировки
let mut cache = state.cache.lock().await;
// Обновление кэша
cache.insert(payload.key, payload.value);
StatusCode::OK
}
async fn limited_operation(
State(state): State<AsyncState>,
) -> impl IntoResponse {
// Получаем разрешение семафора (до 10 одновременных операций)
let _permit = state.rate_limiter.acquire().await.unwrap();
// Выполняем ограниченную операцию
// ...
// Разрешение автоматически освобождается при выходе из области видимости
"Operation completed"
}
Обработка инициализации с ошибками
Иногда инициализация состояния может завершиться ошибкой. Можно использовать Result
или Option
с Arc
для безопасной обработки:
#[derive(Clone)]
struct AppState {
db: Arc<Option<PgPool>>,
}
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let db_result = PgPool::connect(&std::env::var("DATABASE_URL")?).await;
let state = AppState {
db: Arc::new(db_result.ok()),
};
let app = Router::new()
.route("/", get(handler))
.with_state(state);
// ...
Ok(())
}
async fn handler(
State(state): State<AppState>,
) -> impl IntoResponse {
match state.db.as_ref() {
Some(db) => {
// Используем соединение с БД
// ...
"Connected to database"
}
None => {
// Обработка случая отсутствия соединения
(StatusCode::SERVICE_UNAVAILABLE, "Database not available")
}
}
}
Лучшие практики
Модульное состояние
Для больших приложений разделение состояния на модульные компоненты может улучшить управляемость:
#[derive(Clone)]
struct AppState {
db: DatabaseState,
auth: AuthState,
cache: CacheState,
metrics: MetricsState,
}
#[derive(Clone)]
struct DatabaseState {
pool: PgPool,
migrations: Arc<MigrationManager>,
}
#[derive(Clone)]
struct AuthState {
jwt_encoder: Arc<JwtEncoder>,
jwt_decoder: Arc<JwtDecoder>,
auth_cache: Arc<Mutex<HashMap<String, AuthInfo>>>,
}
// ... и так далее
Типобезопасная конфигурация
Используйте строгую типизацию для конфигурации, которая является частью состояния:
use serde::Deserialize;
use std::path::PathBuf;
use std::time::Duration;
#[derive(Clone, Deserialize)]
struct Config {
server: ServerConfig,
database: DatabaseConfig,
auth: AuthConfig,
cache: CacheConfig,
}
#[derive(Clone, Deserialize)]
struct ServerConfig {
host: String,
port: u16,
timeout: Duration,
log_level: String,
}
#[derive(Clone, Deserialize)]
struct DatabaseConfig {
url: String,
max_connections: u32,
timeout: Duration,
}
// ... и так далее
Тестирование с использованием состояния
Состояние делает тестирование проще, позволяя подменять реальные зависимости на макеты:
#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::{Request, StatusCode};
use tower::ServiceExt;
#[tokio::test]
async fn test_list_users() {
// Создаем тестовое состояние с мок базой данных
let state = AppState {
db: create_test_db().await,
// ... остальные поля
};
// Создаем тестовый роутер
let app = Router::new()
.route("/users", get(list_users))
.with_state(state);
// Создаем тестовый запрос
let request = Request::builder()
.uri("/users")
.body(Body::empty())
.unwrap();
// Выполняем запрос
let response = app
.oneshot(request)
.await
.unwrap();
// Проверяем результат
assert_eq!(response.status(), StatusCode::OK);
}
}
Заключение
Управление состоянием — важный аспект разработки веб-приложений на Axum. Используя типобезопасный подход и правильные методы синхронизации, вы можете создать надежное и производительное приложение, которое эффективно обрабатывает параллельные запросы.
Ключевые советы:
- Используйте
#[derive(Clone)]
для типов состояния - Применяйте соответствующие примитивы синхронизации для изменяемых данных
- Разделяйте состояние на логические компоненты
- Эффективно используйте асинхронную синхронизацию, где это уместно
- Создавайте тестируемые абстракции, которые можно легко подменить при тестировании
Следуя этим рекомендациям, вы создадите масштабируемое приложение с чистой архитектурой и предсказуемым поведением.