Limiting Goroutines with a Semaphore

Boston Cartwright | April 17, 2021 | golang
Photo by iam_os on Unsplash

Run only a fixed number of goroutines at once

Preface

Concurrency is one of Go's strong points and I love working with the paradigm that the Go team has built.

It is a big topic with lots to talk about. I recommend reading through the Effective Go documentation about concurrency in Go to learn about goroutines, channels, and how they all work together.

Introduction

A few weeks ago I was working on a microservice that needed to do processing work on large documents (sometimes hundreds or thousands of pages). This included splitting the document into individual pages and doing some work on each page.

In order to speed up the work, I would spawn a goroutine for each page and do its work concurrently with the others.
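To make that concrete, here is a minimal sketch of the one-goroutine-per-page approach. It is not the service's actual code: `processAllUnbounded`, the `[]int` page type, and the `process` callback are all hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

// processAllUnbounded (hypothetical name) starts one goroutine per page with
// no cap on how many run at once, then waits for all of them to finish.
func processAllUnbounded(pages []int, process func(page int) int) []int {
	results := make([]int, len(pages))

	var wg sync.WaitGroup
	for i, page := range pages {
		wg.Add(1)
		go func(i, page int) {
			defer wg.Done()
			// each goroutine writes only to its own slot, so no lock is needed
			results[i] = process(page)
		}(i, page)
	}

	wg.Wait()
	return results
}

func main() {
	pages := []int{1, 2, 3, 4}

	// stand-in for real per-page work
	results := processAllUnbounded(pages, func(page int) int { return page * 100 })

	fmt.Println(results) // prints [100 200 300 400]
}
```

Nothing here limits how many goroutines are alive at once, so whatever memory each page needs is multiplied by the number of pages.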

This worked great while I was testing with small documents (1-10 pages). However, when I started testing with large documents (500 pages and more), my service started to hang.

I had plenty of debug logging describing which part of the service was running, and I noticed that work suddenly seemed to stop! After waiting a few minutes, it turned out that the service hadn't stopped; it had just slowed to the pace of a snail 🐌.

DMV clip from the movie Zootopia

When I checked my activity monitor, my CPU usage was normal, but my memory usage was bleeding over onto my disk! After just a few minutes of work, my service was attempting to use 45 gigabytes of memory, and it kept increasing.

Clearly processing all the pages at once was not a great solution for this. When processing them sequentially, it took about three minutes. Three minutes is better than not completing, but it isn't great. My next thought was how I could limit the number of goroutines running at a time, so that we didn't hit a memory issue but still benefited from the concurrent work.

Semaphore

This brought me to implementing a semaphore. Fortunately, Go has a semaphore implementation included in its x packages.

The x stands for external.

These packages are a part of the Go project but are outside the main Go tree.

They provide many useful utilities and are developed under looser compatibility requirements than the standard library.

Read more about them here.

Creating a semaphore is straightforward with this package. Let's take a look.

First, we need a context. For our example, we will use a TODO context:

ctx := context.TODO()

Then, we create our semaphore:

var (
	maxWorkers = 5                                        // max goroutines running at one time
	sem        = semaphore.NewWeighted(int64(maxWorkers)) // semaphore
)

Next, when we start our goroutines, we first wait to see if we can acquire a worker:

// loop through slice of tasks to be completed, by index
for i := range tasks {
	// Acquire will block if there are already maxWorkers goroutines running,
	// until one is released.
	if err := sem.Acquire(ctx, 1); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
		break
	}

	go func(i int) {
		// be sure to release the worker when the work is done!
		defer sem.Release(1)
		doLongTask(i)
	}(i)
}

Don't forget to wait for all of the goroutines to finish!

// wait for all the goroutines to finish by acquiring our worker limit
// notice this is similar to attempting to acquire a semaphore in the loop
// the difference being we are attempting to acquire all of them
// thus, it blocks until there are no more running
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
	log.Printf("Failed to acquire semaphore: %v", err)
}

That's it! We set up some concurrent work but limited it to 5 goroutines at a time.

Take a look at the full example implementation below.

Results

Implementing a semaphore in my document processing service cut the time for a large document from about three minutes when run sequentially to less than one minute!

I think there is still some work to be done to determine the best limit of goroutines, but we already saw great results: about three times faster!

What do you think? Let me know @bstncartwright

Full Implementation

Here is the full implementation for the example above:

package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/semaphore"
)

func main() {
	fmt.Println("Semaphore example starting!")

	var (
		ctx        = context.TODO()
		maxWorkers = 5                                        // max goroutines running at one time
		sem        = semaphore.NewWeighted(int64(maxWorkers)) // semaphore
		tasks      = make([]int, 100)
	)

	// loop through slice of tasks to be completed, by index
	for i := range tasks {
		// Acquire will block if there are already maxWorkers goroutines running,
		// until one is released.
		if err := sem.Acquire(ctx, 1); err != nil {
			fmt.Printf("Failed to acquire semaphore: %v\n", err)
			break
		}

		go func(i int) {
			// be sure to release the worker when the work is done!
			defer sem.Release(1)
			doLongTask(i)
		}(i)
	}

	// wait for all the goroutines to finish by acquiring the full worker limit
	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		fmt.Printf("Failed to acquire semaphore: %v\n", err)
	}

	fmt.Println("Completed all tasks!")
}

func doLongTask(i int) {
	fmt.Println("doing long work")
	// wait some time, simulate IO block or something
	time.Sleep(3 * time.Second)
	fmt.Printf("completed long task %d\n", i)
}