# File lib/thrift_client/event_machine.rb, line 52 def self.connect(host='localhost', port=9090, timeout=5, &block) EM.connect(host, port, self, host, port) do |conn| conn.pending_connect_timeout = timeout end end
# File lib/thrift_client/event_machine.rb, line 67 def initialize(host, port=9090) @host, @port = host, port @index = 0 @disconnected = 'not connected' @buf = '' end
# File lib/thrift_client/event_machine.rb, line 81 def blocking_read(size) raise IOError, "lost connection to #{@host}:#{@port}: #{@disconnected}" if @disconnected if can_read?(size) yank(size) else raise ArgumentError, "Unexpected state" if @size or @callback fiber = Fiber.current @size = size @callback = proc { |data| fiber.resume(data) } Fiber.yield end end
# File lib/thrift_client/event_machine.rb, line 127 def can_read?(size) @buf.size >= @index + size end
# File lib/thrift_client/event_machine.rb, line 74 def close trap do @disconnected = 'closed' close_connection(true) end end
# File lib/thrift_client/event_machine.rb, line 110 def connected? !@disconnected end
# File lib/thrift_client/event_machine.rb, line 114 def connection_completed @disconnected = nil succeed end
# File lib/thrift_client/event_machine.rb, line 97 def receive_data(data) trap do (@buf) << data if @callback and can_read?(@size) callback = @callback data = yank(@size) @callback = @size = nil callback.call(data) end end end
# File lib/thrift_client/event_machine.rb, line 58 def trap begin yield rescue Exception => ex puts ex.message puts ex.backtrace.join("\n") end end
# File lib/thrift_client/event_machine.rb, line 119 def unbind if !@disconnected @disconnected = 'unbound' else fail end end
# File lib/thrift_client/event_machine.rb, line 133 def yank(len) data = @buf.slice(@index, len) @index += len @index = @buf.size if @index > @buf.size if @index >= GARBAGE_BUFFER_SIZE @buf = @buf.slice(@index..-1) @index = 0 end data end