susumu.yata
null+****@clear*****
Mon Jan 21 22:10:20 JST 2013
susumu.yata 2013-01-21 22:10:20 +0900 (Mon, 21 Jan 2013) New Revision: 203ad3b219bbc90c99173e935bd9ff02bc2243cb https://github.com/groonga/grnxx/commit/203ad3b219bbc90c99173e935bd9ff02bc2243cb Log: Implement insertion. (DoubleArray) Modified files: lib/alpha/double_array.cpp lib/alpha/double_array.hpp Modified: lib/alpha/double_array.cpp (+288 -13) =================================================================== --- lib/alpha/double_array.cpp 2013-01-20 15:18:02 +0900 (c68db51) +++ lib/alpha/double_array.cpp 2013-01-21 22:10:20 +0900 (00a37e4) @@ -132,14 +132,13 @@ bool DoubleArrayImpl::insert(const uint8_t *ptr, uint64_t length, uint64_t node_id = root_node_id(); uint64_t query_pos = 0; - // TODO -// search_leaf(ptr, length, node_id, query_pos); -// if (!insert_leaf(ptr, length, node_id, query_pos)) { -// if (key_pos) { -// *key_pos = nodes_[node_id].key_pos(); -// } -// return false; -// } + search_leaf(ptr, length, node_id, query_pos); + if (!insert_leaf(ptr, length, node_id, query_pos)) { + if (key_pos) { + *key_pos = nodes_[node_id].key_pos(); + } + return false; + } const uint64_t new_key_id = header_->next_key_id(); const uint64_t new_key_pos = append_key(ptr, length, new_key_id); @@ -237,13 +236,106 @@ bool DoubleArrayImpl::search_leaf(const uint8_t *ptr, uint64_t length, return true; } - // FIXME: Remove the magic numbers (TERMINAL_LABEL). - const uint64_t next = node.offset() ^ 0x100; - if (nodes_[next].label() != 0x100) { + if (node.child() != DOUBLE_ARRAY_TERMINAL_LABEL) { return false; } - node_id = next; - return nodes_[next].is_leaf(); + node_id = node.offset() ^ DOUBLE_ARRAY_TERMINAL_LABEL; + return true; +} + +bool DoubleArrayImpl::insert_leaf(const uint8_t *ptr, uint64_t length, + uint64_t &node_id, uint64_t query_pos) { + const DoubleArrayNode node = nodes_[node_id]; + if (node.is_terminal()) { + return true; + } else if (node.is_leaf()) { + const DoubleArrayKey &key = get_key(node.key_pos()); + uint64_t i = query_pos; + while ((i < length) && (i < node.key_length())) { + if (ptr[i] != key[i]) { + break; + } + ++i; + } + if ((i == length) && (i == node.key_length())) { + return false; + } + // TODO +// GRN_DAT_THROW_IF(SIZE_ERROR, num_keys() >= max_num_keys()); +// GRN_DAT_DEBUG_THROW_IF(next_key_id() > max_num_keys()); + + for (uint64_t j = query_pos; j < i; ++j) { + node_id = insert_node(node_id, ptr[j]); + } + node_id = separate(ptr, length, node_id, i); + return true; + } else { + // TODO +// GRN_DAT_THROW_IF(SIZE_ERROR, num_keys() >= max_num_keys()); + const uint16_t label = (query_pos < length) ? + static_cast<uint16_t>(ptr[query_pos]) : DOUBLE_ARRAY_TERMINAL_LABEL; + if ((node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) || + nodes_[node.offset() ^ label].is_phantom()) { + resolve(node_id, label); + } + node_id = insert_node(node_id, label); + return true; + } +} + +uint64_t DoubleArrayImpl::insert_node(uint64_t node_id, uint16_t label) { +// GRN_DAT_DEBUG_THROW_IF(node_id >= num_nodes()); +// GRN_DAT_DEBUG_THROW_IF(label > MAX_LABEL); + + const DoubleArrayNode node = nodes_[node_id]; + uint64_t offset; + if (node.is_leaf() || (node.offset() == DOUBLE_ARRAY_INVALID_OFFSET)) { + offset = find_offset(&label, 1); + } else { + offset = node.offset(); + } + + const uint64_t next = offset ^ label; + reserve_node(next); + + nodes_[next].set_label(label); + if (node.is_leaf()) { +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset()); + nodes_[offset].set_is_origin(true); + nodes_[next].set_key(node.key_pos(), node.key_length()); + } else if (node.offset() == DOUBLE_ARRAY_INVALID_OFFSET) { +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset].is_offset()); + nodes_[offset].set_is_origin(true); +// } else { +// GRN_DAT_DEBUG_THROW_IF(!nodes_[offset].is_origin()); + } + nodes_[node_id].set_offset(offset); + + const uint16_t child_label = nodes_[node_id].child(); +// GRN_DAT_DEBUG_THROW_IF(child_label == label); + if (child_label == DOUBLE_ARRAY_INVALID_LABEL) { + nodes_[node_id].set_child(label); + } else if ((label == DOUBLE_ARRAY_TERMINAL_LABEL) || + ((child_label != DOUBLE_ARRAY_TERMINAL_LABEL) && + (label < child_label))) { +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).is_phantom()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset ^ child_label).label() != child_label); + nodes_[next].set_sibling(child_label); + nodes_[node_id].set_child(label); + } else { + uint64_t prev = offset ^ child_label; +// GRN_DAT_DEBUG_THROW_IF(nodes_[prev).label() != child_label); + uint16_t sibling_label = nodes_[prev].sibling(); + while (label > sibling_label) { + prev = offset ^ sibling_label; +// GRN_DAT_DEBUG_THROW_IF(nodes_[prev].label() != sibling_label); + sibling_label = nodes_[prev].sibling(); + } +// GRN_DAT_DEBUG_THROW_IF(label == sibling_label); + nodes_[next].set_sibling(nodes_[prev].sibling()); + nodes_[prev].set_sibling(label); + } + return next; } uint64_t DoubleArrayImpl::append_key(const uint8_t *ptr, uint64_t length, @@ -267,6 +359,189 @@ uint64_t DoubleArrayImpl::append_key(const uint8_t *ptr, uint64_t length, return key_pos; } +uint64_t DoubleArrayImpl::separate(const uint8_t *ptr, uint64_t length, + uint64_t node_id, uint64_t i) { +// GRN_DAT_DEBUG_THROW_IF(node_id >= num_nodes()); +// GRN_DAT_DEBUG_THROW_IF(!nodes_[node_id].is_leaf()); +// GRN_DAT_DEBUG_THROW_IF(i > length); + + const DoubleArrayNode node = nodes_[node_id]; + const uint64_t key_pos = node.key_pos(); + const DoubleArrayKey &key = get_key(key_pos); + + uint16_t labels[2]; + labels[0] = (i < node.key_length()) ? + static_cast<uint16_t>(key[i]) : DOUBLE_ARRAY_TERMINAL_LABEL; + labels[1] = (i < length) ? + static_cast<uint16_t>(ptr[i]) : DOUBLE_ARRAY_TERMINAL_LABEL; +// GRN_DAT_DEBUG_THROW_IF(labels[0] == labels[1]); + + const uint64_t offset = find_offset(labels, 2); + + uint64_t next = offset ^ labels[0]; + reserve_node(next); +// GRN_DAT_DEBUG_THROW_IF(nodes_[offset).is_offset()); + + nodes_[next].set_label(labels[0]); + nodes_[next].set_key(key_pos, length); + + next = offset ^ labels[1]; + reserve_node(next); + + nodes_[next].set_label(labels[1]); + + nodes_[offset].set_is_origin(true); + nodes_[node_id].set_offset(offset); + + if ((labels[0] == DOUBLE_ARRAY_TERMINAL_LABEL) || + ((labels[1] != DOUBLE_ARRAY_TERMINAL_LABEL) && + (labels[0] < labels[1]))) { + nodes_[node_id].set_child(labels[0]); + nodes_[offset ^ labels[0]].set_sibling(labels[1]); + } else { + nodes_[node_id].set_child(labels[1]); + nodes_[offset ^ labels[1]].set_sibling(labels[0]); + } + return next; +} + +void DoubleArrayImpl::resolve(uint64_t node_id, uint16_t label) { +// GRN_DAT_DEBUG_THROW_IF(node_id >= num_nodes()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[node_id].is_leaf()); +// GRN_DAT_DEBUG_THROW_IF(label > MAX_LABEL); + + uint64_t offset = nodes_[node_id].offset(); + if (offset != DOUBLE_ARRAY_INVALID_OFFSET) { + uint16_t labels[DOUBLE_ARRAY_MAX_LABEL + 1]; + uint16_t num_labels = 0; + + uint16_t next_label = nodes_[node_id].child(); + // FIXME: How to check if the node has children or not. +// GRN_DAT_DEBUG_THROW_IF(next_label == INVALID_LABEL); + while (next_label != DOUBLE_ARRAY_INVALID_LABEL) { +// GRN_DAT_DEBUG_THROW_IF(next_label > MAX_LABEL); + labels[num_labels++] = next_label; + next_label = nodes_[offset ^ next_label].sibling(); + } +// GRN_DAT_DEBUG_THROW_IF(num_labels == 0); + + labels[num_labels] = label; + offset = find_offset(labels, num_labels + 1); + migrate_nodes(node_id, offset, labels, num_labels); + } else { + offset = find_offset(&label, 1); + if (offset >= header_->num_nodes()) { +// GRN_DAT_DEBUG_THROW_IF((offset / BLOCK_SIZE) != num_blocks()); + reserve_chunk(header_->num_chunks()); + } + nodes_[offset].set_is_origin(true); + nodes_[node_id].set_offset(offset); + } +} + +void DoubleArrayImpl::migrate_nodes(uint64_t node_id, uint64_t dest_offset, + const uint16_t *labels, + uint16_t num_labels) { +// GRN_DAT_DEBUG_THROW_IF(node_id >= num_nodes()); +// GRN_DAT_DEBUG_THROW_IF(nodes_[node_id].is_leaf()); +// GRN_DAT_DEBUG_THROW_IF(labels == NULL); +// GRN_DAT_DEBUG_THROW_IF(num_labels == 0); +// GRN_DAT_DEBUG_THROW_IF(num_labels > (DOUBLE_ARRAY_MAX_LABEL + 1)); + + const uint64_t src_offset = nodes_[node_id].offset(); +// GRN_DAT_DEBUG_THROW_IF(src_offset == DOUBLE_ARRAY_INVALID_OFFSET); +// GRN_DAT_DEBUG_THROW_IF(!nodes_[src_offset].is_origin()); + + for (uint16_t i = 0; i < num_labels; ++i) { + const uint64_t src_node_id = src_offset ^ labels[i]; + const uint64_t dest_node_id = dest_offset ^ labels[i]; +// GRN_DAT_DEBUG_THROW_IF(ith_node(src_node_id).is_phantom()); +// GRN_DAT_DEBUG_THROW_IF(ith_node(src_node_id).label() != labels[i]); + + reserve_node(dest_node_id); + DoubleArrayNode dest_node = nodes_[src_node_id]; + dest_node.set_is_origin(nodes_[dest_node_id].is_origin()); + nodes_[dest_node_id] = dest_node; + } + header_->set_num_zombies(header_->num_zombies() + num_labels); + +// GRN_DAT_DEBUG_THROW_IF(nodes_[dest_offset].is_origin()); + nodes_[dest_offset].set_is_origin(true); + nodes_[node_id].set_offset(dest_offset); +} + +uint64_t DoubleArrayImpl::find_offset(const uint16_t *labels, + uint16_t num_labels) { +// GRN_DAT_DEBUG_THROW_IF(labels == NULL); +// GRN_DAT_DEBUG_THROW_IF(num_labels == 0); +// GRN_DAT_DEBUG_THROW_IF(num_labels > (MAX_LABEL + 1)); + + // Chunks are tested in descending order of level. Basically, lower level + // chunks contain more phantom nodes. + uint32_t level = 1; + while (num_labels >= (1U << level)) { + ++level; + } + level = (level < DOUBLE_ARRAY_MAX_CHUNK_LEVEL) ? + (DOUBLE_ARRAY_MAX_CHUNK_LEVEL - level) : 0; + + uint64_t chunk_count = 0; + do { + uint64_t leader = header_->ith_leader(level); + if (leader == DOUBLE_ARRAY_INVALID_LEADER) { + // This level group is skipped because it is empty. + continue; + } + + uint64_t chunk_id = leader; + do { + const DoubleArrayChunk &chunk = chunks_[chunk_id]; +// GRN_DAT_DEBUG_THROW_IF(chunk.level() != level); + + const uint64_t first = + (chunk_id * DOUBLE_ARRAY_CHUNK_SIZE) | chunk.first_phantom(); + uint64_t node_id = first; + do { +// GRN_DAT_DEBUG_THROW_IF(!nodes_[node_id=]).is_phantom()); + const uint64_t offset = node_id ^ labels[0]; + if (!nodes_[offset].is_origin()) { + uint16_t i = 1; + for ( ; i < num_labels; ++i) { + if (!nodes_[offset ^ labels[i]].is_phantom()) { + break; + } + } + if (i >= num_labels) { + return offset; + } + } + node_id = (chunk_id * DOUBLE_ARRAY_CHUNK_SIZE) | nodes_[node_id].next(); + } while (node_id != first); + + const uint64_t prev = chunk_id; + const uint64_t next = chunk.next(); + chunk_id = next; + chunks_[prev].set_failure_count(chunks_[prev].failure_count() + 1); + + // The level of a chunk is updated when this function fails many times, + // actually DOUBLE_ARRAY_MAX_FAILURE_COUNT times, in that chunk. + if (chunks_[prev].failure_count() == DOUBLE_ARRAY_MAX_FAILURE_COUNT) { + update_chunk_level(prev, level + 1); + if (next == leader) { + break; + } else { + // Note that the leader might be updated in the level update. + leader = header_->ith_leader(level); + continue; + } + } + } while ((++chunk_count < DOUBLE_ARRAY_MAX_CHUNK_COUNT) && + (chunk_id != leader)); + } while ((chunk_count < DOUBLE_ARRAY_MAX_CHUNK_COUNT) && (level-- != 0)); + + return header_->num_nodes() ^ labels[0]; +} + void DoubleArrayImpl::reserve_node(uint64_t node_id) { if (node_id >= header_->num_nodes()) { reserve_chunk(node_id / DOUBLE_ARRAY_CHUNK_SIZE); Modified: lib/alpha/double_array.hpp (+43 -33) =================================================================== --- lib/alpha/double_array.hpp 2013-01-20 15:18:02 +0900 (aa7dc40) +++ lib/alpha/double_array.hpp 2013-01-21 22:10:20 +0900 (05d48d9) @@ -32,6 +32,10 @@ extern struct DoubleArrayOpen {} DOUBLE_ARRAY_OPEN; constexpr uint64_t DOUBLE_ARRAY_INVALID_ID = 0xFFFFFFFFFFULL; constexpr uint64_t DOUBLE_ARRAY_INVALID_OFFSET = 0; +constexpr uint16_t DOUBLE_ARRAY_TERMINAL_LABEL = 0x100; +constexpr uint16_t DOUBLE_ARRAY_MAX_LABEL = 0x100; +constexpr uint16_t DOUBLE_ARRAY_INVALID_LABEL = 0x1FF; + constexpr uint64_t DOUBLE_ARRAY_CHUNK_SIZE = 0x200; constexpr uint64_t DOUBLE_ARRAY_CHUNK_MASK = 0x1FF; @@ -270,17 +274,18 @@ class DoubleArrayNode { // This node is valid (false) or not (true). bool is_phantom() const { // 1 bit. - return qword_ & IS_PHANTOM_FLAG; + return (qword_ & IS_PHANTOM_FLAG) && (~qword_ & IS_LEAF_FLAG); } // This node is associated with a key (true) or not (false). bool is_leaf() const { // 1 bit. return qword_ & IS_LEAF_FLAG; } - // A child of this node is a leaf node (true) or not (false). + // This node is a leaf node with a valid label (false) or not (true). bool is_terminal() const { // 1 bit. - return qword_ & IS_TERMINAL_FLAG; + return (qword_ & (IS_PHANTOM_FLAG | IS_LEAF_FLAG)) == + (IS_PHANTOM_FLAG | IS_LEAF_FLAG); } void set_is_origin(bool value) { @@ -292,23 +297,23 @@ class DoubleArrayNode { } void set_is_phantom(bool value) { if (value) { - qword_ &= ~IS_PHANTOM_FLAG; + qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); } else { - qword_ |= IS_PHANTOM_FLAG; + qword_ = (qword_ | IS_PHANTOM_FLAG) & ~IS_LEAF_FLAG; } } void set_is_leaf(bool value) { if (value) { - qword_ &= ~IS_LEAF_FLAG; + qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); } else { - qword_ |= IS_LEAF_FLAG; + qword_ = (qword_ | IS_LEAF_FLAG) & ~IS_PHANTOM_FLAG; } } void set_is_terminal(bool value) { if (value) { - qword_ &= ~IS_TERMINAL_FLAG; + qword_ &= ~(IS_PHANTOM_FLAG | IS_LEAF_FLAG); } else { - qword_ |= IS_TERMINAL_FLAG; + qword_ |= IS_PHANTOM_FLAG | IS_LEAF_FLAG; } } @@ -359,15 +364,6 @@ class DoubleArrayNode { (KEY_LENGTH_MASK << KEY_LENGTH_SHIFT))) | (key_pos << KEY_POS_SHIFT) | (key_length << KEY_LENGTH_SHIFT); } - // FIXME: may not be required. -// void set_key_pos(uint64_t value) { -// qword_ = (qword_ & ~(KEY_POS_MASK << KEY_POS_SHIFT)) | -// (value << KEY_POS_SHIFT); -// } -// void set_key_length(uint64_t value) { -// qword_ = (qword_ & ~(KEY_LENGTH_MASK << KEY_LENGTH_SHIFT)) | -// (value << KEY_LENGTH_SHIFT); -// } // A non-phantom and non-leaf node stores the offset to its children, // the label of its next sibling, and the label of its first child. @@ -375,12 +371,12 @@ class DoubleArrayNode { // 36 bits. return (qword_ >> 8) & ((uint64_t(1) << 36) - 1); } - uint8_t child() const { - // 8 bits. + uint16_t child() const { + // 9 bits. return static_cast<uint8_t>(qword_ >> 44); } - uint8_t sibling() const { - // 8 bits. + uint16_t sibling() const { + // 9 bits. return static_cast<uint8_t>(qword_ >> 52); } @@ -400,10 +396,9 @@ class DoubleArrayNode { private: uint64_t qword_; - static constexpr uint64_t IS_ORIGIN_FLAG = uint64_t(1) << 63; - static constexpr uint64_t IS_PHANTOM_FLAG = uint64_t(1) << 62; - static constexpr uint64_t IS_LEAF_FLAG = uint64_t(1) << 61; - static constexpr uint64_t IS_TERMINAL_FLAG = uint64_t(1) << 60; + static constexpr uint64_t IS_ORIGIN_FLAG = uint64_t(1) << 63; + static constexpr uint64_t IS_PHANTOM_FLAG = uint64_t(1) << 62; + static constexpr uint64_t IS_LEAF_FLAG = uint64_t(1) << 61; static constexpr uint64_t NEXT_MASK = 0x1FF; static constexpr uint8_t NEXT_SHIFT = 0; @@ -412,16 +407,16 @@ class DoubleArrayNode { static constexpr uint64_t LABEL_MASK = 0xFF; - static constexpr uint64_t KEY_POS_MASK = (uint64_t(1) << 40) - 1; + static constexpr uint64_t KEY_POS_MASK = (uint64_t(1) << 41) - 1; static constexpr uint8_t KEY_POS_SHIFT = 8; static constexpr uint64_t KEY_LENGTH_MASK = (uint64_t(1) << 12) - 1; - static constexpr uint8_t KEY_LENGTH_SHIFT = 48; + static constexpr uint8_t KEY_LENGTH_SHIFT = 49; - static constexpr uint64_t OFFSET_MASK = (uint64_t(1) << 36) - 1; + static constexpr uint64_t OFFSET_MASK = (uint64_t(1) << 35) - 1; static constexpr uint8_t OFFSET_SHIFT = 8; - static constexpr uint64_t CHILD_MASK = (uint64_t(1) << 8) - 1; - static constexpr uint8_t CHILD_SHIFT = 44; - static constexpr uint64_t SIBLING_MASK = (uint64_t(1) << 8) - 1; + static constexpr uint64_t CHILD_MASK = (uint64_t(1) << 9) - 1; + static constexpr uint8_t CHILD_SHIFT = 43; + static constexpr uint64_t SIBLING_MASK = (uint64_t(1) << 9) - 1; static constexpr uint8_t SIBLING_SHIFT = 52; }; @@ -536,10 +531,13 @@ class DoubleArrayKey { DoubleArrayKey(uint64_t id, const void *address, uint64_t length); explicit operator bool() const { - // FIXME: Magic number. return id() != DOUBLE_ARRAY_INVALID_ID; } + const uint8_t &operator[](uint64_t i) const { + return buf_[i]; + } + uint64_t id() const { return id_low_ | (static_cast<uint64_t>(id_high_) << 32); } @@ -626,8 +624,20 @@ class DoubleArrayImpl { bool search_leaf(const uint8_t *ptr, uint64_t length, uint64_t &node_id, uint64_t &query_pos); + bool insert_leaf(const uint8_t *ptr, uint64_t length, + uint64_t &node_id, uint64_t query_pos); + + uint64_t insert_node(uint64_t node_id, uint16_t label); uint64_t append_key(const uint8_t *ptr, uint64_t length, uint64_t key_id); + uint64_t separate(const uint8_t *ptr, uint64_t length, + uint64_t node_id, uint64_t i); + void resolve(uint64_t node_id, uint16_t label); + void migrate_nodes(uint64_t node_id, uint64_t dest_offset, + const uint16_t *labels, uint16_t num_labels); + + uint64_t find_offset(const uint16_t *labels, uint16_t num_labels); + void reserve_node(uint64_t node_id); void reserve_chunk(uint64_t chunk_id); -------------- next part -------------- HTML����������������������������...Télécharger