Home
avatar

.Wang

使用elysia出现的redis连接失败问题

近期我在开发我的服务端项目时,使用了 elysia 框架,但是在连接 redis 时出现了连接失败的问题。

发现官方内置的redis插件连接不够稳定,时不时的出现了连接失败的问题。 于是在原基础上我换了一个ioredis插件,俩者的对比如下图:

ioredis插件对比

完整代码

ioredis 插件

import Redis from "ioredis";
// 自己的redis配置
import { redisConfig } from "@/config/redis";
// 日志工具
import { logger } from "@/utils/logger";

// 改进的Redis配置
const improvedRedisConfig = {
	host: redisConfig.host,
	port: redisConfig.port,
	password: redisConfig.password,
	username: redisConfig.username,
	db: redisConfig.db,

	// 连接配置
	retryDelayOnFailover: 100,
	maxRetriesPerRequest: 3,
	lazyConnect: true,

	// 重连配置
	retryDelayOnClusterDown: 300,

	// 超时配置
	connectTimeout: 10000,
	commandTimeout: 5000,

	// 连接池配置
	enableReadyCheck: true,
	maxLoadingTimeout: 10000,

	// 重连策略
	retryStrategy: (times: number) => {
		const delay = Math.min(times * 50, 2000);
		return delay;
	},

	// 自动重连
	reconnectOnError: (err: Error) => {
		const targetError = "READONLY";
		if (err.message.includes(targetError)) {
			return true;
		}
		return false;
	},
};

class RedisManager {
	private client: Redis;
	private isConnected: boolean = false;
	private reconnectAttempts: number = 0;
	private maxReconnectAttempts: number = 10;
	private healthCheckTimer: NodeJS.Timeout | null = null;

	constructor() {
		this.client = new Redis(improvedRedisConfig);
		this.setupEventHandlers();
		this.startHealthCheck();
	}

	private setupEventHandlers() {
		this.client.on("connect", () => {
			logger.info("Redis 连接成功");
			this.isConnected = true;
			this.reconnectAttempts = 0;
		});

		this.client.on("ready", () => {
			logger.info("Redis 准备就绪");
			this.isConnected = true;
			this.reconnectAttempts = 0;
		});

		this.client.on("error", error => {
			logger.error("Redis 连接错误:" + error.message);
			this.isConnected = false;
		});

		this.client.on("close", () => {
			logger.warn("Redis 连接关闭");
			this.isConnected = false;
		});

		this.client.on("reconnecting", () => {
			logger.info("Redis 正在重连...");
			this.reconnectAttempts++;
		});

		this.client.on("end", () => {
			logger.warn("Redis 连接结束");
			this.isConnected = false;
		});

		this.client.on("reconnect", () => {
			logger.info("Redis 重连成功");
			this.isConnected = true;
		});
	}

	private startHealthCheck() {
		// 每30秒检查一次连接状态
		this.healthCheckTimer = setInterval(async () => {
			if (this.isConnected) {
				try {
					await this.client.ping();
					logger.info("Redis健康检查通过");
				} catch {
					logger.warn("Redis健康检查失败,标记为断开连接");
					this.isConnected = false;
				}
			}
		}, 30000);
	}

	// 健康检查
	async ping(): Promise<string> {
		return await this.client.ping();
	}

	// 获取连接状态
	getConnectionStatus(): boolean {
		return this.isConnected;
	}

	// 获取原始客户端
	getClient(): Redis {
		return this.client;
	}

	// 关闭连接
	async close(): Promise<void> {
		if (this.healthCheckTimer) {
			clearInterval(this.healthCheckTimer);
		}
		await this.client.quit();
	}

	// 强制重连
	async forceReconnect(): Promise<void> {
		logger.info("强制重连Redis");
		this.reconnectAttempts = 0;
		await this.client.disconnect();
		await this.client.connect();
	}
}

// 创建Redis管理器实例
export const redisManager = new RedisManager();

// 导出便捷方法
export const client = redisManager.getClient();

// 包装Redis操作,添加重试机制
export async function withRetry<T>(
	operation: () => Promise<T>,
	maxRetries: number = 3,
	delay: number = 1000
): Promise<T> {
	for (let i = 0; i < maxRetries; i++) {
		try {
			return await operation();
		} catch (error: any) {
			if (i === maxRetries - 1) {
				throw error;
			}

			logger.warn(`Redis操作失败,第${i + 1}次重试:` + error.message);
			await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, i)));
		}
	}
	throw new Error("Redis操作重试失败");
}

// 导出便捷的Redis操作方法
export const redisClient = {
	// 基本操作
	get: (key: string) => withRetry(() => client.get(key)),
	set: (key: string, value: string) => withRetry(() => client.set(key, value)),
	setex: (key: string, seconds: number, value: string) =>
		withRetry(() => client.setex(key, seconds, value)),
	del: (key: string) => withRetry(() => client.del(key)),
	exists: (key: string) => withRetry(() => client.exists(key)),
	expire: (key: string, seconds: number) =>
		withRetry(() => client.expire(key, seconds)),
	ttl: (key: string) => withRetry(() => client.ttl(key)),

	// 批量操作
	mset: (keyValues: Record<string, string>) =>
		withRetry(() => client.mset(keyValues)),
	mget: (keys: string[]) => withRetry(() => client.mget(keys)),

	// 列表操作
	lpush: (key: string, ...values: string[]) =>
		withRetry(() => client.lpush(key, ...values)),
	rpush: (key: string, ...values: string[]) =>
		withRetry(() => client.rpush(key, ...values)),
	lpop: (key: string) => withRetry(() => client.lpop(key)),
	rpop: (key: string) => withRetry(() => client.rpop(key)),
	lrange: (key: string, start: number, stop: number) =>
		withRetry(() => client.lrange(key, start, stop)),

	// 哈希操作
	hset: (key: string, field: string, value: string) =>
		withRetry(() => client.hset(key, field, value)),
	hget: (key: string, field: string) =>
		withRetry(() => client.hget(key, field)),
	hgetall: (key: string) => withRetry(() => client.hgetall(key)),
	hdel: (key: string, ...fields: string[]) =>
		withRetry(() => client.hdel(key, ...fields)),

	// 集合操作
	sadd: (key: string, ...members: string[]) =>
		withRetry(() => client.sadd(key, ...members)),
	srem: (key: string, ...members: string[]) =>
		withRetry(() => client.srem(key, ...members)),
	smembers: (key: string) => withRetry(() => client.smembers(key)),
	sismember: (key: string, member: string) =>
		withRetry(() => client.sismember(key, member)),

	// 有序集合操作
	zadd: (key: string, score: number, member: string) =>
		withRetry(() => client.zadd(key, score, member)),
	zscore: (key: string, member: string) =>
		withRetry(() => client.zscore(key, member)),
	zrange: (key: string, start: number, stop: number, withScores?: boolean) =>
		withRetry(() =>
			withScores
				? client.zrange(key, start, stop, "WITHSCORES")
				: client.zrange(key, start, stop)
		),

	// 工具方法
	ping: () => withRetry(() => client.ping()),
	flushdb: () => withRetry(() => client.flushdb()),
	info: () => withRetry(() => client.info()),

	// 连接状态
	getConnectionStatus: () => redisManager.getConnectionStatus(),
	forceReconnect: () => redisManager.forceReconnect(),
	close: () => redisManager.close(),
};

bun 内置的插件

我在原基础上进行了封装,添加了重试机制,连接状态管理,健康检查等功能。

// redis 配置
import { redisConfig } from "@/config/redis";
// 日志工具
import { logger } from "@/utils/logger";

// Redis连接状态管理
class RedisManager {
	private client: any = null;
	private isConnected: boolean = false;
	private reconnectAttempts: number = 0;
	private maxReconnectAttempts: number = 10;
	private reconnectDelay: number = 1000;
	private reconnectTimer: NodeJS.Timeout | null = null;
	private healthCheckTimer: NodeJS.Timeout | null = null;

	constructor() {
		this.initRedis();
		this.startHealthCheck();
	}

	private async initRedis() {
		try {
			// 动态导入Redis客户端
			const { RedisClient } = await import("bun");
			this.client = new RedisClient(redisConfig.url);

			this.setupEventHandlers();
			await this.connect();
		} catch (error: any) {
			logger.error("Redis初始化失败:" + error.message);
			this.scheduleReconnect();
		}
	}

	private setupEventHandlers() {
		if (!this.client) return;

		this.client.onconnect = () => {
			logger.info("Redis 连接成功");
			this.isConnected = true;
			this.reconnectAttempts = 0;
		};

		this.client.onclose = () => {
			logger.warn("Redis 连接关闭");
			this.isConnected = false;
			this.scheduleReconnect();
		};

		this.client.onerror = (error: any) => {
			logger.error("Redis 连接错误:" + error.message);
			this.isConnected = false;
		};
	}

	private async connect() {
		try {
			if (this.client && !this.isConnected) {
				await this.client.connect();
			}
		} catch (error: any) {
			logger.error("Redis 连接失败:" + error.message);
			this.scheduleReconnect();
		}
	}

	private scheduleReconnect() {
		if (this.reconnectTimer) {
			clearTimeout(this.reconnectTimer);
		}

		if (this.reconnectAttempts < this.maxReconnectAttempts) {
			this.reconnectAttempts++;
			const delay =
				this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

			logger.info(
				`Redis 将在 ${delay}ms 后尝试重连 (第 ${this.reconnectAttempts} 次)`
			);

			this.reconnectTimer = setTimeout(() => {
				this.connect();
			}, delay);
		} else {
			logger.error("Redis 重连次数已达上限,停止重连");
		}
	}

	private startHealthCheck() {
		// 每30秒检查一次连接状态
		this.healthCheckTimer = setInterval(() => {
			if (this.isConnected && this.client) {
				this.ping().catch(() => {
					logger.warn("Redis健康检查失败,标记为断开连接");
					this.isConnected = false;
					this.scheduleReconnect();
				});
			}
		}, 30000);
	}

	private async ping(): Promise<string> {
		if (!this.client) throw new Error("Redis客户端未初始化");
		return await this.client.ping();
	}

	// 检查连接状态
	private checkConnection(): boolean {
		if (!this.client || !this.isConnected) {
			logger.warn("Redis 未连接,尝试重连");
			this.scheduleReconnect();
			return false;
		}
		return true;
	}

	// 包装Redis操作,添加重试机制
	private async executeWithRetry<T>(
		operation: () => Promise<T>,
		maxRetries: number = 3
	): Promise<T> {
		for (let i = 0; i < maxRetries; i++) {
			try {
				if (!this.checkConnection()) {
					throw new Error("Redis未连接");
				}
				return await operation();
			} catch (error: any) {
				if (i === maxRetries - 1) {
					throw error;
				}

				logger.warn(`Redis操作失败,第${i + 1}次重试:` + error.message);
				await new Promise(resolve => setTimeout(resolve, 100 * (i + 1)));
			}
		}
		throw new Error("Redis操作重试失败");
	}

	// Redis操作方法
	async get(key: string): Promise<string | null> {
		return this.executeWithRetry(async () => {
			return await this.client.get(key);
		});
	}

	async set(key: string, value: string): Promise<string | null> {
		return this.executeWithRetry(async () => {
			return await this.client.set(key, value);
		});
	}

	async setex(
		key: string,
		seconds: number,
		value: string
	): Promise<string | null> {
		return this.executeWithRetry(async () => {
			return await this.client.setex(key, seconds, value);
		});
	}

	async del(key: string): Promise<number> {
		return this.executeWithRetry(async () => {
			return await this.client.del(key);
		});
	}

	async exists(key: string): Promise<number> {
		return this.executeWithRetry(async () => {
			return await this.client.exists(key);
		});
	}

	async expire(key: string, seconds: number): Promise<number> {
		return this.executeWithRetry(async () => {
			return await this.client.expire(key, seconds);
		});
	}

	async ttl(key: string): Promise<number> {
		return this.executeWithRetry(async () => {
			return await this.client.ttl(key);
		});
	}

	// 获取连接状态
	getConnectionStatus(): boolean {
		return this.isConnected;
	}

	// 手动重连
	async forceReconnect(): Promise<void> {
		logger.info("强制重连Redis");
		this.reconnectAttempts = 0;
		if (this.reconnectTimer) {
			clearTimeout(this.reconnectTimer);
		}
		await this.connect();
	}

	// 关闭连接
	async close(): Promise<void> {
		if (this.reconnectTimer) {
			clearTimeout(this.reconnectTimer);
		}
		if (this.healthCheckTimer) {
			clearInterval(this.healthCheckTimer);
		}
		if (this.client) {
			try {
				await this.client.quit();
			} catch (error: any) {
				logger.error("关闭Redis连接时出错:" + error.message);
			}
		}
	}
}

// 创建Redis管理器实例
export const redisManager = new RedisManager();

// 导出便捷方法
export const client = {
	get: (key: string) => redisManager.get(key),
	set: (key: string, value: string) => redisManager.set(key, value),
	setex: (key: string, seconds: number, value: string) =>
		redisManager.setex(key, seconds, value),
	del: (key: string) => redisManager.del(key),
	exists: (key: string) => redisManager.exists(key),
	expire: (key: string, seconds: number) => redisManager.expire(key, seconds),
	ttl: (key: string) => redisManager.ttl(key),
	getConnectionStatus: () => redisManager.getConnectionStatus(),
	forceReconnect: () => redisManager.forceReconnect(),
	close: () => redisManager.close(),
};
Node Elysia Redis