[Groonga-commit] droonga/droonga-engine at 52872d1 [master] Define serf tags in a place

Back to archive index

YUKI Hiroshi null+****@clear*****
Tue Apr 21 17:19:31 JST 2015


YUKI Hiroshi	2015-04-21 17:19:31 +0900 (Tue, 21 Apr 2015)

  New Revision: 52872d1048b5a85bd3a31c4ed7399029a5b56f6a
  https://github.com/droonga/droonga-engine/commit/52872d1048b5a85bd3a31c4ed7399029a5b56f6a

  Message:
    Define serf tags in a place

  Added files:
    lib/droonga/serf/tag.rb
  Modified files:
    lib/droonga/serf.rb

  Modified: lib/droonga/serf.rb (+21 -26)
===================================================================
--- lib/droonga/serf.rb    2015-04-21 17:07:45 +0900 (7369213)
+++ lib/droonga/serf.rb    2015-04-21 17:19:31 +0900 (8a0c5c2)
@@ -20,6 +20,7 @@ require "droonga/loggable"
 require "droonga/catalog/loader"
 require "droonga/node_name"
 require "droonga/node_role"
+require "droonga/serf/tag"
 require "droonga/serf/downloader"
 require "droonga/serf/agent"
 require "droonga/serf/command"
@@ -61,9 +62,9 @@ module Droonga
     end
 
     def initialize_tags
-      set_tag("type", "engine")
-      set_tag("cluster_id", cluster_id)
-      set_tag("role", role)
+      set_tag(Tag.node_type, "engine")
+      set_tag(Tag.node_role, role)
+      set_tag(Tag.cluster_id, cluster_id)
     end
 
     def leave
@@ -122,20 +123,20 @@ module Droonga
       nodes = {}
       unprocessed_messages_existence = {}
       current_members.each do |member|
-        foreign = member["tags"]["cluster_id"] != current_cluster_id
+        foreign = member["tags"][Tag.cluster_id] != current_cluster_id
         next if foreign
 
         member["tags"].each do |key, value|
-          next unless key.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX)
-          node_name = key.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "")
+          next unless Tag.have_unprocessed_messages_tag?(key)
+          node_name = Tag.extract_node_name_from_have_unprocessed_messages_tag(key)
           next if unprocessed_messages_existence[node_name]
           unprocessed_messages_existence[node_name] = value == "true"
         end
 
         nodes[member["name"]] = {
-          "type" => member["tags"]["type"],
-          "role" => member["tags"]["role"],
-          "accept_messages_newer_than" => member["tags"]["accept-newer-than"],
+          "type" => member["tags"][Tag.node_type],
+          "role" => member["tags"][Tag.node_role],
+          "accept_messages_newer_than" => member["tags"][Tag.accept_messages_newer_than],
           "live" => member["status"] == "alive",
         }
       end
@@ -171,44 +172,44 @@ module Droonga
     end
 
     def update_cluster_id
-      set_tag("cluster_id", cluster_id)
+      set_tag(Tag.cluster_id, cluster_id)
     end
 
     def set_have_unprocessed_messages_for(node_name)
-      tag = have_unprocessed_messages_tag_for(node_name)
+      tag = Tag.have_unprocessed_messages_tag_for(node_name)
       set_tag(tag, true) unless @tags_cache.key?(tag)
     end
 
     def reset_have_unprocessed_messages_for(node_name)
-      delete_tag(have_unprocessed_messages_tag_for(node_name))
+      delete_tag(Tag.have_unprocessed_messages_tag_for(node_name))
     end
 
     def role
-      NodeRole.normalize(get_tag("role"))
+      NodeRole.normalize(get_tag(Tag.node_role))
     end
 
     def role=(new_role)
       role = NodeRole.normalize(new_role)
-      set_tag("role", role)
+      set_tag(Tag.node_role, role)
       # after that you must run update_cluster_state to update the cluster information cache
       role
     end
 
     def last_processed_message_timestamp
-      get_tag("last-timestamp")
+      get_tag(Tag.last_processed_message_timestamp)
     end
 
     def last_processed_message_timestamp=(timestamp)
-      set_tag("last-timestamp", timestamp.to_s)
+      set_tag(Tag.last_processed_message_timestamp, timestamp.to_s)
       # after that you must run update_cluster_state to update the cluster information cache
     end
 
     def accept_messages_newer_than_timestamp
-      get_tag("accept-newer-than")
+      get_tag(Tag.accept_messages_newer_than)
     end
 
     def accept_messages_newer_than(timestamp)
-      set_tag("accept-newer-than", timestamp.to_s)
+      set_tag(Tag.accept_messages_newer_than, timestamp.to_s)
       # after that you must run update_cluster_state to update the cluster information cache
     end
 
@@ -223,13 +224,13 @@ module Droonga
 
     def ensure_restarted(&block)
       start_time = Time.now
-      previous_internal_name = get_tag("internal-name")
+      previous_internal_name = get_tag(Tag.internal_node_name)
       restarted = false
 
       yield # the given operation must restart the service.
 
       while Time.now - start_time < CHECK_RESTARTED_TIMEOUT
-        restarted = get_tag("internal-name") == previous_internal_name
+        restarted = get_tag(Tag.internal_node_name) == previous_internal_name
         break if restarted
         sleep(CHECK_RESTARTED_INTERVAL)
       end
@@ -302,12 +303,6 @@ module Droonga
       end
     end
 
-    HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-"
-
-    def have_unprocessed_messages_tag_for(node_name)
-      "#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}"
-    end
-
     def log_tag
       "serf"
     end

  Added: lib/droonga/serf/tag.rb (+60 -0) 100644
===================================================================
--- /dev/null
+++ lib/droonga/serf/tag.rb    2015-04-21 17:19:31 +0900 (564ea55)
@@ -0,0 +1,60 @@
+# Copyright (C) 2015 Droonga Project
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License version 2.1 as published by the Free Software Foundation.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+
+module Droonga
+  class Serf
+    class Tag
+      class << self
+        def node_type
+          "type"
+        end
+
+        def node_role
+          "role"
+        end
+
+        def internal_node_name
+          "internal-name"
+        end
+
+        def cluster_id
+          "cluster_id"
+        end
+
+        def accept_messages_newer_than
+          "accept-newer-than"
+        end
+
+        def last_processed_message_timestamp
+          "last-timestamp"
+        end
+
+        HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX = "buffered-for-"
+
+        def have_unprocessed_messages_tag_for(node_name)
+         "#{HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX}#{node_name}"
+        end
+
+        def have_unprocessed_messages_tag?(tag)
+          tag.start_with?(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX)
+        end
+
+        def extract_node_name_from_have_unprocessed_messages_tag(tag)
+          tag.sub(HAVE_UNPROCESSED_MESSAGES_TAG_PREFIX, "")
+        end
+      end
+    end
+  end
+end
-------------- next part --------------
HTML����������������������������...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index