Go Channel Application Patterns

Mondo Technology Updated on 2024-02-12

The channel is a Go type that, together with goroutines, provides Go's concurrency support, and it is widely used in development. Go encourages passing references to data between goroutines through channels (much like handing ownership of the data from one goroutine to another). Effective Go sums this up in one sentence:

Do not communicate by sharing memory; instead, share memory by communicating.

The Go memory model also points out a property of channels that makes them usable for concurrency control:
A send on a channel happens before the corresponding receive from that channel completes. (The Go Memory Model)
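The guarantee can be illustrated with a minimal sketch. The function name publish is made up for this illustration; the point is only that a plain variable written before the send is safely visible after the receive, with no lock involved.

```go
package main

import "fmt"

// publish writes to a plain variable and then sends on done.
// Because the send happens before the corresponding receive completes,
// the receiver is guaranteed to observe the write.
func publish() string {
	var msg string
	done := make(chan struct{})

	go func() {
		msg = "hello"      // (1) ordinary write, no lock
		done <- struct{}{} // (2) send
	}()

	<-done // (3) the receive completes after (2), so (1) is visible here
	return msg
}

func main() {
	fmt.Println(publish()) // hello
}
```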

Besides transferring shared data between goroutines safely in the usual way, channels can be used for many other tricks (patterns). This article lists some of these application patterns.

This article was mainly inspired by:

eapache's channels library
the book Concurrency in Go
Francesc Campoy's justforfunc series
a discussion of merge-channel implementations
what I learned about Scala collections while publishing the Scala Collections Handbook

Let's look at these patterns through examples.

As we know, the sync package in Go's standard library provides Mutex, which can be used as a lock. However, at the time this was written Mutex did not implement a TryLock method (one was later added to sync.Mutex in Go 1.18).

We define TryLock as follows: the current goroutine tries to acquire the lock; if it succeeds, it holds the lock and TryLock returns true, otherwise TryLock returns false. With this method the current goroutine can avoid blocking while it tries to acquire the lock.

This is a commonly used feature that some other programming languages implement, so why wasn't it implemented in Go? It is discussed in detail in issue 6123. It seems to me that the Go core team was not enthusiastic about the feature and considered that the same thing could be achieved with channels. In fact, it is easy to add this feature to the standard library's sync.Mutex. The code below uses a hack to give Mutex a TryLock method.

    const mutexLocked = 1

The code above also has an extra IsLocked method. It is not commonly used, because checking the state and then locking are not a single atomic operation, but it may be useful for debugging and logging.
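For comparison, on Go 1.18 or later no hack is needed, because sync.Mutex has a built-in TryLock method. The sketch below assumes such a Go version; the helper name tryTwice is made up for the illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// tryTwice reports the results of two consecutive TryLock calls
// on a fresh mutex. Requires Go 1.18 or later.
func tryTwice() (first, second bool) {
	var mu sync.Mutex
	first = mu.TryLock()  // the lock is free: succeeds
	second = mu.TryLock() // already held: fails without blocking
	mu.Unlock()
	return first, second
}

func main() {
	fmt.Println(tryTwice()) // true false
}
```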

Since, at that time, the standard library had no plans to add this method to Mutex and recommended using a channel instead, let's see how to implement it with a channel.

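    // Mutex is a lock built on a channel with a buffer of 1.
    // A token in the buffer means the lock is free.
    type Mutex struct {
        ch chan struct{}
    }

    func NewMutex() *Mutex {
        mu := &Mutex{make(chan struct{}, 1)}
        mu.ch <- struct{}{}
        return mu
    }

    func (m *Mutex) Lock() {
        <-m.ch
    }

    func (m *Mutex) Unlock() {
        select {
        case m.ch <- struct{}{}:
        default:
            panic("unlock of unlocked mutex")
        }
    }

    func (m *Mutex) TryLock() bool {
        select {
        case <-m.ch:
            return true
        default:
        }
        return false
    }

    func (m *Mutex) IsLocked() bool {
        return len(m.ch) == 0
    }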

This works mainly by exploiting the blocking behavior at a channel's boundaries: a receive blocks while the buffer is empty, and a send blocks while it is full.

You can also change the buffer size from 1 to n to manage n locks (resources).
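A minimal sketch of the n-resource variant follows. The type name Semaphore and its method names are hypothetical, chosen for this illustration; the structure simply generalizes the buffer of 1 to a buffer of n tokens.

```go
package main

import "fmt"

// Semaphore guards n resources with a buffered channel holding n tokens.
// The name and methods are hypothetical, for illustration only.
type Semaphore struct {
	ch chan struct{}
}

func NewSemaphore(n int) *Semaphore {
	s := &Semaphore{ch: make(chan struct{}, n)}
	for i := 0; i < n; i++ {
		s.ch <- struct{}{} // pre-fill with n tokens
	}
	return s
}

// Acquire blocks until a token is available.
func (s *Semaphore) Acquire() { <-s.ch }

// TryAcquire takes a token if one is available and never blocks.
func (s *Semaphore) TryAcquire() bool {
	select {
	case <-s.ch:
		return true
	default:
		return false
	}
}

// Release returns a token; releasing more than was acquired panics.
func (s *Semaphore) Release() {
	select {
	case s.ch <- struct{}{}:
	default:
		panic("release of unacquired semaphore")
	}
}

func main() {
	s := NewSemaphore(2)
	fmt.Println(s.TryAcquire(), s.TryAcquire(), s.TryAcquire()) // true true false
	s.Release()
	fmt.Println(s.TryAcquire()) // true
}
```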

Sometimes, because of contention, the lock is held by another goroutine when we try to acquire it, so the current goroutine cannot get the lock immediately and can only block and wait. The standard library does not provide a way to wait with a timeout, so let's try to implement one.

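    import "time"

    type Mutex struct {
        ch chan struct{}
    }

    func NewMutex() *Mutex {
        mu := &Mutex{make(chan struct{}, 1)}
        mu.ch <- struct{}{}
        return mu
    }

    func (m *Mutex) Lock() {
        <-m.ch
    }

    func (m *Mutex) Unlock() {
        select {
        case m.ch <- struct{}{}:
        default:
            panic("unlock of unlocked mutex")
        }
    }

    // TryLock waits up to timeout for the lock.
    func (m *Mutex) TryLock(timeout time.Duration) bool {
        timer := time.NewTimer(timeout)
        select {
        case <-m.ch:
            timer.Stop()
            return true
        case <-timer.C:
        }
        return false
    }

    func (m *Mutex) IsLocked() bool {
        return len(m.ch) == 0
    }

You can also adapt it to use context: instead of a timeout, use a context to cancel or time out the attempt to acquire the lock. This is left to the reader as an exercise.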
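One possible answer to this exercise, as a minimal sketch rather than a definitive implementation: the method name LockContext and the helper demo are made up here, and the Mutex type repeats the channel-based design so the example runs on its own.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Mutex is a channel-based lock; a token in ch means "unlocked".
type Mutex struct {
	ch chan struct{}
}

func NewMutex() *Mutex {
	mu := &Mutex{ch: make(chan struct{}, 1)}
	mu.ch <- struct{}{}
	return mu
}

func (m *Mutex) Unlock() {
	select {
	case m.ch <- struct{}{}:
	default:
		panic("unlock of unlocked mutex")
	}
}

// LockContext (a hypothetical name) waits for the lock until ctx is done.
// It returns nil on success, or ctx.Err() if the wait was abandoned.
func (m *Mutex) LockContext(ctx context.Context) error {
	select {
	case <-m.ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// demo acquires a free lock, then tries again with a short deadline.
func demo() (free, held error) {
	mu := NewMutex()
	free = mu.LockContext(context.Background()) // the lock was free

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	held = mu.LockContext(ctx) // still held, so the wait is abandoned

	mu.Unlock()
	return free, held
}

func main() {
	free, held := demo()
	fmt.Println(free) // <nil>
	fmt.Println(held) // context deadline exceeded
}
```

Returning an error instead of a bool lets the caller distinguish a timeout from an explicit cancellation.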

When you wait for multiple signals, you sometimes want to run the business logic as soon as any one of them is received, ignoring the signals that have not arrived yet.

For example, suppose we send a request to n nodes that provide the same service. As soon as any one node returns a result, we can run the subsequent business logic, and the requests to the other n-1 nodes can be canceled or ignored. When n=2, this is the backup request pattern. It trades extra resources for lower latency.

Note that once any one signal is received, all other signals are ignored. With channels, you can close all the channels as soon as a single value is received from any of them (this depends on your implementation, but the output channel will definitely be closed).
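The request scenario can be sketched as follows. This is an illustration under stated assumptions, not the article's own code: query, first, demo, the node names, and the delays are all hypothetical, and the simulated latency stands in for a real network call.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// query simulates one replica; delay is a stand-in for network latency.
// It gives up early if ctx is canceled.
func query(ctx context.Context, name string, delay time.Duration) (string, error) {
	select {
	case <-time.After(delay):
		return "reply from " + name, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// first sends the same request to every replica and returns the first reply.
// It assumes replicas is non-empty.
func first(replicas map[string]time.Duration) string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancels the slower requests once we return

	// Buffered so that late senders never block after we stop receiving.
	results := make(chan string, len(replicas))
	for name, delay := range replicas {
		go func(name string, delay time.Duration) {
			if r, err := query(ctx, name, delay); err == nil {
				results <- r
			}
		}(name, delay)
	}
	return <-results
}

// demo races a fast and a slow hypothetical node.
func demo() string {
	return first(map[string]time.Duration{
		"node-a": 200 * time.Millisecond,
		"node-b": time.Millisecond,
	})
}

func main() {
	fmt.Println(demo())
}
```

With these delays the faster node is expected to win, and the deferred cancel stops the slower request instead of letting it run to completion.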

There are three ways to do this: goroutine, reflect, and recursion.

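    import "sync"

    func or(chans ...<-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        go func() {
            var once sync.Once
            for _, c := range chans {
                go func(c <-chan interface{}) {
                    select {
                    case <-c:
                        once.Do(func() { close(out) })
                    case <-out:
                    }
                }(c)
            }
        }()
        return out
    }

The or function can handle n channels. It starts a goroutine for each channel, and as soon as any goroutine reads a value from its channel, the output channel is closed.

To avoid closing the output channel concurrently, the close operation is performed only once, using sync.Once.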
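Why the single close matters can be shown in isolation. In this sketch the helper names closeOnce and demo are made up; sync.Once is the standard library type.

```go
package main

import (
	"fmt"
	"sync"
)

// closeOnce returns a function that closes ch the first time it is called
// and does nothing afterwards, so concurrent callers cannot double-close.
func closeOnce(ch chan struct{}) func() {
	var once sync.Once
	return func() { once.Do(func() { close(ch) }) }
}

// demo calls the closer from several goroutines and reports
// whether ch ended up closed.
func demo() bool {
	ch := make(chan struct{})
	closer := closeOnce(ch)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			closer() // a bare close(ch) here would panic on the second call
		}()
	}
	wg.Wait()

	_, open := <-ch
	return !open
}

func main() {
	fmt.Println(demo()) // true
}
```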

Go's reflect package has a dedicated type (reflect.SelectCase) and function (reflect.Select) for handling select statements.

So we can use reflection to "randomly" receive from a set of candidate channels and then close the output channel.

This way it looks more concise.

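    import "reflect"

    func or(channels ...<-chan interface{}) <-chan interface{} {
        switch len(channels) {
        case 0:
            return nil
        case 1:
            return channels[0]
        }

        orDone := make(chan interface{})
        go func() {
            defer close(orDone)
            var cases []reflect.SelectCase
            for _, c := range channels {
                cases = append(cases, reflect.SelectCase{
                    Dir:  reflect.SelectRecv,
                    Chan: reflect.ValueOf(c),
                })
            }
            // Blocks until any one of the channels is ready.
            reflect.Select(cases)
        }()
        return orDone
    }

The recursive approach is always a mind-opening implementation. The following version uses divide and conquer: it merges the channels step by step and finally returns a single channel.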

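    func or(channels ...<-chan interface{}) <-chan interface{} {
        switch len(channels) {
        case 0:
            return nil
        case 1:
            return channels[0]
        }

        orDone := make(chan interface{})
        go func() {
            defer close(orDone)
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                m := len(channels) / 2
                select {
                case <-or(channels[:m]...):
                case <-or(channels[m:]...):
                }
            }
        }()
        return orDone
    }

In the fan-in (merge) pattern later, we will use the same recursive approach to merge multiple input channels. According to the justforfunc test results, this recursive approach is more efficient than the goroutine and reflect approaches.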

This is a pattern we use often: a signal channel (done) controls (cancels) the processing of an input channel.

As soon as a signal is read from the done channel, or the done channel is closed, the processing of the input channel is canceled.

This pattern provides an easy way to merge the done channel and the input channel into an output channel.

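    func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }

The fan-in pattern merges multiple input channels of the same type into one output channel of that type; in other words, it merges channels.

The goroutine approach starts one goroutine per input channel.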

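    import "sync"

    func fanIn(chans ...<-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        go func() {
            var wg sync.WaitGroup
            wg.Add(len(chans))
            for _, c := range chans {
                go func(c <-chan interface{}) {
                    for v := range c {
                        out <- v
                    }
                    wg.Done()
                }(c)
            }
            wg.Wait()
            close(out)
        }()
        return out
    }

The reflect approach uses the reflect package's support for select statements to merge the input channels.

The following implementation is actually still somewhat problematic: it works well when the input channels are read fairly evenly, but otherwise performance is lower.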

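    import "reflect"

    func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
        out := make(chan interface{})
        go func() {
            defer close(out)
            var cases []reflect.SelectCase
            for _, c := range chans {
                cases = append(cases, reflect.SelectCase{
                    Dir:  reflect.SelectRecv,
                    Chan: reflect.ValueOf(c),
                })
            }
            for len(cases) > 0 {
                i, v, ok := reflect.Select(cases)
                if !ok { // channel closed: remove its case
                    cases = append(cases[:i], cases[i+1:]...)
                    continue
                }
                out <- v.Interface()
            }
        }()
        return out
    }

The recursive approach is not intuitive to understand, but its performance is still good (when there are not very many input channels, the recursion depth stays low and does not become a bottleneck).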

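    func fanInRec(chans ...<-chan interface{}) <-chan interface{} {
        switch len(chans) {
        case 0:
            c := make(chan interface{})
            close(c)
            return c
        case 1:
            return chans[0]
        case 2:
            return mergeTwo(chans[0], chans[1])
        default:
            m := len(chans) / 2
            return mergeTwo(
                fanInRec(chans[:m]...),
                fanInRec(chans[m:]...))
        }
    }

    func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
        c := make(chan interface{})
        go func() {
            defer close(c)
            for a != nil || b != nil {
                select {
                case v, ok := <-a:
                    if !ok { // a is closed: stop selecting on it
                        a = nil
                        continue
                    }
                    c <- v
                case v, ok := <-b:
                    if !ok { // b is closed: stop selecting on it
                        b = nil
                        continue
                    }
                    c <- v
                }
            }
        }()
        return c
    }

The fan-out pattern fans one input channel out into multiple channels.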

Fan-out behavior can be divided into at least two types:

Read a value from the input channel and send it to every output channel. This is called the tee pattern.
Read a value from the input channel and choose one of the output channels to send it to.

This section only describes the first case; the next section describes the second.

The read value is sent to each output channel, and the asynchronous mode can result in a lot of goroutines.

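    import "sync"

    func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) {
        go func() {
            var wg sync.WaitGroup // tracks pending async sends
            defer func() {
                wg.Wait() // do not close while an async send is still pending
                for i := 0; i < len(out); i++ {
                    close(out[i])
                }
            }()

            for v := range ch {
                v := v
                for i := 0; i < len(out); i++ {
                    i := i
                    if async {
                        wg.Add(1)
                        go func() {
                            defer wg.Done()
                            out[i] <- v
                        }()
                    } else {
                        out[i] <- v
                    }
                }
            }
        }()
    }

With the reflect approach, once one output channel blocks, subsequent processing can be delayed.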

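    import "reflect"

    func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
        go func() {
            defer func() {
                for i := 0; i < len(out); i++ {
                    close(out[i])
                }
            }()

            cases := make([]reflect.SelectCase, len(out))
            for i := range cases {
                cases[i].Dir = reflect.SelectSend
            }

            for v := range ch {
                v := v
                for i := range cases {
                    cases[i].Chan = reflect.ValueOf(out[i])
                    cases[i].Send = reflect.ValueOf(v)
                }
                for _ = range cases { // one send per output channel
                    chosen, _, _ := reflect.Select(cases)
                    // A zero Chan makes Select ignore this case next time.
                    cases[chosen].Chan = reflect.ValueOf(nil)
                }
            }
        }()
    }

The distribution pattern sends each value read from the input channel to exactly one of the output channels.

The round-robin approach chooses the output channels in turn.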

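    func fanOut(ch <-chan interface{}, out []chan interface{}) {
        go func() {
            defer func() {
                for i := 0; i < len(out); i++ {
                    close(out[i])
                }
            }()

            // roundrobin
            var i = 0
            var n = len(out)
            for v := range ch {
                v := v
                out[i] <- v
                i = (i + 1) % n
            }
        }()
    }

The reflect approach relies on the random choice that select makes among the channels that are ready.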

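    import "reflect"

    func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
        go func() {
            defer func() {
                for i := 0; i < len(out); i++ {
                    close(out[i])
                }
            }()

            cases := make([]reflect.SelectCase, len(out))
            for i := range cases {
                cases[i].Dir = reflect.SelectSend
                cases[i].Chan = reflect.ValueOf(out[i])
            }

            for v := range ch {
                v := v
                for i := range cases {
                    cases[i].Send = reflect.ValueOf(v)
                }
                _, _, _ = reflect.Select(cases)
            }
        }()
    }

eapache/channels provides some functions that apply patterns to channels, such as the fan-in and fan-out patterns above. Because Go's built-in channel type cannot be extended, the eapache/channels library defines its own channel interface and provides convenient conversions to and from native channels.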

eapache/channels provides four functions:

Distribute: reads values from the input channel and sends each one to one of the output channels. When the input channel is closed, the output channels are closed.
Tee: reads values from the input channel and sends each one to all output channels. When the input channel is closed, the output channels are closed.
Multiplex: merges the input channels into one output channel. When all inputs are closed, the output is closed.
Pipe: connects two channels in series.

Each of these four functions also has a WeakXXX variant, which does not close the output when the input is closed.

Let's take a look at an example of the corresponding function.

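    func testDist() {
        a := channels.NewNativeChannel(channels.None)
        outputs := []channels.Channel{
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
        }

        channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3])
        // channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3])

        go func() {
            for i := 0; i < 5; i++ {
                a.In() <- i
            }
            a.Close()
        }()

        for i := 0; i < 6; i++ {
            var v interface{}
            var j int
            select {
            case v = <-outputs[0].Out():
                j = 0
            case v = <-outputs[1].Out():
                j = 1
            case v = <-outputs[2].Out():
                j = 2
            case v = <-outputs[3].Out():
                j = 3
            }
            fmt.Printf("channel#%d: %d\n", j, v)
        }
    }

    func testTee() {
        a := channels.NewNativeChannel(channels.None)
        outputs := []channels.Channel{
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
        }

        channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3])
        // channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3])

        go func() {
            for i := 0; i < 5; i++ {
                a.In() <- i
            }
            a.Close()
        }()

        for i := 0; i < 20; i++ {
            var v interface{}
            var j int
            select {
            case v = <-outputs[0].Out():
                j = 0
            case v = <-outputs[1].Out():
                j = 1
            case v = <-outputs[2].Out():
                j = 2
            case v = <-outputs[3].Out():
                j = 3
            }
            fmt.Printf("channel#%d: %d\n", j, v)
        }
    }

    func testMulti() {
        a := channels.NewNativeChannel(channels.None)
        inputs := []channels.Channel{
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
            channels.NewNativeChannel(channels.None),
        }

        channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])
        // channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3])

        go func() {
            for i := 0; i < 5; i++ {
                for j := range inputs {
                    inputs[j].In() <- i
                }
            }
            for i := range inputs {
                inputs[i].Close()
            }
        }()

        for v := range a.Out() {
            fmt.Printf("%d ", v)
        }
    }

    func testPipe() {
        a := channels.NewNativeChannel(channels.None)
        b := channels.NewNativeChannel(channels.None)

        channels.Pipe(a, b)
        // channels.WeakPipe(a, b)

        go func() {
            for i := 0; i < 5; i++ {
                a.In() <- i
            }
            a.Close()
        }()

        for v := range b.Out() {
            fmt.Printf("%d ", v)
        }
    }
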
Judging by its behavior, a channel looks a lot like a data stream, so we can implement operations similar to those on Scala collections.

Scala's collection classes provide a rich set of operations (methods). Other languages and frameworks offer similar methods too, for example Apache Spark, Java Stream, and ReactiveX.

Implementations of some of these methods are listed below. I believe that with some further work the related methods could be turned into a good library, but for now let's look at a few examples.

The skip functions skip some values from a channel before they start passing values through.

skipN skips the first n values.

    func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{}
    func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{}
    func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{}

takeN reads the first n values.
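The two count-based helpers, skipN and takeN, can be sketched as follows. The signatures follow the ones listed in this article, but the bodies are an assumption written for this illustration, not necessarily the author's implementation; demo is a made-up helper.

```go
package main

import "fmt"

// skipN drops the first num values of valueStream and forwards the rest.
func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		skipped := 0
		for {
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					return
				}
				if skipped < num {
					skipped++
					continue
				}
				select {
				case out <- v:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

// takeN forwards at most the first num values of valueStream, then closes.
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for i := 0; i < num; i++ {
			select {
			case <-done:
				return
			case v, ok := <-valueStream:
				if !ok {
					return
				}
				select {
				case out <- v:
				case <-done:
					return
				}
			}
		}
	}()
	return out
}

// demo feeds 0..9 through skipN(3) and then takeN(4).
func demo() []interface{} {
	done := make(chan struct{})
	defer close(done) // releases the goroutines still blocked upstream

	src := make(chan interface{})
	go func() {
		defer close(src)
		for i := 0; i < 10; i++ {
			select {
			case src <- i:
			case <-done:
				return
			}
		}
	}()

	var got []interface{}
	for v := range takeN(done, skipN(done, src, 3), 4) {
		got = append(got, v)
	}
	return got
}

func main() {
	fmt.Println(demo()) // [3 4 5 6]
}
```

Every send is paired with a check on done, so closing done is enough to stop the whole pipeline once the consumer has what it needs.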

    func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{}
    func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{}
    func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{}

If the input is a channel whose elements are themselves channels of the same type, flat returns a single output channel whose values are the values from all of those inner channels.

It differs from fan-in: with fan-in the input channels are fixed at call time and passed as a slice, whereas the input of flat is a channel, so more channels can be added to it at runtime.

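    func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                select {
                case <-done:
                    return
                case v, ok := <-c:
                    if !ok {
                        return
                    }
                    select {
                    case valStream <- v:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }

    func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} {
        valStream := make(chan interface{})
        go func() {
            defer close(valStream)
            for {
                var stream <-chan interface{}
                select {
                case maybeStream, ok := <-chanStream:
                    if !ok {
                        return
                    }
                    stream = maybeStream
                case <-done:
                    return
                }
                // Drain the current inner channel before taking the next one.
                for val := range orDone(done, stream) {
                    select {
                    case valStream <- val:
                    case <-done:
                    }
                }
            }
        }()
        return valStream
    }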

Map and Reduce are a common set of operations.

A map maps one channel to another, and the type of channel can be different.

    func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
        out := make(chan interface{})
        if in == nil {
            close(out)
            return out
        }

        go func() {
            defer close(out)
            for v := range in {
                out <- fn(v)
            }
        }()
        return out
    }

Because map is a Go keyword, we cannot name the function map, so mapChan is used instead.

For example, you can process a channel of employee wages in a company and output a channel of employee wages after tax deductions.

    func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
        out := <-in // the first value seeds the accumulator
        for v := range in {
            out = fn(out, v)
        }
        return out
    }

You can use reduce to implement sum, max, min, and other aggregation operations.
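Putting the two together, the wage example can be sketched like this. The helper totalAfterTax and the flat 10% tax rate are made up for the illustration, and compact versions of mapChan and reduce are repeated so the example runs on its own.

```go
package main

import "fmt"

// mapChan applies fn to every value read from in.
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} {
	out := make(chan interface{})
	go func() {
		defer close(out)
		for v := range in {
			out <- fn(v)
		}
	}()
	return out
}

// reduce folds the values of in with fn, seeded with the first value.
// It returns nil if in is closed without delivering any value.
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} {
	out, ok := <-in
	if !ok {
		return nil
	}
	for v := range in {
		out = fn(out, v)
	}
	return out
}

// totalAfterTax is a hypothetical example: a flat 10% tax on each wage,
// followed by a sum over the net wages.
func totalAfterTax(wages []int) interface{} {
	in := make(chan interface{})
	go func() {
		defer close(in)
		for _, w := range wages {
			in <- w
		}
	}()

	net := mapChan(in, func(v interface{}) interface{} { return v.(int) * 9 / 10 })
	return reduce(net, func(r, v interface{}) interface{} { return r.(int) + v.(int) })
}

func main() {
	fmt.Println(totalAfterTax([]int{1000, 2000, 3000})) // 5400
}
```

Swapping the reducer for one that keeps the larger or smaller of r and v gives max or min over the same stream.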

This article has listed some application patterns of channels in depth. I hope that after reading it you will have a deeper understanding of Go's channel type and be able to apply channels flexibly in development. You are also welcome to suggest more channel patterns in the comments.

All of the code can be found on GitHub: smallnest/channels.
