(Possibly) Naïve thoughts regarding Go.

The Ultimate Channel Abstraction

#Go #Synchronization #Deadlock

Author(s): Changkun Ou

Permalink: https://golang.design/research/ultimate-channel

Recently, I have been rethinking programming patterns for graphics applications, and I have already written a 3D graphics package in Go called polyred. While designing the rendering pipeline APIs, I struggled with a tricky deadlock for a while. It eventually led me to create an unbounded channel as a workaround.

The problem

At the beginning of my design, I had to deal with OpenGL, where a chunk of the APIs must be executed on the main thread. Issuing a draw call is one of the most infamous of those. The common pattern in graphics programming is as follows:

```go
app := newApp()
driver := initDriver()
ctx := driver.Context()

for !app.IsClosed() {
	ctx.Clear()
	processingDrawCalls(ctx)
	processingInputEvents()
}
```

The entire GUI application is executed in an infinite loop that contains two parts: draw call processing and event processing.
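Because OpenGL-style APIs must be called from the main OS thread, a Go program typically pins its main goroutine with runtime.LockOSThread inside an init function. The runtime documents that calling it from init makes main run on the startup thread. The sketch below shows that idiom around the loop above. The app type, its frame limit and the run function are hypothetical stand-ins for a real window, added only so that the loop terminates.

```go
package main

import (
	"fmt"
	"runtime"
)

func init() {
	// Pin the main goroutine to the startup (main) OS thread, so that
	// main, and therefore every graphics call it makes, runs there.
	runtime.LockOSThread()
}

// app is a hypothetical stand-in for a window: it "closes" after
// maxFrames iterations so the example terminates.
type app struct {
	frame, maxFrames int
}

func (a *app) IsClosed() bool { return a.frame >= a.maxFrames }

// run is the single-threaded loop: each iteration would clear the
// context, process draw calls, then process input events.
func run(a *app) []string {
	var log []string
	for !a.IsClosed() {
		log = append(log, fmt.Sprintf("frame %d: clear, draw calls, input events", a.frame))
		a.frame++
	}
	return log
}

func main() {
	for _, line := range run(&app{maxFrames: 3}) {
		fmt.Println(line)
	}
}
```

Everything in one loop on one thread is simple. However, any slow step (here, the draw calls) delays event processing for the whole frame.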

Typically, all this code runs on the CPU, while the actual rendering computation executes on the GPU. The graphics APIs provided by a graphics driver (such as OpenGL, Vulkan, Metal, or DirectX) are therefore just commands sent from the CPU to the GPU, sometimes followed by waiting for a response from the GPU. For some special reasons, polyred is limited to a software implementation, that is, a pure-CPU implementation. Hence, its execution should utilize the full power of CPU parallelism. It makes much more sense to execute rendering on a separate goroutine so that it does not block the event processing thread.

*Aside: To guarantee an application's responsiveness, it is best not to block event processing, since the event thread may also have to serve invocations from the operating system.

Subsequently, I moved the rendering loop into a separate goroutine. It sends the rendering results to the event processing loop, which flushes them to the hardware display. The entire application works like the following code snippet:

```go
// WARNING: This example contains a deadlock.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type ResizeEvent struct {
	width, height int
}

type RenderProfile struct {
	id     int
	width  int
	height int
}

// Draw executes a draw call using the given render profile.
func (p *RenderProfile) Draw() interface{} {
	return fmt.Sprintf("draw-%d-%dx%d", p.id, p.width, p.height)
}

func main() {
	// draw is a channel for receiving finished draw calls.
	draw := make(chan interface{})
	// change is a channel for receiving notifications of rendering setting changes.
	change := make(chan ResizeEvent)

	// Rendering Thread
	//
	// Sends draw calls to the event thread in order to draw pictures.
	// The thread sends draw calls to the draw channel, using the same
	// rendering setting id. If the rendering setting changes, the event
	// thread sends a notification, and this thread increments the
	// rendering setting id.
	go func() {
		p := &RenderProfile{id: 0, width: 800, height: 500}
		for {
			select {
			case size := <-change:
				// Modify rendering profile.
				p.id++
				p.width = size.width
				p.height = size.height
			default:
				draw <- p.Draw()
			}
		}
	}()

	// Event Thread
	//
	// Process events every 100 ms. Otherwise, process draw call requests
	// as soon as they are available.
	event := time.NewTicker(100 * time.Millisecond)
	for {
		select {
		case id := <-draw:
			// fmt.Println prints the string; the builtin println would
			// print the interface's internal pointers instead.
			fmt.Println(id)
		case <-event.C:
			// Notify the rendering thread that the rendering settings
			// have changed. We simulate a random size at every event
			// processing tick.
			change <- ResizeEvent{
				width:  int(rand.Float64() * 100),
				height: int(rand.Float64() * 100),
			}
		}
	}
}
```

The above example simulates a resize event of a GUI window at every tick of the event processing loop (every 100 ms). Whenever the size of the GUI window changes, the underlying rendering should adapt to it, for instance, by reallocating the rendering buffers. To notify the rendering thread of the change, a second channel, change, carries messages from the event thread to the rendering thread.

It sounds like a perfect design, but a nasty deadlock hides in it. If one executes the program, it freezes until it is manually interrupted:

```
draw-0-800x500
...
draw-0-800x500
draw-1-60x94
...
draw-1-60x94
^Csignal: interrupt
```

If we take a closer look at the program's pattern:

  1. Two infinite select loops (say E and R) run on different goroutines (threads).
  2. The E thread receives communication from the R thread (via draw).
  3. The R thread receives communication from the E thread (via change).

Did you spot the problem? It lies in the two-way communication. Both channels are unbuffered, so a send blocks until the matching receive happens. A deadlock occurs when E is blocked sending on change, waiting for R to receive it, while R is blocked sending on draw, waiting for E to receive it. Neither side can reach the select case that would unblock the other.

One may argue that the deadlock can be resolved using buffered channels:

```diff
-draw := make(chan interface{})
+draw := make(chan interface{}, 100)
-change := make(chan ResizeEvent)
+change := make(chan ResizeEvent, 100)
```

Unfortunately, the problem remains. Let's do a thought experiment. Suppose E is too busy and quickly fills the entire buffer of the change channel. From then on, the channel behaves like an unbuffered one, and E blocks on its next send. Meanwhile, R is busy working on a draw call. Once it finishes, R tries to send the draw call to E. If draw's buffer is also full at that moment, R blocks as well, while E is still waiting for R to receive from change. Hence, we fall back to the same situation: a deadlock.

Is this a producer-consumer problem? The case is quite similar, but not identical. In the producer-consumer scenario, the producer fills a buffer while the consumer drains it. If the buffer is full, it is easy to put the producer to sleep, and if it is empty, the consumer. The key difference here is that both sides of the communication play the roles of producer and consumer simultaneously, so they depend on each other.

What can we do to solve the above deadlock? Let's look at two approaches in this article.

Solution 1: Send in select's case

The first approach is a simple one. We use the power of the select statement: a send operation inside a select statement with a default case never blocks. If the send cannot proceed immediately, the default case runs instead. Hence, we can simply turn the draw call send statement into a nested select statement:

```diff
go func() {
	p := &RenderProfile{id: 0, width: 800, height: 500}
	for {
		select {
		case size := <-change:
			// Modify rendering profile.
			p.id++
			p.width = size.width
			p.height = size.height
		default:
-			draw <- p.Draw()
+			select {
+			case draw <- p.Draw():
+			default:
+			}
		}
	}
}()
```

In this case, if draw <- p.Draw() would block, the newly introduced select statement skips the send and executes the default case instead, which resolves the deadlock.
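The non-blocking send can be factored into a small helper. The following is a sketch: trySend is a hypothetical name, and it uses type parameters, so it needs Go 1.18 or later. It shows exactly when a send in a select statement with a default case succeeds and when the value is simply dropped.

```go
package main

import "fmt"

// trySend attempts a non-blocking send. It delivers v only if a
// receiver is ready or the buffer has room, and reports whether it did.
func trySend[T any](ch chan<- T, v T) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	fmt.Println(trySend(unbuffered, "frame-0")) // false: nobody is receiving

	buffered := make(chan string, 1)
	fmt.Println(trySend(buffered, "frame-0")) // true: the buffer has room
	fmt.Println(trySend(buffered, "frame-1")) // false: buffer full, frame dropped
}
```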

However, there are two drawbacks to this approach:

  1. If a draw call is skipped, one rendered frame is lost, because the next loop iteration starts to compute a new frame.
  2. The event thread can still be blocked until the rendering thread completes a frame. The right-hand side of a send case, here p.Draw(), is evaluated when the select statement is entered, so R can only receive from change again after the whole rendering computation is complete.

These two drawbacks are intrinsic to this approach, and there seems to be no way to improve it further. What else could we do?

Solution 2: Unbounded Channel

A natural idea is this: can we make a channel whose buffer has infinite capacity, i.e., an unbounded channel? The language does not provide one yet. However, such a pattern can be easily constructed:

```go
// MakeChan returns a sender and a receiver of a buffered channel
// with infinite capacity.
func MakeChan() (chan<- interface{}, <-chan interface{}) {
	in, out := make(chan interface{}), make(chan interface{})

	go func() {
		var q []interface{}
		for {
			e, ok := <-in
			if !ok {
				close(out)
				return
			}
			q = append(q, e)
			for len(q) > 0 {
				select {
				case out <- q[0]:
					q = q[1:]
				case e, ok := <-in:
					if ok {
						q = append(q, e)
						break
					}
					for _, e := range q {
						out <- e
					}
					close(out)
					return
				}
			}
		}
	}()
	return in, out
}
```

In the above implementation, we create two unbuffered channels, in and out, plus a separate goroutine that sits between them so that senders never have to wait for receivers. Whenever a value arrives on in, the goroutine appends it to a queue q. While q is non-empty, a nested select loop does one of two things. If a receiver is ready, it sends the head of q to out. Otherwise, it keeps appending newly arrived values to q.

When the input channel is closed, an additional loop delivers all elements still cached in q and then closes the output channel.
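The same behavior can also be written as a single select loop using the nil-channel idiom. A send or receive on a nil channel never proceeds, so a select case on a nil channel is never chosen. The sketch below is an alternative formulation, not the implementation above. makeUnbounded is a hypothetical name, and the sketch uses type parameters (Go 1.18+). As a small extra, it zeroes each dequeued slot so that the queue does not keep delivered values reachable.

```go
package main

import "fmt"

// makeUnbounded returns the send and receive ends of an unbounded queue.
// A goroutine moves values from in to out through the slice q.
func makeUnbounded[T any]() (chan<- T, <-chan T) {
	in, out := make(chan T), make(chan T)
	go func() {
		defer close(out) // runs once in is closed and q is drained
		src := in        // set to nil after in is closed
		var q []T
		for src != nil || len(q) > 0 {
			// sendCh stays nil (disabling the send case) while q is empty.
			var sendCh chan T
			var next T
			if len(q) > 0 {
				sendCh, next = out, q[0]
			}
			select {
			case v, ok := <-src:
				if !ok {
					src = nil // stop receiving, keep draining q
					continue
				}
				q = append(q, v)
			case sendCh <- next:
				var zero T
				q[0] = zero // release the reference for the GC
				q = q[1:]
			}
		}
	}()
	return in, out
}

func main() {
	in, out := makeUnbounded[int]()
	for i := 0; i < 5; i++ {
		in <- i // no receiver yet: the goroutine queues every value
	}
	close(in)
	var got []int
	for v := range out {
		got = append(got, v)
	}
	fmt.Println(got) // [0 1 2 3 4]
}
```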

Hence, another fix for the deadlock, using an unbounded channel, is:

```diff
func main() {
-	draw := make(chan interface{})
+	drawIn, drawOut := MakeChan()

	...

	// Rendering Thread
	go func() {
		...
		for {
			select {
			case size := <-change:
				...
			default:
-				draw <- p.Draw()
+				drawIn <- p.Draw()
			}
		}
	}()

	// Event Thread
	event := time.NewTicker(100 * time.Millisecond)
	for {
		select {
-		case id := <-draw:
+		case id := <-drawOut:
			fmt.Println(id)
		case <-event.C:
			...
		}
	}
}
```

This unbounded channel is very similar to a pattern commonly used in standard graphics APIs, the command buffer: a buffer that caches a series of draw calls and executes them in batches.
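To make the analogy concrete, here is a toy command buffer. DrawCall, CommandBuffer, Record and Submit are hypothetical names for illustration only; this is not the API of Vulkan, Metal or any other real graphics library. Recording is cheap and never blocks. The recorded calls are executed later as one batch, much like values queued in the unbounded channel and drained by the receiver.

```go
package main

import "fmt"

// DrawCall is a hypothetical recorded command.
type DrawCall struct {
	ID            int
	Width, Height int
}

// CommandBuffer is a hypothetical, minimal command buffer: draw calls
// are recorded first and submitted later as one batch.
type CommandBuffer struct {
	calls []DrawCall
}

// Record appends a draw call without executing it.
func (b *CommandBuffer) Record(c DrawCall) { b.calls = append(b.calls, c) }

// Submit executes every recorded call in order via exec, then resets
// the buffer for reuse. It returns the number of calls executed.
func (b *CommandBuffer) Submit(exec func(DrawCall)) int {
	n := len(b.calls)
	for _, c := range b.calls {
		exec(c)
	}
	b.calls = b.calls[:0]
	return n
}

func main() {
	var buf CommandBuffer
	for i := 0; i < 3; i++ {
		buf.Record(DrawCall{ID: i, Width: 800, Height: 500})
	}
	n := buf.Submit(func(c DrawCall) {
		fmt.Printf("draw-%d-%dx%d\n", c.ID, c.Width, c.Height)
	})
	fmt.Println("submitted", n, "calls")
}
```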

A Generic Channel Abstraction

We have discussed a form of deadlock involving the select statement and two possible ways to address it. In the second approach, we implemented an unbounded channel. However, that implementation constructs an interface{}-typed channel.

We may ask ourselves: given this particular example, does it make sense to have unbounded channels in the Go language? Has the Go team ever considered such usage?

The answer to the second question is yes; see golang/go#20352. The discussion thread shows that unbounded channels do serve certain applications, but they also have clear drawbacks. The major one is that an unbounded channel may cause the program to run out of memory (OOM). If there is a concurrency bug, the running application keeps consuming memory from the OS and eventually runs out of memory. Developers argued that unbounded channels should be added to the language mainly because a MakeChan-style function returns an interface{}-typed channel, which brings weak typing into statically typed Go code. Eventually, Ian Lance Taylor from the Go team clarified that unbounded channels may have some uses but are not worth adding to the language. Once we have generics, a type-safe unbounded channel can easily be implemented in a library, which also answers the first question.

With type parameters arriving in Go 1.18, this difficulty can finally be resolved. Here is a generic channel abstraction that constructs a type-safe channel of arbitrary size:

```go
// MakeChan is a generic implementation that returns a sender and a
// receiver of an arbitrarily sized channel of an arbitrary type.
//
// If the given size is positive, the returned channel is a regular
// fixed-size buffered channel.
// If the given size is zero, the returned channel is an unbuffered channel.
// If the given size is -1, the returned channel is unbounded: it contains
// an internal buffer with infinite capacity.
// Any other negative size causes a panic.
func MakeChan[T any](size int) (chan<- T, <-chan T) {
	switch {
	case size == 0:
		ch := make(chan T)
		return ch, ch
	case size > 0:
		ch := make(chan T, size)
		return ch, ch
	case size != -1:
		panic("unbounded buffer size should be specified using -1")
	default:
		// size == -1
	}

	in, out := make(chan T), make(chan T)

	go func() {
		var q []T
		for {
			e, ok := <-in
			if !ok {
				close(out)
				return
			}
			q = append(q, e)
			for len(q) > 0 {
				select {
				case out <- q[0]:
					q = q[1:]
				case e, ok := <-in:
					if ok {
						q = append(q, e)
						break
					}
					for _, e := range q {
						out <- e
					}
					close(out)
					return
				}
			}
		}
	}()
	return in, out
}
```

```go
func main() {
	in, out := MakeChan[int](1)
	// Or:
	// in, out := MakeChan[int](0)
	// in, out := MakeChan[int](-1)

	go func() { in <- 42 }()
	println(<-out)
}
```

*This code can be executed on the go2go playground: https://go2goplay.golang.org/p/krLWm7ZInnL

Real-world Use Cases

As always, we made this generic abstraction available as a package called chann. Its API wraps the above-mentioned MakeChan function as follows:

```go
package chann // import "golang.design/x/chann"

// Chann is a generic channel abstraction that can be either buffered,
// unbuffered, or unbounded. To create a new channel, use New to allocate
// one, and use Cap to configure the capacity of the channel.
type Chann[T any] struct { ... }

// New returns a Chann that may represent a buffered, an unbuffered or
// an unbounded channel. To configure the type of the channel, one may
// pass Cap as the argument of this function.
//
// By default, or without specification, the function returns an unbounded
// channel which has unlimited capacity.
//
//	ch := chann.New[float64]()
//	// or
//	ch := chann.New[float64](chann.Cap(-1))
//
// If chann.Cap specifies a non-negative integer, the returned channel
// is either unbuffered (0) or buffered (positive).
//
// Note that although the input arguments are specified as a variadic
// parameter list, the function panics if more than one option is
// provided.
func New[T any](opts ...Opt) *Chann[T] { ... }

// In returns the send channel of the given Chann, which can be used to
// send values to the channel.
func (ch *Chann[T]) In() chan<- T { ... }

// Out returns the receive channel of the given Chann, which can be used
// to receive values from the channel.
func (ch *Chann[T]) Out() <-chan T { ... }

// Close closes the channel.
func (ch *Chann[T]) Close() { close(ch.in) }
```
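The doc comment of New implies that the variadic Opt arguments resolve to a single capacity, defaulting to unbounded. The following is only a sketch of one way such a Cap option could be wired up, using the functional-options pattern. The config type, the resolve helper and the Opt definition here are assumptions for illustration; the real golang.design/x/chann internals may differ.

```go
package main

import "fmt"

// config holds the resolved settings; cap == -1 means unbounded.
// (Hypothetical: not the real package's internals.)
type config struct {
	cap int
}

// Opt is a hypothetical functional option.
type Opt func(*config)

// Cap sets the capacity: -1 unbounded, 0 unbuffered, >0 buffered.
func Cap(n int) Opt {
	return func(c *config) { c.cap = n }
}

// resolve applies at most one option on top of the unbounded default,
// panicking on more than one, as the New doc comment describes.
func resolve(opts ...Opt) config {
	if len(opts) > 1 {
		panic("chann: too many options")
	}
	cfg := config{cap: -1}
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

func main() {
	fmt.Println(resolve().cap)       // -1 (unbounded by default)
	fmt.Println(resolve(Cap(0)).cap) // 0
	fmt.Println(resolve(Cap(8)).cap) // 8
}
```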

One may use these APIs in the previously discussed example:

```diff
func main() {
-	draw := make(chan interface{})
+	draw := chann.New[*image.RGBA]()

	...

	// Rendering Thread
	go func() {
		...
		for {
			select {
			case size := <-change:
				...
			default:
-				draw <- p.Draw()
+				draw.In() <- p.Draw()
			}
		}
	}()

	// Event Thread
	event := time.NewTicker(100 * time.Millisecond)
	for {
		select {
-		case id := <-draw:
+		case id := <-draw.Out():
			fmt.Println(id)
		case <-event.C:
			...
		}
	}
}
```

Lastly, we also contributed to the fyne-io/fyne GUI project to improve its draw call batching mechanism. Previously, it could only execute a fixed number of draw calls per frame, and any additional draw calls were ignored. The contribution fixes this long-standing issue. See fyne-io/io#2406 for more details. Here are two videos that demonstrate the problem intuitively:

[Video: before the fix] [Video: after the fix]

Before the fix, the tiny blocks are only partially rendered, whereas after the fix all blocks are rendered.

Conclusion

In this article, starting from a real-world deadlock example, we discussed a generic implementation of a channel with arbitrary capacity. We might ask again: is it perfect?

Well, the answer is non-trivial. As a generalization of channels, it should also support the other common channel operations, such as len(), cap(), and close(). If we think carefully about the semantics of closing a channel, it is really just about shutting off input to that channel: values that were already sent can still be received. Hence, implementing close() is simple and straightforward.
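Built-in buffered channels already have these close semantics, and an unbounded channel should preserve them. The MakeChan implementations above do so by draining q before closing out. A small sketch with a regular buffered channel follows; drainAfterClose is a hypothetical helper name.

```go
package main

import "fmt"

// drainAfterClose sends two values into a buffered channel, closes it,
// and then receives. Closing stops further sends, but values already in
// the buffer are still delivered before receivers observe the close.
func drainAfterClose() (got []int, zero int, ok bool) {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch) // a further ch <- 3 would panic: input is shut off

	for v := range ch { // yields 1 and 2, then stops
		got = append(got, v)
	}
	zero, ok = <-ch // drained and closed: zero value and ok == false
	return got, zero, ok
}

func main() {
	got, zero, ok := drainAfterClose()
	fmt.Println(got, zero, ok) // [1 2] 0 false
}
```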

However, len() is not a thread-safe operation for arrays, slices, and maps, whereas for channels it clearly has to be thread-safe; otherwise, there would be no way to fetch a channel's length atomically. Nonetheless, does it really make sense to get the length of a channel? Channels are typically used for synchronization. If len(ch) runs concurrently with a send or receive operation, there is no guarantee about what len() returns, and the length may be outdated as soon as len() returns. This scenario is discussed in neither the language specification nor the Go memory model. After all, do we really need a len() operation for the ultimate channel abstraction? The answer speaks for itself.
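A short sketch makes the point that len(ch) is only a snapshot. It also hints at why a wrapper would need its own length bookkeeping. The MakeChan functions above hand out unbuffered in and out channels, so the built-in len on either always reports 0, no matter how many values sit in q. lenSnapshots is a hypothetical helper name.

```go
package main

import "fmt"

// lenSnapshots shows that len and cap only describe a channel at the
// instant they are evaluated.
func lenSnapshots() (before, after, unbufLen, unbufCap int) {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	before = len(ch) // 2 at this instant
	<-ch             // in real code, another goroutine could receive at any moment
	after = len(ch)  // 1: the earlier snapshot is already stale

	// An unbuffered channel never holds values, so len and cap are 0.
	unbuf := make(chan int)
	return before, after, len(unbuf), cap(unbuf)
}

func main() {
	fmt.Println(lenSnapshots()) // 2 1 0 0
}
```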

Further Reading Suggestions