BAB 29: Simplified Fan-in Fan-out Pipeline

Sederhanakan arsitektur pipeline dengan pola yang menggabungkan distribusi dan pengumpulan hasil dalam satu fungsi terpadu — lebih ringkas, tetap efisien.

Pipeline yang dibangun di Bab 28 bekerja dengan baik, tapi strukturnya cukup verbose: setiap stage membutuhkan fungsi tersendiri, fan-out memerlukan WaitGroup eksplisit, dan fan-in memerlukan fungsi gabungChannel yang terpisah. Untuk pipeline sederhana dengan satu jenis pekerjaan, semua boilerplate itu terasa berat. Ada pendekatan yang lebih ringkas — satu fungsi yang menggabungkan distribusi dan pengumpulan hasil sekaligus.

Ide di Balik Simplified Pipeline

Pada pipeline tradisional, fan-out dan fan-in dipisah: fungsi satu mendistribusikan pekerjaan, fungsi lain mengumpulkan hasilnya. Pendekatan simplified menggabungkan keduanya dalam satu fungsi. Fungsi itu menerima channel masukan, meluncurkan sejumlah worker di dalamnya, lalu mengembalikan satu channel keluaran yang sudah berisi hasil dari semua worker.

Dari sudut pandang pemanggil, tidak ada bedanya — ia tetap menerima <-chan Hasil dan mengiterasinya. Yang berubah adalah bagaimana fungsi itu diorganisasi di dalamnya. Analogi lini produksi dari Bab 28 masih berlaku: bedanya sekarang seluruh station distribusi dan pengumpulan dikemas dalam satu modul yang bisa dipakai ulang tanpa membangun ulang infrastrukturnya.

Perbandingan: Tradisional vs Simplified

Sebelum masuk ke kode, penting untuk memahami kapan masing-masing pendekatan lebih cocok.

AspekPipeline Tradisional (Bab 28)Simplified Pipeline
Jumlah fungsiBanyak (per stage)Sedikit (terpadu)
Kontrol per stageTinggiSedang
Keterbacaan untuk pipeline kompleksLebih jelasBisa membingungkan
Keterbacaan untuk satu jenis pekerjaanTerlalu verboseLebih bersih
Cocok untukMulti-stage dengan logika berbedaSatu stage dengan banyak worker

Simplified pipeline paling efektif ketika kamu punya satu jenis pekerjaan yang perlu diparalelkan — bukan rangkaian transformasi yang berbeda-beda.

Implementasi: Pemroses Komentar KontenKu

Bayangkan KontenKu menerima ribuan komentar baru yang perlu divalidasi: cek apakah mengandung kata terlarang, hitung skor spam, dan beri label status. Pekerjaan ini seragam — setiap komentar diproses dengan cara yang sama — tapi bisa memakan waktu jika dilakukan satu per satu.

Struktur Data

// validasi-komentar.go
package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

type Komentar struct {
    ID      int
    Penulis string
    Isi     string
}

type HasilValidasi struct {
    ID        int
    Penulis   string
    SkorSpam  int
    Status    string // "aman", "ditahan", "diblokir"
}

Generator Komentar

// validasi-komentar.go (lanjutan)

func streamKomentar(jumlah int) <-chan Komentar {
    keluaran := make(chan Komentar)
    go func() {
        defer close(keluaran)
        contohIsi := []string{
            "artikel yang bagus, terima kasih",
            "kunjungi website gratis bonus",
            "penjelasannya sangat membantu",
            "beli produk murah disini klik link",
            "saya setuju dengan pendapat penulis",
            "cepat kaya tanpa modal hubungi saya",
        }
        for i := range jumlah {
            keluaran <- Komentar{
                ID:      i + 1,
                Penulis: fmt.Sprintf("user_%d", rand.Intn(1000)),
                Isi:     contohIsi[rand.Intn(len(contohIsi))],
            }
        }
    }()
    return keluaran
}

Fungsi Validasi (Logika Satu Worker)

// validasi-komentar.go (lanjutan)

var kataSpam = []string{"gratis bonus", "klik link", "tanpa modal", "hubungi saya", "murah disini"}

func validasiSatu(k Komentar) HasilValidasi {
    // simulasi: validasi membutuhkan waktu (cek database, API, dsb)
    time.Sleep(time.Duration(rand.Intn(15)+5) * time.Millisecond)

    skor := 0
    isiLower := strings.ToLower(k.Isi)
    for _, kata := range kataSpam {
        if strings.Contains(isiLower, kata) {
            skor += 40
        }
    }

    status := "aman"
    if skor >= 80 {
        status = "diblokir"
    } else if skor >= 40 {
        status = "ditahan"
    }

    return HasilValidasi{
        ID:       k.ID,
        Penulis:  k.Penulis,
        SkorSpam: skor,
        Status:   status,
    }
}

Simplified Fan-in Fan-out

Inilah inti polanya — satu fungsi yang menangani distribusi ke worker sekaligus pengumpulan hasilnya:

// validasi-komentar.go (lanjutan)

func validasiParalel(masukan <-chan Komentar, jumlahWorker int) <-chan HasilValidasi {
    keluaran := make(chan HasilValidasi)
    var wg sync.WaitGroup

    // fan-out: jalankan sejumlah worker
    for range jumlahWorker {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // setiap worker membaca dari channel masukan yang sama (fan-out)
            // dan menulis ke channel keluaran yang sama (fan-in)
            for komentar := range masukan {
                keluaran <- validasiSatu(komentar)
            }
        }()
    }

    // goroutine penjaga: tutup keluaran setelah semua worker selesai
    go func() {
        wg.Wait()
        close(keluaran)
    }()

    return keluaran
}

Perhatikan betapa ringkasnya: fan-out terjadi karena jumlahWorker goroutine semuanya membaca dari masukan yang sama. Fan-in terjadi karena semua goroutine menulis ke keluaran yang sama. Tidak ada fungsi gabungChannel terpisah, tidak ada slice of channels — semuanya terjadi secara alami karena channel di Go aman diakses dari banyak goroutine secara bersamaan.

Ini bekerja karena Go menjamin bahwa operasi send dan receive pada channel adalah atomic — tidak ada race condition meskipun banyak goroutine mengakses channel yang sama secara bersamaan.

Program Lengkap

// validasi-komentar.go (lanjutan)

func main() {
    const jumlahKomentar = 200
    const jumlahWorker = 10

    fmt.Printf("memvalidasi %d komentar dengan %d worker...\n\n", jumlahKomentar, jumlahWorker)
    mulai := time.Now()

    komentar := streamKomentar(jumlahKomentar)
    hasil := validasiParalel(komentar, jumlahWorker)

    // statistik
    hitungan := map[string]int{"aman": 0, "ditahan": 0, "diblokir": 0}
    for h := range hasil {
        hitungan[h.Status]++
    }

    durasi := time.Since(mulai)
    fmt.Printf("selesai dalam %v\n\n", durasi.Round(time.Millisecond))
    fmt.Printf("aman    : %d komentar\n", hitungan["aman"])
    fmt.Printf("ditahan : %d komentar\n", hitungan["ditahan"])
    fmt.Printf("diblokir: %d komentar\n", hitungan["diblokir"])
}
memvalidasi 200 komentar dengan 10 worker...

selesai dalam 203ms

aman    : 134 komentar
ditahan : 43 komentar
diblokir: 23 komentar

Tanpa concurrency, 200 komentar dengan jeda 5–20ms per komentar membutuhkan sekitar 1.5 detik. Dengan 10 worker, selesai dalam ~200ms.

Mengukur Dampak Jumlah Worker

Salah satu keunggulan pola ini adalah mudahnya mengubah jumlah worker tanpa menyentuh logika bisnis. Tambahkan fungsi benchmark sederhana untuk menemukan titik optimal:

// benchmark-worker.go
package main

import (
    "fmt"
    "math/rand"
    "strings"
    "sync"
    "time"
)

// --- salin struct dan fungsi dari validasi-komentar.go ---

func ukurDurasi(jumlahKomentar, jumlahWorker int) time.Duration {
    mulai := time.Now()
    komentar := streamKomentar(jumlahKomentar)
    hasil := validasiParalel(komentar, jumlahWorker)
    for range hasil {
        // drain channel
    }
    return time.Since(mulai)
}

func main() {
    const jumlahKomentar = 500
    variasiWorker := []int{1, 2, 5, 10, 20, 50}

    fmt.Printf("benchmark: %d komentar\n\n", jumlahKomentar)
    fmt.Printf("%-10s %s\n", "worker", "durasi")
    fmt.Println(strings.Repeat("-", 25))

    for _, w := range variasiWorker {
        durasi := ukurDurasi(jumlahKomentar, w)
        fmt.Printf("%-10d %v\n", w, durasi.Round(time.Millisecond))
    }
}
benchmark: 500 komentar

worker     durasi
-------------------------
1          5.312s
2          2.698s
5          1.091s
10         548ms
20         285ms
50         203ms

Dari angka ini terlihat bahwa penambahan worker dari 20 ke 50 hanya memberikan penghematan ~82ms — jauh lebih kecil dari lompatan 1 ke 10 worker. Titik diminishing return seperti ini yang perlu dicari untuk setiap workload spesifik.

Untuk pekerjaan I/O-bound seperti validasi komentar yang menyimulasikan network call, jumlah worker bisa jauh melebihi jumlah CPU. Coba mulai dari jumlahWorker = runtime.NumCPU() * 4 sebagai baseline, lalu sesuaikan berdasarkan hasil benchmark.

Latihan

Latihan 1 — Simplified pipeline untuk resize gambar: Buat fungsi prosesGambar(masukan <-chan string, jumlahWorker int) <-chan string di mana setiap worker menerima path file gambar (string) dari channel masukan, “memproses” gambar itu (simulasikan dengan time.Sleep acak 10–50ms), lalu mengirim string "processed: <path>" ke channel keluaran. Jalankan dengan 100 path gambar dan 8 worker, ukur total durasinya.

Latihan 2 — Error handling per item: Modifikasi validasiParalel agar mengembalikan <-chan HasilValidasi di mana HasilValidasi memiliki field tambahan Err error. Jika validasi gagal (misalnya isi komentar kosong), isi Err dengan pesan error dan tetap kirim ke channel keluaran — jangan lewatkan item yang gagal. Pastikan pemanggil bisa membedakan item berhasil dan gagal.

Latihan 3 — Worker pool yang bisa dimatikan: Kembangkan validasiParalel agar menerima parameter ketiga ctx context.Context. Ketika context di-cancel (misalnya setelah 100ms), semua worker harus berhenti memproses item baru dan fungsi harus mengembalikan segera. Gunakan select dengan case <-ctx.Done() di dalam loop worker.


Simplified pipeline membuktikan bahwa concurrency di Go tidak selalu membutuhkan banyak fungsi dan banyak channel. Dengan satu fungsi yang tepat, kamu bisa mendistribusikan ribuan pekerjaan ke puluhan worker dan mengumpulkan hasilnya — semua dengan kode yang muat dalam satu layar. Tapi ada satu hal yang belum ditangani: bagaimana menghentikan seluruh pipeline ketika kondisi tertentu terpenuhi — misalnya timeout tercapai atau ada error fatal di tengah jalan. Latihan 3 di atas sudah memberi petunjuknya: ctx context.Context. Di bab berikutnya, kita akan mengintegrasikan context cancellation secara penuh ke dalam pipeline.

Referensi

  1. 1Go Concurrency Patterns: Pipelines and cancellation — The Go Blog
  2. 2sync.WaitGroup — Go Standard Library Documentation
  3. 3runtime.NumCPU — Go Standard Library Documentation