Implementing a thread pool in Go
Original article: Handling 1 Million Requests per Minute with Go
Note: the “threads” here are not real OS threads but Go goroutines.
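Suppose the work to be done is to accept a POST request and process each element of the payload array it contains.
In the program below, the handler simply takes the payloads out one by one, wraps each in a Job, and sends it to a chan named JobQueue; the pool then processes the submitted Jobs.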
```go
import (
	"encoding/json"
	"io"
	"net/http"
)

// PayloadCollection, Job, MaxLength and JobQueue are defined elsewhere in the program.
func payloadHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}

	// Decode the JSON body, reading at most MaxLength bytes.
	var content = &PayloadCollection{}
	err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(content)
	if err != nil {
		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
		w.WriteHeader(http.StatusBadRequest)
		return
	}

	// Go through each payload and queue items individually to be processed by a Worker
	for _, payload := range content.Payloads {
		// let's create a job with the payload
		work := Job{Payload: payload}

		// Push the work onto the queue.
		JobQueue <- work
	}

	w.WriteHeader(http.StatusOK)
}
```
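To see the handler in action without a running pool, here is a minimal self-contained sketch. The fields of Payload and PayloadCollection, the `data` JSON key and the 1 MiB MaxLength are assumptions for illustration only (the article does not show them), and JobQueue is given a small buffer so the handler can enqueue without any Worker draining it. The handler is exercised in-process with the standard net/http/httptest package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

// Hypothetical payload types; the real fields are not shown in the article.
type Payload struct {
	ID int `json:"id"`
}

type PayloadCollection struct {
	Payloads []Payload `json:"data"`
}

type Job struct {
	Payload Payload
}

// MaxLength caps the request body size (assumed value: 1 MiB).
const MaxLength = 1 << 20

// JobQueue is buffered so this demo works with no pool consuming it.
var JobQueue = make(chan Job, 16)

func payloadHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	var content PayloadCollection
	if err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// One Job per payload element.
	for _, p := range content.Payloads {
		JobQueue <- Job{Payload: p}
	}
	w.WriteHeader(http.StatusOK)
}

// post sends body to the handler and reports the status code
// and how many Jobs the call added to JobQueue.
func post(body string) (int, int) {
	req := httptest.NewRequest(http.MethodPost, "/payload", strings.NewReader(body))
	rec := httptest.NewRecorder()
	before := len(JobQueue)
	payloadHandler(rec, req)
	return rec.Code, len(JobQueue) - before
}

func main() {
	code, n := post(`{"data":[{"id":1},{"id":2},{"id":3}]}`)
	fmt.Println(code, n) // 200 3
	code, n = post(`not json`)
	fmt.Println(code, n) // 400 0
}
```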
The pool is implemented as follows.
```go
import "os"

// Following the 12-Factor App methodology, configuration comes from environment variables.
// Both values are strings and must be converted to int before use.
var (
	MaxWorker = os.Getenv("MAX_WORKERS")
	MaxQueue  = os.Getenv("MAX_QUEUE")
)

// Job represents the job to be run
type Job struct {
	Payload Payload
}

// A buffered channel that we can send work requests on.
// It is nil until created at startup, e.g. make(chan Job, maxQueue).
var JobQueue chan Job

// Worker represents the worker that executes the job
type Worker struct {
	// WorkerPool is a reference to the single, global chan that carries
	// the chans on which Workers receive Jobs.
	// When a Worker is idle, it puts its own JobChannel into the WorkerPool.
	// When the Dispatcher receives a new Job, it takes a chan out of the
	// WorkerPool and sends the Job on it; the Worker then receives the Job
	// from that chan and processes it.
	WorkerPool chan chan Job
	// The chan on which this Worker receives Jobs
	JobChannel chan Job
	// Control chan for the Worker; currently only used to stop it
	quit chan bool
}

func NewWorker(workerPool chan chan Job) Worker {
	return Worker{
		WorkerPool: workerPool,
		JobChannel: make(chan Job),
		quit:       make(chan bool)}
}

// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool <- w.JobChannel

			select {
			case job := <-w.JobChannel:
				// we have received a work request.
				// DO THE JOB: process job.Payload here.
				_ = job // placeholder so the code compiles before real work is added
			case <-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
	go func() {
		w.quit <- true
	}()
}
```
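The key idea above is the `chan chan Job`: an idle Worker publishes its own JobChannel, and whoever holds the pool takes one out and sends a Job on it. A minimal sketch of a single hand-off follows; the int field N on Job, the demo-only `results` channel and the squareViaWorker helper are hypothetical stand-ins for the real Payload and processing.

```go
package main

import "fmt"

// Job is a stand-in for the article's Job; N is a demo-only field.
type Job struct{ N int }

// Worker mirrors the article's Worker, plus a results chan added only for this demo.
type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
	results    chan<- int
}

func NewWorker(pool chan chan Job, results chan<- int) Worker {
	return Worker{WorkerPool: pool, JobChannel: make(chan Job), quit: make(chan bool), results: results}
}

func (w Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel // "I am idle: send my next Job here"
			select {
			case job := <-w.JobChannel:
				w.results <- job.N * job.N // stand-in for real work
			case <-w.quit:
				return
			}
		}
	}()
}

// squareViaWorker performs one hand-off the same way the Dispatcher does:
// take an idle worker's JobChannel out of the pool, then send the Job on it.
func squareViaWorker(n int) int {
	pool := make(chan chan Job, 1)
	results := make(chan int)
	w := NewWorker(pool, results)
	w.Start()

	jobChannel := <-pool // blocks until the worker has registered itself
	jobChannel <- Job{N: n}
	r := <-results

	// Blocks until the worker, back in its select after re-registering, receives it.
	w.quit <- true
	return r
}

func main() {
	fmt.Println(squareViaWorker(7)) // 49
}
```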
Besides the Workers, we also need a component that starts them and dispatches Jobs to them, the Dispatcher:
```go
type Dispatcher struct {
	// A pool of workers channels that are registered with the dispatcher
	WorkerPool chan chan Job
	// Number of Workers to start
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	pool := make(chan chan Job, maxWorkers)
	return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	// starting n number of workers
	for i := 0; i < d.maxWorkers; i++ {
		worker := NewWorker(d.WorkerPool)
		worker.Start()
	}

	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for {
		select {
		case job := <-JobQueue:
			// a job request has been received
			go func(job Job) {
				// try to obtain a worker job channel that is available.
				// this will block until a worker is idle
				jobChannel := <-d.WorkerPool

				// dispatch the job to the worker job channel
				jobChannel <- job
			}(job)
		}
	}
}
```
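Putting the Worker and the Dispatcher together gives a runnable sketch under a few assumptions: Job carries a number and a result channel instead of a Payload (demo-only fields), the "work" is squaring the number, and the queue size (16) and worker count (4) are arbitrary. The dispatcher ranges over JobQueue, which behaves like the single-case select above and additionally stops if JobQueue is ever closed; sumOfSquares is a hypothetical driver.

```go
package main

import (
	"fmt"
	"sync"
)

// Job carries a number to square and a channel for the result (demo-only fields).
type Job struct {
	N      int
	Result chan<- int
}

// JobQueue is the global work queue, as in the article.
var JobQueue chan Job

type Worker struct {
	WorkerPool chan chan Job
	JobChannel chan Job
	quit       chan bool
}

func NewWorker(pool chan chan Job) Worker {
	return Worker{WorkerPool: pool, JobChannel: make(chan Job), quit: make(chan bool)}
}

func (w Worker) Start() {
	go func() {
		for {
			w.WorkerPool <- w.JobChannel // register as idle
			select {
			case job := <-w.JobChannel:
				job.Result <- job.N * job.N // stand-in for real work
			case <-w.quit:
				return
			}
		}
	}()
}

type Dispatcher struct {
	WorkerPool chan chan Job
	maxWorkers int
}

func NewDispatcher(maxWorkers int) *Dispatcher {
	return &Dispatcher{WorkerPool: make(chan chan Job, maxWorkers), maxWorkers: maxWorkers}
}

func (d *Dispatcher) Run() {
	for i := 0; i < d.maxWorkers; i++ {
		NewWorker(d.WorkerPool).Start()
	}
	go d.dispatch()
}

func (d *Dispatcher) dispatch() {
	for job := range JobQueue {
		go func(job Job) {
			jobChannel := <-d.WorkerPool // blocks until some worker is idle
			jobChannel <- job
		}(job)
	}
}

var startOnce sync.Once

// sumOfSquares submits jobs 1..n through JobQueue and adds up the workers' results.
func sumOfSquares(n int) int {
	startOnce.Do(func() {
		JobQueue = make(chan Job, 16) // assumed queue size
		NewDispatcher(4).Run()        // assumed worker count
	})
	results := make(chan int, n) // buffered so workers never block on it
	for i := 1; i <= n; i++ {
		JobQueue <- Job{N: i, Result: results}
	}
	sum := 0
	for i := 0; i < n; i++ {
		sum += <-results
	}
	return sum
}

func main() {
	fmt.Println(sumOfSquares(10)) // 385
}
```

Because dispatch starts one goroutine per Job that waits for an idle worker, JobQueue drains quickly even when every worker is busy. The number of Jobs being processed at once is bounded by the worker count, but the number of goroutines waiting for a worker is not.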
To initialize the pool at startup, run the following:
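```go
maxWorkers, _ := strconv.Atoi(MaxWorker) // env values are strings; check the errors in real code
maxQueue, _ := strconv.Atoi(MaxQueue)
JobQueue = make(chan Job, maxQueue) // must exist before any handler sends to it
NewDispatcher(maxWorkers).Run()
```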
From then on, as in the first program, simply sending a Job to JobQueue hands the task over to the pool.