Kouhei Sutou
null+****@clear*****
Mon May 15 12:13:34 JST 2017
Kouhei Sutou 2017-05-15 12:13:34 +0900 (Mon, 15 May 2017) New Revision: a6f4d521a2e40267277b98ae9eaf646c12a96b34 https://github.com/groonga/groonga/commit/a6f4d521a2e40267277b98ae9eaf646c12a96b34 Message: Support Apache Arrow GitHub: fix #691 Support dumping to and loading from Apache Arrow file format.f Added files: groonga-arrow.pc.in include/groonga.hpp include/groonga/arrow.h include/groonga/arrow.hpp lib/arrow.cpp Modified files: .gitignore Makefile.am configure.ac include/Makefile.am include/groonga.h include/groonga/Makefile.am lib/Makefile.am lib/sources.am Modified: .gitignore (+1 -0) =================================================================== --- .gitignore 2017-05-14 22:20:13 +0900 (04ccca2) +++ .gitignore 2017-05-15 12:13:34 +0900 (7a1817b) @@ -34,6 +34,7 @@ cmake_install.cmake /missing /test-driver /groonga.pc +/groonga-arrow.pc /groonga-httpd-conf.sh /data/groonga-httpd.conf /data/logrotate.d/centos/groonga-httpd Modified: Makefile.am (+3 -0) =================================================================== --- Makefile.am 2017-05-14 22:20:13 +0900 (0721be8) +++ Makefile.am 2017-05-15 12:13:34 +0900 (1fc7028) @@ -34,6 +34,9 @@ EXTRA_DIST = \ pkgconfigdir = $(libdir)/pkgconfig pkgconfig_DATA = groonga.pc +if GRN_WITH_ARROW +pkgconfig_DATA += groonga-arrow.pc +endif .PHONY: FORCE Modified: configure.ac (+25 -0) =================================================================== --- configure.ac 2017-05-14 22:20:13 +0900 (4362386) +++ configure.ac 2017-05-15 12:13:34 +0900 (5cafef0) @@ -960,6 +960,30 @@ if test "x$with_jemalloc" != "xno"; then [AC_MSG_ERROR("No libjemalloc found")]) fi +# Apache Arrow +AC_ARG_ENABLE(arrow, + [AS_HELP_STRING([--disable-arrow], + [enable Apache Arrow support. [default=auto-detect]])], + [enable_arrow="$enableval"], + [enable_arrow="auto"]) +if test "x$enable_arrow" != "xno"; then + m4_ifdef([PKG_CHECK_MODULES], [ + PKG_CHECK_MODULES([ARROW], + [arrow], + [arrow_available=yes], + [arrow_available=no]) + ], + [arrow_available=no]) + if test "x$arrow_available" = "xyes"; then + AC_DEFINE(GRN_WITH_ARROW, [1], [Enable Apache Arrow support.]) + else + if test "x$enable_arrow" = "xyes"; then + AC_MSG_ERROR("No Apache Arrow found") + fi + fi +fi +AM_CONDITIONAL([GRN_WITH_ARROW], [test "$arrow_available" = "yes"]) + # MeCab # NOTE: MUST be checked last @@ -1704,6 +1728,7 @@ AC_OUTPUT([ packages/apt/env.sh packages/yum/env.sh groonga.pc + groonga-arrow.pc config.sh groonga-httpd-conf.sh data/groonga-httpd.conf Added: groonga-arrow.pc.in (+4 -0) 100644 =================================================================== --- /dev/null +++ groonga-arrow.pc.in 2017-05-15 12:13:34 +0900 (d0b22f6) @@ -0,0 +1,4 @@ +Name: Groonga Arrow +Description: Apache Arrow support for Groonga +Version: @VERSION@ +Requires: groonga arrow Modified: include/Makefile.am (+3 -1) =================================================================== --- include/Makefile.am 2017-05-14 22:20:13 +0900 (19ced0d) +++ include/Makefile.am 2017-05-15 12:13:34 +0900 (c7dee71) @@ -1,6 +1,8 @@ SUBDIRS = groonga -pkginclude_HEADERS = groonga.h +pkginclude_HEADERS = \ + groonga.h \ + groonga.hpp EXTRA_DIST = \ CMakeLists.txt Modified: include/groonga.h (+1 -0) =================================================================== --- include/groonga.h 2017-05-14 22:20:13 +0900 (9df6247) +++ include/groonga.h 2017-05-15 12:13:34 +0900 (6e1be29) @@ -23,6 +23,7 @@ #include "groonga/accessor.h" #include "groonga/array.h" +#include "groonga/arrow.h" #include "groonga/cache.h" #include "groonga/column.h" #include "groonga/config.h" Added: include/groonga.hpp (+21 -0) 100644 =================================================================== --- /dev/null +++ include/groonga.hpp 2017-05-15 12:13:34 +0900 (3d8313b) @@ -0,0 +1,21 @@ +/* + Copyright(C) 2017 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#pragma once + +#include "groonga.h" Modified: include/groonga/Makefile.am (+2 -0) =================================================================== --- include/groonga/Makefile.am 2017-05-14 22:20:13 +0900 (28f8ba2) +++ include/groonga/Makefile.am 2017-05-15 12:13:34 +0900 (7cc4d56) @@ -2,6 +2,8 @@ groonga_includedir = $(pkgincludedir)/groonga groonga_include_HEADERS = \ accessor.h \ array.h \ + arrow.h \ + arrow.hpp \ cache.h \ column.h \ command.h \ Added: include/groonga/arrow.h (+34 -0) 100644 =================================================================== --- /dev/null +++ include/groonga/arrow.h 2017-05-15 12:13:34 +0900 (57f3abf) @@ -0,0 +1,34 @@ +/* + Copyright(C) 2017 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#pragma once + +#ifdef __cplusplus +extern "C" { +#endif + +GRN_API grn_rc grn_arrow_load(grn_ctx *ctx, + grn_obj *table, + const char *path); +GRN_API grn_rc grn_arrow_dump(grn_ctx *ctx, + grn_obj *table, + const char *path); + +#ifdef __cplusplus +} +#endif Added: include/groonga/arrow.hpp (+21 -0) 100644 =================================================================== --- /dev/null +++ include/groonga/arrow.hpp 2017-05-15 12:13:34 +0900 (a35a4ab) @@ -0,0 +1,21 @@ +/* + Copyright(C) 2017 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#pragma once + +#include <groonga.hpp> Modified: lib/Makefile.am (+8 -1) =================================================================== --- lib/Makefile.am 2017-05-14 22:20:13 +0900 (4cf2ac9) +++ lib/Makefile.am 2017-05-15 12:13:34 +0900 (7a7281e) @@ -19,6 +19,12 @@ AM_CFLAGS = \ $(LIBLZ4_CFLAGS) \ $(LIBZSTD_CFLAGS) +AM_CXXFLAGS = \ + $(NO_STRICT_ALIASING_CFLAGS) \ + $(COVERAGE_CFLAGS) \ + $(GRN_CXXFLAGS) \ + $(ARROW_CFLAGS) + BUNDLED_LIBRARIES_CFLAGS = \ $(MRUBY_CFLAGS) \ $(ONIGMO_CFLAGS) @@ -56,7 +62,8 @@ libgroonga_la_LIBADD += \ $(ONIGMO_LIBS) \ $(LIBLZ4_LIBS) \ $(LIBZSTD_LIBS) \ - $(ATOMIC_LIBS) + $(ATOMIC_LIBS) \ + $(ARROW_LIBS) if WITH_LEMON BUILT_SOURCES = \ Added: lib/arrow.cpp (+823 -0) 100644 =================================================================== --- /dev/null +++ lib/arrow.cpp 2017-05-15 12:13:34 +0900 (ec3d35a) @@ -0,0 +1,823 @@ +/* -*- c-basic-offset: 2 -*- */ +/* + Copyright(C) 2017 Brazil + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License version 2.1 as published by the Free Software Foundation. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, write to the Free Software + Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +*/ + +#include "grn.h" +#include "grn_db.h" + +#ifdef GRN_WITH_ARROW +#include <groonga/arrow.hpp> + +#include <arrow/api.h> +#include <arrow/io/file.h> +#include <arrow/ipc/api.h> + +#include <sstream> + +namespace grnarrow { + grn_rc status_to_rc(arrow::Status &status) { + switch (status.code()) { + case arrow::StatusCode::OK: + return GRN_SUCCESS; + case arrow::StatusCode::OutOfMemory: + return GRN_NO_MEMORY_AVAILABLE; + case arrow::StatusCode::KeyError: + return GRN_INVALID_ARGUMENT; // TODO + case arrow::StatusCode::TypeError: + return GRN_INVALID_ARGUMENT; // TODO + case arrow::StatusCode::Invalid: + return GRN_INVALID_ARGUMENT; + case arrow::StatusCode::IOError: + return GRN_INPUT_OUTPUT_ERROR; + case arrow::StatusCode::UnknownError: + return GRN_UNKNOWN_ERROR; + case arrow::StatusCode::NotImplemented: + return GRN_FUNCTION_NOT_IMPLEMENTED; + default: + return GRN_UNKNOWN_ERROR; + } + } + + grn_bool check_status(grn_ctx *ctx, + arrow::Status &status, + const char *context) { + if (status.ok()) { + return GRN_TRUE; + } else { + auto rc = status_to_rc(status); + auto message = status.ToString(); + ERR(rc, "%s: %s", context, message.c_str()); + return GRN_FALSE; + } + } + + grn_bool check_status(grn_ctx *ctx, + arrow::Status &status, + std::ostream &output) { + return check_status(ctx, + status, + static_cast<std::stringstream &>(output).str().c_str()); + } + + class ColumnLoadVisitor : public arrow::ArrayVisitor { + public: + ColumnLoadVisitor(grn_ctx *ctx, + grn_obj *grn_table, + std::shared_ptr<arrow::Column> &arrow_column, + const grn_id *ids) + : ctx_(ctx), + grn_table_(grn_table), + ids_(ids), + time_unit_(arrow::TimeUnit::SECOND) { + auto column_name = arrow_column->name(); + grn_column_ = grn_obj_column(ctx_, grn_table_, + column_name.data(), + column_name.size()); + + auto arrow_type = arrow_column->type(); + grn_id type_id; + switch (arrow_type->id()) { + case arrow::Type::BOOL : + type_id = GRN_DB_BOOL; + break; + case arrow::Type::UINT8 : + type_id = GRN_DB_UINT8; + break; + case arrow::Type::INT8 : + type_id = GRN_DB_INT8; + break; + case arrow::Type::UINT16 : + type_id = GRN_DB_UINT16; + break; + case arrow::Type::INT16 : + type_id = GRN_DB_INT16; + break; + case arrow::Type::UINT32 : + type_id = GRN_DB_UINT32; + break; + case arrow::Type::INT32 : + type_id = GRN_DB_INT32; + break; + case arrow::Type::UINT64 : + type_id = GRN_DB_UINT64; + break; + case arrow::Type::INT64 : + type_id = GRN_DB_INT64; + break; + case arrow::Type::HALF_FLOAT : + case arrow::Type::FLOAT : + case arrow::Type::DOUBLE : + type_id = GRN_DB_FLOAT; + break; + case arrow::Type::STRING : + type_id = GRN_DB_TEXT; + break; + case arrow::Type::DATE64 : + type_id = GRN_DB_TIME; + break; + case arrow::Type::TIMESTAMP : + type_id = GRN_DB_TIME; + { + auto arrow_timestamp_type = + std::static_pointer_cast<arrow::TimestampType>(arrow_type); + time_unit_ = arrow_timestamp_type->unit(); + } + break; + default : + type_id = GRN_DB_VOID; + break; + } + + if (type_id == GRN_DB_VOID) { + // TODO + return; + } + + if (!grn_column_) { + grn_column_ = grn_column_create(ctx_, + grn_table_, + column_name.data(), + column_name.size(), + NULL, + GRN_OBJ_COLUMN_SCALAR, + grn_ctx_at(ctx_, type_id)); + } + if (type_id == GRN_DB_TEXT) { + GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY); + } else { + GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id); + } + } + + ~ColumnLoadVisitor() { + if (grn_obj_is_accessor(ctx_, grn_column_)) { + grn_obj_unlink(ctx_, grn_column_); + } + GRN_OBJ_FIN(ctx_, &buffer_); + } + + arrow::Status Visit(const arrow::BooleanArray &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::Int8Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::UInt8Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::Int16Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::UInt16Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::Int32Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::UInt32Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::Int64Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::UInt64Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::HalfFloatArray &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::FloatArray &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::DoubleArray &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::StringArray &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::Date64Array &array) { + return set_values(array); + } + + arrow::Status Visit(const arrow::TimestampArray &array) { + return set_values(array); + } + + private: + grn_ctx *ctx_; + grn_obj *grn_table_; + const grn_id *ids_; + arrow::TimeUnit::type time_unit_; + grn_obj *grn_column_; + grn_obj buffer_; + + template <typename T> + arrow::Status set_values(const T &array) { + int64_t n_rows = array.length(); + for (int i = 0; i < n_rows; ++i) { + auto id = ids_[i]; + GRN_BULK_REWIND(&buffer_); + get_value(array, i); + grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET); + } + return arrow::Status::OK(); + } + + void + get_value(const arrow::BooleanArray &array, int i) { + GRN_BOOL_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::UInt8Array &array, int i) { + GRN_UINT8_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::Int8Array &array, int i) { + GRN_INT8_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::UInt16Array &array, int i) { + GRN_UINT16_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::Int16Array &array, int i) { + GRN_INT16_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::UInt32Array &array, int i) { + GRN_UINT32_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::Int32Array &array, int i) { + GRN_INT32_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::UInt64Array &array, int i) { + GRN_UINT64_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::Int64Array &array, int i) { + GRN_INT64_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::HalfFloatArray &array, int i) { + GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::FloatArray &array, int i) { + GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::DoubleArray &array, int i) { + GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::StringArray &array, int i) { + int32_t size; + const auto data = array.GetValue(i, &size); + GRN_TEXT_SET(ctx_, &buffer_, data, size); + } + + void + get_value(const arrow::Date64Array &array, int i) { + GRN_TIME_SET(ctx_, &buffer_, array.Value(i)); + } + + void + get_value(const arrow::TimestampArray &array, int i) { + switch (time_unit_) { + case arrow::TimeUnit::SECOND : + GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0)); + break; + case arrow::TimeUnit::MILLI : + GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000); + break; + case arrow::TimeUnit::MICRO : + GRN_TIME_SET(ctx_, &buffer_, array.Value(i)); + break; + case arrow::TimeUnit::NANO : + GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000); + break; + } + } + }; + + class FileLoader { + public: + FileLoader(grn_ctx *ctx, grn_obj *grn_table) + : ctx_(ctx), + grn_table_(grn_table), + key_column_name_("") { + } + + ~FileLoader() { + } + + grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) { + int n_columns = arrow_table->num_columns(); + + if (key_column_name_.empty()) { + grn_obj ids; + GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_)); + auto n_records = arrow_table->num_rows(); + for (int64_t i = 0; i < n_records; ++i) { + auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL); + GRN_RECORD_PUT(ctx_, &ids, id); + } + for (int i = 0; i < n_columns; ++i) { + int64_t offset = 0; + auto arrow_column = arrow_table->column(i); + auto arrow_chunked_data = arrow_column->data(); + for (auto arrow_array : arrow_chunked_data->chunks()) { + grn_id *sub_ids = + reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset; + ColumnLoadVisitor visitor(ctx_, + grn_table_, + arrow_column, + sub_ids); + arrow_array->Accept(&visitor); + offset += arrow_array->length(); + } + } + GRN_OBJ_FIN(ctx_, &ids); + } else { + auto status = arrow::Status::NotImplemented("_key isn't supported yet"); + check_status(ctx_, status, "[arrow][load]"); + } + return ctx_->rc; + }; + + grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) { + std::shared_ptr<arrow::Table> arrow_table; + std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1); + arrow_record_batches[0] = arrow_record_batch; + auto status = + arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table); + if (!check_status(ctx_, + status, + "[arrow][load] " + "failed to convert record batch to table")) { + return ctx_->rc; + } + return load_table(arrow_table); + }; + + private: + grn_ctx *ctx_; + grn_obj *grn_table_; + std::string key_column_name_; + }; + + class FileDumper { + public: + FileDumper(grn_ctx *ctx, grn_obj *grn_table) + : ctx_(ctx), + grn_table_(grn_table) { + grn_columns_ = grn_hash_create(ctx_, + NULL, + sizeof(grn_id), + 0, + GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY); + grn_table_columns(ctx_, + grn_table_, + "", 0, + reinterpret_cast<grn_obj *>(grn_columns_)); + } + + ~FileDumper() { + grn_hash_close(ctx_, grn_columns_); + } + + grn_rc dump(arrow::io::OutputStream *output) { + std::vector<std::shared_ptr<arrow::Field>> fields; + GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) { + void *key; + grn_hash_cursor_get_key(ctx_, cursor, &key); + auto column_id = static_cast<grn_id *>(key); + auto column = grn_ctx_at(ctx_, *column_id); + + char column_name[GRN_TABLE_MAX_KEY_SIZE]; + int column_name_size; + column_name_size = + grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE); + std::string field_name(column_name, column_name_size); + std::shared_ptr<arrow::DataType> field_type; + switch (grn_obj_get_range(ctx_, column)) { + case GRN_DB_BOOL : + field_type = arrow::boolean(); + break; + case GRN_DB_UINT8 : + field_type = arrow::uint8(); + break; + case GRN_DB_INT8 : + field_type = arrow::int8(); + break; + case GRN_DB_UINT16 : + field_type = arrow::uint16(); + break; + case GRN_DB_INT16 : + field_type = arrow::int16(); + break; + case GRN_DB_UINT32 : + field_type = arrow::uint32(); + break; + case GRN_DB_INT32 : + field_type = arrow::int32(); + break; + case GRN_DB_UINT64 : + field_type = arrow::uint64(); + break; + case GRN_DB_INT64 : + field_type = arrow::int64(); + break; + case GRN_DB_FLOAT : + field_type = arrow::float64(); + break; + case GRN_DB_TIME : + field_type = + std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); + break; + case GRN_DB_SHORT_TEXT : + case GRN_DB_TEXT : + case GRN_DB_LONG_TEXT : + field_type = arrow::utf8(); + break; + default : + break; + } + if (!field_type) { + continue; + } + + auto field = std::make_shared<arrow::Field>(field_name, + field_type, + false); + fields.push_back(field); + } GRN_HASH_EACH_END(ctx_, cursor); + + auto schema = std::make_shared<arrow::Schema>(fields); + + std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer; + auto status = + arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer); + if (!check_status(ctx_, + status, + "[arrow][dump] failed to create file format writer")) { + return ctx_->rc; + } + + std::vector<grn_id> ids; + int n_records_per_batch = 1000; + GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) { + ids.push_back(record_id); + if (ids.size() == n_records_per_batch) { + write_record_batch(ids, schema, writer); + ids.clear(); + } + } GRN_TABLE_EACH_END(ctx_, table_cursor); + if (!ids.empty()) { + write_record_batch(ids, schema, writer); + } + writer->Close(); + + return ctx_->rc; + } + + private: + grn_ctx *ctx_; + grn_obj *grn_table_; + grn_hash *grn_columns_; + + void write_record_batch(std::vector<grn_id> &ids, + std::shared_ptr<arrow::Schema> &schema, + std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) { + std::vector<std::shared_ptr<arrow::Array>> columns; + GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) { + void *key; + grn_hash_cursor_get_key(ctx_, cursor, &key); + auto grn_column_id = static_cast<grn_id *>(key); + auto grn_column = grn_ctx_at(ctx_, *grn_column_id); + + arrow::Status status; + std::shared_ptr<arrow::Array> column; + + switch (grn_obj_get_range(ctx_, grn_column)) { + case GRN_DB_BOOL : + status = build_boolean_array(ids, grn_column, &column); + break; + case GRN_DB_UINT8 : + status = build_uint8_array(ids, grn_column, &column); + break; + case GRN_DB_INT8 : + status = build_int8_array(ids, grn_column, &column); + break; + case GRN_DB_UINT16 : + status = build_uint16_array(ids, grn_column, &column); + break; + case GRN_DB_INT16 : + status = build_int16_array(ids, grn_column, &column); + break; + case GRN_DB_UINT32 : + status = build_uint32_array(ids, grn_column, &column); + break; + case GRN_DB_INT32 : + status = build_int32_array(ids, grn_column, &column); + break; + case GRN_DB_UINT64 : + status = build_uint64_array(ids, grn_column, &column); + break; + case GRN_DB_INT64 : + status = build_int64_array(ids, grn_column, &column); + break; + case GRN_DB_FLOAT : + status = build_double_array(ids, grn_column, &column); + break; + case GRN_DB_TIME : + status = build_timestamp_array(ids, grn_column, &column); + break; + case GRN_DB_SHORT_TEXT : + case GRN_DB_TEXT : + case GRN_DB_LONG_TEXT : + status = build_utf8_array(ids, grn_column, &column); + break; + default : + status = + arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO"); + break; + } + if (!status.ok()) { + continue; + } + columns.push_back(column); + } GRN_HASH_EACH_END(ctx_, cursor); + + arrow::RecordBatch record_batch(schema, ids.size(), columns); + writer->WriteRecordBatch(record_batch); + } + + arrow::Status build_boolean_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::BooleanBuilder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const grn_bool *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_uint8_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::UInt8Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const uint8_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_int8_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::Int8Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const int8_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_uint16_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::UInt16Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const uint16_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_int16_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::Int16Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const int16_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_uint32_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::UInt32Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const uint32_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_int32_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::Int32Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const int32_t *>(data))); + } + return builder.Finish(array); + } + arrow::Status build_uint64_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::UInt64Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const uint64_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_int64_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::Int64Builder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const int64_t *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_double_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::DoubleBuilder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(*(reinterpret_cast<const double *>(data))); + } + return builder.Finish(array); + } + + arrow::Status build_timestamp_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + auto timestamp_ns_data_type = + std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO); + arrow::TimestampBuilder builder(arrow::default_memory_pool(), + timestamp_ns_data_type); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data)); + builder.Append(timestamp_ns); + } + return builder.Finish(array); + } + + arrow::Status build_utf8_array(std::vector<grn_id> &ids, + grn_obj *grn_column, + std::shared_ptr<arrow::Array> *array) { + arrow::StringBuilder builder(arrow::default_memory_pool()); + for (auto id : ids) { + uint32_t size; + auto data = grn_obj_get_value_(ctx_, grn_column, id, &size); + builder.Append(data, size); + } + return builder.Finish(array); + } + }; +} +#endif /* GRN_WITH_ARROW */ + +extern "C" { +grn_rc +grn_arrow_load(grn_ctx *ctx, + grn_obj *table, + const char *path) +{ + GRN_API_ENTER; +#ifdef GRN_WITH_ARROW + std::shared_ptr<arrow::io::MemoryMappedFile> input; + auto status = + arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input); + if (!grnarrow::check_status(ctx, + status, + std::ostringstream() << + "[arrow][load] failed to open path: " << + "<" << path << ">")) { + GRN_API_RETURN(ctx->rc); + } + std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader; + status = arrow::ipc::RecordBatchFileReader::Open(input, &reader); + if (!grnarrow::check_status(ctx, + status, + "[arrow][load] " + "failed to create file format reader")) { + GRN_API_RETURN(ctx->rc); + } + + grnarrow::FileLoader loader(ctx, table); + int n_record_batches = reader->num_record_batches(); + for (int i = 0; i < n_record_batches; ++i) { + std::shared_ptr<arrow::RecordBatch> record_batch; + status = reader->GetRecordBatch(i, &record_batch); + if (!grnarrow::check_status(ctx, + status, + std::ostringstream("") << + "[arrow][load] failed to get " << + "the " << i << "-th " << "record")) { + break; + } + loader.load_record_batch(record_batch); + if (ctx->rc != GRN_SUCCESS) { + break; + } + } +#else /* GRN_WITH_ARROW */ + ERR(GRN_FUNCTION_NOT_IMPLEMENTED, + "[arrow][load] Apache Arrow support isn't enabled"); +#endif /* GRN_WITH_ARROW */ + GRN_API_RETURN(ctx->rc); +} + +grn_rc +grn_arrow_dump(grn_ctx *ctx, + grn_obj *table, + const char *path) +{ + GRN_API_ENTER; +#ifdef GRN_WITH_ARROW + std::shared_ptr<arrow::io::FileOutputStream> output; + auto status = arrow::io::FileOutputStream::Open(path, &output); + if (!grnarrow::check_status(ctx, + status, + std::stringstream() << + "[arrow][dump] failed to open path: " << + "<" << path << ">")) { + GRN_API_RETURN(ctx->rc); + } + + grnarrow::FileDumper dumper(ctx, table); + dumper.dump(output.get()); +#else /* GRN_WITH_ARROW */ + ERR(GRN_FUNCTION_NOT_IMPLEMENTED, + "[arrow][dump] Apache Arrow support isn't enabled"); +#endif /* GRN_WITH_ARROW */ + GRN_API_RETURN(ctx->rc); +} +} Modified: lib/sources.am (+1 -0) =================================================================== --- lib/sources.am 2017-05-14 22:20:13 +0900 (02868a0) +++ lib/sources.am 2017-05-15 12:13:34 +0900 (355929a) @@ -1,6 +1,7 @@ libgroonga_la_SOURCES = \ alloc.c \ grn_alloc.h \ + arrow.cpp \ cache.c \ grn_cache.h \ column.c \ -------------- next part -------------- HTML����������������������������...Télécharger