Parent

Class/Module Index [+]

Quicksearch

Twitter::JSONStream

Attributes

af_last_reconnect[RW]
code[RW]
headers[RW]
nf_last_reconnect[RW]
proxy[RW]
reconnect_retries[RW]

Public Class Methods

connect(options = {}) click to toggle source
# File lib/twitter/json_stream.rb, line 49
# Opens an EventMachine connection that uses this class as its handler and
# returns the connection object.
#
# options - Hash of overrides merged on top of DEFAULT_OPTIONS.
#           :ssl   - when truthy, TLS is started on the new connection; if
#                    the caller supplied :ssl but no :port, port 443 is used.
#           :proxy - URL string; when present the socket is opened to the
#                    proxy's host and port instead of :host/:port.
#
# The hash passed in by the caller is never modified: the defaulted port is
# written to the merged copy only.
def self.connect options = {}
  settings = DEFAULT_OPTIONS.merge(options)

  # Decide from the caller's own hash (not the merged one) so that only an
  # explicit :ssl without an explicit :port receives the HTTPS default.
  settings[:port] = 443 if options[:ssl] && !options.has_key?(:port)

  host = settings[:host]
  port = settings[:port]

  if settings[:proxy]
    proxy_uri = URI.parse(settings[:proxy])
    host = proxy_uri.host
    port = proxy_uri.port
  end

  connection = EventMachine.connect host, port, self, settings
  connection.start_tls if settings[:ssl]
  connection
end
new(options = {}) click to toggle source
# File lib/twitter/json_stream.rb, line 67
# Sets up per-connection state.
#
# options - Hash of connection settings. It is merged over DEFAULT_OPTIONS
#           here as well, so the object is usable when constructed directly
#           rather than through connect. The :on_inited entry, if present,
#           is removed from the passed-in hash and kept as the callback that
#           post_init invokes. A :proxy URL, if present, is parsed into
#           @proxy.
def initialize(options = {})
  @options = DEFAULT_OPTIONS.merge(options)

  # Reconnect bookkeeping starts from a clean slate.
  @reconnect_retries = 0
  @nf_last_reconnect = nil
  @af_last_reconnect = nil
  @immediate_reconnect = false
  @gracefully_closed = false

  # Note: taken out of the caller's hash, after @options was built from it.
  @on_inited_callback = options.delete(:on_inited)

  proxy_url = options[:proxy]
  @proxy = URI.parse(proxy_url) if proxy_url
end

Public Instance Methods

connection_completed() click to toggle source
# File lib/twitter/json_stream.rb, line 124
# Connection hook: writes the HTTP request by delegating to send_request.
# NOTE(review): presumably invoked by EventMachine once the socket is
# connected — confirm against the EventMachine::Connection docs.
def connection_completed
  send_request
end
each_item(&block) click to toggle source
# File lib/twitter/json_stream.rb, line 78
# Registers the block that parse_stream_line calls with each complete item
# (the accumulated "{...}" text). Replaces any block registered earlier.
def each_item(&block)
  @each_item_callback = block
end
immediate_reconnect() click to toggle source
# File lib/twitter/json_stream.rb, line 103
# Drops the current connection and flags the next reconnect as immediate:
# reconnect_timeout returns 0 while @immediate_reconnect is set. The
# graceful-close flag is cleared so unbind still schedules a reconnect.
def immediate_reconnect
  @gracefully_closed = false
  @immediate_reconnect = true
  close_connection
end
on_close(&block) click to toggle source
# File lib/twitter/json_stream.rb, line 94
# Registers the block that unbind calls (with no arguments) when the
# connection goes away. Replaces any block registered earlier.
def on_close(&block)
  @close_callback = block
end
on_error(&block) click to toggle source
# File lib/twitter/json_stream.rb, line 82
# Registers the block that receive_error calls with the error it was given.
# Replaces any block registered earlier.
def on_error(&block)
  @error_callback = block
end
on_max_reconnects(&block) click to toggle source
# File lib/twitter/json_stream.rb, line 90
# Registers the block that schedule_reconnect calls with (timeout, retries)
# when it gives up instead of reconnecting. Replaces any earlier block.
def on_max_reconnects(&block)
  @max_reconnects_callback = block
end
on_reconnect(&block) click to toggle source
# File lib/twitter/json_stream.rb, line 86
# Registers the block that reconnect_after calls with (timeout, retries)
# just before a reconnect is attempted or scheduled. Replaces any earlier
# block.
def on_reconnect(&block)
  @reconnect_callback = block
end
post_init() click to toggle source
# File lib/twitter/json_stream.rb, line 128
# Resets the parser/buffer state and then fires the :on_inited callback
# captured by initialize, if one was supplied.
def post_init
  reset_state
  return unless @on_inited_callback

  @on_inited_callback.call
end
receive_data(data) click to toggle source

Receives raw data from the HTTP connection and pushes it into the HTTP parser which then drives subsequent callbacks.

# File lib/twitter/json_stream.rb, line 120
# Feeds a raw chunk read from the connection into the HTTP parser; the
# parser's on_headers_complete / on_body hooks (wired up in reset_state)
# take it from there.
#
# data - the chunk to append to the parser.
def receive_data(data)
  @parser << data
end
stop() click to toggle source
# File lib/twitter/json_stream.rb, line 98
# Closes the connection on purpose. Marking the close as graceful first
# keeps unbind from scheduling an automatic reconnect.
def stop
  @gracefully_closed = true
  close_connection
end
unbind() click to toggle source
# File lib/twitter/json_stream.rb, line 109
# Teardown hook run when the connection is gone. In order:
#   1. hands any unterminated text still in the buffer to the line parser
#      (only while in the :stream state),
#   2. schedules a reconnect when :auto_reconnect is on and the close was
#      not requested through stop,
#   3. fires the on_close callback, if any.
def unbind
  leftover_pending = @state == :stream && !@buffer.empty?
  parse_stream_line(@buffer.flush) if leftover_pending

  wants_reconnect = @options[:auto_reconnect] && !@gracefully_closed
  schedule_reconnect if wants_reconnect

  return unless @close_callback

  @close_callback.call
end

Protected Instance Methods

escape(str) click to toggle source
# File lib/twitter/json_stream.rb, line 338
# Percent-encodes a value for use in a query string or OAuth parameter.
#
# Every character outside the unreserved set (ASCII letters, digits and
# "-", ".", "_", "~") is replaced by %XX for each of its bytes, upper-case
# hex. This mirrors what URI.escape(str, unsafe) produced; URI.escape
# itself was removed in Ruby 3.0, so the encoding is done here directly.
#
# str - any object; it is converted with to_s first (nil becomes "").
#
# Returns the encoded String.
def escape str
  str.to_s.gsub(/[^a-zA-Z0-9\-\.\_\~]/) do |unsafe|
    # A multi-byte character expands to one %XX triplet per byte.
    unsafe.unpack('C*').map { |byte| format('%%%02X', byte) }.join
  end
end
handle_headers_complete(headers) click to toggle source

Called when the status line and all headers have been read from the stream.

# File lib/twitter/json_stream.rb, line 194
# Parser hook for the end of the header section (installed in reset_state).
# Records the numeric status code, reports any non-200 status through
# receive_error, stores the headers and switches to the :stream state.
# Note that the state changes to :stream even after a non-200 status.
#
# headers - the header collection handed over by the parser.
def handle_headers_complete(headers)
  @code = @parser.status_code.to_i
  receive_error("invalid status code: #{@code}.") unless @code == 200

  self.headers = headers
  @state = :stream
end
oauth_header() click to toggle source

:filters => %w(miami lebron jesus), :oauth => {

:consumer_key    => [key],
:consumer_secret => [token],
:access_key      => [access key],
:access_secret   => [access secret]

}

# File lib/twitter/json_stream.rb, line 301
# Builds the SimpleOAuth header object for the current request: HTTP method
# from @options[:method], URL from uri_base plus @options[:path], the
# request parameters from params, and the credentials in @options[:oauth].
def oauth_header
  target = uri_base + @options[:path].to_s
  credentials = @options[:oauth]

  # SimpleOAuth expects :token / :token_secret, whereas callers of this
  # class pass :access_key / :access_secret (the ROAuth-era names). The
  # keys are translated here so existing callers keep working unchanged.
  signing_keys = {
    :consumer_key    => credentials[:consumer_key],
    :consumer_secret => credentials[:consumer_secret],
    :token           => credentials[:access_key],
    :token_secret    => credentials[:access_secret]
  }

  SimpleOAuth::Header.new(@options[:method], target, params, signing_keys)
end
params() click to toggle source

Flattened query hash of string keys and string values (percent-escaping is applied later, in query). Nil and empty values are skipped; the :filters option is added under the track key.

# File lib/twitter/json_stream.rb, line 324
# Flattens the request parameters into a Hash of String => String.
#
# Starts from @options[:params] with :track set to @options[:filters]
# (which overrides any :track already in :params). Entries whose value is
# nil, an empty string or an empty collection are dropped; values that
# respond to join are comma-joined. No escaping happens here.
def params
  merged = @options[:params].merge(:track => @options[:filters])

  merged.each_with_object({}) do |(name, raw), flat|
    blank = raw.to_s.empty? || (raw.respond_to?(:empty?) && raw.empty?)
    next if blank

    text = raw.respond_to?(:join) ? raw.join(",") : raw
    flat[name.to_s] = text.to_s
  end
end
parse_stream_line(ln) click to toggle source
# File lib/twitter/json_stream.rb, line 271
# Consumes one line of the stream body and emits complete items.
#
# The line is stripped in place (the caller's string is modified). Blank
# lines, and lines that neither start with "{" nor end with "}", are
# ignored. Anything else is appended to @stream; once @stream both starts
# with "{" and ends with "}" it is passed to the each_item callback (if
# set) and @stream is reset to an empty string.
#
# ln - String holding a single line.
def parse_stream_line ln
  ln.strip!
  return if ln.empty?
  return unless ln.start_with?('{') || ln.end_with?('}')

  @stream << ln
  return unless @stream.start_with?('{') && @stream.end_with?('}')

  @each_item_callback.call(@stream) if @each_item_callback
  @stream = ''
end
query() click to toggle source
# File lib/twitter/json_stream.rb, line 334
# Serializes params into a query string: each key and value is passed
# through escape, joined as key=value, the pairs are sorted and then
# joined with "&". Returns "" when there are no parameters.
def query
  encoded_pairs = params.map do |name, value|
    [escape(name), escape(value)].join("=")
  end

  encoded_pairs.sort.join("&")
end
receive_error(e) click to toggle source
# File lib/twitter/json_stream.rb, line 267
# Forwards an error to the block registered with on_error; does nothing
# when no block is registered.
#
# e - the error value to report (callers in this class pass message
#     strings).
def receive_error e
  return unless @error_callback

  @error_callback.call(e)
end
receive_stream_data(data) click to toggle source

Called every time a chunk of data is read from the connection once it has been opened and after the headers have been processed.

# File lib/twitter/json_stream.rb, line 205
# Parser hook for body chunks (installed as on_body in reset_state).
#
# Splits the chunk into lines through @buffer and feeds each line to
# parse_stream_line, then clears @stream. Any StandardError raised along
# the way (including one raised inside the each_item callback) is reported
# through receive_error as "<class>: <message>" plus backtrace, and the
# connection is closed.
#
# Only StandardError is rescued. Process-level exceptions such as
# Interrupt, SystemExit and NoMemoryError propagate instead of being
# turned into a stream error that would trigger a reconnect.
#
# data - String chunk of the response body.
#
# Returns '' on success, nil after an error.
def receive_stream_data(data)
  @buffer.extract(data).each { |line| parse_stream_line(line) }
  @stream = ''
rescue StandardError => e
  receive_error("#{e.class}: " + [e.message, e.backtrace].flatten.join("\n\t"))
  close_connection
  nil
end
reconnect_after(timeout) click to toggle source
# File lib/twitter/json_stream.rb, line 144
# Notifies the on_reconnect callback (with the delay and the retry count)
# and then reconnects to @options[:host]/@options[:port]: right away when
# the delay is 0, otherwise from an EventMachine timer after +timeout+.
#
# timeout - delay passed to EventMachine.add_timer; 0 means reconnect now.
def reconnect_after timeout
  @reconnect_callback.call(timeout, @reconnect_retries) if @reconnect_callback

  return reconnect(@options[:host], @options[:port]) if timeout == 0

  # Host and port are read when the timer fires, not when it is scheduled.
  EventMachine.add_timer(timeout) { reconnect @options[:host], @options[:port] }
end
reconnect_timeout() click to toggle source
# File lib/twitter/json_stream.rb, line 156
# Computes the delay before the next reconnect attempt and advances the
# back-off state.
#
# * A pending immediate reconnect yields 0 and clears the flag without
#   touching the back-off counters.
# * @code == 0 (no HTTP status received, i.e. a network failure): additive
#   back-off starting at NF_RECONNECT_START, growing by NF_RECONNECT_ADD,
#   with the returned value capped at NF_RECONNECT_MAX.
# * Any other status: multiplicative back-off starting at
#   AF_RECONNECT_START, multiplied by AF_RECONNECT_MUL each time; there is
#   no cap in this method.
def reconnect_timeout
  if @immediate_reconnect
    @immediate_reconnect = false
    return 0
  end

  if @code == 0
    @nf_last_reconnect =
      @nf_last_reconnect ? @nf_last_reconnect + NF_RECONNECT_ADD : NF_RECONNECT_START
    [@nf_last_reconnect, NF_RECONNECT_MAX].min
  else
    @af_last_reconnect =
      @af_last_reconnect ? @af_last_reconnect * AF_RECONNECT_MUL : AF_RECONNECT_START
    @af_last_reconnect
  end
end
reset_state() click to toggle source
# File lib/twitter/json_stream.rb, line 179
# Puts the connection back into its initial parsing state: applies the
# inactivity timeout (only when @options[:timeout] is positive), clears the
# status code, headers and partial-item buffer, creates a fresh
# carriage-return-delimited line tokenizer bounded by MAX_LINE_LENGTH, and
# installs a new HTTP parser whose header/body hooks point back at this
# object.
def reset_state
  idle_limit = @options[:timeout]
  set_comm_inactivity_timeout idle_limit if idle_limit > 0

  @state   = :init
  @code    = 0
  @headers = {}
  @stream  = ''
  @buffer  = BufferedTokenizer.new("\r", MAX_LINE_LENGTH)

  @parser = Http::Parser.new
  @parser.on_headers_complete = method(:handle_headers_complete)
  @parser.on_body = method(:receive_stream_data)
end
reset_timeouts() click to toggle source
# File lib/twitter/json_stream.rb, line 284
# Re-applies the inactivity timeout (only when @options[:timeout] is
# positive) and clears all reconnect back-off state: both last-delay
# values and the retry counter.
def reset_timeouts
  idle_limit = @options[:timeout]
  set_comm_inactivity_timeout idle_limit if idle_limit > 0

  @af_last_reconnect = nil
  @nf_last_reconnect = nil
  @reconnect_retries = 0
end
schedule_reconnect() click to toggle source
# File lib/twitter/json_stream.rb, line 134
# Decides whether another reconnect attempt is allowed and acts on it.
#
# Takes the next delay from reconnect_timeout and bumps the retry counter.
# While the delay is at most RECONNECT_MAX and the counter at most
# RETRIES_MAX, the reconnect is handed to reconnect_after; otherwise the
# on_max_reconnects callback (if any) is called with the delay and the
# retry count, and no reconnect happens.
def schedule_reconnect
  delay = reconnect_timeout
  @reconnect_retries += 1

  within_limits = delay <= RECONNECT_MAX && @reconnect_retries <= RETRIES_MAX
  return reconnect_after(delay) if within_limits

  @max_reconnects_callback.call(delay, @reconnect_retries) if @max_reconnects_callback
end
send_request() click to toggle source
# File lib/twitter/json_stream.rb, line 218
# Builds the HTTP/1.1 request from @options and writes it with send_data.
#
# * Request target: @options[:path]; when a proxy is configured the full
#   URL (uri_base, port, path) is used instead. For GET the query string is
#   appended to the target; for any other method it becomes the body.
# * Headers: Host, Accept, optional User-Agent, Authorization (Basic from
#   :auth, else OAuth from :oauth), optional Proxy-Authorization, and for
#   POST Content-type / Content-length, followed by any extra
#   @options[:headers].
#
# The path is copied before the query is appended, so @options[:path] is
# left untouched and a reconnect sends the same request line again rather
# than one with the query appended twice. Content-length is the body's byte
# size, and the POST check ignores case and symbol/string differences in
# the same way the GET check does. A missing :content is treated as an
# empty body.
def send_request
  data = []
  http_method = @options[:method].to_s.upcase

  # dup: `<<` below would otherwise modify the string stored in @options.
  request_uri = @options[:path].to_s.dup

  if @proxy
    # proxies need the request to be for the full url
    request_uri = "#{uri_base}:#{@options[:port]}#{request_uri}"
  end

  content = @options[:content].to_s

  unless (q = query).empty?
    if http_method == 'GET'
      request_uri << "?#{q}"
    else
      content = q
    end
  end

  data << "#{@options[:method]} #{request_uri} HTTP/1.1"
  data << "Host: #{@options[:host]}"
  data << 'Accept: */*'
  data << "User-Agent: #{@options[:user_agent]}" if @options[:user_agent]

  if @options[:auth]
    # pack('m') wraps long output and ends with a newline; strip both.
    data << "Authorization: Basic #{[@options[:auth]].pack('m').delete("\r\n")}"
  elsif @options[:oauth]
    data << "Authorization: #{oauth_header}"
  end

  if @proxy && @proxy.user
    data << "Proxy-Authorization: Basic " + ["#{@proxy.user}:#{@proxy.password}"].pack('m').delete("\r\n")
  end

  if http_method == 'POST'
    data << "Content-type: #{@options[:content_type]}"
    # Content-Length counts octets, not characters.
    data << "Content-length: #{content.bytesize}"
  end

  if @options[:headers]
    @options[:headers].each do |name, value|
      data << "#{name}: #{value}"
    end
  end

  # Joined with CRLF below, this element yields the blank line that ends
  # the header section.
  data << "\r\n"

  send_data data.join("\r\n") << content
end
uri_base() click to toggle source

Scheme (https if ssl, http otherwise) and host part of URL

# File lib/twitter/json_stream.rb, line 318
# Returns the scheme-and-host prefix of the request URL: "https://<host>"
# when @options[:ssl] is truthy, "http://<host>" otherwise. No port or
# path is included.
def uri_base
  scheme = @options[:ssl] ? 'https' : 'http'
  "#{scheme}://#{@options[:host]}"
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.