Problem

What are listers and reflectors in client-go?

I am going to talk about the above problem in the context of cluster-autoscaler.

Pre-requisites

This blogpost assumes you understand and have used:

  1. client-go (what should I do if I don’t? Check the official docs)
  2. Kubernetes (what should I do if I don’t? Check the official docs)
  3. cluster-autoscaler (what should I do if I don’t? Check the official docs)

Background

Let’s try to understand them in the context of cluster-autoscaler (CA for short). Think of CA as a loop that runs a function every 10 seconds. This function queries pods, nodes and many other resources (1,2,3) using Listers to reduce the load on the Kubernetes api-server.
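
To make that concrete, here is a minimal sketch (not CA’s actual code) of what “a loop that runs a function every 10 seconds” looks like; runOnce here is a hypothetical stand-in for the work CA does on every iteration:

// A sketch only: CA's real loop does much more (error handling, metrics, scale-up/down logic, etc.).
package main

import "time"

func main() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		runOnce() // hypothetical: query pods, nodes, etc. via listers and act on them
	}
}

func runOnce() { /* ... */ }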

What really is a Lister anyway?

A Lister allows you to list and get Kubernetes resources of a particular Kind (e.g., Pods).
Well, why can’t we just use client-go to list and get resources (say pods) like this or like this? What’s wrong with that? Let’s take a closer look at client-go to find out:

// File: client-go/examples/in-cluster-client-configuration/main.go
51: 		// get pods in all the namespaces by omitting namespace
52: 		// Or specify namespace to get pods in particular namespace
53: 		pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})

source

Here’s the code for List:

// File: client-go/kubernetes/typed/core/v1/pod.go
087: // List takes label and field selectors, and returns the list of Pods that match those selectors.
088: func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
089: 	var timeout time.Duration
090: 	if opts.TimeoutSeconds != nil {
091: 		timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
092: 	}
093: 	result = &v1.PodList{}
094: 	err = c.client.Get().
095: 		Namespace(c.ns).
096: 		Resource("pods").
097: 		VersionedParams(&opts, scheme.ParameterCodec).
098: 		Timeout(timeout).
099: 		Do(ctx). // <- Notice this
100: 		Into(result)
101: 	return
102: }

source

Here’s the code for Do():

// File: client-go/rest/request.go
1014: // Do formats and executes the request. Returns a Result object for easy response
1015: // processing.
1016: //
1017: // Error type:
1018: //   - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
1019: //   - http.Client.Do errors are returned directly.
1020: func (r *Request) Do(ctx context.Context) Result {
1021: 	var result Result
1022: 	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
1023: 		result = r.transformResponse(resp, req)
1024: 	})

source

As you can see, Do makes an HTTP request to the api-server. This is important: every time you want to list pods, you have to make a request to the api-server. Every list response also contains many pods that were already in the previous list response. This duplication just puts extra load on the api-server. Is there a way to reduce that load when listing pods? Yes, and listers are one way to do it.
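
To see why this hurts, here is a sketch (not from the CA codebase) of the naive polling approach. It assumes you already have a clientset built with client-go, as in the in-cluster example above:

// Assumed imports: context, log, time,
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1", "k8s.io/client-go/kubernetes".
// Naive polling: every iteration is a fresh HTTP request that re-downloads
// the full pod list, even though most of it has not changed.
func pollPods(clientset kubernetes.Interface) {
	for {
		pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
		if err != nil {
			log.Printf("listing pods: %v", err)
		} else {
			log.Printf("saw %d pods", len(pods.Items))
		}
		time.Sleep(10 * time.Second)
	}
}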

What’s so different about a lister? Let’s look at a lister used in the CA codebase to list pods to understand this:

// File: cluster-autoscaler/core/static_autoscaler.go
286: 	originalScheduledPods, err := scheduledPodLister.List()

source

List is defined here:

// File: cluster-autoscaler/utils/kubernetes/listers.go
239: // List returns all scheduled pods.
240: func (lister *ScheduledPodLister) List() ([]*apiv1.Pod, error) {
241: 	return lister.podLister.List(labels.Everything())
242: }

source

Notice the name ScheduledPodLister and how it is a wrapper around another internal lister called podLister.

podLister.List() is defined here:

// File: cluster-autoscaler/listers/core/v1/pod.go
49: // List lists all Pods in the indexer.
50: func (s *podLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
51: 	err = cache.ListAll(s.indexer, selector, func(m interface{}) {
52: 		ret = append(ret, m.(*v1.Pod))
53: 	})
54: 	return ret, err
55: }

source

Notice how the List function above does not make an HTTP request to the api-server. It reads the data from a Cache.

s.indexer is of type cache.Indexer. Think of cache.Indexer as a Cache that stores pods.

Note: I am going to use “Cache” with a capital C to denote the concept, and cache to denote the Go package with the same name.

From all of this, it can be inferred that a lister allows you to list pods without actually hitting the api-server, thereby reducing the load on the api-server.
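
To drive this home, here is a small self-contained sketch using client-go’s PodLister (which the lister vendored into CA mirrors). The lister only ever talks to an in-memory store; there is no clientset and no network involved at all:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	v1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// The same kind of namespace-keyed indexer a reflector would normally fill.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})

	// Pretend a reflector already put a pod into the Cache.
	indexer.Add(&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "a", Namespace: "default"}})

	lister := v1lister.NewPodLister(indexer)
	pods, _ := lister.List(labels.Everything())
	fmt.Println(len(pods)) // prints 1: served from the Cache, no api-server involved
}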

But, how is the Cache above filled in the first place?
This is where the Reflector comes into the picture. Think of a Reflector as a loop that runs forever in the background to populate our Cache. How does this help?

Well, let’s look at where ScheduledPodLister and podLister are initialized to understand this better:

// File: cluster-autoscaler/utils/kubernetes/listers.go
206: // NewScheduledPodLister builds ScheduledPodLister
207: func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodLister {
...
211: 	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
212: 	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
213: 	podLister := v1lister.NewPodLister(store)
214: 	go reflector.Run(stopchannel)
215: 
216: 	return &ScheduledPodLister{
217: 		podLister: podLister,
218: 	}
219: }

source

I have skipped some lines in the code which seemed irrelevant. You can check the source link above to see the skipped lines.

Let’s dissect it line by line:

207: func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodLister {

Let’s look at the arguments:

  1. kubeClient: a client of type client.Interface which can be used to query built-in Kubernetes resources.
  2. stopchannel: used to stop the reflector. Closing the channel (or sending a value on it) stops it from running (we will get to where this happens in a bit).

The return type PodLister is an interface with only one function in it, List() ([]*apiv1.Pod, error) (remember that ScheduledPodLister implements this function as List() and calls podLister internally). The interface is defined here.
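
Paraphrasing (see the source link above for the exact definition), the interface and its typical usage look roughly like this; NewScheduledPodLister and client.Interface are the CA names from the snippet we are dissecting:

// Roughly what CA's PodLister interface boils down to (paraphrased):
type PodLister interface {
	List() ([]*apiv1.Pod, error)
}

// Sketch of how a caller would use it, and how the stop channel comes into play:
func usePods(kubeClient client.Interface) {
	stopCh := make(chan struct{})
	lister := NewScheduledPodLister(kubeClient, stopCh)

	scheduledPods, err := lister.List() // served from the Cache, not the api-server
	_, _ = scheduledPods, err

	close(stopCh) // reflector.Run returns; the Cache stops being updated
}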

211: 	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)

This is an interesting piece of code. We are asking NewListWatchFromClient (a convenience function the cache package provides) to give us a cache.ListWatch struct, given the right RESTClient (in this case, the one returned by kubeClient.CoreV1().RESTClient()), resource name (in this case, pods), namespace (in this case, apiv1.NamespaceAll, i.e. "", which denotes all namespaces) and selector (a field selector).

cache.ListWatch is a struct which contains a list function and a watch function. This is what helps us fill our Cache. We run the reflector, which calls the list function once at the beginning to list pods and then runs the watch function forever to watch for new updates from the API Server (note that the watch function uses a streaming connection, i.e., the API Server pushes updates to it; I will get to what this means in a moment). For example (let’s assume we start our reflector at 10:00AM):

  1. 10:00AM: list func gets a list of pods from the API Server. It finds pods A,B,C. It adds them to the Cache | Cache: [A,B,C]
  2. 10:00AM: watch func starts | Cache: [A,B,C]
  3. 10:03AM: user creates a new pod D | Cache: [A,B,C]
  4. 10:03AM: API Server informs watch func about D | Cache: [A,B,C]
  5. 10:03AM: watch func updates the Cache with new pod D | Cache: [A,B,C,D]
  6. 10:12AM: user creates a new pod E | Cache: [A,B,C,D]
  7. 10:12AM: API Server informs watch func about E | Cache: [A,B,C,D]
  8. 10:12AM: watch func updates the Cache with new pod E | Cache: [A,B,C,D,E]
  9. 10:20AM: some problem happens with watch (we get an error)
  10. 10:20AM: start from 1 again

I have over-simplified things here. It is assumed that:

  • the watch func receives updates from the API Server immediately
  • the Cache looks like an array of pods (which is not actually true; it’s more like a hash map)

The benefits of doing steps 1-10 instead of directly querying the API Server using client-go:

  1. We don’t have to list pods every couple of seconds like we would do in polling
  2. We let API Server tell us about new pods

This saves load on the API Server. What happens if the process/controller doing the list and watch dies? When a new process/controller comes up, the list func is called first, which lets the Cache catch up, and then the watch function runs again.

When you send a watch request, the API server responds with a stream of changes. These changes itemize the outcome of operations (such as create, delete, and update) that occurred after the resourceVersion you specified as a parameter to the watch request. The overall watch mechanism allows a client to fetch the current state and then subscribe to subsequent changes, without missing any events.

If a client watch is disconnected then that client can start a new watch from the last returned resourceVersion; the client could also perform a fresh get / list request and begin again. See Resource Version Semantics for more detail.

https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
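
Here is a hand-rolled sketch of that list-then-watch cycle using the raw clientset (a reflector does all of this for you, plus retries, re-listing on errors and proper resourceVersion bookkeeping). The map stands in for the Cache:

// Assumed imports: context, corev1 "k8s.io/api/core/v1",
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
// "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/kubernetes".
func listThenWatch(clientset kubernetes.Interface, store map[string]*corev1.Pod) error {
	// Step 1 (10:00AM in the timeline above): list once and fill the Cache.
	pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return err
	}
	for i := range pods.Items {
		p := &pods.Items[i]
		store[p.Namespace+"/"+p.Name] = p
	}

	// Step 2: start watching from the resourceVersion the list returned.
	w, err := clientset.CoreV1().Pods("").Watch(context.TODO(),
		metav1.ListOptions{ResourceVersion: pods.ResourceVersion})
	if err != nil {
		return err
	}
	defer w.Stop()

	// Steps 3-8: apply each event to the Cache as the API Server streams it.
	for event := range w.ResultChan() {
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			continue
		}
		switch event.Type {
		case watch.Added, watch.Modified:
			store[pod.Namespace+"/"+pod.Name] = pod
		case watch.Deleted:
			delete(store, pod.Namespace+"/"+pod.Name)
		}
	}
	// Steps 9-10: the watch ended; the caller starts from step 1 again.
	return nil
}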

Let’s take a closer look at cache.ListWatch. It looks like this in the code:

// File: client-go/tools/cache/listwatch.go
54: // ListWatch knows how to list and watch a set of apiserver resources.  It satisfies the ListerWatcher interface.
55: // It is a convenience function for users of NewReflector, etc.
56: // ListFunc and WatchFunc must not be nil
57: type ListWatch struct {
58: 	ListFunc  ListFunc
59: 	WatchFunc WatchFunc
60: 	// DisableChunking requests no chunking for this list watcher.
61: 	DisableChunking bool
62: }

source
Let’s ignore DisableChunking for now.

If we look at what ListFunc and WatchFunc refer to:

// File: client-go/tools/cache/listwatch.go
48: // ListFunc knows how to list resources
49: type ListFunc func(options metav1.ListOptions) (runtime.Object, error)
50: 
51: // WatchFunc knows how to watch resources
52: type WatchFunc func(options metav1.ListOptions) (watch.Interface, error)

source

ListFunc accepts a metav1.ListOptions, which is effectively optional (you can pass an empty metav1.ListOptions{} and the function will still work), and returns a runtime.Object. Note that runtime.Object here is an interface implemented by both the Pod (ref1, ref2, ref3) and PodList (ref1, ref2, ref3) objects. In this particular case, runtime.Object refers to a PodList object.

WatchFunc also accepts an optional metav1.ListOptions and returns a watch.Interface.
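
For comparison, here is a sketch (not from the CA codebase) of building a cache.ListWatch by hand around the typed clientset; this is another common way to get a ListFunc/WatchFunc pair:

// Assumed imports: context, metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
// "k8s.io/apimachinery/pkg/runtime", "k8s.io/apimachinery/pkg/watch",
// "k8s.io/client-go/kubernetes", "k8s.io/client-go/tools/cache".
func newPodListWatch(clientset kubernetes.Interface) *cache.ListWatch {
	return &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return clientset.CoreV1().Pods(metav1.NamespaceAll).Watch(context.TODO(), options)
		},
	}
}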

Let’s look at an actual implementation of ListFunc and WatchFunc to understand it better.

// File: client-go/tools/cache/listwatch.go
81: 	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
82: 		optionsModifier(&options)
83: 		return c.Get().
84: 			Namespace(namespace).
85: 			Resource(resource).
86: 			VersionedParams(&options, metav1.ParameterCodec).
87: 			Do(context.TODO()). // <- Notice this
88: 			Get()
89: 	}
90: 	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
91: 		options.Watch = true
92: 		optionsModifier(&options)
93: 		return c.Get().
94: 			Namespace(namespace).
95: 			Resource(resource).
96: 			VersionedParams(&options, metav1.ParameterCodec).
97: 			Watch(context.TODO()) // <- Notice this
98: 	}

source

listFunc makes a normal HTTP call to get a list of pods:

// File: client-go/rest/request.go
1014: // Do formats and executes the request. Returns a Result object for easy response
1015: // processing.
1016: //
1017: // Error type:
1018: //   - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
1019: //   - http.Client.Do errors are returned directly.
1020: func (r *Request) Do(ctx context.Context) Result {
1021: 	var result Result
1022: 	err := r.request(ctx, func(req *http.Request, resp *http.Response) {
1023: 		result = r.transformResponse(resp, req)
1024: 	})
1025: 	if err != nil {
1026: 		return Result{err: err}
1027: 	}
1028: 	if result.err == nil || len(result.body) > 0 {
1029: 		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
1030: 	}
1031: 	return result
1032: }

source

watchFunc creates a streaming HTTP connection to receive updates from the API Server. Here’s a simplified version of code for Watch (check source below for the actual code):

// File: vendor/k8s.io/client-go/rest/request.go
694: // Watch attempts to begin watching the requested location.
695: // Returns a watch.Interface, or an error.
696: func (r *Request) Watch(ctx context.Context) (watch.Interface, error) {
697: 	...
716: 	retry := r.retryFn(r.maxRetries)
717: 	url := r.URL().String()
718: 	for {
719: 		...
722: 
723: 		req, err := r.newHTTPRequest(ctx)
724: 		if err != nil {
725: 			return nil, err
726: 		}
727: 
728: 		resp, err := client.Do(req)
            ...
730: 		retry.After(ctx, r, resp, err)
731: 		if err == nil && resp.StatusCode == http.StatusOK {
732: 			return r.newStreamWatcher(resp)
733: 		}
734: 
735: 		...
762: 	}
763: }

source

Note how newHTTPRequest is called on line :723. Now, you might have the following questions:

  1. What is the for loop for?
    Notice r.maxRetries. The for loop runs continuously, but retry.After adds a sleep (with exponential backoff) in between if client.Do on line :728 errors. It does this until maxRetries is exceeded. If client.Do succeeds, it means we have successfully created a streaming connection, and we return r.newStreamWatcher, which is used to read the contents sent by the api-server.
  2. Isn’t this an HTTP request? What’s so special about it? To answer this question, let’s look at the following code from above again:
// File: client-go/tools/cache/listwatch.go
81: 	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
82: 		optionsModifier(&options)
83: 		return c.Get().
84: 			Namespace(namespace).
85: 			Resource(resource).
86: 			VersionedParams(&options, metav1.ParameterCodec).
87: 			Do(context.TODO()).
88: 			Get()
89: 	}
90: 	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
91: 		options.Watch = true // <- Notice this
92: 		optionsModifier(&options)
93: 		return c.Get().
94: 			Namespace(namespace).
95: 			Resource(resource).
96: 			VersionedParams(&options, metav1.ParameterCodec).
97: 			Watch(context.TODO())
98: 	}

source

Notice options.Watch = true on line :91 (we don’t see any such thing for listFunc). This is important. Why? Because this instructs the client to initiate a streaming watch connection.

[Screenshot of the “Efficient detection of changes” section from the Kubernetes API concepts documentation: https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes]

Note that Watch = true is actually converted to ?watch=true in the query parameters. The screenshot above says watch=1, which is not correct; watch is defined as a boolean in the Kubernetes OpenAPI spec here. Check the API reference for watch here.
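
If you want to see the query parameter for yourself, one way (a sketch, assuming you already have a clientset) is to build the same request the watchFunc builds and print its URL instead of sending it:

// Assumed imports: fmt, metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
// "k8s.io/client-go/kubernetes", "k8s.io/client-go/kubernetes/scheme".
func printWatchURL(clientset kubernetes.Interface) {
	req := clientset.CoreV1().RESTClient().Get().
		Resource("pods").
		VersionedParams(&metav1.ListOptions{Watch: true}, scheme.ParameterCodec)
	fmt.Println(req.URL()) // something like https://<api-server>/api/v1/pods?watch=true
}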

Going back to the code that defines the lister:

// File: utils/kubernetes/listers.go
206: // NewScheduledPodLister builds ScheduledPodLister
207: func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struct{}) PodLister {
...
211: 	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", apiv1.NamespaceAll, selector)
212: 	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)
213: 	podLister := v1lister.NewPodLister(store)
214: 	go reflector.Run(stopchannel)
215: 
216: 	return &ScheduledPodLister{
217: 		podLister: podLister,
218: 	}
219: }

source

I hope line :211 makes more sense now.

Looking at line :212:

212: 	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(podListWatch, &apiv1.Pod{}, time.Hour)

cache.NewNamespaceKeyedIndexerAndReflector is a convenience function from the cache package which, given the ListWatch struct, the object type and a resync duration (you can ignore the resync duration for now), gives us a cache.Store (cache.Indexer embeds cache.Store) and a reflector.
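
Roughly speaking (a paraphrase; check the client-go source for the exact code), the convenience function just wires these two pieces together:

// Assumed imports: time, "k8s.io/client-go/tools/cache".
func newNamespaceKeyedIndexerAndReflector(lw cache.ListerWatcher, expectedType interface{}, resync time.Duration) (cache.Indexer, *cache.Reflector) {
	// A namespace-keyed Indexer is the Cache...
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	// ...and the Reflector keeps it filled via the ListWatch we pass in.
	reflector := cache.NewReflector(lw, expectedType, indexer, resync)
	return indexer, reflector
}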

Note: re-list and re-sync are two different things. A re-list happens only once (unless there is an error or the reflector’s Run() function exits for some reason: ref1, ref2). Re-sync makes a lot more sense in the context of informers (I am ignoring this interesting construct for now because it is not relevant to this blogpost). You might find this Google Groups thread interesting.

cache.Store is used to create our lister (which we use to query pods instead of directly hitting the API Server):

213: 	podLister := v1lister.NewPodLister(store)

while the reflector is the thing which runs continuously to populate the Cache with new pod objects.

214: 	go reflector.Run(stopchannel)

Since we have already looked at listers, let’s take a closer look at reflector.Run above.

// File: client-go/tools/cache/reflector.go
276: // Run repeatedly uses the reflector's ListAndWatch to fetch all the
277: // objects and subsequent deltas.
278: // Run will exit when stopCh is closed.
279: func (r *Reflector) Run(stopCh <-chan struct{}) {
280: 	klog.V(3).Infof("Starting reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
281: 	wait.BackoffUntil(func() {
282: 		if err := r.ListAndWatch(stopCh); err != nil {
283: 			r.watchErrorHandler(r, err)
284: 		}
285: 	}, r.backoffManager, true, stopCh)
286: 	klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.typeDescription, r.resyncPeriod, r.name)
287: }

source

BackoffUntil re-runs ListAndWatch (with a backoff in between) if it exits for some reason. Let’s look at ListAndWatch (I have stripped it down to the bare bones of the function so it’s easier to see what’s happening):

// File: client-go/tools/cache/reflector.go
312: // ListAndWatch first lists all items and get the resource version at the moment of call,
313: // and then use the resource version to watch.
314: // It returns error if ListAndWatch didn't even try to initialize watch.
315: func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
... 
318: 	err := r.list(stopCh)
319: 	if err != nil {
320: 		return err
321: 	}
322: 
...
352: 	for {
353: 		// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
354: 		select {
355: 		case <-stopCh:
356: 			return nil
357: 		default:
358: 		}

...
374: 		w, err := r.listerWatcher.Watch(options)
375: 		if err != nil {
...
385: 			return err
386: 		}
387: 
388: 		err = watchHandler(start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, r.clock, resyncerrc, stopCh)

...
411:	}
412:}

source

As you can see, list (a wrapper function which internally calls r.listerWatcher.List()) is called at the beginning and then watch takes over. Notice how stopCh can be used to stop the reflector, because we keep checking it on lines :354 to :358. watchHandler is what reads the events that the api-server sends on the watch and updates the Cache.
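
Conceptually (a heavily simplified, hand-written sketch, not the actual watchHandler, which also tracks resourceVersions, bookmarks and errors), the handler does something like this for every event:

// Assumed imports: "k8s.io/apimachinery/pkg/watch", "k8s.io/client-go/tools/cache".
func handleWatchEvents(w watch.Interface, store cache.Store) {
	for event := range w.ResultChan() {
		switch event.Type {
		case watch.Added:
			_ = store.Add(event.Object)
		case watch.Modified:
			_ = store.Update(event.Object)
		case watch.Deleted:
			_ = store.Delete(event.Object)
		}
	}
}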

The last line creates and returns a ScheduledPodLister.

216: 	return &ScheduledPodLister{
217: 		podLister: podLister,
218: 	}

To recap,

  1. Querying the API Server directly using client-go in a loop puts load on the API Server
  2. To get around this we use listers. Listers query resources (like pods) from an internal Cache instead of hitting the API Server directly
  3. The Cache is kept up to date by a reflector using the list and watch functions (see the end-to-end sketch below)
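
Here is the promised end-to-end sketch tying the three points together with plain client-go. Assumptions: it runs in-cluster (so rest.InClusterConfig works) and it watches pods in all namespaces with no field selector, unlike CA’s ScheduledPodLister:

package main

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/kubernetes"
	v1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func main() {
	config, err := rest.InClusterConfig() // assumes we are running inside a cluster
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(config)

	// 1. A ListWatch that knows how to list and watch pods.
	lw := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(),
		"pods", corev1.NamespaceAll, fields.Everything())

	// 2. The Cache (a namespace-keyed indexer) plus the reflector that fills it.
	store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(lw, &corev1.Pod{}, time.Hour)
	stopCh := make(chan struct{}) // close(stopCh) stops the reflector
	go reflector.Run(stopCh)

	// 3. A lister that reads from the Cache, never from the api-server.
	podLister := v1lister.NewPodLister(store)
	for {
		// Right after startup the Cache may still be filling up from the initial list.
		pods, err := podLister.List(labels.Everything())
		if err == nil {
			fmt.Printf("cache currently holds %d pods\n", len(pods))
		}
		time.Sleep(10 * time.Second)
	}
}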

Conclusion

I hope that gives you an understanding of listers, reflectors, list/watch functions and how they work together. I haven’t covered Informers or delved too deep into things like cache.Store and cache.Indexer. I think they are interesting topics in their own right and deserve a separate blogpost.

Feedback

I appreciate any sort of constructive feedback (good and bad). Feel free to mail me at surajrbanakar@gmail.com.