[Groonga-commit] groonga/groonga at a6f4d52 [master] Support Apache Arrow

Back to archive index

Kouhei Sutou null+****@clear*****
Mon May 15 12:13:34 JST 2017


Kouhei Sutou	2017-05-15 12:13:34 +0900 (Mon, 15 May 2017)

  New Revision: a6f4d521a2e40267277b98ae9eaf646c12a96b34
  https://github.com/groonga/groonga/commit/a6f4d521a2e40267277b98ae9eaf646c12a96b34

  Message:
    Support Apache Arrow
    
    GitHub: fix #691
    
    Support dumping to and loading from Apache Arrow file format.f

  Added files:
    groonga-arrow.pc.in
    include/groonga.hpp
    include/groonga/arrow.h
    include/groonga/arrow.hpp
    lib/arrow.cpp
  Modified files:
    .gitignore
    Makefile.am
    configure.ac
    include/Makefile.am
    include/groonga.h
    include/groonga/Makefile.am
    lib/Makefile.am
    lib/sources.am

  Modified: .gitignore (+1 -0)
===================================================================
--- .gitignore    2017-05-14 22:20:13 +0900 (04ccca2)
+++ .gitignore    2017-05-15 12:13:34 +0900 (7a1817b)
@@ -34,6 +34,7 @@ cmake_install.cmake
 /missing
 /test-driver
 /groonga.pc
+/groonga-arrow.pc
 /groonga-httpd-conf.sh
 /data/groonga-httpd.conf
 /data/logrotate.d/centos/groonga-httpd

  Modified: Makefile.am (+3 -0)
===================================================================
--- Makefile.am    2017-05-14 22:20:13 +0900 (0721be8)
+++ Makefile.am    2017-05-15 12:13:34 +0900 (1fc7028)
@@ -34,6 +34,9 @@ EXTRA_DIST =					\
 
 pkgconfigdir = $(libdir)/pkgconfig
 pkgconfig_DATA = groonga.pc
+if GRN_WITH_ARROW
+pkgconfig_DATA += groonga-arrow.pc
+endif
 
 .PHONY: FORCE
 

  Modified: configure.ac (+25 -0)
===================================================================
--- configure.ac    2017-05-14 22:20:13 +0900 (4362386)
+++ configure.ac    2017-05-15 12:13:34 +0900 (5cafef0)
@@ -960,6 +960,30 @@ if test "x$with_jemalloc" != "xno"; then
                  [AC_MSG_ERROR("No libjemalloc found")])
 fi
 
+# Apache Arrow
+AC_ARG_ENABLE(arrow,
+  [AS_HELP_STRING([--disable-arrow],
+    [enable Apache Arrow support. [default=auto-detect]])],
+  [enable_arrow="$enableval"],
+  [enable_arrow="auto"])
+if test "x$enable_arrow" != "xno"; then
+  m4_ifdef([PKG_CHECK_MODULES], [
+    PKG_CHECK_MODULES([ARROW],
+                      [arrow],
+                      [arrow_available=yes],
+                      [arrow_available=no])
+  ],
+  [arrow_available=no])
+  if test "x$arrow_available" = "xyes"; then
+    AC_DEFINE(GRN_WITH_ARROW, [1], [Enable Apache Arrow support.])
+  else
+    if test "x$enable_arrow" = "xyes"; then
+      AC_MSG_ERROR("No Apache Arrow found")
+    fi
+  fi
+fi
+AM_CONDITIONAL([GRN_WITH_ARROW], [test "$arrow_available" = "yes"])
+
 # MeCab
 # NOTE: MUST be checked last
 
@@ -1704,6 +1728,7 @@ AC_OUTPUT([
   packages/apt/env.sh
   packages/yum/env.sh
   groonga.pc
+  groonga-arrow.pc
   config.sh
   groonga-httpd-conf.sh
   data/groonga-httpd.conf

  Added: groonga-arrow.pc.in (+4 -0) 100644
===================================================================
--- /dev/null
+++ groonga-arrow.pc.in    2017-05-15 12:13:34 +0900 (d0b22f6)
@@ -0,0 +1,4 @@
+Name: Groonga Arrow
+Description: Apache Arrow support for Groonga
+Version: @VERSION@
+Requires: groonga arrow

  Modified: include/Makefile.am (+3 -1)
===================================================================
--- include/Makefile.am    2017-05-14 22:20:13 +0900 (19ced0d)
+++ include/Makefile.am    2017-05-15 12:13:34 +0900 (c7dee71)
@@ -1,6 +1,8 @@
 SUBDIRS = groonga
 
-pkginclude_HEADERS = groonga.h
+pkginclude_HEADERS =				\
+	groonga.h				\
+	groonga.hpp
 
 EXTRA_DIST =					\
 	CMakeLists.txt

  Modified: include/groonga.h (+1 -0)
===================================================================
--- include/groonga.h    2017-05-14 22:20:13 +0900 (9df6247)
+++ include/groonga.h    2017-05-15 12:13:34 +0900 (6e1be29)
@@ -23,6 +23,7 @@
 
 #include "groonga/accessor.h"
 #include "groonga/array.h"
+#include "groonga/arrow.h"
 #include "groonga/cache.h"
 #include "groonga/column.h"
 #include "groonga/config.h"

  Added: include/groonga.hpp (+21 -0) 100644
===================================================================
--- /dev/null
+++ include/groonga.hpp    2017-05-15 12:13:34 +0900 (3d8313b)
@@ -0,0 +1,21 @@
+/*
+  Copyright(C) 2017 Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#pragma once
+
+#include "groonga.h"

  Modified: include/groonga/Makefile.am (+2 -0)
===================================================================
--- include/groonga/Makefile.am    2017-05-14 22:20:13 +0900 (28f8ba2)
+++ include/groonga/Makefile.am    2017-05-15 12:13:34 +0900 (7cc4d56)
@@ -2,6 +2,8 @@ groonga_includedir = $(pkgincludedir)/groonga
 groonga_include_HEADERS =			\
 	accessor.h				\
 	array.h					\
+	arrow.h					\
+	arrow.hpp				\
 	cache.h					\
 	column.h				\
 	command.h				\

  Added: include/groonga/arrow.h (+34 -0) 100644
===================================================================
--- /dev/null
+++ include/groonga/arrow.h    2017-05-15 12:13:34 +0900 (57f3abf)
@@ -0,0 +1,34 @@
+/*
+  Copyright(C) 2017 Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#pragma once
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+GRN_API grn_rc grn_arrow_load(grn_ctx *ctx,
+                              grn_obj *table,
+                              const char *path);
+GRN_API grn_rc grn_arrow_dump(grn_ctx *ctx,
+                              grn_obj *table,
+                              const char *path);
+
+#ifdef __cplusplus
+}
+#endif

  Added: include/groonga/arrow.hpp (+21 -0) 100644
===================================================================
--- /dev/null
+++ include/groonga/arrow.hpp    2017-05-15 12:13:34 +0900 (a35a4ab)
@@ -0,0 +1,21 @@
+/*
+  Copyright(C) 2017 Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License as published by the Free Software Foundation; either
+  version 2.1 of the License, or (at your option) any later version.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#pragma once
+
+#include <groonga.hpp>

  Modified: lib/Makefile.am (+8 -1)
===================================================================
--- lib/Makefile.am    2017-05-14 22:20:13 +0900 (4cf2ac9)
+++ lib/Makefile.am    2017-05-15 12:13:34 +0900 (7a7281e)
@@ -19,6 +19,12 @@ AM_CFLAGS =					\
 	$(LIBLZ4_CFLAGS)			\
 	$(LIBZSTD_CFLAGS)
 
+AM_CXXFLAGS =					\
+	$(NO_STRICT_ALIASING_CFLAGS)		\
+	$(COVERAGE_CFLAGS)			\
+	$(GRN_CXXFLAGS)				\
+	$(ARROW_CFLAGS)
+
 BUNDLED_LIBRARIES_CFLAGS =			\
 	$(MRUBY_CFLAGS)				\
 	$(ONIGMO_CFLAGS)
@@ -56,7 +62,8 @@ libgroonga_la_LIBADD +=				\
 	$(ONIGMO_LIBS)				\
 	$(LIBLZ4_LIBS)				\
 	$(LIBZSTD_LIBS)				\
-	$(ATOMIC_LIBS)
+	$(ATOMIC_LIBS)				\
+	$(ARROW_LIBS)
 
 if WITH_LEMON
 BUILT_SOURCES =					\

  Added: lib/arrow.cpp (+823 -0) 100644
===================================================================
--- /dev/null
+++ lib/arrow.cpp    2017-05-15 12:13:34 +0900 (ec3d35a)
@@ -0,0 +1,823 @@
+/* -*- c-basic-offset: 2 -*- */
+/*
+  Copyright(C) 2017 Brazil
+
+  This library is free software; you can redistribute it and/or
+  modify it under the terms of the GNU Lesser General Public
+  License version 2.1 as published by the Free Software Foundation.
+
+  This library is distributed in the hope that it will be useful,
+  but WITHOUT ANY WARRANTY; without even the implied warranty of
+  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+  Lesser General Public License for more details.
+
+  You should have received a copy of the GNU Lesser General Public
+  License along with this library; if not, write to the Free Software
+  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
+*/
+
+#include "grn.h"
+#include "grn_db.h"
+
+#ifdef GRN_WITH_ARROW
+#include <groonga/arrow.hpp>
+
+#include <arrow/api.h>
+#include <arrow/io/file.h>
+#include <arrow/ipc/api.h>
+
+#include <sstream>
+
+namespace grnarrow {
+  grn_rc status_to_rc(arrow::Status &status) {
+    switch (status.code()) {
+    case arrow::StatusCode::OK:
+      return GRN_SUCCESS;
+    case arrow::StatusCode::OutOfMemory:
+      return GRN_NO_MEMORY_AVAILABLE;
+    case arrow::StatusCode::KeyError:
+      return GRN_INVALID_ARGUMENT; // TODO
+    case arrow::StatusCode::TypeError:
+      return GRN_INVALID_ARGUMENT; // TODO
+    case arrow::StatusCode::Invalid:
+      return GRN_INVALID_ARGUMENT;
+    case arrow::StatusCode::IOError:
+      return GRN_INPUT_OUTPUT_ERROR;
+    case arrow::StatusCode::UnknownError:
+      return GRN_UNKNOWN_ERROR;
+    case arrow::StatusCode::NotImplemented:
+      return GRN_FUNCTION_NOT_IMPLEMENTED;
+    default:
+      return GRN_UNKNOWN_ERROR;
+    }
+  }
+
+  grn_bool check_status(grn_ctx *ctx,
+                        arrow::Status &status,
+                        const char *context) {
+    if (status.ok()) {
+      return GRN_TRUE;
+    } else {
+      auto rc = status_to_rc(status);
+      auto message = status.ToString();
+      ERR(rc, "%s: %s", context, message.c_str());
+      return GRN_FALSE;
+    }
+  }
+
+  grn_bool check_status(grn_ctx *ctx,
+                        arrow::Status &status,
+                        std::ostream &output) {
+    return check_status(ctx,
+                        status,
+                        static_cast<std::stringstream &>(output).str().c_str());
+  }
+
+  class ColumnLoadVisitor : public arrow::ArrayVisitor {
+  public:
+    ColumnLoadVisitor(grn_ctx *ctx,
+                      grn_obj *grn_table,
+                      std::shared_ptr<arrow::Column> &arrow_column,
+                      const grn_id *ids)
+      : ctx_(ctx),
+        grn_table_(grn_table),
+        ids_(ids),
+        time_unit_(arrow::TimeUnit::SECOND) {
+      auto column_name = arrow_column->name();
+      grn_column_ = grn_obj_column(ctx_, grn_table_,
+                                   column_name.data(),
+                                   column_name.size());
+
+      auto arrow_type = arrow_column->type();
+      grn_id type_id;
+      switch (arrow_type->id()) {
+      case arrow::Type::BOOL :
+        type_id = GRN_DB_BOOL;
+        break;
+      case arrow::Type::UINT8 :
+        type_id = GRN_DB_UINT8;
+        break;
+      case arrow::Type::INT8 :
+        type_id = GRN_DB_INT8;
+        break;
+      case arrow::Type::UINT16 :
+        type_id = GRN_DB_UINT16;
+        break;
+      case arrow::Type::INT16 :
+        type_id = GRN_DB_INT16;
+        break;
+      case arrow::Type::UINT32 :
+        type_id = GRN_DB_UINT32;
+        break;
+      case arrow::Type::INT32 :
+        type_id = GRN_DB_INT32;
+        break;
+      case arrow::Type::UINT64 :
+        type_id = GRN_DB_UINT64;
+        break;
+      case arrow::Type::INT64 :
+        type_id = GRN_DB_INT64;
+        break;
+      case arrow::Type::HALF_FLOAT :
+      case arrow::Type::FLOAT :
+      case arrow::Type::DOUBLE :
+        type_id = GRN_DB_FLOAT;
+        break;
+      case arrow::Type::STRING :
+        type_id = GRN_DB_TEXT;
+        break;
+      case arrow::Type::DATE64 :
+        type_id = GRN_DB_TIME;
+        break;
+      case arrow::Type::TIMESTAMP :
+        type_id = GRN_DB_TIME;
+        {
+          auto arrow_timestamp_type =
+            std::static_pointer_cast<arrow::TimestampType>(arrow_type);
+          time_unit_ = arrow_timestamp_type->unit();
+        }
+        break;
+      default :
+        type_id = GRN_DB_VOID;
+        break;
+      }
+
+      if (type_id == GRN_DB_VOID) {
+        // TODO
+        return;
+      }
+
+      if (!grn_column_) {
+        grn_column_ = grn_column_create(ctx_,
+                                        grn_table_,
+                                        column_name.data(),
+                                        column_name.size(),
+                                        NULL,
+                                        GRN_OBJ_COLUMN_SCALAR,
+                                        grn_ctx_at(ctx_, type_id));
+      }
+      if (type_id == GRN_DB_TEXT) {
+        GRN_TEXT_INIT(&buffer_, GRN_OBJ_DO_SHALLOW_COPY);
+      } else {
+        GRN_VALUE_FIX_SIZE_INIT(&buffer_, 0, type_id);
+      }
+    }
+
+    ~ColumnLoadVisitor() {
+      if (grn_obj_is_accessor(ctx_, grn_column_)) {
+        grn_obj_unlink(ctx_, grn_column_);
+      }
+      GRN_OBJ_FIN(ctx_, &buffer_);
+    }
+
+    arrow::Status Visit(const arrow::BooleanArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int8Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt8Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int16Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt16Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int32Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt32Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Int64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::UInt64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::HalfFloatArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::FloatArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::DoubleArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::StringArray &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::Date64Array &array) {
+      return set_values(array);
+    }
+
+    arrow::Status Visit(const arrow::TimestampArray &array) {
+      return set_values(array);
+    }
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    const grn_id *ids_;
+    arrow::TimeUnit::type time_unit_;
+    grn_obj *grn_column_;
+    grn_obj buffer_;
+
+    template <typename T>
+    arrow::Status set_values(const T &array) {
+      int64_t n_rows = array.length();
+      for (int i = 0; i < n_rows; ++i) {
+        auto id = ids_[i];
+        GRN_BULK_REWIND(&buffer_);
+        get_value(array, i);
+        grn_obj_set_value(ctx_, grn_column_, id, &buffer_, GRN_OBJ_SET);
+      }
+      return arrow::Status::OK();
+    }
+
+    void
+    get_value(const arrow::BooleanArray &array, int i) {
+      GRN_BOOL_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt8Array &array, int i) {
+      GRN_UINT8_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int8Array &array, int i) {
+      GRN_INT8_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt16Array &array, int i) {
+      GRN_UINT16_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int16Array &array, int i) {
+      GRN_INT16_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt32Array &array, int i) {
+      GRN_UINT32_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int32Array &array, int i) {
+      GRN_INT32_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::UInt64Array &array, int i) {
+      GRN_UINT64_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::Int64Array &array, int i) {
+      GRN_INT64_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::HalfFloatArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::FloatArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::DoubleArray &array, int i) {
+      GRN_FLOAT_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::StringArray &array, int i) {
+      int32_t size;
+      const auto data = array.GetValue(i, &size);
+      GRN_TEXT_SET(ctx_, &buffer_, data, size);
+    }
+
+    void
+    get_value(const arrow::Date64Array &array, int i) {
+      GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
+    }
+
+    void
+    get_value(const arrow::TimestampArray &array, int i) {
+      switch (time_unit_) {
+      case arrow::TimeUnit::SECOND :
+        GRN_TIME_SET(ctx_, &buffer_, GRN_TIME_PACK(array.Value(i), 0));
+        break;
+      case arrow::TimeUnit::MILLI :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) * 1000);
+        break;
+      case arrow::TimeUnit::MICRO :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i));
+        break;
+      case arrow::TimeUnit::NANO :
+        GRN_TIME_SET(ctx_, &buffer_, array.Value(i) / 1000);
+        break;
+      }
+    }
+  };
+
+  class FileLoader {
+  public:
+    FileLoader(grn_ctx *ctx, grn_obj *grn_table)
+      : ctx_(ctx),
+        grn_table_(grn_table),
+        key_column_name_("") {
+    }
+
+    ~FileLoader() {
+    }
+
+    grn_rc load_table(const std::shared_ptr<arrow::Table> &arrow_table) {
+      int n_columns = arrow_table->num_columns();
+
+      if (key_column_name_.empty()) {
+        grn_obj ids;
+        GRN_RECORD_INIT(&ids, GRN_OBJ_VECTOR, grn_obj_id(ctx_, grn_table_));
+        auto n_records = arrow_table->num_rows();
+        for (int64_t i = 0; i < n_records; ++i) {
+          auto id = grn_table_add(ctx_, grn_table_, NULL, 0, NULL);
+          GRN_RECORD_PUT(ctx_, &ids, id);
+        }
+        for (int i = 0; i < n_columns; ++i) {
+          int64_t offset = 0;
+          auto arrow_column = arrow_table->column(i);
+          auto arrow_chunked_data = arrow_column->data();
+          for (auto arrow_array : arrow_chunked_data->chunks()) {
+            grn_id *sub_ids =
+              reinterpret_cast<grn_id *>(GRN_BULK_HEAD(&ids)) + offset;
+            ColumnLoadVisitor visitor(ctx_,
+                                      grn_table_,
+                                      arrow_column,
+                                      sub_ids);
+            arrow_array->Accept(&visitor);
+            offset += arrow_array->length();
+          }
+        }
+        GRN_OBJ_FIN(ctx_, &ids);
+      } else {
+        auto status = arrow::Status::NotImplemented("_key isn't supported yet");
+        check_status(ctx_, status, "[arrow][load]");
+      }
+      return ctx_->rc;
+    };
+
+    grn_rc load_record_batch(const std::shared_ptr<arrow::RecordBatch> &arrow_record_batch) {
+      std::shared_ptr<arrow::Table> arrow_table;
+      std::vector<std::shared_ptr<arrow::RecordBatch>> arrow_record_batches(1);
+      arrow_record_batches[0] = arrow_record_batch;
+      auto status =
+        arrow::Table::FromRecordBatches(arrow_record_batches, &arrow_table);
+      if (!check_status(ctx_,
+                        status,
+                        "[arrow][load] "
+                        "failed to convert record batch to table")) {
+        return ctx_->rc;
+      }
+      return load_table(arrow_table);
+    };
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    std::string key_column_name_;
+  };
+
+  class FileDumper {
+  public:
+    FileDumper(grn_ctx *ctx, grn_obj *grn_table)
+      : ctx_(ctx),
+        grn_table_(grn_table) {
+      grn_columns_ = grn_hash_create(ctx_,
+                                     NULL,
+                                     sizeof(grn_id),
+                                     0,
+                                     GRN_OBJ_TABLE_HASH_KEY | GRN_HASH_TINY);
+      grn_table_columns(ctx_,
+                        grn_table_,
+                        "", 0,
+                        reinterpret_cast<grn_obj *>(grn_columns_));
+    }
+
+    ~FileDumper() {
+      grn_hash_close(ctx_, grn_columns_);
+    }
+
+    grn_rc dump(arrow::io::OutputStream *output) {
+      std::vector<std::shared_ptr<arrow::Field>> fields;
+      GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) {
+        void *key;
+        grn_hash_cursor_get_key(ctx_, cursor, &key);
+        auto column_id = static_cast<grn_id *>(key);
+        auto column = grn_ctx_at(ctx_, *column_id);
+
+        char column_name[GRN_TABLE_MAX_KEY_SIZE];
+        int column_name_size;
+        column_name_size =
+          grn_column_name(ctx_, column, column_name, GRN_TABLE_MAX_KEY_SIZE);
+        std::string field_name(column_name, column_name_size);
+        std::shared_ptr<arrow::DataType> field_type;
+        switch (grn_obj_get_range(ctx_, column)) {
+        case GRN_DB_BOOL :
+          field_type = arrow::boolean();
+          break;
+        case GRN_DB_UINT8 :
+          field_type = arrow::uint8();
+          break;
+        case GRN_DB_INT8 :
+          field_type = arrow::int8();
+          break;
+        case GRN_DB_UINT16 :
+          field_type = arrow::uint16();
+          break;
+        case GRN_DB_INT16 :
+          field_type = arrow::int16();
+          break;
+        case GRN_DB_UINT32 :
+          field_type = arrow::uint32();
+          break;
+        case GRN_DB_INT32 :
+          field_type = arrow::int32();
+          break;
+        case GRN_DB_UINT64 :
+          field_type = arrow::uint64();
+          break;
+        case GRN_DB_INT64 :
+          field_type = arrow::int64();
+          break;
+        case GRN_DB_FLOAT :
+          field_type = arrow::float64();
+          break;
+        case GRN_DB_TIME :
+          field_type =
+            std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+          break;
+        case GRN_DB_SHORT_TEXT :
+        case GRN_DB_TEXT :
+        case GRN_DB_LONG_TEXT :
+          field_type = arrow::utf8();
+          break;
+        default :
+          break;
+        }
+        if (!field_type) {
+          continue;
+        }
+
+        auto field = std::make_shared<arrow::Field>(field_name,
+                                                    field_type,
+                                                    false);
+        fields.push_back(field);
+      } GRN_HASH_EACH_END(ctx_, cursor);
+
+      auto schema = std::make_shared<arrow::Schema>(fields);
+
+      std::shared_ptr<arrow::ipc::RecordBatchFileWriter> writer;
+      auto status =
+        arrow::ipc::RecordBatchFileWriter::Open(output, schema, &writer);
+      if (!check_status(ctx_,
+                        status,
+                        "[arrow][dump] failed to create file format writer")) {
+        return ctx_->rc;
+      }
+
+      std::vector<grn_id> ids;
+      int n_records_per_batch = 1000;
+      GRN_TABLE_EACH_BEGIN(ctx_, grn_table_, table_cursor, record_id) {
+        ids.push_back(record_id);
+        if (ids.size() == n_records_per_batch) {
+          write_record_batch(ids, schema, writer);
+          ids.clear();
+        }
+      } GRN_TABLE_EACH_END(ctx_, table_cursor);
+      if (!ids.empty()) {
+        write_record_batch(ids, schema, writer);
+      }
+      writer->Close();
+
+      return ctx_->rc;
+    }
+
+  private:
+    grn_ctx *ctx_;
+    grn_obj *grn_table_;
+    grn_hash *grn_columns_;
+
+    void write_record_batch(std::vector<grn_id> &ids,
+                            std::shared_ptr<arrow::Schema> &schema,
+                            std::shared_ptr<arrow::ipc::RecordBatchFileWriter> &writer) {
+      std::vector<std::shared_ptr<arrow::Array>> columns;
+      GRN_HASH_EACH_BEGIN(ctx_, grn_columns_, cursor, id) {
+        void *key;
+        grn_hash_cursor_get_key(ctx_, cursor, &key);
+        auto grn_column_id = static_cast<grn_id *>(key);
+        auto grn_column = grn_ctx_at(ctx_, *grn_column_id);
+
+        arrow::Status status;
+        std::shared_ptr<arrow::Array> column;
+
+        switch (grn_obj_get_range(ctx_, grn_column)) {
+        case GRN_DB_BOOL :
+          status = build_boolean_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT8 :
+          status = build_uint8_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT8 :
+          status = build_int8_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT16 :
+          status = build_uint16_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT16 :
+          status = build_int16_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT32 :
+          status = build_uint32_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT32 :
+          status = build_int32_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_UINT64 :
+          status = build_uint64_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_INT64 :
+          status = build_int64_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_FLOAT :
+          status = build_double_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_TIME :
+          status = build_timestamp_array(ids, grn_column, &column);
+          break;
+        case GRN_DB_SHORT_TEXT :
+        case GRN_DB_TEXT :
+        case GRN_DB_LONG_TEXT :
+          status = build_utf8_array(ids, grn_column, &column);
+          break;
+        default :
+          status =
+            arrow::Status::NotImplemented("[arrow][dumper] not supported type: TODO");
+          break;
+        }
+        if (!status.ok()) {
+          continue;
+        }
+        columns.push_back(column);
+      } GRN_HASH_EACH_END(ctx_, cursor);
+
+      arrow::RecordBatch record_batch(schema, ids.size(), columns);
+      writer->WriteRecordBatch(record_batch);
+    }
+
+    arrow::Status build_boolean_array(std::vector<grn_id> &ids,
+                                      grn_obj *grn_column,
+                                      std::shared_ptr<arrow::Array> *array) {
+      arrow::BooleanBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const grn_bool *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint8_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt8Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint8_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int8_array(std::vector<grn_id> &ids,
+                                   grn_obj *grn_column,
+                                   std::shared_ptr<arrow::Array> *array) {
+      arrow::Int8Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int8_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint16_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt16Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint16_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int16_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int16Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int16_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_uint32_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt32Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint32_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int32_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int32Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int32_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+    arrow::Status build_uint64_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::UInt64Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const uint64_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_int64_array(std::vector<grn_id> &ids,
+                                    grn_obj *grn_column,
+                                    std::shared_ptr<arrow::Array> *array) {
+      arrow::Int64Builder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const int64_t *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_double_array(std::vector<grn_id> &ids,
+                                     grn_obj *grn_column,
+                                     std::shared_ptr<arrow::Array> *array) {
+      arrow::DoubleBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(*(reinterpret_cast<const double *>(data)));
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_timestamp_array(std::vector<grn_id> &ids,
+                                        grn_obj *grn_column,
+                                        std::shared_ptr<arrow::Array> *array) {
+      auto timestamp_ns_data_type =
+        std::make_shared<arrow::TimestampType>(arrow::TimeUnit::MICRO);
+      arrow::TimestampBuilder builder(arrow::default_memory_pool(),
+                                      timestamp_ns_data_type);
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        auto timestamp_ns = *(reinterpret_cast<const int64_t *>(data));
+        builder.Append(timestamp_ns);
+      }
+      return builder.Finish(array);
+    }
+
+    arrow::Status build_utf8_array(std::vector<grn_id> &ids,
+                                   grn_obj *grn_column,
+                                   std::shared_ptr<arrow::Array> *array) {
+      arrow::StringBuilder builder(arrow::default_memory_pool());
+      for (auto id : ids) {
+        uint32_t size;
+        auto data = grn_obj_get_value_(ctx_, grn_column, id, &size);
+        builder.Append(data, size);
+      }
+      return builder.Finish(array);
+    }
+  };
+}
+#endif /* GRN_WITH_ARROW */
+
+extern "C" {
+grn_rc
+grn_arrow_load(grn_ctx *ctx,
+               grn_obj *table,
+               const char *path)
+{
+  GRN_API_ENTER;
+#ifdef GRN_WITH_ARROW
+  std::shared_ptr<arrow::io::MemoryMappedFile> input;
+  auto status =
+    arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::READ, &input);
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              std::ostringstream() <<
+                              "[arrow][load] failed to open path: " <<
+                              "<" << path << ">")) {
+    GRN_API_RETURN(ctx->rc);
+  }
+  std::shared_ptr<arrow::ipc::RecordBatchFileReader> reader;
+  status = arrow::ipc::RecordBatchFileReader::Open(input, &reader);
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              "[arrow][load] "
+                              "failed to create file format reader")) {
+    GRN_API_RETURN(ctx->rc);
+  }
+
+  grnarrow::FileLoader loader(ctx, table);
+  int n_record_batches = reader->num_record_batches();
+  for (int i = 0; i < n_record_batches; ++i) {
+    std::shared_ptr<arrow::RecordBatch> record_batch;
+    status = reader->GetRecordBatch(i, &record_batch);
+    if (!grnarrow::check_status(ctx,
+                                status,
+                                std::ostringstream("") <<
+                                "[arrow][load] failed to get " <<
+                                "the " << i << "-th " << "record")) {
+      break;
+    }
+    loader.load_record_batch(record_batch);
+    if (ctx->rc != GRN_SUCCESS) {
+      break;
+    }
+  }
+#else /* GRN_WITH_ARROW */
+  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
+      "[arrow][load] Apache Arrow support isn't enabled");
+#endif /* GRN_WITH_ARROW */
+  GRN_API_RETURN(ctx->rc);
+}
+
+grn_rc
+grn_arrow_dump(grn_ctx *ctx,
+               grn_obj *table,
+               const char *path)
+{
+  GRN_API_ENTER;
+#ifdef GRN_WITH_ARROW
+  std::shared_ptr<arrow::io::FileOutputStream> output;
+  auto status = arrow::io::FileOutputStream::Open(path, &output);
+  if (!grnarrow::check_status(ctx,
+                              status,
+                              std::stringstream() <<
+                              "[arrow][dump] failed to open path: " <<
+                              "<" << path << ">")) {
+    GRN_API_RETURN(ctx->rc);
+  }
+
+  grnarrow::FileDumper dumper(ctx, table);
+  dumper.dump(output.get());
+#else /* GRN_WITH_ARROW */
+  ERR(GRN_FUNCTION_NOT_IMPLEMENTED,
+      "[arrow][dump] Apache Arrow support isn't enabled");
+#endif /* GRN_WITH_ARROW */
+  GRN_API_RETURN(ctx->rc);
+}
+}

  Modified: lib/sources.am (+1 -0)
===================================================================
--- lib/sources.am    2017-05-14 22:20:13 +0900 (02868a0)
+++ lib/sources.am    2017-05-15 12:13:34 +0900 (355929a)
@@ -1,6 +1,7 @@
 libgroonga_la_SOURCES =				\
 	alloc.c					\
 	grn_alloc.h				\
+	arrow.cpp				\
 	cache.c					\
 	grn_cache.h				\
 	column.c				\
-------------- next part --------------
HTML����������������������������...
Télécharger 



More information about the Groonga-commit mailing list
Back to archive index