BAB 24: Buffered Channel — Antrian Goroutine
Kuasai buffered channel untuk membangun sistem antrian, worker pool, dan semaphore yang efisien di Go.
Di bab sebelumnya, kamu sudah berkenalan dengan buffered channel lewat contoh tiga laporan yang masuk antrian tanpa perlu goroutine terpisah. Tapi itu baru permukaan — buffered channel punya mekanisme yang lebih kaya dari sekadar “bisa kirim beberapa data sekaligus”. Memahami cara kerjanya secara mendalam akan membuka pola-pola concurrency yang sangat berguna: antrian kerja, pembatas konkurensi, dan worker pool yang efisien.
Anatomi Buffer: Kapasitas dan Isi
Ketika membuat buffered channel, kamu menentukan berapa banyak data yang bisa ditampung sebelum sender terpaksa menunggu. Dua fungsi bawaan Go, len dan cap, bisa digunakan untuk mengintip kondisi buffer saat runtime.
// Ilustrasi: buffered channel kapasitas 4
//
// ┌──────────────────────────────────┐
// │ CHANNEL BUFFER (cap=4) │
// │ ┌──────┬──────┬──────┬──────┐ │
// │ │ [0] │ [1] │ [2] │ [3] │ │
// │ │ A │ B │ │ │ │ ← len=2, cap=4
// │ └──────┴──────┴──────┴──────┘ │
// └──────────────────────────────────┘
// ↑ ↑
// terisi kosong
// (blocking saat terima jika len=0)
// (blocking saat kirim jika len=cap)
// main.go
package main
import "fmt"
func main() {
antrianCh := make(chan string, 4)
antrianCh <- "laporan-keuangan"
antrianCh <- "laporan-sdm"
fmt.Printf("isi buffer : %d\n", len(antrianCh))
fmt.Printf("kapasitas : %d\n", cap(antrianCh))
fmt.Printf("sisa ruang : %d\n", cap(antrianCh)-len(antrianCh))
}
isi buffer : 2
kapasitas : 4
sisa ruang : 2
len mengembalikan jumlah data yang saat ini ada di buffer. cap mengembalikan kapasitas total yang ditetapkan saat make. Selisihnya adalah ruang yang masih tersedia untuk pengiriman berikutnya tanpa blocking.
Semantik Blocking yang Tepat
Ini bagian yang sering membingungkan: buffered channel bukan berarti “tidak pernah blocking”. Ia blocking dalam dua kondisi spesifik.
// Diagram: kapan blocking terjadi
//
// KIRIM (ch <- data)
// ┌─────────────────────────────────┐
// │ buffer kosong/sebagian → aman │ non-blocking
// │ buffer PENUH → tunggu│ blocking
// └─────────────────────────────────┘
//
// TERIMA (<-ch)
// ┌─────────────────────────────────┐
// │ buffer ada isi → aman │ non-blocking
// │ buffer KOSONG → tunggu│ blocking
// └─────────────────────────────────┘
Kode berikut memperlihatkan transisi dari non-blocking ke blocking secara nyata:
// main.go
package main
import (
"fmt"
"time"
)
func main() {
antrianCh := make(chan string, 2)
go func() {
for i := 1; i <= 4; i++ {
laporan := fmt.Sprintf("laporan-%02d", i)
fmt.Printf("mengirim: %s\n", laporan)
antrianCh <- laporan // akan blocking di laporan-03 dan laporan-04
fmt.Printf("terkirim: %s\n", laporan)
}
close(antrianCh)
}()
// receiver sengaja lambat untuk memperlihatkan efek blocking
time.Sleep(100 * time.Millisecond)
for laporan := range antrianCh {
fmt.Printf("memproses: %s\n", laporan)
time.Sleep(50 * time.Millisecond)
}
}
mengirim: laporan-01
terkirim: laporan-01
mengirim: laporan-02
terkirim: laporan-02
mengirim: laporan-03
memproses: laporan-01
terkirim: laporan-03
mengirim: laporan-04
memproses: laporan-02
terkirim: laporan-04
memproses: laporan-03
memproses: laporan-04
Perhatikan polanya: laporan-03 berhasil dikirim (terkirim) hanya setelah receiver memproses laporan-01 dan membebaskan satu slot buffer. Blocking membuat sender berhenti sejenak, bukan crash atau error.
Ini adalah backpressure alami — mekanisme di mana sender otomatis melambat ketika receiver tidak mampu mengikuti kecepatan pengiriman. Sifat ini sangat berguna untuk menjaga program tetap stabil di bawah beban tinggi.
Buffered Channel sebagai Semaphore
Salah satu pola paling elegan di Go adalah menggunakan buffered channel sebagai semaphore — pembatas jumlah goroutine yang boleh berjalan secara bersamaan. Idenya simpel: ukuran buffer = jumlah izin yang tersedia.
// Ilustrasi: semaphore dengan kapasitas 3
//
// ┌─────────────────────────────────────┐
// │ SLOT IZIN (cap=3) │
// │ ┌────┬────┬────┐ │
// │ │ ok │ ok │ ok │ ← 3 slot tersedia│
// │ └────┴────┴────┘ │
// └─────────────────────────────────────┘
// ↓ kirim = ambil izin
// ↑ terima = kembalikan izin
//
// Jika 3 goroutine sudah memegang izin,
// goroutine ke-4 harus menunggu sampai
// salah satu mengembalikan izinnya.
// main.go
package main
import (
"fmt"
"sync"
"time"
)
func prosesLaporan(id int, izin chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
izin <- struct{}{} // ambil satu izin
defer func() { <-izin }() // kembalikan izin saat selesai
fmt.Printf("[worker-%d] memulai pemrosesan laporan\n", id)
time.Sleep(200 * time.Millisecond) // simulasi kerja
fmt.Printf("[worker-%d] laporan selesai\n", id)
}
func main() {
const maxKonkuren = 3
const totalLaporan = 8
izin := make(chan struct{}, maxKonkuren) // hanya 3 yang boleh berjalan bersamaan
var wg sync.WaitGroup
for i := 1; i <= totalLaporan; i++ {
wg.Add(1)
go prosesLaporan(i, izin, &wg)
}
wg.Wait()
fmt.Println("semua laporan selesai diproses")
}
[worker-1] memulai pemrosesan laporan
[worker-2] memulai pemrosesan laporan
[worker-3] memulai pemrosesan laporan
[worker-1] laporan selesai
[worker-4] memulai pemrosesan laporan
[worker-2] laporan selesai
[worker-5] memulai pemrosesan laporan
[worker-3] laporan selesai
[worker-6] memulai pemrosesan laporan
...
semua laporan selesai diproses
Delapan goroutine diluncurkan, tapi paling banyak tiga yang aktif secara bersamaan. Saat satu selesai dan mengembalikan izin via <-izin, goroutine berikutnya otomatis bisa masuk. Pola defer func() { <-izin }() memastikan izin selalu dikembalikan — bahkan jika fungsi panic sekalipun.
Worker Pool — Antrian Tetap dengan Banyak Pekerja
Semaphore membatasi konkurensi, tapi pola worker pool lebih terstruktur: kamu punya sejumlah worker yang berjalan terus-menerus, memproses pekerjaan dari antrian sampai antrian habis.
// Ilustrasi: worker pool
//
// Sender Workers Collector
// ────── ┌──────────────────────┐ ─────────
// tugas-1 ──┐ │ worker-1 (aktif) │ ──┐
// tugas-2 ──┤──→ │ worker-2 (aktif) │ ──┤──→ hasil
// tugas-3 ──┤ │ worker-3 (menunggu) │ ──┘
// tugas-4 ──┘ └──────────────────────┘
// ↑ ↑
// jobsCh hasilCh
// (buffered) (buffered)
// main.go
package main
import (
"fmt"
"sync"
"time"
)
type Tugas struct {
ID int
NamaFile string
}
type Hasil struct {
TugasID int
Status string
Durasi string
}
func worker(id int, tugasCh <-chan Tugas, hasilCh chan<- Hasil, wg *sync.WaitGroup) {
defer wg.Done()
for tugas := range tugasCh {
mulai := time.Now()
// simulasi pemrosesan file laporan
time.Sleep(150 * time.Millisecond)
durasi := time.Since(mulai)
hasilCh <- Hasil{
TugasID: tugas.ID,
Status: fmt.Sprintf("%s berhasil diarsipkan", tugas.NamaFile),
Durasi: durasi.String(),
}
}
}
func main() {
const jumlahWorker = 3
tugasCh := make(chan Tugas, 10)
hasilCh := make(chan Hasil, 10)
// luncurkan worker pool
var wg sync.WaitGroup
for i := 1; i <= jumlahWorker; i++ {
wg.Add(1)
go worker(i, tugasCh, hasilCh, &wg)
}
// kirim pekerjaan ke antrian
fileLaporan := []string{
"q1-keuangan.xlsx", "q1-sdm.xlsx", "q1-operasional.xlsx",
"q1-marketing.xlsx", "q1-produksi.xlsx", "q1-logistik.xlsx",
"q1-it.xlsx",
}
for i, nama := range fileLaporan {
tugasCh <- Tugas{ID: i + 1, NamaFile: nama}
}
close(tugasCh) // sinyal: tidak ada tugas baru
// tutup hasilCh setelah semua worker selesai
go func() {
wg.Wait()
close(hasilCh)
}()
// kumpulkan semua hasil
for hasil := range hasilCh {
fmt.Printf("tugas #%d: %s (%s)\n", hasil.TugasID, hasil.Status, hasil.Durasi)
}
}
tugas #2: q1-sdm.xlsx berhasil diarsipkan (150ms)
tugas #1: q1-keuangan.xlsx berhasil diarsipkan (150ms)
tugas #3: q1-operasional.xlsx berhasil diarsipkan (150ms)
tugas #4: q1-marketing.xlsx berhasil diarsipkan (150ms)
tugas #5: q1-produksi.xlsx berhasil diarsipkan (150ms)
tugas #6: q1-logistik.xlsx berhasil diarsipkan (150ms)
tugas #7: q1-it.xlsx berhasil diarsipkan (150ms)
Tujuh file diproses oleh tiga worker secara bergiliran. Tidak ada goroutine yang menganggur selama masih ada tugas di tugasCh. Dua detail penting di sini:
Pertama, tugasCh <-chan Tugas di signature worker — tanda <- sebelum chan berarti channel ini receive-only untuk fungsi tersebut. Ini mencegah worker secara tidak sengaja mengirim data ke channel yang seharusnya hanya dibaca. Sebaliknya, hasilCh chan<- Hasil adalah send-only.
Kedua, goroutine anonim yang memanggil wg.Wait() lalu close(hasilCh) — ini adalah koordinasi dua tahap: tunggu semua worker selesai, baru tutup channel hasil sehingga for range hasilCh di main bisa berhenti.
Anotasi directional pada parameter channel (<-chan dan chan<-) adalah praktik yang sangat dianjurkan. Compiler akan menangkap bug jika ada kode yang mencoba mengirim ke receive-only channel atau sebaliknya.
Memeriksa Buffer Sebelum Kirim
Kadang kamu ingin tahu apakah buffer masih punya ruang sebelum mencoba mengirim, tanpa mau menunggu kalau buffer penuh. Ini bisa dilakukan dengan select dan default.
// main.go
package main
import "fmt"
func kirimJikaAda(ch chan string, pesan string) bool {
select {
case ch <- pesan:
return true
default:
return false // buffer penuh, lewati
}
}
func main() {
antrianCh := make(chan string, 3)
laporan := []string{
"laporan-audit", "laporan-pajak", "laporan-aset",
"laporan-investasi", // ini tidak akan masuk — buffer penuh
}
for _, lap := range laporan {
berhasil := kirimJikaAda(antrianCh, lap)
if berhasil {
fmt.Printf("masuk antrian : %s\n", lap)
} else {
fmt.Printf("antrian penuh, lewati: %s\n", lap)
}
}
fmt.Printf("\nisi antrian (%d/%d):\n", len(antrianCh), cap(antrianCh))
for len(antrianCh) > 0 {
fmt.Println(" -", <-antrianCh)
}
}
masuk antrian : laporan-audit
masuk antrian : laporan-pajak
masuk antrian : laporan-aset
antrian penuh, lewati: laporan-investasi
isi antrian (3/3):
- laporan-audit
- laporan-pajak
- laporan-aset
select dengan default membuat operasi channel menjadi non-blocking. Jika ch <- pesan tidak bisa langsung dieksekusi (karena buffer penuh), ia langsung jatuh ke default. Ini berguna untuk skenario di mana kehilangan beberapa item lebih baik daripada menghentikan seluruh alur program.
Latihan
Latihan 1 — Monitor kapasitas:
Buat program yang menggunakan buffered channel berkapasitas 5. Kirim data satu per satu dari goroutine, dan setiap kali data masuk, cetak status buffer: "buffer: N/5". Gunakan len dan cap untuk menghitungnya.
Latihan 2 — Rate limiter:
Buat fungsi prosesPermintaan(id int) yang mencetak "permintaan #N diproses". Gunakan buffered channel sebagai semaphore untuk memastikan tidak lebih dari 2 permintaan diproses secara bersamaan, meski 10 goroutine diluncurkan sekaligus. Petunjuk: pola yang sama dengan izin := make(chan struct{}, 2).
Latihan 3 — Pipeline dua tahap:
Kembangkan worker pool di bab ini menjadi dua tahap. Tahap pertama: goroutine membaca nama file dari slice dan mengirimnya ke fileCh. Tahap kedua: tiga worker membaca dari fileCh, “memproses” file (cukup time.Sleep), lalu mengirim hasilnya ke arsipCh. Tahap ketiga: main membaca dari arsipCh dan mencetak hasilnya. Ini adalah pola pipeline tiga tahap yang umum di sistem pemrosesan data.
Buffered channel yang sudah kamu kuasai di bab ini — dari semantik blocking, semaphore, hingga worker pool — adalah fondasi dari sistem concurrency yang sesungguhnya. Tapi ada satu tantangan yang belum tersentuh: bagaimana jika kamu perlu menunggu salah satu dari beberapa channel sekaligus? Situasi ini muncul saat menggabungkan banyak sumber data atau saat ingin menambahkan timeout pada operasi channel. select di Go dirancang tepat untuk itu, dan itu yang akan kita bahas selanjutnya.