BAB 24: Buffered Channel — Antrian Goroutine

Kuasai buffered channel untuk membangun sistem antrian, worker pool, dan semaphore yang efisien di Go.

Di bab sebelumnya, kamu sudah berkenalan dengan buffered channel lewat contoh tiga laporan yang masuk antrian tanpa perlu goroutine terpisah. Tapi itu baru permukaan — buffered channel punya mekanisme yang lebih kaya dari sekadar “bisa kirim beberapa data sekaligus”. Memahami cara kerjanya secara mendalam akan membuka pola-pola concurrency yang sangat berguna: antrian kerja, pembatas konkurensi, dan worker pool yang efisien.

Anatomi Buffer: Kapasitas dan Isi

Ketika membuat buffered channel, kamu menentukan berapa banyak data yang bisa ditampung sebelum sender terpaksa menunggu. Dua fungsi bawaan Go, len dan cap, bisa digunakan untuk mengintip kondisi buffer saat runtime.

// Ilustrasi: buffered channel kapasitas 4
//
//  ┌──────────────────────────────────┐
//  │  CHANNEL BUFFER (cap=4)          │
//  │  ┌──────┬──────┬──────┬──────┐  │
//  │  │ [0]  │ [1]  │ [2]  │ [3]  │  │
//  │  │  A   │  B   │      │      │  │  ← len=2, cap=4
//  │  └──────┴──────┴──────┴──────┘  │
//  └──────────────────────────────────┘
//        ↑                  ↑
//      terisi            kosong
//    (blocking saat terima jika len=0)
//    (blocking saat kirim jika len=cap)
// main.go
package main

import "fmt"

func main() {
    antrianCh := make(chan string, 4)

    antrianCh <- "laporan-keuangan"
    antrianCh <- "laporan-sdm"

    fmt.Printf("isi buffer  : %d\n", len(antrianCh))
    fmt.Printf("kapasitas   : %d\n", cap(antrianCh))
    fmt.Printf("sisa ruang  : %d\n", cap(antrianCh)-len(antrianCh))
}
isi buffer  : 2
kapasitas   : 4
sisa ruang  : 2

len mengembalikan jumlah data yang saat ini ada di buffer. cap mengembalikan kapasitas total yang ditetapkan saat make. Selisihnya adalah ruang yang masih tersedia untuk pengiriman berikutnya tanpa blocking.

Semantik Blocking yang Tepat

Ini bagian yang sering membingungkan: buffered channel bukan berarti “tidak pernah blocking”. Ia blocking dalam dua kondisi spesifik.

// Diagram: kapan blocking terjadi
//
//  KIRIM (ch <- data)
//  ┌─────────────────────────────────┐
//  │ buffer kosong/sebagian  → aman  │  non-blocking
//  │ buffer PENUH            → tunggu│  blocking
//  └─────────────────────────────────┘
//
//  TERIMA (<-ch)
//  ┌─────────────────────────────────┐
//  │ buffer ada isi          → aman  │  non-blocking
//  │ buffer KOSONG           → tunggu│  blocking
//  └─────────────────────────────────┘

Kode berikut memperlihatkan transisi dari non-blocking ke blocking secara nyata:

// main.go
package main

import (
    "fmt"
    "time"
)

func main() {
    antrianCh := make(chan string, 2)

    go func() {
        for i := 1; i <= 4; i++ {
            laporan := fmt.Sprintf("laporan-%02d", i)
            fmt.Printf("mengirim: %s\n", laporan)
            antrianCh <- laporan // akan blocking di laporan-03 dan laporan-04
            fmt.Printf("terkirim: %s\n", laporan)
        }
        close(antrianCh)
    }()

    // receiver sengaja lambat untuk memperlihatkan efek blocking
    time.Sleep(100 * time.Millisecond)
    for laporan := range antrianCh {
        fmt.Printf("memproses: %s\n", laporan)
        time.Sleep(50 * time.Millisecond)
    }
}
mengirim: laporan-01
terkirim: laporan-01
mengirim: laporan-02
terkirim: laporan-02
mengirim: laporan-03
memproses: laporan-01
terkirim: laporan-03
mengirim: laporan-04
memproses: laporan-02
terkirim: laporan-04
memproses: laporan-03
memproses: laporan-04

Perhatikan polanya: laporan-03 berhasil dikirim (terkirim) hanya setelah receiver memproses laporan-01 dan membebaskan satu slot buffer. Blocking membuat sender berhenti sejenak, bukan crash atau error.

Ini adalah backpressure alami — mekanisme di mana sender otomatis melambat ketika receiver tidak mampu mengikuti kecepatan pengiriman. Sifat ini sangat berguna untuk menjaga program tetap stabil di bawah beban tinggi.

Buffered Channel sebagai Semaphore

Salah satu pola paling elegan di Go adalah menggunakan buffered channel sebagai semaphore — pembatas jumlah goroutine yang boleh berjalan secara bersamaan. Idenya simpel: ukuran buffer = jumlah izin yang tersedia.

// Ilustrasi: semaphore dengan kapasitas 3
//
//  ┌─────────────────────────────────────┐
//  │  SLOT IZIN (cap=3)                  │
//  │  ┌────┬────┬────┐                   │
//  │  │ ok │ ok │ ok │  ← 3 slot tersedia│
//  │  └────┴────┴────┘                   │
//  └─────────────────────────────────────┘
//       ↓ kirim = ambil izin
//       ↑ terima = kembalikan izin
//
//  Jika 3 goroutine sudah memegang izin,
//  goroutine ke-4 harus menunggu sampai
//  salah satu mengembalikan izinnya.
// main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

func prosesLaporan(id int, izin chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()

    izin <- struct{}{} // ambil satu izin
    defer func() { <-izin }() // kembalikan izin saat selesai

    fmt.Printf("[worker-%d] memulai pemrosesan laporan\n", id)
    time.Sleep(200 * time.Millisecond) // simulasi kerja
    fmt.Printf("[worker-%d] laporan selesai\n", id)
}

func main() {
    const maxKonkuren = 3
    const totalLaporan = 8

    izin := make(chan struct{}, maxKonkuren) // hanya 3 yang boleh berjalan bersamaan
    var wg sync.WaitGroup

    for i := 1; i <= totalLaporan; i++ {
        wg.Add(1)
        go prosesLaporan(i, izin, &wg)
    }

    wg.Wait()
    fmt.Println("semua laporan selesai diproses")
}
[worker-1] memulai pemrosesan laporan
[worker-2] memulai pemrosesan laporan
[worker-3] memulai pemrosesan laporan
[worker-1] laporan selesai
[worker-4] memulai pemrosesan laporan
[worker-2] laporan selesai
[worker-5] memulai pemrosesan laporan
[worker-3] laporan selesai
[worker-6] memulai pemrosesan laporan
...
semua laporan selesai diproses

Delapan goroutine diluncurkan, tapi paling banyak tiga yang aktif secara bersamaan. Saat satu selesai dan mengembalikan izin via <-izin, goroutine berikutnya otomatis bisa masuk. Pola defer func() { <-izin }() memastikan izin selalu dikembalikan — bahkan jika fungsi panic sekalipun.

Worker Pool — Antrian Tetap dengan Banyak Pekerja

Semaphore membatasi konkurensi, tapi pola worker pool lebih terstruktur: kamu punya sejumlah worker yang berjalan terus-menerus, memproses pekerjaan dari antrian sampai antrian habis.

// Ilustrasi: worker pool
//
//  Sender                  Workers                Collector
//  ──────          ┌──────────────────────┐       ─────────
//  tugas-1 ──┐     │  worker-1 (aktif)    │ ──┐
//  tugas-2 ──┤──→  │  worker-2 (aktif)    │ ──┤──→ hasil
//  tugas-3 ──┤     │  worker-3 (menunggu) │ ──┘
//  tugas-4 ──┘     └──────────────────────┘
//             ↑                                ↑
//          jobsCh                          hasilCh
//        (buffered)                       (buffered)
// main.go
package main

import (
    "fmt"
    "sync"
    "time"
)

type Tugas struct {
    ID       int
    NamaFile string
}

type Hasil struct {
    TugasID int
    Status  string
    Durasi  string
}

func worker(id int, tugasCh <-chan Tugas, hasilCh chan<- Hasil, wg *sync.WaitGroup) {
    defer wg.Done()

    for tugas := range tugasCh {
        mulai := time.Now()
        // simulasi pemrosesan file laporan
        time.Sleep(150 * time.Millisecond)
        durasi := time.Since(mulai)

        hasilCh <- Hasil{
            TugasID: tugas.ID,
            Status:  fmt.Sprintf("%s berhasil diarsipkan", tugas.NamaFile),
            Durasi:  durasi.String(),
        }
    }
}

func main() {
    const jumlahWorker = 3

    tugasCh := make(chan Tugas, 10)
    hasilCh := make(chan Hasil, 10)

    // luncurkan worker pool
    var wg sync.WaitGroup
    for i := 1; i <= jumlahWorker; i++ {
        wg.Add(1)
        go worker(i, tugasCh, hasilCh, &wg)
    }

    // kirim pekerjaan ke antrian
    fileLaporan := []string{
        "q1-keuangan.xlsx", "q1-sdm.xlsx", "q1-operasional.xlsx",
        "q1-marketing.xlsx", "q1-produksi.xlsx", "q1-logistik.xlsx",
        "q1-it.xlsx",
    }

    for i, nama := range fileLaporan {
        tugasCh <- Tugas{ID: i + 1, NamaFile: nama}
    }
    close(tugasCh) // sinyal: tidak ada tugas baru

    // tutup hasilCh setelah semua worker selesai
    go func() {
        wg.Wait()
        close(hasilCh)
    }()

    // kumpulkan semua hasil
    for hasil := range hasilCh {
        fmt.Printf("tugas #%d: %s (%s)\n", hasil.TugasID, hasil.Status, hasil.Durasi)
    }
}
tugas #2: q1-sdm.xlsx berhasil diarsipkan (150ms)
tugas #1: q1-keuangan.xlsx berhasil diarsipkan (150ms)
tugas #3: q1-operasional.xlsx berhasil diarsipkan (150ms)
tugas #4: q1-marketing.xlsx berhasil diarsipkan (150ms)
tugas #5: q1-produksi.xlsx berhasil diarsipkan (150ms)
tugas #6: q1-logistik.xlsx berhasil diarsipkan (150ms)
tugas #7: q1-it.xlsx berhasil diarsipkan (150ms)

Tujuh file diproses oleh tiga worker secara bergiliran. Tidak ada goroutine yang menganggur selama masih ada tugas di tugasCh. Dua detail penting di sini:

Pertama, tugasCh <-chan Tugas di signature worker — tanda <- sebelum chan berarti channel ini receive-only untuk fungsi tersebut. Ini mencegah worker secara tidak sengaja mengirim data ke channel yang seharusnya hanya dibaca. Sebaliknya, hasilCh chan<- Hasil adalah send-only.

Kedua, goroutine anonim yang memanggil wg.Wait() lalu close(hasilCh) — ini adalah koordinasi dua tahap: tunggu semua worker selesai, baru tutup channel hasil sehingga for range hasilCh di main bisa berhenti.

Anotasi directional pada parameter channel (<-chan dan chan<-) adalah praktik yang sangat dianjurkan. Compiler akan menangkap bug jika ada kode yang mencoba mengirim ke receive-only channel atau sebaliknya.

Memeriksa Buffer Sebelum Kirim

Kadang kamu ingin tahu apakah buffer masih punya ruang sebelum mencoba mengirim, tanpa mau menunggu kalau buffer penuh. Ini bisa dilakukan dengan select dan default.

// main.go
package main

import "fmt"

func kirimJikaAda(ch chan string, pesan string) bool {
    select {
    case ch <- pesan:
        return true
    default:
        return false // buffer penuh, lewati
    }
}

func main() {
    antrianCh := make(chan string, 3)

    laporan := []string{
        "laporan-audit", "laporan-pajak", "laporan-aset",
        "laporan-investasi", // ini tidak akan masuk — buffer penuh
    }

    for _, lap := range laporan {
        berhasil := kirimJikaAda(antrianCh, lap)
        if berhasil {
            fmt.Printf("masuk antrian : %s\n", lap)
        } else {
            fmt.Printf("antrian penuh, lewati: %s\n", lap)
        }
    }

    fmt.Printf("\nisi antrian (%d/%d):\n", len(antrianCh), cap(antrianCh))
    for len(antrianCh) > 0 {
        fmt.Println(" -", <-antrianCh)
    }
}
masuk antrian : laporan-audit
masuk antrian : laporan-pajak
masuk antrian : laporan-aset
antrian penuh, lewati: laporan-investasi

isi antrian (3/3):
 - laporan-audit
 - laporan-pajak
 - laporan-aset

select dengan default membuat operasi channel menjadi non-blocking. Jika ch <- pesan tidak bisa langsung dieksekusi (karena buffer penuh), ia langsung jatuh ke default. Ini berguna untuk skenario di mana kehilangan beberapa item lebih baik daripada menghentikan seluruh alur program.

Latihan

Latihan 1 — Monitor kapasitas: Buat program yang menggunakan buffered channel berkapasitas 5. Kirim data satu per satu dari goroutine, dan setiap kali data masuk, cetak status buffer: "buffer: N/5". Gunakan len dan cap untuk menghitungnya.

Latihan 2 — Rate limiter: Buat fungsi prosesPermintaan(id int) yang mencetak "permintaan #N diproses". Gunakan buffered channel sebagai semaphore untuk memastikan tidak lebih dari 2 permintaan diproses secara bersamaan, meski 10 goroutine diluncurkan sekaligus. Petunjuk: pola yang sama dengan izin := make(chan struct{}, 2).

Latihan 3 — Pipeline dua tahap: Kembangkan worker pool di bab ini menjadi dua tahap. Tahap pertama: goroutine membaca nama file dari slice dan mengirimnya ke fileCh. Tahap kedua: tiga worker membaca dari fileCh, “memproses” file (cukup time.Sleep), lalu mengirim hasilnya ke arsipCh. Tahap ketiga: main membaca dari arsipCh dan mencetak hasilnya. Ini adalah pola pipeline tiga tahap yang umum di sistem pemrosesan data.

Buffered channel yang sudah kamu kuasai di bab ini — dari semantik blocking, semaphore, hingga worker pool — adalah fondasi dari sistem concurrency yang sesungguhnya. Tapi ada satu tantangan yang belum tersentuh: bagaimana jika kamu perlu menunggu salah satu dari beberapa channel sekaligus? Situasi ini muncul saat menggabungkan banyak sumber data atau saat ingin menambahkan timeout pada operasi channel. select di Go dirancang tepat untuk itu, dan itu yang akan kita bahas selanjutnya.

Referensi

  1. 1Buffered Channels — A Tour of Go
  2. 2Buffered Channels and Worker Pools — golangbot.com
  3. 3Channel types — The Go Programming Language Specification