[Groonga-commit] groonga/grnxx at 5cf759d [master] Add grnxx::map::Pool<grnxx::Bytes>.

Back to archive index

susumu.yata null+****@clear*****
Thu Aug 15 20:42:28 JST 2013


susumu.yata	2013-08-15 20:42:28 +0900 (Thu, 15 Aug 2013)

  New Revision: 5cf759d17a7acdb0cebf6e7e129a2cb5620f8673
  https://github.com/groonga/grnxx/commit/5cf759d17a7acdb0cebf6e7e129a2cb5620f8673

  Message:
    Add grnxx::map::Pool<grnxx::Bytes>.

  Modified files:
    lib/grnxx/map/pool.cpp
    lib/grnxx/map/pool.hpp

  Modified: lib/grnxx/map/pool.cpp (+379 -36)
===================================================================
--- lib/grnxx/map/pool.cpp    2013-08-15 19:23:42 +0900 (1ec5d7d)
+++ lib/grnxx/map/pool.cpp    2013-08-15 20:42:28 +0900 (9dfd533)
@@ -97,10 +97,8 @@ void Pool<T>::unlink(Storage *storage, uint32_t storage_node_id) {
 
 template <typename T>
 void Pool<T>::unset(int64_t key_id) {
-  if (size_ != header_->size) {
-    refresh();
-  }
-  void * const page = get_page(key_id);
+  refresh_if_possible();
+  void * const page = get_page(key_id / PAGE_SIZE);
   const uint64_t local_key_id = key_id % PAGE_SIZE;
   const uint64_t unit_id = local_key_id / UNIT_SIZE;
   const uint64_t validity_bit = 1ULL << (local_key_id % UNIT_SIZE);
@@ -119,10 +117,8 @@ void Pool<T>::unset(int64_t key_id) {
 
 template <typename T>
 void Pool<T>::reset(int64_t key_id, KeyArg dest_key) {
-  if (size_ != header_->size) {
-    refresh();
-  }
-  void * const page = get_page(key_id);
+  refresh_if_possible();
+  void * const page = get_page(key_id / PAGE_SIZE);
   const uint64_t local_key_id = key_id % PAGE_SIZE;
   const Unit * const unit =
       static_cast<const Unit *>(page) - (local_key_id / UNIT_SIZE) - 1;
@@ -135,9 +131,7 @@ void Pool<T>::reset(int64_t key_id, KeyArg dest_key) {
 
 template <typename T>
 int64_t Pool<T>::add(KeyArg key) {
-  if (size_ != header_->size) {
-    refresh();
-  }
+  refresh_if_possible();
   if (header_->latest_available_unit_id == INVALID_UNIT_ID) {
     // Use a new unit.
     const int64_t next_key_id = header_->max_key_id + 1;
@@ -146,7 +140,8 @@ int64_t Pool<T>::add(KeyArg key) {
                     << ", max_key_id = " << MAX_KEY_ID;
       throw LogicError();
     }
-    void * const page = reserve_page(next_key_id);
+    reserve_key_id(next_key_id);
+    void * const page = get_page(next_key_id / PAGE_SIZE);
     const uint64_t local_key_id = next_key_id % PAGE_SIZE;
     Unit * const unit =
         static_cast<Unit *>(page) - (local_key_id / UNIT_SIZE) - 1;
@@ -160,7 +155,7 @@ int64_t Pool<T>::add(KeyArg key) {
   } else {
     // Use an existing unit.
     const uint64_t unit_id = header_->latest_available_unit_id;
-    void * const page = get_page(unit_id * UNIT_SIZE);
+    void * const page = get_page(unit_id * UNIT_SIZE / PAGE_SIZE);
     Unit * const unit =
         static_cast<Unit *>(page) - (unit_id % (PAGE_SIZE / UNIT_SIZE)) - 1;
     const uint8_t validity_bit_id = bit_scan_forward(~unit->validity_bits);
@@ -212,13 +207,12 @@ void Pool<T>::open_pool(Storage *storage, uint32_t storage_node_id) {
 }
 
 template <typename T>
-void *Pool<T>::open_page(int64_t key_id) {
-  if (static_cast<uint64_t>(key_id) >= header_->size) {
-    GRNXX_ERROR() << "invalid argument: key_id = " << key_id
-                  << ", size = " << header_->size;
+void *Pool<T>::open_page(uint64_t page_id) {
+  if (page_id >= (header_->size / PAGE_SIZE)) {
+    GRNXX_ERROR() << "invalid argument: page_id = " << page_id
+                  << ", table_size = " << (header_->size / PAGE_SIZE);
     throw LogicError();
   }
-  const uint64_t page_id = key_id / PAGE_SIZE;
   if (!pages_[page_id]) {
     Lock lock(&header_->mutex);
     if (!pages_[page_id]) {
@@ -237,15 +231,16 @@ void *Pool<T>::open_page(int64_t key_id) {
 }
 
 template <typename T>
-void *Pool<T>::reserve_page(int64_t key_id) {
+void Pool<T>::reserve_key_id(int64_t key_id) {
   if (static_cast<uint64_t>(key_id) >= header_->size) {
     expand();
   }
   const uint64_t page_id = key_id / PAGE_SIZE;
   if (!pages_[page_id]) {
-    Lock lock(&header_->mutex);
-    if (!pages_[page_id]) {
-      void *page;
+    // Note that "pages_[0]" is not nullptr if there is a small-size page
+    // because it is opened in refresh_page().
+    if (table_[page_id] == STORAGE_INVALID_NODE_ID) {
+      Lock lock(&header_->mutex);
       if (table_[page_id] == STORAGE_INVALID_NODE_ID) {
         // Create a full-size page.
         // Note that a small-size page is created in expand_page().
@@ -254,17 +249,9 @@ void *Pool<T>::reserve_page(int64_t key_id) {
         StorageNode page_node =
             storage_->create_node(storage_node_id_, page_node_size);
         table_[page_id] = page_node.id();
-        page = page_node.body();
-      } else {
-        // Open an existing full-size page.
-        // Note that a small-size page is opened in refresh_page().
-        StorageNode page_node = storage_->open_node(table_[page_id]);
-        page = page_node.body();
       }
-      pages_[page_id] = static_cast<Unit *>(page) + (PAGE_SIZE / UNIT_SIZE);
     }
   }
-  return pages_[page_id];
 }
 
 template <typename T>
@@ -272,17 +259,17 @@ void Pool<T>::expand() {
   Lock lock(&header_->mutex);
   if (size_ < PAGE_SIZE) {
     // Create a small-size page or the first full-size page.
-    header_->size = expand_page();
+    expand_page();
     refresh_page();
   } else {
     // Create a table.
-    header_->size = expand_table();
+    expand_table();
     refresh_table();
   }
 }
 
 template <typename T>
-uint64_t Pool<T>::expand_page() {
+void Pool<T>::expand_page() {
   const uint64_t new_size = (size_ == 0) ? MIN_PAGE_SIZE : (size_ * 2);
   const uint64_t page_node_size =
       (sizeof(Unit) * (new_size / UNIT_SIZE)) + (sizeof(T) * new_size);
@@ -301,11 +288,11 @@ uint64_t Pool<T>::expand_page() {
     }
   }
   header_->page_storage_node_id = page_node.id();
-  return new_size;
+  header_->size = new_size;
 }
 
 template <typename T>
-uint64_t Pool<T>::expand_table() {
+void Pool<T>::expand_table() {
   const uint64_t old_table_size =
       (size_ <= PAGE_SIZE) ? 0 : (size_ / PAGE_SIZE);
   const uint64_t new_table_size =
@@ -327,7 +314,7 @@ uint64_t Pool<T>::expand_table() {
     new_table[i] = STORAGE_INVALID_NODE_ID;
   }
   header_->table_storage_node_id = table_node.id();
-  return new_size;
+  header_->size = new_size;
 }
 
 template <typename T>
@@ -410,5 +397,361 @@ template class Pool<uint64_t>;
 template class Pool<double>;
 template class Pool<GeoPoint>;
 
+namespace {
+
+constexpr uint32_t MAX_PAGE_ID     = std::numeric_limits<uint32_t>::max() - 1;
+constexpr uint32_t INVALID_PAGE_ID = MAX_PAGE_ID + 1;
+
+}  // namespace
+
+PoolHeader<Bytes>::PoolHeader()
+    : size(0),
+      next_offset(0),
+      page_storage_node_id(STORAGE_INVALID_NODE_ID),
+      index_pool_storage_node_id(STORAGE_INVALID_NODE_ID),
+      mutex() {}
+
+PoolTableEntry::PoolTableEntry()
+    : page_storage_node_id(STORAGE_INVALID_NODE_ID),
+      size_in_use(0) {}
+
+Pool<Bytes>::Pool()
+    : storage_(nullptr),
+      storage_node_id_(STORAGE_INVALID_NODE_ID),
+      header_(nullptr),
+      index_pool_(),
+      pages_(),
+      table_(nullptr),
+      size_(0),
+      queue_() {}
+
+Pool<Bytes>::~Pool() {}
+
+Pool<Bytes> *Pool<Bytes>::create(Storage *storage, uint32_t storage_node_id) {
+  if (!storage) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    throw LogicError();
+  }
+  std::unique_ptr<Pool> pool(new (std::nothrow) Pool);
+  if (!pool) {
+    GRNXX_ERROR() << "new grnxx::map::Pool failed";
+    throw MemoryError();
+  }
+  pool->create_pool(storage, storage_node_id);
+  return pool.release();
+}
+
+Pool<Bytes> *Pool<Bytes>::open(Storage *storage, uint32_t storage_node_id) {
+  if (!storage) {
+    GRNXX_ERROR() << "invalid argument: storage == nullptr";
+    throw LogicError();
+  }
+  std::unique_ptr<Pool> pool(new (std::nothrow) Pool);
+  if (!pool) {
+    GRNXX_ERROR() << "new grnxx::map::Pool failed";
+    throw MemoryError();
+  }
+  pool->open_pool(storage, storage_node_id);
+  return pool.release();
+}
+
+void Pool<Bytes>::unlink(Storage *storage, uint32_t storage_node_id) {
+  std::unique_ptr<Pool> pool(Pool::open(storage, storage_node_id));
+  storage->unlink_node(storage_node_id);
+}
+
+void Pool<Bytes>::unset(int64_t key_id) {
+  uint64_t bytes_id;
+  if (!index_pool_->get(key_id, &bytes_id)) {
+    GRNXX_ERROR() << "not found: key_id = " << key_id;
+    throw LogicError();
+  }
+  refresh_if_possible();
+  index_pool_->unset(key_id);
+  unset_bytes(bytes_id);
+}
+
+void Pool<Bytes>::reset(int64_t key_id, KeyArg dest_key) {
+  uint64_t src_bytes_id;
+  if (!index_pool_->get(key_id, &src_bytes_id)) {
+    GRNXX_ERROR() << "not found: key_id = " << key_id;
+    throw LogicError();
+  }
+  refresh_if_possible();
+  const uint64_t dest_bytes_id = add_bytes(dest_key);
+  index_pool_->reset(key_id, dest_bytes_id);
+  unset_bytes(src_bytes_id);
+}
+
+int64_t Pool<Bytes>::add(KeyArg key) {
+  refresh_if_possible();
+  const uint64_t bytes_id = add_bytes(key);
+  try {
+    const int64_t key_id = index_pool_->add(bytes_id);
+    return key_id;
+  } catch (...) {
+    unset_bytes(bytes_id);
+    throw;
+  }
+}
+
+void Pool<Bytes>::defrag() {
+  index_pool_->defrag();
+  // TODO
+}
+
+void Pool<Bytes>::create_pool(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  const uint64_t header_node_size = sizeof(Header) + sizeof(TableEntry);
+  StorageNode header_node =
+      storage->create_node(storage_node_id, header_node_size);
+  storage_node_id_ = header_node.id();
+  try {
+    header_ = static_cast<Header *>(header_node.body());
+    *header_ = Header();
+    index_pool_.reset(IndexPool::create(storage_, storage_node_id_));
+    header_->index_pool_storage_node_id = index_pool_->storage_node_id();
+    table_ = reinterpret_cast<TableEntry *>(header_ + 1);
+    *table_ = TableEntry();
+  } catch (...) {
+    storage->unlink_node(storage_node_id_);
+    throw;
+  }
+}
+
+void Pool<Bytes>::open_pool(Storage *storage, uint32_t storage_node_id) {
+  storage_ = storage;
+  StorageNode storage_node = storage->open_node(storage_node_id);
+  storage_node_id_ = storage_node.id();
+  header_ = static_cast<Header *>(storage_node.body());
+  index_pool_.reset(IndexPool::open(storage_,
+                                    header_->index_pool_storage_node_id));
+  table_ = reinterpret_cast<TableEntry *>(header_ + 1);
+}
+
+void Pool<Bytes>::unset_bytes(uint64_t bytes_id) {
+  if (bytes_id == EMPTY_BYTES_ID) {
+    // Nothing to do.
+    return;
+  }
+  const uint64_t bytes_offset = get_offset(bytes_id);
+  const uint32_t bytes_size = get_size(bytes_id);
+  if ((bytes_offset + bytes_size) > header_->next_offset) {
+    GRNXX_ERROR() << "invalid argument: bytes_offset = " << bytes_offset
+                  << ", bytes_size = " << bytes_size
+                  << ", next_offset = " << header_->next_offset;
+    throw LogicError();
+  }
+  const uint32_t page_id = static_cast<uint32_t>(bytes_offset / PAGE_SIZE);
+  TableEntry * const table_entry = &table_[page_id];
+  if (bytes_size > table_entry->size_in_use) {
+    GRNXX_ERROR() << "invalid argument: bytes_size = " << bytes_size
+                  << ", size_in_use = " << table_entry->size_in_use;
+    throw LogicError();
+  }
+  table_entry->size_in_use -= bytes_size;
+  if ((table_entry->size_in_use == 0) &&
+      (page_id != (header_->next_offset / PAGE_SIZE))) {
+    // Unlink a page if this operation makes it empty.
+    storage_->unlink_node(table_entry->page_storage_node_id);
+  }
+}
+
+uint64_t Pool<Bytes>::add_bytes(KeyArg bytes) {
+  const uint32_t bytes_size = static_cast<uint32_t>(bytes.size());
+  if (bytes_size > MAX_KEY_SIZE) {
+    GRNXX_ERROR() << "invalid argument: key_size = " << bytes_size
+                  << ", max_key_size = " << MAX_KEY_SIZE;
+    throw LogicError();
+  }
+  if (bytes_size == 0) {
+    return EMPTY_BYTES_ID;
+  }
+  const uint64_t bytes_offset = reserve_space(bytes_size);
+  const uint32_t page_id = static_cast<uint32_t>(bytes_offset / PAGE_SIZE);
+  uint8_t * const page = get_page(page_id);
+  std::memcpy(page + (bytes_offset % PAGE_SIZE), bytes.data(), bytes_size);
+  TableEntry * const table_entry = &table_[page_id];
+  table_entry->size_in_use += bytes_size;
+  return get_bytes_id(bytes_offset, bytes_size);
+}
+
+uint8_t *Pool<Bytes>::open_page(uint32_t page_id) {
+  if (page_id >= (header_->size / PAGE_SIZE)) {
+    GRNXX_ERROR() << "invalid argument: page_id = " << page_id
+                  << ", table_size = " << (header_->size / PAGE_SIZE);
+    throw LogicError();
+  }
+  if (!pages_[page_id]) {
+    Lock lock(&header_->mutex);
+    if (!pages_[page_id]) {
+      // Open an existing full-size page.
+      // Note that a small-size page is opened in refresh_page().
+      if (table_[page_id].page_storage_node_id == STORAGE_INVALID_NODE_ID) {
+        GRNXX_ERROR() << "not found: page_id = " << page_id;
+        throw LogicError();
+      }
+      StorageNode page_node =
+          storage_->open_node(table_[page_id].page_storage_node_id);
+      pages_[page_id] = static_cast<uint8_t *>(page_node.body());
+    }
+  }
+  return pages_[page_id];
+}
+
+uint64_t Pool<Bytes>::reserve_space(uint32_t size) {
+  uint64_t offset = header_->next_offset;
+  const uint32_t page_size = static_cast<uint32_t>(
+      (header_->size < PAGE_SIZE) ? header_->size : PAGE_SIZE);
+  const uint32_t page_size_left = static_cast<uint32_t>(
+      ((offset % PAGE_SIZE) == 0) ? 0 : (page_size - (offset % PAGE_SIZE)));
+  if (size <= page_size_left) {
+    header_->next_offset += size;
+    return offset;
+  }
+  if ((header_->next_offset + size) > header_->size) {
+    expand(size);
+  }
+  if (page_size == PAGE_SIZE) {
+    offset += page_size_left;
+  }
+  const uint32_t page_id = static_cast<uint32_t>(offset / PAGE_SIZE);
+  if (!pages_[page_id]) {
+    // Note that "pages_[0]" is not nullptr if there is a small-size page
+    // because it is opened in refresh_page().
+    if (table_[page_id].page_storage_node_id == STORAGE_INVALID_NODE_ID) {
+      Lock lock(&header_->mutex);
+      if (table_[page_id].page_storage_node_id == STORAGE_INVALID_NODE_ID) {
+        // Create a full-size page.
+        // Note that a small-size page is created in expand_page().
+        StorageNode page_node =
+            storage_->create_node(storage_node_id_, PAGE_SIZE);
+        table_[page_id].page_storage_node_id = page_node.id();
+      }
+    }
+  }
+  header_->next_offset = offset + size;
+  return offset;
+}
+
+void Pool<Bytes>::expand(uint32_t additional_size) {
+  Lock lock(&header_->mutex);
+  if (size_ < PAGE_SIZE) {
+    // Create a small-size page or the first full-size page.
+    expand_page(additional_size);
+    refresh_page();
+  } else {
+    // Create a table.
+    expand_table(additional_size);
+    refresh_table();
+  }
+}
+
+void Pool<Bytes>::expand_page(uint32_t additional_size) {
+  const uint64_t min_size = size_ + additional_size;
+  uint64_t new_size = (size_ == 0) ? MIN_PAGE_SIZE : (size_ * 2);
+  while (new_size < min_size) {
+    new_size *= 2;
+  }
+  StorageNode page_node = storage_->create_node(storage_node_id_, new_size);
+  if (size_ != 0) {
+    // Copy data from the current page and unlink it.
+    std::memcpy(page_node.body(), pages_[0], size_);
+    try {
+      storage_->unlink_node(header_->page_storage_node_id);
+    } catch (...) {
+      storage_->unlink_node(page_node.id());
+      throw;
+    }
+  }
+  table_[0].page_storage_node_id = page_node.id();
+  header_->page_storage_node_id = page_node.id();
+  header_->size = new_size;
+}
+
+void Pool<Bytes>::expand_table(uint32_t) {
+  const uint64_t old_table_size = size_ / PAGE_SIZE;
+  const uint64_t new_table_size = (old_table_size < MIN_TABLE_SIZE) ?
+       MIN_TABLE_SIZE : (old_table_size * 2);
+  const uint64_t new_size = new_table_size * PAGE_SIZE;
+  StorageNode table_node = storage_->create_node(
+      storage_node_id_, sizeof(TableEntry) * new_table_size);
+  TableEntry * const new_table = static_cast<TableEntry *>(table_node.body());
+  uint64_t i = 0;
+  for ( ; i < old_table_size; ++i) {
+    new_table[i] = table_[i];
+  }
+  for ( ; i < new_table_size; ++i) {
+    new_table[i] = TableEntry();
+  }
+  header_->table_storage_node_id = table_node.id();
+  header_->size = new_size;
+}
+
+void Pool<Bytes>::refresh() {
+  if (size_ != header_->size) {
+    Lock lock(&header_->mutex);
+    if (header_->size <= PAGE_SIZE) {
+      // Reopen a page because it is old.
+      refresh_page();
+    } else {
+      // Reopen a table because it is old.
+      refresh_table();
+    }
+    size_ = header_->size;
+  }
+}
+
+void Pool<Bytes>::refresh_page() {
+  StorageNode page_node =
+      storage_->open_node(header_->page_storage_node_id);
+  if (!pages_) {
+    std::unique_ptr<uint8_t *[]> new_pages(new (std::nothrow) uint8_t *[1]);
+    if (!new_pages) {
+      GRNXX_ERROR() << "new uint8_t *[] failed: size = " << 1;
+      throw MemoryError();
+    }
+    new_pages[0] = static_cast<uint8_t *>(page_node.body());
+    pages_.swap(new_pages);
+  } else {
+    pages_[0] = static_cast<uint8_t *>(page_node.body());
+  }
+}
+
+void Pool<Bytes>::refresh_table() {
+  StorageNode table_node =
+      storage_->open_node(header_->table_storage_node_id);
+  TableEntry * const new_table =
+      static_cast<TableEntry *>(table_node.body());
+  const uint64_t new_table_size = header_->size / PAGE_SIZE;
+  std::unique_ptr<uint8_t *[]> new_pages(
+      new (std::nothrow) uint8_t *[new_table_size]);
+  if (!new_pages) {
+    GRNXX_ERROR() << "new uint8_t *[] failed: size = " << new_table_size;
+    throw MemoryError();
+  }
+  // Initialize a new cache table.
+  const uint64_t old_table_size = size_ / PAGE_SIZE;
+  uint64_t i = 0;
+  for ( ; i < old_table_size; ++i) {
+    new_pages[i] = pages_[i];
+  }
+  for ( ; i < new_table_size; ++i) {
+    new_pages[i] = nullptr;
+  }
+  pages_.swap(new_pages);
+  // Keep an old cache table because another thread may read it.
+  if (new_pages) {
+    try {
+      // TODO: Time must be added.
+      queue_.push(std::move(new_pages));
+    } catch (const std::exception &exception) {
+      GRNXX_ERROR() << "std::queue::push() failed";
+      throw StandardError(exception);
+    }
+  }
+  table_ = new_table;
+}
+
 }  // namespace map
 }  // namespace grnxx

  Modified: lib/grnxx/map/pool.hpp (+165 -22)
===================================================================
--- lib/grnxx/map/pool.hpp    2013-08-15 19:23:42 +0900 (be80571)
+++ lib/grnxx/map/pool.hpp    2013-08-15 20:42:28 +0900 (92e6e77)
@@ -100,10 +100,8 @@ class Pool {
   }
 
   bool get(int64_t key_id, Key *key) {
-    if (size_ != header_->size) {
-      refresh();
-    }
-    const void * const page = get_page(key_id);
+    refresh_if_possible();
+    const void * const page = get_page(key_id / PAGE_SIZE);
     const uint64_t local_key_id = key_id % PAGE_SIZE;
     const Unit * const unit =
         static_cast<const Unit *>(page) - (local_key_id / UNIT_SIZE) - 1;
@@ -116,18 +114,14 @@ class Pool {
   }
 
   Key get_key(int64_t key_id) {
-    if (size_ != header_->size) {
-      refresh();
-    }
-    const void * const page = get_page(key_id);
+    refresh_if_possible();
+    const void * const page = get_page(key_id / PAGE_SIZE);
     return static_cast<const T *>(page)[key_id % PAGE_SIZE];
   }
 
   bool get_bit(int64_t key_id) {
-    if (size_ != header_->size) {
-      refresh();
-    }
-    const void * const page = get_page(key_id);
+    refresh_if_possible();
+    const void * const page = get_page(key_id / PAGE_SIZE);
     const uint64_t local_key_id = key_id % PAGE_SIZE;
     const Unit * const unit =
         static_cast<const Unit *>(page) - (local_key_id / UNIT_SIZE) - 1;
@@ -154,21 +148,170 @@ class Pool {
   void create_pool(Storage *storage, uint32_t storage_node_id);
   void open_pool(Storage *storage, uint32_t storage_node_id);
 
-  void *get_page(int64_t key_id) {
-    void * const page = pages_[key_id / PAGE_SIZE];
-    if (!page) {
-      return open_page(key_id);
-    }
-    return page;
+  void *get_page(uint64_t page_id) {
+    return pages_[page_id] ? pages_[page_id] : open_page(page_id);
   }
-  void *open_page(int64_t key_id);
+  void *open_page(uint64_t page_id);
 
-  void *reserve_page(int64_t key_id);
+  void reserve_key_id(int64_t key_id);
 
   void expand();
-  uint64_t expand_page();
-  uint64_t expand_table();
+  void expand_page();
+  void expand_table();
+
+  void refresh_if_possible() {
+    if (size_ != header_->size) {
+      refresh();
+    }
+  }
+  void refresh();
+  void refresh_page();
+  void refresh_table();
+};
+
+template <>
+struct PoolHeader<Bytes> {
+  uint64_t size;
+  uint64_t next_offset;
+  union {
+    uint32_t page_storage_node_id;
+    uint32_t table_storage_node_id;
+  };
+  uint32_t index_pool_storage_node_id;
+  Mutex mutex;
+
+  PoolHeader();
+};
+
+struct PoolTableEntry {
+  uint32_t page_storage_node_id;
+  uint32_t size_in_use;
+
+  PoolTableEntry();
+};
+
+template <>
+class Pool<Bytes> {
+  using Header     = PoolHeader<Bytes>;
+  using IndexPool  = Pool<uint64_t>;
+  using TableEntry = PoolTableEntry;
+
+  static constexpr int64_t  MIN_KEY_ID     = POOL_MIN_KEY_ID;
+  static constexpr int64_t  MAX_KEY_ID     = POOL_MAX_KEY_ID;
+  static constexpr uint64_t MAX_KEY_SIZE   = 4096;
+
+  static constexpr uint64_t PAGE_SIZE      = 1ULL << 20;
+
+  static constexpr uint64_t MIN_PAGE_SIZE  = 64;
+  static constexpr uint64_t MIN_TABLE_SIZE = 1ULL << 10;
+
+  static constexpr uint64_t BYTES_ID_SIZE_MASK = (1ULL << 13) - 1;
+
+  static constexpr uint64_t EMPTY_BYTES_ID = 0;
 
+ public:
+  using Key = typename Traits<Bytes>::Type;
+  using KeyArg = typename Traits<Bytes>::ArgumentType;
+
+  Pool();
+  ~Pool();
+
+  static Pool *create(Storage *storage, uint32_t storage_node_id);
+  static Pool *open(Storage *storage, uint32_t storage_node_id);
+
+  static void unlink(Storage *storage, uint32_t storage_node_id);
+
+  uint32_t storage_node_id() const {
+    return storage_node_id_;
+  }
+
+  static constexpr int64_t min_key_id() {
+    return IndexPool::min_key_id();
+  }
+  int64_t max_key_id() const {
+    return index_pool_->max_key_id();
+  }
+  uint64_t num_keys() const {
+    return index_pool_->num_keys();
+  }
+
+  bool get(int64_t key_id, Key *key) {
+    uint64_t bytes_id;
+    if (!index_pool_->get(key_id, &bytes_id)) {
+      // Not found.
+      return false;
+    }
+    *key = get_bytes(bytes_id);
+    return true;
+  }
+
+  Key get_key(int64_t key_id) {
+    return get_bytes(index_pool_->get_key(key_id));
+  }
+
+  bool get_bit(int64_t key_id) {
+    return index_pool_->get_bit(key_id);
+  }
+
+  void unset(int64_t key_id);
+  void reset(int64_t key_id, KeyArg dest_key);
+
+  int64_t add(KeyArg key);
+
+  void defrag();
+
+ private:
+  Storage *storage_;
+  uint32_t storage_node_id_;
+  Header *header_;
+  std::unique_ptr<IndexPool> index_pool_;
+  std::unique_ptr<uint8_t *[]> pages_;
+  TableEntry *table_;
+  uint64_t size_;
+  // TODO: Time must be added.
+  std::queue<std::unique_ptr<uint8_t *[]>> queue_;
+
+  void create_pool(Storage *storage, uint32_t storage_node_id);
+  void open_pool(Storage *storage, uint32_t storage_node_id);
+
+  Key get_bytes(uint64_t bytes_id) {
+    if (bytes_id == EMPTY_BYTES_ID) {
+      return Bytes("", 0);
+    }
+    refresh_if_possible();
+    const uint64_t offset = get_offset(bytes_id);
+    const uint32_t page_id = static_cast<uint32_t>(offset / PAGE_SIZE);
+    return Bytes(get_page(page_id) + (offset % PAGE_SIZE), get_size(bytes_id));
+  }
+  void unset_bytes(uint64_t bytes_id);
+  uint64_t add_bytes(KeyArg bytes);
+
+  uint8_t *get_page(uint32_t page_id) {
+    return pages_[page_id] ? pages_[page_id] : open_page(page_id);
+  }
+  uint8_t *open_page(uint32_t page_id);
+
+  uint64_t reserve_space(uint32_t size);
+
+  static uint64_t get_bytes_id(uint64_t offset, uint32_t size) {
+    return (offset * (BYTES_ID_SIZE_MASK + 1)) | size;
+  }
+  static uint64_t get_offset(uint64_t bytes_id) {
+    return bytes_id / (BYTES_ID_SIZE_MASK + 1);
+  }
+  static uint32_t get_size(uint64_t bytes_id) {
+    return static_cast<uint32_t>(bytes_id & BYTES_ID_SIZE_MASK);
+  }
+
+  void expand(uint32_t additional_size);
+  void expand_page(uint32_t additional_size);
+  void expand_table(uint32_t additional_size);
+
+  void refresh_if_possible() {
+    if (size_ != header_->size) {
+      refresh();
+    }
+  }
   void refresh();
   void refresh_page();
   void refresh_table();
-------------- next part --------------
HTML����������������������������...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index