SpringCloud

服务与服务之间的调用

RestTemplate:RestTemplate提供了多种便捷访问远程Http服务的方法,是一种简单便捷的访问restful服务模板类,是spring提供的用于访问Rest服务的客户端模板工具类

1
2
restTemplate.postForObject(url,参数,返回结果类.class)//当我们向请求提供方发送post请求时
restTemplate.getForObject(url,返回结果类.class) //当我们向请求提供方发送get请求时

服务注册中心

Eureka

Eureka的基本知识

什么是服务治理

SpringCloud 封装了 NetFlix公司开发的Eureka模块来实现服务治理

在传统饿的RPC远程调用框架中管理每个服务和每个服务之间依赖关系比较复杂,管理比较复杂,所以需要使用服务治理,管理服务于服务之间依赖关系,可以实现服务调用、负载均衡、容错等,实现服务发现与注册

什么是服务的注册与发现

Eureka采用了CS的设计架构,Eureka Server作为服务注册功能的服务器,他是服务注册中心,而系统中的其他微服务没使用Eureka的客户端连接到Eureka Server并维持心跳连接。这样系统的维护人员就可以通过Eureka Server来监控系统中的各个微服务是否正常运行

在微服务注册与发现中,有一个注册中心,当服务器启动时,会把当前自己服务器的信息比如, 服务器通讯地址等以别名的方式注册到注册中心上,另一个方(消费者|服务提供者)以该别名的方式去注册中心上获取到世纪的服务通讯地址,然后再实现本地RPC调用,RPC远程调用框架核心思想在于注册中心,因为注册中心管理每个服务与服务之间的一个依赖关系(服务治理观念)。在任何rpc远程夸那个价中,都会有一个注册中心(存放地址相关信息(接口地址))

Eureka的两大组件:Eureka Server和Eureka Client
  • Eureka Server 提供服务注册服务
  • Eureka Client通过注册中心进行访问

我们构建 Eureka Server时,要导入依赖

1
2
3
4
5
6
7
8
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

然后修改配置文件application.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
server:
port: 7001 # 该服务的端口号

spring:
application:
name: eureka-server #该服务的服务名

eureka:
instance:
hostname: localhost #该服务的主机IP
client:
service-url:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka #访问Eureka Server所要进入的默认路径
register-with-eureka: false # false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务
fetch-registry: false # false表示自己就是注册中心。我的职责就是维护服务实例,并不需要去检索服务

在启动类 中加上注解 @EnableEurekaServer

开启服务并访问 Eureka server

image-20230222202954794

Eureka Client

首先导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

然后在配置文件中修改配置

1
2
3
4
5
6
7
8
eureka:
client:
#表示是否将自己注册进Eureka Server 默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的服务信息,默认为true,单点无所谓集群必须设置为true才能配合Ribbon使用负载均衡
fetch-registry: true
service-url:
defaultZone: http://localhost:7001/eureka

在启动类上加上注解 @EnableEurekaClient

image-20230222205942068

服务重新部署以后我们会在 EurekaServer页面看到

image-20230222210028207

这里的Application的名称即为 服务提供者的Application中的spring.application.name

image-20230222210630618

Eureka集群

EurekaServer 集群

EurekaServer集群注册秉承着相互注册的方式(你中有我,我中有你)

image-20230223144021218

EurekaServer集群实现

首先两个服务端要有分别不同的端口号例如(7001,7002)

其次我们在对应模块的application.yaml中修改主机名称配置:由原先的localhost变为eruekaXXXX.com

再修改要连接的Eureka服务注册与发现中心配置,但要为另一个服务端的连接地址

image-20230223144722821

我们将这两个服务端的这两个配置修改完成后,去修改 C:\Windows\System32\drivers\etc文件夹下的hosts文件,向其中修改映射地址

image-20230223144905374

一切修改完成后我们可以重启这两个服务接下来就可以看到

image-20230223145016661

image-20230223145054505

EurekaClient集群的实现

EurekaClient集群实现非常容易,在application.yaml中的修改

将两个EurekaServer的服务注册地址全部添加到配置文件中

image-20230223145954080

然后可以发现

Eureka服务调用

在服务消费者的模块中,我们可以看到,我们所调用的服务提供者模块是写死的,只能写为端口号为8001或8002的主机的IP地址以及端口号,但这与我们构建集群的目的相互违背,所以是调用8001端口的主机还是调用8002端口的主机,这种问题的结局方案就是 负载均衡

我们在服务消费者模块下的配置类的RestTemplate上加入注解@LoadBalanced ,而后在 Controller类中的URL的IP地址以及端口除改为服务消费者的名称

CLOUD-PAYMENT-SERVICE 即可实现对服务提供者集群的调用,并且实现了负载均衡

image-20230223151511296

image-20230223151526058

image-20230223151701195

image-20230223151720385

actuator微服务信息的完善

当我们在Eureka网页中看到两个服务提供者时他们的名称默认为 主机IP:服务应用名:端口号的形式,我们可以修改格式将其改为我们想修改的名称

1
2
3
eureka:  
instance:
instance-id: Payment8001
1
2
3
eureka:  
instance:
instance-id: Payment8002

image-20230223153544797

增加配置可显示详细信息(服务的IP地址以及端口号)

1
2
3
4
eureka:  
instance:
instance-id: Payment8002
prefer-ip-address: true

服务注册发现(DisCovery)

首先我们在订单启动类中加入注解@EnableDiscoveryClient,然后在Controller类中注入类DisCoveryClient

首先了解discoveryClient的两个方法

discoveryClient.getServices 获取在Eureka中的服务

discoveryClient.getInstances(ServiceID); 获取该ServiceID集群下的所有具体实例

image-20230223162529968

我们可以获取实例下的一些配置信息

image-20230223163314450

Zookeeper

关于Zookeeper的基础知识在其它博客中,本节主要介绍,zookeeper与springcloud的结合使用作为服务注册中心

服务提供者注册进zookeeper

导入依赖坐标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
 <dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--由于该坐标依赖中zookeeper的版本依赖与本机的虚拟机中zookeeper的版本号不同,所以要先排除zookeeper-->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.6</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--再者,SpringBootWeb的依赖坐标中的子项也有对slf4j的依赖,所以在此要排除zookeeper中的slf4j依赖-->

在模块的配置文件中,添加连接地址

1
2
3
4
spring:  
cloud:
zookeeper:
connect-string: 192.168.26.132:2181

Zookeeper端开启服务,SpringBoot开启服务

Zookeeper客户端中可看到

image-20230301170555894

在浏览器中可看到

image-20230301170710438

Zookeeper服务注册中心中节点的状态

当我们断开SpringBoot的连接时我们会发现

image-20230301171418594

38处我们会发现 /services节点消失了,这说明什么?

这说明 zookeeper作为服务注册中心时,其注册的服务在zookeeper中的节点应为临时节点,一旦连接断开,时间超过规定时间,那么节点就会自动删除

Ribbon

什么是Ribbon

Springcloud Rinbbon 是基于netflix Rinbbon实现的一套客户端负载均衡工具

简单来说,Ribbon是Netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法和服务调用,Ribbon客户端组件提供一系列完善的配置项如连接超时,重试。简单来说,就是在配置文件中列出Load Balanced(LB)后面所有的机器,Ribbon会自动的帮助你基于某种规则(如简单轮询,随机连接等)去连接这些机器。我们很容易使用Ribbon实现自定义的负载均衡算法

LB负载均衡(Load Balance)是什么

简单的说就是将用户的请求平摊的分配到多个服务上,从而达到系统高可用

常见的负载均衡软件有Nginx,LVS,硬件F5等

Ribbon本地负载均衡服务器VSNginx服务端负载均衡的区别

Nginx是服务器负载均衡,客户端所有请求都会交给nginx,然后由nginx实现转发请求。即负载均衡是由服务端实现的

Ribbon本地负载均衡,在调用微服务接口时,会在注册中心上获取注册信息服务列表之后缓存到JVM本地,从而在本地实现RPC远程服务调用技术

在进程中的负载均衡将负载均衡逻辑集成到消费方,消费方从服务注册中心获知有哪些地址可用,然后自己再从这些地址中选择出一个合适的服务器

Ribbon就属于进程中的LB,他只是一个类库,集成于消费方进程,消费方通过它来获取到服务提供方的地址

Ribbon的Rest调用

在RestTemplate中会有方法 getForEntity和 postForEntity,调用它时会返回一个ResponseEntity<~>泛型类的对象,可查询状态码查询成功

image-20230302125142832

我们可判断其中的状态码

并且该对象可直接将泛型中的对象获取

1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/consumer/payment/EntityGet/{id}")
public CommentResult<Payment> getPayment(@PathVariable("id") Long id){

ResponseEntity<CommentResult> result= restTemplate.getForEntity(PAYMENT_URL+"/payment/get/"+id,CommentResult.class);
if (result.getStatusCode().is2xxSuccessful()) {
return result.getBody();
}
else {
return new CommentResult<>(444,"操作失败");
}
}

定义Ribbon的负载均衡策略时,不能在主启动类包及其子包下

image-20230302130407669

我们对负载均衡策略配置时

1
2
3
4
5
6
7
@Configuration
public class MyselfRule {
@Bean
public IRule myRule(){
return new RandomRule(); // 选择随机策略
}
}

在启动类中加入注解

image-20230302150726019

轮询算法的原理

负载均衡算法:rest接口第几次请求数%服务器集群数量=实际调用服务器位置下标,每次服务重启后rest接口记数从1 开始

image-20230302151520503

OpenFeign

Feign是什么

Feign是一个声明式的Web服务客户端,让编写Web服务客户端变得非常容易,只需要创建一个接口并在接口上添加注解即可

Feign能干什么

Feign旨在使编写Java Http客户端变得更容易

前面在使用Ribbon+RestTemplate时,利用RestTemplate对http请求的封装处理,形成了一套模版化的调用方法,但在实际开发中,由于对服务以来的调用可能不止一处,往往一个接口被多处调用,通常都会针对每个服务自行封装一些客户端类来包装这些依赖服务的调用。所以,Feign在此基础上做了进一步封装,由他来帮助我们定义和实现依赖服务接口的定义,在Feign的实现下,我们只需创建一个接口并使用注解的方式来配置它(以前是Dao接口标注Mapper注解,现在是一个微服务接口上标注一个Feign注解即可),即可完成对服务提供方的接口绑定,简化使用了SpringCloud Ribbon 时,自动封装服务调用客户端的开发量

Feign集成了Ribbon

利用Ribbon维护了Payment的服务列表信息,并且通过轮询实现客户端负载均衡,而Ribbon不同的是,通过feign只需要定义服务绑定接口且以生命是的方法,优雅而简单的实现服务调用

Feign的使用

Feign和Eureka实现了服务的注册发现和调用

首先导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>

在Service接口中(FeignClient中后面跟的Value是服务的名称)

1
2
3
4
5
6
@Component
@FeignClient(value = "CLOUD-PAYMENT-SERVICE")
public interface PaymentFeignService {
@GetMapping("/payment/get/{id}")
CommentResult<Payment> getPaymentById(@PathVariable("id") Long id); //该方法要与服务提供方的方法一致
}

在主启动类上,添加注解@EnableFeignClient

image-20230305120558331

OpenFeign的超时调用

当我们的响应时间超过一秒时,会出现以下状况

image-20230305200427586

OpenFeign的底层原理就是Ribbon+RestTemplate的动态调用,Ribbon默认设置为响应超过一秒直接报错(超时调用),我们但当网络不通畅时,我们的调用无论怎样都会报错,那么我们如何去延长Ribbon超时报错的时间呢?

修改服务消费者配置文件中的配置

1
2
3
ribbon:
ReadTimeout: 5000 <!--调用时长超过5000毫秒-->
ConnectTimeout: 5000 <!--连接时长超过5000毫秒-->

Hystrix

什么是Hystrix

Hystrix是一个用于处理分布式系统的延迟和容错的开源库,在分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等,

Hystrix能够保证在同一个依赖出问题的情况下,不会导致整体服务失败,避免级联故障,以提高分布式系统的弹性

“断路器”本身是一种开关装置,当某个单元发生故障后,通过断路器的故障监控(类似于熔断保险丝),向调用方返回一个符合预期的、可处理的备选响应(fallBack),而不是长时间的等待或者抛出调用方无法处理的异常,这样就保证了服务调用方的超线程不被长时间、不必要地占用,从而避免了故障在分布式系统中的蔓延,乃至雪崩

面试题

当我们出现以下情况的时候:如果D服务发生了故障不能响应,B服务调用D时只能阻塞等待。假如B服务调用D服务设置超时时间是10秒,请求速率是每秒100个,那10秒内就会有1000个请求线程被阻塞等待,如果B的线程池大小设置1000,那B系统因为线程资源耗尽已经不能对外提供服务了。而这又影响了入口系统A的服务,最终导致系统全面崩溃。

所以我们需要对服务雪崩的容错设计,所以需要系统容错中的限流、熔断和服务降级。

什么是服务限流

当系统的处理能力不能应对外部请求的突增流量时,为了不让系统崩溃,必须采取限流的措施。

限流中有三个指标:

  1. TPS:每秒系统事务吞吐量,系统吞吐量是衡量系统性能的关键指标,按照事务的完成数量来限流是最合理的。
  2. HPS:每秒请求数,指每秒钟服务端收到客户端的请求数量。
  3. QPS:服务端每秒能够响应的客户端查询请求数量。
服务限流的几个算法:
  1. ​ 流量计数器:直接去限制每秒请求数量,例如限制为100,超过100就会被拒绝访问

    • 缺点:在单位时间内,很难把控HPS(服务器每秒收到的请求数),倘若上一秒(1s为单位时间)还不到100但是实际上,在上一秒到这一秒的时间间隔内可能已经出现了HPS>100
    • 有一段时间流量超了,也不一定真的需要限流,系统HPS限制50,虽然前3s流量超了,但是如果都超时时间设置为5s,并不需要限流
  2. 滑动时间窗口: 例如操作系统中的滑动时间窗口

    • image-20230306001006758

    • 开始的时候,我们把t1t5看做一个时间窗口,每个窗口1s,如果我们定的限流目标是每秒50个请求,那t1t5这个窗口的请求总和不能超过250个。

      这个窗口是滑动的,下一秒的窗口成了t2~t6,这时把t1时间片的统计抛弃,加入t6时间片进行统计。这段时间内的请求数量也不能超过250个。

    • 缺点:

      • 一旦流量超过就必须抛弃或者走服务降级
      • 对流量控制不够精细,不能限制集中在短时间内的流量,也不能削峰填谷
  3. 漏桶方法

    • image-20230306001603783
    • 在客户端的请求发送到服务器之前,先用漏桶缓存起来,这个漏桶可以是一个长度固定的队列,这个队列中的请求均匀地发送到服务端。如果客户端的请求速率太快,漏桶的队列满了,就会被拒绝掉,或者走降级处理逻辑。这样服务端就不会受到突发流量的冲击。
    • 有3个问题需要考虑:
      • 漏桶的大小,如果太大,可能给服务端带来较大处理压力,太小可能会有大量请求被丢弃。
      • 漏桶给服务端的请求发送速率。
      • 使用缓存请求的方式,会使请求响应时间变长。
  4. 令牌桶算法

    • 令牌桶算法就跟病人去医院看病一样,找医生之前需要先挂号,而医院每天放的号是有限的。当天的号用完了,第二天又会放一批号。
    • image-20230306002005877

什么是服务熔断

它就相当于一个开关,打开后可以阻止流量通过。比如保险丝,当电流过大时,就会熔断,从而避免元器件损坏。服务熔断是指调用方访问服务时通过断路器做代理进行访问,断路器会持续观察服务返回的成功、失败的状态,当失败超过设置的阈值时断路器打开,请求就不能真正地访问到服务了。

image-20230306113939085

断路器的几个状态

断路器有3种状态:

  • CLOSED:默认状态。断路器观察到请求失败比例没有达到阈值,断路器认为被代理服务状态良好。
  • OPEN:断路器观察到请求失败比例已经达到阈值,断路器认为被代理服务故障,打开开关,请求不再到达被代理的服务,而是快速失败。
  • HALF OPEN:断路器打开后,为了能自动恢复对被代理服务的访问,会切换到半开放状态,去尝试请求被代理服务以查看服务是否已经故障恢复。如果成功,会转成CLOSED状态,否则转到OPEN状态。
  • image-20230306114132852

使用断路器需要考虑一些问题:

  • 针对不同的异常,定义不同的熔断后处理逻辑。
  • 设置熔断的时长,超过这个时长后切换到HALF OPEN进行重试。
  • 记录请求失败日志,供监控使用。
  • 主动重试,比如对于connection timeout造成的熔断,可以用异步线程进行网络检测,比如telenet,检测到网络畅通时切换到HALF OPEN进行重试。
  • 补偿接口,断路器可以提供补偿接口让运维人员手工关闭。
  • 重试时,可以使用之前失败的请求进行重试,但一定要注意业务上是否允许这样做。

使用场景

  • 服务故障或者升级时,让客户端快速失败
  • 失败处理逻辑容易定义
  • 响应耗时较长,客户端设置的read timeout会比较长,防止客户端大量重试请求导致的连接、线程资源不能释放

什么是服务降级

当服务器压力剧增的情况下,根据实际业务情况及流量,对一些服务和页面有策略的不处理或换种简单的方式处理,从而释放服务器资源以保证核心交易正常运作或高效运作

使用场景

  • 服务处理异常,把异常信息直接反馈给客户端,不再走其他逻辑
  • 服务处理异常,把请求缓存下来,给客户端返回一个中间态,事后再重试缓存的请求
  • 监控系统检测到突增流量,为了避免非核心业务功能耗费系统资源,关闭这些非核心功能
  • 数据库请求压力大,可以考虑返回缓存中的数据
  • 对于耗时的写操作,可以改为异步写
  • 暂时关闭跑批任务,以节省系统资源

配置服务降级

在服务提供方进行服务降级

首先要导入Hystrix的坐标依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

在主启动类上添加注解@EnableHytrix

然后在服务提供者Service中可能出现错误的方法上添加注解

1
2
3
4
@HystrixCommand(fallbackMethod = "paymentInfo_TimeoutHandler" // 做服务降级,fallbackMethod意为当出现错误时,所降级到的服务
,commandProperties = {@HystrixProperty (name = "execution.isolation.thread.timeoutInMilliseconds",value = "3000")
//HystrixProperty 为规定出现的何种错误(name=XX)和到达何值时出现错误 (value=XX)
})

一般情况下我们自己所写的fallbackMethod都是极其简单的方法,不会出现错误

当出现线程超时时,我们会进行线程隔离,所调用的线程是Hystrix所创建出的线程而并非TOMCAT线程池中的线程

image-20230306182947685

image-20230306182846728

在服务消费方进行服务降级

首先在配置文件中进行修改

1
2
3
feign:
hystrix:
enabled: true

在主启动类上加入注解

@EnableHystrix

image-20230306200827126

在业务层中的配置方法和服务提供方一致

1
2
3
4
5
6
7
8
9
10
@GetMapping("/consumer/payment/hystrix/Timeout/{id}")
@HystrixCommand(fallbackMethod = "paymentTimeOutFallbackMethod",commandProperties = {
@HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "1500")
})
String paymentInfo_Timeout(@PathVariable("id") Integer id){
return paymentHystrixService.paymentInfo_Timeout(id);
}
public String paymentTimeOutFallbackMethod(@PathVariable("id") Integer id){
return "我是消费者80,对方支付系统繁忙或者运行出错,请等待10秒钟后再试或检查自己5555";
}

倘若服务提供方和服务消费方都对线程超时时间做出了限制,不管是服务消费者方的超时时间大于服务提供方的超时时间,还是服务消费者方的超时时间小于服务提供方的超时时间,只要超时都会调用服务消费者方的Fallback函数

默认服务降级(DefaultProperties)

当我们发现我们对每个方法都进行服务降级时,每个方法上面都带着一大坨注解代码,这样的情况并不符合程序设计理念,多个方法的服务降级是相同的,我们就可以用默认服务降级的方法

首先我们在Controller类中加入注解@DefaultProperties

image-20230307104616969

defaultFallback即为默认的fallback方法

1
2
3
String paymentGlobal_FallbackMethod(){
return "Global异常处理信息,请稍后再试。。。。。";
}

再将每个需要服务降级的方法上加入注解@HystrixCommand即可

1
2
3
4
5
6
7
8
9
10
11
  @GetMapping("/consumer/payment/hystrix/Timeout/{id}")
// @HystrixCommand(fallbackMethod = "paymentTimeOutFallbackMethod",commandProperties = {
// @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "1500")
// })
@HystrixCommand
String paymentInfo_Timeout(@PathVariable("id") Integer id){
return paymentHystrixService.paymentInfo_Timeout(id);
}
public String paymentTimeOutFallbackMethod(@PathVariable("id") Integer id){
return "我是消费者80,对方支付系统繁忙或者运行出错,请等待10秒钟后再试或检查自己5555";
}

重启服务我们会发现,出现异常服务降级时,我们并不执行先前设置的FallBack方法(先前的设置已经设置为注解了)

image-20230307104835900

服务降级是就近原则,如果@HystrixCommand中设置了规定的方法和所处理的异常,那么就按照@HystrixCommand中的配置执行,如果没有则要进行默认方法配置,如果没有@HystrixCommand那么在程序出现错误时,不执行服务降级

将自定义的服务降级配置打开后

image-20230307105848159

全局服务降级

为了避免fallback函数和业务函数混在一起导致的代码混乱,我们需要对此进行调整

我们直接创建实现服务消费者业务接口类,实现其中的方法

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class paymentFallbackService implements PaymentHystrixService{
@Override
public String paymentInfo_OK(Integer id) {
return "PaymentFallBackService fall back OK 55555";
}

@Override
public String paymentInfo_Timeout(Integer id) {
return "PaymentFallBackService fall back Timeout 5555";
}
}

​ 在@FeignClient注解中添加属性该类即为实现接口类

image-20230307123100580

在配置文件中修改配置

1
2
3
feign:
hystrix:
enabled: true

image-20230307123210430

配置服务熔断

我们在服务提供方的Controller类中设置一个用户接口用于测试熔断

1
2
3
4
5
6
@GetMapping("/payment/circuit/{id}")
public String paymentCircuitBeaker(@PathVariable("id") Integer id){
String result = paymentService.paymentCircuitBreaker(id);
log.info("*****result"+result);
return result;
}

将Service层中设置我们的熔断机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 @HystrixCommand(fallbackMethod = "paymentCircuitBreaker_fallback",  // 兜底方法,用于熔断时开启
commandProperties = {
@HystrixProperty(name = "circuitBreaker.enabled",value = "true"), //是否开启熔断机制
@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"), //请求多少次失败后自动开启熔断
@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"), //熔断机制开启多长时间后会转为半开
@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60") // 错误率为多少则自动开启熔断
}
)
public String paymentCircuitBreaker(Integer id){
if (id <0){
throw new RuntimeException("******id不能为负数");
}
String serialNumber= IdUtil.simpleUUID();
return Thread.currentThread().getName()+"\t"+"调用成功,流水号:"+serialNumber;
}
public String paymentCircuitBreaker_fallback(Integer id){
return "id 不能负数,请稍后再试,/ToT/~~ id"+id;
}
}
服务熔断流程

当我们开启服务并多次请求(超过10次)测试熔断接口后,服务熔断自动开启,继续访问,自动响应兜底函数,即使当我们使用正确的方式请求时,也会Contreller调用兜底函数,过段时间后,系统就开始进入halfopen模式,默默开放部分服务,让倘若测试成功,就直接开启这项服务(此项服务的熔断判定会刷新)

官方完整流程图

img

服务正常访问

image-20230308113939498

服务访问失败

image-20230308114031742

服务熔断开启,即使访问方式正常,也会调用兜底函数

image-20230308114549233

服务熔断关闭

image-20230308114611682

图形化Dashboard

首先创建模块导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
    <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId>
</dependency>
</dependencies>

创建配置文件yaml

1
2
3
4
5
6
server:
port: 9001

hystrix:
dashboard:
proxy-stream-allow-list: localhost

创建启动类

1
2
3
4
5
6
7
8
@EnableHystrixDashboard
@SpringBootApplication
@EnableCircuitBreaker
public class HystrixDashboard9001 {
public static void main(String[] args) {
SpringApplication.run(HystrixDashboard9001.class,args);
}
}

在服务提供者8001中修改启动类,注入bean(由于版本问题不得不注入)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@EnableEurekaClient
@SpringBootApplication
@EnableHystrix
@EnableCircuitBreaker
public class PaymentHystrixMain8001 {
public static void main(String[] args) {
SpringApplication.run(PaymentHystrixMain8001.class,args);
}

@Bean
public ServletRegistrationBean getServlet(){
HystrixMetricsStreamServlet streamServlet=new HystrixMetricsStreamServlet();
ServletRegistrationBean registrationBean = new ServletRegistrationBean(streamServlet);
registrationBean.setLoadOnStartup(1);
registrationBean.addUrlMappings("/hystrix.stream");
registrationBean.setName("HystrixMetricsStreamServlet");
return registrationBean;
}
}

启动各个服务模块,访问 https:localhost:9001/hystrix

出现该页面

image-20230308160646552

证明我们配置的没有问题,图中一号区域时我们所要监控的地址,delay为页面刷新信息的时间 ,title为HystrixApp名(随便写)

①:http://localhost:8001/hystrix.stream

image-20230308162359687

GeteWay网关

API网关

什么是API网关

网关的角色是作为一个 API 架构,用来保护、增强和控制对于 API 服务的访问。

API 网关是一个处于应用程序或服务(提供 REST API 接口服务)之前的系统,用来管理授权、访问控制和流量限制等,这样 REST API 接口服务就被 API 网关保护起来,对所有的调用者透明。因此,隐藏在 API 网关后面的业务系统就可以专注于创建和管理服务,而不用去处理这些策略性的基础设施。

img

API网关的分类和功能

img

API网关在微服务框架中的位置

image-20230308203533394

Gateway

Gateway是什么

Spring Cloud Gateway是Spring官方基于Spring 5.0,Spring Boot 2.0和Project Reactor等技术开发的网关,Spring Cloud Gateway旨在为微服务架构提供一种简单而有效的统一的API路由管理方式。Spring Cloud Gateway作为Spring Cloud生态系中的网关,目标是替代ZUUL,其不仅提供统一的路由方式,并且基于Filter链的方式提供了网关基本的功能,例如:安全,监控/埋点,和限流等。

为什么要用Gateway

Spring Cloud Gateway 可以看做是一个 Zuul 1.x 的升级版和代替品,比 Zuul 2 更早的使用 Netty 实现异步 IO,从而实现了一个简单、比 Zuul 1.x 更高效的、与 Spring Cloud 紧密配合的 API 网关。
Spring Cloud Gateway 里明确的区分了 Router 和 Filter,并且一个很大的特点是内置了非常多的开箱即用功能,并且都可以通过 SpringBoot 配置或者手工编码链式调用来使用。
比如内置了 10 种 Router,使得我们可以直接配置一下就可以随心所欲的根据 Header、或者 Path、或者 Host、或者 Query 来做路由。
比如区分了一般的 Filter 和全局 Filter,内置了 20 种 Filter 和 9 种全局 Filter,也都可以直接用。当然自定义 Filter 也非常方便。

Gateway的三大核心

Gateway的三大核心:Route(路由),Pridicate(断言),Filter(过滤)

路由

路由是构建网关的基本模块它是由ID,目标URI,一系列的断言和过滤器构成,如果断言为true则匹配该路由

断言

Spring Cloud Gateway 通过 Predicate 断言来实现 Route 路由的匹配规则。简单点说,Predicate 是路由转发的判断条件,请求只有满足了 Predicate 的条件,才会被转发到指定的服务上进行处理。

使用 Predicate 断言需要注意以下 3 点:

  • Route 路由与 Predicate 断言的对应关系为“一对多”,一个路由可以包含多个不同断言。
  • 一个请求想要转发到指定的路由上,就必须同时匹配路由上的所有断言。
  • 当一个请求同时满足多个路由的断言条件时,请求只会被首个成功匹配的路由转发。
配置断言

断言有许多种例如:After Route Pridicate 、 Before Route Predicate 、 Between Route Predicate、 Method Route Predicate 、Path Route Predicate

  • After Route Pridicate 、 Before Route Predicate 、 Between Route Predicate:在 XXX时间之后、在XXX时间之前,在XXX和XXX时间之间可以使得断言成功时间的格式为: YYYY-MM-DDTHH-mm-ss.sss+08:00[Asia/Shanghai]
  • Method Route Predicate:指定某种请求方法(GET POST PUT DELETE)方法才能断言成功
  • Path Route Predicate:在Path断言中填入一段URL匹配规则,当实际请求的URL和断言中的规则相匹配的时候,就下发到该路由中URI指定地址,这个地址可以是一个具体的HTTP地址,也可以是一个Eureka中注册的服务名称,路由规则可以一次编写多个绑定关系
  • Cookie Route Predicate:当映射地址访问时,查看Cookie是否带有断言中的属性和其对应的值 、
  • Header Route Pridicate :当访问时,请求头中是否带有某属性且该属性符合什么正则表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: payment_route #路由ID没有固定规则但是要求唯一,建议配合服务名
uri: lb://CLOUD-PAYMENT-SERVICE #匹配后提供服务的路由地址
predicates:
- Path=/payment/get/** #断言,路径相匹配的进行路由

- id: payment_route2 #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名
uri: lb://CLOUD-PAYMENT-SERVICE #匹配后提供的路由地址
#以下为部分断言配置
predicates:
- Path=/payment/lb/** #断言,路径相匹配进行路由
- After=2023-03-09T15:50:37.485+08:00[Asia/Shanghai]
- Cookie=username,zhangfeng
- Header=X-Request-ID, \d+
- Method=GET
filters:
- AddRequestParameter=asd,1024

过滤器

通常情况下,出于安全方面的考虑,服务端提供的服务往往都会有一定的校验逻辑,例如用户登陆状态校验、签名校验等。

在微服务架构中,系统由多个微服务组成,所有这些服务都需要这些校验逻辑,此时我们就可以将这些校验逻辑写到 Spring Cloud Gateway 的 Filter 过滤器中。

Filter的分类
  • Pre 类型:这种过滤器在请求被转发到微服务之前可以对请求进行拦截和修改,例如参数校验、权限校验、流量监控、日志输出以及协议转换等操作。
  • Post 类型:这种过滤器在微服务对请求做出响应后可以对响应进行拦截和再处理,例如修改响应内容或响应头、日志输出、流量监控等。

配置过滤器

配置局部过滤器

配置文件中配置过滤器

1
2
3
4
5
6
7
8
spring:
cloud:
gateway:
routes:
- id: payment_route2 #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名
uri: lb://CLOUD-PAYMENT-SERVICE #匹配后提供的路由地址
filters:
- AddRequestParameter=asd,1024 #配置为请求参数中带有asd=1024

不仅仅是AddRequestParameter还有许多内置的过滤器:

image-20230309185138920

image-20230309185212855

image-20230309185233053

image-20230309185251088

自定义局部过滤器

首先要将其注入到容器中,然后实现GlobalFilter接口和Ordered接口,重写 filter与getOrder方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.atguigu.springcloud.filter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

@Component
@Slf4j
public class MyLogGateway implements GlobalFilter, Ordered {


@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String uname = exchange.getRequest().getQueryParams().getFirst("uname"); //请求路径中中是否带有参数 uname
if (uname==null){
log.info("******用户为null,非法用户=-=");
exchange.getResponse().setStatusCode(HttpStatus.NOT_ACCEPTABLE); //设置状态码为 不可接受状态
}
return chain.filter(exchange); //该过滤器放行,进入下一过滤器,若无其他过滤器则开始访问
}

@Override
public int getOrder() {
return 0;
}
}

配置动态网关

首先将创建网关模块 Gateway-9527

添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

添加配置

要将Gateway也注册进Erueka中,方便网关调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
server:
port: 9527
spring:
application:
name: cloud-gateway
cloud:
gateway:
discovery:
locator:
enabled: true
routes:
- id: payment_route #路由ID没有固定规则但是要求唯一,建议配合服务名
uri: lb://CLOUD-PAYMENT-SERVICE #匹配后提供服务的路由地址(由于是从erueka中获取,所以直接通过服务名去找) lb LoadBalanced所写
predicates:
- Path=/payment/get/** #断言,路径相匹配的进行路由

- id: payment_route2 #payment_route #路由的ID,没有固定规则但要求唯一,建议配合服务名
uri: lb://CLOUD-PAYMENT-SERVICE #匹配后提供的路由地址
predicates:
- Path=/payment/lb/** #断言,路径相匹配进行路由

eureka:
instance:
hostname: cloud-gateway-service
perfer-ip-address: true
client:
#表示是否将自己注册进Eureka Server 默认为true
register-with-eureka: true
#是否从EurekaServer抓取已有的服务信息,默认为true,单点无所谓集群必须设置为true才能配合Ribbon使用负载均衡
fetch-registry: true
service-url:
defaultZone: http://eureka7001.com:7001/eureka

启动类的构建

1
2
3
4
5
6
7
8
9
10
11
12
package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
@SpringBootApplication
@EnableEurekaClient
public class GatewayMain9527 {
public static void main(String[] args) {
SpringApplication.run(GatewayMain9527.class,args);
}
}
自定义全局过滤器

暂时先不整,项目中好好看

Config

微服务意味着要将单体应用的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现大量服务。由于每个服务都需要必要的配置信息才能运行,所以一套集中式的、动态的配置管理设施是必不可少的

什么是SpringCLoudConfig

它为为服务架构中的微服务提供集中化的外部配置支持,配置服务器为各个不同微服务应用的所有环境提供一个中心化外部配置

怎么用

SpringCloud Config分为服务端和客户端两部分

服务端也称为分布式配置中心,它是一个独立的微服务应用,用来连接配置服务器并为客户端提供获取配置信息,加密/解密信息等访问接口

客户端则是通过指定的配置中心来管理应用资源,以及与业务相关的配置内容,并在启动的时候,从配置中心获取和加载配置信息,配置服务器默认采用git来存储配置信息,这样就对环境配置进行版本管理,并且通过Git客户端工具来方便管理和访问配置内容

config总控中心搭建

首先要保证我们的github中有仓库存储该配置

image-20230314124529283

创建模块cloud-config-center-3344

添加依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-config-server</artifactId> <!--配置依赖-->
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
</dependencies>

修改模块配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
server:
port: 3344

spring:
application:
name: cloud-config-center #注册到eureka微服务名
cloud:
config:
server:
git:
uri: https://github.com/zzzzzf2001/springcloud-config.git #git的链接
search-paths:
- springcloud-config #搜索目录
label: main #读取分支

#服务注册到eureka
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka/

主启动类的构建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.atguigu.springcloud;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
@EnableConfigServer // 开启配置中心服务
public class ConfigCenterMain3344 {
public static void main(String[] args) {
SpringApplication.run(ConfigCenterMain3344.class,args);
}
}

启动主启动类(不要用校园网)

根本ping不通我裂开了!!!!!!!!!!!

Bus

概述

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个公用的消息主题,并让系统中所有为服务实例都连接上来,由于该主题中产生的消息会被所有实例监听和消费,所以它被称为消息总线,在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题的实例都知道的消息

基本原理:ConfigClient实例都监听MQ中同一个topic(默认是springCloudBus) 。当一个服务刷新数据的时候,他会把这个信息放到Topic中,这样其他监听统一Topic的服务就能得到通知,然后去更新自身配置

image-20230320185515231

SpringCloud Bus动态刷新全局广播

image-20230320185643261

服务端

1
2
3
4
5
6
7
8
9
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

配好RabbitMQ环境嗷!

BootStrap.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
server:
port: 3366

spring:
application:
name: config-client

cloud:
config:
label: master
name: config
profile: dev
uri: http://localhost:3344
#rabbitMQ 的相关配置
rabbitmq:
port: 5672
host: 192.168.26.XXX
username: admin
password: 123456

eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka

#暴露监控端点
management:
endpoint:
web:
exposure:
include: "*"

客户端

1
2
3
4
5
6
7
8
9
<!--添加消息总线RabbitMQ支持-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Bootstrap.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
server:
port: 3355

spring:
application:
name: config-client
cloud:
#Config客户端配置
config:
label: master #分支名称
name: config #配置文件名称
profile: dev #读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取
uri: http://localhost:3344 #配置中心地址k
#rabbitMQ 的相关配置
rabbitmq:
port: 5672
host: 192.168.26.XXX
username: admin
password: 123456

#服务注册到eureka地址
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
# 暴露监控端点
management:
endpoints:
web:
exposure:
include: "*" # 'refresh'

image-20230320190334101

image-20230320190412403

Stream

为什么要引入cloud Stream

MQ(消息中间件):ActiveMQ ,RabbitMQ,RocketMQ,Kafka

正常一个程序系统分为三部分

image-20230320191126947

我们Java程序员和大数据程序员所用的MQ并非是同一款MQ,这就导致其中的维护成本很高并且开发和切换起来很麻烦

所以我们急需一款技术,让我们不在关注MQ的细节,我们只需要一种适配绑定的方式,自动的给我们在各种MQ内切换

什么是SpringCloudStream

官方定义SpringCloud Stream 是一个构建消息驱动微服务的框架

应用程序通过inputs或者outputs来与SpringCloud Stream的binder对象交互

通过我们配置来binding(绑定),而SpringCloud Stream 的binder对象负责与消息中间件交互、所以,我们只需要搞清楚如何与SpringCloudStream交互就可以方便实用消息队列的方式

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动

SpringCloud Stream为一些供应商的消息中间件产品提供了一些个性化的自动配置实现,引用了发布-订阅、消费组、分区的三个核心概念(目前仅支持RabbitMQ、Kafka)

两种差异

RabbitMQ和Kafka这两个消息中间件的架构有所不同:RabbitMQ有exchange,kafka有Topic和Partitions分区

这些中间件的差异性导致我们实际开发项目有不少的困扰,我们如果用了两个消息队列中的其中一个,后面的业务需求,我们向往另一种消息队列进行迁移,这无疑是一个灾难性的改变,几乎所有的东西都需要重新做,因为它和我们的系统耦合了,我们这时候SpringCloud Stream给我们提供了一种解耦合的 方式

SpringCLoudStream为什么可以隔离差异

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接和消息中间件进行信息交互,由于各信息消息中间件的构建初衷不同,他们的实现细节上也会有较大的差异性,通过绑定器作为中间层,完美的实现了应用程序和消息中间件细节之间的隔离,通过想应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的中间件实现

Binder中间层所处的位置

image-20230320194527623

Stream的组成

image-20230320194735180

Binder:很方便的连接中间件,是应用和消息中间件之间的封装,通过Binder可以跟方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic和RabbitMQ的exchange),这些都可以通过配置文件来实现

Channel:信道,是队列的一种抽象,在消息通讯系统重就是实现存储和转发的媒介,通过Channel对队列进行配置

Source和Sink:简单的理解就是从Stream发布的消息就是输出,接受的消息就是输入

image-20230320195817990

Middleware:中间件,现在仅支持RabbitMQ和Kafka

@Input:注解表示输入通道,通过输入通道接收到信息进入应用程序

@Output:注解表示输出通道,发布的消息将通过该通道离开应用程序

@StreamListener:监听队列,用于消费者的队列的消息接收

@EnableBinding:指Channel和exchange绑定在一起

消息生产者编码

首先导入依赖

老生常谈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

引入的新依赖

1
2
3
4
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
server:
port: 8801

spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
output: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka

接下来是Service层(和平时项目中Service层类似)

1
2
3
public interface IMessageProvider {
public String send();
}

实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.atguigu.cloud.service.impl;

import com.atguigu.cloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class) //定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {

@Resource
private MessageChannel output;

@Override
public String send() {
String serial= UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*******serial:"+serial);
return null;
}
}

Controller层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.atguigu.cloud.controller;

import com.atguigu.cloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@Slf4j
public class sendMeesageController {
@Resource
private IMessageProvider MessageProvider;

@GetMapping("/sendMessage")
public String sendMessage(){
return MessageProvider.send();
}

}

消息消费者者编码

导入依赖和消费者一样

配置文件有些出入 (首先是应用名,其次是通道名称由output改为了input)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
server:
port: 8802

spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitmq的服务信息;
defaultRabbit: # 表示定义的名称,用于于binding整合
type: rabbit # 消息组件类型
environment: # 设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的Exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit # 设置要绑定的消息服务的具体设置


eureka:
client: # 客户端进行Eureka注册的配置
service-url:
defaultZone: http://localhost:7001/eureka

接下来是消息消费者的消息监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.atguigu.cloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号 ----> port:" + serverPort + "\t从8801接受到的消息是:" + message.getPayload());
}
}

主启动类二者相似,开启服务

image-20230321164907872

image-20230321164921184

消息重复消费问题

我们创建再创建一个消费者8804,当生产者将消息发送到交换机之后,我们可以在管理者页面发现

image-20230321170538570

并且两个队列绑定的交换机均是studyExchange而且Routingkey都是#,所以就导致,消息被发往了两个队列中,而两队列对应着两个消费者,所以就会有

image-20230321170646587

image-20230321170745340

image-20230321170757253

image-20230321170808108

消息被重复消费了,为了解决这个问题我们就要涉及到分组了

在实际生产中,用户在商城下单后,8801端口服务将订单消息发送到订单系统的集群环境中,这时会有RabbitMQ将消息发给了8801而且还发给了8802,两个系统都处理了该消息请求,就会导致在继续将数据存储在数据库时,会发现有两条订单消息相同的订单,这个问题将会是灾难性的

image-20230321171026373

在Stream中处于同一个Group中的多个消费者是竞争关系,这就能保证消息只会被其中一个应用消费一次,在底层的RabbitMQ中就是,队列中的两个Channel竞争一条消息

这时候就需要我们对其进行分组了,在消息消费者配置文件中输入配置(两个都要输入)

image-20230321172542154

这时我们再登录管理页面就会发现

image-20230321172625058

用图解说明就是

添加配置前

image-20230321173635603

添加配置后

image-20230321173838833

当生产者发送了4条消息后,注意两个消费者收到消息的次序,依次是消费者1收到消费者2收到消费者1收到消费者2收到,由此可见是轮训发送的

image-20230321173958826

image-20230321174008284

image-20230321174017880

倘若二者分组不同

image-20230321174304273

和配置前的情况一样

消息持久化

当我们不加分组就将消息消费者打开时我们会发现,会生成两个队列,其中两个队列的名字非常奇怪,随机生成,不由得让我们想起,RabbitMQ 中的临时队列,RabbitMQ的临时队列名称也是随机的,但是在服务关闭时,队列会消失,我们也将消息消费者服务关闭,这时我们就会发现,队列也消失了。当我们加上group配置时我们会发现,即使队列名称就是配置交换机加对应的属性名,并且在服务关闭时队列并不消失

由上述我们可以得知,其实加group配置就是将该队列进行持久化的一个配置。与RabbitMQ中持久化队列和临时队列的差别相同,当我们的两个服务消费者尚未启动时(8802为持久队列,8804为随时队列二者之前都有开启过),此时消息生产者生产了许多消息,由于临时队列随用随起,不用则弃,所以8804是无法收到消息的,但8802因为是持久化队列

image-20230321192029399

Sleuth

为什么需要Spring Cloud Sleuth

微服务架构是一个分布式架构,它按业务划分服务单元,一个分布式系统往往有很多个服务单元。由于服务单元数量众多,业务的复杂性,如果出现了错误和异常,很难去定位。主要体现在,一个请求可能需要调用很多个服务,而内部服务的调用复杂性,决定了问题难以定位。所以微服务架构中,必须实现分布式链路追踪,去跟进一个请求到底有哪些服务参与,参与的顺序又是怎样的,从而达到每个请求的步骤清晰可见,出了问题,很快定位。

举个例子,在微服务系统中,一个来自用户的请求,请求先达到前端A(如前端界面),然后通过远程调用,达到系统的中间件B、C(如负载均衡、网关等),最后达到后端服务D、E,后端经过一系列的业务逻辑计算最后将数据返回给用户。对于这样一个请求,经历了这么多个服务,怎么样将它的请求过程的数据记录下来呢?这就需要用到服务链路追踪。

Zipkin

Zipkin是一种分布式链路追踪系统。 它有助于收集解决微服务架构中的延迟问题所需的时序数据。 它管理这些数据的收集和查找。

首先在官网上下载Zipkin的jar包

新版本下载地址:https://repo1.maven.org/maven2/io/zipkin/zipkin-server/

然后 运行该jar包

1
java -jar .\zipkin-server-2.19.2-exec.jar

image-20230321210941514

然后我们在浏览器地址栏中输入

Zipkin(http://localhost:9411/zipkin/) 到达页面

image-20230321211044267

在服务消费者方和服务生产者方加入配置

1
2
3
4
5
6
7
spring:
zipkin:
base-url: http://localhost:9411
sleuth:
sampler:
#采样率介于0-1之间,1表示全部采集
probability: 1

由于是服务消费者调用服务生产者,当我们开始通过请求调用时,我们去zipkin的页面去查询一下

image-20230321211619469