イスタンブール行きたい

たまに書きたくなります

【Go】並行処理の「拘束」で安全に並行処理を扱う

オライリー「Go言語による並行処理」を参考に、 実務で使ってみてなるほどと思った内容です。 今日初めて並行処理を書いたのでまだまだ浅い理解ですがメモとしてまとめてみます。

拘束とは

拘束は、チャネルの読み書きを制限することにより、安全にチャネルを扱えるようにするという考え方です。 チャネルの扱いには注意が必要で、例えばチャネルに対する不適切な書き込みや、 チャネルを閉じる作業の漏れや重複により、デッドロックやpanicが起きてしまう可能性があります。 そうしたことを防ぐために、並行処理を関数にまとめてチャネルに対する権限を制限し、 関数の呼び出し側が安全に並行処理を扱えるようにするという考え方です。

コード

「拘束」を使わない場合

package main

import (
    "fmt"
    "time"
)

type processAResult struct {
    Message string
    Error   error
}

type processBResult struct {
    Message string
    Error   error
}

func main() {
    start := time.Now()
 
    channelA := make(chan processAResult, 1) // 関数の外でchannelを定義
    go func() {
        time.Sleep(time.Second * 5)
        channelA <- processAResult{
            Message: "AAAAA",
            Error:   nil,
        }
    }()

    channelB := make(chan processBResult, 1)
    go func() {
        time.Sleep(time.Second * 3)
        channelB <- processBResult{
            Message: "BBBBB",
            Error:   nil,
        }
    }()

    // 関数の外でchannelの受信
    resultA := <-channelA
    resultB := <-channelB
  
    // channelを閉じる
    close(channelA)
    close(channelB)

    if resultA.Error != nil {
        fmt.Println(resultA.Error.Error())
    }
    fmt.Println(resultA.Message)
 
    if resultA.Error != nil {
        fmt.Println(resultB.Error.Error())
    }
    fmt.Println(resultB.Message)
 
    fmt.Println(time.Since(start))
}

ちょっと誇張した書き方かもしれませんが、関数の外側でチャネルを定義し、 関数の呼び出し側が自分でchannelを受け取って、自分でchannelを受け取らなければなりません。 デッドロックやpanicなど思わぬ結果になりやすい構造と言えます。 次に、「拘束」を使ってみます。

「拘束」を使った場合

package main

import (
    "fmt"
    "time"
)

type processAResult struct {
    Message string
    Error   error
}

type processBResult struct {
    Message string
    Error   error
}

func main() {
    start := time.Now()
 
    processA := func() <-chan processAResult { // 読み込み専用のチャネルを返す
        channelA := make(chan processAResult, 1)  // チャネルの初期化
        go func() {
            defer close(channelA)  // チャネルを閉じる
            time.Sleep(time.Second * 5)
            channelA <- processAResult{
                Message: "AAAAA",
                Error:   nil,
            }
        }()
        return channelA    
    }
 
    processB := func() <-chan processBResult {
        channelB := make(chan processBResult, 1)
        go func() {
            defer close(channelB)
            time.Sleep(time.Second * 5)
            channelB <- processBResult{
                Message: "BBBBB",
                Error:   nil,
            }
        }()
        return channelB
    }
 
    // 読み込み専用のチャネルを引数に取る
    getProcessAFinalResult := func(resultA <-chan processAResult) processAResult {
        var result processAResult
        for v := range resultA {
            result = v
        }
        return result
    }
 
    getProcessBFinalResult := func(resultB <-chan processBResult) processBResult {
        var result processBResult
        for v := range resultB {
            result = v
        }
        return result
    }
 
    processAResult := processA()
    processBResult := processB()
 
    finalResultA := getProcessAFinalResult(processAResult)
    finalResultB := getProcessBFinalResult(processBResult)

    if finalResultA.Error != nil {
        fmt.Println(finalResultA.Error.Error())
    }
    fmt.Println(finalResultA.Message)
 
    if finalResultB.Error != nil {
        fmt.Println(finalResultB.Error.Error())
    }
    fmt.Println(finalResultB.Message)
 
    fmt.Println(time.Since(start))
}

先ほどの「拘束」を使わない例とほぼ同じ処理を、「拘束」を使って書き直してみました。

以下のチャネルに対する4つの操作が関数に閉じ込められ、関数の呼び出し側はチャネルに対する操作をしていません。 (1) チャネルの生成 (2) チャネルの書き込み (3) チャネルのクローズ (4) チャネルの読み込み

(1) チャネルの生成、(2) チャネルの書き込み、(3) チャネルのクローズ はprocessA, processBという関数に閉じ込められました。

processA := func() <-chan processAResult { // 読み込み専用のチャネルを返す
        channelA := make(chan processAResult, 1)  // (1) チャネルの生成
        go func() {
            defer close(channelA)  // (3) チャネルのクローズ
            time.Sleep(time.Second * 5)
            channelA <- processAResult{ // (2) チャネルの書き込み
                Message: "AAAAA",
                Error:   nil,
            }
        }()
        return channelA 
}

(4) チャネルの読み込みはgetProcessAFinalResult, getProcessBFinalResultという関数に閉じ込められました。

getProcessAFinalResult := func(resultA <-chan processAResult) processAResult {
        var result processAResult
        for v := range resultA { // (4) チャネルの読み込み
            result = v
        }
        return result
}

チャネルに対する操作が関数に閉じ込められたことは、「拘束」、すなわちチャネルに対する権限の制限です。 これにより、関数の呼び出し側は一切チャネルを操作する必要がなくなります。

具体的には以下のような権限の制限となります。 - 関数processA, processBの中でチャネルの初期化を行うことで、チャネルへの書き込み権限を制限、 他のgo routineが意図せずチャネルへの書き込みをしてしまうことを防いでいる - 関数processA, processBは読み込み専用のチャネルを返すことで、呼び出し側はチャネルの呼び出ししかできない - 関数getProcessAFinalResult, getProcessBFinalResultは読み込み専用のチャネルを引数に取るので、 チャネルに対しては読み込みしかしないことが明示的になる

このように、「拘束」を使うことで一定程度安全に並行処理を扱うことができます。