21/04/21

Concurrent API Patterns in Go

By Mark Salpeter

When I first started writing Golang applications in 2014, I ran headfirst into the quirkier, more interesting parts of the language: concurrency and channels. After many lines of buggy, hard-to-reason-about code, I learned some patterns that leverage the benefits of concurrency while greatly improving code readability and reducing errors.

This article is designed to save you time and energy by concisely showing you what (I’m embarrassed to admit) took me years to put together on my own. I hope you find this to be an enlightening, pragmatic, real-world use case for concurrency in your APIs.

The API Concurrency Pattern

 If you follow the formula outlined below, you can build a highly concurrent API with minimal errors and effort.

Engineer Your Blocking Operations to…

  1. Always run asynchronously using the go keyword, so you can run them concurrently
  2. Make and close their own channels, so you minimize the chance of memory leaks and deadlocks
  3. Accept a context.Context, so you can stop pending requests that are no longer necessary
  4. Use pointers to return results instead of channels, so you reduce the number of channels you need to manage
  5. Emit errors through a <-chan error, so you can wait for blocking operations to complete before returning responses

That’s it! If you just stick to those 5 rules, you’ll be able to write readable, organized Go code that is highly concurrent and isn’t prone to deadlocks or memory leaks.

Example Code

What follows is an example implementation that puts these rules into practice. Hopefully, it illustrates how to produce a highly concurrent API codebase in Golang that is easy to read, test, and maintain.

API Requests & Blocking Operations

Step 1 of implementing this pattern is to issue all of our API requests and blocking operations asynchronously.

import (
	"context"
	"encoding/json"
	"fmt"
	"net/http"
)

// Piece is a piece of a result
type Piece struct {
	ID uint `json:"id"`
}

// getPiece calls `GET /piece/:id`
func getPiece(ctx context.Context, id uint, piece *Piece) <-chan error {
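	// Buffered so the goroutine can always deliver its one error and exit,
	// even if the caller has stopped receiving
	out := make(chan error, 1)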
	go func() {
		// Correct memory management - always be closing... your channels
		defer close(out)

		// NewRequestWithContext ties the request to ctx, so it is aborted
		// as soon as the caller cancels the context
		req, err := http.NewRequestWithContext(
			ctx,
			http.MethodGet,
			// The URL needs a scheme to be valid; https is assumed here
			fmt.Sprintf("https://api.url.com/piece/%d", id),
			nil,
		)
		if err != nil {
			out <- err
			return
		}

		// Issue the request
		rsp, err := http.DefaultClient.Do(req)
		if err != nil {
			out <- err
			return
		}
		// Close the body on every path, including non-200 responses
		defer rsp.Body.Close()

		if rsp.StatusCode != http.StatusOK {
			out <- fmt.Errorf("GET /piece/%d: unexpected status %s", id, rsp.Status)
			return
		}

		// Parse the response into piece
		if err := json.NewDecoder(rsp.Body).Decode(piece); err != nil {
			out <- err
			return
		}
	}()
	return out
}

API Responses

Step 2 of implementing the pattern is to blend multiple blocking operations and API Requests into a single API response struct.

// Result is the combination of multiple blocking operations
// that will be retrieved concurrently
type Result struct {
	FirstPiece  *Piece `json:"firstPiece,omitempty"`
	SecondPiece *Piece `json:"secondPiece,omitempty"`
	ThirdPiece  *Piece `json:"thirdPiece,omitempty"`
}

// GetResult is an `http.HandlerFunc` that serves `Result`s over GET
func GetResult(w http.ResponseWriter, r *http.Request) {
	// Parse and validate inputs...

	// getResult will stop immediately if the http.Request is canceled
	var result Result
	if err := <-getResult(r.Context(), &result); err != nil {
		// http.Error sets the status code before writing the body
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Marshal the response
	bs, err := json.Marshal(&result)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// Success! The first Write sends an implicit 200 OK
	w.Header().Set("Content-Type", "application/json")
	w.Write(bs)
}

// getResult returns the result of many concurrent API calls
func getResult(ctx context.Context, result *Result) <-chan error {
	out := make(chan error)
	go func() {
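		// Correct memory management
		defer close(out)

		// The cancel func will allow us to stop all pending requests if one
		// fails; deferring it also releases the context on success
		ctx, cancel := context.WithCancel(ctx)
		defer cancel()

		// Allocate the pieces up front: getPiece decodes into the pointer
		// it is given, so the pointer must not be nil
		result.FirstPiece = new(Piece)
		result.SecondPiece = new(Piece)
		result.ThirdPiece = new(Piece)

		// Merge allows us to receive all of the errors returned from all of
		// the calls to `getPiece` in a single `<-chan error`.
		// If no errors are returned, Merge will wait until all of the
		// `<-chan error`s close before proceeding
		var firstErr error
		for err := range util.Merge(
			getPiece(ctx, 1, result.FirstPiece),
			getPiece(ctx, 2, result.SecondPiece),
			getPiece(ctx, 3, result.ThirdPiece),
		) {
			if firstErr == nil {
				// Remember the first error and cancel all pending requests
				firstErr = err
				cancel()
			}
			// Keep ranging so Merge's goroutines can drain and exit
		}

		// Surface the error to the caller
		if firstErr != nil {
			out <- firstErr
		}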
	}()
	return out
}
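
One net/http detail is worth keeping in mind when writing handlers like GetResult: the status line goes out with the first call to Write, so WriteHeader only has an effect if it comes first. The sketch below shows this with an in-memory recorder from net/http/httptest; the handler names and the "/result" path are hypothetical.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// writeThenHeader writes the body first. The first Write implies 200 OK,
// so the later WriteHeader call is ignored.
func writeThenHeader(w http.ResponseWriter, r *http.Request) {
	w.Write([]byte("boom"))
	w.WriteHeader(http.StatusInternalServerError)
}

// headerThenWrite sets the status before the body, so the status is kept.
func headerThenWrite(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusInternalServerError)
	w.Write([]byte("boom"))
}

// statusOf runs a handler against an in-memory recorder and returns the
// status code a client would see.
func statusOf(h http.HandlerFunc) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/result", nil)
	h(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusOf(writeThenHeader)) // 200
	fmt.Println(statusOf(headerThenWrite)) // 500
}
```

The standard library helper http.Error takes care of this ordering for error responses.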

The Merge Function

Step 3 is to implement a single ‘fan-in’ func. Even if you’re very familiar with Go, this is probably the most complicated and error-prone piece of the puzzle. I recommend copy/pasting this code directly into your own util package or just using the one in Delivery Hero’s pipeline package.

If you’re really curious about how or why this works, this func is based on an excellent article written about pipelines in 2014 on the Go blog.

package util

import (
	"sync"
)

// Merge fans multiple error channels in to a single error channel
func Merge(errChans ...<-chan error) <-chan error {
	mergedChan := make(chan error)

	// Create a WaitGroup that waits for all of the errChans to close
	var wg sync.WaitGroup
	wg.Add(len(errChans))
	go func() {
		// When all of the errChans are closed, close the mergedChan
		wg.Wait()
		close(mergedChan)
	}()

	for i := range errChans {
		go func(errChan <-chan error) {
			// Wait for each errChan to close
			for err := range errChan {
				if err != nil {
					// Fan the contents of each errChan into the mergedChan
					mergedChan <- err
				}
			}
			// Tell the WaitGroup that one of the errChans is closed
			wg.Done()
		}(errChans[i])
	}

	return mergedChan
}

In Summary

There are many ways to approach concurrency in Go. In my experience, this pattern has been a clear and efficient way to build a concurrent API: it keeps my code organized and minimizes memory management mistakes.

I hope you found this to be a useful and pragmatic illustration of concurrency in Golang. I also hope that it saves you some time and headaches piecing this information together for yourself.

Happy Coding!


Thank you Mark! If you are interested in reading more of Mark’s content, take a look at his Medium account.
