Интеграция с Redis в Axum
Redis - это высокопроизводительное хранилище данных типа "ключ-значение", которое часто используется для кеширования, брокеров сообщений, сессий и других задач, требующих быстрого доступа к данным. В этом разделе рассмотрим, как интегрировать Redis с веб-приложениями на Axum.
Основные библиотеки
Для работы с Redis в Axum вам понадобятся следующие зависимости:
[dependencies]
axum = "0.7.2"
tokio = { version = "1.32.0", features = ["full"] }
redis = { version = "0.23.0", features = ["tokio-comp", "connection-manager"] }
bb8-redis = "0.13.0" # Опционально: для использования пула соединений
Простое подключение к Redis
Вот простой пример подключения к Redis и его использования в Axum:
use axum::{
extract::State,
routing::get,
Router, http::StatusCode, Json,
};
use redis::AsyncCommands;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
// Клиент Redis
struct AppState {
redis: redis::Client,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Создаем клиент Redis
let redis_client = redis::Client::open("redis://127.0.0.1:6379")?;
// Проверяем соединение
let mut con = redis_client.get_async_connection().await?;
redis::cmd("PING").query_async(&mut con).await?;
println!("Подключено к Redis!");
// Создаем состояние приложения
let state = Arc::new(AppState {
redis: redis_client,
});
// Создаем маршруты
let app = Router::new()
.route("/set-value", get(set_value))
.route("/get-value/:key", get(get_value))
.with_state(state);
// Запускаем сервер
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
println!("Сервер запущен по адресу: http://localhost:3000");
axum::serve(listener, app).await?;
Ok(())
}
async fn set_value(
State(state): State<Arc<AppState>>,
) -> Result<Json<String>, StatusCode> {
let mut con = state.redis
.get_async_connection()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Устанавливаем значение
con.set::<_, _, ()>("test_key", "test_value")
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json("Значение установлено".to_string()))
}
async fn get_value(
State(state): State<Arc<AppState>>,
axum::extract::Path(key): axum::extract::Path<String>,
) -> Result<Json<String>, StatusCode> {
let mut con = state.redis
.get_async_connection()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Получаем значение
let value: String = con.get(&key)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(value))
}
Использование пула соединений
Для продакшена рекомендуется использовать пул соединений, чтобы эффективно управлять подключениями к Redis:
use axum::{
extract::State,
routing::get,
Router, http::StatusCode, Json,
};
use bb8_redis::{
bb8::Pool,
RedisConnectionManager,
};
use redis::AsyncCommands;
use std::sync::Arc;
// Определение типа пула соединений
type RedisPool = Pool<RedisConnectionManager>;
struct AppState {
redis_pool: RedisPool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Создаем менеджер соединений
let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
// Создаем пул с настраиваемыми параметрами
let pool = Pool::builder()
.max_size(15) // Максимальное количество соединений
.build(manager)
.await?;
// Проверяем соединение
let mut conn = pool.get().await?;
redis::cmd("PING").query_async(&mut *conn).await?;
println!("Подключено к Redis!");
// Создаем состояние приложения
let state = Arc::new(AppState {
redis_pool: pool,
});
// Создаем маршруты
let app = Router::new()
.route("/cache", get(get_cached_data).post(set_cached_data))
.with_state(state);
// Запускаем сервер
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
println!("Сервер запущен по адресу: http://localhost:3000");
axum::serve(listener, app).await?;
Ok(())
}
async fn get_cached_data(
State(state): State<Arc<AppState>>,
axum::extract::Path(key): axum::extract::Path<String>,
) -> Result<Json<String>, StatusCode> {
let mut conn = state.redis_pool
.get()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Получаем кешированные данные
let data: Option<String> = conn
.get(&key)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
match data {
Some(value) => Ok(Json(value)),
None => Err(StatusCode::NOT_FOUND),
}
}
async fn set_cached_data(
State(state): State<Arc<AppState>>,
axum::extract::Path(key): axum::extract::Path<String>,
Json(value): Json<String>,
) -> Result<StatusCode, StatusCode> {
let mut conn = state.redis_pool
.get()
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
// Устанавливаем данные в кеш с TTL в 300 секунд
let _: () = redis::cmd("SET")
.arg(&[&key, &value, "EX", "300"])
.query_async(&mut *conn)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(StatusCode::OK)
}
Практические примеры использования Redis
Middleware для кеширования ответов API
use axum::{
extract::{Request, State},
middleware::{self, Next},
response::Response,
routing::get,
Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use std::{sync::Arc, time::Duration};
type RedisPool = Pool<RedisConnectionManager>;
struct AppState {
redis_pool: RedisPool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Инициализация пула Redis...
let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
let pool = Pool::builder().max_size(15).build(manager).await?;
let state = Arc::new(AppState { redis_pool: pool });
let app = Router::new()
.route("/api/data", get(get_data))
.layer(middleware::from_fn_with_state(
state.clone(),
cache_middleware,
))
.with_state(state);
// Запуск сервера...
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
// Middleware для кеширования
async fn cache_middleware(
State(state): State<Arc<AppState>>,
request: Request,
next: Next,
) -> Response {
// Создаем ключ на основе пути запроса
let cache_key = format!("cache:{}", request.uri().path());
// Пытаемся получить данные из кеша
let mut conn = match state.redis_pool.get().await {
Ok(conn) => conn,
Err(_) => return next.run(request).await,
};
if let Ok(Some(cached_data)) = conn.get::<_, Option<Vec<u8>>>(&cache_key).await {
// Если данные найдены в кеше, возвращаем их
let headers = [(axum::http::header::CONTENT_TYPE, "application/json")];
return Response::builder()
.status(200)
.header("X-Cache", "HIT")
.headers(headers)
.body(axum::body::Body::from(cached_data))
.unwrap();
}
// Если данных в кеше нет, выполняем запрос
let response = next.run(request).await;
// Кешируем ответ, если он успешный
if response.status().is_success() {
if let Ok(bytes) = hyper::body::to_bytes(response.into_body()).await {
let _ = conn
.set_ex::<_, _, ()>(&cache_key, bytes.to_vec(), 60)
.await; // Кешируем на 60 секунд
let headers = [(axum::http::header::CONTENT_TYPE, "application/json")];
return Response::builder()
.status(200)
.header("X-Cache", "MISS")
.headers(headers)
.body(axum::body::Body::from(bytes))
.unwrap();
}
}
response
}
// Обработчик, возвращающий данные
async fn get_data() -> axum::Json<serde_json::Value> {
// Имитация дорогостоящей операции
tokio::time::sleep(Duration::from_millis(500)).await;
axum::Json(serde_json::json!({
"data": "Это дорогостоящие данные, которые стоит кешировать",
"timestamp": chrono::Utc::now().to_rfc3339()
}))
}
Ограничение скорости запросов (Rate Limiting)
use axum::{
extract::{Request, State},
http::StatusCode,
middleware::{self, Next},
response::Response,
routing::get,
Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use std::sync::Arc;
type RedisPool = Pool<RedisConnectionManager>;
struct AppState {
redis_pool: RedisPool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Инициализация пула Redis...
let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
let pool = Pool::builder().max_size(15).build(manager).await?;
let state = Arc::new(AppState { redis_pool: pool });
let app = Router::new()
.route("/api/limited", get(limited_endpoint))
.layer(middleware::from_fn_with_state(
state.clone(),
rate_limit_middleware,
))
.with_state(state);
// Запуск сервера...
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
// Middleware для ограничения скорости запросов
async fn rate_limit_middleware(
State(state): State<Arc<AppState>>,
request: Request,
next: Next,
) -> Response {
// Получаем IP-адрес пользователя (в реальном приложении учитывайте прокси)
let ip = request
.headers()
.get("x-forwarded-for")
.and_then(|h| h.to_str().ok())
.unwrap_or("unknown")
.to_string();
let rate_key = format!("ratelimit:{}", ip);
let mut conn = match state.redis_pool.get().await {
Ok(conn) => conn,
Err(_) => return next.run(request).await,
};
// Используем Lua-скрипт для атомарного увеличения счетчика и проверки лимита
let script = redis::Script::new(r"
local key = KEYS[1]
local limit = tonumber(ARGV[1])
local window = tonumber(ARGV[2])
local current = redis.call('INCR', key)
if current == 1 then
redis.call('EXPIRE', key, window)
end
if current > limit then
return 0
end
return current
");
// Устанавливаем лимит: 10 запросов в течение 60 секунд
let result: Option<i64> = script
.key(&rate_key)
.arg(10) // лимит
.arg(60) // окно в секундах
.invoke_async(&mut *conn)
.await
.unwrap_or(None);
match result {
Some(0) => {
// Превышен лимит
let retry_after = conn
.ttl::<_, i64>(&rate_key)
.await
.unwrap_or(60);
Response::builder()
.status(StatusCode::TOO_MANY_REQUESTS)
.header("Retry-After", retry_after.to_string())
.body(axum::body::Body::from(
"Слишком много запросов. Попробуйте позже."
))
.unwrap()
}
_ => {
// Запрос в пределах лимита
next.run(request).await
}
}
}
// Защищенный конечный пункт API
async fn limited_endpoint() -> &'static str {
"Это защищенный конечный пункт с ограничением скорости запросов!"
}
Управление сессиями
use axum::{
extract::{Request, State},
middleware::{self, Next},
response::{IntoResponse, Response},
routing::{get, post},
Json, Router,
};
use bb8_redis::{bb8::Pool, RedisConnectionManager};
use redis::AsyncCommands;
use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration};
use uuid::Uuid;
type RedisPool = Pool<RedisConnectionManager>;
struct AppState {
redis_pool: RedisPool,
}
#[derive(Serialize, Deserialize)]
struct LoginRequest {
username: String,
password: String,
}
#[derive(Serialize, Deserialize)]
struct SessionData {
user_id: String,
username: String,
expires_at: i64,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Инициализация пула Redis...
let manager = RedisConnectionManager::new("redis://127.0.0.1:6379")?;
let pool = Pool::builder().max_size(15).build(manager).await?;
let state = Arc::new(AppState { redis_pool: pool });
let app = Router::new()
.route("/api/login", post(login))
.route(
"/api/protected",
get(protected_route)
.layer(middleware::from_fn_with_state(
state.clone(),
auth_middleware,
))
)
.route("/api/logout", post(logout))
.with_state(state);
// Запуск сервера...
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await?;
axum::serve(listener, app).await?;
Ok(())
}
// Обработчик для входа в систему
async fn login(
State(state): State<Arc<AppState>>,
Json(credentials): Json<LoginRequest>,
) -> impl IntoResponse {
// В реальном приложении здесь должна быть проверка учетных данных
// Для примера мы просто проверяем, что пароль не пустой
if credentials.password.is_empty() {
return (axum::http::StatusCode::UNAUTHORIZED, "Неверные учетные данные").into_response();
}
// Создаем данные сессии
let session_id = Uuid::new_v4().to_string();
let expires_at = chrono::Utc::now()
.checked_add_signed(chrono::Duration::hours(24))
.unwrap()
.timestamp();
let session_data = SessionData {
user_id: format!("user_{}", credentials.username),
username: credentials.username,
expires_at,
};
// Сохраняем сессию в Redis
let mut conn = match state.redis_pool.get().await {
Ok(conn) => conn,
Err(_) => return (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Ошибка сервера").into_response(),
};
let session_json = serde_json::to_string(&session_data).unwrap();
let _: () = conn
.set_ex::<_, _, ()>(
format!("session:{}", session_id),
session_json,
60 * 60 * 24, // 24 часа (в секундах)
)
.await
.unwrap_or(());
// Создаем куки с идентификатором сессии
let cookie = format!(
"session_id={}; Path=/; HttpOnly; Max-Age=86400; SameSite=Strict",
session_id
);
// Возвращаем успешный ответ с куки
Response::builder()
.status(axum::http::StatusCode::OK)
.header(axum::http::header::SET_COOKIE, cookie)
.body(axum::body::Body::from("Вход выполнен успешно"))
.unwrap()
}
// Middleware для проверки аутентификации
async fn auth_middleware(
State(state): State<Arc<AppState>>,
mut request: Request,
next: Next,
) -> Response {
// Извлекаем куки
let session_id = match request
.headers()
.get(axum::http::header::COOKIE)
.and_then(|c| c.to_str().ok())
.and_then(|c| {
c.split(';')
.find_map(|part| {
let part = part.trim();
if part.starts_with("session_id=") {
Some(part[11..].to_string())
} else {
None
}
})
}) {
Some(id) => id,
None => {
return Response::builder()
.status(axum::http::StatusCode::UNAUTHORIZED)
.body(axum::body::Body::from("Требуется аутентификация"))
.unwrap();
}
};
// Получаем данные сессии из Redis
let mut conn = match state.redis_pool.get().await {
Ok(conn) => conn,
Err(_) => {
return Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from("Ошибка сервера"))
.unwrap();
}
};
let session_data: Option<String> = conn
.get::<_, Option<String>>(format!("session:{}", session_id))
.await
.unwrap_or(None);
match session_data {
Some(data) => {
// Парсим данные сессии
let session: SessionData = match serde_json::from_str(&data) {
Ok(s) => s,
Err(_) => {
return Response::builder()
.status(axum::http::StatusCode::UNAUTHORIZED)
.body(axum::body::Body::from("Неверная сессия"))
.unwrap();
}
};
// Проверяем срок действия
let now = chrono::Utc::now().timestamp();
if session.expires_at < now {
// Сессия истекла
let _: () = conn
.del::<_, ()>(format!("session:{}", session_id))
.await
.unwrap_or(());
return Response::builder()
.status(axum::http::StatusCode::UNAUTHORIZED)
.body(axum::body::Body::from("Сессия истекла"))
.unwrap();
}
// Добавляем данные о пользователе к запросу в extensions
request.extensions_mut().insert(session);
// Продолжаем обработку запроса
next.run(request).await
}
None => Response::builder()
.status(axum::http::StatusCode::UNAUTHORIZED)
.body(axum::body::Body::from("Сессия не найдена"))
.unwrap(),
}
}
// Защищенный маршрут, требующий аутентификации
async fn protected_route(request: Request) -> impl IntoResponse {
// Получаем данные сессии из extensions
let session = request.extensions().get::<SessionData>().unwrap();
Json(serde_json::json!({
"message": format!("Привет, {}! Это защищенный маршрут.", session.username),
"user_id": session.user_id
}))
}
// Обработчик для выхода из системы
async fn logout(
State(state): State<Arc<AppState>>,
request: Request,
) -> impl IntoResponse {
// Извлекаем идентификатор сессии из куки
let session_id = match request
.headers()
.get(axum::http::header::COOKIE)
.and_then(|c| c.to_str().ok())
.and_then(|c| {
c.split(';')
.find_map(|part| {
let part = part.trim();
if part.starts_with("session_id=") {
Some(part[11..].to_string())
} else {
None
}
})
}) {
Some(id) => id,
None => {
return Response::builder()
.status(axum::http::StatusCode::BAD_REQUEST)
.body(axum::body::Body::from("Сессия не найдена"))
.unwrap();
}
};
// Удаляем сессию из Redis
let mut conn = match state.redis_pool.get().await {
Ok(conn) => conn,
Err(_) => {
return Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body(axum::body::Body::from("Ошибка сервера"))
.unwrap();
}
};
let _: () = conn
.del::<_, ()>(format!("session:{}", session_id))
.await
.unwrap_or(());
// Создаем куки с истекшим сроком действия для удаления
let cookie = "session_id=; Path=/; HttpOnly; Max-Age=0; SameSite=Strict";
// Возвращаем успешный ответ
Response::builder()
.status(axum::http::StatusCode::OK)
.header(axum::http::header::SET_COOKIE, cookie)
.body(axum::body::Body::from("Выход выполнен успешно"))
.unwrap()
}
Лучшие практики
Использование пулов соединений - Всегда используйте пул соединений для Redis, чтобы избежать накладных расходов на создание новых соединений при каждом запросе.
Обработка ошибок - Всегда корректно обрабатывайте ошибки Redis, чтобы сбои не влияли на работу основной функциональности приложения.
TTL для кешей - Всегда устанавливайте TTL (time-to-live) для кешированных данных, чтобы избежать проблем с устаревшими данными.
Транзакции - Используйте Redis-транзакции для атомарных операций, когда это необходимо.
Мониторинг - Добавьте мониторинг использования Redis (количество соединений, память, хиты/промахи кеша) для своевременного выявления проблем.
Защита от отказа Redis - Реализуйте механизмы отказоустойчивости, чтобы ваше приложение могло продолжать работу при временной недоступности Redis.
Заключение
Redis предоставляет мощный инструмент для повышения производительности и масштабируемости веб-приложений на Axum. Используя подходы, описанные в этой документации, вы можете эффективно интегрировать Redis в свои приложения для кеширования, управления сессиями, ограничения скорости запросов и других задач, требующих быстрого доступа к данным.