BAB 28: Concurrency Pipeline — Pemrosesan Bertahap yang Efisien
Pelajari pola pipeline dalam Go: menghubungkan goroutine lewat channel untuk memproses data secara concurrent, lengkap dengan fan-out dan fan-in untuk distribusi beban kerja.
Kamu sudah menguasai banyak bagian dari mesin concurrency Go: goroutine sebagai unit eksekusi ringan, channel sebagai jalur komunikasi, select untuk multiplexing, dan time.After untuk menghindari penantian tak terbatas. Semua itu adalah komponen individual. Sekarang saatnya menyatukannya ke dalam satu pola arsitektur yang paling sering muncul di sistem Go yang sesungguhnya: pipeline.
Apa Itu Pipeline
Pipeline adalah teknik mengorganisasi pekerjaan concurrent ke dalam serangkaian tahapan (stage) yang terhubung satu sama lain. Setiap stage menerima data dari stage sebelumnya melalui channel, memproses data itu, lalu mengirim hasilnya ke stage berikutnya melalui channel lain.
Anggap saja seperti lini produksi di pabrik: station pertama memotong bahan mentah, station kedua membentuknya, station ketiga melapisinya, station keempat mengemas. Setiap station bekerja secara bersamaan — station kedua tidak perlu menunggu station pertama selesai memproses semua bahan sebelum mulai bekerja. Selama station pertama mengirim hasil kerjanya, station kedua langsung memprosesnya.
Di Go, setiap station itu adalah goroutine, dan conveyor belt antar station adalah channel.
Pola Dasar: Tiga Stage
Untuk memahami pipeline secara konkret, kita bangun sistem pemrosesan log untuk KontenKu. Log server HTTP masuk sebagai teks mentah, perlu diparse menjadi struct, lalu difilter untuk mengambil hanya request yang menghasilkan error.
Stage 1: Generator
Stage pertama bertugas menghasilkan data mentah dan mengirimkannya ke channel. Ia biasanya mengambil data dari sumber eksternal — file, database, atau dalam contoh ini, slice yang disimulasikan sebagai stream.
// pipeline-log.go
package main
import (
"fmt"
"strings"
)
type LogMentah struct {
Baris string
}
type LogTerurai struct {
Metode string
Endpoint string
StatusCode int
}
// stage 1: kirim baris log mentah ke channel
func bacaLog(barisLog []string) <-chan LogMentah {
keluaran := make(chan LogMentah)
go func() {
defer close(keluaran)
for _, baris := range barisLog {
keluaran <- LogMentah{Baris: baris}
}
}()
return keluaran
}
Perhatikan pola yang digunakan: fungsi bacaLog langsung mengembalikan <-chan LogMentah — channel yang hanya bisa dibaca oleh penerima. Goroutine dijalankan di dalam fungsi, dan channel ditutup dengan defer close(keluaran) ketika goroutine selesai. Ini adalah kontrak yang jelas: siapa pun yang menerima channel ini tahu bahwa ketika channel tertutup, tidak ada lagi data yang akan datang.
Stage 2: Parser
Stage kedua menerima channel dari stage pertama dan mengembalikan channel baru berisi data yang sudah diproses.
// pipeline-log.go (lanjutan)
// stage 2: urai setiap baris menjadi struct terstruktur
func uraiLog(masukan <-chan LogMentah) <-chan LogTerurai {
keluaran := make(chan LogTerurai)
go func() {
defer close(keluaran)
for log := range masukan {
// format baris: "GET /artikel/golang 200"
bagian := strings.Fields(log.Baris)
if len(bagian) < 3 {
continue // skip baris yang tidak valid
}
statusCode := 0
fmt.Sscanf(bagian[2], "%d", &statusCode)
keluaran <- LogTerurai{
Metode: bagian[0],
Endpoint: bagian[1],
StatusCode: statusCode,
}
}
}()
return keluaran
}
Stage 3: Filter
Stage ketiga menyaring hasil dari stage sebelumnya — hanya meneruskan log dengan status code 4xx atau 5xx.
// pipeline-log.go (lanjutan)
// stage 3: filter hanya log dengan status error (>= 400)
func filterError(masukan <-chan LogTerurai) <-chan LogTerurai {
keluaran := make(chan LogTerurai)
go func() {
defer close(keluaran)
for log := range masukan {
if log.StatusCode >= 400 {
keluaran <- log
}
}
}()
return keluaran
}
Menyambung Pipeline
// pipeline-log.go (lanjutan)
func main() {
barisLog := []string{
"GET /artikel/golang 200",
"POST /komentar 201",
"GET /halaman/tidak-ada 404",
"GET /artikel/python 200",
"DELETE /admin/pengguna 403",
"GET /api/data 500",
"PUT /profil 200",
"GET /favicon.ico 404",
}
// sambung tiga stage menjadi satu pipeline
logMentah := bacaLog(barisLog)
logTerurai := uraiLog(logMentah)
logError := filterError(logTerurai)
// konsumsi hasil dari stage terakhir
fmt.Println("=== Log Error KontenKu ===")
for log := range logError {
fmt.Printf("[%d] %s %s\n", log.StatusCode, log.Metode, log.Endpoint)
}
}
=== Log Error KontenKu ===
[404] GET /halaman/tidak-ada
[403] DELETE /admin/pengguna
[500] GET /api/data
[404] GET /favicon.ico
Tiga baris penyambungan di main mencerminkan bagaimana pipeline Go terasa seperti komposisi fungsi — keluaran satu fungsi langsung menjadi masukan fungsi berikutnya. Data mengalir secara concurrent: saat stage 2 sedang mengurai baris ketiga, stage 3 sudah memproses hasil baris kedua, dan stage 1 sedang mengirim baris keempat.
Fan-Out: Distribusi Beban ke Banyak Worker
Satu tahap pipeline bisa menjadi bottleneck jika pekerjaannya jauh lebih berat dari stage lain. Solusinya adalah fan-out: menjalankan beberapa goroutine yang semuanya membaca dari channel yang sama.
Bayangkan stage penguraian log di atas perlu melakukan operasi yang lebih berat — misalnya parsing format log yang kompleks atau query ke database untuk memperkaya data. Satu goroutine tidak cukup cepat. Fan-out memungkinkan kita menjalankan beberapa instance stage itu secara bersamaan.
// pipeline-log-fanout.go
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
type LogMentah struct {
ID int
Baris string
}
type LogTerurai struct {
ID int
Metode string
Endpoint string
StatusCode int
}
func bacaLog(barisLog []string) <-chan LogMentah {
keluaran := make(chan LogMentah)
go func() {
defer close(keluaran)
for i, baris := range barisLog {
keluaran <- LogMentah{ID: i + 1, Baris: baris}
}
}()
return keluaran
}
// satu worker pengurai — akan dijalankan beberapa instance
func workerUrai(id int, masukan <-chan LogMentah, keluaran chan<- LogTerurai, wg *sync.WaitGroup) {
defer wg.Done()
for log := range masukan {
// simulasi: penguraian membutuhkan waktu berbeda-beda
time.Sleep(time.Duration(rand.Intn(50)+10) * time.Millisecond)
bagian := strings.Fields(log.Baris)
if len(bagian) < 3 {
continue
}
statusCode := 0
fmt.Sscanf(bagian[2], "%d", &statusCode)
keluaran <- LogTerurai{
ID: log.ID,
Metode: bagian[0],
Endpoint: bagian[1],
StatusCode: statusCode,
}
}
}
// fan-out: jalankan beberapa worker yang semuanya membaca dari channel yang sama
func uraiLogParalel(masukan <-chan LogMentah, jumlahWorker int) <-chan LogTerurai {
keluaran := make(chan LogTerurai)
var wg sync.WaitGroup
for i := range jumlahWorker {
wg.Add(1)
go workerUrai(i+1, masukan, keluaran, &wg)
}
// tutup channel keluaran setelah semua worker selesai
go func() {
wg.Wait()
close(keluaran)
}()
return keluaran
}
func main() {
barisLog := []string{
"GET /artikel/golang 200",
"POST /komentar 201",
"GET /halaman/tidak-ada 404",
"GET /artikel/python 200",
"DELETE /admin/pengguna 403",
"GET /api/data 500",
"PUT /profil 200",
"GET /favicon.ico 404",
"GET /artikel/rust 200",
"POST /langganan 422",
}
mulai := time.Now()
logMentah := bacaLog(barisLog)
// fan-out: 3 worker memproses secara bersamaan
logTerurai := uraiLogParalel(logMentah, 3)
fmt.Println("=== Hasil Penguraian ===")
for log := range logTerurai {
fmt.Printf("log #%d: [%d] %s %s\n", log.ID, log.StatusCode, log.Metode, log.Endpoint)
}
fmt.Printf("\nselesai dalam %v\n", time.Since(mulai))
}
=== Hasil Penguraian ===
log #1: [200] GET /artikel/golang
log #3: [404] GET /halaman/tidak-ada
log #2: [201] POST /komentar
log #5: [403] DELETE /admin/pengguna
log #4: [200] GET /artikel/python
log #7: [200] PUT /profil
log #6: [500] GET /api/data
log #9: [200] GET /artikel/rust
log #8: [404] GET /favicon.ico
log #10: [422] POST /langganan
selesai dalam 213ms
Urutan output tidak berurutan karena tiga worker berlomba memproses dan mengirim hasil ke channel yang sama. Worker mana yang selesai duluan, dialah yang berhasil mengirim lebih awal. Ini adalah perilaku yang diharapkan dari fan-out — kita menukar urutan yang terprediksi dengan kecepatan.
Jika urutan output penting, tambahkan field ID ke struct dan urutkan hasilnya setelah semua data terkumpul. Dalam banyak kasus nyata — seperti menulis log ke file berbeda atau menyimpan ke database — urutan memang tidak penting.
Fan-In: Menggabungkan Banyak Channel
Kebalikan dari fan-out adalah fan-in: menggabungkan output dari beberapa channel menjadi satu channel tunggal. Ini berguna ketika beberapa stage menghasilkan data dari sumber berbeda dan perlu dikonsumsi oleh satu stage selanjutnya.
// fan-in: gabungkan beberapa channel menjadi satu
func gabungChannel(channels ...<-chan LogTerurai) <-chan LogTerurai {
keluaran := make(chan LogTerurai)
var wg sync.WaitGroup
// satu goroutine per channel masukan
teruskan := func(ch <-chan LogTerurai) {
defer wg.Done()
for log := range ch {
keluaran <- log
}
}
wg.Add(len(channels))
for _, ch := range channels {
go teruskan(ch)
}
// tutup channel gabungan setelah semua sumber habis
go func() {
wg.Wait()
close(keluaran)
}()
return keluaran
}
Dengan gabungChannel, kamu bisa mengolah log dari beberapa server berbeda secara bersamaan dan menggabungkan hasilnya ke satu stream yang bisa diproses oleh stage berikutnya.
Pipeline Lengkap dengan Fan-Out dan Fan-In
Berikut implementasi penuh yang menggabungkan semua pola: pipeline tiga stage dengan fan-out di stage tengah dan pengukuran performa.
// pipeline-lengkap.go
package main
import (
"fmt"
"math/rand"
"strings"
"sync"
"time"
)
type EntriLog struct {
ID int
Baris string
}
type HasilAnalisis struct {
ID int
Metode string
Endpoint string
StatusCode int
Durasi time.Duration
}
func hasilkanLog(total int) <-chan EntriLog {
keluaran := make(chan EntriLog)
go func() {
defer close(keluaran)
endpoint := []string{"/artikel", "/komentar", "/profil", "/admin", "/api"}
metode := []string{"GET", "POST", "PUT", "DELETE"}
status := []int{200, 200, 200, 201, 301, 400, 403, 404, 500}
for i := range total {
ep := endpoint[rand.Intn(len(endpoint))]
mt := metode[rand.Intn(len(metode))]
st := status[rand.Intn(len(status))]
keluaran <- EntriLog{
ID: i + 1,
Baris: fmt.Sprintf("%s %s %d", mt, ep, st),
}
}
}()
return keluaran
}
func analisaWorker(masukan <-chan EntriLog, keluaran chan<- HasilAnalisis, wg *sync.WaitGroup) {
defer wg.Done()
for entri := range masukan {
mulai := time.Now()
// simulasi: analisis membutuhkan waktu
time.Sleep(time.Duration(rand.Intn(20)+5) * time.Millisecond)
bagian := strings.Fields(entri.Baris)
statusCode := 0
fmt.Sscanf(bagian[2], "%d", &statusCode)
keluaran <- HasilAnalisis{
ID: entri.ID,
Metode: bagian[0],
Endpoint: bagian[1],
StatusCode: statusCode,
Durasi: time.Since(mulai),
}
}
}
func analisaParalel(masukan <-chan EntriLog, jumlahWorker int) <-chan HasilAnalisis {
keluaran := make(chan HasilAnalisis, jumlahWorker)
var wg sync.WaitGroup
for range jumlahWorker {
wg.Add(1)
go analisaWorker(masukan, keluaran, &wg)
}
go func() {
wg.Wait()
close(keluaran)
}()
return keluaran
}
func filterStatusError(masukan <-chan HasilAnalisis) <-chan HasilAnalisis {
keluaran := make(chan HasilAnalisis)
go func() {
defer close(keluaran)
for hasil := range masukan {
if hasil.StatusCode >= 400 {
keluaran <- hasil
}
}
}()
return keluaran
}
func main() {
const totalLog = 50
const jumlahWorker = 5
mulaiTotal := time.Now()
// stage 1: hasilkan log
logMasuk := hasilkanLog(totalLog)
// stage 2: analisa secara paralel (fan-out dengan 5 worker)
logDianalisa := analisaParalel(logMasuk, jumlahWorker)
// stage 3: filter hanya yang error
logError := filterStatusError(logDianalisa)
// konsumsi hasil akhir
jumlahError := 0
for hasil := range logError {
jumlahError++
fmt.Printf("ERROR #%-3d [%d] %-8s %-12s (analisa: %v)\n",
hasil.ID, hasil.StatusCode, hasil.Metode, hasil.Endpoint, hasil.Durasi.Round(time.Millisecond))
}
fmt.Printf("\n%d dari %d request menghasilkan error\n", jumlahError, totalLog)
fmt.Printf("total waktu: %v (dengan %d worker)\n", time.Since(mulaiTotal).Round(time.Millisecond), jumlahWorker)
}
ERROR #2 [404] GET /admin (analisa: 12ms)
ERROR #4 [500] POST /api (analisa: 18ms)
ERROR #7 [403] DELETE /admin (analisa: 7ms)
...
17 dari 50 request menghasilkan error
total waktu: 127ms (dengan 5 worker)
Tanpa fan-out, 50 log yang masing-masing butuh 5–25ms untuk dianalisa akan membutuhkan sekitar 750ms secara sequential. Dengan 5 worker, waktu total turun ke sekitar 127ms — pengurangan yang signifikan karena waktu tunggu I/O di setiap worker dimanfaatkan oleh worker lain.
Jumlah worker optimal bergantung pada karakteristik pekerjaan. Untuk pekerjaan I/O-bound (query database, HTTP call), lebih banyak worker biasanya lebih baik karena setiap worker sering menunggu. Untuk pekerjaan CPU-bound (komputasi matematika, parsing intensif), jumlah worker yang ideal biasanya mendekati jumlah core CPU yang tersedia — bisa dibaca via runtime.NumCPU().
Latihan
Latihan 1 — Pipeline tiga stage untuk CSV:
Buat pipeline yang membaca baris CSV dari slice (stage 1), mengurai setiap baris menjadi struct Transaksi{ID int, Nominal float64, Kategori string} (stage 2), lalu menjumlahkan total nominal per kategori menggunakan map di stage 3. Tampilkan hasilnya setelah pipeline selesai.
Latihan 2 — Fan-out dengan pengukuran throughput: Jalankan pipeline dari latihan 1 dengan jumlah worker yang berbeda: 1, 2, 4, dan 8 worker di stage 2. Ukur waktu total untuk memproses 1000 baris dan bandingkan throughput-nya. Pada titik berapa penambahan worker tidak lagi memberikan peningkatan signifikan?
Latihan 3 — Fan-in dari dua sumber:
Simulasikan dua “server” yang masing-masing menghasilkan log dengan format berbeda. Buat dua generator terpisah (stage 1a dan 1b), normalize outputnya ke format yang sama di stage 2, gabungkan menggunakan gabungChannel (fan-in), lalu filter dan hitung error di stage 3. Verifikasi bahwa log dari kedua server tercampur secara natural di output akhir.
Pipeline adalah salah satu pola Go yang paling elegan: ia memanfaatkan channel bukan sekadar sebagai mekanisme komunikasi, tapi sebagai kontrak antar komponen yang menentukan siapa yang menghasilkan data, siapa yang memprosesnya, dan siapa yang mengonsumsinya. Dengan fan-out dan fan-in, kamu bisa menyesuaikan kapasitas setiap stage secara independen tanpa mengubah stage lain — persis seperti menambah atau mengurangi jumlah station di lini produksi tanpa mengubah desain lininya.