Failover
故障转移是分布式系统中重要的容错机制,当一个服务实例发生故障或不可用时,可以自动切换到其他可用实例,从而保证系统的高可用性。
故障转移基础
fyerrpc的故障转移系统包含如下的功能设计:
- 快速探测:快速准确地检测服务实例故障
- 动态恢复:在服务恢复后能够自动将其重新纳入可用实例池
- 灵活策略:提供多种故障转移策略,适应不同场景需求
- 熔断保护:结合熔断器模式避免持续调用不可用实例
- 智能重试:针对不同类型的错误采用合适的重试策略
故障转移流程
fyerrpc的故障转移工作流程如下:
- 故障检测:检测服务实例是否健康可用
- 熔断保护:对频繁出错的实例进行熔断
- 实例选择:根据策略选择可用实例
- 调用执行:执行远程调用操作
- 结果处理:根据调用结果进行状态更新和指标收集
- 故障恢复:定期检测并恢复不健康实例
核心组件
FailoverHandler 接口
FailoverHandler是故障转移功能的核心接口:
type FailoverHandler interface {
// Execute 执行带故障转移的调用
Execute(ctx context.Context, instances []*naming.Instance, operation func(context.Context, *naming.Instance) error) (*FailoverResult, error)
// GetDetector 获取故障检测器
GetDetector() Detector
// GetCircuitBreaker 获取熔断器
GetCircuitBreaker() CircuitBreaker
// GetRetryPolicy 获取重试策略
GetRetryPolicy() RetryPolicy
// GetRecoveryStrategy 获取恢复策略
GetRecoveryStrategy() RecoveryStrategy
// GetMonitor 获取实例监控器
GetMonitor() InstanceMonitor
}
故障转移配置
故障转移功能通过Config结构进行配置:
type Config struct {
// 重试相关配置
MaxRetries int // 最大重试次数
RetryInterval time.Duration // 重试间隔基准时间
RetryableErrors []string // 可重试的错误类型列表
RetryStrategy string // 重试策略: simple, exponential, jittered
// 熔断相关配置
CircuitBreakThreshold int // 熔断阈值,连续失败次数
CircuitBreakTimeout time.Duration // 熔断超时时间
// 故障检测配置
FailureDetectionTime time.Duration // 故障检测时间窗口
FailureThreshold int // 故障阈值次数
SuccessThreshold int // 成功阈值次数
// 恢复策略配置
RecoveryStrategy string // 恢复策略:immediate, gradual, probing
// 通用配置
FailoverStrategy string // 故障转移策略:next, random, best
}
可以使用函数选项模式进行配置:
config := failover.NewConfig(
failover.WithMaxRetries(3),
failover.WithRetryInterval(100*time.Millisecond),
failover.WithRetryBackoff(1.5, time.Second),
failover.WithRetryJitter(0.2),
failover.WithRetryableErrors([]string{"connection refused", "timeout"}),
failover.WithCircuitBreaker(5, 30*time.Second),
failover.WithFailoverStrategy("next"),
)
故障检测
故障检测负责判断服务实例是否健康,是故障转移的基础。
Detector 接口
type Detector interface {
// Detect 检测实例是否健康
Detect(ctx context.Context, instance *naming.Instance) (Status, error)
// MarkFailed 标记实例为失败状态
MarkFailed(ctx context.Context, instance *naming.Instance) error
// MarkSuccess 标记实例为成功状态
MarkSuccess(ctx context.Context, instance *naming.Instance) error
}
实例状态
服务实例可能处于以下状态:
type Status int
const (
StatusHealthy Status = iota // 健康状态
StatusUnhealthy // 不健康状态
StatusSuspect // 可疑状态,可能不健康
StatusIsolated // 被隔离状态
)
检测器实现
fyerrpc提供了多种故障检测器:
- TimeoutDetector:基于连接超时的检测器,通过尝试TCP连接来判断实例健康状态
// 创建基于超时的故障检测器
detector := failover.NewTimeoutDetector(config)
// 检测实例状态
status, err := detector.Detect(ctx, instance)
if err != nil || status != failover.StatusHealthy {
log.Printf("Instance %s is not healthy: %v", instance.Address, err)
}
-
ErrorRateDetector:基于错误率的检测器,记录调用错误率判断实例健康状态
-
HealthCheckDetector:通过定期健康检查判断实例状态
熔断器
熔断器模式防止持续调用不健康的服务实例,提高系统的稳定性。
CircuitBreaker 接口
type CircuitBreaker interface {
// Allow 判断请求是否允许通过
Allow(ctx context.Context, instance *naming.Instance) (bool, error)
// MarkSuccess 标记成功调用
MarkSuccess(ctx context.Context, instance *naming.Instance) error
// MarkFailure 标记失败调用
MarkFailure(ctx context.Context, instance *naming.Instance, err error) error
// GetState 获取熔断器状态
GetState(instance *naming.Instance) (State, error)
// Reset 重置熔断器状态
Reset(instance *naming.Instance) error
}
熔断器状态
熔断器有三种状态:
type State int
const (
StateClosed State = iota // 关闭状态,允许请求通过
StateOpen // 打开状态,请求被拒绝
StateHalfOpen // 半开状态,允许部分请求通过以探测服务是否恢复
)
熔断器实现
fyerrpc默认使用SimpleCircuitBreaker,其工作流程如下:
- 关闭状态:所有请求正常通过,但记录失败次数
- 打开状态:当连续失败次数达到阈值时,熔断器打开,拒绝所有请求
- 半开状态:超时后熔断器进入半开状态,允许有限请求通过
- 恢复机制:半开状态下请求成功率达到阈值,熔断器关闭;任何失败会使熔断器重新打开
// 创建熔断器
circuitBreaker := failover.NewCircuitBreaker(config)
// 判断请求是否允许通过
allow, err := circuitBreaker.Allow(ctx, instance)
if !allow {
log.Printf("Circuit breaker is open for instance %s: %v", instance.Address, err)
return err
}
// 根据调用结果更新熔断器
if callErr := callService(instance); callErr != nil {
circuitBreaker.MarkFailure(ctx, instance, callErr)
} else {
circuitBreaker.MarkSuccess(ctx, instance)
}
重试策略
重试策略定义了当调用失败时如何进行重试。
RetryPolicy 接口
type RetryPolicy interface {
// ShouldRetry 决定是否需要重试
ShouldRetry(ctx context.Context, attempt int, err error) bool
// NextBackoff 计算下一次重试的等待时间
NextBackoff(attempt int) time.Duration
// MaxAttempts 返回最大重试次数
MaxAttempts() int
}
重试策略实现
fyerrpc提供了多种重试策略:
- SimpleRetryPolicy:简单固定间隔重试
// 创建简单重试策略(最多重试3次,间隔100毫秒)
retryPolicy := failover.NewSimpleRetryPolicy(
3, // 最大重试次数
100*time.Millisecond, // 固定重试间隔
[]string{"timeout"}, // 可重试错误类型
)
- ExponentialBackoffRetryPolicy:指数退避重试,重试间隔随着尝试次数增加而指数增加
// 创建指数退避重试策略
retryPolicy := failover.NewExponentialBackoffRetryPolicy(
3, // 最大重试次数
100*time.Millisecond, // 初始重试间隔
10*time.Second, // 最大重试间隔
2.0, // 乘数因子
[]string{"timeout"}, // 可重试错误类型
)
- JitteredRetryPolicy:带随机抖动的重试策略,在指数退避基础上增加随机抖动,避免多个客户端同时重试导致的"惊群效应"
// 创建带抖动的重试策略
retryPolicy := failover.NewJitteredRetryPolicy(
3, // 最大重试次数
100*time.Millisecond, // 初始重试间隔
10*time.Second, // 最大重试间隔
2.0, // 乘数因子
0.2, // 抖动因子(±20%)
[]string{"timeout"}, // 可重试错误类型
)
恢复策略
恢复策略定义了如何将故障实例恢复到可用状态。
RecoveryStrategy 接口
type RecoveryStrategy interface {
// CanRecover 判断实例是否可以恢复
CanRecover(ctx context.Context, instance *naming.Instance) bool
// Recover 恢复实例
Recover(ctx context.Context, instance *naming.Instance) error
// RecoveryDelay 返回恢复尝试间隔
RecoveryDelay(instance *naming.Instance) time.Duration
}
恢复策略实现
fyerrpc提供了多种恢复策略:
-
ImmediateRecoveryStrategy:立即恢复策略,发现故障后立即尝试恢复
-
GradualRecoveryStrategy:渐进式恢复策略,在失败后使用逐渐增加的延迟尝试恢复
// 创建渐进式恢复策略
recoveryStrategy := failover.NewGradualRecoveryStrategy(detector, config)
- ProbingRecoveryStrategy:探测式恢复策略,在恢复前先发送少量请求探测服务状态
// 创建探测式恢复策略
recoveryStrategy := failover.NewProbingRecoveryStrategy(detector, config)
使用故障转移
创建故障转移处理器
// 创建配置
config := failover.NewConfig(
failover.WithMaxRetries(3),
failover.WithRetryInterval(100*time.Millisecond),
failover.WithRetryBackoff(1.5, time.Second),
failover.WithRetryJitter(0.2),
failover.WithCircuitBreaker(5, 30*time.Second),
failover.WithFailoverStrategy("next"),
)
// 创建故障转移处理器
handler, err := failover.NewFailoverHandler(config)
if err != nil {
log.Fatalf("Failed to create failover handler: %v", err)
}
执行带故障转移的调用
// 待调用的服务实例列表
instances := []*naming.Instance{
{ID: "inst-1", Address: "192.168.1.101:8000", Status: naming.StatusEnabled},
{ID: "inst-2", Address: "192.168.1.102:8000", Status: naming.StatusEnabled},
{ID: "inst-3", Address: "192.168.1.103:8000", Status: naming.StatusEnabled},
}
// 定义操作函数
operation := func(ctx context.Context, instance *naming.Instance) error {
// 调用服务...
return callService(ctx, instance.Address)
}
// 执行带故障转移的调用
ctx := context.Background()
result, err := handler.Execute(ctx, instances, operation)
if err != nil {
log.Printf("Failover call failed: %v", err)
log.Printf("Tried %d times, failed nodes: %v", result.RetryCount, result.FailedNodes)
} else {
log.Printf("Call succeeded using instance %s after %d retries",
result.Instance.Address, result.RetryCount)
}
故障转移结果
故障转移操作的结果通过FailoverResult结构返回:
type FailoverResult struct {
Success bool // 是否成功
Instance *naming.Instance // 最终使用的实例
RetryCount int // 重试次数
Duration time.Duration // 操作耗时
Error error // 错误信息
FailedNodes []string // 失败的节点列表
}
指标收集与监控
fyerrpc提供了故障转移指标收集功能,用于监控和分析系统健康状态。
指标收集接口
type MetricsCollector interface {
// 增加重试计数
IncrementRetries(service string)
// 增加故障转移计数
IncrementFailovers(service string, fromInstance, toInstance string)
// 记录故障检测事件
RecordDetection(instance *naming.Instance, status Status)
// 记录熔断器状态变更
RecordBreaker(instance *naming.Instance, state State)
// 记录请求延迟
RecordLatency(instance *naming.Instance, latency time.Duration)
}
内存指标收集器
// 创建内存指标收集器(保留100个延迟样本)
collector := failover.NewInMemoryMetricsCollector(100)
// 获取指标
retryCount := collector.GetRetryCount("user-service")
failoverCount := collector.GetFailoverCount("user-service")
avgLatency := collector.GetAverageLatency("instance-1")
// 获取故障检测统计
detectionStats := collector.GetDetectionStats("instance-1")
healthyCount := detectionStats[failover.StatusHealthy]
unhealthyCount := detectionStats[failover.StatusUnhealthy]
// 获取熔断器统计
breakerStats := collector.GetBreakerStats("instance-1")
openCount := breakerStats[failover.StateOpen]