When I first started writing Golang applications in 2014, I ran headfirst into the quirkier, more interesting parts of the language — concurrency and channels. After many lines of buggy, hard-to-reason-about code, I learned some patterns that leverage the benefits of concurrency while greatly improving code readability and reducing errors.
This article is designed to save you time and energy by concisely showing you what (I’m embarrassed to admit) took me years to put together on my own. I hope you find this to be an enlightening, pragmatic, real world use case for concurrency in your API’s.
The API Concurrency Pattern
If you follow the formula outlined below, you can build a highly concurrent API with minimal errors and effort.
Engineer Your Blocking Operations to…
- Always run asynchronously using the
go
keyword, so you can run them concurrently make
andclose
their own channels, so you minimize the chance of memory leaks and deadlocks- Accept a
context.Context
, in order to stop pending requests that are no longer necessary - Use pointers to return results instead of channels, so you can reduce the number of channels we need to manage
- Emit errors through a
<-chan error
, so you can wait for blocking operations to complete before returning responses
That’s it! If you just stick to those 5 rules, you’ll be able to write readable, organized Go code that is highly concurrent and isn’t prone to deadlocks or memory leaks.
Example Code
What follows is an example implementation that puts these rules into practice. Hopefully, this will illustrate how to produce an easy-to-read, test and maintain highly concurrent API codebase in Golang.
API Requests & Blocking Operations
Step 1 of implementing this pattern is to issue all of our API requests and blocking operations asynchronously.
// Piece is a piece of a result
type Piece struct {
ID uint `json:"id"`
}
// getPiece calls `GET /piece/:id`
func getPiece(ctx context.Context, id uint, piece *Piece) <-chan error {
out := make(chan error)
go func() {
// Correct memory management - always be closing... your channels
defer close(out)
// NewRequestWithContext will cancel its request immediately if
// the caller cancels the context
req, err := http.NewRequestWithContext(
ctx,
"GET",
fmt.Sprintf("api.url.com/piece/%d", id),
nil,
)
if err != nil {
out <- err
return
}
// Issue the request
rsp, err := http.DefaultClient.Do(req)
if err != nil {
out <- err
return
} else if rsp.StatusCode != http.StatusOK {
out <- fmt.Errorf("%d: %s", rsp.StatusCode, rsp.Status)
return
}
// Parse the response into piece
defer rsp.Body.Close()
if err := json.NewDecoder(rsp.Body).Decode(piece); err != nil {
out <- err
return
}
}()
return out
}
API Responses
Step 2 of implementing the pattern is to blend multiple blocking operations and API Requests into a single API response struct
.
// Result is the combination of multiple blocking operations
// that will be retreived concurrently
type Result struct {
FirstPiece *Piece `json:"firstPiece,omitempty"`
SecondPiece *Piece `json:"secondPiece,omitempty"`
ThirdPiece *Piece `json:"thirdPiece,omitempty"`
}
// GetResult is an `http.HandleFunc` that GET's `Result`s
func GetResult(w http.ResponseWriter, r *http.Request) {
// Parse and validate inputs...
// getResult will stop immediately if the http.Request is canceled
var result Result
if err := <-getResult(r.Context(), &result); err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
// Marshal the response
bs, err := json.Marshal(&result)
if err != nil {
w.Write([]byte(err.Error()))
w.WriteHeader(http.StatusInternalServerError)
return
}
// Success!
w.Write(bs)
w.WriteHeader(http.StatusOK)
}
// getResult returns the result of many concurrent API calls
func getResult(ctx context.Context, result *Result) <-chan error {
out := make(chan error)
go func() {
// Correct memory management
defer close(out)
// The cancel func will allow us to stop all pending requests if one
// fails
ctx, cancel := context.WithCancel(ctx)
// Merge allows us to recieve the all of errors returned from all of
// the calls to `getPieces` in a single `<-chan error`.
// If no errors are returned, Merge will wait until all of the
// `<-chan error`s close before proceeding
for err := range util.Merge(
getPiece(ctx, 1, result.FirstPiece),
getPiece(ctx, 2, result.SecondPiece),
getPiece(ctx, 3, result.ThirdPiece),
) {
if err != nil {
// Cancel all pending requests
cancel()
// Surface the error to the caller
out <- err
return
}
}
}()
return out
}
The Merge Function
Step 3 is to implement a single ‘fan-in’ func
. Even if you’re very familiar with go, this is probably the most complicated and error prone piece of the puzzle. I recommend copy/pasting this code directly into your own util
package or just using the one in Delivery Hero’s pipeline package.
If you’re really curious about how or why this works, this func
is based on an excellent article written about pipelines in 2014 on the go blog.
package util
import (
"sync"
)
// Merge fans multiple error channels in to a single error channel
func Merge(errChans ...<-chan error) <-chan error {
mergedChan := make(chan error)
// Create a WaitGroup that waits for all of the errChans to close
var wg sync.WaitGroup
wg.Add(len(errChans))
go func() {
// When all of the errChans are closed, close the mergedChan
wg.Wait()
close(mergedChan)
}()
for i := range errChans {
go func(errChan <-chan error) {
// Wait for each errChan to close
for err := range errChan {
if err != nil {
// Fan the contents of each errChan into the mergedChan
mergedChan <- err
}
}
// Tell the WaitGroup that one of the errChans is closed
wg.Done()
}(errChans[i])
}
return mergedChan
}
In Summary
There are many ways to approach concurrency in Go. In my experience, this has been a clear and efficient way of approaching concurrency when building an API that keeps my code organized and minimizes memory management mistakes.
I hope you found this to be a useful and pragmatic illustration of concurrency in Golang. I also hope that it saves you some time and headaches piecing this information together for yourself.
Happy Coding!
Thank you Mark! If you are interested in reading more of Mark’s content, take a look at his Medium account.
As always, we are still hiring, so check out some of our latest openings, or join our Talent Community to stay up to date with what’s going on at Delivery Hero and receive customized job alerts!