UnixWorker is used only on platforms where fork is available. It forks a preconfigured number of worker processes, then starts a preconfigured number of threads that write to the pipes connected to those workers.
# File lib/bundler/parallel_workers/unix_worker.rb, line 17
def initialize(size, job)
  # Close the persistent connections for the main thread before forking
  Net::HTTP::Persistent.new('bundler', :ENV).shutdown
  super
end
Start the threads. Each thread waits for incoming messages on the request queue and writes each message to the connected pipe. It then retrieves the reply from the child worker via the connected pipe and writes it to the response queue.
@param size [Integer] Number of threads to be started
# File lib/bundler/parallel_workers/unix_worker.rb, line 67
def prepare_threads(size)
  @threads = size.times.map do |i|
    Thread.start do
      worker = @workers[i]
      loop do
        obj = @request_queue.deq
        break if obj.equal? POISON
        @response_queue.enq worker.work(obj)
      end
    end
  end
end
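The request/response loop can be tried in isolation. The following is a minimal sketch, not Bundler's code: `EchoWorker`, `SKETCH_POISON`, and `run_pool` are hypothetical names introduced here, and `EchoWorker#work` merely stands in for whatever the real worker handle does when it talks to a forked child. It shows the pattern of one thread per worker, with a sentinel object used to stop each thread.

```ruby
require "thread"

# Hypothetical sentinel for this sketch; the real class refers to its own POISON constant.
SKETCH_POISON = Object.new

# Hypothetical stand-in for a worker handle: anything that responds to #work.
EchoWorker = Struct.new(:id) do
  def work(obj)
    [id, obj * 2]
  end
end

def run_pool(size, jobs)
  request_queue  = Queue.new
  response_queue = Queue.new
  workers = Array.new(size) { |i| EchoWorker.new(i) }

  threads = size.times.map do |i|
    Thread.start do
      worker = workers[i]
      loop do
        obj = request_queue.deq
        # Identity comparison, so a job that merely looks like the sentinel is not mistaken for it.
        break if obj.equal?(SKETCH_POISON)
        response_queue.enq(worker.work(obj))
      end
    end
  end

  jobs.each { |job| request_queue.enq(job) }
  results = jobs.map { response_queue.deq }

  # One sentinel per thread, so that every loop terminates.
  size.times { request_queue.enq(SKETCH_POISON) }
  threads.each(&:join)
  results
end

pool_results = run_pool(2, [1, 2, 3])
```

Each result pairs the id of the worker that handled the job with the job's output. Results arrive in completion order, which need not match submission order.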
Start forked workers for downloading gems. This version of worker is only used on platforms where fork is available.
@param size [Integer] Size of worker pool
@param func [Proc] Job that should be executed in the worker
# File lib/bundler/parallel_workers/unix_worker.rb, line 30
def prepare_workers(size, func)
  @workers = size.times.map do |num|
    child_read, parent_write = IO.pipe
    parent_read, child_write = IO.pipe

    pid = Process.fork do
      begin
        parent_read.close
        parent_write.close

        while !child_read.eof?
          obj = Marshal.load child_read
          Marshal.dump func.call(obj, num), child_write
        end
      rescue Exception => e
        begin
          Marshal.dump WrappedException.new(e), child_write
        rescue Errno::EPIPE
          nil
        end
      ensure
        child_read.close
        child_write.close
      end
    end

    child_read.close
    child_write.close
    JobHandler.new pid, parent_read, parent_write
  end
end
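The two-pipe round trip above can be reproduced with a single child. This is a sketch under stated assumptions rather than Bundler's implementation: `spawn_doubler` is a hypothetical helper, the job is hard-coded to doubling an integer, and the `exit!` call is this sketch's own choice to keep the child from running `at_exit` handlers inherited from the parent. It requires a platform where `Process.fork` is available.

```ruby
# Hypothetical helper: fork one child that doubles every integer it receives.
def spawn_doubler
  child_read, parent_write = IO.pipe   # parent -> child
  parent_read, child_write = IO.pipe   # child -> parent

  pid = Process.fork do
    begin
      # The child only needs its own ends of the two pipes.
      parent_read.close
      parent_write.close

      until child_read.eof?
        obj = Marshal.load(child_read)
        Marshal.dump(obj * 2, child_write)
      end
    ensure
      child_read.close
      child_write.close
    end
    exit!(0) # assumption of this sketch: skip at_exit handlers in the child
  end

  # The parent closes the child's ends so that EOF propagates correctly.
  child_read.close
  child_write.close
  [pid, parent_read, parent_write]
end

doubler_pid, doubler_r, doubler_w = spawn_doubler
Marshal.dump(21, doubler_w)       # send one job
answer = Marshal.load(doubler_r)  # block until the child replies
doubler_w.close                   # child sees EOF and leaves its loop
Process.waitpid(doubler_pid)
doubler_r.close
```

Closing the parent's write end is what ends the child's read loop here, because `eof?` blocks until data arrives or every writer has closed the pipe.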
Kill the forked workers by sending SIGINT to them, then wait for each one to exit.
# File lib/bundler/parallel_workers/unix_worker.rb, line 81
def stop_workers
  @workers.each do |worker|
    worker.io_r.close unless worker.io_r.closed?
    worker.io_w.close unless worker.io_w.closed?
    begin
      Process.kill :INT, worker.pid
    rescue Errno::ESRCH
      nil
    end
  end
  @workers.each do |worker|
    begin
      Process.waitpid worker.pid
    rescue Errno::ECHILD
      nil
    end
  end
end
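The reason for the two rescues becomes clearer with a small experiment. The sketch below assumes a fork-capable platform and uses a hypothetical helper, `stop_child`, applied to a child that does nothing but sleep. `Errno::ESRCH` is raised by `Process.kill` when no such process exists, and `Errno::ECHILD` by `Process.waitpid` when there is no child left to reap, so calling the helper a second time is harmless.

```ruby
# Hypothetical helper: signal one child and reap it, tolerating a child that is already gone.
def stop_child(pid)
  begin
    Process.kill(:INT, pid)
  rescue Errno::ESRCH
    nil # no such process: the child has already exited and been reaped
  end

  begin
    Process.waitpid(pid)
  rescue Errno::ECHILD
    nil # nothing left to wait for
  end
end

sleeper_pid = Process.fork do
  begin
    sleep # wait until interrupted
  rescue Interrupt
    nil
  end
  exit!(0) # assumption of this sketch: skip at_exit handlers in the child
end

stop_child(sleeper_pid)
stop_child(sleeper_pid) # second call hits both rescue branches and returns quietly

# Signal 0 only checks for existence; ESRCH means the process is gone.
sleeper_alive = begin
  Process.kill(0, sleeper_pid)
  true
rescue Errno::ESRCH
  false
end
```

Signalling every worker before waiting on any of them, as the method above does, lets the children shut down concurrently instead of one after another.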