Skip to content
On this page

DotNetCore.CAP

1、基础入门

1.1.背景

Cap(Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性))是分布式系统中的一个重要理念,根据CAP定理,存在网络分区(微服务即时网络分区架构)时,Web应用不可能同时满足可用性和一致性,即便不是在构建微服务,在构建分布式应用的过程中也会遇到分布式事务的问题,那么 CAP 就是在这样的背景下诞生的。DotNetCore.CAP使用“异步确保”方案,利用消息队列和本地消息列表实现最终数据一致性。异步确保模式是补偿模式的一个典型案例,通过异步的方式进行处理,处理后把结果通过通知系统通知给使用方

1.2.介绍

CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架。它有助于创建可扩展,可靠并且易于更改的微服务系统。 相对于其他的 Service Bus 或者 Event Bus, CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。 CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。 官方文档:https://cap.dotnetcore.xyz/ 开源地址:https://github.com/dotnetcore/CAP

1.3.应用场景

CAP 的应用场景主要有以下两个:

  • 分布式事务中的最终一致性(异步确保)的方案 分布式事务是在分布式系统中不可避免的一个硬性需求,而目前的分布式事务的解决方案也无外乎就那么几种 CAP 没有采用两阶段提交(2PC)这种事务机制,而是采用的 本地消息表+MQ 这种经典的实现方式,这种方式又叫做 异步确保。

  • 具有高可用性的 EventBus CAP 实现了 EventBus 中的发布/订阅,它具有 EventBus 的所有功能。也就是说你可以像使用 EventBus 一样来使用 CAP,另外 CAP 的 EventBus 是具有高可用性的,这是什么意思呢? CAP 借助于本地消息表来对 EventBus 中的消息进行了持久化,这样可以保证 EventBus 发出的消息是可靠的, 当消息队列出现宕机或者连接失败的情况时,消息也不会丢失。

1.4.Quick Start

  • 引用 NuGet 包 使用一下命令来引用CAP的NuGet包: PM> Install-Package DotNetCore.CAP 根据使用的不同类型的消息队列,来引入不同的扩展包: PM> Install-Package DotNetCore.CAP.RabbitMQ PM> Install-Package DotNetCore.CAP.Kafka 根据使用的不同类型的数据库,来引入不同的扩展包: PM> Install-Package DotNetCore.CAP.SqlServer PM> Install-Package DotNetCore.CAP.MySql

  • 启动配置 在 ASP.NET Core 程序中,你可以在 Startup.cs 文件 ConfigureServices() 中配置 CAP 使用到的服务:

csharp
public void ConfigureServices(IServiceCollection services)
    {
        services.AddCap(x =>
        {
            x.UseSqlServer("Your ConnectionStrings");
            x.UseKafka("localhost");
            //x. DefaultGroupName=””  // 消费者组的名字,默认值:cap.queue.{程序集名称}
            //x. GroupNamePrefix=””   // 为订阅 Group 统一添加前缀
            //x. TopicNamePrefix=””   // 为 Topic 统一添加前缀
            //x. FailedRetryInterval  // 默认值:60,发送失败,CAP将会对消息进行重试, 间隔时间
            //x. UseStorageLock	// 设置为true,使用基于数据库的分布式锁以应对重试进程,生成表cap.lock
            //x. ConsumerThreadCount  // 消费者线程并行处理的线程数,当值大于1时,将不能保证消息执行的顺序
            //x. CollectorCleaningInterval  // 默认值:300 秒,收集器删除已经过期消息的时间间隔
            //x. FailedRetryCount	  // 默认值:50,重试的最大次数。
            //x. FailedThresholdCallback	 // 重试阈值的失败回调。当重试达到 FailedRetryCount 设置的值的时候,将调用此 Action 回调,你可以通过指//定此回调来接收失败达到最大的通知,以做出人工介入。例如发送邮件或者短信。
            //x. SucceedMessageExpiredAfter // 成功消息的过期时间(秒)默认值:24*3600 秒(1天后)
            //x. FailedMessageExpiredAfter  // 失败消息的过期时间(秒)默认值:15*24*3600 秒(15天后)
            //x. UseDispatchingPerGroup	// 内存同一个Channel中,然后线性处理。
            //x. EnableConsumerPrefetch	// 从消息队列读取一条, 执行完成后才会读取下一条. 设置为 true, 消费端会将消息预取到内存队列,然后再放入.NET 线程池并行执行
        });
    }

在 Configure() 中配置启动 CAP :

csharp
public void Configure(IApplicationBuilder app)
    {
    	app.UseCap();
    }

2、API接口

CAP 的 API 接口只有一个,就是 ICapPublisher 接口,你可以从 DI 容器中获取到该接口的实例进行调用。

2.1.发布/发送

你可以使用 ICapPublisher 接口中的 Publish<T> 或者 PublishAsync<T> 方法来发送消息:

csharp
public class PublishController : Controller
    {
        private readonly ICapPublisher _publisher;

        //在构造函数中获取接口实例
        public PublishController(ICapPublisher publisher)
        {
            _publisher = publisher;
        }

        [HttpGet]
        [Route("~/checkAccount")]
        public async Task<IActionResult> PublishMessage()
        {
            await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });

            return Ok();
        }

        [HttpGet]
        public async Task<IActionResult> PushMsgHeadersAsync(string strMsg)
        {
            var result = await Task.Run(async () =>
            {
                var headers = new Dictionary<string, string>()
                {
                    ["my.header.id"] = "0000001",
                    ["my.header.name"] = "市场易1"
                };
                await this._publisher.PublishAsync("MQ_KEY_0000002", strMsg, headers);
                _logger.LogInformation($"发送消息:{strMsg}");
                return true;
            });

            return Ok();
        }
    }
默认情况下,在调用此方法的时候 CAP 将在内部创建事务,然后将消息写入到 Cap.Published 这个消息表。
  • 2.1.1事务 事务在 CAP 具有重要作用,它是保证消息可靠性的一个基石。 在发送一条消息到消息队列的过程中,如果不使用事务,我们是没有办法保证我们的业务代码在执行成功后消息已经成功的发送到了消息队列,或者是消息成功 的发送到了消息队列,但是业务代码确执行失败。 这里的失败原因可能是多种多样的,比如连接异常,网络故障等等。 只有业务代码和CAP的Publish代码必须在同一个事务中,才能够保证业务代码和消息代码同时成功或者失败。 以下是两种使用事务进行Publish的代码:
    • EntityFramework
csharp
using (var transaction = dbContext.Database.BeginTransaction())
    {
        await _publisher.PublishAsync("xxx.services.account.check", new Person { Name = "Foo", Age = 11 });
        // 你的业务代码。
        transaction.Commit();
    }
你的业务代码可以位于 Publish 之前或者之后,只需要保证在同一个事务。
当CAP检测到 Publish 是在EF事务区域内的时候,将使用当前的事务上下文进行消息的存储。其中,发送的内容会序列化为Json存储到消息表中。

* Dapper
csharp
var connString = "数据库连接字符串";
    using (var connection = new MySqlConnection(connString))
    {
        connection.Open();
        using (var transaction = connection.BeginTransaction())
        {
            await _publisher.PublishAsync("xxx.services.bar",
                new Person { Name = "Foo", Age = 11 }, connection,transaction);
            // 你的业务代码。
            transaction.Commit();
        }
    }
在 Dapper 中,由于不能获取到事务上下文,所以需要用户手动的传递事务上下文到CAP中。

2.2.订阅/消费

注意:消息端在方法实现的过程中需要实现幂等性。

  • 使用 CapSubscribeAttribute 来订阅 CAP 发布出去的消息。
csharp
[CapSubscribe("xxx.services.bar")] 
    public void BarMessageProcessor()
    {

    }
  • 订阅 CAP 发布出去的消息,并接收Header参数传递
csharp
[NonAction]
    [CapSubscribe("MQ_KEY_0000002", Group = "group00002")]
    public async void TestConsumer2(string strMsg, [FromCap] CapHeader headers)
    {
        try
        {
            _logger.LogInformation($"消费者MQ_KEY_0000002---收到Header消息:{JsonConvert.SerializeObject(headers)}");
            _logger.LogInformation($"消费者MQ_KEY_0000002---收到消息:{strMsg}");
        }
        catch (Exception ex)
        {

        }
    }
  • 这里,你也可以使用多个 CapSubscribe[""] 来同时订阅多个不同的消息 :
csharp
[CapSubscribe("xxx.services.bar")] 
    [CapSubscribe("xxx.services.foo")] 
    public void BarAndFooMessageProcessor()
    {

    }
其中, xxx.services.bar 为订阅的消息名称,内部实现上,这个名称在不同的消息队列具有不同的代表。 
在Kafka 中,这个名称即为 Topic Name。 在RabbitMQ 中,为 RouteKey。
  • RabbitMQ 中的 RouteKey 支持绑定键表达式写法,有两种主要的绑定键: **(星号)可以代替一个单词. #(井号) 可以代替0个或多个单词. 比如在下面这个图中(P为发送者,X为RabbitMQ中的Exchange,C为消费者,Q为队列) eg:1 在这个示例中,我们将发送一条关于动物描述的消息,也就是说 Name(routeKey) 字段中的内容包含 3 个单词。第一个单词是描述速度的(celerity),第二个单词是描述颜色的(colour),第三个是描述哪种动物的(species),它们组合起来类似:“..”。 然后在使用 CapSubscribe 绑定的时候,Q1绑定为 CapSubscribe[".orange."] , Q2 绑定为CapSubscribe["..rabbit"] 和 [CapSubscribe["lazy.#] 。

    那么,当发送一个名为 "quick.orange.rabbit" 消息的时候,这两个队列将会同时收到该消息。同样名为lazy.orange.elephant 的消息也会被同时收到。另外,名为 "quick.orange.fox" 的消息将仅会被发送到Q1队列,名为 "lazy.brown.fox" 的消息仅会被发送到Q2。"lazy.pink.rabbit" 仅会被发送到Q2一次,即使它被绑定了2次。"quick.brown.fox" 没有匹配到任何绑定的队列,所以它将会被丢弃。

    另外一种情况,如果你违反约定,比如使用 4个单词进行组合,例如 "quick.orange.male.rabbit",那么它将匹配不到任何的队列,消息将会被丢弃。 但是,假如你的消息名为 "lazy.orange.male.rabbit",那么他们将会被发送到Q2,因为 #(井号)可以匹配 0 或者多个单词。

在 CAP 中,我们把每一个拥有 CapSubscribe[] 标记的方法叫做订阅者,你可以把订阅者进行分组。
组(Group),是订阅者的一个集合,每一组可以有一个或者多个消费者,但是一个订阅者只能属于某一个组。同一个组内的订阅者订阅的消息只能被消费一次。
csharp
[CapSubscribe("xxx.services.foo", Group = "moduleA")] 
    public void FooMessageProcessor()
    {

    }

2.2.1例外情况 这里有几种情况可能需要知道:

  • ① 消息发布的时候订阅方还未启动 Kafka: 当 Kafka 中,发布的消息存储于持久化的日志文件中,所以消息不会丢失,当订阅者所在的程序启动的时候会消费掉这些消息。

    RabbitMQ: 在 RabbitMQ 中,应用程序首次启动会创建具有持久化的 Exchange 和 Queue,CAP 会针对每一个订阅者Group会新建一个消费者队列,由于首次启动时候订阅者未启动的所以是没有队列的,消息无法进行持久化,这个时候生产者发的消息会丢失。

  • ② 消息没有任何订阅者 如果你发送了一条个没有被任何订阅者订阅的消息,那么此消息将会被丢弃。

3、配置

Cap 使用 Microsoft.Extensions.DependencyInjection 进行配置的注入,你也可以依赖于 DI 从json文件中读取配置。

3.1.Cap Options

你可以使用如下方式来配置 CAP 中的一些配置项,例如

csharp
services.AddCap(capOptions => 
{ 
    capOptions.FailedCallback = //...
});

CapOptions 提供了一下配置项:

NAMEDESCRIPTIONTYPEDEFAULT
PollingDelay处理消息的线程默认轮询等待时间(秒)int15 秒
QueueProcessorCount启动队列中消息的处理器个数int2
FailedMessageWaitingInterval轮询失败消息的间隔(秒)int180 秒
FailedCallback执行失败消息时的回调函数,详情见下文ActionNULL

CapOptions 提供了 FailedCallback 为处理失败的消息时的回调函数。当消息多次发送失败后,CAP会将消息状态标记为 Failed ,CAP有一个专门的处理者用来处理这种失败的消息,针对失败的消息会重新放入到队

FailedCallback 的类型为 Action<MessageType,string,string> ,第一个参数为消息类型(发送的还是接收到),第二个参数为消息的名称(name),第三个参数为消息的内容(content)。

3.2.RabbitMQ Options

CAP 采用的是针对 CapOptions 进行扩展来实现RabbitMQ的配置功能,所以针对 RabbitMQ 的配置用法如下:

csharp
services.AddCap(capOptions => 
{ 
    capOptions.UseRabbitMQ(rabbitMQOption=>{
        // rabbitmq options.
    });
});

RabbitMQOptions 提供了有关RabbitMQ相关的配置:

NAMEDESCRIPTIONTYPEDEFAULT
HostName宿主地址stringlocalhost
UserName用户名stringguest
Password密码stringguest
VirtualHost虚拟主机string/
Port端口号int-1
TopicExchangeNameCAP默认Exchange名称stringcap.default.topic
RequestedConnectionTimeoutRabbitMQ连接超时时间int30,000 毫秒
SocketReadTimeoutRabbitMQ消息读取超时时间int30,000 毫秒
SocketWriteTimeoutRabbitMQ消息写入超时时间int30,000 毫秒
QueueMessageExpires队列中消息自动删除时间int(10天) 毫秒

3.3.Kafka Options

CAP 采用的是针对 CapOptions 进行扩展来实现 Kafka 的配置功能,所以针对 Kafka 的配置用法如下:

csharp
services.AddCap(capOptions => 
{ 
    capOptions.UseKafka(kafkaOption=>{
        // kafka options.
        // kafkaOptions.MainConfig.Add("", "");
    });
});

KafkaOptions 提供了有关 Kafka 相关的配置,由于Kafka的配置比较多,所以此处使用的是提供的 MainConfig 字典来支持进行自定义配置,你可以查看这里来获取对配置项的支持信息。 https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

3.4.SqlServer Options

如果你使用的是 EntityFramewrok,你用不到该配置项下的内容。 CAP 采用的是针对 CapOptions 进行扩展来实现 SqlServer 的配置功能,所以针对 SqlServer 的配置用法如下:

csharp
services.AddCap(capOptions => 
{ 
    capOptions.UseSqlServer(sqlserverOptions => {
        // sqlserverOptions.ConnectionString
    });
});
NAMEDESCRIPTIONTYPEDEFAULT
SchemaCap表架构stringCap
ConnectionString数据库连接字符串stringnull

3.5.MySql Options

如果你使用的是 EntityFramewrok,你用不到该配置项下的内容。 CAP 采用的是针对 CapOptions 进行扩展来实现 MySql 的配置功能,所以针对 MySql 的配置用法如下:

csharp
services.AddCap(capOptions => 
{ 
    capOptions.UseMySql(mysqlOptions => {
        // mysqlOptions.ConnectionString
    });
});
NAMEDESCRIPTIONTYPEDEFAULT
TableNamePrefixCap表名前缀stringcap
ConnectionString数据库连接字符串stringnull

4、设计原理

4.1.动机

随着微服务架构的流行,越来越多的人在尝试使用微服务来架构他们的系统,而在这其中我们会遇到例如分布式事务的问题,为了解决这些问题,我没有发现简单并且易于使用的解决方案,所以我决定来打造这样一个库来解决这个问题。 最初 CAP 是为了解决分布式系统中的事务问题,她采用的是 异步确保 这种机制实现了分布式事务的最终一致性,更多这方面的信息可以查看第6节。 现在 CAP 除了解决分布式事务的问题外,她另外一个重要的功能就是作为 EventBus 来使用,她具有 EventBus的所有功能,并且提供了更加简化的方式来处理EventBus中的发布/订阅。

4.2.持久化

CAP 依靠本地数据库实现消息的持久化,CAP 使用这种方式来应对一切环境或者网络异常导致消息丢失的情况,消息的可靠性是分布式事务的基石,所以在任何情况下消息都不能丢失。 对于消息的持久化分为两种:

  • ① 消息进入消息队列之前的持久化 在消息进入到消息队列之前,CAP使用本地数据库表对消息进行持久化,这样可以保证当消息队列出现异常或者网络错误时候消息是没有丢失的。 为了保证这种机制的可靠性,CAP使用和业务代码相同的数据库事务来保证业务操作和CAP的消息在持久化的过程中是强一致的。也就是说在进行消息持久化的过程中,任何一方发生异常情况数据库都会进行回滚操作。

  • ② 消息进入到消息队列之后的持久化 消息进入到消息队列之后,CAP会启动消息队列的持久化功能,我们需要说明一下在 RabbitMQ 和 Kafka 中CAP的消息是如何持久化的。 针对于 RabbitMQ 中的消息持久化,CAP 使用的是具有消息持久化功能的消费者队列,但是这里面可能有例外情况,参加 2.2.1 章节。 由于 Kafka 天生设计的就是使用文件进行的消息持久化,在所以在消息进入到Kafka之后,Kafka会保证消息能够正确被持久化而不丢失。

4.3.通讯数据流

CAP 中消息的流转过程大致如下: eg:2
“ P ” 代表消息发送者(生产者)。 “ C ” 代表消息消费者(订阅者)。

4.4.一致性

CAP 采用最终一致性作为的一致性方案,此方案是遵循 CAP 理论,以下是CAP理论的描述。 C(一致性)一致性是指数据的原子性,在经典的数据库中通过事务来保障,事务完成时,无论成功或回滚,数据都会处于一致的状态,在分布式环境下,一致性是指多个节点数据是否一致; A(可用性)服务一直保持可用的状态,当用户发出一个请求,服务能在一定的时间内返回结果; P(分区容忍性)在分布式应用中,可能因为一些分布式的原因导致系统无法运转,好的分区容忍性,使应用虽然是一个分布式系统,但是好像一个可以正常运转的整体 根据 “CAP”分布式理论, 在一个分布式系统中,我们往往为了可用性和分区容错性,忍痛放弃强一致支持,转而追求最终一致性。大部分业务场景下,我们是可以接受短暂的不一致的。

5、实现

CAP 封装了在 ASP.NET Core 中的使用依赖注入来获取 Publisher ( ICapPublisher )的接口。而启动方式类似于 “中间件” 的形式,通过在 Startup.cs 配置 ConfigureServices 和 Configure 进行启动。

5.1.消息表

当系统引入CAP之后并首次启动后,CAP会在客户端生成 3 个表,分别是 Cap.Published, Cap.Received, Cap.Queue。注意表名可能在不同的数据库具有不同的大小写区分,如果你在运行项目的时候没有显式的指定数据库生成架构(SQL Server)或者表名前缀(MySql)的话,默认情况下就是以上3个名字。

  • **Cap.Published:**这个表主要是用来存储 CAP 发送到MQ(Message Queue)的客户端消息,也就是说你使用ICapPublisher 接口 Publish 的消息内容。

  • **Cap.Received:**这个表主要是用来存储 CAP 接收到 MQ(Message Queue) 的客户端订阅的消息,也就是使用CapSubscribe[] 订阅的那些消息。

  • **Cap.Queue:**这个表主要是CAP内部用来处理发送和接收消息的一个临时表,通常情况下,如果系统不出现问题,这个表将是空的。

Published 和 Received 表具有 StatusName 字段,这个字段用来标识当前消息的状态。目前共有Scheduled,Enqueued,Processing,Successed,Failed 等几个状态。 CAP 在处理消息的过程中会依次从Scheduled到 Successed 来改变这些消息状态的值。如果是状态值为 Successed,代表该消息已经成功的发送到了 MQ 中。如果为 Failed 则代表消息发送失败,消息发送失败后 CAP 会对消息进行重试,直到成功。

  • **关于数据清理:**CAP 默认情况下会每隔一个小时将消息表的数据进行清理删除,避免数据量过多导致性能的降低。清理规则为 ExpiresAt 不为空并且小于当前时间的数据。

5.2.消息格式

CAP 采用 JSON 格式进行消息传输,以下是消息的对象模型:

NAMEDESCRIPTIONTYPE
Id消息编号int
Name消息名称string
Content内容string
Group所属消费组string
Added创建时间DateTime
ExpiresAt过期时间DateTime
Retries重试次数int
StatusName状态string

对于 Cap.Received 中的消息,会多一个 Group 字段来标记所属的消费者组。

5.3.EventBus

EventBus 采用 发布-订阅 风格进行组件之间的通讯,它不需要显式在组件中进行注册。 eg:3 上图是EventBus的一个Event的流程,关于 EventBus 的更多信息就不在这里介绍了

在 CAP 中,为什么说 CAP 实现了 EventBus 中的全部特性,因为 EventBus 具有的两个大功能就是发布和订阅, 在 CAP 中 使用了另外一种优雅的方式来实现的,另外一个 CAP 提供的强大功能就是消息的持久化,以及在任何异常情况下消息的可靠性,这是EventBus不具有的功能。 eg:4 CAP 里面发送一个消息可以看做是一个 “Event”,一个使用了CAP的ASP.NET Core 应用程序既可以进行发送也可以进行订阅接收。

5.4.重试

重试在实现分布式事务中具有重要作用,CAP 中会针对发送失败或者执行失败的消息进行重试。在整个 CAP 的设计过程中有以下几处采用的重试策略。

  • ① 消息发送重试 在消息发送过程中,当出现 Broker 宕机或者连接失败的情况亦或者出现异常的情况下,这个时候 CAP 会对发送的重试,重试策略为默认 15 次失败重试,当15次过后仍然失败时,CAP会将此消息状态标记为失败。

  • ② 消息消费重试 当 Consumer 接收到消息时,会执行消费者方法,在执行消费者方法出现异常时,会进行重试。这个重试策略和 ① 是相同的。

  • ③ 失败消息重试 CAP 会定期针对 ① 和 ② 中状态为“失败的”消息进行重试,CAP会对他们进行重新“入队(Enqueue)”,入队时会将消息中的重试次数标记为0,状态置为 Enqueued。

6、分布式事务

针对于分布式事务的处理,CAP 采用的是“异步确保”这种方案。

6.1.异步确保

异步确保这种方案又叫做本地消息表,这是一种经典的方案,方案最初来源于 eBay,参考资料见段末链接。这种方案目前也是企业中使用最多的方案之一。

相对于 TCC 或者 2PC/3PC 来说,这个方案对于分布式事务来说是最简单的,而且它是去中心化的。在TCC 或者 2PC 的方案中,必须具有事务协调器来处理每个不同服务之间的状态,而此种方案不需要事务协调器。 另外 2PC/TCC 这种方案如果服务依赖过多,会带来管理复杂性增加和稳定性风险增大的问题。试想如果我们强依赖 10 个服务,9 个都执行成功了,最后一个执行失败了,那么是不是前面 9 个都要回滚掉?这个成本还是非常高的。

但是,并不是说 2PC 或者 TCC 这种方案不好,因为每一种方案都有其相对优势的使用场景和优缺点。

Date: 2023/09/26

Authors: 刘川

Tags: CAP