如何分析SuperEdge 拓扑算法

发布时间:2022-01-17 10:53:49 作者:柒染
来源:亿速云 阅读:131

这篇文章主要为大家分析了如何分析SuperEdge 拓扑算法的相关知识点,内容详细易懂,操作细节合理,具有一定参考价值。如果感兴趣的话,不妨跟着跟随小编一起来看看,下面跟着小编一起深入学习“如何分析SuperEdge 拓扑算法”的知识吧。


前言

 

SuperEdge 介绍

SuperEdge 是基于原生 Kubernetes 的边缘容器管理系统。该系统把云原生能力扩展到边缘侧,很好的实现了云端对边缘端的管理和控制。同时 superedge 自研了 service group 实现了基于边缘计算的服务访问控制,极大简化了应用从云端部署到边缘端的过程。  

SuperEdge service group拓扑感知特性

SuperEdge service group 利用 application-grid-wrapper 实现拓扑感知,完成了同一个 nodeunit 内服务的闭环访问。

在深入分析 application-grid-wrapper 之前,这里先简单介绍一下社区 Kubernetes 原生支持的拓扑感知特性[1]

Kubernetes service topology awareness 特性于v1.17发布 alpha 版本,用于实现路由拓扑以及就近访问特性。用户需要在service 中添加 topologyKeys 字段标示拓扑key类型,只有具有相同拓扑域的 endpoint 会被访问到,目前有三种 topologyKeys 可供选择:

除了单独填写如上某一个拓扑key之外,还可以将这些key构造成列表进行填写,例如:["kubernetes.io/hostname", "topology.kubernetes.io/zone", "topology.kubernetes.io/region"],这表示:优先访问本节点内的 endpoint;如果不存在,则访问同一个 zone 内的 endpoint;如果再不存在,则访问同一个 region 内的 endpoint,如果都不存在则访问失败。

另外,还可以在列表最后(只能最后一项)添加"*"表示:如果前面拓扑域都失败,则访问任何有效的 endpoint,也即没有限制拓扑了,示例如下:  
# A Service that prefers node local, zonal, then regional endpoints but falls back to cluster wide endpoints.
apiVersion: v1
kind: Service
metadata:  
  name: my-service
spec:  
  selector:    
    app: my-app  
  ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 9376  
  topologyKeys:    
    - "kubernetes.io/hostname"
    - "topology.kubernetes.io/zone" 
    - "topology.kubernetes.io/region"
    - "*"
 

而service group 实现的拓扑感知和社区对比,有如下区别:

service group 实现的拓扑感知,service 配置如下:

# A Service that only prefers node zone1al endpoints.
apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
  name: servicegrid-demo-svc
spec:  
  ports:  
  - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo
 

在介绍完 service group 实现的拓扑感知后,我们深入到源码分析实现细节。同样的,这里以一个使用示例开始分析:

# step1: labels edge nodes
$ kubectl  get nodes
NAME    STATUS   ROLES    AGE   VERSIO
Nnode0   Ready    <none>   16d   v1.16.7
node1    Ready    <none>   16d   v1.16.7
node2    Ready    <none>   16d   v1.16.7
# nodeunit1(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node0 zone1=nodeunit1  
# nodeunit2(nodegroup and servicegroup zone1)
$ kubectl --kubeconfig config label nodes node1 zone1=nodeunit2
$ kubectl --kubeconfig config label nodes node2 zone1=nodeunit2

...

# step3: deploy echo ServiceGrid
$ cat <<EOF | kubectl --kubeconfig config apply -f -
apiVersion: superedge.io/v1
kind: ServiceGrid
metadata:  
  name: servicegrid-demo  
  namespace: default
spec:  
  gridUniqKey: zone1  
  template:    
    selector:      
      appGrid: echo    
    ports:    
    - protocol: TCP      
      port: 80      
      targetPort: 8080
EOF
servicegrid.superedge.io/servicegrid-demo created
# note that there is only one relevant service generated
$ kubectl  get svc
NAME                TYPE        CLUSTER-IP        EXTERNAL-IP   PORT(S)   AGE
kubernetes          ClusterIP   192.168.0.1       <none>        443/TCP   16d
servicegrid-demo-svc   ClusterIP   192.168.6.139     <none>        80/TCP    10m
    
# step4: access servicegrid-demo-svc(service topology and closed-looped)
# execute on node0
$ curl 192.168.6.139|grep "node name"        node name:      node0
# execute on node1 and node2
$ curl 192.168.6.139|grep "node name" 
       node name:      node2
$ curl 192.168.6.139|grep "node name"
       node name:      node1
 

在创建完 ServiceGrid CR 后,ServiceGrid Controller 负责根据 ServiceGrid产生对应的 service (包含由 serviceGrid.Spec.GridUniqKey 构成的 topologyKeys annotations);而 application-grid-wrapper 根据 service 实现拓扑感知,下面依次分析。

 

ServiceGrid Controller 分析

ServiceGrid Controller 逻辑和 DeploymentGrid Controller 整体一致,如下:

注意这里区别于 DeploymentGrid Controller:

func (sgc *ServiceGridController) syncServiceGrid(key string) error {    
    startTime := time.Now()    
    klog.V(4).Infof("Started syncing service grid %q (%v)", key, startTime)    
    defer func() {        
      klog.V(4).Infof("Finished syncing service grid %q (%v)", key, time.Since(startTime))    
    }()    
    
    namespace, name, err := cache.SplitMetaNamespaceKey(key)    
    if err != nil {        
      return err    
    }    
    
    sg, err := sgc.svcGridLister.ServiceGrids(namespace).Get(name)    
    if errors.IsNotFound(err) {              
        klog.V(2).Infof("service grid %v has been deleted", key)        
        return nil    
    }    
    if err != nil {        
        return err    
    }    
    
    if sg.Spec.GridUniqKey == "" {           
    sgc.eventRecorder.Eventf(sg, corev1.EventTypeWarning, "Empty", "This service grid has an empty grid key")        
        return nil    
    }    
    
    // get service workload list of this grid    
    svcList, err := sgc.getServiceForGrid(sg)    
    if err != nil {        
        return err    
    }    
    
    if sg.DeletionTimestamp != nil {        
        return nil    
    }    
    
    // sync service grid relevant services workload    
    return sgc.reconcile(sg, svcList)
    }
    
func (sgc *ServiceGridController) getServiceForGrid(sg *crdv1.ServiceGrid) ([]*corev1.Service, error) {    
  svcList, err := sgc.svcLister.Services(sg.Namespace).List(labels.Everything())    
  if err != nil {        
    return nil, err    
  }    
  
  labelSelector, err := common.GetDefaultSelector(sg.Name)    
  if err != nil {        
      return nil, err    
  }   
     
  canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error)
  {        
         fresh, err := 
  sgc.crdClient.SuperedgeV1().ServiceGrids(sg.Namespace).Get(context.TODO(), sg.Name, metav1.GetOptions{})        
      if err != nil {            
        return nil, err        
      }        
      if fresh.UID != sg.UID {           
           return nil, fmt.Errorf("orignal service grid %v/%v is gone: got uid %v, wanted %v", sg.Namespace,                  
              sg.Name, fresh.UID, sg.UID)        
      }        
      return fresh, nil    
    })    
      
    cm := controller.NewServiceControllerRefManager(sgc.svcClient, sg, labelSelector, util.ControllerKind, canAdoptFunc)    
    return cm.ClaimService(svcList)
}
    
func (sgc *ServiceGridController) reconcile(g *crdv1.ServiceGrid, svcList 
[]*corev1.Service) error {    
    var (        
        adds    []*corev1.Service     
        updates []*corev1.Service     
        deletes []*corev1.Service    
    )    
    
    sgTargetSvcName := util.GetServiceName(g)    
    isExistingSvc := false    
    for _, svc := range svcList {     
        if svc.Name == sgTargetSvcName {            
            isExistingSvc = true     
            template := util.KeepConsistence(g, svc)            
            if !apiequality.Semantic.DeepEqual(template, svc) {           
                updates = append(updates, template)            
            }        
        } else {            
            deletes = append(deletes, svc)        
        }    
    }    
    
    if !isExistingSvc {        
        adds = append(adds, util.CreateService(g))    
    }    
    
    return sgc.syncService(adds, updates, deletes)
}

func CreateService(sg *crdv1.ServiceGrid) *corev1.Service {    
    svc := &corev1.Service{        
        ObjectMeta: metav1.ObjectMeta{            
          Name:      GetServiceName(sg),            
          Namespace: sg.Namespace,   
          // Append existed ServiceGrid labels to service to be created  
          Labels: func() map[string]string {                
              if sg.Labels != nil { 
                  newLabels := sg.Labels                    
                  newLabels[common.GridSelectorName] = sg.Name             
                  newLabels[common.GridSelectorUniqKeyName] = sg.Spec.GridUniqKey                    
                  return newLabels              
             } else {               
                  return map[string]string{                        
                      common.GridSelectorName:        sg.Name,             
                      common.GridSelectorUniqKeyName: sg.Spec.GridUniqKey, 
              }               
          }            
      }(),            
      Annotations: make(map[string]string),       
    },        
    Spec: sg.Spec.Template,   
  }    
  
  keys := make([]string, 1)    
  keys[0] = sg.Spec.GridUniqKey    
  keyData, _ := json.Marshal(keys)     
  svc.Annotations[common.TopologyAnnotationsKey] = string(keyData)    
  
  return svc
}
 

由于逻辑与 DeploymentGrid 类似,这里不展开细节,重点关注 application-grid-wrapper 部分。

 

application-grid-wrapper 分析

在 ServiceGrid Controller 创建完 service 之后,application-grid-wrapper 的作用就开始启动了:

apiVersion: v1
kind: Service
metadata:  
  annotations:    
    topologyKeys: '["zone1"]'  
  creationTimestamp: "2021-03-03T07:33:30Z"  
  labels:    
    superedge.io/grid-selector: servicegrid-demo  
    name: servicegrid-demo-svc  
    namespace: default  
    ownerReferences:  
    - apiVersion: superedge.io/v1    
      blockOwnerDeletion: true    
      controller: true    
      kind: ServiceGrid    
      name: servicegrid-demo    
      uid: 78c74d3c-72ac-4e68-8c79-f1396af5a581  
    resourceVersion: "127987090"  
    selfLink: /api/v1/namespaces/default/services/servicegrid-demo-svc  
    uid: 8130ba7b-c27e-4c3a-8ceb-4f6dd0178dfc
spec:  
    clusterIP: 192.168.161.1  
    ports:  
    - port: 80    
    protocol: TCP    
    targetPort: 8080  
  selector:    
    appGrid: echo  
  sessionAffinity: None  
  type: ClusterIP
status:  
  loadBalancer: {}
 

为了实现 Kubernetes 零侵入,需要在 kube-proxy 与 apiserver 通信之间添加一层 wrapper,架构如下:

如何分析SuperEdge 拓扑算法  

调用链路如下:

kube-proxy -> application-grid-wrapper -> lite-apiserver -> kube-apiserver
 

因此 application-grid-wrapper 会起服务,接受来自 kube-proxy 的请求,如下:

func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
    ...    
    klog.Infof("Start to run interceptor server")    
    /* filter     
    */    
    server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
    
    if insecure {        
        return server.ListenAndServe()    
    }    
    ...    
    server.TLSConfig = tlsConfig    
    return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) buildFilterChains(debug bool) http.Handler {   
    handler := http.Handler(http.NewServeMux())    
    
    handler = s.interceptEndpointsRequest(handler)    
    handler = s.interceptServiceRequest(handler)    
    handler = s.interceptEventRequest(handler)    
    handler = s.interceptNodeRequest(handler)    
    handler = s.logger(handler)    
    
  if debug {        
      handler = s.debugger(handler)   
  }    
  
  return handler
}
 
这里会首先创建 interceptorServer,然后注册处理函数,由外到内依次如下:  
 
func (s *interceptorServer) interceptEventRequest(handler http.Handler) http.Handler {  
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {           if r.Method != http.MethodPost || !strings.HasSuffix(r.URL.Path, "/events") {          
         handler.ServeHTTP(w, r)     
         return      
     }      
     
     targetURL, _ := url.Parse(s.restConfig.Host)      
     reverseProxy := httputil.NewSingleHostReverseProxy(targetURL)      
     reverseProxy.Transport, _ = rest.TransportFor(s.restConfig)      
     reverseProxy.ServeHTTP(w, r) 
    })
  }
 

下面先重点分析 cache 部分的逻辑,然后再回过头来分析具体的 http handler List&Watch 处理逻辑。

wrapper 为了实现拓扑感知,自己维护了一个 cache,包括:node,service,endpoint。可以看到在 setupInformers 中注册了这三类资源的处理函数:

type storageCache struct {    
    // hostName is the nodeName of node which application-grid-wrapper deploys on    
    hostName         string    
    wrapperInCluster bool    
    
    // mu lock protect the following map structure    
    mu           sync.RWMutex    
    servicesMap  map[types.NamespacedName]*serviceContainer    
    endpointsMap map[types.NamespacedName]*endpointsContainer    
    nodesMap     map[types.NamespacedName]*nodeContainer    
    
    // service watch channel    
    serviceChan chan<- watch.Event   
    // endpoints watch channel    
    endpointsChan chan<- watch.Event
}
...
func NewStorageCache(hostName string, wrapperInCluster bool, serviceNotifier, endpointsNotifier chan watch.Event) *storageCache {    
   msc := &storageCache{        
       hostName:         hostName,   
       wrapperInCluster: wrapperInCluster,        
       servicesMap:      make(map[types.NamespacedName]*serviceContainer), 
       endpointsMap:     make(map[types.NamespacedName]*endpointsContainer),        
       nodesMap:         make(map[types.NamespacedName]*nodeContainer),  
       serviceChan:      serviceNotifier,        
       endpointsChan:    endpointsNotifier,    
    }    
    
    return msc
}
...
func (s *interceptorServer) Run(debug bool, bindAddress string, insecure bool, caFile, certFile, keyFile string) error {    
    ...    
    if err := s.setupInformers(ctx.Done()); err != nil {        
        return err    
   }    
  
   klog.Infof("Start to run interceptor server")    
   /* filter     
   */    
  server := &http.Server{Addr: bindAddress, Handler: s.buildFilterChains(debug)}    
  ...    
  return server.ListenAndServeTLS("", "")
}

func (s *interceptorServer) setupInformers(stop <-chan struct{}) error {   
    klog.Infof("Start to run service and endpoints informers")    
    noProxyName, err := labels.NewRequirement(apis.LabelServiceProxyName, selection.DoesNotExist, nil)    
    if err != nil {        
        klog.Errorf("can't parse proxy label, %v", err)        
        return err    
    }    
    
    noHeadlessEndpoints, err := labels.NewRequirement(v1.IsHeadlessService, selection.DoesNotExist, nil)    
    if err != nil {        
        klog.Errorf("can't parse headless label, %v", err)        
        return err    
    }    
        
    labelSelector := labels.NewSelector()    
    labelSelector = labelSelector.Add(*noProxyName, *noHeadlessEndpoints) 
    
    resyncPeriod := time.Minute * 5   
    client := kubernetes.NewForConfigOrDie(s.restConfig)    
    nodeInformerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)    
    informerFactory := informers.NewSharedInformerFactoryWithOptions(client, resyncPeriod,        
      informers.WithTweakListOptions(func(options *metav1.ListOptions) {          
          options.LabelSelector = labelSelector.String()        
       }))    
         
    nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer()    
    serviceInformer := informerFactory.Core().V1().Services().Informer()    
    endpointsInformer := informerFactory.Core().V1().Endpoints().Informer()   
    
    /*    
    */    
    nodeInformer.AddEventHandlerWithResyncPeriod(s.cache.NodeEventHandler(), resyncPeriod)    
    serviceInformer.AddEventHandlerWithResyncPeriod(s.cache.ServiceEventHandler(), resyncPeriod)    
   
    endpointsInformer.AddEventHandlerWithResyncPeriod(s.cache.EndpointsEventHandler(), resyncPeriod)    
    
    go nodeInformer.Run(stop)    
    go serviceInformer.Run(stop)    
    go endpointsInformer.Run(stop)    
    
    if !cache.WaitForNamedCacheSync("node", stop,        
        nodeInformer.HasSynced,        
        serviceInformer.HasSynced,     
        endpointsInformer.HasSynced) {      
        return fmt.Errorf("can't sync informers")    
    }    
    
    return nil
}

func (sc *storageCache) NodeEventHandler() cache.ResourceEventHandler {   
     return &nodeHandler{cache: sc}
}

func (sc *storageCache) ServiceEventHandler() cache.ResourceEventHandler {    
    return &serviceHandler{cache: sc}

}
func (sc *storageCache) EndpointsEventHandler() cache.ResourceEventHandler {    
    return &endpointsHandler{cache: sc}
}
 
这里依次分析 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler,如下:  
 

1、NodeEventHandler

NodeEventHandler 负责监听 node 资源相关 event,并将 node 以及 node Labels 添加到 storageCache.nodesMap 中(key为nodeName,value为node以及node labels)。

func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
        }
}

func (nh *nodeHandler) update(node *v1.Node) {    
    sc := nh.cache    
    
    sc.mu.Lock()    
        nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Updating node %v", nodeKey)    
    nodeContainer, found := sc.nodesMap[nodeKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("Updating non-existed node %v", nodeKey)        
        return    
    }    
    
    nodeContainer.node = node    
    // return directly when labels of node stay unchanged    
    if reflect.DeepEqual(node.Labels, nodeContainer.labels) {        
        sc.mu.Unlock()        
        return    
    }    
    nodeContainer.labels = node.Labels    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps {        
        sc.endpointsChan <- eps  
    }
}
...
 

同时由于 node 的改变会影响 endpoint,因此会调用 rebuildEndpointsMap 刷新 storageCache.endpointsMap。

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {   
    evts := make([]watch.Event, 0)   
    for name, endpointsContainer := range sc.endpointsMap {       
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)   
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,      
            Object: newEps,     
       })    
   }    
   return evts
}
 

rebuildEndpointsMap 是 cache 的核心函数,同时也是拓扑感知的算法实现:

// pruneEndpoints filters endpoints using serviceTopology rules combined by services topologyKeys and node labels
func pruneEndpoints(hostName string,
    nodes map[types.NamespacedName]*nodeContainer,    
    services map[types.NamespacedName]*serviceContainer,    
    eps *v1.Endpoints, wrapperInCluster bool) *v1.Endpoints {   
    
    epsKey := types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}  
    
    if wrapperInCluster {       
        eps = genLocalEndpoints(eps) 
    }   
    
    // dangling endpoints    
    svc, ok := services[epsKey] 
    if !ok {        
        klog.V(4).Infof("Dangling endpoints %s, %+#v", eps.Name, eps.Subsets)        
        return eps    
    }   
        
    // normal service    
    if len(svc.keys) == 0 {     
        klog.V(4).Infof("Normal endpoints %s, %+#v", eps.Name, eps.Subsets)    
        return eps   
    }    
    
    // topology endpoints    
    newEps := eps.DeepCopy()    
    for si := range newEps.Subsets { 
        subnet := &newEps.Subsets[si]
        subnet.Addresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.Addresses)        
        subnet.NotReadyAddresses = filterConcernedAddresses(svc.keys, hostName, nodes, subnet.NotReadyAddresses)    
    }    
    klog.V(4).Infof("Topology endpoints %s: subnets from %+#v to %+#v", eps.Name, eps.Subsets, newEps.Subsets) 
    
    return newEps
}

// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil    
    }    
    
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
             epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]           
             if !found {             
                 continue            
             }            
             if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                 filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
             }        
        }    
    }    
    
    return filteredEndpointAddresses 
}

func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool { 
    if n1 == nil || n2 == nil {      
        return false    
    }    
    
    for _, key := range keys {       
        val1, v1found := n1[key]     
        val2, v2found := n2[key]    
        
        if v1found && v2found && val1 == val2 {            
        return true        
        }    
    }    
    
    return false   
}
 

算法逻辑如下:

apiVersion: v1
kind: Endpoints
metadata:  
  annotations:    
    superedge.io/local-endpoint: 127.0.0.1    
    superedge.io/local-port: "51003" 
  name: kubernetes  
  namespace: default
subsets:
- addresses: 
  - ip: 172.31.0.60  
  ports:  
  - name: https    
  port: xxx    
  protocol: TCP
 
func genLocalEndpoints(eps *v1.Endpoints) *v1.Endpoints {    
    if eps.Namespace != metav1.NamespaceDefault || eps.Name != MasterEndpointName {        
        return eps    
    }    
    
    klog.V(4).Infof("begin to gen local ep %v", eps)    
    ipAddress, e := eps.Annotations[EdgeLocalEndpoint]    
    if !e {        
        return eps    
    }    
        
    portStr, e := eps.Annotations[EdgeLocalPort]    
    if !e {        
        return eps    
    }    
    
    klog.V(4).Infof("get local endpoint %s:%s", ipAddress, portStr)    
    port, err := strconv.ParseInt(portStr, 10, 32)    
    if err != nil {        
        klog.Errorf("parse int %s err %v", portStr, err)        
        return eps    
    }  
    
    ip := net.ParseIP(ipAddress)   
    if ip == nil {        
        klog.Warningf("parse ip %s nil", ipAddress)        
        return eps    
    }    
    
    nep := eps.DeepCopy()    
    nep.Subsets = []v1.EndpointSubset{        
       {            
           Addresses: []v1.EndpointAddress{                
              {                    
                   IP: ipAddress,   
                },            
            },            
            Ports: []v1.EndpointPort{                
                 {                    
                     Protocol: v1.ProtocolTCP,                    
                     Port:     int32(port),                    
                    Name:     "https",                
                },         
             },      
         },   
    }  
      
    klog.V(4).Infof("gen new endpoint complete %v", nep)      
    return nep
}
 
这样做的目的是使边缘节点上的服务采用集群内 (InCluster) 方式访问的 apiserver 为本地的 lite-apiserver,而不是云端的 apiserver。  
 
func getTopologyKeys(objectMeta *metav1.ObjectMeta) []string {   
    if !hasTopologyKey(objectMeta) { 
        return nil    
    }   
    
    var keys []string   
    keyData := objectMeta.Annotations[TopologyAnnotationsKey]    
    if err := json.Unmarshal([]byte(keyData), &keys); err != nil {        
        klog.Errorf("can't parse topology keys %s, %v", keyData, err)       
        return nil    
    }    
    
    return keys
}
 
// filterConcernedAddresses aims to filter out endpoints addresses within the same node unit
func filterConcernedAddresses(topologyKeys []string, hostName string, nodes 
map[types.NamespacedName]*nodeContainer,    
    addresses []v1.EndpointAddress) []v1.EndpointAddress {    
    hostNode, found := nodes[types.NamespacedName{Name: hostName}]    
    if !found {        
        return nil   
    }
        
    filteredEndpointAddresses := make([]v1.EndpointAddress, 0)    
    for i := range addresses {       
        addr := addresses[i]        
        if nodeName := addr.NodeName; nodeName != nil {            
            epsNode, found := nodes[types.NamespacedName{Name: *nodeName}]   
            if !found {                
                continue            
            }           
            if hasIntersectionLabel(topologyKeys, hostNode.labels, epsNode.labels) {                
                filteredEndpointAddresses = append(filteredEndpointAddresses, addr)            
            }       
        }   
    }    
    
    return filteredEndpointAddresses
}
    func hasIntersectionLabel(keys []string, n1, n2 map[string]string) bool {   
        if n1 == nil || n2 == nil {       
            return false    
        }   
    
       for _, key := range keys {       
           val1, v1found := n1[key]       
           val2, v2found := n2[key]       
    
      if v1found && v2found && val1 == val2 {           
          return true        
       }    
   }    
  
   return false
}
 
注意:如果 wrapper 所在边缘节点没有 service topologyKeys 标签,则也无法访问该 service。  
回到 rebuildEndpointsMap,在调用 pruneEndpoints 刷新了同一个拓扑域内的 endpoint 后,会将修改后的 endpoints 赋值给 storageCache .endpointsMap [endpoint]. modified (该字段记录了拓扑感知后修改的endpoints)。  
func (nh *nodeHandler) add(node *v1.Node) {    
    sc := nh.cache   
    
    sc.mu.Lock()    
    
    nodeKey := types.NamespacedName{Namespace: node.Namespace, Name: node.Name}    
    klog.Infof("Adding node %v", nodeKey)    
    sc.nodesMap[nodeKey] = &nodeContainer{        
        node:   node,        
        labels: node.Labels,    
    }    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}

// rebuildEndpointsMap updates all endpoints stored in storageCache.endpointsMap dynamically and constructs relevant modified events
func (sc *storageCache) rebuildEndpointsMap() []watch.Event {    
    evts := make([]watch.Event, 0)  
    for name, endpointsContainer := range sc.endpointsMap {        
        newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpointsContainer.endpoints, sc.wrapperInCluster)        
        if apiequality.Semantic.DeepEqual(newEps, endpointsContainer.modified) {            
            continue        
        }        
        sc.endpointsMap[name].modified = newEps        
        evts = append(evts, watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,        
        })    
    }    
    return evts
}
 
另外,如果 endpoints (拓扑感知后修改的 endpoints)发生改变,会构建 watch event,传递给 endpoints handler (interceptEndpointsRequest)处理。  

2、ServiceEventHandler

storageCache.servicesMap 结构体 key 为 service 名称(namespace/name),value 为 serviceContainer,包含如下数据:

对于 service 资源的改动,这里用 Update event 说明:  
func (sh *serviceHandler) update(service *v1.Service) {    
    sc := sh.cache   
    
    sc.mu.Lock()    
    serviceKey := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}    
    klog.Infof("Updating service %v", serviceKey)    
    newTopologyKeys := getTopologyKeys(&service.ObjectMeta)    
    serviceContainer, found := sc.servicesMap[serviceKey]    
    if !found {        
        sc.mu.Unlock()        
        klog.Errorf("update non-existed service, %v", serviceKey)        
        return    
    }   
   
    sc.serviceChan <- watch.Event{   
        Type:   watch.Modified,       
        Object: service,    
    }    
    
    serviceContainer.svc = service   
    // return directly when topologyKeys of service stay unchanged    
    if reflect.DeepEqual(serviceContainer.keys, newTopologyKeys) {        
        sc.mu.Unlock()        
        return    
    }    
    
    serviceContainer.keys = newTopologyKeys    
    
    // update endpoints    
    changedEps := sc.rebuildEndpointsMap()    
    sc.mu.Unlock()    
    
    for _, eps := range changedEps { 
        sc.endpointsChan <- eps    
    }
}
 
逻辑如下:  
 

3、EndpointsEventHandler

storageCache.endpointsMap 结构体 key 为 endpoints 名称(namespace/name),value 为 endpointsContainer,包含如下数据:

对于 endpoints 资源的改动,这里用 Update event 说   明:  
func (eh *endpointsHandler) update(endpoints *v1.Endpoints) {    
    sc := eh.cache   
     
    sc.mu.Lock()   
    endpointsKey := types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}    
    klog.Infof("Updating endpoints %v", endpointsKey)    
    
    endpointsContainer, found := sc.endpointsMap[endpointsKey]    
    if !found {        
        sc.mu.Unlock()       
        klog.Errorf("Updating non-existed endpoints %v", endpointsKey)    
        return    
    }    
    endpointsContainer.endpoints = endpoints    
    newEps := pruneEndpoints(sc.hostName, sc.nodesMap, sc.servicesMap, endpoints, sc.wrapperInCluster)    
    changed := !apiequality.Semantic.DeepEqual(endpointsContainer.modified, newEps)    
    if changed {        
        endpointsContainer.modified = newEps   
    }    
    sc.mu.Unlock()    
    
    if changed {       
        sc.endpointsChan <- watch.Event{            
            Type:   watch.Modified,   
            Object: newEps,       
        }   
    }
}
 
逻辑如下:  
 
在分析完 NodeEventHandler,ServiceEventHandler 以及 EndpointsEventHandler 之后,我们回到具体的 http handler List&Watch 处理逻辑上,这里以 endpoints 为例:      
func (s *interceptorServer) interceptEndpointsRequest(handler http.Handler) http.Handler {    
    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 
        if r.Method != http.MethodGet || 
!strings.HasPrefix(r.URL.Path, "/api/v1/endpoints") {            
            handler.ServeHTTP(w, r)   
            return       
        }        
        
        queries := r.URL.Query()     
        acceptType := r.Header.Get("Accept")        
        info, found := s.parseAccept(acceptType, s.mediaSerializer)       
        if !found {            
            klog.Errorf("can't find %s serializer", acceptType)           
            w.WriteHeader(http.StatusBadRequest)            
            return        
        }        
        
        encoder := 
scheme.Codecs.EncoderForVersion(info.Serializer, v1.SchemeGroupVersion)   
        // list request        
        if queries.Get("watch") == "" {           
            w.Header().Set("Content-Type", info.MediaType)            
            allEndpoints := s.cache.GetEndpoints()           
            epsItems := make([]v1.Endpoints, 0, len(allEndpoints))        
            for _, eps := range allEndpoints {                
                epsItems = append(epsItems, *eps)            
            }            
            
            epsList := &v1.EndpointsList{                
                Items: epsItems,     
             }           
             
          err := encoder.Encode(epsList, w)            
          if err != nil {             
              klog.Errorf("can't marshal endpoints list, %v", err)        
             w.WriteHeader(http.StatusInternalServerError)               
              return            
          }    
            
              return       
      }       
         
      // watch request       
      timeoutSecondsStr := r.URL.Query().Get("timeoutSeconds")   
      timeout := time.Minute        
      if timeoutSecondsStr != "" {  
          timeout, _ = time.ParseDuration(fmt.Sprintf("%ss", timeoutSecondsStr))       
      }        
        
      timer := time.NewTimer(timeout)      
      defer timer.Stop()       
      
      flusher, ok := w.(http.Flusher)      
      if !ok {            
          klog.Errorf("unable to start watch - can't get http.Flusher: %#v", w)            
         w.WriteHeader(http.StatusMethodNotAllowed)            
          return        
        }    
        
        e := restclientwatch.NewEncoder(           
           streaming.NewEncoder(info.StreamSerializer.Framer.NewFrameWriter(w),       
                scheme.Codecs.EncoderForVersion(info.StreamSerializer, v1.SchemeGroupVersion)),            
          encoder)        
      if info.MediaType == runtime.ContentTypeProtobuf {         
          w.Header().Set("Content-Type", 
    runtime.ContentTypeProtobuf+";stream=watch")       
      } else {           
          w.Header().Set("Content-Type", runtime.ContentTypeJSON)        
      }        
      w.Header().Set("Transfer-Encoding", "chunked")        
      w.WriteHeader(http.StatusOK)   
      flusher.Flush()        
      for {            
          select {            
          case <-r.Context().Done(): 
              return            
          case <-timer.C:            
              return           
          case evt := <-s.endpointsWatchCh:               
              klog.V(4).Infof("Send endpoint watch event: %+#v", evt)     
              err := e.Encode(&evt)   
              if err != nil {         
              klog.Errorf("can't encode watch event, %v", err)             
              return                
          }               
          
          if len(s.endpointsWatchCh) == 0 {                    
              flusher.Flush()         
            }            
         }         
      }    
  })
}
 

逻辑如下:

func (sc *storageCache) GetEndpoints() []*v1.Endpoints {    
    sc.mu.RLock()   
    defer sc.mu.RUnlock()   
    
    epList := make([]*v1.Endpoints, 0, 
len(sc.endpointsMap))    
    for _, v := range sc.endpointsMap {        
        epList = append(epList, v.modified)    
    }    
    return epList
}
  

总结

关于“如何分析SuperEdge 拓扑算法”就介绍到这了,更多相关内容可以搜索亿速云以前的文章,希望能够帮助大家答疑解惑,请多多支持亿速云网站!

推荐阅读:
  1. 如何分析EIGRP协议
  2. python中骨架提取与分水岭算法的示例分析

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

superedge

上一篇:circinteractome数据库有什么用

下一篇:Python怎么实现自动化发送邮件

相关阅读

您好,登录后才能下订单哦!

密码登录
登录注册
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》