Version: Next
轮询负载均衡算法
提示
恢复默认轮询负载均衡策略 RoundRobinRule
- 在主启动类位置,不写
@RibbonClient
注解即可
轮询原理
rest 接口第几次调用 % 服务器集群节点总数 = 实际调用服务器位置下标
- 每次注册中心服务器重启后,REST 接口计数从
1
开始- 有循环队列内味儿
RoundRobinRule 源码分析
/**
* The most well known and basic load balancing strategy, i.e. Round Robin Rule.
*
* @author stonse
* @author Nikos Michalakis <nikos@netflix.com>
*
*/
public class RoundRobinRule extends AbstractLoadBalancerRule {
private AtomicInteger nextServerCyclicCounter; // 原子整型
private static final boolean AVAILABLE_ONLY_SERVERS = true;
private static final boolean ALL_SERVERS = false;
private static Logger log = LoggerFactory.getLogger(RoundRobinRule.class);
public RoundRobinRule() {
nextServerCyclicCounter = new AtomicInteger(0); // 初始化为0
}
public RoundRobinRule(ILoadBalancer lb) {
this();
setLoadBalancer(lb);
}
public Server choose(ILoadBalancer lb, Object key) {
if (lb == null) {
log.warn("no load balancer"); // 没负载均衡器,直接报错
return null;
}
Server server = null; // 目标服务器
int count = 0; // 尝试寻找次数
while (server == null && count++ < 10) {
List<Server> reachableServers = lb.getReachableServers(); // 所有可用、活的服务器
List<Server> allServers = lb.getAllServers(); // 所有服务器
int upCount = reachableServers.size();
int serverCount = allServers.size();
if ((upCount == 0) || (serverCount == 0)) { // 根本没服务器,报错
log.warn("No up servers available from load balancer: " + lb);
return null;
}
// 定义在下面的方法
int nextServerIndex = incrementAndGetModulo(serverCount); // 加以 然后 取模 拿余数
server = allServers.get(nextServerIndex); // 用余数做下标,拿具体服务器
if (server == null) { // 服务器为空,线程礼让
/* Transient. */
Thread.yield();
continue;
}
if (server.isAlive() && (server.isReadyToServe())) {
return (server); // 没毛病,就用这个服务器
}
// Next.
server = null; // 走到这里,重试一次
}
if (count >= 10) { // 重试了10次
log.warn("No available alive servers after 10 tries from load balancer: "
+ lb); // 报错,负载均衡器捣鼓了10次了都没找到可用服务器
}
return server;
}
/**
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextServerCyclicCounter.get(); // 一开始是0
int next = (current + 1) % modulo; // 加1取模取余
if (nextServerCyclicCounter.compareAndSet(current, next)) // CAS自旋锁
return next; // 确保自己是并发获胜者才更新
}
}
@Override
public Server choose(Object key) {
return choose(getLoadBalancer(), key);
}
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
}
}
手写轮询算法
修改
provider-payment8001
和provider-payment8002
的Controller
,添加以下方法@GetMapping("/payment/lb")public String getPaymentLB() {return serverPort;}
- 在
cloud-consumer-order80
微服务中,找到com.bsx.springcloud.config.ApplicaitionContextConfig
配置类,其中注册了一个 Spring Bean,是RestTemplate
,移除其@LoandBalanced
注解,即关闭其Ribbon
自带的负载均衡器- 新建包
com.bsx.springcloud.lb
,新建接口MyLoadBalancer
- 实现类
MyLoadBalancerImpl
@Component
确保 Spring 容器可以扫描到这个类- 注入服务发现
discoveryClient
,用来扫注册中心上的可用微服务节点列表- 定义原子整型变量,初始化为
0
- 定义方法
final int getAndIncrement()
,实现原子自增- 通过取模计算下标
- 从列表中根据下标取出目标微服务节点实例
- 来到
Order80
的Controller
- 注入
MyLoadBalancer
、discoveryClient
- 添加
String getPaymentLB()
方法
GetMapping
,路由为/consumer/payment/lb
- 使用
discoveryClient
获取注册中心微服务实例列表- 调用
myLoadBalancer.instance
方法,传入微服务实例列表,该方法根据我们自定义的负载均衡策略返回一个目标微服务实例- 获取目标微服务实例后,使用 RestTemplate 远程调用它
接口
/**
* 自定义负载均衡器接口
* - 获取 注册中心 所有活着的微服务节点
*/
public interface MyLoadBalancer {
/**
* 给我一个 微服务实例 的列表,我根据负载均衡策略,选一个具体的微服务实例,访问它
*
* @param serviceInstances 注册中心上所有可用微服务实例的列表
* @return 最后选择的要访问的那个实例
*/
ServiceInstance instance(List<ServiceInstance> serviceInstances);
}
实现类
@Component
public class MyLoadBalancerImpl implements MyLoadBalancer {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Override
public ServiceInstance instance(List<ServiceInstance> serviceInstances) {
int index = getAndIncrement() % serviceInstances.size(); // 获取下标
return serviceInstances.get(index);
}
// 原子加1
private final int getAndIncrement() {
int current;
int next;
do {
current = this.atomicInteger.get(); // 获取原子整型的值
// 如果current超出最大值,就变为0,否则+1
next = current >= Integer.MAX_VALUE ? 0 : current + 1;
} while (!atomicInteger.compareAndSet(current, next));
System.out.println("****** next: " + next + "******");
return next;
}
}
- OrderController
OrderController
@RestController
@Slf4j
public class OrderController {
// private static final String PAYMENT_URL = "http://localhost:8081";
// 基于 Eureka Server Application 服务名称
private static final String PAYMENT_URL = "http://CLOUD-PAYMENT-SERVICE";
@Resource
private RestTemplate restTemplate;
// 注入自定义的负载均衡器
@Resource
private MyLoadBalancer myLoadBalancer; // aka impl: MyLoadBalancerImpl
// discoveryClient 用来扫 Eureka Server 上的实例列表
@Resource
private DiscoveryClient discoveryClient;
/**
* 调用 支付模块 的 create 方法
*
* @param payment
* @return 调用结果
*/
@GetMapping("/consumer/payment/create")
public CommonResult<Integer> create(Payment payment) {
return restTemplate.postForObject(PAYMENT_URL + "/payment/create",
payment,
CommonResult.class);
}
@GetMapping("/consumer/payment/get/{id}")
public CommonResult<Payment> getPayment(@PathVariable("id") Long id) {
return restTemplate.getForObject(PAYMENT_URL + "/payment/get/" + id,
CommonResult.class);
}
@GetMapping("/consumer/payment/getForEntity/{id}")
public CommonResult<Payment> getPayment2(@PathVariable("id") Long id) {
ResponseEntity<CommonResult> entity = restTemplate.getForEntity(PAYMENT_URL + "/payment/get/" + id, CommonResult.class);
if (entity.getStatusCode().is2xxSuccessful()) // 如果是 200 系列响应码
return entity.getBody(); // 返回请求体
else
return new CommonResult<>(444, "获取失败");
}
/** 使用自定义负载均衡
*/
@GetMapping("/consumer/payment/lb")
public String getPaymentLB() {
List<ServiceInstance> instances = discoveryClient.getInstances("CLOUD-PAYMENT-SERVICE");
if (instances == null || instances.size() <= 0) return null;
// 有效服务列表
// 使用自定义的负载均衡策略,从列表里获取一个目标微服务实例
ServiceInstance targetInstance = myLoadBalancer.instance(instances);
URI uri = targetInstance.getUri();
System.out.println("****************** uri = " + uri);
return restTemplate.getForObject(uri + "/payment/lb", String.class);
}
}
测试
各微服务重启,访问 localhost:80/consumer/payment/lb
控制台
****** next: 1******
****************** uri = http://192.168.0.101:8082
****** next: 2******
****************** uri = http://192.168.0.101:8082
****** next: 3******
****************** uri = http://192.168.0.101:8081
****** next: 4******
****************** uri = http://192.168.0.101:8082
****** next: 5******
****************** uri = http://192.168.0.101:8081
****** next: 6******
****************** uri = http://192.168.0.101:8082
****** next: 7******
****************** uri = http://192.168.0.101:8081
****** next: 8******
****************** uri = http://192.168.0.101:8082
一致性哈希算法
- 链表化: 链表上有真节点对应一个物理服务器,也有假节点是空的,添加-移除一个服务器,只需要顺着链表找到它的下一个真实存在的服务器,进行重新分配
- 对于链表的最后一个节点,为了让它具有
下一个
服务器,将链表设置为环形链表
,也就是连续的链表,因此consistent hashing
是一个双关命名,体现了一致
和连续
,也许翻译为连续闭环哈希
更准确 - 普通的哈希时间复杂度 O(1);一致性哈希引入了链表时间复杂度 O(n),需要在链表上搜索真实存在的下一台服务器
- 对于一种哈希算法来说
O(n)
的时间复杂度太拉胯了,基于链表这种结构,可以考虑使用跳表,将时间复杂度优化到O(logn)