This commit is contained in:
Joe
2026-01-16 15:49:34 +08:00
commit 550d3e1f42
380 changed files with 62024 additions and 0 deletions
+524
View File
@@ -0,0 +1,524 @@
package commands
import (
"context"
"fmt"
"strings"
"time"
"github.com/goravel/framework/contracts/console"
"github.com/goravel/framework/contracts/console/command"
"github.com/goravel/framework/facades"
"github.com/samber/lo"
"goravel/app/utils"
"goravel/app/utils/errorlog"
)
type QueueStats struct {
}
// Signature The name and signature of the console command.
func (r *QueueStats) Signature() string {
return "queue:stats"
}
// ./main artisan queue:stats --connection=redis --queue=long-running
func (r *QueueStats) Description() string {
return "查询队列统计信息,显示待执行、正在执行和失败任务数量"
}
// Extend The console command extend.
func (r *QueueStats) Extend() command.Extend {
return command.Extend{
Category: "queue",
Flags: []command.Flag{
&command.StringFlag{
Name: "queue",
Aliases: []string{"q"},
Usage: "队列名称(可选,用于筛选特定队列)",
},
&command.StringFlag{
Name: "connection",
Aliases: []string{"c"},
Usage: "队列连接名称(可选,默认使用默认连接)",
},
},
}
}
// Handle Execute the console command.
func (r *QueueStats) Handle(ctx console.Context) error {
queueName := ctx.Option("queue")
connectionName := ctx.Option("connection")
if connectionName == "" {
connectionName = facades.Config().GetString("queue.default", "sync")
}
ctx.Info(fmt.Sprintf("队列连接: %s", connectionName))
if queueName != "" {
ctx.Info(fmt.Sprintf("队列名称: %s", queueName))
}
ctx.Info("")
// 判断队列驱动类型
driver := facades.Config().GetString(fmt.Sprintf("queue.connections.%s.driver", connectionName), "")
ctx.Info(fmt.Sprintf("驱动类型: %s", driver))
// 检查是否是 Redis 驱动(custom driver with via
isRedis := r.isRedisDriver(connectionName)
if isRedis {
// Redis 驱动:通过 Redis 客户端查询队列大小
defaultQueue := facades.Config().GetString(fmt.Sprintf("queue.connections.%s.queue", connectionName), "default")
originalQueueName := queueName
queueNameForStats := queueName
if queueNameForStats == "" {
queueNameForStats = defaultQueue
}
// 获取 Redis 连接名称(从队列配置中获取)
redisConnectionName := r.getRedisConnectionName(connectionName)
if redisConnectionName == "" {
ctx.Warning("无法确定 Redis 连接名称")
return nil
}
// 查询 Redis 队列统计
stats, err := r.getRedisQueueStats(redisConnectionName, connectionName, queueNameForStats)
if err != nil {
ctx.Error(fmt.Sprintf("查询 Redis 队列统计失败: %v", err))
ctx.Info("提示:请确保 Redis 连接配置正确且 Redis 服务正在运行")
return err
}
totalCount := stats.Pending + stats.Reserved
// 显示统计信息
ctx.Info("═══════════════════════════════════════")
ctx.Info("队列统计信息 (Redis)")
ctx.Info("═══════════════════════════════════════")
ctx.Info(fmt.Sprintf("队列名称: %s", queueNameForStats))
ctx.Info(fmt.Sprintf("待执行任务: %d", stats.Pending))
ctx.Info(fmt.Sprintf("正在执行任务: %d", stats.Reserved))
ctx.Info(fmt.Sprintf("延迟任务: %d", stats.Delayed))
ctx.Info(fmt.Sprintf("失败任务: %d", stats.Failed))
ctx.Info(fmt.Sprintf("总任务数: %d", totalCount))
ctx.Info("═══════════════════════════════════════")
// 提示信息
if stats.Pending > 0 {
ctx.Info("")
ctx.Warning(fmt.Sprintf("提示:队列中有 %d 个待执行任务", stats.Pending))
ctx.Info("")
ctx.Info("如果需要处理这些任务:")
ctx.Info(" 1. 启动主程序(main.go 中会自动启动队列 Worker")
ctx.Info(" go run .")
ctx.Info(" 2. 确保 Worker 监听正确的队列名称和连接")
ctx.Info(" 3. 确保任务已正确注册到 QueueServiceProvider")
ctx.Info("")
ctx.Info("如果不需要这些任务,可以清理队列:")
ctx.Info(" 使用 Redis 客户端执行以下命令清理队列:")
appName := facades.Config().GetString("app.name", "goravel")
baseKey := r.redisQueueKey(connectionName, queueNameForStats)
ctx.Info(fmt.Sprintf(" # app.name=%s, queue.connection=%s, queue=%s", appName, connectionName, queueNameForStats))
ctx.Info(fmt.Sprintf(" redis-cli DEL %s", baseKey))
ctx.Info(fmt.Sprintf(" redis-cli DEL %s:reserved", baseKey))
ctx.Info(fmt.Sprintf(" redis-cli DEL %s:delayed", baseKey))
ctx.Info(" 或者使用命令:go run . artisan queue:clear --queue=" + queueNameForStats)
}
// 如果未指定队列名称,显示按队列分组的统计
if originalQueueName == "" {
ctx.Info("")
ctx.Info("按队列分组统计:")
byQueue, err := r.getRedisStatsByQueue(redisConnectionName, connectionName)
if err != nil {
ctx.Warning(fmt.Sprintf("获取按队列分组统计失败: %v", err))
} else {
if len(byQueue) == 0 {
ctx.Info(" 暂无队列数据")
} else {
for qName, qStats := range byQueue {
ctx.Info(fmt.Sprintf(" 队列 [%s]:", qName))
ctx.Info(fmt.Sprintf(" 待执行: %d, 正在执行: %d, 延迟: %d, 失败: %d, 总计: %d",
qStats.Pending, qStats.Reserved, qStats.Delayed, qStats.Failed, qStats.Total))
}
}
}
}
return nil
}
if driver == "sync" {
ctx.Info("同步驱动:任务立即执行,无队列数据")
return nil
}
if driver != "database" {
ctx.Warning(fmt.Sprintf("驱动 %s 暂不支持统计查询", driver))
return nil
}
// Database 驱动:查询 jobs 表
var pendingCount, reservedCount int64
var err error
// 查询待执行任务数(available_at <= now 且 reserved_at 为 null
pendingQuery := facades.Orm().Query().Table("jobs").
Where("available_at", "<=", time.Now()).
Where("reserved_at IS NULL")
// 查询正在执行任务数(reserved_at 不为 null
reservedQuery := facades.Orm().Query().Table("jobs").
Where("reserved_at IS NOT NULL")
// 如果指定了队列名称,添加筛选条件
if queueName != "" {
pendingQuery = pendingQuery.Where("queue", "=", queueName)
reservedQuery = reservedQuery.Where("queue", "=", queueName)
}
pendingCount, err = pendingQuery.Count()
if err != nil {
ctx.Error(fmt.Sprintf("查询待执行任务数失败: %v", err))
return err
}
reservedCount, err = reservedQuery.Count()
if err != nil {
ctx.Error(fmt.Sprintf("查询正在执行任务数失败: %v", err))
return err
}
// 查询失败任务数(从 failed_jobs 表)
failedQuery := facades.Orm().Query().Table("failed_jobs")
if queueName != "" {
failedQuery = failedQuery.Where("queue", "=", queueName)
}
failedCount, err := failedQuery.Count()
if err != nil {
ctx.Error(fmt.Sprintf("查询失败任务数失败: %v", err))
return err
}
totalCount := pendingCount + reservedCount
// 显示统计信息
ctx.Info("═══════════════════════════════════════")
ctx.Info("队列统计信息")
ctx.Info("═══════════════════════════════════════")
ctx.Info(fmt.Sprintf("待执行任务: %d", pendingCount))
ctx.Info(fmt.Sprintf("正在执行任务: %d", reservedCount))
ctx.Info(fmt.Sprintf("失败任务: %d", failedCount))
ctx.Info(fmt.Sprintf("总任务数: %d", totalCount))
ctx.Info("═══════════════════════════════════════")
// 如果未指定队列名称,显示按队列分组的统计
if queueName == "" {
ctx.Info("")
ctx.Info("按队列分组统计:")
byQueue, err := r.getStatsByQueue()
if err != nil {
ctx.Warning(fmt.Sprintf("获取按队列分组统计失败: %v", err))
} else {
if len(byQueue) == 0 {
ctx.Info(" 暂无队列数据")
} else {
for qName, stats := range byQueue {
ctx.Info(fmt.Sprintf(" 队列 [%s]:", qName))
ctx.Info(fmt.Sprintf(" 待执行: %d, 正在执行: %d, 失败: %d, 总计: %d",
stats.Pending, stats.Reserved, stats.Failed, stats.Total))
}
}
}
}
return nil
}
// isRedisDriver 判断是否是 Redis 驱动
func (r *QueueStats) isRedisDriver(connectionName string) bool {
via := facades.Config().Get(fmt.Sprintf("queue.connections.%s.via", connectionName))
return via != nil || strings.Contains(connectionName, "redis")
}
// QueueStatsInfo 队列统计信息
type QueueStatsInfo struct {
Pending int64
Reserved int64
Failed int64
Total int64
}
// RedisQueueStatsInfo Redis 队列统计信息
type RedisQueueStatsInfo struct {
Pending int64
Reserved int64
Delayed int64
Failed int64
Total int64
}
// getStatsByQueue 按队列分组获取统计信息
func (r *QueueStats) getStatsByQueue() (map[string]QueueStatsInfo, error) {
// 获取所有队列名称
var queues []string
err := facades.Orm().Query().Table("jobs").
Select("DISTINCT queue").
Pluck("queue", &queues)
if err != nil {
return nil, err
}
// 获取失败任务的队列名称
var failedQueues []string
err = facades.Orm().Query().Table("failed_jobs").
Select("DISTINCT queue").
Pluck("queue", &failedQueues)
if err != nil {
return nil, err
}
// 合并队列名称
queueMap := make(map[string]bool)
for _, q := range queues {
queueMap[q] = true
}
for _, q := range failedQueues {
queueMap[q] = true
}
result := make(map[string]QueueStatsInfo)
now := time.Now()
for qName := range queueMap {
// 待执行任务数
pendingCount, _ := facades.Orm().Query().Table("jobs").
Where("queue", "=", qName).
Where("available_at", "<=", now).
Where("reserved_at IS NULL").
Count()
// 正在执行任务数
reservedCount, _ := facades.Orm().Query().Table("jobs").
Where("queue", "=", qName).
Where("reserved_at IS NOT NULL").
Count()
// 失败任务数
failedCount, _ := facades.Orm().Query().Table("failed_jobs").
Where("queue", "=", qName).
Count()
result[qName] = QueueStatsInfo{
Pending: pendingCount,
Reserved: reservedCount,
Failed: failedCount,
Total: pendingCount + reservedCount,
}
}
return result, nil
}
// getRedisConnectionName 从队列连接配置中获取 Redis 连接名称
func (r *QueueStats) getRedisConnectionName(queueConnectionName string) string {
// 从队列配置中获取 connection 字段
connection := facades.Config().GetString(fmt.Sprintf("queue.connections.%s.connection", queueConnectionName), "default")
// 如果队列连接名称包含 redis,尝试使用它
if strings.Contains(queueConnectionName, "redis") {
// 检查 Redis 配置中是否存在对应的连接
redisHost := facades.Config().GetString(fmt.Sprintf("database.redis.%s.host", queueConnectionName), "")
if redisHost != "" {
return queueConnectionName
}
}
// 默认使用 default
return connection
}
// getRedisQueueStats 获取 Redis 队列统计信息
func (r *QueueStats) getRedisQueueStats(redisConnectionName, queueConnectionName, queueName string) (*RedisQueueStatsInfo, error) {
// 使用公共 Redis 客户端
redisClient, err := utils.GetRedisClient(redisConnectionName)
if err != nil {
errorlog.Record(context.Background(), "queue", "获取 Redis 客户端失败", map[string]any{
"connection": redisConnectionName,
"error": err.Error(),
}, "获取 Redis 客户端失败: %v", err)
return nil, fmt.Errorf("获取 Redis 客户端失败: %v", err)
}
// 注意:使用公共 Redis 客户端池,不需要手动关闭
ctx := context.Background()
stats := &RedisQueueStatsInfo{}
// Goravel Redis driver:
// pending: {app}_queues:{queueConnection}_{queue} (List)
// reserved: {app}_queues:{queueConnection}_{queue}:reserved (ZSET)
// delayed: {app}_queues:{queueConnection}_{queue}:delayed (ZSET)
// 注意:这里的 queueConnectionName 是队列连接名(例如 redis),不是 redis client connectiondefault
baseKey := r.redisQueueKey(queueConnectionName, queueName)
pendingKey := baseKey
pendingLen, err := redisClient.LLen(ctx, pendingKey).Result()
if err != nil {
errorlog.Record(context.Background(), "queue", "查询待执行队列失败", map[string]any{
"queue_name": queueName,
"key": pendingKey,
"error": err.Error(),
}, "查询待执行队列失败: %v", err)
return nil, fmt.Errorf("查询待执行队列失败: %v", err)
}
stats.Pending = pendingLen
// 正在执行队列:{baseKey}:reserved (ZSET)
reservedKey := fmt.Sprintf("%s:reserved", baseKey)
reservedLen, err := redisClient.ZCard(ctx, reservedKey).Result()
if err != nil {
errorlog.Record(context.Background(), "queue", "查询正在执行队列失败", map[string]any{
"queue_name": queueName,
"key": reservedKey,
"error": err.Error(),
}, "查询正在执行队列失败: %v", err)
return nil, fmt.Errorf("查询正在执行队列失败: %v", err)
}
stats.Reserved = reservedLen
// 延迟队列:{baseKey}:delayed (ZSET)
delayedKey := fmt.Sprintf("%s:delayed", baseKey)
delayedLen, err := redisClient.ZCard(ctx, delayedKey).Result()
if err != nil {
errorlog.Record(context.Background(), "queue", "查询延迟队列失败", map[string]any{
"queue_name": queueName,
"key": delayedKey,
"error": err.Error(),
}, "查询延迟队列失败: %v", err)
return nil, fmt.Errorf("查询延迟队列失败: %v", err)
}
stats.Delayed = delayedLen
// 调试信息:显示 Redis 键的实际值(可选,用于排查问题)
// 可以查看队列中的实际内容
if pendingLen > 0 {
// 查看队列中的第一个任务(不移除)
firstTask, _ := redisClient.LIndex(ctx, pendingKey, 0).Result()
if firstTask != "" {
// 只显示前100个字符,避免输出过长
if len(firstTask) > 100 {
firstTask = firstTask[:100] + "..."
}
}
}
// 失败任务:从数据库 failed_jobs 表查询(Redis 队列的失败任务也存储在数据库中)
var failedCount int64
if queueName != "" {
failedCount, err = facades.Orm().Query().Table("failed_jobs").
Where("queue = ?", queueName).
Count()
} else {
failedCount, err = facades.Orm().Query().Table("failed_jobs").Count()
}
if err != nil {
// 失败任务查询失败不影响其他统计
stats.Failed = 0
} else {
stats.Failed = failedCount
}
stats.Total = stats.Pending + stats.Reserved
return stats, nil
}
// getRedisStatsByQueue 按队列分组获取 Redis 队列统计信息
func (r *QueueStats) getRedisStatsByQueue(redisConnectionName, queueConnectionName string) (map[string]*RedisQueueStatsInfo, error) {
// 使用公共 Redis 客户端
redisClient, err := utils.GetRedisClient(redisConnectionName)
if err != nil {
errorlog.Record(context.Background(), "queue", "获取 Redis 客户端失败", map[string]any{
"connection": redisConnectionName,
"error": err.Error(),
}, "获取 Redis 客户端失败: %v", err)
return nil, fmt.Errorf("获取 Redis 客户端失败: %v", err)
}
// 注意:使用公共 Redis 客户端池,不需要手动关闭
ctx := context.Background()
result := make(map[string]*RedisQueueStatsInfo)
// 查找所有队列键({app}_queues:{queueConnection}_*
appName := facades.Config().GetString("app.name", "goravel")
prefix := fmt.Sprintf("%s_queues:%s_", appName, queueConnectionName)
pattern := prefix + "*"
keys, err := redisClient.Keys(ctx, pattern).Result()
if err != nil {
errorlog.Record(context.Background(), "queue", "查找队列键失败", map[string]any{
"pattern": pattern,
"error": err.Error(),
}, "查找队列键失败: %v", err)
return nil, fmt.Errorf("查找队列键失败: %v", err)
}
// 提取队列名称(排除 reserved 和 delayed 键)
queueNames := lo.FilterMap(keys, func(key string, _ int) (string, bool) {
// 跳过 reserved 和 delayed 键(ZSET
if strings.HasSuffix(key, ":reserved") || strings.HasSuffix(key, ":delayed") {
return "", false
}
// 必须以 prefix 开头
if !strings.HasPrefix(key, prefix) {
return "", false
}
after := strings.TrimPrefix(key, prefix)
if after == "" {
return "", false
}
return after, true
})
// 去重队列名称
queueMap := lo.SliceToMap(lo.Uniq(queueNames), func(queueName string) (string, bool) {
return queueName, true
})
// 如果没有找到队列键,尝试从失败任务表中获取队列名称
if len(queueMap) == 0 {
var failedQueues []string
err = facades.Orm().Query().Table("failed_jobs").
Select("DISTINCT queue").
Pluck("queue", &failedQueues)
if err == nil {
validQueues := lo.Filter(failedQueues, func(q string, _ int) bool {
return q != ""
})
queueMap = lo.SliceToMap(validQueues, func(queueName string) (string, bool) {
return queueName, true
})
}
}
// 为每个队列获取统计信息
for queueName := range queueMap {
stats, err := r.getRedisQueueStats(redisConnectionName, queueConnectionName, queueName)
if err != nil {
// 单个队列查询失败不影响其他队列
continue
}
result[queueName] = stats
}
return result, nil
}
// redisQueueKey Goravel Redis queue key format:
// {appName}_queues:{queueConnection}_{queue}
func (r *QueueStats) redisQueueKey(queueConnectionName, queueName string) string {
appName := facades.Config().GetString("app.name", "goravel")
return fmt.Sprintf("%s_queues:%s_%s", appName, queueConnectionName, queueName)
}