YUKI Hiroshi
null+****@clear*****
Wed Apr 22 21:07:17 JST 2015
YUKI Hiroshi 2015-04-22 21:07:17 +0900 (Wed, 22 Apr 2015) New Revision: bfb7578378a2ba353216ebef4ced42c451e44185 https://github.com/droonga/droonga-engine/commit/bfb7578378a2ba353216ebef4ced42c451e44185 Message: Apply timeout for internal connections via EngineNode Modified files: lib/droonga/cluster.rb lib/droonga/engine.rb lib/droonga/engine_node.rb Modified: lib/droonga/cluster.rb (+5 -2) =================================================================== --- lib/droonga/cluster.rb 2015-04-22 20:00:48 +0900 (76c0b5f) +++ lib/droonga/cluster.rb 2015-04-22 21:07:17 +0900 (3049e0d) @@ -59,6 +59,7 @@ module Droonga def initialize(loop, params) @loop = loop + @params = params @catalog = params[:catalog] @state = nil @@ -185,9 +186,11 @@ module Droonga def create_engine_nodes all_node_names.collect do |name| node_state = @state[name] || {} - EngineNode.new(name, + EngineNode.new(@loop, + name, node_state, - @loop) + :auto_close_timeout => + @params[:internal_connection_lifetime]) end end Modified: lib/droonga/engine.rb (+3 -1) =================================================================== --- lib/droonga/engine.rb 2015-04-22 20:00:48 +0900 (7440ed3) +++ lib/droonga/engine.rb 2015-04-22 21:07:17 +0900 (2e32c17) @@ -45,7 +45,9 @@ module Droonga :internal_connection_lifetime => options[:internal_connection_lifetime]) @cluster = Cluster.new(loop, - :catalog => @catalog) + :catalog => @catalog, + :internal_connection_lifetime => + options[:internal_connection_lifetime]) @dispatcher = create_dispatcher end Modified: lib/droonga/engine_node.rb (+53 -12) =================================================================== --- lib/droonga/engine_node.rb 2015-04-22 20:00:48 +0900 (38b4297) +++ lib/droonga/engine_node.rb 2015-04-22 21:07:17 +0900 (a5870cd) @@ -14,23 +14,27 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA require "time" +require "coolio" require "droonga/loggable" require "droonga/forward_buffer" require "droonga/fluent_message_sender" +require "droonga/node_name" require "droonga/node_role" module Droonga class EngineNode include Loggable + DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS = 60 + attr_reader :name - def initialize(name, state, loop) + def initialize(loop, name, state, options={}) + @loop = loop @name = name - logger.trace("initialize: start") - @state = state + logger.trace("initialize: start") @buffer = ForwardBuffer.new(name) boundary_timestamp = accept_messages_newer_than_timestamp @@ -39,12 +43,13 @@ module Droonga output(message, destination) end - parsed_name = parse_node_name(@name) - @sender = FluentMessageSender.new(loop, - parsed_name[:host], - parsed_name[:port], - :buffering => true) - @sender.start + @node_name = NodeName.parse(@name) + + @sender = nil + @auto_close_timer = nil + @auto_close_timeout = options[:auto_close_timeout] || + DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS + logger.trace("initialize: done") end @@ -56,7 +61,7 @@ module Droonga def shutdown logger.trace("shutdown: start") - @sender.shutdown + @sender.shutdown if @sender logger.trace("shutdown: done") end @@ -129,7 +134,7 @@ module Droonga def resume logger.trace("resume: start") - @sender.resume + sender.resume unles****@buffe*****? if really_writable? logger.info("Target becomes writable. Start to forwarding.") @@ -232,10 +237,46 @@ module Droonga output_tag = "#{parsed_receiver[:tag]}.message" log_info = "<#{receiver}>:<#{output_tag}>" logger.trace("forward: start: #{log_info}") - @sender.send(output_tag, message) + sender.send(output_tag, message) + set_auto_close_timer logger.trace("forward: end") end + def sender + @sender ||= create_sender + end + + def create_sender + sender = FluentMessageSender.new(@loop, + @node_name.host, + @node_name.port, + :buffering => true) + sender.start + sender + end + + def set_auto_close_timer + previous_timer = @auto_close_timer + previous_timer.detach if previous_timer + + timer = Coolio::TimerWatcher.new(@auto_close_timeout) + on_timeout = lambda do + timer.detach + @auto_close_timer = nil + sender = @sender + if sender + logger.info("sender for #{name} is automatically closed by timeout.") + sender.shutdown + @sender = nil + end + end + timer.on_timer do + on_timeout.call + end + @loop.attach(timer) + @auto_close_timer = timer + end + def log_tag "[#{Process.ppid}] engine-node: #{@name}" end -------------- next part -------------- HTML����������������������������... Télécharger