class GRPC::RpcServer
RpcServer hosts a number of services and makes them available on the network.
Private Class Methods
Creates a new RpcServer.
The RPC server is configured using keyword arguments.
There are some specific keyword args used to configure the RpcServer instance; however, other arbitrary keyword args are allowed and, when present, are used to configure the listening connection set up by the RpcServer.
-
server_override: which if passed must be a [GRPC::Core::Server]. When
present, it is used as the server instead of a new one being created.
-
poll_period: when present, the server polls for new events with this
period
-
pool_size: the size of the thread pool the server uses to run its
threads
-
completion_queue_override: when supplied, this will be used as the
completion_queue that the server uses to receive network events; otherwise it creates a new instance itself
-
creds: [GRPC::Core::ServerCredentials]
the credentials used to secure the server
-
max_waiting_requests: the maximum number of requests that may be
waiting to be handled. When this limit is exceeded, the server responds to new requests with UNAVAILABLE
-
connect_md_proc:
when non-nil, is a proc for determining the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, …})
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 254
# Builds a new, not-yet-started RpcServer.
#
# @param pool_size [Integer] size handed to the worker Pool
# @param max_waiting_requests [Integer] limit on queued jobs before new
#   calls are answered with UNAVAILABLE
# @param poll_period [Numeric] period used when polling for new events
# @param completion_queue_override [Core::CompletionQueue, nil] queue to use
#   instead of creating one
# @param server_override [Core::Server, nil] server to use instead of
#   creating one
# @param connect_md_proc [Proc, nil] computes the metadata sent back to the
#   client when an invocation arrives
# @param kw [Hash] remaining keyword args, forwarded to the Core::Server
#   constructor when no server_override is given
# @raise [TypeError] if one of the overrides / the proc has the wrong type
def initialize(pool_size: DEFAULT_POOL_SIZE,
               max_waiting_requests: DEFAULT_MAX_WAITING_REQUESTS,
               poll_period: DEFAULT_POLL_PERIOD,
               completion_queue_override: nil,
               server_override: nil,
               connect_md_proc: nil,
               **kw)
  # Validate the caller-supplied collaborators before anything else.
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @cq = RpcServer.setup_cq(completion_queue_override)

  # Plain configuration values.
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(pool_size)

  # Synchronisation primitives guarding the running state.
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new

  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started

  @server = RpcServer.setup_srv(server_override, @cq, **kw)
end
::setup_connect_md_proc is used by initialize to validate the connect_md_proc.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 216
# Validates the connect_md_proc handed to initialize.
#
# @param a_proc [Proc, nil] candidate proc
# @return [Proc, nil] the argument itself when it is nil or a Proc
# @raise [TypeError] when the argument is anything else
def self.setup_connect_md_proc(a_proc)
  case a_proc
  when nil, Proc
    a_proc
  else
    fail(TypeError, '!Proc')
  end
end
::setup_cq is used by initialize to construct a Core::CompletionQueue from its arguments.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 198
# Picks the completion queue the server will use.
#
# @param alt_cq [Core::CompletionQueue, nil] optional caller-supplied queue
# @return [Core::CompletionQueue] alt_cq when given, otherwise a new queue
# @raise [TypeError] when alt_cq is neither nil nor a Core::CompletionQueue
def self.setup_cq(alt_cq)
  return Core::CompletionQueue.new if alt_cq.nil?
  return alt_cq if alt_cq.is_a?(Core::CompletionQueue)

  fail(TypeError, '!CompletionQueue')
end
::setup_srv is used by initialize to construct a Core::Server from its arguments.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 208
# Picks the underlying Core::Server the RpcServer will drive.
#
# @param alt_srv [Core::Server, nil] optional caller-supplied server
# @param cq [Core::CompletionQueue] queue passed to a newly built server
# @param kw [Hash] extra options passed (as a hash) to a newly built server
# @return [Core::Server] alt_srv when given, otherwise a new server
# @raise [TypeError] when alt_srv is neither nil nor a Core::Server
def self.setup_srv(alt_srv, cq, **kw)
  if alt_srv.nil?
    Core::Server.new(cq, kw)
  elsif alt_srv.is_a?(Core::Server)
    alt_srv
  else
    fail(TypeError, '!Server')
  end
end
Private Instance Methods
This should be called while holding @run_mutex
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 506
# Registers every rpc description of +service+ together with the bound
# method that will handle it. Must be called while holding @run_mutex.
#
# @param service [Object, Class] a service instance, or a service class
#   (in which case a fresh instance is created for each rpc)
# @raise [RuntimeError] when a route is already registered
def add_rpc_descs_for(service)
  given_class = service.is_a?(Class)
  cls = given_class ? service : service.class
  @rpc_descs ||= {}
  @rpc_handlers ||= {}

  cls.rpc_descs.each_pair do |name, spec|
    route = "/#{cls.service_name}/#{name}".to_sym
    if @rpc_descs.key?(route)
      fail "already registered: rpc #{route} from #{spec}"
    end
    @rpc_descs[route] = spec

    rpc_name = GenericService.underscore(name.to_s).to_sym
    # A class gets a new instance per rpc; an object handles all its rpcs.
    receiver = given_class ? cls.new : service
    @rpc_handlers[route] = receiver.method(rpc_name)
    GRPC.logger.info("handling #{route} with #{@rpc_handlers[route]}")
  end
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 495
# Checks that +cls+ can be served: it must mix in GenericService, declare
# at least one rpc, and pass its own assert_rpc_descs_have_methods check.
#
# @param cls [Class] the service class to validate
# @raise [RuntimeError] when the class does not include GenericService or
#   declares no rpc descriptions
def assert_valid_service_class(cls)
  fail "#{cls} must 'include GenericService'" unless cls.include?(GenericService)
  fail "#{cls} should specify some rpc descriptions" if cls.rpc_descs.size.zero?

  cls.assert_rpc_descs_have_methods
end
Sends UNAVAILABLE if there are too many unprocessed jobs
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 407
# Decides whether the server can accept +an_rpc+; when the backlog is over
# the limit, answers the call with UNAVAILABLE instead.
#
# The backlog is read from the pool exactly once, so the value that is
# logged is the same value the decision is based on (the pool is shared
# with worker threads and may change between two reads).
#
# @param an_rpc [Object] the incoming call descriptor (responds to #call
#   and #deadline)
# @return [Object, nil] +an_rpc+ when it may proceed, nil after UNAVAILABLE
#   has been sent
def available?(an_rpc)
  jobs_count = @pool.jobs_waiting
  max = @max_waiting_requests
  GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
  return an_rpc if jobs_count <= max

  GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
  # No payload is exchanged, so identity procs stand in for (un)marshalling.
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::UNAVAILABLE, '')
  nil
end
handle registration of classes
service is either a class that includes GRPC::GenericService and whose new function can be called without argument or any instance of such a class.
E.g, after
class Divider
  include GRPC::GenericService
  rpc :div, DivArgs, DivReply # single request, single response
  def initialize(optional_arg='default option') # no args
    ...
  end
end
srv = GRPC::RpcServer.new
# Either of these works
srv.handle(Divider)
# or
srv.handle(Divider.new('replace optional arg'))
It raises RuntimeError:
-
if service is not a valid service class or object
-
if its handler methods are already registered
-
if the server is already running
@param service [Object|Class] a service class or object as described
above
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 377
# Registers a service (class or instance) with the server. Only allowed
# before the server has been started.
#
# @param service [Object, Class] a service class or an instance of one
# @raise [RuntimeError] when the server has already been started, the
#   service class is invalid, or one of its routes is already registered
def handle(service)
  @run_mutex.synchronize do
    if @running_state != :not_started
      fail 'cannot add services if the server has been started'
    end

    service_class = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(service_class)
    add_rpc_descs_for(service)
  end
end
Sends UNIMPLEMENTED if the method is not implemented by this server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 419
# Checks whether the method named by +an_rpc+ has a registered description;
# when it does not, answers the call with UNIMPLEMENTED.
#
# @param an_rpc [Object] the incoming call descriptor (responds to
#   #method, #call and #deadline)
# @return [Object, nil] +an_rpc+ when the route is known, nil after
#   UNIMPLEMENTED has been sent
def implemented?(an_rpc)
  route = an_rpc.method.to_sym
  if rpc_descs.key?(route)
    an_rpc
  else
    GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
    # Nothing is (un)marshalled here, so identity procs are sufficient.
    passthrough = proc { |x| x }
    rejected = ActiveCall.new(an_rpc.call, @cq, passthrough, passthrough,
                              an_rpc.deadline)
    rejected.send_status(StatusCodes::UNIMPLEMENTED, '')
    nil
  end
end
handles calls to the server
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 430 def loop_handle_server_calls fail 'not started' if running_state == :not_started loop_tag = Object.new while running_state == :running begin an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE) break if (!an_rpc.nil?) && an_rpc.call.nil? active_call = new_active_server_call(an_rpc) unless active_call.nil? @pool.schedule(active_call) do |ac| c, mth = ac rpc_descs[mth].run_server_method(c, rpc_handlers[mth]) end end rescue Core::CallError, RuntimeError => e # these might happen for various reasonse. The correct behaviour of # the server is to log them and continue, if it's not shutting down. if running_state == :running GRPC.logger.warn("server call failed: #{e}") end next end end # @running_state should be :stopping here @run_mutex.synchronize { transition_running_state(:stopped) } GRPC.logger.info("stopped: #{self}") end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 459
# Turns an incoming call descriptor into an [ActiveCall, route] pair ready
# to be scheduled, after sending the initial metadata and checking that the
# server is available and implements the method.
#
# @param an_rpc [Object, nil] descriptor returned by request_call
# @return [Array, nil] [active_call, route_symbol], or nil when there is no
#   call or the call was rejected
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  handle_call_tag = Object.new
  an_rpc.call.metadata = an_rpc.metadata # attaches md to call for handlers
  GRPC.logger.debug("call md is #{an_rpc.metadata}")

  # Initial metadata goes out before the availability / implementation
  # checks, exactly once per call.
  connect_md =
    if @connect_md_proc.nil?
      nil
    else
      @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
    end
  an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
                        SEND_INITIAL_METADATA => connect_md)

  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  route = an_rpc.method.to_sym
  desc = rpc_descs[route]
  active = ActiveCall.new(an_rpc.call, @cq,
                          desc.marshal_proc, desc.unmarshal_proc(:input),
                          an_rpc.deadline)
  [active, route]
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 487
# Lazily created table of registered rpc descriptions, keyed by route.
#
# @return [Hash] the (possibly empty) descriptions table
def rpc_descs
  @rpc_descs = {} unless @rpc_descs
  @rpc_descs
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 491
# Lazily created table of handler methods, keyed by route.
#
# @return [Hash] the (possibly empty) handlers table
def rpc_handlers
  @rpc_handlers = {} unless @rpc_handlers
  @rpc_handlers
end
runs the server
-
if no #rpc_descs are registered, this raises a RuntimeError immediately; otherwise it keeps handling calls and does not return until the server is stopped.
-
running? returns true after this is called, until stop causes the server to stop.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 395
# Starts the pool and the underlying server, flips the state to :running,
# wakes any threads blocked in wait_till_running, then enters the call loop.
#
# @raise [RuntimeError] when no services have been registered
def run
  @run_mutex.synchronize do
    if rpc_descs.size.zero?
      fail 'cannot run without registering services'
    end

    @pool.start
    @server.start
    transition_running_state(:running)
    # Notify waiters only after the state has actually changed.
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 333
# Runs the server on a background thread and keeps the calling thread
# polling GRPC.handle_signals; once that returns a falsy value the server
# is stopped and the background thread is joined.
def run_till_terminated
  GRPC.trap_signals
  server_thread = Thread.new { run }
  wait_till_running

  loop do
    sleep SIGNAL_CHECK_PERIOD
    keep_waiting = GRPC.handle_signals
    break unless keep_waiting
  end

  stop
  server_thread.join
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 310
# @return [Boolean] true while the server is in the :running state
def running?
  current = running_state
  current == :running
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 290
# Reads the current running state under @run_mutex.
#
# @return [Symbol] the value of @running_state
def running_state
  @run_mutex.synchronize { @running_state }
end
stops a running server
the call has no impact if the server is already stopped; otherwise the server's current call loop is its last.
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 279
# Moves a running server to :stopping, then closes the underlying server
# and stops the pool. Does nothing when the server is not currently running.
#
# @raise [RuntimeError] when the server has never been started
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    # Already stopping or stopped: leave the method entirely.
    return unless @running_state == :running

    transition_running_state(:stopping)
  end

  # The poll period is used as the deadline passed to the server's close.
  close_deadline = from_relative_time(@poll_period)
  @server.close(@cq, close_deadline)
  @pool.stop
end
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 314
# @return [Boolean] true once the server has reached the :stopped state
def stopped?
  current = running_state
  current == :stopped
end
Can only be called while holding @run_mutex
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 297
# Advances @running_state by exactly one step along
# :not_started -> :running -> :stopping -> :stopped.
# Can only be called while holding @run_mutex.
#
# @param target_state [Symbol] the state to move to
# @return [Symbol] the new state
# @raise [RuntimeError] when target_state is not the successor of the
#   current state
def transition_running_state(target_state)
  # Successor of the current state; nil when there is none.
  successor =
    case @running_state
    when :not_started then :running
    when :running then :stopping
    when :stopping then :stopped
    end

  unless successor == target_state
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end

  @running_state = target_state
end
Is called from other threads to wait for run to start up the server.
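If the server has already left the :not_started state, this returns immediately; otherwise it blocks until run signals that the server has started or the timeout expires.
@param timeout [Numeric] number of seconds to wait @return [true, false] true if the server is running, false otherwise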
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 324
# Blocks the caller until run has signalled start-up, but only if the
# server is still in the :not_started state; otherwise answers at once.
#
# @param timeout [Numeric, nil] seconds to wait; nil waits without limit
# @return [Boolean] true if the server is running, false otherwise
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    still_pending = @running_state == :not_started
    @run_cond.wait(@run_mutex, timeout) if still_pending
    @running_state == :running
  end
end