BAB 30: Context Cancellation pada Pipeline

Kendalikan siklus hidup pipeline concurrent menggunakan context.Context — hentikan semua worker sekaligus ketika timeout tercapai atau sinyal pembatalan dikirim.

Di Bab 29, Latihan 3 mengisyaratkan sesuatu yang penting: pipeline yang hanya bisa berjalan sampai selesai tidak cukup untuk sistem nyata. Bayangkan validasi komentar KontenKu yang harus selesai dalam 200ms — jika melebihi batas itu, lebih baik pipeline dihentikan dan hasilnya yang sudah terkumpul dilaporkan. Tanpa mekanisme pembatalan, satu-satunya cara menghentikan goroutine yang sedang berjalan adalah menutup program secara paksa.

context.Context adalah jawaban Go untuk masalah ini. Ia membawa sinyal pembatalan yang bisa diteruskan ke setiap goroutine dalam pipeline — ketika context di-cancel, semua goroutine yang memantaunya tahu bahwa waktunya berhenti.

Anatomi context.Context

Package context menyediakan beberapa cara membuat context. Dua yang paling relevan untuk pipeline:

  • context.Background() — context kosong sebagai akar. Biasanya dibuat di main atau di entry point request, lalu diteruskan ke bawah.
  • context.WithTimeout(parent, durasi) — membungkus context parent dengan batas waktu. Ketika durasi habis, context otomatis di-cancel. Mengembalikan context baru dan fungsi cancel yang harus dipanggil untuk membebaskan resource.
  • context.WithCancel(parent) — membungkus context parent dengan kemampuan cancel manual. Berguna ketika pembatalan dipicu oleh kondisi bisnis, bukan waktu.

Setiap context punya method Done() yang mengembalikan channel. Channel itu ditutup ketika context di-cancel — ini yang dipakai goroutine untuk mendeteksi sinyal berhenti.

Masalah: Pipeline Tanpa Batas Waktu

Kembali ke pipeline validasi komentar dari Bab 29. Versi sebelumnya berjalan sampai semua komentar selesai diproses, tidak peduli berapa lama. Ini contoh skenario di mana itu bermasalah:

// validasi-tanpa-batas.go
package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Komentar struct {
    ID  int
    Isi string
}

type HasilValidasi struct {
    ID     int
    Status string
}

func streamKomentar(jumlah int) <-chan Komentar {
    keluaran := make(chan Komentar)
    go func() {
        defer close(keluaran)
        for i := range jumlah {
            keluaran <- Komentar{ID: i + 1, Isi: fmt.Sprintf("komentar-%d", i+1)}
        }
    }()
    return keluaran
}

func validasiLambat(masukan <-chan Komentar, jumlahWorker int) <-chan HasilValidasi {
    keluaran := make(chan HasilValidasi)
    var wg sync.WaitGroup
    for range jumlahWorker {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for k := range masukan {
                // simulasi: beberapa komentar butuh waktu sangat lama
                time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
                keluaran <- HasilValidasi{ID: k.ID, Status: "aman"}
            }
        }()
    }
    go func() { wg.Wait(); close(keluaran) }()
    return keluaran
}

func main() {
    komentar := streamKomentar(50)
    hasil := validasiLambat(komentar, 5)

    for h := range hasil {
        fmt.Printf("komentar #%d: %s\n", h.ID, h.Status)
    }
    // jika ada yang butuh 300ms, total bisa mencapai 3 detik
}

Program ini tidak bisa dihentikan di tengah jalan. Jika ada SLA 1 detik, tidak ada cara untuk memenuhinya kecuali menambahkan context.

Mengintegrasikan Context ke Pipeline

Perubahan utamanya adalah: setiap fungsi pipeline menerima ctx context.Context sebagai parameter pertama, dan loop worker menggunakan select untuk memantau ctx.Done().

Generator yang Bisa Dibatalkan

// validasi-dengan-context.go
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Komentar struct {
    ID  int
    Isi string
}

type HasilValidasi struct {
    ID     int
    Status string
}

func streamKomentar(ctx context.Context, jumlah int) <-chan Komentar {
    keluaran := make(chan Komentar)
    go func() {
        defer close(keluaran)
        for i := range jumlah {
            select {
            case <-ctx.Done():
                // context di-cancel, hentikan pengiriman
                return
            case keluaran <- Komentar{ID: i + 1, Isi: fmt.Sprintf("komentar-%d", i+1)}:
                // berhasil dikirim, lanjut
            }
        }
    }()
    return keluaran
}

Pola select di sini adalah kunci: goroutine mencoba mengirim ke channel, tapi juga memantau ctx.Done(). Mana pun yang siap lebih dulu — itulah yang dieksekusi. Jika context di-cancel sebelum ada penerima, case <-ctx.Done() yang dieksekusi dan goroutine keluar bersih.

Worker yang Bisa Dibatalkan

// validasi-dengan-context.go (lanjutan)

func validasiDenganContext(ctx context.Context, masukan <-chan Komentar, jumlahWorker int) <-chan HasilValidasi {
    keluaran := make(chan HasilValidasi)
    var wg sync.WaitGroup

    for range jumlahWorker {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    // sinyal cancel diterima, worker berhenti
                    return
                case k, masih := <-masukan:
                    if !masih {
                        // channel masukan sudah ditutup, selesai
                        return
                    }
                    // proses komentar
                    time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
                    select {
                    case <-ctx.Done():
                        return
                    case keluaran <- HasilValidasi{ID: k.ID, Status: "aman"}:
                    }
                }
            }
        }()
    }

    go func() { wg.Wait(); close(keluaran) }()
    return keluaran
}

Ada dua select di dalam worker: satu sebelum memproses (cek apakah perlu berhenti sebelum mulai kerja), satu setelah memproses (cek apakah perlu berhenti sebelum mengirim hasil). Ini memastikan tidak ada goroutine yang macet menunggu di operasi send ketika context sudah di-cancel.

Program Lengkap dengan Timeout

// validasi-dengan-context.go (lanjutan)

func main() {
    // buat context dengan timeout 500ms
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel() // selalu panggil cancel untuk bebaskan resource

    fmt.Println("mulai validasi (timeout: 500ms)...")
    mulai := time.Now()

    komentar := streamKomentar(ctx, 30)
    hasil := validasiDenganContext(ctx, komentar, 5)

    jumlahSelesai := 0
    for h := range hasil {
        jumlahSelesai++
        fmt.Printf("komentar #%d selesai: %s\n", h.ID, h.Status)
    }

    durasi := time.Since(mulai)
    if ctx.Err() != nil {
        fmt.Printf("\ndibatalkan setelah %v: %v\n", durasi.Round(time.Millisecond), ctx.Err())
        fmt.Printf("berhasil memproses %d dari 30 komentar\n", jumlahSelesai)
    } else {
        fmt.Printf("\nselesai dalam %v, semua 30 komentar diproses\n", durasi.Round(time.Millisecond))
    }
}
mulai validasi (timeout: 500ms)...
komentar #3 selesai: aman
komentar #1 selesai: aman
komentar #5 selesai: aman
komentar #2 selesai: aman
komentar #4 selesai: aman
komentar #7 selesai: aman
komentar #6 selesai: aman

dibatalkan setelah 502ms: context deadline exceeded
berhasil memproses 7 dari 30 komentar

ctx.Err() mengembalikan context.DeadlineExceeded jika timeout, atau context.Canceled jika cancel() dipanggil secara manual. Ini yang digunakan untuk membedakan “selesai normal” dari “dihentikan paksa”.

Selalu panggil cancel() yang dikembalikan oleh context.WithTimeout atau context.WithCancel, bahkan jika context sudah expired secara alami. Tanpa ini, resource internal context tidak akan dibebaskan sampai parent context selesai. Gunakan defer cancel() segera setelah membuat context.

Cancel Manual: Hentikan Karena Kondisi Bisnis

Timeout bukan satu-satunya alasan membatalkan pipeline. Kadang ada kondisi bisnis yang memicu pembatalan — misalnya menemukan komentar dengan skor spam yang sangat tinggi dan ingin menghentikan seluruh proses segera.

// cancel-manual.go
package main

import (
    "context"
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type KomentarDenganSkor struct {
    ID       int
    Isi      string
    SkorSpam int
}

func streamDenganSkor(ctx context.Context, jumlah int) <-chan KomentarDenganSkor {
    keluaran := make(chan KomentarDenganSkor)
    go func() {
        defer close(keluaran)
        for i := range jumlah {
            select {
            case <-ctx.Done():
                return
            case keluaran <- KomentarDenganSkor{
                ID:       i + 1,
                Isi:      fmt.Sprintf("komentar-%d", i+1),
                SkorSpam: rand.Intn(100),
            }:
            }
        }
    }()
    return keluaran
}

func filterDenganBatas(ctx context.Context, cancel context.CancelFunc, masukan <-chan KomentarDenganSkor, batasSkor int) <-chan KomentarDenganSkor {
    keluaran := make(chan KomentarDenganSkor)
    go func() {
        defer close(keluaran)
        for {
            select {
            case <-ctx.Done():
                return
            case k, masih := <-masukan:
                if !masih {
                    return
                }
                if k.SkorSpam >= batasSkor {
                    fmt.Printf("PERINGATAN: komentar #%d skor spam %d — pipeline dihentikan\n", k.ID, k.SkorSpam)
                    cancel() // batalkan seluruh pipeline
                    return
                }
                select {
                case <-ctx.Done():
                    return
                case keluaran <- k:
                }
            }
        }
    }()
    return keluaran
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    komentar := streamDenganSkor(ctx, 100)
    // hentikan pipeline jika ada komentar dengan skor spam >= 90
    aman := filterDenganBatas(ctx, cancel, komentar, 90)

    jumlah := 0
    for k := range aman {
        jumlah++
        fmt.Printf("lolos: komentar #%d (skor: %d)\n", k.ID, k.SkorSpam)
    }
    fmt.Printf("\ntotal lolos: %d komentar\n", jumlah)
    if ctx.Err() != nil {
        fmt.Printf("pipeline dihentikan: %v\n", ctx.Err())
    }
}
lolos: komentar #1 (skor: 23)
lolos: komentar #2 (skor: 67)
lolos: komentar #3 (skor: 41)
PERINGATAN: komentar #4 skor spam 94 — pipeline dihentikan
total lolos: 3 komentar
pipeline dihentikan: context canceled

cancel diteruskan sebagai parameter ke fungsi filter, yang memanggilnya ketika menemukan kondisi yang mengharuskan seluruh pipeline berhenti. Ini pola yang bersih: context adalah sinyal bersama, dan siapa pun yang menerimanya berhak mengirim sinyal cancel.

Untuk pipeline yang panjang, pertimbangkan menggunakan context.WithCancelCause (tersedia di Go 1.20+) agar bisa menyertakan alasan spesifik saat membatalkan. Ini memudahkan debugging ketika ada banyak titik yang bisa memicu cancel.

Latihan

Latihan 1 — Timeout bertingkat: Buat pipeline dua stage: stage pertama menghasilkan URL (string), stage kedua “mengambil” konten dari URL tersebut (simulasikan dengan time.Sleep acak 50–200ms). Beri timeout 800ms pada seluruh pipeline. Catat berapa URL yang berhasil diambil sebelum timeout dan cetak ctx.Err() di akhir.

Latihan 2 — Cancel dari goroutine manapun: Modifikasi contoh cancel-manual.go agar ada dua goroutine filter yang berjalan paralel — satu memantau skor spam, satu memantau panjang teks komentar (lebih dari 500 karakter). Keduanya menerima fungsi cancel yang sama. Ketika salah satu memicu cancel, yang lain harus berhenti secara otomatis. Verifikasi bahwa tidak ada goroutine leak setelah program selesai.

Latihan 3 — Laporan hasil parsial: Kembangkan program utama dari bab ini agar selain mencetak jumlah komentar yang berhasil divalidasi sebelum timeout, ia juga mencetak statistik skor spam rata-rata dari komentar yang sudah diproses. Simpan hasil parsial ke dalam slice sebelum pipeline selesai, lalu hitung statistik dari situ.


Context cancellation mengubah pipeline dari sistem yang hanya bisa berjalan sampai selesai menjadi sistem yang responsif terhadap kondisi eksternal. Kombinasi context.WithTimeout untuk batas waktu dan context.WithCancel untuk pembatalan kondisional memberikan kontrol penuh atas siklus hidup goroutine — tanpa harus bergantung pada channel tambahan atau flag boolean yang rawan race condition. Dengan ini, toolkit concurrency Go kamu sudah lengkap: goroutine, channel, select, pipeline, dan context. Sekarang saatnya beralih dari concurrency ke topik yang berbeda — bagaimana Go menjamin kode cleanup selalu berjalan, bahkan ketika ada error atau panic di tengah eksekusi.

Referensi

  1. 1context — Go Standard Library Documentation
  2. 2Go Concurrency Patterns: Context — The Go Blog
  3. 3Go Concurrency Patterns: Pipelines and cancellation — The Go Blog