写在前面
阅读本文需要最起码了解envoy
相关的概念
本文只是一个类似于demo
的测试,只为了学习istio
,更好的理解istio
中的控制面和数据面(pilot -> proxy)是如何交互的,下图的蓝色虚线
先说go-control-plane
是什么,它是一个用于实现xDS API
golang库,xDS API
是Envoy
用于动态配置的协议。我们实现了go-control-plane
就可以做到
- 动态配置管理:允许控制面动态更新数据面代理的配置
- 支持多种 xDS 资源
- 缓存和版本管理:提供快照缓存机制,管理配置的版本和更新
使用go-control-plane
我们可以创建一个自定义控制面来管理·Envoy`,从而实现动态的服务发现、负载均衡和路由等等…
xds
则是一系列服务发现的总称
- lds 监听器服务发现
- rds 路由服务发现
- cds 集群服务发现
- eds 端点服务发现
- ads 聚合服务发现
等等等等,还有一些其他的服务发现,本文不涉及就没有说到,如果不理解这些概念,建议先去官网了解一下 https://www.envoyproxy.io
实现 go-control-plane
功能描述
上文是envoy代理的架构,程序中的逻辑我使用倒叙的方式描述
- 创建
endpoint
地址是www.envoyproxy.io
端口是443
- 创建
cluster
叫做xds_demo_cluster
它的端点就是上面创建的 - 创建路由在
filter_chins
下的http_connection_manager
中名称叫做xds_demo_route
,没有任何的路由规则,路由的cluster
名称(请求转发的目的地)叫做xds_demo_cluster
- 最后创建
listener
名称是listener_xds_demo
监听的地址是0.0.0.0:12000
整体就是当我们访问localhost:12000
的时候会将我们的请求转发到www.envoyproxy.io
。
如果运行过默认的envoy
的用户可能就会发现我程序中下发的配置就是默认运行envoy
时的配置,只不过默认运行envoy
是静态配置文件
的方式就是所有的配置都写在envoy.yaml
中,而本文是动态的方式。
envoy
有多种运行方式本文不做赘述
功能实现
package mainimport ("context""log""net""time"cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"routerv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"http_connection_manager "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"tlsv3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3""github.com/envoyproxy/go-control-plane/pkg/cache/types""github.com/envoyproxy/go-control-plane/pkg/cache/v3""github.com/envoyproxy/go-control-plane/pkg/resource/v3""github.com/envoyproxy/go-control-plane/pkg/server/v3""google.golang.org/grpc""google.golang.org/protobuf/types/known/anypb""google.golang.org/protobuf/types/known/durationpb"
)func main() {ctx := context.Background()lis, err := net.Listen("tcp", ":18000")if err != nil {log.Fatalf("Failed to listen: %v", err)}sc := cache.NewSnapshotCache(true, cache.IDHash{}, nil)srv := server.NewServer(ctx, sc, nil)// new grpc servergs := grpc.NewServer()clusterservice.RegisterClusterDiscoveryServiceServer(gs, srv)endpointservice.RegisterEndpointDiscoveryServiceServer(gs, srv)listenerservice.RegisterListenerDiscoveryServiceServer(gs, srv)routeservice.RegisterRouteDiscoveryServiceServer(gs, srv)discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, srv)err = setSnapshot(ctx, "xds-node-id", sc)if err != nil {log.Fatalf("set snapshot error: %v", err)} else {log.Println("set snapshot success")}log.Println("Starting control plane server...")if err := gs.Serve(lis); err != nil {log.Fatalf("Failed to serve: %v", err)}
}func setSnapshot(ctx context.Context, nodeId string, sc cache.SnapshotCache) error {clusterName := "xds_demo_cluster"manager := buildHttpManager(clusterName)fcs := buildFilterChain(manager)serviceListener := buildListener("0.0.0.0", 12000, fcs)serviceEndpoint := buildEndpoint()serviceCluster := buildCluster(clusterName, serviceEndpoint)rs := map[resource.Type][]types.Resource{resource.ClusterType: {serviceCluster},resource.EndpointType: {serviceEndpoint},resource.ListenerType: {serviceListener},resource.RouteType: {manager},}snapshot, err := cache.NewSnapshot("1", rs)if err != nil {log.Fatalf("new snapshot error: %v", err)}return sc.SetSnapshot(ctx, nodeId, snapshot)
}func buildFilterChain(manager *http_connection_manager.HttpConnectionManager) []*listener.FilterChain {managerPB, err := anypb.New(manager)if err != nil {log.Fatalf("Failed to marshal HttpConnectionManager: %v", err)}fcs := make([]*listener.FilterChain, 0, 0)fcs = append(fcs, &listener.FilterChain{Filters: []*listener.Filter{{Name: "envoy.filters.network.http_connection_manager",ConfigType: &listener.Filter_TypedConfig{TypedConfig: managerPB,},},},})return fcs
}func buildHttpManager(clusterName string) *http_connection_manager.HttpConnectionManager {xdsRoute := &route.RouteConfiguration{Name: "xds_demo_route",VirtualHosts: []*route.VirtualHost{{Name: "xds_demo_service",Domains: []string{"*"},Routes: []*route.Route{{Match: &route.RouteMatch{PathSpecifier: &route.RouteMatch_Prefix{Prefix: "/",},},Action: &route.Route_Route{Route: &route.RouteAction{HostRewriteSpecifier: &route.RouteAction_HostRewriteLiteral{HostRewriteLiteral: "www.envoyproxy.io",},// 集群要去下文一致ClusterSpecifier: &route.RouteAction_Cluster{Cluster: clusterName,},},},},},},},}routerConfig, _ := anypb.New(&routerv3.Router{})// http 链接管理器manager := &http_connection_manager.HttpConnectionManager{StatPrefix: "ingress_http",RouteSpecifier: &http_connection_manager.HttpConnectionManager_RouteConfig{RouteConfig: xdsRoute,},HttpFilters: []*http_connection_manager.HttpFilter{{Name: "envoy.filters.http.router",ConfigType: &http_connection_manager.HttpFilter_TypedConfig{TypedConfig: routerConfig,},},},SchemeHeaderTransformation: &corev3.SchemeHeaderTransformation{Transformation: &corev3.SchemeHeaderTransformation_SchemeToOverwrite{SchemeToOverwrite: "https",},},}return manager
}func buildEndpoint() *endpoint.LbEndpoint {epTarget := &endpoint.LbEndpoint{HostIdentifier: &endpoint.LbEndpoint_Endpoint{Endpoint: &endpoint.Endpoint{Address: &corev3.Address{Address: &corev3.Address_SocketAddress{SocketAddress: &corev3.SocketAddress{Address: "www.envoyproxy.io",PortSpecifier: &corev3.SocketAddress_PortValue{PortValue: 443,},},},},},},}return epTarget
}func buildCluster(clusterName string, ep *endpoint.LbEndpoint) *cluster.Cluster {serviceCluster := &cluster.Cluster{Name: clusterName,ConnectTimeout: durationpb.New(time.Second * 3),ClusterDiscoveryType: &cluster.Cluster_Type{Type: cluster.Cluster_STRICT_DNS,},DnsLookupFamily: cluster.Cluster_V4_ONLY,LbPolicy: cluster.Cluster_ROUND_ROBIN,LoadAssignment: &endpoint.ClusterLoadAssignment{ClusterName: clusterName,Endpoints: []*endpoint.LocalityLbEndpoints{{LbEndpoints: []*endpoint.LbEndpoint{ep},},},},TransportSocket: &corev3.TransportSocket{Name: "envoy.transport_sockets.tls",ConfigType: nil,},}us := &tlsv3.UpstreamTlsContext{Sni: "www.envoyproxy.io",}tlsConfig, _ := anypb.New(us)serviceCluster.TransportSocket.ConfigType = &corev3.TransportSocket_TypedConfig{TypedConfig: tlsConfig,}return serviceCluster
}func buildListener(ip string, port uint32, fcs []*listener.FilterChain) *listener.Listener {return &listener.Listener{Name: "listener_xds_demo",Address: &corev3.Address{Address: &corev3.Address_SocketAddress{SocketAddress: &corev3.SocketAddress{Address: ip,PortSpecifier: &corev3.SocketAddress_PortValue{PortValue: port,},},},},// 过滤器链FilterChains: fcs,}
}
cache.NewSnapshotCache
返回一个SnapshotCache
是go-control-plane
中的一个核心组件,用于管理和存储Envoy
所需的xDS
资源的快照,并且向Envoy
实例推送更新server.NewServer
创建xDS
服务实例sc.SetSnapshot
用于将生成的配置快照设置到指定的节点上,是动态配置和更新Envoy
代理的行为,入参有一个id 是下文envoy引导配置中的id
截止到现在我们就可以启动这个服务,我们要记住当前服务监听的地址,因为在envoy.yaml
中会需要使用到的
为 envoy 设置引导配置
引导配置(bootstrap configuration),引导配置文件通常指定控制面地址(如xDS
服务器地址)、监听器、集群、管理接口等基本信息。
node:id: xds-node-idcluster: xds-node-clusterdynamic_resources:ads_config:api_type: GRPCgrpc_services:- envoy_grpc:cluster_name: xds_clustercds_config:ads: {}lds_config:ads: {}static_resources:clusters:- name: xds_clusterconnect_timeout: 1stype: STRICT_DNSlb_policy: ROUND_ROBINhttp2_protocol_options: {}load_assignment:cluster_name: xds_clusterendpoints:- lb_endpoints:- endpoint:address:socket_address:address: 127.0.0.1port_value: 18000admin:access_log_path: /dev/nulladdress:socket_address:address: 0.0.0.0port_value: 9901
解释
- static_resources
- 定义静态集群
xds_cluster
,用于与xDS
服务器通信。这里的xDS
服务器运行在127.0.0.1:18000
就是我们上面服务监听的地址
- 定义静态集群
- dynamic_resources
- ads_config: 配置聚合发现服务(ADS)来动态获取配置,使用
gRPC
服务与xds_cluster
进行通信 cds_config
和lds_config
分别表示使用ADS
来获取配置
- ads_config: 配置聚合发现服务(ADS)来动态获取配置,使用
- admin
- 定义管理接口,监听
0.0.0.0:9901
,用于查看Envoy
的状态和配置
- 定义管理接口,监听
- node
- id 节点的唯一标识符,用于在
xDS
服务器中区分不同的Envoy
实例 - cluster 节点所属的集群名称。
- id 节点的唯一标识符,用于在
然后启动envoy
,从输出的日志中我们可以看到通过控制面下发的配置,数据面已经加载成功了。
[2024-06-27 07:47:53.524][1][info][main] [source/server/server.cc:977] starting main dispatch loop
[2024-06-27 07:47:53.526][1][info][upstream] [source/common/upstream/cds_api_helper.cc:32] cds: add 1 cluster(s), remove 1 cluster(s)
[2024-06-27 07:47:53.527][1][info][upstream] [source/common/upstream/cds_api_helper.cc:71] cds: added/updated 1 cluster(s), skipped 0 unmodified cluster(s)
[2024-06-27 07:47:53.544][1][info][upstream] [source/common/upstream/cluster_manager_impl.cc:240] cm init: all clusters initialized
[2024-06-27 07:47:53.544][1][info][main] [source/server/server.cc:957] all clusters initialized. initializing init manager
[2024-06-27 07:47:53.546][1][info][upstream] [source/common/listener_manager/lds_api.cc:106] lds: add/update listener 'listener_xds_demo'
[2024-06-27 07:47:53.546][1][info][config] [source/common/listener_manager/listener_manager_impl.cc:930] all dependencies initialized. starting workers
下一步我们可以访问一下监听的地址,可以看到成功转发到了envoy
的官方网站。
~ $ curl http://localhost:12000 | grep title% Total % Received % Xferd Average Speed Time Time Time CurrentDload Upload Total Spent Left Speed19 15571 19 3098 0 0 5286 0 0:00:02 --:--:-- 0:00:02 5286 <title>Envoy proxy - home</title>
100 15571 100 15571 0 0 21685 0 --:--:-- --:--:-- --:--:-- 21686
我们还可以通过 ~ $ curl http://localhost:9901/config_dump
来查看envoy
的实时配置
写在最后
动态配置的方式是在内存中加载配置,不会更新到静态的文件中。
更高级、复杂的用法可以参考istio
;具体来说pilot
watch
集群中的服务、端点、配置等资源的变化。当检测到这些资源的变化时,pilot
会生成新的配置,并通过xDS API
将更新推送到相应的Envoy
实例,从而实现动态配置和管理服务网格中的流量控制和路由规则。这样可以确保 Envoy
始终具有最新的服务发现信息和路由配置。
源码目录 https://github.com/istio/istio/tree/master/pilot