client-go watch接口隔一段时间自动退出怎么办?
Overview
在使用client-go的watch接口时候碰到异常退出问题,查了一下google没有多少信息,于是扒了一下代码,把自己踩的坑记录下来方便以后自查自纠。
使用client-go watch接口
1. 如何watch
由于kubernetes整合了etcd的watch功能,我们可以通过watch操作去建立一个长连接,不断的接收数据;这种方式要优于普通的反复轮询请求,降低server端的压力;
使用client-go调用对应对象的Watch()方法之后,会返回一个watch.Event对象,可以对其使用ResultChan()接受watch到的对象。
1pod, err := mycluster.Clusterclientset.CoreV1().Pods(appNamespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: label})
2if err != nil {
3 log.Error(err)
4}
5...
6event, ok := <-pod.ResultChan()
7if !ok {
8 log.Error(err)
9}
异常:watch接口自动断开
1. 现象
在使用过程中,watch操作持续一段时间就会自动断开
2. 排查
我们进入watch包里面找到streamwatcher.go,其中节选了一些重要片段:
1type StreamWatcher struct {
2 sync.Mutex
3 source Decoder
4 reporter Reporter
5 result chan Event
6 stopped bool
7}
8...
9func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
10 sw := &StreamWatcher{
11 source: d,
12 reporter: r,
13 // It's easy for a consumer to add buffering via an extra
14 // goroutine/channel, but impossible for them to remove it,
15 // so nonbuffered is better.
16 result: make(chan Event),
17 }
18 go sw.receive()
19 return sw
20}
21...
22func (sw *StreamWatcher) receive() {
23 defer close(sw.result)
24 defer sw.Stop()
25 defer utilruntime.HandleCrash()
26 for {
27 action, obj, err := sw.source.Decode()
28 if err != nil {
29 // Ignore expected error.
30 if sw.stopping() {
31 return
32 }
33 switch err {
34 case io.EOF:
35 // watch closed normally
36 case io.ErrUnexpectedEOF:
37 klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
38 default:
39 if net.IsProbableEOF(err) || net.IsTimeout(err) {
40 klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
41 } else {
42 sw.result <- Event{
43 Type: Error,
44 Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
45 }
46 }
47 }
48 return
49 }
50 sw.result <- Event{
51 Type: action,
52 Object: obj,
53 }
54 }
55}
3. 原因
结合代码看一下,StreamWatcher实现了Watch()方法,我们上述调用ResultChan()的时候,实际上返回的是这里的sw.result;
再往下看新建StreamWatcher的时候,有一个”go sw.receive()”, 也就是几乎在新建对象的同步就开始接受处理数据了,最后看到sw的receive()方法可以看到,在处理数据的时候(sw.source.Decode()), 如果err不为nil, 会switch集中error情况,最后会直接return,然后defer sw.Stop();
也就是说如果接受数据解码的时候(sw.source.Decode()), 如果解码失败,那么StreamWatcher就被关闭了,那自然数据通道也就关闭了,造成”watch一段时间之后自动关闭的现象”。
解决办法(1):forinfor
那么既然是这种情况会导致watch断开,那么我们首先想到的就是暴力恢复这个StreamWatcher,代码实现如下:
1for {
2 pod, err := mycluster.Clusterclientset.CoreV1().Pods(appNamespace).Watch(context.TODO(), metav1.ListOptions{LabelSelector: label})
3 if err != nil {
4 log.Error(err)
5 }
6loopier:
7 for {
8 event, ok := <-pod.ResultChan()
9 if !ok {
10 time.Sleep(time.Second * 5)
11 log.Info("Restarting watcher...")
12 break loopier
13 }
14 // your process logic
15 }
16}
我们定义一层for嵌套,因为在上述退出的时候会先defer close(sw.result),所以我们接受数据的通道也就是上面代码里的pod.ResultChan()就会关闭,然后我们加一个错误处理,等待5s之后,break掉这个loopier循环,让外层的for循环继续新建StreamWatcher继续监听数据。以此达到持续监听的效果,好处是实现简单,坏处是缺少错误判断,不能针对错误类型分别处理,对于一直出错的场景也只是无脑重启。
解决办法(2):retrywatcher
在官方代码下client-go/tools/watch/retrywatcher.go中其实官方给了一个标准解法,用于解决watch异常退出的问题,下面我们看下这种实现方式:
1type RetryWatcher struct {
2 lastResourceVersion string
3 watcherClient cache.Watcher
4 resultChan chan watch.Event
5 stopChan chan struct{}
6 doneChan chan struct{}
7 minRestartDelay time.Duration
8}
9...
10func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
11 switch initialResourceVersion {
12 case "", "0":
13 // TODO: revisit this if we ever get WATCH v2 where it means start "now"
14 // without doing the synthetic list of objects at the beginning (see #74022)
15 return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
16 default:
17 break
18 }
19
20 rw := &RetryWatcher{
21 lastResourceVersion: initialResourceVersion,
22 watcherClient: watcherClient,
23 stopChan: make(chan struct{}),
24 doneChan: make(chan struct{}),
25 resultChan: make(chan watch.Event, 0),
26 minRestartDelay: minRestartDelay,
27 }
28
29 go rw.receive()
30 return rw, nil
31}
和普通的StreamWatcher很类似,这里面RetryWatcher多了一些结构体字段;lastResourceVersion、minRestartDelay用于出错之后重启Watcher的RV保存,以及重试时间;传入initialResourceVersion和watcherClient(cache.Watcher)即可创建一个RetryWatcher;
同理,RetryWatcher也是在创建对象的同时就开始go rw.receive()接受数据。
1func (rw *RetryWatcher) receive() {
2 defer close(rw.doneChan)
3 defer close(rw.resultChan)
4
5 klog.V(4).Info("Starting RetryWatcher.")
6 defer klog.V(4).Info("Stopping RetryWatcher.")
7
8 ctx, cancel := context.WithCancel(context.Background())
9 defer cancel()
10 go func() {
11 select {
12 case <-rw.stopChan:
13 cancel()
14 return
15 case <-ctx.Done():
16 return
17 }
18 }()
19
20 // We use non sliding until so we don't introduce delays on happy path when WATCH call
21 // timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
22 wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
23 done, retryAfter := rw.doReceive()
24 if done {
25 cancel()
26 return
27 }
28
29 time.Sleep(retryAfter)
30
31 klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
32 }, rw.minRestartDelay)
33}
34...
35func (rw *RetryWatcher) Stop() {
36 close(rw.stopChan)
37}
上面代码的receive()函数中,wait包起到核心重试逻辑作用,他会循环执行里面的函数,直到收到context Done 的信号才会往下走;而上面代码的ctx只有两种情况才会被关闭:
- 有人调用了RetryWatcher的Stop();
- 另外就是rw.doReceive()中返回了done, 也会直接调用cancel()结束wait部分。
而如果接受的done为false,则会正常等待time.Sleep(retryAfter)之后,进行重试,实现RetryWatcher!
接下来就看下这个rw.doReceive(),也就是RetryWatcher的接收处理数据部分, 同时会根据err类型判断是否应该重试:
1func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
2 watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
3 ResourceVersion: rw.lastResourceVersion,
4 AllowWatchBookmarks: true,
5 })
6 // We are very unlikely to hit EOF here since we are just establishing the call,
7 // but it may happen that the apiserver is just shutting down (e.g. being restarted)
8 // This is consistent with how it is handled for informers
9 switch err {
10...
11// 省略watch的一些错误处理,都会返回false,也就是继续wait重试
12...
13 }
14
15 if watcher == nil {
16 klog.Error("Watch returned nil watcher")
17 // Retry
18 return false, 0
19 }
20
21 ch := watcher.ResultChan()
22 defer watcher.Stop()
23
24 for {
25 select {
26 case <-rw.stopChan:
27 klog.V(4).Info("Stopping RetryWatcher.")
28 return true, 0
29 case event, ok := <-ch:
30 if !ok {
31 klog.V(4).Infof("Failed to get event! Re-creating the watcher. Last RV: %s", rw.lastResourceVersion)
32 return false, 0
33 }
34
35 // We need to inspect the event and get ResourceVersion out of it
36 switch event.Type {
37 case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
38 metaObject, ok := event.Object.(resourceVersionGetter)
39 ...
40 resourceVersion := metaObject.GetResourceVersion()
41 ...
42 // All is fine; send the non-bookmark events and update resource version.
43 if event.Type != watch.Bookmark {
44 ok = rw.send(event)
45 if !ok {
46 return true, 0
47 }
48 }
49 rw.lastResourceVersion = resourceVersion
50
51 continue
52
53 case watch.Error:
54 ...
55 }
56 }
57 }
58}
59...
60func (rw *RetryWatcher) send(event watch.Event) bool {
61 // Writing to an unbuffered channel is blocking operation
62 // and we need to check if stop wasn't requested while doing so.
63 select {
64 case rw.resultChan <- event:
65 return true
66 case <-rw.stopChan:
67 return false
68 }
69}
上面代码我已经非常清晰的展示出了doReceive()的处理逻辑,第一步会按照rw中定义的watcher开始真实的监听对应资源对象,这里返回错误的话也会进行rw的重试逻辑;然后会获取真实的watcher.ResultChan()也就是可以获取到真实对象的通道,套用for select模式,循环接受数据,如果数据一直是正常的,那么会通过rw.send(event)发送给rw.ResultChan,然后记录保存rw.lastResourceVersion然后继续接收,实现watch的功能;
这里多说一句,由于rw.lastResourceVersion是保存在rw的,也就是及时重启(对应上面的任意一个case返回的是true),会从rw.lastResourceVersion也就是最新的RV开始监听,这样实现根据特定原因重启故障的watcher,比较合理,也很巧妙,是官方的标准答案。
个人版RetryWatcher代码实现:
说了那么多,那么具体要怎么使用这个RetryWatcher呢?我个人做了一个妥协方案可以参考:
还记得RetryWatcher中定义的watcherClient类型吗,需要是cache.Watcher,然后我们看client-go/tools/cache里面定义的cache.Watcher接口,定义如下:
1type Watcher interface {
2 // Watch should begin a watch at the specified version.
3 Watch(options metav1.ListOptions) (watch.Interface, error)
4}
也就是实现了Watch(xxxx)这个方法的就是符合的cache.Watcher; 而最开始我们代码正好是通过MyCluster.Clusterclientset.CoreV1().Pods()返回的接口v1.PodInterface中调用的Watch(xxxxx), 那么直接用“MyCluster.Clusterclientset.CoreV1().Pods()”来生成RetryWatcher是不是就行了!
我们来看一下这个接口类型:
1type PodInterface interface {
2 ...
3 Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
4 ...
5}
答案是不行… 下面这个多了一个ctx参数 🥶,但是我们就是想从Clientset这里用怎么办,自己实现一个符合的结构吧:
1type PodListWatch struct {
2 MyCluster *initk8s.MyCluster
3 AppNamespace string
4}
5// watch指定命名空间下的Pod例子
6func (p PodListWatch) Watch(options metav1.ListOptions) (watch.Interface, error) {
7 return p.MyCluster.Clusterclientset.CoreV1().Pods(p.AppNamespace).Watch(context.TODO(), options)
8}
这个PodListWatch的Watch(xxxx)方法正好满足cache.Watcher, 单后watch的方式还是按照我们原先Clientset的,然后生成RetryWatcher的方式如下:
1func ResourceWatcher(mycluster *initk8s.MyCluster, appNamespace string) {
2 podWatcher := PodListWatch{MyCluster: mycluster, AppNamespace: appNamespace}
3 // generate new retrywatcher, use podRetryWatcher.ResultChan() to keep a chronic watch operation
4 podRetryWatcher, err := retrywatcher.NewRetryWatcher(label, podWatcher)
5 if err != nil {
6 log.Error(err)
7 }
8FORWATCHER:
9 for {
10 select {
11 case <-podRetryWatcher.Done():
12 log.Info("Retrywatcher is exiting, please check...")
13 break FORWATCHER
14 case event, ok := <-podRetryWatcher.ResultChan():
15 if !ok {
16 log.Warn("Retrywatcher is not open, please check...")
17 continue
18 }
19 ...
上述只是一种思路提供,希望能够帮到使用client-go进行watch的同学;官方的实现RetryWatcher确实更加合理,还可以进行改造加减不同情况是否重试的策略。另外watch方法毕竟是直接和kubernetes中的apiserver沟通,如果想要减轻apiserver的压力kubernetes提供了更加常用的informer机制(sharedinformer也是众多controller使用的,同时也是我认为kubernetes最核心的功能),至于使用informer就又有很多要说的了,以后有时间可能会更新~