[Groonga-commit] droonga/droonga-engine at ac06373 [master] Split implementation for remote commands to separate classes

Back to archive index

YUKI Hiroshi null+****@clear*****
Thu Aug 28 18:35:39 JST 2014


YUKI Hiroshi	2014-08-28 18:35:39 +0900 (Thu, 28 Aug 2014)

  New Revision: ac06373f6254fbd7666f121048278bee92c00a5c
  https://github.com/droonga/droonga-engine/commit/ac06373f6254fbd7666f121048278bee92c00a5c

  Message:
    Split implementation for remote commands to separate classes

  Modified files:
    bin/droonga-engine-join
    lib/droonga/command/serf_event_handler.rb

  Modified: bin/droonga-engine-join (+0 -2)
===================================================================
--- bin/droonga-engine-join    2014-08-28 17:16:24 +0900 (38cccb4)
+++ bin/droonga-engine-join    2014-08-28 18:35:39 +0900 (6a8162e)
@@ -70,9 +70,7 @@ run_remote_command(joining_node, "join",
                    "node"    => joining_node,
                    "type"    => "replica",
                    "source"  => source_node,
-                   "port"    => options[:port],
                    "dataset" => options[:dataset],
-                   "tag"     => options[:tag],
                    "copy"    => !options["no-copy"])
 sleep(5) #TODO: wait for restarting of the joining node. this should be done more safely.
 

  Modified: lib/droonga/command/serf_event_handler.rb (+282 -197)
===================================================================
--- lib/droonga/command/serf_event_handler.rb    2014-08-28 17:16:24 +0900 (abe023b)
+++ lib/droonga/command/serf_event_handler.rb    2014-08-28 18:35:39 +0900 (22b49cb)
@@ -26,156 +26,162 @@ require "droonga/safe_file_writer"
 
 module Droonga
   module Command
-    class SerfEventHandler
-      class << self
-        def run
-          new.run
+    module Remote
+      class Base
+        attr_reader :response
+
+        def initialize
+          @serf_name = ENV["SERF_SELF_NAME"]
+          @response = {
+            "log" => []
+          }
+          @payload = JSON.parse($stdin.gets)
         end
-      end
 
-      def initialize
-        @serf = ENV["SERF"] || Serf.path
-        @serf_rpc_address = ENV["SERF_RPC_ADDRESS"] || "127.0.0.1:7373"
-        @serf_name = ENV["SERF_SELF_NAME"]
-        @response = {
-          "log" => []
-        }
-      end
+        def process
+          # override me!
+        end
 
-      def run
-        parse_event
-        unless should_process?
-          log(" => ignoring event not for me")
-          output_response
-          return true
+        def should_process?
+          for_me? or @payload.nil? or not @payload.include?("node")
         end
 
-        process_event
-        output_live_nodes
-        output_response
-        true
-      end
+        private
+        def node
+          @serf_name
+        end
 
-      private
-      def parse_event
-        @event_name = ENV["SERF_EVENT"]
-        @payload = nil
-        case @event_name
-        when "user"
-          @event_sub_name = ENV["SERF_USER_EVENT"]
-          @payload = JSON.parse($stdin.gets)
-          log("event sub name = #{@event_sub_name}")
-        when "query"
-          @event_sub_name = ENV["SERF_QUERY_NAME"]
-          @payload = JSON.parse($stdin.gets)
-          log("event sub name = #{@event_sub_name}")
-        when "member-join", "member-leave", "member-update", "member-reap"
-          output_live_nodes
+        def host
+          node.split(":").first
         end
-      end
 
-      def target_node
-        @payload && @payload["node"]
-      end
+        def target_node
+          @payload && @payload["node"]
+        end
 
-      def for_me?
-        target_node == @serf_name
-      end
+        def for_me?
+          target_node == @serf_name
+        end
 
-      def should_process?
-        for_me? or @payload.nil? or not @payload.include?("node")
+        def log(message)
+          @response["log"] << message
+        end
       end
 
-      def process_event
-        case @event_sub_name
-        when "change_role"
+      class ChangeRole < Base
+        def process
           NodeStatus.set(:role, @payload["role"])
-        when "report_status"
-          report_status
-        when "join"
-          join
-        when "set_replicas"
-          set_replicas
-        when "add_replicas"
-          add_replicas
-        when "remove_replicas"
-          remove_replicas
-        when "absorb_data"
-          absorb_data
         end
       end
 
-      def output_response
-        puts JSON.generate(@response)
+      class ReportStatus < Base
+        def process
+          @response["value"] = NodeStatus.get(@payload["key"])
+        end
       end
 
-      def host
-        @serf_name.split(":").first
-      end
+      class Join < Base
+        def process
+          log("type = #{type}")
+          case type
+          when "replica"
+            join_as_replica
+          end
+        end
 
-      def given_hosts
-        hosts = @payload["hosts"]
-        return nil unless hosts
-        hosts = [hosts] if hosts.is_a?(String)
-        hosts
-      end
+        private
+        def type
+          @payload["type"]
+        end
 
-      def report_status
-        @response["value"] = NodeStatus.get(@payload["key"])
-      end
+        def source_node
+          @payload["source"]
+        end
 
-      def join
-        type = @payload["type"]
-        log("type = #{type}")
-        case type
-        when "replica"
-          join_as_replica
+        def joining_node
+          @payload["node"]
         end
-      end
 
-      def join_as_replica
-        source_node         = @payload["source"]
-        source_node_port    = @payload["port"]
-        joining_node        = @payload["node"]
-        tag                 = @payload["tag"]
-        dataset_name        = @payload["dataset"]
-        required_params = [
-          source_node,
-          source_node_port,
-          joining_node,
-          dataset_name,
-        ]
-        return unless required_params.all?
-
-        log("source_node  = #{source_node}")
-
-        source_host  = source_node.split(":").first
-        joining_host = joining_node.split(":").first
-
-        fetcher = CatalogFetcher.new(:host          => source_host,
-                                     :port          => source_node_port,
-                                     :tag           => tag,
-                                     :receiver_host => joining_host)
-        catalog = fetcher.fetch(:dataset => dataset_name)
-
-        generator = CatalogGenerator.new
-        generator.load(catalog)
-        dataset = generator.dataset_for_host(source_host) ||
-                    generator.dataset_for_host(host)
-        return unless dataset
-
-        # restart self with the fetched catalog.
-        SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog))
-
-        tag          = dataset.replicas.tag
-        port         = dataset.replicas.port
-        other_hosts  = dataset.replicas.hosts
-
-        log("dataset = #{dataset_name}")
-        log("port    = #{port}")
-        log("tag     = #{tag}")
-
-        if @payload["copy"]
+        def dataset_name
+          @payload["dataset"]
+        end
+
+        def valid_params?
+          have_required_params? and
+            valid_node?(source_node) and
+            valid_node?(joining_node)
+        end
+
+        def have_required_params?
+          required_params = [
+            source_node,
+            joining_node,
+            dataset_name,
+          ]
+          required_params.all? do |param|
+            not param.nil?
+          end
+        end
+
+        NODE_PATTERN = /\A([^:]+):(\d+)\/(.+)\z/
+
+        def valid_node?(node)
+          node =~ NODE_PATTERN
+        end
+
+        def source_host
+          @source_host ||= (source_node =~ NODE_PATTERN && $1)
+        end
+
+        def joining_host
+          @source_host ||= (joining_node =~ NODE_PATTERN && $1)
+        end
+
+        def port
+          @port ||= (source_node =~ NODE_PATTERN && $2 && $2.to_i)
+        end
+
+        def tag
+          @tag ||= (source_node =~ NODE_PATTERN && $3)
+        end
+
+        def should_absorb_data?
+          @payload["copy"]
+        end
+
+        def join_as_replica
+          return unless valid_params?
+
+          log("source_node  = #{source_node}")
+
+          fetcher = CatalogFetcher.new(:host          => source_host,
+                                       :port          => port,
+                                       :tag           => tag,
+                                       :receiver_host => joining_host)
+          catalog = fetcher.fetch(:dataset => dataset_name)
+
+          generator = CatalogGenerator.new
+          generator.load(catalog)
+          dataset = generator.dataset_for_host(source_host) ||
+                      generator.dataset_for_host(host)
+          return unless dataset
+
+          # restart self with the fetched catalog.
+          SafeFileWriter.write(Path.catalog, JSON.pretty_generate(catalog))
+
+          other_hosts  = dataset.replicas.hosts
+
+          absorb_data if should_absorb_data?
+
+          log("joining to the cluster: update myself")
+
+          CatalogModifier.modify do |modifier|
+            modifier.datasets[dataset_name].replicas.hosts += other_hosts
+            modifier.datasets[dataset_name].replicas.hosts.uniq!
+          end
+        end
+
+        def absorb_data
           log("starting to copy data from #{source_host}")
 
           CatalogModifier.modify do |modifier|
@@ -187,118 +193,197 @@ module Droonga
           status.set(:absorbing, true)
           DataAbsorber.absorb(:dataset          => dataset_name,
                               :source_host      => source_host,
-                              :destination_host => host,
+                              :destination_host => joining_host,
                               :port             => port,
                               :tag              => tag)
           status.delete(:absorbing)
           sleep(1)
         end
+      end
+
+      class AbsorbData < Base
+        attr_writer :dataset_name, :port, :tag
 
-        log("joining to the cluster: update myself")
+        def process
+          return unless source
 
-        CatalogModifier.modify do |modifier|
-          modifier.datasets[dataset_name].replicas.hosts += other_hosts
-          modifier.datasets[dataset_name].replicas.hosts.uniq!
+          log("start to absorb data from #{source}")
+
+          if dataset_name.nil? or port.nil? or tag.nil?
+            current_catalog = JSON.parse(Path.catalog.read)
+            generator = CatalogGenerator.new
+            generator.load(current_catalog)
+
+            dataset = generator.dataset_for_host(source)
+            return unless dataset
+
+            self.dataset_name = dataset.name
+            self.port         = dataset.replicas.port
+            self.tag          = dataset.replicas.tag
+          end
+
+          log("dataset = #{dataset_name}")
+          log("port    = #{port}")
+          log("tag     = #{tag}")
+
+          status = NodeStatus.new
+          status.set(:absorbing, true)
+          DataAbsorber.absorb(:dataset          => dataset_name,
+                              :source_host      => source,
+                              :destination_host => host,
+                              :port             => port,
+                              :tag              => tag,
+                              :client           => "droonga-send")
+          status.delete(:absorbing)
         end
-      end
 
-      def set_replicas
-        dataset = @payload["dataset"]
-        return unless dataset
+        private
+        def source
+          @payload["source"]
+        end
 
-        hosts = given_hosts
-        return unless hosts
+        def dataset_name
+          @dataset_name ||= @payload["dataset"]
+        end
 
-        log("new replicas: #{hosts.join(",")}")
+        def port
+          @port ||= @payload["port"]
+        end
 
-        CatalogModifier.modify do |modifier|
-          modifier.datasets[dataset].replicas.hosts = hosts
+        def tag
+          @tag ||= @payload["tag"]
         end
       end
 
-      def add_replicas
-        dataset = @payload["dataset"]
-        return unless dataset
+      class ModifyReplicasBase < Base
+        private
+        def dataset
+          @payload["dataset"]
+        end
 
-        hosts = given_hosts
-        return unless hosts
+        def hosts
+          @hosts ||= prepare_hosts
+        end
+
+        def prepare_hosts
+          hosts = @payload["hosts"]
+          return nil unless hosts
+          hosts = [hosts] if hosts.is_a?(String)
+          hosts
+        end
+      end
 
-        hosts -= [host]
-        return if hosts.empty?
+      class SetReplicas < ModifyReplicasBase
+        def process
+          return unless dataset
+          return unless hosts
 
-        log("adding replicas: #{hosts.join(",")}")
+          log("new replicas: #{hosts.join(",")}")
 
-        CatalogModifier.modify do |modifier|
-          modifier.datasets[dataset].replicas.hosts += hosts
-          modifier.datasets[dataset].replicas.hosts.uniq!
+          CatalogModifier.modify do |modifier|
+            modifier.datasets[dataset].replicas.hosts = hosts
+          end
         end
       end
 
-      def remove_replicas
-        dataset = @payload["dataset"]
-        return unless dataset
+      class AddReplicas < ModifyReplicasBase
+        def process
+          return unless dataset
+          return unless hosts
 
-        hosts = given_hosts
-        return unless hosts
+          hosts -= [host]
+          return if hosts.empty?
 
-        log("removing replicas: #{hosts.join(",")}")
+          log("adding replicas: #{hosts.join(",")}")
 
-        CatalogModifier.modify do |modifier|
-          modifier.datasets[dataset].replicas.hosts -= hosts
+          CatalogModifier.modify do |modifier|
+            modifier.datasets[dataset].replicas.hosts += hosts
+            modifier.datasets[dataset].replicas.hosts.uniq!
+          end
         end
       end
 
-      def absorb_data
-        source = @payload["source"]
-        return unless source
+      class RemoveReplicas < ModifyReplicasBase
+        def process
+          return unless dataset
+          return unless hosts
 
-        log("start to absorb data from #{source}")
+          log("removing replicas: #{hosts.join(",")}")
 
-        dataset_name = @payload["dataset"]
-        port         = @payload["port"]
-        tag          = @payload["tag"]
+          CatalogModifier.modify do |modifier|
+            modifier.datasets[dataset].replicas.hosts -= hosts
+          end
+        end
+      end
 
-        if dataset_name.nil? or port.nil? or tag.nil?
-          current_catalog = JSON.parse(Path.catalog.read)
-          generator = CatalogGenerator.new
-          generator.load(current_catalog)
+      class UpdateLiveNodes < Base
+        def process
+          def live_nodes
+            Serf.live_nodes(@serf_name)
+          end
 
-          dataset = generator.dataset_for_host(source)
-          return unless dataset
+          def output_live_nodes
+            path = Path.live_nodes
+            nodes = live_nodes
+            file_contents = JSON.pretty_generate(nodes)
+            SafeFileWriter.write(path, file_contents)
+          end
+        end
+      end
+    end
 
-          dataset_name = dataset.name
-          port = dataset.replicas.port
-          tag  = dataset.replicas.tag
+    class SerfEventHandler
+      class << self
+        def run
+          new.run
         end
+      end
 
-        log("dataset = #{dataset_name}")
-        log("port    = #{port}")
-        log("tag     = #{tag}")
+      def run
+        command_class = detect_command_class
+        return true if command_class.nil?
 
-        status = NodeStatus.new
-        status.set(:absorbing, true)
-        DataAbsorber.absorb(:dataset          => dataset_name,
-                            :source_host      => source,
-                            :destination_host => host,
-                            :port             => port,
-                            :tag              => tag,
-                            :client           => "droonga-send")
-        status.delete(:absorbing)
+        command = command_class.new
+        command.process if command.should_process?
+        output_response(command.response)
+        true
       end
 
-      def live_nodes
-        Serf.live_nodes(@serf_name)
+      private
+      def detect_command_class
+        case ENV["SERF_EVENT"]
+        when "user"
+          detect_command_class_from_custom_event(ENV["SERF_USER_EVENT"])
+        when "query"
+          detect_command_class_from_custom_event(ENV["SERF_QUERY_NAME"])
+        when "member-join", "member-leave", "member-update", "member-reap"
+          Remote::UpdateLiveNodes
+        end
       end
 
-      def output_live_nodes
-        path = Path.live_nodes
-        nodes = live_nodes
-        file_contents = JSON.pretty_generate(nodes)
-        SafeFileWriter.write(path, file_contents)
+      def detect_command_class_from_custom_event(event_name)
+        case event_name
+        when "change_role"
+          Remote::ChangeRole
+        when "report_status"
+          Remote::ReportStatus
+        when "join"
+          Remote::Join
+        when "set_replicas"
+          Remote::SetReplicas
+        when "add_replicas"
+          Remote::AddReplicas
+        when "remove_replicas"
+          Remote::RemoveReplicas
+        when "absorb_data"
+          Remote::AbsorbData
+        else
+          nil
+        end
       end
 
-      def log(message)
-        @response["log"] << message
+      def output_response(response)
+        puts JSON.generate(response)
       end
     end
   end
-------------- next part --------------
An HTML attachment was scrubbed...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index