class GRPC::BidiCall
The BiDiCall class orchestrates exection of a BiDi stream on a client or server.
Constants
- END_OF_READS
- END_OF_WRITES
Public Class Methods
Creates a BidiCall.
BidiCall should only be created after a call is accepted. That means different things on a client and a server. On the client, the call is accepted after call.invoke. On the server, this is after call.accept.
initialize cannot determine if the call is accepted or not; so if a call that's not accepted is used here, the error won't be visible until the BidiCall#run is called.
deadline is the absolute deadline for the call.
@param call [Call] the call used by the ActiveCall @param q [CompletionQueue] the completion queue used to accept
the call
@param marshal [Function] f(obj)->string that marshal requests @param unmarshal [Function] f(string)->obj that unmarshals responses @param metadata_tag [Object] tag object used to collect metadata
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 60 def initialize(call, q, marshal, unmarshal, metadata_tag: nil) fail(ArgumentError, 'not a call') unless call.is_a? Core::Call unless q.is_a? Core::CompletionQueue fail(ArgumentError, 'not a CompletionQueue') end @call = call @cq = q @marshal = marshal @op_notifier = nil # signals completion on clients @readq = Queue.new @unmarshal = unmarshal @metadata_tag = metadata_tag end
Public Instance Methods
Begins orchestration of the Bidi stream for a client sending requests.
The method either returns an Enumerator of the responses, or accepts a block that can be invoked with each response.
@param requests the Enumerable of requests to send @op_notifier a Notifier used to signal completion @return an Enumerator of requests to yield
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 82 def run_on_client(requests, op_notifier, &blk) @op_notifier = op_notifier @enq_th = Thread.new { write_loop(requests) } @loop_th = start_read_loop each_queued_msg(&blk) end
Begins orchestration of the Bidi stream for a server generating replies.
N.B. gen_each_reply is a func(Enumerable<Requests>)
It takes an enumerable of requests as an arg, in case there is a relationship between the stream of requests and the stream of replies.
This does not mean that must necessarily be one. E.g, the replies produced by gen_each_reply could ignore the received_msgs
@param gen_each_reply [Proc] generates the BiDi stream replies.
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 100 def run_on_server(gen_each_reply) replys = gen_each_reply.call(each_queued_msg) @loop_th = start_read_loop(is_client: false) write_loop(replys, is_client: false) end
Private Instance Methods
#each_queued_msg yields each message on this instances readq
-
messages are added to the readq by read_loop
-
iteration ends when the instance itself is added
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 134 def each_queued_msg return enum_for(:each_queued_msg) unless block_given? count = 0 loop do GRPC.logger.debug("each_queued_msg: waiting##{count}") count += 1 req = @readq.pop GRPC.logger.debug("each_queued_msg: req = #{req}") fail req if req.is_a? StandardError break if req.equal?(END_OF_READS) yield req end end
signals that bidi operation is complete
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 112 def notify_done return unless @op_notifier GRPC.logger.debug("bidi-notify-done: notifying #{@op_notifier}") @op_notifier.notify(self) end
performs a read using @call.run_batch, ensures metadata is set up
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 119 def read_using_run_batch ops = { RECV_MESSAGE => nil } ops[RECV_INITIAL_METADATA] = nil unless @metadata_tag.nil? batch_result = @call.run_batch(@cq, self, INFINITE_FUTURE, ops) unless @metadata_tag.nil? @call.metadata = batch_result.metadata @metadata_tag = nil end batch_result end
starts the read loop
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 176 def start_read_loop(is_client: true) Thread.new do GRPC.logger.debug('bidi-read-loop: starting') begin read_tag = Object.new count = 0 # queue the initial read before beginning the loop loop do GRPC.logger.debug("bidi-read-loop: #{count}") count += 1 batch_result = read_using_run_batch # handle the next message if batch_result.message.nil? GRPC.logger.debug("bidi-read-loop: null batch #{batch_result}") if is_client batch_result = @call.run_batch(@cq, read_tag, INFINITE_FUTURE, RECV_STATUS_ON_CLIENT => nil) @call.status = batch_result.status batch_result.check_status GRPC.logger.debug("bidi-read-loop: done status #{@call.status}") end @readq.push(END_OF_READS) GRPC.logger.debug('bidi-read-loop: done reading!') break end # push the latest read onto the queue and continue reading res = @unmarshal.call(batch_result.message) @readq.push(res) end rescue StandardError => e GRPC.logger.warn('bidi: read-loop failed') GRPC.logger.warn(e) @readq.push(e) # let each_queued_msg terminate with this error end GRPC.logger.debug('bidi-read-loop: finished') end end
# File src/ruby/lib/grpc/generic/bidi_call.rb, line 148 def write_loop(requests, is_client: true) GRPC.logger.debug('bidi-write-loop: starting') write_tag = Object.new count = 0 requests.each do |req| GRPC.logger.debug("bidi-write-loop: #{count}") count += 1 payload = @marshal.call(req) @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_MESSAGE => payload) end GRPC.logger.debug("bidi-write-loop: #{count} writes done") if is_client GRPC.logger.debug("bidi-write-loop: client sent #{count}, waiting") @call.run_batch(@cq, write_tag, INFINITE_FUTURE, SEND_CLOSE_FROM_CLIENT => nil) GRPC.logger.debug('bidi-write-loop: done') notify_done end GRPC.logger.debug('bidi-write-loop: finished') rescue StandardError => e GRPC.logger.warn('bidi-write-loop: failed') GRPC.logger.warn(e) notify_done raise e end