class GRPC::RpcServer

RpcServer hosts a number of services and makes them available on the network.

Private Class Methods

new(pool_size:DEFAULT_POOL_SIZE, max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS, poll_period:DEFAULT_POLL_PERIOD, completion_queue_override:nil, server_override:nil, connect_md_proc:nil, **kw)

Creates a new RpcServer.

The RPC server is configured using keyword arguments.

Some specific keyword args configure the RpcServer instance itself. Other arbitrary keyword args are also allowed and, when present, are used to configure the listening connection set up by the RpcServer.

  • server_override: if passed, must be a [GRPC::Core::Server]. When present, it is used as the server instead of a newly created one.

  • poll_period: when present, the server polls for new events with this period.

  • pool_size: the size of the thread pool the server uses to run its threads.

  • completion_queue_override: when supplied, this is used as the completion_queue that the server uses to receive network events; otherwise the server creates a new instance itself.

  • creds: [GRPC::Core::ServerCredentials] the credentials used to secure the server.

  • max_waiting_requests: the maximum number of requests that may wait without being handled. When this limit is exceeded, the server responds to new requests with UNAVAILABLE.

  • connect_md_proc: when non-nil, a proc for determining the metadata to send back to the client on receiving an invocation request. The proc signature is: {key: val, ..} func(method_name, {key: val, …})
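To illustrate the connect_md_proc signature described above, here is a minimal sketch of a proc that takes the method name and the request metadata and returns a hash of metadata to send back. The metadata keys and the method name are hypothetical, and the proc is called directly rather than through an RpcServer, so the snippet runs without the grpc gem.

```ruby
# A hypothetical connect_md_proc: receives the invoked method name and the
# client's metadata hash, and returns a hash of metadata to send back.
connect_md_proc = proc do |method_name, metadata|
  {
    'served-method' => method_name.to_s,
    'saw-request-id' => metadata.key?('request-id').to_s
  }
end

# Call it with the same argument shape the server uses:
# (method_name, metadata hash).
reply_md = connect_md_proc.call('/Divider/div', { 'request-id' => 'abc123' })
puts reply_md.inspect
```

A proc like this would be passed as connect_md_proc: to new. Any value that is neither nil nor a Proc is rejected with a TypeError by setup_connect_md_proc.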

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 254
def initialize(pool_size:DEFAULT_POOL_SIZE,
               max_waiting_requests:DEFAULT_MAX_WAITING_REQUESTS,
               poll_period:DEFAULT_POLL_PERIOD,
               completion_queue_override:nil,
               server_override:nil,
               connect_md_proc:nil,
               **kw)
  @connect_md_proc = RpcServer.setup_connect_md_proc(connect_md_proc)
  @cq = RpcServer.setup_cq(completion_queue_override)
  @max_waiting_requests = max_waiting_requests
  @poll_period = poll_period
  @pool_size = pool_size
  @pool = Pool.new(@pool_size)
  @run_cond = ConditionVariable.new
  @run_mutex = Mutex.new
  # running_state can take 4 values: :not_started, :running, :stopping, and
  # :stopped. State transitions can only proceed in that order.
  @running_state = :not_started
  @server = RpcServer.setup_srv(server_override, @cq, **kw)
end
setup_connect_md_proc(a_proc)

::setup_connect_md_proc is used by initialize to validate the connect_md_proc.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 216
def self.setup_connect_md_proc(a_proc)
  return nil if a_proc.nil?
  fail(TypeError, '!Proc') unless a_proc.is_a? Proc
  a_proc
end
setup_cq(alt_cq)

::setup_cq is used by initialize to construct a Core::CompletionQueue from its arguments.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 198
def self.setup_cq(alt_cq)
  return Core::CompletionQueue.new if alt_cq.nil?
  unless alt_cq.is_a? Core::CompletionQueue
    fail(TypeError, '!CompletionQueue')
  end
  alt_cq
end
setup_srv(alt_srv, cq, **kw)

::setup_srv is used by initialize to construct a Core::Server from its arguments.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 208
def self.setup_srv(alt_srv, cq, **kw)
  return Core::Server.new(cq, kw) if alt_srv.nil?
  fail(TypeError, '!Server') unless alt_srv.is_a? Core::Server
  alt_srv
end
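
The three setup_* helpers above share one pattern: build a default when no override is given, reject an override of the wrong type, and otherwise use the override. The sketch below shows that pattern in isolation. The name override_or_default is hypothetical, and Array stands in for the Core classes so the snippet runs without the grpc gem. The real helpers also pass constructor arguments, for example setup_srv passes cq and kw.

```ruby
# Hypothetical generalisation of the setup_* helpers shown above.
def override_or_default(override, klass)
  # No override supplied: build a fresh default instance.
  return klass.new if override.nil?
  # An override of the wrong type is rejected, as in the originals.
  raise TypeError, "!#{klass}" unless override.is_a?(klass)
  override
end

default_queue = override_or_default(nil, Array)
custom_queue = override_or_default([:preloaded], Array)
puts default_queue.inspect
puts custom_queue.inspect

begin
  override_or_default('not an array', Array)
rescue TypeError => e
  puts "rejected: #{e.message}"
end
```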

Private Instance Methods

add_rpc_descs_for(service)

This should be called while holding @run_mutex

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 506
def add_rpc_descs_for(service)
  cls = service.is_a?(Class) ? service : service.class
  specs, handlers = (@rpc_descs ||= {}), (@rpc_handlers ||= {})
  cls.rpc_descs.each_pair do |name, spec|
    route = "/#{cls.service_name}/#{name}".to_sym
    fail "already registered: rpc #{route} from #{spec}" if specs.key? route
    specs[route] = spec
    rpc_name = GenericService.underscore(name.to_s).to_sym
    if service.is_a?(Class)
      handlers[route] = cls.new.method(rpc_name)
    else
      handlers[route] = service.method(rpc_name)
    end
    GRPC.logger.info("handling #{route} with #{handlers[route]}")
  end
end
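
The route key and handler name computed in add_rpc_descs_for can be sketched on their own. GenericService.underscore is not shown in this document, so the underscore helper below is a stand-in that assumes a plain CamelCase to snake_case conversion. The helper route_and_handler_name and the service name math.Divider are hypothetical.

```ruby
# Stand-in for GenericService.underscore (assumed behaviour: CamelCase to
# snake_case); the real implementation is not shown in this document.
def underscore(name)
  name.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase
end

# Builds the route symbol and the handler method name in the same way as
# add_rpc_descs_for: "/<service_name>/<rpc name>" and the underscored name.
def route_and_handler_name(service_name, rpc_name)
  route = "/#{service_name}/#{rpc_name}".to_sym
  [route, underscore(rpc_name.to_s).to_sym]
end

route, handler = route_and_handler_name('math.Divider', :DivMany)
puts route.inspect
puts handler.inspect
```

The route symbol is the key used for both the rpc_descs and rpc_handlers hashes, which is why registering the same route twice fails.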
assert_valid_service_class(cls)
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 495
def assert_valid_service_class(cls)
  unless cls.include?(GenericService)
    fail "#{cls} must 'include GenericService'"
  end
  if cls.rpc_descs.size.zero?
    fail "#{cls} should specify some rpc descriptions"
  end
  cls.assert_rpc_descs_have_methods
end
available?(an_rpc)

Sends UNAVAILABLE if there are too many unprocessed jobs

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 407
def available?(an_rpc)
  jobs_count, max = @pool.jobs_waiting, @max_waiting_requests
  GRPC.logger.info("waiting: #{jobs_count}, max: #{max}")
  return an_rpc if @pool.jobs_waiting <= @max_waiting_requests
  GRPC.logger.warn("NOT AVAILABLE: too many jobs_waiting: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::UNAVAILABLE, '')
  nil
end
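
The admission check in available? reduces to a single comparison, sketched below. FakePool and accept? are hypothetical stand-ins that model only jobs_waiting. The real method also sends an UNAVAILABLE status through an ActiveCall when the check fails.

```ruby
# Hypothetical stand-in for the server's thread pool; only jobs_waiting is
# modelled here.
FakePool = Struct.new(:jobs_waiting)

# Mirrors the comparison in available?: a request is accepted while the
# number of waiting jobs does not exceed the configured maximum.
def accept?(pool, max_waiting_requests)
  pool.jobs_waiting <= max_waiting_requests
end

puts accept?(FakePool.new(20), 20)
puts accept?(FakePool.new(21), 20)
```

Because the comparison is <=, a request is still accepted when the number of waiting jobs equals max_waiting_requests. Rejection starts only once the count exceeds it.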
handle(service)

Handles registration of service classes.

service is either a class that includes GRPC::GenericService and whose new method can be called without arguments, or an instance of such a class.

E.g., after

  class Divider
    include GRPC::GenericService
    rpc :div, DivArgs, DivReply    # single request, single response
    def initialize(optional_arg = 'default option')  # no required args
      ...
    end
  end

  srv = GRPC::RpcServer.new

  # Either of these works
  srv.handle(Divider)

  # or
  srv.handle(Divider.new('replace optional arg'))

It raises RuntimeError:

  • if service is not a valid service class or object

  • if its handler methods are already registered

  • if the server has already been started

@param service [Object|Class] a service class or object as described above
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 377
def handle(service)
  @run_mutex.synchronize do
    unless @running_state == :not_started
      fail 'cannot add services if the server has been started'
    end
    cls = service.is_a?(Class) ? service : service.class
    assert_valid_service_class(cls)
    add_rpc_descs_for(service)
  end
end
implemented?(an_rpc)

Sends UNIMPLEMENTED if the method is not implemented by this server

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 419
def implemented?(an_rpc)
  mth = an_rpc.method.to_sym
  return an_rpc if rpc_descs.key?(mth)
  GRPC.logger.warn("UNIMPLEMENTED: #{an_rpc}")
  noop = proc { |x| x }
  c = ActiveCall.new(an_rpc.call, @cq, noop, noop, an_rpc.deadline)
  c.send_status(StatusCodes::UNIMPLEMENTED, '')
  nil
end
loop_handle_server_calls()

handles calls to the server

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 430
def loop_handle_server_calls
  fail 'not started' if running_state == :not_started
  loop_tag = Object.new
  while running_state == :running
    begin
      an_rpc = @server.request_call(@cq, loop_tag, INFINITE_FUTURE)
      break if (!an_rpc.nil?) && an_rpc.call.nil?

      active_call = new_active_server_call(an_rpc)
      unless active_call.nil?
        @pool.schedule(active_call) do |ac|
          c, mth = ac
          rpc_descs[mth].run_server_method(c, rpc_handlers[mth])
        end
      end
    rescue Core::CallError, RuntimeError => e
      # these might happen for various reasons.  The correct behaviour of
      # the server is to log them and continue, if it's not shutting down.
      if running_state == :running
        GRPC.logger.warn("server call failed: #{e}")
      end
      next
    end
  end
  # @running_state should be :stopping here
  @run_mutex.synchronize { transition_running_state(:stopped) }
  GRPC.logger.info("stopped: #{self}")
end
new_active_server_call(an_rpc)
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 459
def new_active_server_call(an_rpc)
  return nil if an_rpc.nil? || an_rpc.call.nil?

  # allow the metadata to be accessed from the call
  handle_call_tag = Object.new
  an_rpc.call.metadata = an_rpc.metadata  # attaches md to call for handlers
  GRPC.logger.debug("call md is #{an_rpc.metadata}")
  connect_md = nil
  unless @connect_md_proc.nil?
    connect_md = @connect_md_proc.call(an_rpc.method, an_rpc.metadata)
  end
  an_rpc.call.run_batch(@cq, handle_call_tag, INFINITE_FUTURE,
                        SEND_INITIAL_METADATA => connect_md)
  return nil unless available?(an_rpc)
  return nil unless implemented?(an_rpc)

  # Create the ActiveCall
  GRPC.logger.info("deadline is #{an_rpc.deadline}; (now=#{Time.now})")
  rpc_desc = rpc_descs[an_rpc.method.to_sym]
  c = ActiveCall.new(an_rpc.call, @cq,
                     rpc_desc.marshal_proc, rpc_desc.unmarshal_proc(:input),
                     an_rpc.deadline)
  mth = an_rpc.method.to_sym
  [c, mth]
end
rpc_descs()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 487
def rpc_descs
  @rpc_descs ||= {}
end
rpc_handlers()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 491
def rpc_handlers
  @rpc_handlers ||= {}
end
run()

Runs the server.

  • if no #rpc_descs are registered, this fails immediately with a RuntimeError; otherwise it handles calls and does not return until the server is stopped.

  • running? returns true after this is called, until stop causes the server to stop.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 395
def run
  @run_mutex.synchronize do
    fail 'cannot run without registering services' if rpc_descs.size.zero?
    @pool.start
    @server.start
    transition_running_state(:running)
    @run_cond.broadcast
  end
  loop_handle_server_calls
end
run_till_terminated()

Runs the server in its own thread, then waits for signal INT or TERM on the current thread to terminate it.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 333
def run_till_terminated
  GRPC.trap_signals
  t = Thread.new { run }
  wait_till_running
  loop do
    sleep SIGNAL_CHECK_PERIOD
    break unless GRPC.handle_signals
  end
  stop
  t.join
end
running?()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 310
def running?
  running_state == :running
end
running_state()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 290
def running_state
  @run_mutex.synchronize do
    return @running_state
  end
end
stop()

Stops a running server.

The call has no effect if the server is already stopped; otherwise the server's current call loop is its last. It fails if the server has not been started.

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 279
def stop
  @run_mutex.synchronize do
    fail 'Cannot stop before starting' if @running_state == :not_started
    return if @running_state != :running
    transition_running_state(:stopping)
  end
  deadline = from_relative_time(@poll_period)
  @server.close(@cq, deadline)
  @pool.stop
end
stopped?()
# File src/ruby/lib/grpc/generic/rpc_server.rb, line 314
def stopped?
  running_state == :stopped
end
transition_running_state(target_state)

Can only be called while holding @run_mutex

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 297
def transition_running_state(target_state)
  state_transitions = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }
  if state_transitions[@running_state] == target_state
    @running_state = target_state
  else
    fail "Bad server state transition: #{@running_state}->#{target_state}"
  end
end
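
The strictly linear lifecycle enforced by transition_running_state can be tried out in isolation. The sketch below reproduces the transition table in a small class. Lifecycle is a hypothetical name, and no mutex is involved because the snippet is single-threaded.

```ruby
# Minimal stand-in that reproduces the transition table shown above.
class Lifecycle
  TRANSITIONS = {
    not_started: :running,
    running: :stopping,
    stopping: :stopped
  }.freeze

  attr_reader :state

  def initialize
    @state = :not_started
  end

  # Moves to target only if it is the single legal successor of the current
  # state; otherwise raises RuntimeError, as the original does via fail.
  def transition(target)
    unless TRANSITIONS[@state] == target
      raise "Bad server state transition: #{@state}->#{target}"
    end
    @state = target
  end
end

lc = Lifecycle.new
lc.transition(:running)
lc.transition(:stopping)
lc.transition(:stopped)
puts lc.state

begin
  Lifecycle.new.transition(:stopped) # skipping states is rejected
rescue RuntimeError => e
  puts e.message
end
```

Since :stopped has no entry in the table, a stopped server cannot be moved to any other state, including back to :running.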
wait_till_running(timeout = nil)

Is called from other threads to wait for run to start up the server.

If the server has not been started yet, this blocks until it starts or the timeout elapses; if it has already left the :not_started state, this returns immediately.

@param timeout [Numeric] number of seconds to wait
@result [true, false] true if the server is running, false otherwise

# File src/ruby/lib/grpc/generic/rpc_server.rb, line 324
def wait_till_running(timeout = nil)
  @run_mutex.synchronize do
    @run_cond.wait(@run_mutex, timeout) if @running_state == :not_started
    return @running_state == :running
  end
end
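
The handshake between run and wait_till_running relies on a Mutex and a ConditionVariable. The sketch below is a miniature of that handshake using only the Ruby standard library. Starter and its start method are hypothetical stand-ins for the server and for run.

```ruby
# Hypothetical miniature of the run / wait_till_running handshake.
class Starter
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @state = :not_started
  end

  # Marks the object as running and wakes every waiting thread.
  def start
    @mutex.synchronize do
      @state = :running
      @cond.broadcast
    end
  end

  # Waits up to timeout seconds (indefinitely when nil) if not yet started,
  # then reports whether the state is :running.
  def wait_till_running(timeout = nil)
    @mutex.synchronize do
      @cond.wait(@mutex, timeout) if @state == :not_started
      @state == :running
    end
  end
end

# Without a start call, the wait ends when the timeout elapses.
never_started = Starter.new
puts never_started.wait_till_running(0.05)

# With a start call from another thread, the waiter is woken by broadcast.
s = Starter.new
t = Thread.new do
  sleep 0.05
  s.start
end
puts s.wait_till_running(2)
t.join
```

The state is re-checked after the wait instead of trusting the wake-up, so a timeout and a real start are distinguished by the return value.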