手撸一个ingress controller来打通dubbo+k8s网络

背景

由于公司内部所有服务都是跑在阿里云 k8s 上的,然后 dubbo 提供者默认向注册中心上报的 IP 都是Pod IP,这意味着在 k8s 集群外的网络环境是调用不了 dubbo 服务的,如果本地开发需要访问 k8s 内的 dubbo 提供者服务的话,需要手动把服务暴露到外网,我们的做法是针对每一个提供者服务暴露一个SLB IP+自定义端口,并且通过 dubbo 提供的DUBBO_IP_TO_REGISTRYDUBBO_PORT_TO_REGISTRY环境变量来把对应的SLB IP+自定义端口注册到注册中心里,这样就实现了本地网络和 k8s dubbo 服务的打通,但是这种方式管理起来非常麻烦,每个服务都得自定义一个端口,而且每个服务之间端口还不能冲突,当服务多起来之后非常难以管理。

于是我就在想能不能像nginx ingress一样实现一个七层代理+虚拟域名来复用一个端口,通过目标 dubbo 提供者的application.name来做对应的转发,这样的话所有的服务只需要注册同一个SLB IP+端口就可以了,大大的提升便利性,一方调研之后发现可行就开撸了!

项目已开源:https://github.com/monkeyWie/dubbo-ingress-controller

技术预研

思路

  1. 首先 dubbo RPC 调用默认是走的dubbo协议,所以我需要先去看看协议里有没有可以利用做转发的报文信息,就是寻找类似于 HTTP 协议里的 Host 请求头,如果有的话就可以根据此信息做反向代理虚拟域名的转发,在此基础之上实现一个类似nginxdubbo网关
  2. 第二步就是要实现dubbo ingress controller,通过 k8s ingress 的 watcher 机制动态的更新dubbo 网关的虚拟域名转发配置,然后所有的提供者服务都由此服务同一转发,并且上报到注册中心的地址也统一为此服务的地址。

架构图

dubbo 协议

先上一个官方的协议图:

可以看到 dubbo 协议的 header 是固定的16个字节,里面并没有类似于 HTTP Header 的可扩展字段,也没有携带目标提供者的application.name字段,于是我向官方提了个issue,官方的答复是通过消费者自定义Filter来将目标提供者的application.name放到attachments里,这里不得不吐槽下 dubbo 协议,扩展字段竟然是放在body里,如果要实现转发需要把请求报文全部解析完才能拿到想要报文,不过问题不大,因为主要是做给开发环境用的,这一步勉强可以实现。

k8s ingress

k8s ingress 是为 HTTP 而生的,但是里面的字段够用了,来看一段 ingress 配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: user-rpc-dubbo
annotations:
kubernetes.io/ingress.class: "dubbo"
spec:
rules:
- host: user-rpc
http:
paths:
- backend:
serviceName: user-rpc
servicePort: 20880
path: /

配置和 http 一样通过host来做转发规则,但是host配置的是目标提供者的application.name,后端服务是目标提供者对应的service,这里有一个比较特殊的是使用了一个kubernetes.io/ingress.class注解,这个注解可以指定此ingress对哪个ingress controller生效,后面我们的dubbo ingress controller就只会解析注解值为dubbo的 ingress 配置。

开发

前面的技术预研一切顺利,接着就进入开发阶段了。

消费者自定义 Filter

前面有提到如果请求里要携带目标提供者的application.name,需要消费者自定义Filter,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
@Activate(group = CONSUMER)
public class AddTargetFilter implements Filter {

@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
String targetApplication = StringUtils.isBlank(invoker.getUrl().getRemoteApplication()) ?
invoker.getUrl().getGroup() : invoker.getUrl().getRemoteApplication();
// 目标提供者的application.name放入attachment
invocation.setAttachment("target-application", targetApplication);
return invoker.invoke(invocation);
}
}

这里又要吐槽一下,dubbo 消费者首次访问时会发起一个获取 metadata 的请求,这个请求通过invoker.getUrl().getRemoteApplication()是拿不到值的,通过invoker.getUrl().getGroup()才能拿到。

dubbo 网关

这里就要开发一个类似nginxdubbo网关,并实现七层代理和虚拟域名转发,编程语言直接选择了 go,首先 go 做网络开发心智负担低,另外有个 dubbo-go 项目,可以直接利用里面的解码器,然后 go 有原生的 k8s sdk 支持,简直完美!

思路就是开启一个TCP Server,然后解析 dubbo 请求的报文,把attachment里的target-application属性拿到,再反向代理到真正的 dubbo 提供者服务上,核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
routingTable := map[string]string{
"user-rpc": "user-rpc:20880",
"pay-rpc": "pay-rpc:20880",
}

listener, err := net.Listen("tcp", ":20880")
if err != nil {
return err
}
for {
clientConn, err := listener.Accept()
if err != nil {
logger.Errorf("accept error:%v", err)
continue
}
go func() {
defer clientConn.Close()
var proxyConn net.Conn
defer func() {
if proxyConn != nil {
proxyConn.Close()
}
}()
scanner := bufio.NewScanner(clientConn)
scanner.Split(split)
// 解析请求报文,拿到一个完整的请求
for scanner.Scan() {
data := scanner.Bytes()
// 通过dubbo-go提供的库把[]byte反序列化成dubbo请求结构体
buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
pkg.Unmarshal()
body := pkg.Body.(map[string]interface{})
attachments := body["attachments"].(map[string]interface{})
// 从attachments里拿到目标提供者的application.name
target := attachments["target-application"].(string)
if proxyConn == nil {
// 反向代理到真正的后端服务上
host := routingTable[target]
proxyConn, _ = net.Dial("tcp", host)
go func() {
// 原始转发
io.Copy(clientConn, proxyConn)
}()
}
// 把原始报文写到真正后端服务上,然后走原始转发即可
proxyConn.Write(data)
}
}()
}

func split(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

buf := bytes.NewBuffer(data)
pkg := impl.NewDubboPackage(buf)
err = pkg.ReadHeader()
if err != nil {
if errors.Is(err, hessian.ErrHeaderNotEnough) || errors.Is(err, hessian.ErrBodyNotEnough) {
return 0, nil, nil
}
return 0, nil, err
}
if !pkg.IsRequest() {
return 0, nil, errors.New("not request")
}
requestLen := impl.HEADER_LENGTH + pkg.Header.BodyLen
if len(data) < requestLen {
return 0, nil, nil
}
return requestLen, data[0:requestLen], nil
}

dubbo ingress controller 实现

前面已经实现了一个dubbo网关,但是里面的虚拟域名转发配置(routingTable)还是写死在代码里的,现在要做的就是当检测到k8s ingress有更新时,动态的更新这个配置就可以了。

首先先简单的说明下ingress controller的原理,拿我们常用的nginx ingress controller为例,它也是一样通过监听k8s ingress资源变动,然后动态的生成nginx.conf文件,当发现配置发生了改变时,触发nginx -s reload重新加载配置文件。

里面用到的核心技术就是informers,利用它来监听k8s资源的变动,示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 在集群内获取k8s访问配置
cfg, err := rest.InClusterConfig()
if err != nil {
logger.Fatal(err)
}
// 创建k8s sdk client实例
client, err := kubernetes.NewForConfig(cfg)
if err != nil {
logger.Fatal(err)
}
// 创建Informer工厂
factory := informers.NewSharedInformerFactory(client, time.Minute)
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 新增事件
},
UpdateFunc: func(oldObj, newObj interface{}) {
// 更新事件
},
DeleteFunc: func(obj interface{}) {
// 删除事件
},
}
// 监听ingress变动
informer := factory.Extensions().V1beta1().Ingresses().Informer()
informer.AddEventHandler(handler)
informer.Run(ctx.Done())

通过实现上面的三个事件来动态的更新转发配置,每个事件都会携带对应的Ingress对象信息过来,然后进行对应的处理即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
ingress, ok := obj.(*v1beta12.Ingress)
if ok {
// 通过注解过滤出dubbo ingress
ingressClass := ingress.Annotations["kubernetes.io/ingress.class"]
if ingressClass == "dubbo" && len(ingress.Spec.Rules) > 0 {
rule := ingress.Spec.Rules[0]
if len(rule.HTTP.Paths) > 0 {
backend := rule.HTTP.Paths[0].Backend
host := rule.Host
service := fmt.Sprintf("%s:%d", backend.ServiceName+"."+ingress.Namespace, backend.ServicePort.IntVal)
// 获取到ingress配置中host对应的service,通知给dubbo网关进行更新
notify(host,service)
}
}
}

docker 镜像提供

k8s 之上所有的服务都需要跑在容器里的,这里也不例外,需要把dubbo ingress controller构建成 docker 镜像,这里通过两阶段构建优化,来减小镜像体积:

1
2
3
4
5
6
7
8
9
10
11
12
13
FROM golang:1.17.3 AS builder
WORKDIR /src
COPY . .
ENV GOPROXY https://goproxy.cn
ENV CGO_ENABLED=0
RUN go build -ldflags "-w -s" -o main cmd/main.go

FROM debian AS runner
ENV TZ=Asia/shanghai
WORKDIR /app
COPY --from=builder /src/main .
RUN chmod +x ./main
ENTRYPOINT ["./main"]

yaml 模板提供

由于要在集群内访问 k8s API,需要给 Pod 进行授权,通过K8S rbac进行授权,并以Deployment类型服务进行部署,最终模板如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
apiVersion: v1
kind: ServiceAccount
metadata:
name: dubbo-ingress-controller
namespace: default

---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: dubbo-ingress-controller
rules:
- apiGroups:
- extensions
resources:
- ingresses
verbs:
- get
- list
- watch

---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1beta1
metadata:
name: dubbo-ingress-controller
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: dubbo-ingress-controller
subjects:
- kind: ServiceAccount
name: dubbo-ingress-controller
namespace: default

---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: default
name: dubbo-ingress-controller
labels:
app: dubbo-ingress-controller
spec:
selector:
matchLabels:
app: dubbo-ingress-controller
template:
metadata:
labels:
app: dubbo-ingress-controller
spec:
serviceAccountName: dubbo-ingress-controller
containers:
- name: dubbo-ingress-controller
image: liwei2633/dubbo-ingress-controller:0.0.1
ports:
- containerPort: 20880

后期需要的话可以做成Helm进行管理。

后记

至此dubbo ingress controller实现完成,可以说麻雀虽小但是五脏俱全,里面涉及到了dubbo协议TCP协议七层代理k8s ingressdocker等等很多内容,这些很多知识都是在云原生越来越流行的时代需要掌握的,开发完之后感觉受益匪浅。

关于完整的使用教程可以通过github查看。

参考链接: