EventMachine::Connection
app failure reconnections
network failure reconnections
# File lib/twitter/json_stream.rb, line 49 def self.connect options = {} options[:port] = 443 if options[:ssl] && !options.has_key?(:port) options = DEFAULT_OPTIONS.merge(options) host = options[:host] port = options[:port] if options[:proxy] proxy_uri = URI.parse(options[:proxy]) host = proxy_uri.host port = proxy_uri.port end connection = EventMachine.connect host, port, self, options connection.start_tls if options[:ssl] connection end
# File lib/twitter/json_stream.rb, line 67 def initialize options = {} @options = DEFAULT_OPTIONS.merge(options) # merge in case initialize called directly @gracefully_closed = false @nf_last_reconnect = nil @af_last_reconnect = nil @reconnect_retries = 0 @immediate_reconnect = false @on_inited_callback = options.delete(:on_inited) @proxy = URI.parse(options[:proxy]) if options[:proxy] end
# File lib/twitter/json_stream.rb, line 124 def connection_completed send_request end
# File lib/twitter/json_stream.rb, line 78 def each_item &block @each_item_callback = block end
# File lib/twitter/json_stream.rb, line 103 def immediate_reconnect @immediate_reconnect = true @gracefully_closed = false close_connection end
# File lib/twitter/json_stream.rb, line 94 def on_close &block @close_callback = block end
# File lib/twitter/json_stream.rb, line 82 def on_error &block @error_callback = block end
# File lib/twitter/json_stream.rb, line 90 def on_max_reconnects &block @max_reconnects_callback = block end
# File lib/twitter/json_stream.rb, line 86 def on_reconnect &block @reconnect_callback = block end
# File lib/twitter/json_stream.rb, line 128 def post_init reset_state @on_inited_callback.call if @on_inited_callback end
Receives raw data from the HTTP connection and pushes it into the HTTP parser which then drives subsequent callbacks.
# File lib/twitter/json_stream.rb, line 120 def receive_data(data) @parser << data end
# File lib/twitter/json_stream.rb, line 338 def escape str URI.escape(str.to_s, /[^a-zA-Z0-9\-\.\_\~]/) end
Called when the status line and all headers have been read from the stream.
# File lib/twitter/json_stream.rb, line 194 def handle_headers_complete(headers) @code = @parser.status_code.to_i if @code != 200 receive_error("invalid status code: #{@code}.") end self.headers = headers @state = :stream end
:filters => %w(miama lebron jesus) :oauth => {
:consumer_key => [key], :consumer_secret => [token], :access_key => [access key], :access_secret => [access secret]
}
# File lib/twitter/json_stream.rb, line 301 def oauth_header uri = uri_base + @options[:path].to_s # The hash SimpleOAuth accepts is slightly different from that of # ROAuth. To preserve backward compatability, fix the cache here # so that the arguments passed in don't need to change. oauth = { :consumer_key => @options[:oauth][:consumer_key], :consumer_secret => @options[:oauth][:consumer_secret], :token => @options[:oauth][:access_key], :token_secret => @options[:oauth][:access_secret] } SimpleOAuth::Header.new(@options[:method], uri, params, oauth) end
Normalized query hash of escaped string keys and escaped string values nil values are skipped
# File lib/twitter/json_stream.rb, line 324 def params flat = {} @options[:params].merge( :track => @options[:filters] ).each do |param, val| next if val.to_s.empty? || (val.respond_to?(:empty?) && val.empty?) val = val.join(",") if val.respond_to?(:join) flat[param.to_s] = val.to_s end flat end
# File lib/twitter/json_stream.rb, line 271 def parse_stream_line ln ln.strip! unless ln.empty? if ln[0,1] == '{' || ln[ln.length-1,1] == '}' @stream << ln if @stream[0,1] == '{' && @stream[@stream.length-1,1] == '}' @each_item_callback.call(@stream) if @each_item_callback @stream = '' end end end end
# File lib/twitter/json_stream.rb, line 334 def query params.map{|param, value| [escape(param), escape(value)].join("=")}.sort.join("&") end
# File lib/twitter/json_stream.rb, line 267 def receive_error e @error_callback.call(e) if @error_callback end
Called every time a chunk of data is read from the connection once it has been opened and after the headers have been processed.
# File lib/twitter/json_stream.rb, line 205 def receive_stream_data(data) begin @buffer.extract(data).each do |line| parse_stream_line(line) end @stream = '' rescue Exception => e receive_error("#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t")) close_connection return end end
# File lib/twitter/json_stream.rb, line 144 def reconnect_after timeout @reconnect_callback.call(timeout, @reconnect_retries) if @reconnect_callback if timeout == 0 reconnect @options[:host], @options[:port] else EventMachine.add_timer(timeout) do reconnect @options[:host], @options[:port] end end end
# File lib/twitter/json_stream.rb, line 156 def reconnect_timeout if @immediate_reconnect @immediate_reconnect = false return 0 end if (@code == 0) # network failure if @nf_last_reconnect @nf_last_reconnect += NF_RECONNECT_ADD else @nf_last_reconnect = NF_RECONNECT_START end [@nf_last_reconnect,NF_RECONNECT_MAX].min else if @af_last_reconnect @af_last_reconnect *= AF_RECONNECT_MUL else @af_last_reconnect = AF_RECONNECT_START end @af_last_reconnect end end
# File lib/twitter/json_stream.rb, line 179 def reset_state set_comm_inactivity_timeout @options[:timeout] if @options[:timeout] > 0 @code = 0 @headers = {} @state = :init @buffer = BufferedTokenizer.new("\r", MAX_LINE_LENGTH) @stream = '' @parser = Http::Parser.new @parser.on_headers_complete = method(:handle_headers_complete) @parser.on_body = method(:receive_stream_data) end
# File lib/twitter/json_stream.rb, line 284 def reset_timeouts set_comm_inactivity_timeout @options[:timeout] if @options[:timeout] > 0 @nf_last_reconnect = @af_last_reconnect = nil @reconnect_retries = 0 end
# File lib/twitter/json_stream.rb, line 134 def schedule_reconnect timeout = reconnect_timeout @reconnect_retries += 1 if timeout <= RECONNECT_MAX && @reconnect_retries <= RETRIES_MAX reconnect_after(timeout) else @max_reconnects_callback.call(timeout, @reconnect_retries) if @max_reconnects_callback end end
# File lib/twitter/json_stream.rb, line 218 def send_request data = [] request_uri = @options[:path] if @proxy # proxies need the request to be for the full url request_uri = "#{uri_base}:#{@options[:port]}#{request_uri}" end content = @options[:content] unless (q = query).empty? if @options[:method].to_s.upcase == 'GET' request_uri << "?#{q}" else content = q end end data << "#{@options[:method]} #{request_uri} HTTP/1.1" data << "Host: #{@options[:host]}" data << 'Accept: */*' data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent] if @options[:auth] data << "Authorization: Basic #{[@options[:auth]].pack('m').delete("\r\n")}" elsif @options[:oauth] data << "Authorization: #{oauth_header}" end if @proxy && @proxy.user data << "Proxy-Authorization: Basic " + ["#{@proxy.user}:#{@proxy.password}"].pack('m').delete("\r\n") end if @options[:method] == 'POST' data << "Content-type: #{@options[:content_type]}" data << "Content-length: #{content.length}" end if @options[:headers] @options[:headers].each do |name,value| data << "#{name}: #{value}" end end data << "\r\n" send_data data.join("\r\n") << content end
Generated with the Darkfish Rdoc Generator 2.