client-go watch接口隔一段时间自动退出怎么办?

Overview

在使用client-go的watch接口时候碰到异常退出问题,查了一下google没有多少信息,于是扒了一下代码,把自己踩的坑记录下来方便以后自查自纠。

使用client-go watch接口

1. 如何watch

由于kubernetes整合了etcd的watch功能,我们可以通过watch操作去建立一个长连接,不断的接收数据;这种方式要优于普通的反复轮询请求,降低server端的压力;

使用client-go调用对应对象的Watch()方法之后,会返回一个watch.Event对象,可以对其使用ResultChan()接受watch到的对象。

1pod, err := mycluster.Clusterclientset.CoreV1().Pods(appNamespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: label})
2if err != nil {
3    log.Error(err)
4}
5...
6event, ok := <-pod.ResultChan()
7if !ok {
8    log.Error(err)
9}

异常:watch接口自动断开

1. 现象

在使用过程中,watch操作持续一段时间就会自动断开

2. 排查

我们进入watch包里面找到streamwatcher.go,其中节选了一些重要片段:

 1type StreamWatcher struct {
 2	sync.Mutex
 3	source   Decoder
 4	reporter Reporter
 5	result   chan Event
 6	stopped  bool
 7}
 8...
 9func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
10	sw := &StreamWatcher{
11		source:   d,
12		reporter: r,
13		// It's easy for a consumer to add buffering via an extra
14		// goroutine/channel, but impossible for them to remove it,
15		// so nonbuffered is better.
16		result: make(chan Event),
17	}
18	go sw.receive()
19	return sw
20}
21...
22func (sw *StreamWatcher) receive() {
23	defer close(sw.result)
24	defer sw.Stop()
25	defer utilruntime.HandleCrash()
26	for {
27		action, obj, err := sw.source.Decode()
28		if err != nil {
29			// Ignore expected error.
30			if sw.stopping() {
31				return
32			}
33			switch err {
34			case io.EOF:
35				// watch closed normally
36			case io.ErrUnexpectedEOF:
37				klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
38			default:
39				if net.IsProbableEOF(err) || net.IsTimeout(err) {
40					klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
41				} else {
42					sw.result <- Event{
43						Type:   Error,
44						Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
45					}
46				}
47			}
48			return
49		}
50		sw.result <- Event{
51			Type:   action,
52			Object: obj,
53		}
54	}
55}

3. 原因

结合代码看一下,StreamWatcher实现了Watch()方法,我们上述调用ResultChan()的时候,实际上返回的是这里的sw.result;

再往下看新建StreamWatcher的时候,有一个”go sw.receive()”, 也就是几乎在新建对象的同步就开始接受处理数据了,最后看到sw的receive()方法可以看到,在处理数据的时候(sw.source.Decode()), 如果err不为nil, 会switch集中error情况,最后会直接return,然后defer sw.Stop();

也就是说如果接受数据解码的时候(sw.source.Decode()), 如果解码失败,那么StreamWatcher就被关闭了,那自然数据通道也就关闭了,造成”watch一段时间之后自动关闭的现象”。

解决办法(1):forinfor

那么既然是这种情况会导致watch断开,那么我们首先想到的就是暴力恢复这个StreamWatcher,代码实现如下:

 1for {
 2	pod, err := mycluster.Clusterclientset.CoreV1().Pods(appNamespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: label})
 3	if err != nil {
 4		log.Error(err)
 5	}
 6loopier:
 7	for {
 8		event, ok := <-pod.ResultChan()
 9		if !ok {
10			time.Sleep(time.Second * 5)
11			log.Info("Restarting watcher...")
12			break loopier
13		}
14		// your process logic
15	}
16}

我们定义一层for嵌套,因为在上述退出的时候会先defer close(sw.result),所以我们接受数据的通道也就是上面代码里的pod.ResultChan()就会关闭,然后我们加一个错误处理,等待5s之后,break掉这个loopier循环,让外层的for循环继续新建StreamWatcher继续监听数据。以此达到持续监听的效果,好处是实现简单,坏处是缺少错误判断,不能针对错误类型分别处理,对于一直出错的场景也只是无脑重启。

解决办法(2):retrywatcher

在官方代码下client-go/tools/watch/retrywatcher.go中其实官方给了一个标准解法,用于解决watch异常退出的问题,下面我们看下这种实现方式:

 1type RetryWatcher struct {
 2	lastResourceVersion string
 3	watcherClient       cache.Watcher
 4	resultChan          chan watch.Event
 5	stopChan            chan struct{}
 6	doneChan            chan struct{}
 7	minRestartDelay     time.Duration
 8}
 9...
10func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
11	switch initialResourceVersion {
12	case "", "0":
13		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
14		//       without doing the synthetic list of objects at the beginning (see #74022)
15		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
16	default:
17		break
18	}
19
20	rw := &RetryWatcher{
21		lastResourceVersion: initialResourceVersion,
22		watcherClient:       watcherClient,
23		stopChan:            make(chan struct{}),
24		doneChan:            make(chan struct{}),
25		resultChan:          make(chan watch.Event, 0),
26		minRestartDelay:     minRestartDelay,
27	}
28
29	go rw.receive()
30	return rw, nil
31}

和普通的StreamWatcher很类似,这里面RetryWatcher多了一些结构体字段;lastResourceVersion、minRestartDelay用于出错之后重启Watcher的RV保存,以及重试时间;传入initialResourceVersion和watcherClient(cache.Watcher)即可创建一个RetryWatcher;

同理,RetryWatcher也是在创建对象的同时就开始go rw.receive()接受数据。

 1func (rw *RetryWatcher) receive() {
 2	defer close(rw.doneChan)
 3	defer close(rw.resultChan)
 4
 5	klog.V(4).Info("Starting RetryWatcher.")
 6	defer klog.V(4).Info("Stopping RetryWatcher.")
 7
 8	ctx, cancel := context.WithCancel(context.Background())
 9	defer cancel()
10	go func() {
11		select {
12		case <-rw.stopChan:
13			cancel()
14			return
15		case <-ctx.Done():
16			return
17		}
18	}()
19
20	// We use non sliding until so we don't introduce delays on happy path when WATCH call
21	// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
22	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
23		done, retryAfter := rw.doReceive()
24		if done {
25			cancel()
26			return
27		}
28
29		time.Sleep(retryAfter)
30
31		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
32	}, rw.minRestartDelay)
33}
34...
35func (rw *RetryWatcher) Stop() {
36	close(rw.stopChan)
37}

上面代码的receive()函数中,wait包起到核心重试逻辑作用,他会循环执行里面的函数,直到收到context Done 的信号才会往下走;而上面代码的ctx只有两种情况才会被关闭:

  1. 有人调用了RetryWatcher的Stop();
  2. 另外就是rw.doReceive()中返回了done, 也会直接调用cancel()结束wait部分。

而如果接受的done为false,则会正常等待time.Sleep(retryAfter)之后,进行重试,实现RetryWatcher!

接下来就看下这个rw.doReceive(),也就是RetryWatcher的接收处理数据部分, 同时会根据err类型判断是否应该重试:

 1func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
 2	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
 3		ResourceVersion:     rw.lastResourceVersion,
 4		AllowWatchBookmarks: true,
 5	})
 6	// We are very unlikely to hit EOF here since we are just establishing the call,
 7	// but it may happen that the apiserver is just shutting down (e.g. being restarted)
 8	// This is consistent with how it is handled for informers
 9	switch err {
10...
11// 省略watch的一些错误处理,都会返回false,也就是继续wait重试
12...
13	}
14
15	if watcher == nil {
16		klog.Error("Watch returned nil watcher")
17		// Retry
18		return false, 0
19	}
20
21	ch := watcher.ResultChan()
22	defer watcher.Stop()
23
24	for {
25		select {
26		case <-rw.stopChan:
27			klog.V(4).Info("Stopping RetryWatcher.")
28			return true, 0
29		case event, ok := <-ch:
30			if !ok {
31				klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
32				return false, 0
33			}
34
35			// We need to inspect the event and get ResourceVersion out of it
36			switch event.Type {
37			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
38				metaObject, ok := event.Object.(resourceVersionGetter)
39				...
40				resourceVersion := metaObject.GetResourceVersion()
41				...
42        // All is fine; send the non-bookmark events and update resource version.
43				if event.Type != watch.Bookmark {
44					ok = rw.send(event)
45					if !ok {
46						return true, 0
47					}
48				}
49				rw.lastResourceVersion = resourceVersion
50
51				continue
52
53			case watch.Error:
54			...
55			}
56		}
57	}
58}
59...
60func (rw *RetryWatcher) send(event watch.Event) bool {
61	// Writing to an unbuffered channel is blocking operation
62	// and we need to check if stop wasn't requested while doing so.
63	select {
64	case rw.resultChan <- event:
65		return true
66	case <-rw.stopChan:
67		return false
68	}
69}

上面代码我已经非常清晰的展示出了doReceive()的处理逻辑,第一步会按照rw中定义的watcher开始真实的监听对应资源对象,这里返回错误的话也会进行rw的重试逻辑;然后会获取真实的watcher.ResultChan()也就是可以获取到真实对象的通道,套用for select模式,循环接受数据,如果数据一直是正常的,那么会通过rw.send(event)发送给rw.ResultChan,然后记录保存rw.lastResourceVersion然后继续接收,实现watch的功能

这里多说一句,由于rw.lastResourceVersion是保存在rw的,也就是及时重启(对应上面的任意一个case返回的是true),会从rw.lastResourceVersion也就是最新的RV开始监听,这样实现根据特定原因重启故障的watcher,比较合理,也很巧妙,是官方的标准答案。

个人版RetryWatcher代码实现:

说了那么多,那么具体要怎么使用这个RetryWatcher呢?我个人做了一个妥协方案可以参考:

还记得RetryWatcher中定义的watcherClient类型吗,需要是cache.Watcher,然后我们看client-go/tools/cache里面定义的cache.Watcher接口,定义如下:

1type Watcher interface {
2	// Watch should begin a watch at the specified version.
3	Watch(options metav1.ListOptions) (watch.Interface, error)
4}

也就是实现了Watch(xxxx)这个方法的就是符合的cache.Watcher; 而最开始我们代码正好是通过MyCluster.Clusterclientset.CoreV1().Pods()返回的接口v1.PodInterface中调用的Watch(xxxxx), 那么直接用“MyCluster.Clusterclientset.CoreV1().Pods()”来生成RetryWatcher是不是就行了!

我们来看一下这个接口类型:

1type PodInterface interface {
2	...
3	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
4	...
5}

答案是不行… 下面这个多了一个ctx参数 🥶,但是我们就是想从Clientset这里用怎么办,自己实现一个符合的结构吧:

1type PodListWatch struct {
2	MyCluster    *initk8s.MyCluster
3	AppNamespace string
4}
5// watch指定命名空间下的Pod例子 
6func (p PodListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
7	return p.MyCluster.Clusterclientset.CoreV1().Pods(p.AppNamespace).Watch(context.TODO(), options)
8}

这个PodListWatch的Watch(xxxx)方法正好满足cache.Watcher, 单后watch的方式还是按照我们原先Clientset的,然后生成RetryWatcher的方式如下:

 1func ResourceWatcher(mycluster *initk8s.MyCluster, appNamespace string) {
 2	podWatcher := PodListWatch{MyCluster: mycluster, AppNamespace: appNamespace}
 3	// generate new retrywatcher, use podRetryWatcher.ResultChan() to keep a chronic watch operation
 4	podRetryWatcher, err := retrywatcher.NewRetryWatcher(label, podWatcher)
 5	if err != nil {
 6		log.Error(err)
 7	}
 8FORWATCHER:
 9	for {
10		select {
11		case <-podRetryWatcher.Done():
12			log.Info("Retrywatcher is exiting, please check...")
13			break FORWATCHER
14		case event, ok := <-podRetryWatcher.ResultChan():
15			if !ok {
16				log.Warn("Retrywatcher is not open, please check...")
17				continue
18			}
19			...

上述只是一种思路提供,希望能够帮到使用client-go进行watch的同学;官方的实现RetryWatcher确实更加合理,还可以进行改造加减不同情况是否重试的策略。另外watch方法毕竟是直接和kubernetes中的apiserver沟通,如果想要减轻apiserver的压力kubernetes提供了更加常用的informer机制(sharedinformer也是众多controller使用的,同时也是我认为kubernetes最核心的功能),至于使用informer就又有很多要说的了,以后有时间可能会更新~