Skip to content

Интеграция с Redis в Axum

Redis - это высокопроизводительное хранилище данных типа "ключ-значение", которое часто используется для кеширования, брокеров сообщений, сессий и других задач, требующих быстрого доступа к данным. В этом разделе рассмотрим, как интегрировать Redis с веб-приложениями на Axum.

Основные библиотеки

Для работы с Redis в Axum вам понадобятся следующие зависимости:

toml
[dependencies]
axum = "0.7.2"
tokio = { version = "1.32.0", features = ["full"] }
redis = { version = "0.23.0", features = ["tokio-comp", "connection-manager"] }
bb8-redis = "0.13.0"  # Опционально: для использования пула соединений

Простое подключение к Redis

Вот простой пример подключения к Redis и его использования в Axum:

rust
use axum::{
    extract::State,
    routing::get,
    Router, http::StatusCode, Json,
};
use redis::AsyncCommands;
use std::sync::Arc;
use serde::{Deserialize, Serialize};

// Клиент Redis
struct AppState {
    redis: redis::Client,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Создаем клиент Redis
    let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
    
    // Проверяем соединение
    let mut con = redis_client.get_async_connection().await?;
    redis::cmd("PING").query_async(&mut con).await?;
    println!("Подключено к Redis!");
    
    // Создаем состояние приложения
    let state = Arc::new(AppState {
        redis: redis_client,
    });
    
    // Создаем маршруты
    let app = Router::new()
        .route("/set-value", get(set_value))
        .route("/get-value/:key", get(get_value))
        .with_state(state);
    
    // Запускаем сервер
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    println!("Сервер запущен по адресу: http://localhost:3000");
    axum::serve(listener, app).await?;
    
    Ok(())
}

async fn set_value(
    State(state): State<Arc<AppState>>,
) -> Result<Json<String>, StatusCode> {
    let mut con = state.redis
        .get_async_connection()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    // Устанавливаем значение
    con.set::<_, _, ()>("test_key", "test_value")
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    Ok(Json("Значение установлено".to_string()))
}

async fn get_value(
    State(state): State<Arc<AppState>>,
    axum::extract::Path(key): axum::extract::Path<String>,
) -> Result<Json<String>, StatusCode> {
    let mut con = state.redis
        .get_async_connection()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    // Получаем значение
    let value: String = con.get(&key)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    Ok(Json(value))
}

Использование пула соединений

Для продакшена рекомендуется использовать пул соединений, чтобы эффективно управлять подключениями к Redis:

rust
use axum::{
    extract::State,
    routing::get,
    Router, http::StatusCode, Json,
};
use bb8_redis::{
    bb8::Pool,
    RedisConnectionManager,
};
use redis::AsyncCommands;
use std::sync::Arc;

// Определение типа пула соединений
type RedisPool = Pool<RedisConnectionManager>;

struct AppState {
    redis_pool: RedisPool,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Создаем менеджер соединений
    let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    
    // Создаем пул с настраиваемыми параметрами
    let pool = Pool::builder()
        .max_size(15)  // Максимальное количество соединений
        .build(manager)
        .await?;
    
    // Проверяем соединение
    let mut conn = pool.get().await?;
    redis::cmd("PING").query_async(&mut *conn).await?;
    println!("Подключено к Redis!");
    
    // Создаем состояние приложения
    let state = Arc::new(AppState {
        redis_pool: pool,
    });
    
    // Создаем маршруты
    let app = Router::new()
        .route("/cache", get(get_cached_data).post(set_cached_data))
        .with_state(state);
    
    // Запускаем сервер
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    println!("Сервер запущен по адресу: http://localhost:3000");
    axum::serve(listener, app).await?;
    
    Ok(())
}

async fn get_cached_data(
    State(state): State<Arc<AppState>>,
    axum::extract::Path(key): axum::extract::Path<String>,
) -> Result<Json<String>, StatusCode> {
    let mut conn = state.redis_pool
        .get()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    // Получаем кешированные данные
    let data: Option<String> = conn
        .get(&key)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    match data {
        Some(value) => Ok(Json(value)),
        None => Err(StatusCode::NOT_FOUND),
    }
}

async fn set_cached_data(
    State(state): State<Arc<AppState>>,
    axum::extract::Path(key): axum::extract::Path<String>,
    Json(value): Json<String>,
) -> Result<StatusCode, StatusCode> {
    let mut conn = state.redis_pool
        .get()
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    // Устанавливаем данные в кеш с TTL в 300 секунд
    let _: () = redis::cmd("SET")
        .arg(&[&key, &value, "EX", "300"])
        .query_async(&mut *conn)
        .await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    Ok(StatusCode::OK)
}

Практические примеры использования Redis

Middleware для кеширования ответов API

rust
use axum::{
    extract::{Request, State},
    middleware::{self, Next},
    response::Response,
    routing::get,
    Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use std::{sync::Arc, time::Duration};

type RedisPool = Pool<RedisConnectionManager>;

struct AppState {
    redis_pool: RedisPool,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Инициализация пула Redis...
    let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    let pool = Pool::builder().max_size(15).build(manager).await?;
    
    let state = Arc::new(AppState { redis_pool: pool });
    
    let app = Router::new()
        .route("/api/data", get(get_data))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            cache_middleware,
        ))
        .with_state(state);
    
    // Запуск сервера...
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await?;
    
    Ok(())
}

// Middleware для кеширования
async fn cache_middleware(
    State(state): State<Arc<AppState>>,
    request: Request,
    next: Next,
) -> Response {
    // Создаем ключ на основе пути запроса
    let cache_key = format!("cache:{}", request.uri().path());
    
    // Пытаемся получить данные из кеша
    let mut conn = match state.redis_pool.get().await {
        Ok(conn) => conn,
        Err(_) => return next.run(request).await,
    };
    
    if let Ok(Some(cached_data)) = conn.get::<_, Option<Vec<u8>>>(&cache_key).await {
        // Если данные найдены в кеше, возвращаем их
        let headers = [(axum::http::header::CONTENT_TYPE, "application/json")];
        return Response::builder()
            .status(200)
            .header("X-Cache", "HIT")
            .headers(headers)
            .body(axum::body::Body::from(cached_data))
            .unwrap();
    }
    
    // Если данных в кеше нет, выполняем запрос
    let response = next.run(request).await;
    
    // Кешируем ответ, если он успешный
    if response.status().is_success() {
        if let Ok(bytes) = hyper::body::to_bytes(response.into_body()).await {
            let _ = conn
                .set_ex::<_, _, ()>(&cache_key, bytes.to_vec(), 60)
                .await; // Кешируем на 60 секунд
            
            let headers = [(axum::http::header::CONTENT_TYPE, "application/json")];
            return Response::builder()
                .status(200)
                .header("X-Cache", "MISS")
                .headers(headers)
                .body(axum::body::Body::from(bytes))
                .unwrap();
        }
    }
    
    response
}

// Обработчик, возвращающий данные
async fn get_data() -> axum::Json<serde_json::Value> {
    // Имитация дорогостоящей операции
    tokio::time::sleep(Duration::from_millis(500)).await;
    
    axum::Json(serde_json::json!({
        "data": "Это дорогостоящие данные, которые стоит кешировать",
        "timestamp": chrono::Utc::now().to_rfc3339()
    }))
}

Ограничение скорости запросов (Rate Limiting)

rust
use axum::{
    extract::{Request, State},
    http::StatusCode,
    middleware::{self, Next},
    response::Response,
    routing::get,
    Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use std::sync::Arc;

type RedisPool = Pool<RedisConnectionManager>;

struct AppState {
    redis_pool: RedisPool,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Инициализация пула Redis...
    let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    let pool = Pool::builder().max_size(15).build(manager).await?;
    
    let state = Arc::new(AppState { redis_pool: pool });
    
    let app = Router::new()
        .route("/api/limited", get(limited_endpoint))
        .layer(middleware::from_fn_with_state(
            state.clone(),
            rate_limit_middleware,
        ))
        .with_state(state);
    
    // Запуск сервера...
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await?;
    
    Ok(())
}

// Middleware для ограничения скорости запросов
async fn rate_limit_middleware(
    State(state): State<Arc<AppState>>,
    request: Request,
    next: Next,
) -> Response {
    // Получаем IP-адрес пользователя (в реальном приложении учитывайте прокси)
    let ip = request
        .headers()
        .get("x-forwarded-for")
        .and_then(|h| h.to_str().ok())
        .unwrap_or("unknown")
        .to_string();
    
    let rate_key = format!("ratelimit:{}", ip);
    
    let mut conn = match state.redis_pool.get().await {
        Ok(conn) => conn,
        Err(_) => return next.run(request).await,
    };
    
    // Используем Lua-скрипт для атомарного увеличения счетчика и проверки лимита
    let script = redis::Script::new(r"
        local key = KEYS[1]
        local limit = tonumber(ARGV[1])
        local window = tonumber(ARGV[2])
        
        local current = redis.call('INCR', key)
        if current == 1 then
            redis.call('EXPIRE', key, window)
        end
        
        if current > limit then
            return 0
        end
        
        return current
    ");
    
    // Устанавливаем лимит: 10 запросов в течение 60 секунд
    let result: Option<i64> = script
        .key(&rate_key)
        .arg(10)  // лимит
        .arg(60)  // окно в секундах
        .invoke_async(&mut *conn)
        .await
        .unwrap_or(None);
    
    match result {
        Some(0) => {
            // Превышен лимит
            let retry_after = conn
                .ttl::<_, i64>(&rate_key)
                .await
                .unwrap_or(60);
            
            Response::builder()
                .status(StatusCode::TOO_MANY_REQUESTS)
                .header("Retry-After", retry_after.to_string())
                .body(axum::body::Body::from(
                    "Слишком много запросов. Попробуйте позже."
                ))
                .unwrap()
        }
        _ => {
            // Запрос в пределах лимита
            next.run(request).await
        }
    }
}

// Защищенный конечный пункт API
async fn limited_endpoint() -> &'static str {
    "Это защищенный конечный пункт с ограничением скорости запросов!"
}

Управление сессиями

rust
use axum::{
    extract::{Request, State},
    middleware::{self, Next},
    response::{IntoResponse, Response},
    routing::{get, post},
    Json, Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use uuid::Uuid;

type RedisPool = Pool<RedisConnectionManager>;

struct AppState {
    redis_pool: RedisPool,
}

#[derive(Serialize, Deserialize)]
struct LoginRequest {
    username: String,
    password: String,
}

#[derive(Serialize, Deserialize)]
struct SessionData {
    user_id: String,
    username: String,
    expires_at: i64,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Инициализация пула Redis...
    let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
    let pool = Pool::builder().max_size(15).build(manager).await?;
    
    let state = Arc::new(AppState { redis_pool: pool });
    
    let app = Router::new()
        .route("/api/login", post(login))
        .route(
            "/api/protected", 
            get(protected_route)
                .layer(middleware::from_fn_with_state(
                    state.clone(),
                    auth_middleware,
                ))
        )
        .route("/api/logout", post(logout))
        .with_state(state);
    
    // Запуск сервера...
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
    axum::serve(listener, app).await?;
    
    Ok(())
}

// Обработчик для входа в систему
async fn login(
    State(state): State<Arc<AppState>>,
    Json(credentials): Json<LoginRequest>,
) -> impl IntoResponse {
    // В реальном приложении здесь должна быть проверка учетных данных
    // Для примера мы просто проверяем, что пароль не пустой
    if credentials.password.is_empty() {
        return (axum::http::StatusCode::UNAUTHORIZED, "Неверные учетные данные").into_response();
    }
    
    // Создаем данные сессии
    let session_id = Uuid::new_v4().to_string();
    let expires_at = chrono::Utc::now()
        .checked_add_signed(chrono::Duration::hours(24))
        .unwrap()
        .timestamp();
    
    let session_data = SessionData {
        user_id: format!("user_{}", credentials.username),
        username: credentials.username,
        expires_at,
    };
    
    // Сохраняем сессию в Redis
    let mut conn = match state.redis_pool.get().await {
        Ok(conn) => conn,
        Err(_) => return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Ошибка сервера").into_response(),
    };
    
    let session_json = serde_json::to_string(&session_data).unwrap();
    let _: () = conn
        .set_ex::<_, _, ()>(
            format!("session:{}", session_id),
            session_json,
            60 * 60 * 24, // 24 часа (в секундах)
        )
        .await
        .unwrap_or(());
    
    // Создаем куки с идентификатором сессии
    let cookie = format!(
        "session_id={}; Path=/; HttpOnly; Max-Age=86400; SameSite=Strict",
        session_id
    );
    
    // Возвращаем успешный ответ с куки
    Response::builder()
        .status(axum::http::StatusCode::OK)
        .header(axum::http::header::SET_COOKIE, cookie)
        .body(axum::body::Body::from("Вход выполнен успешно"))
        .unwrap()
}

// Middleware для проверки аутентификации
async fn auth_middleware(
    State(state): State<Arc<AppState>>,
    mut request: Request,
    next: Next,
) -> Response {
    // Извлекаем куки
    let session_id = match request
        .headers()
        .get(axum::http::header::COOKIE)
        .and_then(|c| c.to_str().ok())
        .and_then(|c| {
            c.split(';')
                .find_map(|part| {
                    let part = part.trim();
                    if part.starts_with("session_id=") {
                        Some(part[11..].to_string())
                    } else {
                        None
                    }
                })
        }) {
        Some(id) => id,
        None => {
            return Response::builder()
                .status(axum::http::StatusCode::UNAUTHORIZED)
                .body(axum::body::Body::from("Требуется аутентификация"))
                .unwrap();
        }
    };
    
    // Получаем данные сессии из Redis
    let mut conn = match state.redis_pool.get().await {
        Ok(conn) => conn,
        Err(_) => {
            return Response::builder()
                .status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
                .body(axum::body::Body::from("Ошибка сервера"))
                .unwrap();
        }
    };
    
    let session_data: Option<String> = conn
        .get::<_, Option<String>>(format!("session:{}", session_id))
        .await
        .unwrap_or(None);
    
    match session_data {
        Some(data) => {
            // Парсим данные сессии
            let session: SessionData = match serde_json::from_str(&data) {
                Ok(s) => s,
                Err(_) => {
                    return Response::builder()
                        .status(axum::http::StatusCode::UNAUTHORIZED)
                        .body(axum::body::Body::from("Неверная сессия"))
                        .unwrap();
                }
            };
            
            // Проверяем срок действия
            let now = chrono::Utc::now().timestamp();
            if session.expires_at < now {
                // Сессия истекла
                let _: () = conn
                    .del::<_, ()>(format!("session:{}", session_id))
                    .await
                    .unwrap_or(());
                
                return Response::builder()
                    .status(axum::http::StatusCode::UNAUTHORIZED)
                    .body(axum::body::Body::from("Сессия истекла"))
                    .unwrap();
            }
            
            // Добавляем данные о пользователе к запросу в extensions
            request.extensions_mut().insert(session);
            
            // Продолжаем обработку запроса
            next.run(request).await
        }
        None => Response::builder()
            .status(axum::http::StatusCode::UNAUTHORIZED)
            .body(axum::body::Body::from("Сессия не найдена"))
            .unwrap(),
    }
}

// Защищенный маршрут, требующий аутентификации
async fn protected_route(request: Request) -> impl IntoResponse {
    // Получаем данные сессии из extensions
    let session = request.extensions().get::<SessionData>().unwrap();
    
    Json(serde_json::json!({
        "message": format!("Привет, {}! Это защищенный маршрут.", session.username),
        "user_id": session.user_id
    }))
}

// Обработчик для выхода из системы
async fn logout(
    State(state): State<Arc<AppState>>,
    request: Request,
) -> impl IntoResponse {
    // Извлекаем идентификатор сессии из куки
    let session_id = match request
        .headers()
        .get(axum::http::header::COOKIE)
        .and_then(|c| c.to_str().ok())
        .and_then(|c| {
            c.split(';')
                .find_map(|part| {
                    let part = part.trim();
                    if part.starts_with("session_id=") {
                        Some(part[11..].to_string())
                    } else {
                        None
                    }
                })
        }) {
        Some(id) => id,
        None => {
            return Response::builder()
                .status(axum::http::StatusCode::BAD_REQUEST)
                .body(axum::body::Body::from("Сессия не найдена"))
                .unwrap();
        }
    };
    
    // Удаляем сессию из Redis
    let mut conn = match state.redis_pool.get().await {
        Ok(conn) => conn,
        Err(_) => {
            return Response::builder()
                .status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
                .body(axum::body::Body::from("Ошибка сервера"))
                .unwrap();
        }
    };
    
    let _: () = conn
        .del::<_, ()>(format!("session:{}", session_id))
        .await
        .unwrap_or(());
    
    // Создаем куки с истекшим сроком действия для удаления
    let cookie = "session_id=; Path=/; HttpOnly; Max-Age=0; SameSite=Strict";
    
    // Возвращаем успешный ответ
    Response::builder()
        .status(axum::http::StatusCode::OK)
        .header(axum::http::header::SET_COOKIE, cookie)
        .body(axum::body::Body::from("Выход выполнен успешно"))
        .unwrap()
}

Лучшие практики

  1. Использование пулов соединений - Всегда используйте пул соединений для Redis, чтобы избежать накладных расходов на создание новых соединений при каждом запросе.

  2. Обработка ошибок - Всегда корректно обрабатывайте ошибки Redis, чтобы сбои не влияли на работу основной функциональности приложения.

  3. TTL для кешей - Всегда устанавливайте TTL (time-to-live) для кешированных данных, чтобы избежать проблем с устаревшими данными.

  4. Транзакции - Используйте Redis-транзакции для атомарных операций, когда это необходимо.

  5. Мониторинг - Добавьте мониторинг использования Redis (количество соединений, память, хиты/промахи кеша) для своевременного выявления проблем.

  6. Защита от отказа Redis - Реализуйте механизмы отказоустойчивости, чтобы ваше приложение могло продолжать работу при временной недоступности Redis.

Заключение

Redis предоставляет мощный инструмент для повышения производительности и масштабируемости веб-приложений на Axum. Используя подходы, описанные в этой документации, вы можете эффективно интегрировать Redis в свои приложения для кеширования, управления сессиями, ограничения скорости запросов и других задач, требующих быстрого доступа к данным.