消息队列灰度


最近公司项目比较多,多项目多市场并行开发,需要服务支持灰度发布,以便进行生产验证,避免出现问题。在项目中消息中间件和分布式Task是必不可少的,接口调用的灰度转发可以通过网关实现,因此主要需要对这两个中间件做灰度转发控制

所谓灰度服务,是指一个新的功能,用一个额外的灰度实例发布。对于加了灰度用户的名单可以看到这个新的功能,而原有的功能不受影响。这样可以让少部分人体验新功能,确认没有问题后,再逐步放更多的灰度用户使用,直至全量。因此生产环境在灰度期间会有正常服务和灰度服务并行,一般会持续两周左右

灰度服务期间,用户流量如下

这里只能容纳一个灰度服务 ,但实际多项目并行时,会出现很个灰度的情况,而一些较为通用的公共服务则不需要灰度,那么生成实际会更加复杂,如下图

消息队列的灰度

公司项目中主要用到消息队列有两个 KafkaPulsar,对于kafka应该都很熟悉了,比较主流的消息队列。Pulsar是一个新兴的消息队列,比Kafka更好扩展和迁移。

灰度方案

消息队列的转发方案,是对正常服务的topic同时创建一些灰度topic。举个例子,这里有一个topic名叫 test-msg。我要对它做灰度

假设同时有两个项目要上,比如高级订单和夜盘交易。为了不互相影响,创建了两个topic分别是test-msg-v1和test-msg-v2。如果想让用户B使用高级订单的功能则用v1,想让用户C使用夜盘交易的功能则用v2,而用户A则看不到高级订单和夜盘交易的功能,消息仍然在正常服务上。这样同时灰度,互不影响。

代码实现

消息的转发主要在两个端,生产端和消费端。代码实现是怎么考虑无侵入式的实现我们的功能。我这里主要用cglib的动态代理实现。自定义一个注解,在spring服务启动的时候,织入转发逻辑,实现无感的灰度实现。

定义注解

以消费者为例,首先自定义一个注解,用来标识这个是一个灰度的消费者(生产者同理),代码如下

package com.webull.st.order.platform.receiver.savings.consumer.test;

import org.springframework.core.annotation.AliasFor;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;

@KafkaListener
public @interface CanaryKafkaListener {

    @AliasFor(annotation = KafkaListener.class, attribute = "id")
    String id() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "containerFactory")
    String containerFactory() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topics")
    String[] topics() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "topicPattern")
    String topicPattern() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "topicPartitions")
    TopicPartition[] topicPartitions() default {};

    @AliasFor(annotation = KafkaListener.class, attribute = "containerGroup")
    String containerGroup() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "errorHandler")
    String errorHandler() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "groupId")
    String groupId() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "idIsGroup")
    boolean idIsGroup() default true;

    @AliasFor(annotation = KafkaListener.class, attribute = "clientIdPrefix")
    String clientIdPrefix() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "beanRef")
    String beanRef() default "__listener";

    @AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
    String concurrency() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "autoStartup")
    String autoStartup() default "";

    @AliasFor(annotation = KafkaListener.class, attribute = "properties")
    String[] properties() default {};
}

这里只是覆盖了原来@KafkaListener注解的一些属性,最重要的是下面这个类

package com.webull.st.order.platform.receiver.savings.consumer.test;

import com.webull.inst.framework.common.util.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cglib.proxy.Enhancer;
import org.springframework.cglib.proxy.MethodInterceptor;
import org.springframework.cglib.proxy.MethodProxy;
import org.springframework.stereotype.Component;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.List;

/**
 * @author : lushunjian
 * @createDate : 2023-10-18 09:42:34
 * @Description :
 **/
@Slf4j
@Component
public class TestProcessor implements BeanPostProcessor {

    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        // 找bean的过程,通过注解去找被CanaryKafkaListener注解的方法
        // 这里为了简单写死的beanName=orderPlaceConsumer
        if("myConsumer".equals(beanName)){
            log.info("postProcessAfterInitialization start beanName:{},bean:{}",beanName,bean.getClass().getName());
            // 创建增强器
            Enhancer enhancer = new Enhancer();
            // 设置需要增强的类的对象
            enhancer.setSuperclass(bean.getClass());
            // 设置回调方法
            enhancer.setCallback(new MethodInterceptor() {
                @Override
                public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy)
                        throws Throwable {
                    log.info("do orderPlaceConsumer before....method:{}", method.getName());
                    if(method.getName().equals("consume")){
                        List list = null;
                        for(Object arg : args) {
                            if(arg instanceof List){
                                list = (List) arg;
                            }
                        }
                        // 这里list就是收到的消息 List<ConsumerRecord<String,String>>
                        log.info("do orderPlaceConsumer before....list:{}", list);
                        // 拿到ConsumerRecord后,通过ConsumerRecord.key可以拿到userId(前提是生产端消息的key是userId,不然没法通过user做灰度转发,当然也可根据业务场景使用业务主键)
                        // 再调用原来的消费方法之前,加入转发策略,如果用户在灰度名单里就转发
                        // kafkaSender.producer("test-msg-v1","msg");
                        // return
                        // 调用原方法
                        Object result = methodProxy.invokeSuper(o, args);
                        log.info("do orderPlaceConsumer after....");
                        return result;
                    }else {
                        return methodProxy.invokeSuper(o, args);
                    }
                }
            });
            log.info("postProcessAfterInitialization end beanName:{},bean:{}",beanName,bean.getClass().getName());
            return enhancer.create();
        }
        return bean;
    }

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        if("myTestService".equals(beanName)){
            log.info("postProcessAfterInitialization start beanName:{},bean:{}",beanName,bean.getClass().getName());
            // 创建增强器
            Enhancer enhancer = new Enhancer();
            // 设置需要增强的类的对象
            enhancer.setSuperclass(bean.getClass());
            // 设置回调方法
            enhancer.setCallback(new MethodInterceptor() {
                @Override
                public Object intercept(Object o, Method method, Object[] args, MethodProxy methodProxy)
                        throws Throwable {
                    if(method.getName().equals("test")){
                        List list = null;
                        String str = null;
                        for(Object arg : args) {
                            if(arg instanceof List){
                                list = (List) arg;
                            }
                            if(arg instanceof String){
                                str = (String) arg;
                            }
                        }
                        log.info("do test before....list:{},str:{}", JsonUtils.toJson(list),str);
                        // 调用原方法
                        Object result = methodProxy.invokeSuper(o, args);
                        log.info("do test after....");
                        return result;
                     }else {
                        return methodProxy.invokeSuper(o, args);
                    }
                }
            });
            log.info("postProcessAfterInitialization end beanName:{},bean:{}",beanName,bean.getClass().getName());
            return enhancer.create();
        }
        return bean;
    }

}

然后就是具体使用的地方了

package com.webull.st.order.platform.receiver.savings.consumer.test;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;

import java.util.List;

/**
 * @author : lushunjian
 * @createDate : 2023-10-21 13:00:30
 * @Description :
 **/
public class MyConsumer {

    @CanaryKafkaListener(topics = {"#{'${fix.execution.topic}'.split(',')}"}, containerFactory = "fixMsgContainerFactory")
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {

    }
}

如果是 Pulsar 的话,也是同理的,需要覆盖Pulsar的消费者注解 @PulsarConsumer。其他代码和 Kafka类似了

这样在实际过程中如果我们要对 Kafka和Pulsar 做灰度,那么就是使用自定义的注解 @CanaryKafkaListener@CanaryPulsarConsumer 注解即可。当然这里用户的灰度转发策略和topic之间的关系还需要用一个数据库表去维护,这里省略了


Author: 顺坚
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint polocy. If reproduced, please indicate source 顺坚 !
评论
 Previous
分布式任务框架设计 分布式任务框架设计
在项目中使用到分布式任务框架,用起来比较好用实用,便研究了一下设计的原理。由于是公司自研的分布式框架,不便写的太过详细,这里主要记录一下设计的原理。同时我也会对比我以前使用的任务框架,做一下对比 公司自研的分布式任务对客户端来说,使用感觉友
2023-10-22
Next 
监听行情触发条件单 监听行情触发条件单
最近在做公司交易系统时,接到需求要做条件订单,组合订单。同时条件的触发要基于监听实时行情,如果满足条件就触发下单送交易所。此类订单对于系统设计和性能都有一定的复杂性,故记录一些思考。条件单类型有触及限价单,触及市价单,多条件单。而需求二级还
2023-07-23
  TOC