- Posted: 2021-03-02T11:26:55+09:00
Writing Large Data to CSV File in Go
Preface
One of the projects I'm handling at work requires reading a lot of data from an internal API and then writing the results to a CSV file. Since this will run as an AWS Lambda function, I chose Go as the go-to language (pun intended, not sorry).
The constraints:
1. The overall process must finish within the Lambda maximum execution time, which is currently 15 minutes.
2. Fewer than one million records.
3. No need to care about the order in which rows are written to the CSV.

How shall we handle this if we want to achieve maximum efficiency?
PSA: The link to the entire code gist is at the end of this article.
Sequentially
First, let's try the most common approach. We make the API calls sequentially and then write the results line by line to the CSV file. Most of the time, sequential is good enough if we don't have complex requirements. KISS applies as always.
We can simply do it like this:
```go
import (
	"encoding/csv"
	"fmt"
	"time"
)

// getRandomSleepTime, getDummyData and writeCSVBody are helpers defined in the full code gist.

type data struct {
	ID          string
	Name        string
	Description string
}

func testSequential(records int, w *csv.Writer) {
	allData := make([]data, records)
	for i := 0; i < records; i++ {
		// Wait randomized 0~100ms to simulate API call
		time.Sleep(getRandomSleepTime(100))
		// Assign dummy data
		allData[i] = getDummyData(i)
		fmt.Printf("[Write %s] ", allData[i].Name)
	}
	// Write CSV body
	writeCSVBody(allData, w)
}
```

This is very straightforward. We call the API and write the data to the file in one go. However, with this approach the total execution time grows linearly with the number of API calls. Each mocked API call sleeps a random 0~100ms (about 50ms on average), so 100 API calls take roughly 5s. For a small number of API calls, this is still OK. But what if we have 100,000 records? Or millions?
Note: Of course, in practice, it does not make sense to have 100,000 API calls in a short burst of time in production. We'd probably chunk the data into batches of something like 5,000 records per API call. In this example, for argument's sake, we'll pretend a data record is equal to an API call.
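The batching idea in the note can be sketched as a small helper that splits IDs into batches of at most 5,000, with one hypothetical API call per batch. The chunk function and the ID slice are illustrative assumptions, not part of the author's code.

```go
package main

import "fmt"

// chunk splits ids into consecutive batches of at most size elements.
func chunk(ids []string, size int) [][]string {
	if size <= 0 {
		panic("chunk size must be positive")
	}
	var batches [][]string
	for start := 0; start < len(ids); start += size {
		end := start + size
		if end > len(ids) {
			end = len(ids)
		}
		// The full slice expression caps capacity, so appending to one batch
		// cannot overwrite the start of the next one.
		batches = append(batches, ids[start:end:end])
	}
	return batches
}

func main() {
	ids := make([]string, 12001)
	for i := range ids {
		ids[i] = fmt.Sprint(i)
	}
	for _, b := range chunk(ids, 5000) {
		fmt.Println(len(b)) // one hypothetical API call per batch
	}
}
```

With 12,001 IDs this yields three batches of 5,000, 5,000 and 2,001, so three API calls instead of 12,001.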
Look at the table below. Even to process a mere list of 10,000 records, we spend a whopping 8 minutes.
| No. of API calls | Total time |
|---|---|
| 10 | 516.792ms |
| 100 | 5.183s |
| 1,000 | 50.207s |
| 10,000 | 8m21.169s |
| 100,000 | 1 hour++ |
| 1,000,000 | eternity? |

Surely there is a better way to do this.
Concurrently
Enter concurrency.
Concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in a partial order, without affecting the outcome.
Go has some of the best concurrency support baked into its core, but understanding how it handles concurrency is not as easy as it seems. When using concurrency to write to a file, we should be careful to avoid so-called race conditions, because something weird might happen if we neglect them. We may be lucky and get the right number of lines, or we may be missing some lines in the file because one goroutine overwrote lines written by another. It is unpredictable.
Note: To understand concurrency, we first need to understand what a goroutine is. I suggest visiting the links here, here or here.
To solve this, there are two concurrency options that we can try here.
1. Use WaitGroups
Simply put, a WaitGroup is a mechanism that lets one goroutine wait for a collection of goroutines to finish. We can use a WaitGroup to wait for all goroutines to finish their work before moving on to the next step.
We can write it like this. For each iteration of the loop, we spawn a goroutine to call the API concurrently, and then make them wait like a well-behaved toddler before writing to the CSV file.
```go
// Previously defined struct goes here
// (this version additionally imports "sync")
func testConcurrencyWaitGroup(records int, w *csv.Writer) {
	allData := make([]data, records)
	// Define WaitGroup
	var wg sync.WaitGroup
	wg.Add(records)
	for i := 0; i < records; i++ {
		go func(i int) {
			// Mark goroutine as finished when data is assigned
			defer wg.Done()
			// Wait randomized 0~100ms to simulate API call
			time.Sleep(getRandomSleepTime(100))
			// Assign dummy data; each goroutine writes only its own index
			allData[i] = getDummyData(i)
			fmt.Printf("[Write %s] ", allData[i].Name)
		}(i)
	}
	// Make WaitGroup wait for all goroutines to finish
	wg.Wait()
	// Write CSV body
	writeCSVBody(allData, w)
}
```

Total time
As you can see from the numbers alone, implementing concurrency gives you a much, much faster total execution time.
| No. of API calls | Total time |
|---|---|
| 10 | 100.186ms |
| 100 | 103.523ms |
| 1,000 | 167.081ms |
| 10,000 | 752.808ms |
| 100,000 | 13.567s |
| 1,000,000 | theoretically fast ⚡ |

Side effect
Using WaitGroup in the above fashion will give you ordered CSV lines as per the original data obtained from the API. This is the method you can choose if you care about order. You can see the result from concurrency.csv below:

```
...
91,Name91,Desc
92,Name92,Desc
93,Name93,Desc
94,Name94,Desc
...
```

Notes
- While the above example works, it actually violates one of Go's principles.
Do not communicate by sharing memory. Instead, share memory by communicating.
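Each goroutine shares the same `[]data` slice. Here it happens to be safe, because each goroutine writes only to its own index and wg.Wait() guarantees every write has finished before the slice is read. But as we spawn more and more goroutines touching shared memory, there's no guarantee that the program will behave exactly as we want it to. Tread carefully.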
- In addition, we simply spawn one goroutine per API call, so it's no surprise that we get the errors below when we try to make a million API calls. Take memory usage seriously into consideration if you wish to employ concurrency in production. Make sure to optimize the process and test rigorously.
```
fatal error: runtime: out of memory
fatal error: runtime: cannot allocate memory
```

If we wish to follow the above Go principle, we can...
2. Use Channel
Just as the name implies, a channel is a mechanism in Go for sending and receiving data between goroutines through a pipe. We can use a channel to avoid the aforementioned race condition: the goroutines making API calls send their data (data-sending), and a single goroutine receives it and writes to the CSV (data-receiving). A receive blocks until data is available, so only one goroutine ever touches the file. In this way, we are promoting safety in processing our data.
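To make the blocking behaviour concrete, here is a minimal sketch that is not from the article (collectSquares is a hypothetical name): several goroutines send on an unbuffered channel and one goroutine receives. Each send waits until the receiver takes the value, and only the receiver ever appends to the result slice, which is "share memory by communicating" in miniature.

```go
package main

import "fmt"

// collectSquares starts n sender goroutines and receives exactly n values.
// Only the receiving goroutine appends to results, so no lock is needed.
func collectSquares(n int) []int {
	ch := make(chan int) // unbuffered: each send waits for a matching receive
	for i := 0; i < n; i++ {
		go func(i int) {
			ch <- i * i
		}(i)
	}
	results := make([]int, 0, n)
	for i := 0; i < n; i++ {
		results = append(results, <-ch) // blocks until some sender is ready
	}
	return results
}

func main() {
	fmt.Println(collectSquares(5)) // the five squares, in arbitrary order
}
```

The order of the squares depends on goroutine scheduling, which is the same reason the channel version below produces unordered CSV lines.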
An excellent article called 'The Nature Of Channels In Go' explains this concept visually. Do read that to understand more about Channels.
We can use a channel like this:
```go
// Previously defined struct goes here
func testConcurrencyChannel(records int, w *csv.Writer) {
	// Define buffered channel
	ch := make(chan data, records)
	done := make(chan bool)
	// The deferred close runs after <-done below, i.e. once every record has been sent and received
	defer close(ch)
	for i := 0; i < records; i++ {
		go func(i int) {
			// Wait randomized 0~100ms to simulate API call
			time.Sleep(getRandomSleepTime(100))
			// Send data to channel
			d := getDummyData(i)
			ch <- d
			fmt.Printf("[Write %s] ", d.Name)
		}(i)
	}
	// Write CSV body
	go writeCSVBodyWithChannel(ch, done, records, w)
	// Block until the writer reports that all records are processed
	<-done
}
```

Note: It's OK to leave the channel open without an explicit close, as shown here.
We need to make sure we send a finished notification to the done channel inside the function that writes the CSV body. Otherwise, the program will not know when to continue after writing.

```go
func writeCSVBodyWithChannel(ch chan data, done chan bool, records int, w *csv.Writer) {
	// Write data from channel to CSV
	for d := range ch {
		// Write to CSV here, e.g.:
		w.Write([]string{d.ID, d.Name, d.Description})
		records--
		// Check if all records are processed; if yes, notify the done channel
		if records == 0 {
			w.Flush()
			done <- true
		}
	}
}
```

Total time
Using a channel gives timings similar to WaitGroup.
| No. of API calls | Total time |
|---|---|
| 10 | 91.547ms |
| 100 | 106.366ms |
| 1,000 | 169.152ms |
| 10,000 | 751.443ms |
| 100,000 | 12.369s |
| 1,000,000 | theoretically fast ⚡ |

Side effect
Using a channel in the above fashion will give you unordered CSV lines. The channel receives and processes data as soon as it is sent, so there is no way to guarantee the order. This is the method you can choose if you do NOT care about the order, which is exactly what my project needs.

```
...
35,Name35,Desc
66,Name66,Desc
12,Name12,Desc
13,Name13,Desc
...
```

Notes
Using Channel is a way to share memory by communicating, thus following the Go principle.
Similar to WaitGroup, a million API calls will result in an out of memory error, so take precautions.

Closing
This was supposed to be a short read, but I had fun writing up the different approaches and explaining how they work. Writing large data to a file is a recurring requirement in software engineering projects, so it's always good to know the options and best practices for doing it right in your programming language of choice.
You can find the full code gist below:
- Posted: 2021-03-02T10:01:43+09:00
Implementing insertion sort in Go, generator-style
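```go
package main

import "fmt"

func main() {
	f := insertionSort([]int{5, 2, 4, 6, 1, 3})
	for {
		// An empty slice signals that sorting is finished
		if arr := f(); len(arr) == 0 {
			break
		} else {
			fmt.Println(arr)
		}
	}
}

// insertionSort returns a closure; each call performs one swap and returns the slice.
func insertionSort(arr []int) func() []int {
	_arr := append([]int{}, arr...) // copy so the caller's slice is untouched
	return func() []int {
		// Because we return right after the first swap, only j = i-1 is ever compared:
		// each call swaps the first adjacent pair that is out of order.
		for i := 0; i < len(_arr); i++ {
			for j := i - 1; j >= 0 && _arr[j] > _arr[i]; j-- {
				_arr[j], _arr[j+1] = _arr[j+1], _arr[j]
				return _arr
			}
		}
		return []int{}
	}
}
```

Output:

```
[2 5 4 6 1 3]
[2 4 5 6 1 3]
[2 4 5 1 6 3]
[2 4 1 5 6 3]
[2 1 4 5 6 3]
[1 2 4 5 6 3]
[1 2 4 5 3 6]
[1 2 4 3 5 6]
[1 2 3 4 5 6]
```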
- Posted: 2021-03-02T04:13:04+09:00
[For Go beginners] 4 things you need to know about concurrency
What are the 4 things you should know? They are:
- Goroutine
- Channel
- Select
- The sync package

Let's look at each of them in detail.
1. Goroutine
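Put go in front of a function or method call and it runs separately from main(), in its own goroutine (a lightweight thread managed by the Go runtime).

```go
go fmt.Println("Runs somewhere else")
go SomeFunction()
```

When main() returns, all goroutines are terminated too, so you need to "wait" for them using the techniques explained below.

2. Channel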
A mechanism for exchanging data between goroutines
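- Define: `c := make(chan int)`
- Receive-only channels can be declared too (the arrow points out of chan): `var receiveOnly <-chan int`
- Send-only channels can be declared too (the arrow points into chan): `var sendOnly chan<- int`
- Send data: `c <- 3`
- Receive data: `val := <-c`
- A receive blocks until a value is sent, even if the receiver is ready. On an unbuffered channel like this one, a send likewise blocks until someone receives.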
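- By the way: the standard (gofmt) formatting has no space between the arrow (`<-`) and `c`. It still works with a space, though.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan int)
	fmt.Println("Running the goroutine!")
	go func() {
		ch <- 42
	}()
	fmt.Println("Ran the goroutine!")
	time.Sleep(2 * time.Second)
	received := <-ch
	fmt.Println(received)
}
```

Even if the value can't be received right away, data sent through the channel is still received correctly later: here the sending goroutine simply blocks on `ch <- 42` until main() receives it.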
3. Select
- Runs whichever case (e.g. a send or a receive) can proceed first. If several are ready at once, one of them is chosen at random.
- If none of them can proceed, default runs. default is optional; without it, select blocks until some case can proceed.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan int)
	c2 := make(chan int)
	go func() {
		// Send to c1 after some work
		time.Sleep(1 * time.Second)
		c1 <- 10
		// Send to c2 after some more work
		time.Sleep(1 * time.Second)
		c2 <- 20
	}()
	// Wait in an infinite loop inside a goroutine
	go func() {
		for {
			time.Sleep(300 * time.Millisecond)
			select {
			case v1 := <-c1:
				fmt.Printf("received %v from c1\n", v1)
			case v2 := <-c2:
				fmt.Printf("received %v from c2\n", v2)
			default:
				fmt.Printf("Not Ready !!\n")
			}
		}
	}()
	// Sleep in main() so the infinite loop only runs for 3 seconds
	time.Sleep(3 * time.Second)
}
```
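A common use of default is a non-blocking receive. The sketch below (tryReceive is a hypothetical helper name) returns immediately whether or not a value is waiting, instead of blocking like a plain `<-ch`.

```go
package main

import "fmt"

// tryReceive returns (value, true) if a value is ready on ch, or (0, false)
// immediately otherwise, thanks to the default case.
// Note: a closed channel also counts as ready and yields (0, true).
func tryReceive(ch <-chan int) (int, bool) {
	select {
	case v := <-ch:
		return v, true
	default:
		return 0, false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(tryReceive(ch)) // 0 false: nothing buffered yet
	ch <- 10
	fmt.Println(tryReceive(ch)) // 10 true
}
```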
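You can also build a timer-like timeout with time.After(duration).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	c1 := make(chan int)
	c2 := make(chan int)
	go func() {
		// Send to c1 after some work
		time.Sleep(1 * time.Second)
		c1 <- 10
		// Send to c2 after some more work
		time.Sleep(1 * time.Second)
		c2 <- 20
	}()
	// Create the timer once, before the loop: calling time.After inside select
	// would start a fresh 3-second timer on every iteration
	timeout := time.After(3 * time.Second)
	// Thanks to time.After, this infinite loop in main() ends after the given time
	for {
		time.Sleep(300 * time.Millisecond)
		select {
		case v1 := <-c1:
			fmt.Printf("received %v from c1\n", v1)
		case v2 := <-c2:
			fmt.Printf("received %v from c2\n", v2)
		case <-timeout:
			fmt.Println("timed out !!")
			return
		}
	}
}
```

4. The sync package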
- Define: `var wg sync.WaitGroup`
- Add: `wg.Add(1)`
- Mark as done: `wg.Done()`
- Wait until `wg.Done` has been called as many times as `wg.Add` added: `wg.Wait()`
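```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	// Define
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		// Add
		wg.Add(1)
		go func(c int) {
			// Mark as done when processing finishes
			defer wg.Done()
			// Pretend this is a long-running task
			time.Sleep(2 * time.Second)
			fmt.Println(c)
		}(i)
	}
	// Wait until everything is Done
	wg.Wait()
	fmt.Println("Closing...")
}
```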
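WaitGroup is only one of the tools in the sync package. When goroutines must update shared state directly instead of communicating over a channel, sync.Mutex makes each update exclusive. A minimal sketch, where countConcurrently is a hypothetical name:

```go
package main

import (
	"fmt"
	"sync"
)

// countConcurrently has n goroutines each increment a shared counter once.
// The Mutex makes each increment exclusive; the WaitGroup waits for all of them.
func countConcurrently(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			count++ // without the lock this would be a data race
			mu.Unlock()
		}()
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(countConcurrently(1000)) // prints 1000
}
```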