[Groonga-commit] droonga/droonga-engine at bfb7578 [master] Apply timeout for internal connections via EngineNode

Back to archive index

YUKI Hiroshi null+****@clear*****
Wed Apr 22 21:07:17 JST 2015


YUKI Hiroshi	2015-04-22 21:07:17 +0900 (Wed, 22 Apr 2015)

  New Revision: bfb7578378a2ba353216ebef4ced42c451e44185
  https://github.com/droonga/droonga-engine/commit/bfb7578378a2ba353216ebef4ced42c451e44185

  Message:
    Apply timeout for internal connections via EngineNode

  Modified files:
    lib/droonga/cluster.rb
    lib/droonga/engine.rb
    lib/droonga/engine_node.rb

  Modified: lib/droonga/cluster.rb (+5 -2)
===================================================================
--- lib/droonga/cluster.rb    2015-04-22 20:00:48 +0900 (76c0b5f)
+++ lib/droonga/cluster.rb    2015-04-22 21:07:17 +0900 (3049e0d)
@@ -59,6 +59,7 @@ module Droonga
     def initialize(loop, params)
       @loop = loop
 
+      @params = params
       @catalog = params[:catalog]
       @state = nil
 
@@ -185,9 +186,11 @@ module Droonga
     def create_engine_nodes
       all_node_names.collect do |name|
         node_state = @state[name] || {}
-        EngineNode.new(name,
+        EngineNode.new(@loop,
+                       name,
                        node_state,
-                       @loop)
+                       :auto_close_timeout =>
+                         @params[:internal_connection_lifetime])
       end
     end
 

  Modified: lib/droonga/engine.rb (+3 -1)
===================================================================
--- lib/droonga/engine.rb    2015-04-22 20:00:48 +0900 (7440ed3)
+++ lib/droonga/engine.rb    2015-04-22 21:07:17 +0900 (2e32c17)
@@ -45,7 +45,9 @@ module Droonga
                                :internal_connection_lifetime =>
                                  options[:internal_connection_lifetime])
       @cluster = Cluster.new(loop,
-                             :catalog  => @catalog)
+                             :catalog  => @catalog,
+                             :internal_connection_lifetime =>
+                               options[:internal_connection_lifetime])
 
       @dispatcher = create_dispatcher
     end

  Modified: lib/droonga/engine_node.rb (+53 -12)
===================================================================
--- lib/droonga/engine_node.rb    2015-04-22 20:00:48 +0900 (38b4297)
+++ lib/droonga/engine_node.rb    2015-04-22 21:07:17 +0900 (a5870cd)
@@ -14,23 +14,27 @@
 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
 
 require "time"
+require "coolio"
 
 require "droonga/loggable"
 require "droonga/forward_buffer"
 require "droonga/fluent_message_sender"
+require "droonga/node_name"
 require "droonga/node_role"
 
 module Droonga
   class EngineNode
     include Loggable
 
+    DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS = 60
+
     attr_reader :name
 
-    def initialize(name, state, loop)
+    def initialize(loop, name, state, options={})
+      @loop = loop
       @name  = name
-      logger.trace("initialize: start")
-
       @state = state
+      logger.trace("initialize: start")
 
       @buffer = ForwardBuffer.new(name)
       boundary_timestamp = accept_messages_newer_than_timestamp
@@ -39,12 +43,13 @@ module Droonga
         output(message, destination)
       end
 
-      parsed_name = parse_node_name(@name)
-      @sender = FluentMessageSender.new(loop,
-                                        parsed_name[:host],
-                                        parsed_name[:port],
-                                        :buffering => true)
-      @sender.start
+      @node_name = NodeName.parse(@name)
+
+      @sender = nil
+      @auto_close_timer = nil
+      @auto_close_timeout = options[:auto_close_timeout] ||
+                              DEFAULT_AUTO_CLOSE_TIMEOUT_SECONDS
+
       logger.trace("initialize: done")
     end
 
@@ -56,7 +61,7 @@ module Droonga
 
     def shutdown
       logger.trace("shutdown: start")
-      @sender.shutdown
+      @sender.shutdown if @sender
       logger.trace("shutdown: done")
     end
 
@@ -129,7 +134,7 @@ module Droonga
 
     def resume
       logger.trace("resume: start")
-      @sender.resume
+      sender.resume
       unles****@buffe*****?
         if really_writable?
           logger.info("Target becomes writable. Start to forwarding.")
@@ -232,10 +237,46 @@ module Droonga
       output_tag = "#{parsed_receiver[:tag]}.message"
       log_info = "<#{receiver}>:<#{output_tag}>"
       logger.trace("forward: start: #{log_info}")
-      @sender.send(output_tag, message)
+      sender.send(output_tag, message)
+      set_auto_close_timer
       logger.trace("forward: end")
     end
 
+    def sender
+      @sender ||= create_sender
+    end
+
+    def create_sender
+      sender = FluentMessageSender.new(@loop,
+                                       @node_name.host,
+                                       @node_name.port,
+                                       :buffering => true)
+      sender.start
+      sender
+    end
+
+    def set_auto_close_timer
+      previous_timer = @auto_close_timer
+      previous_timer.detach if previous_timer
+
+      timer = Coolio::TimerWatcher.new(@auto_close_timeout)
+      on_timeout = lambda do
+        timer.detach
+        @auto_close_timer = nil
+        sender = @sender
+        if sender
+          logger.info("sender for #{name} is automatically closed by timeout.")
+          sender.shutdown
+          @sender = nil
+        end
+      end
+      timer.on_timer do
+        on_timeout.call
+      end
+      @loop.attach(timer)
+      @auto_close_timer = timer
+    end
+
     def log_tag
       "[#{Process.ppid}] engine-node: #{@name}"
     end
-------------- next part --------------
HTML����������������������������...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index