There are 3 articles about Go for 2021-03-02.

Writing Large Data to CSV File in Go

Preface

One of the projects I'm handling at work requires reading a lot of data from an internal API and then writing the results to a CSV file. Since this will run as an AWS Lambda function, I chose Go as the go-to language (pun intended, not sorry).

The constraints:
1. The overall process must finish within the Lambda maximum execution time, which is currently 15 minutes.
2. The data count is under one million records.
3. The order of the rows written to the CSV does not matter.

How shall we handle this if we want to achieve maximum efficiency?

PSA: The link to the entire code gist is at the end of this article.

Sequentially

First, let's try the most common approach. We process the API calls sequentially, then write the results line by line to the CSV file. Most of the time, sequential is good enough if we don't have complex requirements. KISS applies as always.

We can do it like this:

type data struct {
    ID          string
    Name        string
    Description string
}

func testSequential(records int, w *csv.Writer) {
    allData := make([]data, records)
    for i := 0; i < records; i++ {
        // Wait randomized 0~100ms to simulate API call
        time.Sleep(getRandomSleepTime(100))
        // Assign dummy data
        allData[i] = getDummyData(i)
        fmt.Printf("[Write %s] ", allData[i].Name)
    }
    // Write CSV body
    writeCSVBody(allData, w)
}

This is very straightforward. We call the API and write the data to the file in one go. However, with this approach the total execution time grows linearly with the number of API calls. We mock each API call as a random 0~100ms delay (about 50ms on average), so 100 API calls take roughly 5s. For a small number of API calls, this is still OK. But what if we have 100,000 records? Or millions?

Note: Of course, in practice, it does not make sense to have 100,000 API calls in a short burst of time in production. We'd probably chunk the data to something like 5000 records per API call. In this example, for argument's sake, we'll pretend a data record is equal to an API call.

Look at the table below. Even to process a mere list of 10,000 records, we spend a whopping 8 minutes.

No. of API calls | Total time
10               | 516.792ms
100              | 5.183s
1,000            | 50.207s
10,000           | 8m21.169s
100,000          | 1 hour++
1,000,000        | eternity?

Surely there is a better way to do this.

Concurrently

Enter concurrency.

Concurrency is the ability of different parts or units of a program, algorithm, or problem to be executed out-of-order or in a partial order, without affecting the outcome.

Go has some of the best concurrency support baked into its core, but understanding how it handles concurrency is not as easy as it seems. When writing to a file concurrently, we must be careful to avoid a so-called race condition, because something weird might happen if we neglect it. We may be lucky and get the right number of lines, or we may lose some lines because one goroutine overwrote another's output. It is unpredictable.

Note: To understand concurrency, we first need to understand what a goroutine is. I suggest visiting the links here, here, or here.

To solve this, there are two concurrency options that we can try here.

1. Use WaitGroups

Simply put, a WaitGroup waits for a collection of goroutines to finish. We add the number of goroutines to the WaitGroup, each goroutine calls Done when it finishes, and Wait blocks until all of them are done before we move on to the next step.

We can write it as below. For each iteration of the loop, we spawn a goroutine to call the API concurrently. Then we wait, like a well-behaved toddler, for all of them to finish before writing to the CSV file.

// Previously defined struct goes here
// struct

func testConcurrencyWaitGroup(records int, w *csv.Writer) {
    allData := make([]data, records)
    // Define WaitGroup
    var wg sync.WaitGroup
    wg.Add(records)
    for i := 0; i < records; i++ {
        go func(i int) {
            // Mark goroutine as finished when data is assigned
            defer wg.Done()
            // Wait randomized 0~100ms to simulate API call
            time.Sleep(getRandomSleepTime(100))
            // Assign dummy data
            allData[i] = getDummyData(i)
            fmt.Printf("[Write %s] ", allData[i].Name)
        }(i)
    }
    // Make WaitGroup wait for all goroutines to finish
    wg.Wait()
    // Write CSV body
    writeCSVBody(allData, w)
}

Total time
As you can see, from the numbers alone implementing concurrency gives you a much, much faster total execution time.

No. of API calls | Total time
10               | 100.186ms
100              | 103.523ms
1,000            | 167.081ms
10,000           | 752.808ms
100,000          | 13.567s
1,000,000        | theoretically fast ⚡

Side effect
Using WaitGroup in the above fashion will give you ordered CSV lines as per the original data obtained from API. This is the method you can choose if you care about order. You can see the result from concurrency.csv below:

...
91,Name91,Desc
92,Name92,Desc
93,Name93,Desc
94,Name94,Desc
...

Notes

  • While the above example works, it actually violates one of Go's principles.

Do not communicate by sharing memory. Instead, share memory by communicating.

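Each goroutine writes into the same shared slice []data. It is safe here only because every goroutine writes to its own index and the slice is read only after wg.Wait() returns; as soon as goroutines touch the same element (or append to the slice), the result is a data race. Tread carefully.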

  • In addition, we simply allocate one goroutine per API call, so it's no surprise that we get the errors below if we try to make a million API calls at once. Take memory usage seriously if you wish to employ concurrency in production. Make sure to optimize the process and test rigorously.
fatal error: runtime: out of memory
fatal error: runtime: cannot allocate memory

If we wish to follow the above Go principle we can...

2. Use Channel

Just as the name implies, a Channel is a mechanism in Go to send and receive data between goroutines through a pipe, or channel. We can use a Channel to avoid the aforementioned race condition by ensuring that the process is 'blocked' during data-sending (API calls) and data-receiving (writing to CSV). In this way, we are promoting safety in processing our data.

An excellent article called 'The Nature Of Channels In Go' explains this concept visually. Do read that to understand more about Channels.

We can use a Channel as shown below:

// Previously defined struct goes here
// struct

func testConcurrencyChannel(records int, w *csv.Writer) {
    // Define buffered channel
    ch := make(chan data, records)
    done := make(chan bool)
    // Close channel only if sending is finished
    defer close(ch)
    for i := 0; i < records; i++ {
        go func(i int) {
            // Wait randomized 0~100ms to simulate API call
            time.Sleep(getRandomSleepTime(100))
            // Build the record once, send it to the channel, then log it
            d := getDummyData(i)
            ch <- d
            fmt.Printf("[Write %s] ", d.Name)
        }(i)
    }
    // Write CSV body
    go writeCSVBodyWithChannel(ch, done, records, w)
    // Notify main goroutine process is finished
    <-done
}

Note: It's ok to leave the channel open without explicit close as shown here

We need to make sure we send a finished notification to done channel inside the function to write the CSV body. Otherwise, the program will not know when to continue after writing.

func writeCSVBodyWithChannel(ch chan data, done chan bool, records int, w *csv.Writer) {
    // Write data from channel to CSV
    for d := range ch {
        // Write one row (column order assumed; error handling omitted for brevity)
        _ = w.Write([]string{d.ID, d.Name, d.Description})
        records--
        // Check if all records are processed, if yes then notify channel
        if records == 0 {
            done <- true
        }
    }
}

Total time
Using Channel will give you similar results to WaitGroup.

No. of API calls | Total time
10               | 91.547ms
100              | 106.366ms
1,000            | 169.152ms
10,000           | 751.443ms
100,000          | 12.369s
1,000,000        | theoretically fast ⚡

Side effect
Using a Channel in the above fashion will give you unordered CSV lines. The receiver processes each record as soon as it arrives on the channel, and the goroutines finish in no particular order, so there is no way to guarantee the order. This is the method to choose if you do NOT care about the order, which is exactly what my project needs.

...
35,Name35,Desc
66,Name66,Desc
12,Name12,Desc
13,Name13,Desc
...

Notes

  • Using Channel is a way to share memory by communicating, thus following the Go principle.

  • Similar to WaitGroup, a million API calls will result in an out of memory error, so take precaution.

Closing

This was supposed to be a short reading, but I had fun writing the different approaches and explaining the workings. Writing large data to a file is a recurrent requirement in any software engineering project, so it's always good to know the options/best practices to do it right in the respective programming language of choice.

You can find the full code gist below:

main.go


Implementing insertion sort like a generator in Go

package main

import "fmt"

func main() {
    f := insertionSort([]int{5, 2, 4, 6, 1, 3})
    for {
        if arr := f(); len(arr) == 0 {
            break
        } else {
            fmt.Println(arr)
        }
    }
}

func insertionSort(arr []int) func() []int {
    _arr := append([]int{}, arr...)
    return func() []int {
        for i := 0; i < len(_arr); i++ {
            for j := i - 1; j >= 0 && _arr[j] > _arr[i]; j-- {
                _arr[j], _arr[j+1] = _arr[j+1], _arr[j]
                return _arr
            }
        }
        return []int{}
    }
}

output
[2 5 4 6 1 3]
[2 4 5 6 1 3]
[2 4 5 1 6 3]
[2 4 1 5 6 3]
[2 1 4 5 6 3]
[1 2 4 5 6 3]
[1 2 4 5 3 6]
[1 2 4 3 5 6]
[1 2 3 4 5 6]
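The closure above behaves like a generator by being called repeatedly. Another way to get a generator-like feel in Go is a goroutine that sends each intermediate state over a channel. The sketch below is an alternative take, not the author's code, and insertionSortSteps is a made-up name. It runs a textbook insertion sort and sends a copy of the slice after every swap.

```go
package main

import "fmt"

// insertionSortSteps sorts a copy of arr and sends a snapshot after every
// swap. The channel is closed when sorting finishes, so callers can range
// over it.
func insertionSortSteps(arr []int) <-chan []int {
	out := make(chan []int)
	go func() {
		defer close(out)
		a := append([]int{}, arr...) // do not modify the caller's slice
		for i := 1; i < len(a); i++ {
			for j := i; j > 0 && a[j-1] > a[j]; j-- {
				a[j-1], a[j] = a[j], a[j-1]
				out <- append([]int{}, a...) // send a copy, not the live slice
			}
		}
	}()
	return out
}

func main() {
	for step := range insertionSortSteps([]int{5, 2, 4, 6, 1, 3}) {
		fmt.Println(step)
	}
}
```

For this input it goes through the same nine intermediate states as the output listed above. One caveat: if the consumer stops reading early, the sorting goroutine stays blocked on its send, so a real implementation would likely add a cancellation signal.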

[For Go beginners] 4 things you need to know about concurrency

What are the 4 things you should know?

  1. Goroutine
  2. Channel
  3. Select
  4. Sync package

That's the list.

Let's look at each one in detail.

1. Goroutine

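Putting go in front of a function or method call runs it somewhere other than main(): in a separate goroutine, a lightweight thread managed by the Go runtime.

go fmt.Println("running somewhere else")
go SomeFunction()

When main() returns, any running goroutines are terminated too, so you need to "wait" for them using the techniques described below.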
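As a minimal illustration of "waiting", the sketch below blocks until the goroutine reports back. It uses a channel, which the next section introduces; runAndWait is a made-up name.

```go
package main

import "fmt"

// runAndWait starts a goroutine and blocks until it reports a result.
func runAndWait() string {
	done := make(chan string)
	go func() {
		// Work happens here, outside the caller's goroutine.
		done <- "finished in another goroutine"
	}()
	return <-done // the caller waits here until the goroutine sends
}

func main() {
	fmt.Println(runAndWait())
}
```

Without the receive on done, main() could return before the goroutine ever got a chance to run.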

2. Channel

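A mechanism for exchanging data between goroutines.

  • Define: c := make(chan int)
    • Receive-only channels are possible (arrow coming out of chan): var receiveOnly <-chan int
    • Send-only channels are possible (arrow going into chan): var sendOnly chan<- int
  • Send data: c <- 3
  • Receive data: val := <-c
    • Even when the receiver is ready, it blocks until a value is sent
    • By the way: the gofmt style has no space between the arrow (<-) and c. It still works with one, though.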
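package main

import (
    "fmt"
    "time"
)

func main() {
    ch := make(chan int)

    fmt.Println("Starting the goroutine!")
    go func() {
        // Blocks here until main() receives, because ch is unbuffered
        ch <- 42
    }()
    fmt.Println("Started the goroutine!")

    time.Sleep(2 * time.Second)
    received := <-ch
    fmt.Println(received)
}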

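Even though main() is not ready to receive right away, the value sent through the channel is still received later: the sending goroutine simply blocks until then.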
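Directional channel types are mostly useful in function signatures, where the compiler then enforces which side may send and which may receive. A small sketch, with made-up function names produce and sum:

```go
package main

import "fmt"

// produce may only send on out (chan<- int). It closes the channel when
// done so that the receiver's range loop can finish.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out)
}

// sum may only receive from in (<-chan int).
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	c := make(chan int) // bidirectional; converts implicitly at each call
	go produce(c, 4)
	fmt.Println(sum(c)) // 1+2+3+4 = 10
}
```

Trying to receive inside produce, or to send inside sum, fails at compile time.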

3. Select

  • Runs whichever case (e.g. a send or a receive) is ready first.
  • If no case is ready, default runs.
  • default is optional; without it, select blocks until some case can proceed.
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan int)
    c2 := make(chan int)

    go func() {
        // Send data to c1 after some processing
        time.Sleep(1 * time.Second)
        c1 <- 10

        // Send data to c2 after some processing
        time.Sleep(1 * time.Second)
        c2 <- 20
    }()

    // Poll in an infinite loop inside a goroutine
    go func() {
        for {
            time.Sleep(300 * time.Millisecond)
            select {
            case v1 := <-c1:
                fmt.Printf("received %v from c1\n", v1)
            case v2 := <-c2:
                fmt.Printf("received %v from c2\n", v2)
            default:
                fmt.Printf("Not Ready !!\n")
            }
        }
    }()

    // Sleep in main() so that the infinite loop runs for only 3 seconds
    time.Sleep(3 * time.Second)
}
  • time.After(duration) can be used like a timer
package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan int)
    c2 := make(chan int)

    go func() {
        // Send data to c1 after some processing
        time.Sleep(1 * time.Second)
        c1 <- 10

        // Send data to c2 after some processing
        time.Sleep(1 * time.Second)
        c2 <- 20
    }()

    // time.After creates a fresh timer on every pass, so this infinite loop
    // in main() ends once no channel has been ready for 3 seconds
    for {
        time.Sleep(300 * time.Millisecond)
        select {
        case v1 := <-c1:
            fmt.Printf("received %v from c1\n", v1)
        case v2 := <-c2:
            fmt.Printf("received %v from c2\n", v2)
        case <-time.After(3 * time.Second):
            fmt.Println("timed out !!")
            return
        }
    }
}

4. Sync パッケージ

  • Define: var wg sync.WaitGroup
  • Add: wg.Add(1)
  • Finish: wg.Done()
  • Wait until wg.Done has been called as many times as was added with wg.Add: wg.Wait()
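package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Define
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        // Add
        wg.Add(1)

        go func(c int) {
            // Mark as finished when the work is done
            defer wg.Done()

            // Pretend this is a long-running task
            time.Sleep(2 * time.Second)
            fmt.Println(c)
        }(i)
    }

    // Wait until every goroutine has called Done
    wg.Wait()
    fmt.Println("Closing...")
}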
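The sync package also provides sync.Mutex, which is not covered above but often appears together with WaitGroup when goroutines update the same variable. A minimal sketch, with a made-up function name countTo:

```go
package main

import (
	"fmt"
	"sync"
)

// countTo increments one shared counter from n goroutines. The mutex
// makes each increment atomic with respect to the others; the WaitGroup
// makes the caller wait until all increments are done.
func countTo(n int) int {
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		count int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			count++
			mu.Unlock()
		}()
	}
	wg.Wait()
	return count
}

func main() {
	fmt.Println(countTo(1000)) // always 1000 thanks to the mutex
}
```

Without the Lock/Unlock pair, count++ from several goroutines is a data race and the total may come out lower than n.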