BAB 30: Context Cancellation pada Pipeline
Kendalikan siklus hidup pipeline concurrent menggunakan context.Context — hentikan semua worker sekaligus ketika timeout tercapai atau sinyal pembatalan dikirim.
Di Bab 29, Latihan 3 mengisyaratkan sesuatu yang penting: pipeline yang hanya bisa berjalan sampai selesai tidak cukup untuk sistem nyata. Bayangkan validasi komentar KontenKu yang harus selesai dalam 200ms — jika melebihi batas itu, lebih baik pipeline dihentikan dan hasilnya yang sudah terkumpul dilaporkan. Tanpa mekanisme pembatalan, satu-satunya cara menghentikan goroutine yang sedang berjalan adalah menutup program secara paksa.
context.Context adalah jawaban Go untuk masalah ini. Ia membawa sinyal pembatalan yang bisa diteruskan ke setiap goroutine dalam pipeline — ketika context di-cancel, semua goroutine yang memantaunya tahu bahwa waktunya berhenti.
Anatomi context.Context
Package context menyediakan beberapa cara membuat context. Dua yang paling relevan untuk pipeline:
context.Background()— context kosong sebagai akar. Biasanya dibuat dimainatau di entry point request, lalu diteruskan ke bawah.context.WithTimeout(parent, durasi)— membungkus context parent dengan batas waktu. Ketika durasi habis, context otomatis di-cancel. Mengembalikan context baru dan fungsicancelyang harus dipanggil untuk membebaskan resource.context.WithCancel(parent)— membungkus context parent dengan kemampuan cancel manual. Berguna ketika pembatalan dipicu oleh kondisi bisnis, bukan waktu.
Setiap context punya method Done() yang mengembalikan channel. Channel itu ditutup ketika context di-cancel — ini yang dipakai goroutine untuk mendeteksi sinyal berhenti.
Masalah: Pipeline Tanpa Batas Waktu
Kembali ke pipeline validasi komentar dari Bab 29. Versi sebelumnya berjalan sampai semua komentar selesai diproses, tidak peduli berapa lama. Ini contoh skenario di mana itu bermasalah:
// validasi-tanpa-batas.go
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Komentar struct {
ID int
Isi string
}
type HasilValidasi struct {
ID int
Status string
}
func streamKomentar(jumlah int) <-chan Komentar {
keluaran := make(chan Komentar)
go func() {
defer close(keluaran)
for i := range jumlah {
keluaran <- Komentar{ID: i + 1, Isi: fmt.Sprintf("komentar-%d", i+1)}
}
}()
return keluaran
}
func validasiLambat(masukan <-chan Komentar, jumlahWorker int) <-chan HasilValidasi {
keluaran := make(chan HasilValidasi)
var wg sync.WaitGroup
for range jumlahWorker {
wg.Add(1)
go func() {
defer wg.Done()
for k := range masukan {
// simulasi: beberapa komentar butuh waktu sangat lama
time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
keluaran <- HasilValidasi{ID: k.ID, Status: "aman"}
}
}()
}
go func() { wg.Wait(); close(keluaran) }()
return keluaran
}
func main() {
komentar := streamKomentar(50)
hasil := validasiLambat(komentar, 5)
for h := range hasil {
fmt.Printf("komentar #%d: %s\n", h.ID, h.Status)
}
// jika ada yang butuh 300ms, total bisa mencapai 3 detik
}
Program ini tidak bisa dihentikan di tengah jalan. Jika ada SLA 1 detik, tidak ada cara untuk memenuhinya kecuali menambahkan context.
Mengintegrasikan Context ke Pipeline
Perubahan utamanya adalah: setiap fungsi pipeline menerima ctx context.Context sebagai parameter pertama, dan loop worker menggunakan select untuk memantau ctx.Done().
Generator yang Bisa Dibatalkan
// validasi-dengan-context.go
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type Komentar struct {
ID int
Isi string
}
type HasilValidasi struct {
ID int
Status string
}
func streamKomentar(ctx context.Context, jumlah int) <-chan Komentar {
keluaran := make(chan Komentar)
go func() {
defer close(keluaran)
for i := range jumlah {
select {
case <-ctx.Done():
// context di-cancel, hentikan pengiriman
return
case keluaran <- Komentar{ID: i + 1, Isi: fmt.Sprintf("komentar-%d", i+1)}:
// berhasil dikirim, lanjut
}
}
}()
return keluaran
}
Pola select di sini adalah kunci: goroutine mencoba mengirim ke channel, tapi juga memantau ctx.Done(). Mana pun yang siap lebih dulu — itulah yang dieksekusi. Jika context di-cancel sebelum ada penerima, case <-ctx.Done() yang dieksekusi dan goroutine keluar bersih.
Worker yang Bisa Dibatalkan
// validasi-dengan-context.go (lanjutan)
func validasiDenganContext(ctx context.Context, masukan <-chan Komentar, jumlahWorker int) <-chan HasilValidasi {
keluaran := make(chan HasilValidasi)
var wg sync.WaitGroup
for range jumlahWorker {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
// sinyal cancel diterima, worker berhenti
return
case k, masih := <-masukan:
if !masih {
// channel masukan sudah ditutup, selesai
return
}
// proses komentar
time.Sleep(time.Duration(rand.Intn(300)+50) * time.Millisecond)
select {
case <-ctx.Done():
return
case keluaran <- HasilValidasi{ID: k.ID, Status: "aman"}:
}
}
}
}()
}
go func() { wg.Wait(); close(keluaran) }()
return keluaran
}
Ada dua select di dalam worker: satu sebelum memproses (cek apakah perlu berhenti sebelum mulai kerja), satu setelah memproses (cek apakah perlu berhenti sebelum mengirim hasil). Ini memastikan tidak ada goroutine yang macet menunggu di operasi send ketika context sudah di-cancel.
Program Lengkap dengan Timeout
// validasi-dengan-context.go (lanjutan)
func main() {
// buat context dengan timeout 500ms
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel() // selalu panggil cancel untuk bebaskan resource
fmt.Println("mulai validasi (timeout: 500ms)...")
mulai := time.Now()
komentar := streamKomentar(ctx, 30)
hasil := validasiDenganContext(ctx, komentar, 5)
jumlahSelesai := 0
for h := range hasil {
jumlahSelesai++
fmt.Printf("komentar #%d selesai: %s\n", h.ID, h.Status)
}
durasi := time.Since(mulai)
if ctx.Err() != nil {
fmt.Printf("\ndibatalkan setelah %v: %v\n", durasi.Round(time.Millisecond), ctx.Err())
fmt.Printf("berhasil memproses %d dari 30 komentar\n", jumlahSelesai)
} else {
fmt.Printf("\nselesai dalam %v, semua 30 komentar diproses\n", durasi.Round(time.Millisecond))
}
}
mulai validasi (timeout: 500ms)...
komentar #3 selesai: aman
komentar #1 selesai: aman
komentar #5 selesai: aman
komentar #2 selesai: aman
komentar #4 selesai: aman
komentar #7 selesai: aman
komentar #6 selesai: aman
dibatalkan setelah 502ms: context deadline exceeded
berhasil memproses 7 dari 30 komentar
ctx.Err() mengembalikan context.DeadlineExceeded jika timeout, atau context.Canceled jika cancel() dipanggil secara manual. Ini yang digunakan untuk membedakan “selesai normal” dari “dihentikan paksa”.
Selalu panggil cancel() yang dikembalikan oleh context.WithTimeout atau context.WithCancel, bahkan jika context sudah expired secara alami. Tanpa ini, resource internal context tidak akan dibebaskan sampai parent context selesai. Gunakan defer cancel() segera setelah membuat context.
Cancel Manual: Hentikan Karena Kondisi Bisnis
Timeout bukan satu-satunya alasan membatalkan pipeline. Kadang ada kondisi bisnis yang memicu pembatalan — misalnya menemukan komentar dengan skor spam yang sangat tinggi dan ingin menghentikan seluruh proses segera.
// cancel-manual.go
package main
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
)
type KomentarDenganSkor struct {
ID int
Isi string
SkorSpam int
}
func streamDenganSkor(ctx context.Context, jumlah int) <-chan KomentarDenganSkor {
keluaran := make(chan KomentarDenganSkor)
go func() {
defer close(keluaran)
for i := range jumlah {
select {
case <-ctx.Done():
return
case keluaran <- KomentarDenganSkor{
ID: i + 1,
Isi: fmt.Sprintf("komentar-%d", i+1),
SkorSpam: rand.Intn(100),
}:
}
}
}()
return keluaran
}
func filterDenganBatas(ctx context.Context, cancel context.CancelFunc, masukan <-chan KomentarDenganSkor, batasSkor int) <-chan KomentarDenganSkor {
keluaran := make(chan KomentarDenganSkor)
go func() {
defer close(keluaran)
for {
select {
case <-ctx.Done():
return
case k, masih := <-masukan:
if !masih {
return
}
if k.SkorSpam >= batasSkor {
fmt.Printf("PERINGATAN: komentar #%d skor spam %d — pipeline dihentikan\n", k.ID, k.SkorSpam)
cancel() // batalkan seluruh pipeline
return
}
select {
case <-ctx.Done():
return
case keluaran <- k:
}
}
}
}()
return keluaran
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
komentar := streamDenganSkor(ctx, 100)
// hentikan pipeline jika ada komentar dengan skor spam >= 90
aman := filterDenganBatas(ctx, cancel, komentar, 90)
jumlah := 0
for k := range aman {
jumlah++
fmt.Printf("lolos: komentar #%d (skor: %d)\n", k.ID, k.SkorSpam)
}
fmt.Printf("\ntotal lolos: %d komentar\n", jumlah)
if ctx.Err() != nil {
fmt.Printf("pipeline dihentikan: %v\n", ctx.Err())
}
}
lolos: komentar #1 (skor: 23)
lolos: komentar #2 (skor: 67)
lolos: komentar #3 (skor: 41)
PERINGATAN: komentar #4 skor spam 94 — pipeline dihentikan
total lolos: 3 komentar
pipeline dihentikan: context canceled
cancel diteruskan sebagai parameter ke fungsi filter, yang memanggilnya ketika menemukan kondisi yang mengharuskan seluruh pipeline berhenti. Ini pola yang bersih: context adalah sinyal bersama, dan siapa pun yang menerimanya berhak mengirim sinyal cancel.
Untuk pipeline yang panjang, pertimbangkan menggunakan context.WithCancelCause (tersedia di Go 1.20+) agar bisa menyertakan alasan spesifik saat membatalkan. Ini memudahkan debugging ketika ada banyak titik yang bisa memicu cancel.
Latihan
Latihan 1 — Timeout bertingkat:
Buat pipeline dua stage: stage pertama menghasilkan URL (string), stage kedua “mengambil” konten dari URL tersebut (simulasikan dengan time.Sleep acak 50–200ms). Beri timeout 800ms pada seluruh pipeline. Catat berapa URL yang berhasil diambil sebelum timeout dan cetak ctx.Err() di akhir.
Latihan 2 — Cancel dari goroutine manapun:
Modifikasi contoh cancel-manual.go agar ada dua goroutine filter yang berjalan paralel — satu memantau skor spam, satu memantau panjang teks komentar (lebih dari 500 karakter). Keduanya menerima fungsi cancel yang sama. Ketika salah satu memicu cancel, yang lain harus berhenti secara otomatis. Verifikasi bahwa tidak ada goroutine leak setelah program selesai.
Latihan 3 — Laporan hasil parsial: Kembangkan program utama dari bab ini agar selain mencetak jumlah komentar yang berhasil divalidasi sebelum timeout, ia juga mencetak statistik skor spam rata-rata dari komentar yang sudah diproses. Simpan hasil parsial ke dalam slice sebelum pipeline selesai, lalu hitung statistik dari situ.
Context cancellation mengubah pipeline dari sistem yang hanya bisa berjalan sampai selesai menjadi sistem yang responsif terhadap kondisi eksternal. Kombinasi context.WithTimeout untuk batas waktu dan context.WithCancel untuk pembatalan kondisional memberikan kontrol penuh atas siklus hidup goroutine — tanpa harus bergantung pada channel tambahan atau flag boolean yang rawan race condition. Dengan ini, toolkit concurrency Go kamu sudah lengkap: goroutine, channel, select, pipeline, dan context. Sekarang saatnya beralih dari concurrency ke topik yang berbeda — bagaimana Go menjamin kode cleanup selalu berjalan, bahkan ketika ada error atau panic di tengah eksekusi.