# File lib/thrift_client/abstract_thrift_client.rb, line 38 def initialize(client_class, servers, options = {}) @options = DEFAULTS.merge(options) @options[:server_retry_period] ||= 0 @client_class = client_class @server_list = Array(servers).collect do |s| Server.new(s, @options[:cached_connections]) end.sort_by { rand } @current_server = @server_list.first @callbacks = {} @client_methods = [] @client_class.instance_methods.each do |method_name| if method_name != 'send_message' && method_name =~ /^send_(.*)$/ instance_eval("def #{$1}(*args); handled_proxy(:'#{$1}', *args); end", __FILE__, __LINE__) @client_methods << $1 end end @request_count = 0 @options[:wrapped_exception_classes].each do |exception_klass| name = exception_klass.to_s.split('::').last begin @client_class.const_get(name) rescue NameError @client_class.const_set(name, Class.new(exception_klass)) end end end
Adds a callback that will be invoked at a certain time. The valid callback types are:
:post_connect - should accept a single AbstractThriftClient argument, which is the client object to which the callback was added. Called after a connection to the remote thrift server is established. :before_method - should accept a single method name argument. Called before a method is invoked on the thrift server. :on_exception - should accept 2 args: an Exception instance and a method name. Called right before the exception is raised.
# File lib/thrift_client/abstract_thrift_client.rb, line 74 def add_callback(callback_type, &block) case callback_type when :post_connect, :before_method, :on_exception @callbacks[callback_type] ||= [] @callbacks[callback_type].push(block) # Allow chaining return self else return nil end end
Force the client to connect to the server. Not necessary to be called as the connection will be made on the first RPC method call.
# File lib/thrift_client/abstract_thrift_client.rb, line 93 def connect! @current_server = next_live_server @current_server.open(@options[:transport], @options[:transport_wrapper], @options[:connect_timeout], @options[:timeout]) @client = @client_class.new(@options[:protocol].new(@current_server, *@options[:protocol_extra_params])) do_callbacks(:post_connect, self) rescue IOError, Thrift::TransportException disconnect!(true) retry end
# File lib/thrift_client/abstract_thrift_client.rb, line 106 def disconnect!(error = false) if @current_server @current_server.mark_down!(@options[:server_retry_period]) if error @current_server.close end @client = nil @current_server = nil @request_count = 0 end
# File lib/thrift_client/abstract_thrift_client.rb, line 86 def inspect "<#{self.class}(#{client_class}) @current_server=#{@current_server}>" end
Calls all callbacks of the specified type with the given args
# File lib/thrift_client/abstract_thrift_client.rb, line 120 def do_callbacks(callback_type_sym, *args) return unless @callbacks[callback_type_sym] @callbacks[callback_type_sym].each do |callback| callback.call(*args) end end
# File lib/thrift_client/abstract_thrift_client.rb, line 139 def handled_proxy(method_name, *args) disconnect! if @options[:server_max_requests] && @request_count >= @options[:server_max_requests] begin connect! unless @client if has_timeouts? @client.timeout = @options[:timeout_overrides][method_name.to_sym] || @options[:timeout] end @request_count += 1 do_callbacks(:before_method, method_name) @client.send(method_name, *args) rescue *@options[:exception_class_overrides] => e raise_or_default(e, method_name) rescue *@options[:exception_classes] => e disconnect!(true) tries ||= (@options[:retry_overrides][method_name.to_sym] || @options[:retries]) + 1 tries -= 1 if tries > 0 retry else raise_or_default(e, method_name) end rescue Exception => e raise_or_default(e, method_name) end end
# File lib/thrift_client/abstract_thrift_client.rb, line 182 def has_timeouts? @has_timeouts ||= @options[:timeout_overrides].any? && transport_can_timeout? end
# File lib/thrift_client/abstract_thrift_client.rb, line 127 def next_live_server @server_index ||= 0 @server_list.length.times do |i| cur = (1 + @server_index + i) % @server_list.length if @server_list[cur].up? @server_index = cur return @server_list[cur] end end raise ThriftClient::NoServersAvailable, "No live servers in #{@server_list.inspect}." end
# File lib/thrift_client/abstract_thrift_client.rb, line 165 def raise_or_default(e, method_name) if @options[:raise] raise_wrapped_error(e, method_name) else @options[:defaults][method_name.to_sym] end end
# File lib/thrift_client/abstract_thrift_client.rb, line 173 def raise_wrapped_error(e, method_name) do_callbacks(:on_exception, e, method_name) if @options[:wrapped_exception_classes].include?(e.class) raise @client_class.const_get(e.class.to_s.split('::').last), e.message, e.backtrace else raise e end end
# File lib/thrift_client/abstract_thrift_client.rb, line 186 def transport_can_timeout? if (@options[:transport_wrapper] || @options[:transport]).method_defined?(:timeout=) true else warn "ThriftClient: Timeout overrides have no effect with with transport type #{(@options[:transport_wrapper] || @options[:transport])}" false end end