The Ultimate Channel Abstraction
#Go #Synchronization #Deadlock
Author(s): Changkun Ou
Permalink: https://golang.design/research/ultimate-channel
Recently, I have been rethinking the programming patterns of graphics applications, and I have already written a 3D graphics package in Go called polyred. While I was designing the rendering pipeline APIs, I struggled with a tricky deadlock for a while, which eventually led me to create an unbounded channel as a workaround.
The problem
At the beginning of my design, I had to deal with OpenGL, where a chunk of APIs must be executed on the main thread; issuing a draw call is one of the most infamous of them. The common pattern in graphics programming is as follows:
app := newApp()
driver := initDriver()
ctx := driver.Context()

for !app.IsClosed() {
	ctx.Clear()
	processingDrawCalls(ctx)
	processingInputEvents()
}
The entire GUI application is executed in an infinite loop that contains two parts: draw call processing and event processing.
Typically, all of this code runs on the CPU, while the actual rendering computation executes on a GPU. That is, a graphics API provided by a graphics driver (such as OpenGL, Vulkan, Metal, or DirectX) merely sends commands from the CPU to the GPU, or waits for a response from the GPU. For some special reasons, polyred is limited to a software implementation, i.e. a pure-CPU implementation. Hence, the execution should utilize the full power of CPU parallelization, and it makes much more sense to execute rendering on a separate goroutine so that it won't block the event processing thread.
*Aside: To guarantee an application's responsiveness, it is ideal not to block the event processing since there might also be system invocation.
Subsequently, I turned the rendering loop into a separate goroutine and sent the rendering result to the event processing loop to be flushed to the hardware display. The entire application works as the following code snippet:
// WARNING: This example contains a deadlock.
package main

import (
	"fmt"
	"math/rand"
	"time"
)

type ResizeEvent struct {
	width, height int
}

type RenderProfile struct {
	id     int
	width  int
	height int
}

// Draw executes a draw call by the given render profile
func (p *RenderProfile) Draw() interface{} {
	return fmt.Sprintf("draw-%d-%dx%d", p.id, p.width, p.height)
}

func main() {
	// draw is a channel for receiving finished draw calls.
	draw := make(chan interface{})
	// change is a channel to receive notification of the change
	// of rendering settings.
	change := make(chan ResizeEvent)

	// Rendering Thread
	//
	// Sending draw calls to the event thread in order to draw
	// pictures. The thread sends draw calls to the draw channel,
	// using the same rendering setting id. If there is a change
	// of rendering setting, the event thread notifies the rendering
	// setting change, and here increases the rendering setting id.
	go func() {
		p := &RenderProfile{id: 0, width: 800, height: 500}
		for {
			select {
			case size := <-change:
				// Modify rendering profile.
				p.id++
				p.width = size.width
				p.height = size.height
			default:
				draw <- p.Draw()
			}
		}
	}()

	// Event Thread
	//
	// Process events every 100 ms. Otherwise, process draw call
	// requests as they become available.
	event := time.NewTicker(100 * time.Millisecond)
	for {
		select {
		case id := <-draw:
			// fmt.Println prints the string held by the interface;
			// the built-in println would print its internal pointers.
			fmt.Println(id)
		case <-event.C:
			// Notify the rendering thread there is a change
			// regarding rendering settings. We simulate a
			// random size at every event processing loop.
			change <- ResizeEvent{
				width:  int(rand.Float64() * 100),
				height: int(rand.Float64() * 100),
			}
		}
	}
}
As one can observe, the above example simulates a resize event of a GUI window at every event processing loop. Whenever the size of the GUI window changes, the underlying rendering should adapt to it, for instance by reallocating the rendering buffers. To let the rendering thread know about the change, another channel is used to communicate from the event thread to the rendering thread.
It sounds like a perfect design, but a nasty deadlock is hidden in it. If one executes the program, it eventually freezes until it is interrupted manually:
draw-0-800x500
...
draw-0-800x500
draw-1-60x94
...
draw-1-60x94
^Csignal: interrupt
If we take a closer look into the program pattern:
- Two infinite select loops (say E and R) running on different goroutines (threads).
- The E thread receives communication from the R thread.
- The R thread receives communication from the E thread.
Did you find the problem? The problem happens in the two-way communication:
If the communication channels are unbuffered (a send waits until the receive is complete), the deadlock happens when E is waiting for R to complete a receive while R is also waiting for E to complete a receive.
One may argue that the deadlock can be resolved using a buffered channel:
-draw := make(chan interface{})
+draw := make(chan interface{}, 100)
-change := make(chan ResizeEvent)
+change := make(chan ResizeEvent, 100)
But unfortunately, it remains problematic. Let's do a thought experiment: if E is too busy and quickly exhausts the entire buffer of the communication channel change, then the channel behaves like an unbuffered channel again, and E has to wait before it can proceed. On the other side, R is busy working on a draw call; when it is finished, R tries to send the draw call to E. If the buffer of draw has filled up as well, this send blocks. However, at this moment, E is already waiting for R to receive the change signal. Hence, we fall back to the same case: deadlock.
Is the problem a producer-consumer scenario? Indeed, the case is quite similar but not entirely identical. The producer-consumer scenario focuses on a producer filling a buffer while a consumer drains it. If the buffer is full or empty, it is easy to put the producer or the consumer to sleep. However, the key difference here is that both sides of the communication play the role of producer and consumer simultaneously, and they both rely on each other.
What can we do to solve the above deadlock? Let's reveal two approaches in this article.
Solution 1: Send in select's case
The first approach is a simple one. We utilize the power of the select statement: a send operation inside a select won't block if the select has a default case. Hence, we can simply turn the draw call send statement into a nested select statement:
 go func() {
 	p := &RenderProfile{id: 0, width: 800, height: 500}
 	for {
 		select {
 		case size := <-change:
 			// Modify rendering profile.
 			p.id++
 			p.width = size.width
 			p.height = size.height
 		default:
-			draw <- p.Draw()
+			select {
+			case draw <- p.Draw():
+			default:
+			}
 		}
 	}
 }()
In this case, if the send draw <- p.Draw() would block, the newly introduced select statement does not wait on it; it executes the default case instead, which resolves the deadlock.
However, there are two drawbacks to this approach:
- If a draw call is skipped, one frame of rendering is lost, because the next loop iteration starts to calculate a new frame.
- The event thread remains blocked until a frame rendering in the rendering thread is complete, because the new select statement can only be executed after all rendering calculation is complete.
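The frame loss described in the first drawback can be made visible by counting skipped sends. This is a sketch under stated assumptions: the function render, the frame names, and the buffer size of 3 are hypothetical, and no receiver runs at all so that the outcome is deterministic.

```go
package main

import "fmt"

// render produces n frames and offers each one to draw with a
// non-blocking send. It returns how many frames were delivered and
// how many were dropped because the send could not proceed.
func render(n int, draw chan<- string) (sent, dropped int) {
	for i := 0; i < n; i++ {
		frame := fmt.Sprintf("frame-%d", i)
		select {
		case draw <- frame:
			sent++
		default:
			dropped++ // receiver not ready: this frame is lost
		}
	}
	return sent, dropped
}

func main() {
	// No goroutine receives from draw, so only the buffer absorbs frames.
	draw := make(chan string, 3)
	sent, dropped := render(10, draw)
	fmt.Println("sent:", sent, "dropped:", dropped)
}
```

The rendering work for every dropped frame has already been done by the time the send is skipped, which is why the cost of this approach grows with the cost of a frame.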
These two drawbacks are intrinsic to this approach, and there seems to be no good way around them. What else could we do?
Solution 2: Unbounded Channel
We may first come up with this idea: can we make a channel that contains a buffer with infinite capacity, i.e. an unbounded channel? The language itself does not support this yet. However, such a pattern can be easily constructed:
// MakeChan returns a sender and a receiver of a buffered channel
// with infinite capacity.
//
// Warning: this implementation can be easily misused,
// see discussion below
func MakeChan() (chan<- interface{}, <-chan interface{}) {
	in, out := make(chan interface{}), make(chan interface{})

	go func() {
		var q []interface{}
		for {
			e, ok := <-in
			if !ok {
				close(out)
				return
			}
			q = append(q, e)
			for len(q) > 0 {
				select {
				case out <- q[0]:
					q = q[1:]
				case e, ok := <-in:
					if ok {
						q = append(q, e)
						break
					}
					for _, e := range q {
						out <- e
					}
					close(out)
					return
				}
			}
		}
	}()
	return in, out
}
In the above implementation, we create two unbuffered channels. To not block the communication, a separate goroutine is started by the call. Whenever there is a send operation, the goroutine appends the value to a buffer q. To pass values on to the receiver, a nested select loop checks whether a send is possible; if not, it keeps appending incoming data to the queue q. When the input channel is closed, an additional loop over the queue q drains all cached elements, and then the output channel is closed.
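To see the queueing and the drain-on-close behavior in action, here is a small sketch. It uses makeChan, a compact copy of the construction above specialized to int so that the example is self-contained; sendThenDrain is a hypothetical helper for this illustration.

```go
package main

import "fmt"

// makeChan is a compact copy of the MakeChan construction above,
// specialized to int so the example stays self-contained.
func makeChan() (chan<- int, <-chan int) {
	in, out := make(chan int), make(chan int)
	go func() {
		var q []int
		for {
			e, ok := <-in
			if !ok {
				close(out)
				return
			}
			q = append(q, e)
			for len(q) > 0 {
				select {
				case out <- q[0]:
					q = q[1:]
				case e, ok := <-in:
					if ok {
						q = append(q, e)
						break
					}
					for _, e := range q {
						out <- e
					}
					close(out)
					return
				}
			}
		}
	}()
	return in, out
}

// sendThenDrain sends n values before any receive happens, closes the
// input, and then collects everything from the output.
func sendThenDrain(n int) []int {
	in, out := makeChan()
	for i := 0; i < n; i++ {
		in <- i // the internal goroutine queues the value
	}
	close(in)

	var got []int
	for v := range out {
		got = append(got, v)
	}
	return got
}

func main() {
	got := sendThenDrain(1000)
	fmt.Println(len(got), got[0], got[len(got)-1])
}
```

All sends complete although nothing is receiving yet, and the values arrive in their original order once the receiver starts.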
Hence, another fix of the deadlock using an unbounded channel would be:
 func main() {
-	draw := make(chan interface{})
+	drawIn, drawOut := MakeChan()

 	...

 	// Rendering Thread
 	go func() {
 		...
 		for {
 			select {
 			case size := <-change:
 				...
 			default:
-				draw <- p.Draw()
+				drawIn <- p.Draw()
 			}
 		}
 	}()

 	// Event Thread
 	event := time.NewTicker(100 * time.Millisecond)
 	for {
 		select {
-		case id := <-draw:
+		case id := <-drawOut:
 			fmt.Println(id)
 		case <-event.C:
 			...
 		}
 	}
 }
This unbounded channel is very similar to a pattern commonly used in standard graphics APIs: the CommandBuffer, a buffer that caches a series of draw calls and executes a chunk of draw calls as a batch.
A Generic Channel Abstraction
We have discussed a form of deadlock in the select statement and two possible ways to address it. In the second approach, we discussed a possible way of implementing an unbounded channel construction. The implementation constructs an interface{} typed channel.
We may ask ourselves: does it make sense to have unbounded channels in the Go language, given this particular example? Did the Go team ever consider such usage?
The answer to the second question is: yes, they did; see golang/go#20352 [2].
The discussion thread shows that unbounded channels indeed serve certain applications, but they also have clear drawbacks that may hurt an application. The major drawback is that an unbounded channel may run out of memory (OOM): if there is a concurrency bug, the running application keeps eating memory from the OS and eventually runs out of it. Developers argued that an unbounded channel should be added to the language mainly because a MakeChan function like the one above returns an interface{} typed channel, which brings a weakly typed flaw into statically typed Go code. Eventually, Ian Lance Taylor from the Go team clarified that an unbounded channel may have its uses but is not worth adding to the language. Once we have generics, a type-safe unbounded channel can easily be implemented in a library, which answers the first question.
Since Go 1.18 introduces type parameters [1], the above difficulty can finally be resolved.
Here I provide a generic channel abstraction that is able to construct a type-safe, arbitrarily sized channel:
// MakeChan is a generic implementation that returns a sender and a
// receiver of an arbitrarily sized channel of an arbitrary type.
//
// If the given size is positive, the returned channel is a regular
// fix-sized buffered channel.
// If the given size is zero, the returned channel is an unbuffered
// channel.
// If the given size is -1, the returned channel is an unbounded channel
// that contains an internal buffer with infinite capacity.
//
// Warning: this implementation can be easily misused,
// see discussion below
func MakeChan[T any](size int) (chan<- T, <-chan T) {
	switch {
	case size == 0:
		ch := make(chan T)
		return ch, ch
	case size > 0:
		ch := make(chan T, size)
		return ch, ch
	case size != -1:
		panic("unbounded buffer size should be specified using -1")
	default:
		// size == -1
	}

	in, out := make(chan T), make(chan T)

	go func() {
		var q []T
		for {
			e, ok := <-in
			if !ok {
				close(out)
				return
			}
			q = append(q, e)
			for len(q) > 0 {
				select {
				case out <- q[0]:
					q = q[1:]
				case e, ok := <-in:
					if ok {
						q = append(q, e)
						break
					}
					for _, e := range q {
						out <- e
					}
					close(out)
					return
				}
			}
		}
	}()
	return in, out
}
func main() {
	in, out := MakeChan[int](1)
	// Or:
	// in, out := MakeChan[int](0)
	// in, out := MakeChan[int](-1)

	go func() { in <- 42 }()
	println(<-out)
}
*This code is executable on go2go playground: https://go.dev/play/p/krLWm7ZInnL
Design Concerns and Real-world Use Cases
Lastly, we have to address several potential misuses of the current implementation. The previously demonstrated MakeChan returns two channels, one as input and the other as output. However, from the caller's side, it is not obvious whether to write:
in, out := MakeChan[int](-1)
or:
out, in := MakeChan[int](-1)
Moreover, the internal buffer and goroutine may leak: this can happen if one closes the input channel but forgets to drain the output buffer. This means there are several concerns we have to address:
- When the unbounded channel is closed, the internal goroutine for caching events must return; otherwise it would block forever on a send to the internal output channel, and the goroutine would leak;
- When the unbounded channel is closed, all elements can still be safely received from the output channel;
- To avoid misuse of close(), a runtime panic should be triggered when accidentally closing the input channel.
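One way to approach these concerns is to hide both channels behind a small type. The following is only a sketch under assumptions: the type Unbounded, its constructor NewUnbounded, and the internal done channel are hypothetical names for this illustration and are not claimed to match how any published package is implemented. It also assumes that senders have stopped before Close is called and that the receiver drains the output afterwards.

```go
package main

import (
	"fmt"
	"sync"
)

// Unbounded is a hypothetical wrapper around an unbounded queue.
// Named accessors remove the in/out ordering ambiguity.
type Unbounded[T any] struct {
	in, out chan T
	done    chan struct{}
	once    sync.Once
}

// NewUnbounded starts the internal queueing goroutine.
func NewUnbounded[T any]() *Unbounded[T] {
	u := &Unbounded[T]{
		in:   make(chan T),
		out:  make(chan T),
		done: make(chan struct{}),
	}
	go u.loop()
	return u
}

func (u *Unbounded[T]) In() chan<- T  { return u.in }
func (u *Unbounded[T]) Out() <-chan T { return u.out }

// Close signals shutdown; calling it more than once is harmless.
func (u *Unbounded[T]) Close() { u.once.Do(func() { close(u.done) }) }

func (u *Unbounded[T]) loop() {
	var q []T
	for {
		// A nil channel disables the send case while the queue is empty.
		var out chan T
		var head T
		if len(q) > 0 {
			out, head = u.out, q[0]
		}
		select {
		case e, ok := <-u.in:
			if !ok {
				panic("input closed with close(); use Close() instead")
			}
			q = append(q, e)
		case out <- head:
			q = q[1:]
		case <-u.done:
			// Deliver what is still queued, then close the output.
			for _, e := range q {
				u.out <- e
			}
			close(u.out)
			return
		}
	}
}

func main() {
	ch := NewUnbounded[int]()
	for i := 1; i <= 3; i++ {
		ch.In() <- i
	}
	ch.Close()
	for v := range ch.Out() {
		fmt.Println(v)
	}
}
```

In this sketch, the flush after Close still waits for a receiver, so a caller that never drains Out() would keep the goroutine alive; a production implementation would need a policy for that case.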
As always, we addressed all these issues and made the generic abstraction available as a package, which we call chann.
The API design wraps the above-mentioned MakeChan function, and the implementation also addresses the mentioned concerns to avoid potential misuses:
// Package chann provides a unified representation of buffered,
// unbuffered, and unbounded channels in Go.
//
// The package is compatible with existing buffered and unbuffered
// channels. For example, in Go, to create a buffered or unbuffered
// channel, one uses built-in function `make` to create a channel:
//
// 	ch := make(chan int)     // unbuffered channel
// 	ch := make(chan int, 42) // or buffered channel
//
// However, all these channels have a finite capacity for caching, and
// it is impossible to create a channel with unlimited capacity, namely,
// an unbounded channel.
//
// This package provides the ability to create all possible types of
// channels. To create an unbuffered or a buffered channel:
//
// 	ch := chann.New[int](chann.Cap(0))  // unbuffered channel
// 	ch := chann.New[int](chann.Cap(42)) // or buffered channel
//
// More importantly, when the capacity of the channel is unspecified,
// or provided as negative values, the created channel is an unbounded
// channel:
//
// 	ch := chann.New[int]()               // unbounded channel
// 	ch := chann.New[int](chann.Cap(-42)) // or unbounded channel
//
// Furthermore, all channels provide methods to send (In()),
// receive (Out()), and close (Close()).
//
// Note that to close a channel, one must use the Close() method instead
// of the language built-in close().
//
// Two additional methods, ApproxLen and Cap, return the current status
// of the channel: an approximation of the current length of the channel,
// as well as the current capacity of the channel.
//
// See https://golang.design/research/ultimate-channel to understand
// the motivation of providing this package and the possible use cases
// with this package.
package chann // import "golang.design/x/chann"

// Opt represents an option to configure the created channel.
// The current possible option is Cap.
type Opt func(*config)

// Cap is the option to configure the capacity of a creating buffer.
// If the provided number is 0, Cap configures the creating buffer to an
// unbuffered channel; if the provided number is a positive integer, then
// Cap configures the creating buffer to a buffered channel with the given
// number of capacity for caching. If n is a negative integer, then it
// configures the creating channel to become an unbounded channel.
func Cap(n int) Opt { ... }

// Chann is a generic channel abstraction that can be either buffered,
// unbuffered, or unbounded. To create a new channel, use New to allocate
// one, and use Cap to configure the capacity of the channel.
type Chann[T any] struct { ... }

// New returns a Chann that may represent a buffered, an unbuffered or
// an unbounded channel. To configure the type of the channel, one may
// pass Cap as the argument of this function.
//
// By default, or without specification, the function returns an unbounded
// channel which has unlimited capacity.
//
// 	ch := chann.New[float64]()
// 	// or
// 	ch := chann.New[float64](chann.Cap(-1))
//
// If the chann.Cap specified a non-negative integer, the returned channel
// is either unbuffered (0) or buffered (positive).
//
// Note that although the input arguments are specified as a variadic
// parameter list, the function panics if more than one option is
// provided.
func New[T any](opts ...Opt) *Chann[T] { ... }

// In returns the send channel of the given Chann, which can be used to
// send values to the channel. If one closes the channel using close(),
// it will result in a runtime panic. Instead, use Close() method.
func (ch *Chann[T]) In() chan<- T { ... }

// Out returns the receive channel of the given Chann, which can be used
// to receive values from the channel.
func (ch *Chann[T]) Out() <-chan T { ... }

// Close closes the channel gracefully.
func (ch *Chann[T]) Close() { ... }

// ApproxLen returns an approximation of the length of the channel.
//
// Note that in a concurrent scenario, the returned length of a channel
// may never be accurate. Hence the function is named with an Approx prefix.
func (ch *Chann[T]) ApproxLen() int

// Cap returns the capacity of the channel.
func (ch *Chann[T]) Cap() int
One may use these APIs to adapt the previously discussed example:
 func main() {
-	draw := make(chan interface{})
+	draw := chann.New[*image.RGBA]()

 	...

 	// Rendering Thread
 	go func() {
 		...
 		for {
 			select {
 			case size := <-change:
 				...
 			default:
-				draw <- p.Draw()
+				draw.In() <- p.Draw()
 			}
 		}
 	}()

 	// Event Thread
 	event := time.NewTicker(100 * time.Millisecond)
 	for {
 		select {
-		case id := <-draw:
+		case id := <-draw.Out():
 			fmt.Println(id)
 		case <-event.C:
 			...
 		}
 	}
 }
Lastly, we also made a few contributions to the fyne-io/fyne GUI project to improve their draw call batching mechanism. Previously, only a fixed number of draw calls could be executed per frame (additional draw calls were ignored); the change fixes one of their long-standing problems. See fyne-io/fyne#2406 [5] and fyne-io/fyne#2473 [6] for more details. Here are two videos to demonstrate the problem intuitively:
(Videos: before the fix | after the fix)
Before the fix, the tiny blocks are only partially rendered; whereas all blocks can be rendered after the fix.
Conclusion
In this article, we talked about a generic implementation of a channel with arbitrary capacity through a real-world deadlock example. A public package chann [7] is provided as a generic channel package.
import "golang.design/x/chann"
We may still ask: is the implementation perfect? Why is there no len() but only an ApproxLen()?
Well, the answer is non-trivial. len() is not a thread-safe operation for arrays, slices, and maps, but it clearly has to be thread-safe for channels; otherwise, there would be no way to fetch the channel length atomically. Nonetheless, does it really make sense to get the length of a channel?
As we know, channels are typically used for synchronization purposes. If a len(ch) happens concurrently with a send/receive operation, there is no guarantee about what len() returns: the length is outdated as soon as len() returns.
This scenario is discussed neither in the language specification [3] nor in the Go memory model [4]. After all, do we really need a len() operation for the ultimate channel abstraction? The answer speaks for itself.
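The staleness of a length reading can be illustrated even without real concurrency. In the sketch below, the helper snapshotThenReceive is hypothetical, and the single receive stands in for whatever another goroutine might do between reading the length and using it.

```go
package main

import "fmt"

// snapshotThenReceive records len(ch), performs one receive, and
// returns both the recorded length and the length afterwards.
func snapshotThenReceive(ch chan int) (before, after int) {
	before = len(ch) // snapshot of the buffer occupancy
	<-ch             // a concurrent receiver could do this at any time
	after = len(ch)
	return before, after
}

func main() {
	ch := make(chan int, 4)
	ch <- 1
	ch <- 2
	before, after := snapshotThenReceive(ch)
	fmt.Println(before, after)
}
```

Any decision based on the first value is already made on outdated information, which is the reason a length accessor on a channel can at best be an approximation.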
References
1. Ian Lance Taylor. Type Parameters. March 19, 2021. https://golang.org/design/43651-type-parameters
2. rgooch. proposal: spec: add support for unlimited capacity channels. 13 May 2017. https://golang.org/issue/20352
3. The Go Authors. The Go Programming Language Specification. Feb 10, 2021. https://golang.org/ref/spec
4. The Go Authors. The Go Memory Model. May 31, 2014. https://golang.org/ref/mem
5. Changkun Ou. internal/dirver: use unbounded channel for event processing Issue 2406. Aug 27, 2021. https://github.com/fyne-io/fyne/pull/2406
6. Changkun Ou. internal/driver: fix rendering freeze in mobile Issue 2473. Sep 15, 2021. https://github.com/fyne-io/fyne/pull/2473
7. Changkun Ou. Package chann. Sep 10, 2021. https://golang.design/s/chann