The lesson delves into the efficient handling of data movement in Go, particularly through the io package. It discusses using the io.Pipe function, which establishes a synchronous in-memory connection between reading and writing operations, eliminating redundant memory usage when processing files. Concurrency becomes crucial, as writing to a pipe blocks until it is read from, leading to potential deadlocks if not managed correctly. The lesson explores two synchronization methods: using a wait group and using channels, with the latter highlighted as more idiomatic in Go. This approach enables better state management and clearer code structure while ensuring accurate data flow between concurrent tasks. Further simplifications and alternatives within the io package are promised for future exploration.
As we saw in the last lesson, the io package provides a number of other functions we can use when it comes to moving data. In this case, we're using io.TeeReader to create multiple readers from the same source, allowing each of our three count functions to have its own reader when calculating its particular attribute. However, because we're writing to a bytes.Buffer to achieve this, our memory usage roughly triples for each file that we read.
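For reference, the approach from the last lesson looked roughly like this (a simplified sketch: only one buffer is shown here, while the lesson's version chained a second TeeReader and buffer in the same way, and the variable names are illustrative):

var buff bytes.Buffer

// Every byte read through tee is also appended to buff, so the whole
// file ends up held in memory again on top of the original read.
tee := io.TeeReader(file, &buff)

byteCount := countBytes(tee)
wordsCount := countWords(&buff)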
Using io.Pipe
Fortunately, the io package provides another option that we can write to, one that won't cause all of the file's contents to be written to memory twice. This is the io.Pipe function, which creates a synchronous in-memory pipe that can be used to connect code expecting an io.Reader with code expecting an io.Writer.
pr, pw := io.Pipe()
This returns two values:
- A pipe reader (io.Reader)
- A pipe writer (io.Writer)
This means we can pass pw into our io.TeeReader, allowing us to have a synchronous pipe. When we read from the original reader, the pipe writer (pw) will be written to, which means the data will be available to the pipe reader (pr), and we can get rid of our bytes.Buffer.
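Concretely, the wiring might look roughly like this (assuming file is the opened input file from the earlier lessons; the names are illustrative):

pr, pw := io.Pipe()

// Everything read through tee is also copied into pw, so whatever the
// first count function consumes becomes available again on pr.
tee := io.TeeReader(file, pw)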
Deadlock and the Need for Concurrency
With our pipe in hand, let's go ahead and see how it actually works.
go run main.go words.txt
You'll see that we actually get a fatal error:
fatal error: all goroutines are asleep - deadlock!
This is because io.Pipe is very similar to how channels work: writing to a pipe will block until it's been read from. Therefore, we need to use concurrency to make this work.
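To see the blocking behaviour in isolation, here is a minimal, self-contained sketch (separate from our word-count program). If the write happened on the main goroutine before the read, it would deadlock in exactly the same way:

package main

import (
	"fmt"
	"io"
)

func main() {
	pr, pw := io.Pipe()

	// A write to the pipe blocks until a reader consumes it, so the
	// writing side has to run in its own goroutine.
	go func() {
		defer pw.Close()
		fmt.Fprint(pw, "hello, pipe")
	}()

	data, err := io.ReadAll(pr)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data)) // prints: hello, pipe
}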
Using Goroutines and WaitGroup
Let's go ahead and make sure that our individual count functions are wrapped inside their own goroutines. Now, however, we have to consider how we can perform synchronisation to ensure that:
- Each goroutine completes before we return a count
- We can pass the return value back to the main goroutine
We can use a sync.WaitGroup as follows:
var wg sync.WaitGroup
wg.Add(3)

var byteCount, wordsCount, linesCount int

go func() {
	defer wg.Done()
	byteCount = countBytes(...)
}()

go func() {
	defer wg.Done()
	wordsCount = countWords(...)
}()

go func() {
	defer wg.Done()
	linesCount = countLines(...)
}()

wg.Wait()
However, this does not work yet. We need to close the pipe writer once we've finished writing to it, that is, once we've finished reading the source through the TeeReader, so the pipe reader knows no more data is coming:
defer pw.Close()
Now, everything should work correctly when we run the program.
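Note that the close belongs inside the goroutine that drains the source through the TeeReader, so that anyone reading from pr sees EOF as soon as the source is exhausted. In terms of the WaitGroup sketch above, that might look like this (names are illustrative):

go func() {
	defer wg.Done()
	// Once countBytes has drained the TeeReader, closing pw tells
	// readers of pr that no more data is coming.
	defer pw.Close()
	byteCount = countBytes(tee)
}()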
Race Conditions with Buffers
However, when we run the code again, we might notice that we're no longer getting any lines. This is because buff2 (our bytes.Buffer) has no data left by the time it's read in the following goroutine.
To test this, if we remove that goroutine, reduce the wait group to 2, and move wg.Wait() up, you'll see that we do get the lines again. This is just a race condition caused by concurrency.
Solution: Multiple Pipes
To solve this, we can use another pipe:
p1r, p1w := io.Pipe()
p2r, p2w := io.Pipe()
We then wire them up accordingly: the first TeeReader writes to p1w, a second TeeReader reads from p1r and writes to p2w, and so on. Make sure to close the correct pipe writers in the correct goroutines.
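To make the shape of this concrete, here is a minimal, self-contained sketch of the two-pipe chain. The counting helpers below are simple stand-ins rather than the course's own implementations, and a string reader stands in for the file:

package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// Stand-in counters; each drains its reader and returns a count.
func countBytes(r io.Reader) int {
	n, _ := io.Copy(io.Discard, r)
	return int(n)
}

func countWords(r io.Reader) int {
	sc := bufio.NewScanner(r)
	sc.Split(bufio.ScanWords)
	count := 0
	for sc.Scan() {
		count++
	}
	return count
}

func countLines(r io.Reader) int {
	sc := bufio.NewScanner(r)
	count := 0
	for sc.Scan() {
		count++
	}
	return count
}

func main() {
	// A fixed string stands in for the file we'd normally open.
	file := strings.NewReader("one two\nthree four five\n")

	p1r, p1w := io.Pipe()
	p2r, p2w := io.Pipe()

	// tee1 copies everything the byte counter reads into the first pipe;
	// tee2 copies everything the word counter reads into the second pipe.
	tee1 := io.TeeReader(file, p1w)
	tee2 := io.TeeReader(p1r, p2w)

	var wg sync.WaitGroup
	wg.Add(3)

	var byteCount, wordsCount, linesCount int

	go func() {
		defer wg.Done()
		defer p1w.Close() // signal EOF to tee2 once the source is drained
		byteCount = countBytes(tee1)
	}()

	go func() {
		defer wg.Done()
		defer p2w.Close() // signal EOF to the line counter
		wordsCount = countWords(tee2)
	}()

	go func() {
		defer wg.Done()
		linesCount = countLines(p2r)
	}()

	wg.Wait()
	fmt.Println(byteCount, wordsCount, linesCount) // 24 5 2
}

The important detail is which goroutine closes which writer: the byte counter closes p1w because it is the one writing to it (via tee1), and the word counter closes p2w for the same reason.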
After making these changes, running the code now gives us the correct output again. To be sure, we can run our tests:
go test
✅ All tests pass!
Switching from WaitGroups to Channels
While using a WaitGroup works, we're mutating shared state inside a goroutine, which isn't ideal. Let's switch to using channels instead.
Creating Channels
chBytes := make(chan int)
chWords := make(chan int)
chLines := make(chan int)
Writing to Channels in Goroutines
go func() {
	defer close(chBytes)
	chBytes <- countBytes(...)
}()

go func() {
	defer close(chWords)
	chWords <- countWords(...)
}()

go func() {
	defer close(chLines)
	chLines <- countLines(...)
}()
Option 1: Using select and for
for chBytes != nil || chWords != nil || chLines != nil {
	select {
	case b := <-chBytes:
		byteCount = b
		chBytes = nil
	case w := <-chWords:
		wordsCount = w
		chWords = nil
	case l := <-chLines:
		linesCount = l
		chLines = nil
	}
}
But this gets repetitive and harder to reason about when adding more channels.
Option 2: Synchronous Receives (Recommended)
We can simplify the code by just waiting on each channel synchronously:
byteCount := <-chBytes
wordsCount := <-chWords
linesCount := <-chLines
This works perfectly since we know each channel will send exactly one value. Neat!
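If we combine this with the two-pipe wiring from the earlier sketch, the goroutine bodies can also take over closing the pipe writers. One way to arrange that (the exact placement in the course may differ) is to close each writer before blocking on the channel send:

go func() {
	defer close(chBytes)
	n := countBytes(tee1)
	p1w.Close() // downstream readers see EOF before we hand off the result
	chBytes <- n
}()

go func() {
	defer close(chWords)
	n := countWords(tee2)
	p2w.Close()
	chWords <- n
}()

go func() {
	defer close(chLines)
	chLines <- countLines(p2r)
}()

Closing each writer before sending means the downstream counters can always finish, regardless of which channel the main goroutine happens to receive from first.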
Summary
- We switched from bytes.Buffer to io.Pipe to avoid memory duplication.
- We used concurrency and goroutines to handle blocking reads/writes with pipes.
- We explored both sync.WaitGroup and channels for synchronisation.
- Using channels avoids mutating shared state and is generally more idiomatic in Go.
- Whether to use WaitGroups or channels is mostly personal preference.
Personally, I find using multiple TeeReaders with io.Pipe a bit hard to reason about, especially when chaining them. Fortunately, the io package offers even more tools to make this easier; we'll explore that in the next lesson.