您好,登录后才能下订单哦!
本篇文章给大家分享的是有关怎样实现kubeproxy源码分析,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。
kubernetes离线安装包
ipvs相对于iptables模式具备较高的性能与稳定性, 本文讲以此模式的源码解析为主,如果想去了解iptables模式的原理,可以去参考其实现,架构上无差别。
kube-proxy主要功能是监听service和endpoint的事件,然后下放代理策略到机器上。 底层调用docker/libnetwork, 而libnetwork最终调用了netlink 与netns来实现ipvs的创建等动作
<!--more-->
代码入口:cmd/kube-proxy/app/server.go Run() 函数
通过命令行参数去初始化proxyServer的配置
proxyServer, err := NewProxyServer(o)
type ProxyServer struct {
    // k8s client
	Client                 clientset.Interface
	EventClient            v1core.EventsGetter
    // ipvs 相关接口
	IptInterface           utiliptables.Interface
	IpvsInterface          utilipvs.Interface
	IpsetInterface         utilipset.Interface
    // 处理同步时的处理器
	Proxier                proxy.ProxyProvider
    // 代理模式,ipvs iptables userspace kernelspace(windows)四种
	ProxyMode              string
    // 配置同步周期
	ConfigSyncPeriod       time.Duration
    // service 与 endpoint 事件处理器
	ServiceEventHandler    config.ServiceHandler
	EndpointsEventHandler  config.EndpointsHandler
}Proxier是主要入口,抽象了两个函数:
type ProxyProvider interface {
	// Sync immediately synchronizes the ProxyProvider's current state to iptables.
	Sync()
	// 定期执行
	SyncLoop()
}ipvs 的interface 这个很重要:
type Interface interface {
	// 删除所有规则
	Flush() error
	// 增加一个virtual server
	AddVirtualServer(*VirtualServer) error
	UpdateVirtualServer(*VirtualServer) error
	DeleteVirtualServer(*VirtualServer) error
	GetVirtualServer(*VirtualServer) (*VirtualServer, error)
	GetVirtualServers() ([]*VirtualServer, error)
    // 给virtual server加个realserver, 如 VirtualServer就是一个clusterip realServer就是pod(或者自定义的endpoint)
	AddRealServer(*VirtualServer, *RealServer) error
	GetRealServers(*VirtualServer) ([]*RealServer, error)
	DeleteRealServer(*VirtualServer, *RealServer) error
}我们在下文再详细看ipvs_linux是如何实现上面接口的
virtual server与realserver, 最重要的是ip:port,然后就是一些代理的模式如sessionAffinity等:
type VirtualServer struct {
	Address   net.IP
	Protocol  string
	Port      uint16
	Scheduler string
	Flags     ServiceFlags
	Timeout   uint32
}
type RealServer struct {
	Address net.IP
	Port    uint16
	Weight  int
}创建apiserver client
client, eventClient, err := createClients(config.ClientConnection, master)
创建Proxier 这是仅仅关注ipvs模式的proxier
else if proxyMode == proxyModeIPVS {
		glog.V(0).Info("Using ipvs Proxier.")
		proxierIPVS, err := ipvs.NewProxier(
			iptInterface,
			ipvsInterface,
			ipsetInterface,
			utilsysctl.New(),
			execer,
			config.IPVS.SyncPeriod.Duration,
			config.IPVS.MinSyncPeriod.Duration,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			getNodeIP(client, hostname),
			recorder,
			healthzServer,
			config.IPVS.Scheduler,
		)
...
		proxier = proxierIPVS
		serviceEventHandler = proxierIPVS
		endpointsEventHandler = proxierIPVS这个Proxier具备以下方法:
+OnEndpointsAdd(endpoints *api.Endpoints) +OnEndpointsDelete(endpoints *api.Endpoints) +OnEndpointsSynced() +OnEndpointsUpdate(oldEndpoints, endpoints *api.Endpoints) +OnServiceAdd(service *api.Service) +OnServiceDelete(service *api.Service) +OnServiceSynced() +OnServiceUpdate(oldService, service *api.Service) +Sync() +SyncLoop()
所以ipvs的这个Proxier实现了我们需要的绝大部分接口
小结一下:
+-----------> endpointHandler | +-----------> serviceHandler | ^ | | +-------------> sync 定期同步等 | | | ProxyServer---------> Proxier --------> service 事件回调 | | | +-------------> endpoint事件回调 | | 触发 +-----> ipvs interface ipvs handler <-----+
检查是不是带了clean up参数,如果带了那么清除所有规则退出
OOM adjuster貌似没实现,忽略
resouceContainer也没实现,忽略
启动metrics服务器,这个挺重要,比如我们想监控时可以传入这个参数, 包含promethus的 metrics. metrics-bind-address参数
启动informer, 开始监听事件,分别启动协程处理。
1 2 3 4我们都不用太关注,细看5即可:
informerFactory := informers.NewSharedInformerFactory(s.Client, s.ConfigSyncPeriod) serviceConfig := config.NewServiceConfig(informerFactory.Core().InternalVersion().Services(), s.ConfigSyncPeriod) // 注册 service handler并启动 serviceConfig.RegisterEventHandler(s.ServiceEventHandler) // 这里面仅仅是把ServiceEventHandler赋值给informer回调 go serviceConfig.Run(wait.NeverStop) endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().InternalVersion().Endpoints(), s.ConfigSyncPeriod) // 注册endpoint endpointsConfig.RegisterEventHandler(s.EndpointsEventHandler) go endpointsConfig.Run(wait.NeverStop) go informerFactory.Start(wait.NeverStop)
serviceConfig.Run与endpointConfig.Run仅仅是给回调函数赋值, 所以注册的handler就给了informer, informer监听到事件时就会回调:
for i := range c.eventHandlers {
	glog.V(3).Infof("Calling handler.OnServiceSynced()")
	c.eventHandlers[i].OnServiceSynced()
}那么问题来了,注册进去的这个handler是啥? 回顾一下上文的
serviceEventHandler = proxierIPVS endpointsEventHandler = proxierIPVS
所以都是这个proxierIPVS
handler的回调函数, informer会回调这几个函数,所以我们在自己开发时实现这个interface注册进去即可:
type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of new service object
	// is observed.
	OnServiceAdd(service *api.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed.
	OnServiceUpdate(oldService, service *api.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *api.Service)
	// OnServiceSynced is called once all the initial even handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}go informerFactory.Start(wait.NeverStop)
这里执行后,我们创建删除service endpoint等动作都会被监听到,然后回调,回顾一下上面的图,最终都是由Proxier去实现,所以后面我们重点关注Proxier即可
s.Proxier.SyncLoop()
然后开始SyncLoop,下文开讲
我们创建一个service时OnServiceAdd方法会被调用, 这里记录一下之前的状态与当前状态两个东西,然后发个信号给syncRunner让它去处理:
func (proxier *Proxier) OnServiceAdd(service *api.Service) {
	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
	if proxier.serviceChanges.update(&namespacedName, nil, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}记录service 信息,可以看到没做什么事,就是把service存在map里, 如果没变直接删掉map信息不做任何处理:
change, exists := scm.items[*namespacedName]
if !exists {
	change = &serviceChange{}
    // 老的service信息
	change.previous = serviceToServiceMap(previous)
	scm.items[*namespacedName] = change
}
// 当前监听到的service信息
change.current = serviceToServiceMap(current)
如果一样,直接删除
if reflect.DeepEqual(change.previous, change.current) {
	delete(scm.items, *namespacedName)
}proxier.syncRunner.Run() 里面就发送了一个信号
select {
case bfr.run <- struct{}{}:
default:
}这里面处理了这个信号
s.Proxier.SyncLoop()
func (proxier *Proxier) SyncLoop() {
	// Update healthz timestamp at beginning in case Sync() never succeeds.
	if proxier.healthzServer != nil {
		proxier.healthzServer.UpdateTimestamp()
	}
	proxier.syncRunner.Loop(wait.NeverStop)
}runner里收到信号执行,没收到信号会定期执行:
func (bfr *BoundedFrequencyRunner) Loop(stop <-chan struct{}) {
	glog.V(3).Infof("%s Loop running", bfr.name)
	bfr.timer.Reset(bfr.maxInterval)
	for {
		select {
		case <-stop:
			bfr.stop()
			glog.V(3).Infof("%s Loop stopping", bfr.name)
			return
		case <-bfr.timer.C():  // 定期执行
			bfr.tryRun()
		case <-bfr.run:
			bfr.tryRun()       // 收到事件信号执行
		}
	}
}这个bfr runner里我们最需要主意的是一个回调函数,tryRun里检查这个回调是否满足被调度的条件:
type BoundedFrequencyRunner struct {
	name        string        // the name of this instance
	minInterval time.Duration // the min time between runs, modulo bursts
	maxInterval time.Duration // the max time between runs
	run chan struct{} // try an async run
	mu      sync.Mutex  // guards runs of fn and all mutations
	fn      func()      // function to run, 这个回调
	lastRun time.Time   // time of last run
	timer   timer       // timer for deferred runs
	limiter rateLimiter // rate limiter for on-demand runs
}
// 传入的proxier.syncProxyRules这个函数
proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)这是个600行左右的搓逼函数,也是处理主要逻辑的地方。
设置一些iptables规则,如mark与comment
确定机器上有网卡,ipvs需要绑定地址到上面
确定有ipset,ipset是iptables的扩展,可以给一批地址设置iptables规则 ...(又臭又长,重复代码多,看不下去了,细节问题自己去看吧)
我们最关注的,如何去处理VirtualServer的
serv := &utilipvs.VirtualServer{
	Address:   net.ParseIP(ingress.IP),
	Port:      uint16(svcInfo.port),
	Protocol:  string(svcInfo.protocol),
	Scheduler: proxier.ipvsScheduler,
}
if err := proxier.syncService(svcNameString, serv, false); err == nil {
	if err := proxier.syncEndpoint(svcName, svcInfo.onlyNodeLocalEndpoints, serv); err != nil {
	}
}看下实现, 如果没有就创建,如果已存在就更新, 给网卡绑定service的cluster ip:
func (proxier *Proxier) syncService(svcName string, vs *utilipvs.VirtualServer, bindAddr bool) error {
	appliedVirtualServer, _ := proxier.ipvs.GetVirtualServer(vs)
	if appliedVirtualServer == nil || !appliedVirtualServer.Equal(vs) {
		if appliedVirtualServer == nil {
			if err := proxier.ipvs.AddVirtualServer(vs); err != nil {
				return err
			}
		} else {
			if err := proxier.ipvs.UpdateVirtualServer(appliedVirtualServer); err != nil {
				return err
			}
		}
	}
	// bind service address to dummy interface even if service not changed,
	// in case that service IP was removed by other processes
	if bindAddr {
		_, err := proxier.netlinkHandle.EnsureAddressBind(vs.Address.String(), DefaultDummyDevice)
		if err != nil {
			return err
		}
	}
	return nil
}现在可以去看ipvs的AddVirtualServer的实现了,主要是利用socket与内核进程通信做到的。 pkg/util/ipvs/ipvs_linux.go 里 runner结构体实现了这些方法, 这里用到了 docker/libnetwork/ipvs库:
// runner implements Interface.
type runner struct {
	exec       utilexec.Interface
	ipvsHandle *ipvs.Handle
}
// New returns a new Interface which will call ipvs APIs.
func New(exec utilexec.Interface) Interface {
	ihandle, err := ipvs.New("") // github.com/docker/libnetwork/ipvs
	if err != nil {
		glog.Errorf("IPVS interface can't be initialized, error: %v", err)
		return nil
	}
	return &runner{
		exec:       exec,
		ipvsHandle: ihandle,
	}
}New的时候创建了一个特殊的socket, 这里与我们普通的socket编程无差别,关键是syscall.AF_NETLINK这个参数,代表与内核进程通信:
sock, err := nl.GetNetlinkSocketAt(n, netns.None(), syscall.NETLINK_GENERIC)
func getNetlinkSocket(protocol int) (*NetlinkSocket, error) {
	fd, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW|syscall.SOCK_CLOEXEC, protocol)
	if err != nil {
		return nil, err
	}
	s := &NetlinkSocket{
		fd: int32(fd),
	}
	s.lsa.Family = syscall.AF_NETLINK
	if err := syscall.Bind(fd, &s.lsa); err != nil {
		syscall.Close(fd)
		return nil, err
	}
	return s, nil
}创建一个service, 转换成docker service格式,直接调用:
// AddVirtualServer is part of Interface.
func (runner *runner) AddVirtualServer(vs *VirtualServer) error {
	eSvc, err := toBackendService(vs)
	if err != nil {
		return err
	}
	return runner.ipvsHandle.NewService(eSvc)
}然后就是把service信息打包,往socket里面写即可:
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
	req := newIPVSRequest(cmd)
	req.Seq = atomic.AddUint32(&i.seq, 1)
	if s == nil {
		req.Flags |= syscall.NLM_F_DUMP                    //Flag to dump all messages
		req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
	} else {
		req.AddData(fillService(s))
	} // 把service塞到请求中
	if d == nil {
		if cmd == ipvsCmdGetDest {
			req.Flags |= syscall.NLM_F_DUMP
		}
	} else {
		req.AddData(fillDestinaton(d))
	}
    // 给内核进程发送service信息
	res, err := execute(i.sock, req, 0)
	if err != nil {
		return [][]byte{}, err
	}
	return res, nil
}构造请求
func newIPVSRequest(cmd uint8) *nl.NetlinkRequest {
	return newGenlRequest(ipvsFamily, cmd)
}在构造请求时传入的是ipvs协议簇
然后构造一个与内核通信的消息头
func NewNetlinkRequest(proto, flags int) *NetlinkRequest {
	return &NetlinkRequest{
		NlMsghdr: syscall.NlMsghdr{
			Len:   uint32(syscall.SizeofNlMsghdr),
			Type:  uint16(proto),
			Flags: syscall.NLM_F_REQUEST | uint16(flags),
			Seq:   atomic.AddUint32(&nextSeqNr, 1),
		},
	}
}给消息加Data,这个Data是个数组,需要实现两个方法:
type NetlinkRequestData interface {
	Len() int  // 长度
	Serialize() []byte // 序列化, 内核通信也需要一定的数据格式,service信息也需要实现
}比如 header是这样序列化的, 一看愣住了,思考好久才看懂: 拆下看: ([unsafe.Sizeof(hdr)]byte) 一个[]byte类型,长度就是结构体大小 (unsafe.Pointer(hdr))把结构体转成byte指针类型 加个取它的值 用[:]转成byte返回
func (hdr *genlMsgHdr) Serialize() []byte {
	return (*(*[unsafe.Sizeof(*hdr)]byte)(unsafe.Pointer(hdr)))[:]
}发送service信息给内核
一个很普通的socket发送接收数据
func execute(s *nl.NetlinkSocket, req *nl.NetlinkRequest, resType uint16) ([][]byte, error) {
	var (
		err error
	)
	if err := s.Send(req); err != nil {
		return nil, err
	}
	pid, err := s.GetPid()
	if err != nil {
		return nil, err
	}
	var res [][]byte
done:
	for {
		msgs, err := s.Receive()
		if err != nil {
			return nil, err
		}
		for _, m := range msgs {
			if m.Header.Seq != req.Seq {
				continue
			}
			if m.Header.Pid != pid {
				return nil, fmt.Errorf("Wrong pid %d, expected %d", m.Header.Pid, pid)
			}
			if m.Header.Type == syscall.NLMSG_DONE {
				break done
			}
			if m.Header.Type == syscall.NLMSG_ERROR {
				error := int32(native.Uint32(m.Data[0:4]))
				if error == 0 {
					break done
				}
				return nil, syscall.Errno(-error)
			}
			if resType != 0 && m.Header.Type != resType {
				continue
			}
			res = append(res, m.Data)
			if m.Header.Flags&syscall.NLM_F_MULTI == 0 {
				break done
			}
		}
	}
	return res, nil
}Service 数据打包 这里比较细,核心思想就是内核只认一定格式的标准数据,我们把service信息按其标准打包发送给内核即可。 至于怎么打包的就不详细讲了。
func fillService(s *Service) nl.NetlinkRequestData {
	cmdAttr := nl.NewRtAttr(ipvsCmdAttrService, nil)
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddressFamily, nl.Uint16Attr(s.AddressFamily))
	if s.FWMark != 0 {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFWMark, nl.Uint32Attr(s.FWMark))
	} else {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrProtocol, nl.Uint16Attr(s.Protocol))
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrAddress, rawIPData(s.Address))
		// Port needs to be in network byte order.
		portBuf := new(bytes.Buffer)
		binary.Write(portBuf, binary.BigEndian, s.Port)
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPort, portBuf.Bytes())
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrSchedName, nl.ZeroTerminated(s.SchedName))
	if s.PEName != "" {
		nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
	}
	f := &ipvsFlags{
		flags: s.Flags,
		mask:  0xFFFFFFFF,
	}
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrFlags, f.Serialize())
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrTimeout, nl.Uint32Attr(s.Timeout))
	nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrNetmask, nl.Uint32Attr(s.Netmask))
	return cmdAttr
}Service总体来讲代码比较简单,但是觉得有些地方实现的有点绕,不够简单直接。 总体来说就是监听apiserver事件,然后比对 处理,定期也会去执行同步策略。
以上就是怎样实现kubeproxy源码分析,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注亿速云行业资讯频道。
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。