feat(reflect-cpp): Switched from glaze -> reflect cpp
A bug was discovered in glaze which prevented valid TOML output. We have switched to toml++ and reflect-cpp. The interface has remained the same, so this should not break any code.
This commit is contained in:
@@ -0,0 +1,133 @@
|
||||
#ifndef RFL_PARSING_TABULAR_ARROWREADER_HPP_
|
||||
#define RFL_PARSING_TABULAR_ARROWREADER_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <array>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "../../Processors.hpp"
|
||||
#include "../../Result.hpp"
|
||||
#include "../../Tuple.hpp"
|
||||
#include "../../apply.hpp"
|
||||
#include "../../get.hpp"
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "../../to_view.hpp"
|
||||
#include "../../view_t.hpp"
|
||||
#include "../call_destructors_where_necessary.hpp"
|
||||
#include "make_chunked_array_iterators.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <class VecType, SerializationType _s, class... Ps>
|
||||
class ArrowReader {
|
||||
static_assert(!Processors<Ps...>::add_tags_to_variants_,
|
||||
"rfl::AddTagsToVariants cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::add_namespaced_tags_to_variants_,
|
||||
"rfl::AddNamespacedTagsToVariants cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::all_required_,
|
||||
"rfl::NoOptionals cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::default_if_missing_,
|
||||
"rfl::DefaultIfMissing cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::no_extra_fields_,
|
||||
"rfl::NoExtraFields cannot be used for tabular data (neither "
|
||||
"can rfl::ExtraFields).");
|
||||
static_assert(!Processors<Ps...>::no_field_names_,
|
||||
"rfl::NoFieldNames cannot be used for tabular data.");
|
||||
|
||||
public:
|
||||
using ValueType = typename std::remove_cvref_t<typename VecType::value_type>;
|
||||
|
||||
static Result<ArrowReader> make(const std::shared_ptr<arrow::Table>& _table) {
|
||||
try {
|
||||
return ArrowReader(_table);
|
||||
} catch (const std::exception& e) {
|
||||
return error(std::string("Could not create ArrowReader: ") + e.what());
|
||||
}
|
||||
}
|
||||
|
||||
~ArrowReader() = default;
|
||||
|
||||
Result<VecType> read() const noexcept {
|
||||
return make_chunked_array_iterators<named_tuple_t<ValueType, Ps...>, _s>(
|
||||
table_)
|
||||
.and_then([&](auto chunked_array_iterators) -> Result<VecType> {
|
||||
VecType result;
|
||||
while (!end(chunked_array_iterators)) {
|
||||
auto value = new_value(&chunked_array_iterators);
|
||||
if (!value) {
|
||||
return error(value.error().what());
|
||||
}
|
||||
result.emplace_back(std::move(*value));
|
||||
}
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
private:
|
||||
ArrowReader(const std::shared_ptr<arrow::Table>& _table)
|
||||
: table_(Ref<arrow::Table>::make(_table).value()) {}
|
||||
|
||||
bool end(const auto& _chunked_array_iterators) const {
|
||||
return apply(
|
||||
[](const auto&... _its) { return (false || ... || _its.end()); },
|
||||
_chunked_array_iterators);
|
||||
}
|
||||
|
||||
Result<ValueType> new_value(auto* _chunked_array_iterators) const noexcept {
|
||||
alignas(ValueType) unsigned char buf[sizeof(ValueType)]{};
|
||||
auto ptr = internal::ptr_cast<ValueType*>(&buf);
|
||||
|
||||
auto view = to_view(*ptr);
|
||||
|
||||
using ViewType = std::remove_cvref_t<decltype(view)>;
|
||||
|
||||
try {
|
||||
const auto set_one = [&]<size_t _i>(std::integral_constant<size_t, _i>) {
|
||||
using FieldType = tuple_element_t<_i, typename ViewType::Fields>;
|
||||
using T = std::remove_cvref_t<
|
||||
std::remove_pointer_t<typename FieldType::Type>>;
|
||||
auto res = *_chunked_array_iterators->template get<_i>();
|
||||
if (!res) {
|
||||
destroy_value<_i>(&view);
|
||||
throw std::runtime_error(
|
||||
std::string("Field '") + typename FieldType::Name().str() +
|
||||
std::string("' could not be set: ") + res.error().what());
|
||||
}
|
||||
::new (view.template get<_i>()) T(std::move(*res));
|
||||
++_chunked_array_iterators->template get<_i>();
|
||||
};
|
||||
|
||||
[&]<size_t... _is>(std::integer_sequence<size_t, _is...>) {
|
||||
(set_one(std::integral_constant<size_t, _is>{}), ...);
|
||||
}(std::make_integer_sequence<size_t, view.size()>());
|
||||
} catch (const std::exception& e) {
|
||||
return error(e.what());
|
||||
}
|
||||
|
||||
return std::move(*ptr);
|
||||
}
|
||||
|
||||
template <size_t _i, class ViewType>
|
||||
void destroy_value(ViewType* _view) const {
|
||||
static_assert(_i < ViewType::size(), "_i out of bounds.");
|
||||
auto set = std::array<bool, ViewType::size()>();
|
||||
for (size_t i = 0; i < _i; ++i) {
|
||||
set[i] = true;
|
||||
}
|
||||
for (size_t i = _i; i < ViewType::size(); ++i) {
|
||||
set[i] = false;
|
||||
}
|
||||
call_destructors_where_necessary(set, _view);
|
||||
}
|
||||
|
||||
private:
|
||||
Ref<arrow::Table> table_;
|
||||
};
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,956 @@
|
||||
#ifndef RFL_PARSING_TABULAR_ARROWTYPES_HPP_
|
||||
#define RFL_PARSING_TABULAR_ARROWTYPES_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
|
||||
#include "../../Box.hpp"
|
||||
#include "../../NamedTuple.hpp"
|
||||
#include "../../Ref.hpp"
|
||||
#include "../../Rename.hpp"
|
||||
#include "../../Timestamp.hpp"
|
||||
#include "../../Tuple.hpp"
|
||||
#include "../../concepts.hpp"
|
||||
#include "../../enums.hpp"
|
||||
#include "../../internal/StringLiteral.hpp"
|
||||
#include "../../internal/has_reflection_type_v.hpp"
|
||||
#include "../../internal/ptr_cast.hpp"
|
||||
#include "../../named_tuple_t.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
// Serialization format that the ArrowTypes specializations are parameterized on.
enum class SerializationType { csv, parquet };
|
||||
|
||||
// Primary template, declared only; it is specialized in this header per field type T.
template <class T, SerializationType _s>
|
||||
struct ArrowTypes;
|
||||
|
||||
// Declaration only; the definition is not part of this section. The integer
// and floating-point ArrowTypes specializations forward get_array() to it.
// NOTE(review): presumably converts _arr into the ArrayType expected for T -
// confirm against the definition.
template <class T, SerializationType _s>
|
||||
Result<Ref<typename ArrowTypes<T, _s>::ArrayType>> transform_numerical_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) noexcept;
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<bool, _s> {
|
||||
using ArrayType = arrow::BooleanArray;
|
||||
using BuilderType = arrow::BooleanBuilder;
|
||||
|
||||
static auto data_type() { return arrow::boolean(); }
|
||||
|
||||
static void add_to_builder(const bool _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
if (_arr->type()->Equals(data_type())) {
|
||||
return Ref<ArrayType>::make(std::static_pointer_cast<ArrayType>(_arr));
|
||||
} else {
|
||||
return error("Expected boolean array, got " + _arr->type()->ToString() +
|
||||
".");
|
||||
}
|
||||
}
|
||||
|
||||
static Result<bool> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<uint8_t, _s> {
|
||||
using ArrayType = arrow::UInt8Array;
|
||||
using BuilderType = arrow::UInt8Builder;
|
||||
using T = uint8_t;
|
||||
|
||||
static auto data_type() { return arrow::uint8(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<uint8_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<uint16_t, _s> {
|
||||
using ArrayType = arrow::UInt16Array;
|
||||
using BuilderType = arrow::UInt16Builder;
|
||||
using T = uint16_t;
|
||||
|
||||
static auto data_type() { return arrow::uint16(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<uint16_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<uint32_t, _s> {
|
||||
using ArrayType = arrow::UInt32Array;
|
||||
using BuilderType = arrow::UInt32Builder;
|
||||
using T = uint32_t;
|
||||
|
||||
static auto data_type() { return arrow::uint32(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<uint32_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<uint64_t, _s> {
|
||||
using ArrayType = arrow::UInt64Array;
|
||||
using BuilderType = arrow::UInt64Builder;
|
||||
using T = uint64_t;
|
||||
|
||||
static auto data_type() { return arrow::uint64(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<uint64_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<int8_t, _s> {
|
||||
using ArrayType = arrow::Int8Array;
|
||||
using BuilderType = arrow::Int8Builder;
|
||||
using T = int8_t;
|
||||
|
||||
static auto data_type() { return arrow::int8(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<int8_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<int16_t, _s> {
|
||||
using ArrayType = arrow::Int16Array;
|
||||
using BuilderType = arrow::Int16Builder;
|
||||
using T = int16_t;
|
||||
|
||||
static auto data_type() { return arrow::int16(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<int16_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<int32_t, _s> {
|
||||
using ArrayType = arrow::Int32Array;
|
||||
using BuilderType = arrow::Int32Builder;
|
||||
using T = int32_t;
|
||||
|
||||
static auto data_type() { return arrow::int32(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<int32_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<int64_t, _s> {
|
||||
using ArrayType = arrow::Int64Array;
|
||||
using BuilderType = arrow::Int64Builder;
|
||||
using T = int64_t;
|
||||
|
||||
static auto data_type() { return arrow::int64(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<int64_t> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<float, _s> {
|
||||
using ArrayType = arrow::FloatArray;
|
||||
using BuilderType = arrow::FloatBuilder;
|
||||
using T = float;
|
||||
|
||||
static auto data_type() { return arrow::float32(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<float> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<double, _s> {
|
||||
using ArrayType = arrow::DoubleArray;
|
||||
using BuilderType = arrow::DoubleBuilder;
|
||||
using T = double;
|
||||
|
||||
static auto data_type() { return arrow::float64(); }
|
||||
|
||||
static void add_to_builder(const auto _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return transform_numerical_array<T, _s>(_arr);
|
||||
}
|
||||
|
||||
static Result<double> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return _chunk->Value(_ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <SerializationType _s>
|
||||
struct ArrowTypes<std::string, _s> {
|
||||
using ArrayType = arrow::StringArray;
|
||||
using BuilderType = arrow::StringBuilder;
|
||||
|
||||
static auto data_type() { return arrow::utf8(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
if (_arr->type()->Equals(data_type())) {
|
||||
return Ref<ArrayType>::make(std::static_pointer_cast<ArrayType>(_arr));
|
||||
} else {
|
||||
return error("Expected string array, got " + _arr->type()->ToString() +
|
||||
".");
|
||||
}
|
||||
}
|
||||
|
||||
static Result<std::string> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return std::string(_chunk->Value(_ix));
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
requires enchantum::Enum<T>
|
||||
struct ArrowTypes<T, _s> {
|
||||
using ArrayType = arrow::StringArray;
|
||||
using BuilderType = arrow::StringBuilder;
|
||||
|
||||
static auto data_type() { return arrow::utf8(); }
|
||||
|
||||
static void add_to_builder(const T _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(enum_to_string(_val));
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<std::string, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static Result<T> get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return string_to_enum<T>(std::string(_chunk->Value(_ix)));
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
requires concepts::ContiguousByteContainer<T>
|
||||
struct ArrowTypes<T, _s> {
|
||||
using ArrayType = arrow::BinaryArray;
|
||||
using BuilderType = arrow::BinaryBuilder;
|
||||
|
||||
static auto data_type() { return arrow::binary(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(
|
||||
internal::ptr_cast<const uint8_t*>(_val.data()), _val.size());
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
if (_arr->type()->Equals(data_type())) {
|
||||
return Ref<ArrayType>::make(std::static_pointer_cast<ArrayType>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::utf8())) {
|
||||
return transform_string(
|
||||
std::static_pointer_cast<arrow::StringArray>(_arr));
|
||||
|
||||
} else {
|
||||
return error("Expected binary or string array, got " +
|
||||
_arr->type()->ToString() + ".");
|
||||
}
|
||||
}
|
||||
|
||||
static Result<T> get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
const auto begin = internal::ptr_cast<const typename T::value_type*>(
|
||||
_chunk->Value(_ix).data());
|
||||
return T(begin, begin + _chunk->Value(_ix).size());
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
|
||||
static Result<Ref<arrow::BinaryArray>> transform_string(
|
||||
const std::shared_ptr<arrow::StringArray>& _arr) noexcept {
|
||||
if (!_arr) {
|
||||
return error(
|
||||
"transform_string: std::shared_ptr not set. This is a "
|
||||
"bug, please report.");
|
||||
}
|
||||
|
||||
auto builder = arrow::BinaryBuilder();
|
||||
|
||||
for (int64_t i = 0; i < _arr->length(); ++i) {
|
||||
if (_arr->IsNull(i)) {
|
||||
const auto status = builder.AppendNull();
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else {
|
||||
const std::string_view s = _arr->Value(i);
|
||||
const auto status = builder.Append(
|
||||
internal::ptr_cast<const uint8_t*>(s.data()), s.size());
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::Array> res;
|
||||
const auto status = builder.Finish(&res);
|
||||
return Ref<arrow::BinaryArray>::make(
|
||||
std::static_pointer_cast<arrow::BinaryArray>(res));
|
||||
}
|
||||
};
|
||||
|
||||
template <internal::StringLiteral _format, SerializationType _s>
|
||||
struct ArrowTypes<Timestamp<_format>, _s> {
|
||||
enum class TimeUnit { day, second, milli, micro, nano, string };
|
||||
|
||||
using ArrayType = arrow::TimestampArray;
|
||||
using BuilderType = arrow::TimestampBuilder;
|
||||
|
||||
static auto data_type() { return arrow::timestamp(arrow::TimeUnit::MILLI); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val.to_time_t() * 1000);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
if (_arr->type()->Equals(data_type())) {
|
||||
return Ref<ArrayType>::make(std::static_pointer_cast<ArrayType>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(
|
||||
arrow::timestamp(arrow::TimeUnit::SECOND))) {
|
||||
return transform_time_stamp<TimeUnit::second>(
|
||||
std::static_pointer_cast<arrow::TimestampArray>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::timestamp(arrow::TimeUnit::MICRO))) {
|
||||
return transform_time_stamp<TimeUnit::micro>(
|
||||
std::static_pointer_cast<arrow::TimestampArray>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::timestamp(arrow::TimeUnit::NANO))) {
|
||||
return transform_time_stamp<TimeUnit::nano>(
|
||||
std::static_pointer_cast<arrow::TimestampArray>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::date32())) {
|
||||
return transform_time_stamp<TimeUnit::day>(
|
||||
std::static_pointer_cast<arrow::Date32Array>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::date64())) {
|
||||
return transform_time_stamp<TimeUnit::milli>(
|
||||
std::static_pointer_cast<arrow::Date64Array>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(arrow::utf8())) {
|
||||
return transform_time_stamp<TimeUnit::string>(
|
||||
std::static_pointer_cast<arrow::StringArray>(_arr));
|
||||
|
||||
} else {
|
||||
return error("Expected timestamp, date32, date64 or string array, got " +
|
||||
_arr->type()->ToString() + ".");
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Timestamp<_format>> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return Timestamp<_format>(_chunk->Value(_ix) / 1000);
|
||||
}
|
||||
|
||||
static auto make_builder() {
|
||||
return BuilderType(data_type(), arrow::default_memory_pool());
|
||||
}
|
||||
|
||||
template <TimeUnit _unit, class SourceArrayType>
|
||||
static Result<Ref<arrow::TimestampArray>> transform_time_stamp(
|
||||
const std::shared_ptr<SourceArrayType>& _arr) noexcept {
|
||||
if (!_arr) {
|
||||
return error(
|
||||
"transform_time_stamp: std::shared_ptr not set. This is a "
|
||||
"bug, please report.");
|
||||
}
|
||||
|
||||
auto builder =
|
||||
arrow::TimestampBuilder(data_type(), arrow::default_memory_pool());
|
||||
|
||||
for (int64_t i = 0; i < _arr->length(); ++i) {
|
||||
if (_arr->IsNull(i)) {
|
||||
const auto status = builder.AppendNull();
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else {
|
||||
if constexpr (_unit == TimeUnit::day) {
|
||||
const auto status = builder.Append(
|
||||
static_cast<int64_t>(_arr->Value(i)) * 1000 * 24 * 60 * 60);
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else if constexpr (_unit == TimeUnit::second) {
|
||||
const auto status =
|
||||
builder.Append(static_cast<int64_t>(_arr->Value(i) * 1000));
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else if constexpr (_unit == TimeUnit::milli) {
|
||||
const auto status =
|
||||
builder.Append(static_cast<int64_t>(_arr->Value(i)));
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else if constexpr (_unit == TimeUnit::micro) {
|
||||
const auto status =
|
||||
builder.Append(static_cast<int64_t>(_arr->Value(i) / 1000));
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else if constexpr (_unit == TimeUnit::nano) {
|
||||
const auto status =
|
||||
builder.Append(static_cast<int64_t>(_arr->Value(i) / 1000000));
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else if constexpr (_unit == TimeUnit::string) {
|
||||
const auto ts = Timestamp<_format>::make(std::string(_arr->Value(i)));
|
||||
if (!ts) {
|
||||
return error(ts.error().what());
|
||||
}
|
||||
const auto status = builder.Append(ts->to_time_t() * 1000);
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else {
|
||||
static_assert(rfl::always_false_v<SourceArrayType>,
|
||||
"Unsupported time unit.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<arrow::Array> res;
|
||||
const auto status = builder.Finish(&res);
|
||||
return Ref<arrow::TimestampArray>::make(
|
||||
std::static_pointer_cast<arrow::TimestampArray>(res));
|
||||
}
|
||||
};
|
||||
|
||||
template <internal::StringLiteral _format>
|
||||
struct ArrowTypes<Timestamp<_format>, SerializationType::csv> {
|
||||
using ArrayType = arrow::TimestampArray;
|
||||
using BuilderType = arrow::StringBuilder;
|
||||
|
||||
static auto data_type() { return arrow::timestamp(arrow::TimeUnit::MILLI); }
|
||||
|
||||
static void add_to_builder(const Timestamp<_format>& _val,
|
||||
BuilderType* _builder) {
|
||||
const auto status = _builder->Append(_val.str());
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<Timestamp<_format>,
|
||||
SerializationType::parquet>::get_array(_arr);
|
||||
}
|
||||
|
||||
static Result<Timestamp<_format>> get_value(const Ref<ArrayType>& _chunk,
|
||||
const int64_t _ix) {
|
||||
return ArrowTypes<Timestamp<_format>,
|
||||
SerializationType::parquet>::get_value(_chunk, _ix);
|
||||
}
|
||||
|
||||
static auto make_builder() { return BuilderType(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
requires internal::has_reflection_type_v<T>
|
||||
struct ArrowTypes<T, _s> {
|
||||
using ArrayType =
|
||||
typename ArrowTypes<typename T::ReflectionType, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<typename T::ReflectionType, _s>::BuilderType;
|
||||
|
||||
static auto data_type() {
|
||||
return ArrowTypes<typename T::ReflectionType, _s>::data_type();
|
||||
}
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
ArrowTypes<typename T::ReflectionType, _s>::add_to_builder(
|
||||
_val.reflection(), _builder);
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<typename T::ReflectionType, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static Result<T> get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<typename T::ReflectionType>,
|
||||
_s>::get_value(_chunk, _ix)
|
||||
.and_then([](const auto& _v) -> Result<T> {
|
||||
try {
|
||||
return T(_v);
|
||||
} catch (const std::exception& e) {
|
||||
return error(e.what());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
static auto make_builder() {
|
||||
return ArrowTypes<typename T::ReflectionType, _s>::make_builder();
|
||||
}
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
struct ArrowTypes<std::optional<T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
if (_val) {
|
||||
ArrowTypes<T, _s>::add_to_builder(*_val, _builder);
|
||||
} else {
|
||||
const auto status = _builder->AppendNull();
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return std::make_optional<T>(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
struct ArrowTypes<std::shared_ptr<T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
if (_val) {
|
||||
ArrowTypes<T, _s>::add_to_builder(*_val, _builder);
|
||||
} else {
|
||||
const auto status = _builder->AppendNull();
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return std::make_shared<T>(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
struct ArrowTypes<std::unique_ptr<T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
if (_val) {
|
||||
ArrowTypes<T, _s>::add_to_builder(*_val, _builder);
|
||||
} else {
|
||||
const auto status = _builder->AppendNull();
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return std::make_unique<T>(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
struct ArrowTypes<Box<T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
ArrowTypes<T, _s>::add_to_builder(*_val, _builder);
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return Box<T>::make(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
struct ArrowTypes<Ref<T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
ArrowTypes<T, _s>::add_to_builder(*_val, _builder);
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return Ref<T>::make(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <internal::StringLiteral _name, class T, SerializationType _s>
|
||||
struct ArrowTypes<Rename<_name, T>, _s> {
|
||||
using ArrayType = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
using BuilderType =
|
||||
typename ArrowTypes<std::remove_cvref_t<T>, _s>::BuilderType;
|
||||
|
||||
static auto data_type() { return ArrowTypes<T, _s>::data_type(); }
|
||||
|
||||
static void add_to_builder(const auto& _val, BuilderType* _builder) {
|
||||
ArrowTypes<T, _s>::add_to_builder(_val.value(), _builder);
|
||||
}
|
||||
|
||||
static Result<Ref<ArrayType>> get_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr);
|
||||
}
|
||||
|
||||
static auto get_value(const Ref<ArrayType>& _chunk, const int64_t _ix) {
|
||||
return ArrowTypes<std::remove_cvref_t<T>, _s>::get_value(_chunk, _ix)
|
||||
.transform([](const auto& _v) { return Rename<_name, T>(_v); });
|
||||
}
|
||||
|
||||
static auto make_builder() { return ArrowTypes<T, _s>::make_builder(); }
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s, class SourceArrayType>
|
||||
Result<Ref<typename ArrowTypes<T, _s>::ArrayType>>
|
||||
transform_numerical_array_impl(
|
||||
const std::shared_ptr<SourceArrayType>& _arr) noexcept {
|
||||
if (!_arr) {
|
||||
return error(
|
||||
"transform_numerical_array_impl: std::shared_ptr not set. This is a "
|
||||
"bug, please report.");
|
||||
}
|
||||
|
||||
auto builder = ArrowTypes<T, _s>::make_builder();
|
||||
|
||||
for (int64_t i = 0; i < _arr->length(); ++i) {
|
||||
if (_arr->IsNull(i)) {
|
||||
const auto status = builder.AppendNull();
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
} else {
|
||||
const auto status = builder.Append(static_cast<T>(_arr->Value(i)));
|
||||
if (!status.ok()) {
|
||||
return error(status.message());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
using TargetArrayType = typename ArrowTypes<T, _s>::ArrayType;
|
||||
|
||||
std::shared_ptr<arrow::Array> res;
|
||||
const auto status = builder.Finish(&res);
|
||||
return Ref<TargetArrayType>::make(
|
||||
std::static_pointer_cast<TargetArrayType>(res));
|
||||
}
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
Result<Ref<typename ArrowTypes<T, _s>::ArrayType>> transform_numerical_array(
|
||||
const std::shared_ptr<arrow::Array>& _arr) noexcept {
|
||||
if (!_arr) {
|
||||
return error(
|
||||
"Could not transform the numerical array. std::shared_ptr not set.");
|
||||
}
|
||||
|
||||
using ArrayType = typename ArrowTypes<T, _s>::ArrayType;
|
||||
|
||||
if (_arr->type()->Equals(ArrowTypes<T, _s>::data_type())) {
|
||||
return Ref<ArrayType>::make(std::static_pointer_cast<ArrayType>(_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<uint8_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<uint8_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<uint16_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<uint16_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<uint32_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<uint32_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<uint64_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<uint64_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<int8_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<int8_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<int16_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<int16_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<int32_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<int32_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<int64_t, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<int64_t, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<float, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<float, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else if (_arr->type()->Equals(ArrowTypes<double, _s>::data_type())) {
|
||||
return transform_numerical_array_impl<T, _s>(
|
||||
std::static_pointer_cast<typename ArrowTypes<double, _s>::ArrayType>(
|
||||
_arr));
|
||||
|
||||
} else {
|
||||
return error("Expected numerical array, got " + _arr->type()->ToString() +
|
||||
".");
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,118 @@
|
||||
#ifndef RFL_PARSING_TABULAR_ARROWWRITER_HPP_
|
||||
#define RFL_PARSING_TABULAR_ARROWWRITER_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <stdexcept>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#include "../../Processors.hpp"
|
||||
#include "../../Tuple.hpp"
|
||||
#include "../../get.hpp"
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "../../to_view.hpp"
|
||||
#include "add_to_builder.hpp"
|
||||
#include "make_arrow_builders.hpp"
|
||||
#include "make_arrow_data_types.hpp"
|
||||
#include "make_arrow_schema.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <class VecType, SerializationType _s, class... Ps>
|
||||
class ArrowWriter {
|
||||
static_assert(!Processors<Ps...>::add_tags_to_variants_,
|
||||
"rfl::AddTagsToVariants cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::add_namespaced_tags_to_variants_,
|
||||
"rfl::AddNamespacedTagsToVariants cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::all_required_,
|
||||
"rfl::NoOptionals cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::default_if_missing_,
|
||||
"rfl::DefaultIfMissing cannot be used for tabular data.");
|
||||
static_assert(!Processors<Ps...>::no_extra_fields_,
|
||||
"rfl::NoExtraFields cannot be used for tabular data (neither "
|
||||
"can rfl::ExtraFields).");
|
||||
static_assert(!Processors<Ps...>::no_field_names_,
|
||||
"rfl::NoFieldNames cannot be used for tabular data.");
|
||||
|
||||
public:
|
||||
using ValueType = typename std::remove_cvref_t<typename VecType::value_type>;
|
||||
|
||||
ArrowWriter(const size_t _chunksize) : chunksize_(_chunksize) {}
|
||||
|
||||
~ArrowWriter() = default;
|
||||
|
||||
std::shared_ptr<arrow::Table> to_table(const VecType& _data) const {
|
||||
return arrow::Table::Make(
|
||||
make_arrow_schema<named_tuple_t<ValueType, Ps...>, _s>(),
|
||||
to_chunked_arrays(_data));
|
||||
}
|
||||
|
||||
private:
|
||||
std::vector<std::shared_ptr<arrow::ChunkedArray>> to_chunked_arrays(
|
||||
const VecType& _data) const;
|
||||
|
||||
private:
|
||||
size_t chunksize_;
|
||||
};
|
||||
|
||||
template <class VecType, SerializationType _s, class... Ps>
|
||||
std::vector<std::shared_ptr<arrow::ChunkedArray>>
|
||||
ArrowWriter<VecType, _s, Ps...>::to_chunked_arrays(const VecType& _data) const {
|
||||
using ValueType = typename VecType::value_type;
|
||||
|
||||
auto builders =
|
||||
make_arrow_builders<named_tuple_t<typename VecType::value_type>, _s>();
|
||||
|
||||
constexpr size_t size = tuple_size_v<decltype(builders)>;
|
||||
|
||||
std::vector<std::vector<std::shared_ptr<arrow::Array>>> array_chunks(size);
|
||||
|
||||
auto it = _data.begin();
|
||||
|
||||
while (it != _data.end()) {
|
||||
size_t i = 0;
|
||||
|
||||
for (; it != _data.end() && (i < chunksize_ || chunksize_ == 0);
|
||||
++i, ++it) {
|
||||
const auto view = to_view(*it);
|
||||
|
||||
[&]<int... _is>(const auto& _v, auto* _b,
|
||||
std::integer_sequence<int, _is...>) {
|
||||
(add_to_builder<_s>(*get<_is>(_v), &(_b->template get<_is>())), ...);
|
||||
}(view, &builders, std::make_integer_sequence<int, size>());
|
||||
}
|
||||
|
||||
if (i != 0) {
|
||||
std::vector<std::shared_ptr<arrow::Array>> chunks(size);
|
||||
|
||||
const auto finish_builder = [](auto* _b, auto* _c) {
|
||||
const auto status = _b->Finish(_c);
|
||||
if (!status.ok()) {
|
||||
throw std::runtime_error(status.message());
|
||||
}
|
||||
};
|
||||
|
||||
[&]<size_t... _is>(auto* _b, auto* _c,
|
||||
std::integer_sequence<size_t, _is...>) {
|
||||
(finish_builder(&_b->template get<_is>(), &_c->at(_is)), ...);
|
||||
}(&builders, &chunks, std::make_integer_sequence<size_t, size>());
|
||||
|
||||
for (size_t j = 0; j < size; ++j) {
|
||||
array_chunks.at(j).emplace_back(std::move(chunks.at(j)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const auto data_types = make_arrow_data_types<ValueType, _s>();
|
||||
|
||||
return [&]<size_t... _is>(std::integer_sequence<size_t, _is...>) {
|
||||
return std::vector<std::shared_ptr<arrow::ChunkedArray>>(
|
||||
{std::make_shared<arrow::ChunkedArray>(array_chunks.at(_is),
|
||||
std::get<_is>(data_types))...});
|
||||
}(std::make_integer_sequence<size_t, size>());
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,88 @@
|
||||
#ifndef RFL_PARSING_TABULAR_CHUNKEDARRAYITERATOR_HPP_
|
||||
#define RFL_PARSING_TABULAR_CHUNKEDARRAYITERATOR_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include "../../Ref.hpp"
|
||||
#include "../../Result.hpp"
|
||||
#include "../../internal/ptr_cast.hpp"
|
||||
#include "../is_required.hpp"
|
||||
#include "array_t.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
class ChunkedArrayIterator {
|
||||
public:
|
||||
using difference_type = std::ptrdiff_t;
|
||||
using value_type = Result<T>;
|
||||
|
||||
using ArrayType = array_t<T, _s>;
|
||||
|
||||
static ChunkedArrayIterator make(const Ref<arrow::ChunkedArray>& _arr) {
|
||||
return ChunkedArrayIterator(_arr);
|
||||
}
|
||||
|
||||
ChunkedArrayIterator(const Ref<arrow::ChunkedArray>& _arr)
|
||||
: arr_(_arr), chunk_ix_(0), current_chunk_(get_chunk(arr_, 0)), ix_(0) {}
|
||||
|
||||
~ChunkedArrayIterator() = default;
|
||||
|
||||
Result<T> operator*() const noexcept {
|
||||
const bool is_null =
|
||||
current_chunk_
|
||||
.transform([&](const auto& _c) { return _c->IsNull(ix_); })
|
||||
.value_or(false);
|
||||
|
||||
if (is_null) {
|
||||
if constexpr (is_required<T, false>()) {
|
||||
return error("Value cannot be null.");
|
||||
} else {
|
||||
return T();
|
||||
}
|
||||
}
|
||||
|
||||
return current_chunk_.and_then(
|
||||
[&](const auto& _c) { return ArrowTypes<T, _s>::get_value(_c, ix_); });
|
||||
}
|
||||
|
||||
bool end() const noexcept { return chunk_ix_ >= arr_->num_chunks(); }
|
||||
|
||||
ChunkedArrayIterator& operator++() noexcept {
|
||||
if (!current_chunk_) {
|
||||
return *this;
|
||||
}
|
||||
++ix_;
|
||||
if (ix_ >= (*current_chunk_)->length()) {
|
||||
++chunk_ix_;
|
||||
current_chunk_ = get_chunk(arr_, chunk_ix_);
|
||||
ix_ = 0;
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
void operator++(int) noexcept { ++*this; }
|
||||
|
||||
private:
|
||||
static Result<Ref<ArrayType>> get_chunk(const Ref<arrow::ChunkedArray>& _arr,
|
||||
const int _chunk_ix) noexcept {
|
||||
if (_chunk_ix < _arr->num_chunks()) {
|
||||
return ArrowTypes<T, _s>::get_array(_arr->chunk(_chunk_ix));
|
||||
} else {
|
||||
return error("chunk_ix out of bounds.");
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
Ref<arrow::ChunkedArray> arr_;
|
||||
|
||||
int chunk_ix_;
|
||||
|
||||
Result<Ref<ArrayType>> current_chunk_;
|
||||
|
||||
int64_t ix_;
|
||||
};
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,19 @@
|
||||
#ifndef RFL_PARSING_TABULAR_ADD_TO_BUILDER_HPP_
|
||||
#define RFL_PARSING_TABULAR_ADD_TO_BUILDER_HPP_
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "ArrowTypes.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <SerializationType _s, class ValueType, class BuilderType>
|
||||
inline void add_to_builder(const ValueType& _val, BuilderType* _builder) {
|
||||
ArrowTypes<std::remove_cvref_t<ValueType>, _s>::add_to_builder(_val,
|
||||
_builder);
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,13 @@
|
||||
#ifndef RFL_PARSING_TABULAR_ARRAYT_HPP_
|
||||
#define RFL_PARSING_TABULAR_ARRAYT_HPP_
|
||||
|
||||
#include "ArrowTypes.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
/// Shorthand for the ArrayType that ArrowTypes declares for T (with
/// cv-qualifiers and references stripped) under serialization type _s.
template <class T, SerializationType _s>
|
||||
using array_t = typename ArrowTypes<std::remove_cvref_t<T>, _s>::ArrayType;
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,53 @@
|
||||
#ifndef RFL_PARSING_TABULAR_MAKEARROWBUILDERS_HPP_
|
||||
#define RFL_PARSING_TABULAR_MAKEARROWBUILDERS_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "ArrowTypes.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
/// Shorthand for the BuilderType that ArrowTypes declares for T, after
/// removing one pointer level and then cv-qualifiers and references.
template <class T, SerializationType _s>
|
||||
using arrow_builder_t =
|
||||
typename ArrowTypes<std::remove_cvref_t<std::remove_pointer_t<T>>,
|
||||
_s>::BuilderType;
|
||||
|
||||
/// Primary template, declared only; the definition is supplied by the
/// specialization for NamedTuple<FieldTypes...>.
template <class T, SerializationType _s>
|
||||
struct ArrowBuildersType;
|
||||
|
||||
template <SerializationType _s, class... FieldTypes>
|
||||
struct ArrowBuildersType<NamedTuple<FieldTypes...>, _s> {
|
||||
using Type = Tuple<arrow_builder_t<typename FieldTypes::Type, _s>...>;
|
||||
|
||||
static auto data_types() {
|
||||
return [&]<size_t... _is>(std::integer_sequence<size_t, _is...>) {
|
||||
return std::array<std::shared_ptr<arrow::DataType>,
|
||||
sizeof...(FieldTypes)>(
|
||||
{ArrowTypes<typename FieldTypes::Type, _s>::data_type()...});
|
||||
}(std::make_integer_sequence<size_t, sizeof...(FieldTypes)>());
|
||||
}
|
||||
|
||||
static Type make_builders() {
|
||||
return Type(ArrowTypes<typename FieldTypes::Type, _s>::make_builder()...);
|
||||
}
|
||||
|
||||
static auto schema() {
|
||||
const auto fields =
|
||||
std::vector<std::shared_ptr<arrow::Field>>({arrow::field(
|
||||
typename FieldTypes::Name().str(),
|
||||
ArrowTypes<typename FieldTypes::Type, _s>::data_type())...});
|
||||
return arrow::schema(fields);
|
||||
}
|
||||
};
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
auto make_arrow_builders() {
|
||||
return ArrowBuildersType<std::remove_cvref_t<T>, _s>::make_builders();
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,19 @@
|
||||
#ifndef RFL_PARSING_TABULAR_MAKE_ARROW_DATA_TYPES_HPP_
|
||||
#define RFL_PARSING_TABULAR_MAKE_ARROW_DATA_TYPES_HPP_
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "make_arrow_builders.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
inline auto make_arrow_data_types() {
|
||||
return ArrowBuildersType<named_tuple_t<std::remove_cvref_t<T>>,
|
||||
_s>::data_types();
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,18 @@
|
||||
#ifndef RFL_PARSING_TABULAR_MAKE_ARROW_SCHEMA_HPP_
|
||||
#define RFL_PARSING_TABULAR_MAKE_ARROW_SCHEMA_HPP_
|
||||
|
||||
#include <type_traits>
|
||||
|
||||
#include "../../named_tuple_t.hpp"
|
||||
#include "make_arrow_builders.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
template <class T, SerializationType _s>
|
||||
inline auto make_arrow_schema() {
|
||||
return ArrowBuildersType<named_tuple_t<std::remove_cvref_t<T>>, _s>::schema();
|
||||
}
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,52 @@
|
||||
#ifndef RFL_PARSING_TABULAR_MAKECHUNKEDARRAYITERATORS_HPP_
|
||||
#define RFL_PARSING_TABULAR_MAKECHUNKEDARRAYITERATORS_HPP_
|
||||
|
||||
#include <arrow/api.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "../../NamedTuple.hpp"
|
||||
#include "../../Ref.hpp"
|
||||
#include "../../Result.hpp"
|
||||
#include "../../Tuple.hpp"
|
||||
#include "ChunkedArrayIterator.hpp"
|
||||
|
||||
namespace rfl::parsing::tabular {
|
||||
|
||||
/// Primary template, declared only; the definition is supplied by the
/// specialization for NamedTuple<FieldTypes...>.
template <class NamedTupleType, SerializationType _s>
|
||||
struct MakeChunkedArrayIterators;
|
||||
|
||||
template <SerializationType _s, class... FieldTypes>
|
||||
struct MakeChunkedArrayIterators<NamedTuple<FieldTypes...>, _s> {
|
||||
using TupleType =
|
||||
Tuple<ChunkedArrayIterator<typename FieldTypes::Type, _s>...>;
|
||||
|
||||
Result<TupleType> operator()(const Ref<arrow::Table>& _table) const {
|
||||
const auto get_column =
|
||||
[&](const std::string& _colname) -> Result<Ref<arrow::ChunkedArray>> {
|
||||
const auto col = _table->GetColumnByName(_colname);
|
||||
if (!col) {
|
||||
return error("Column named '" + _colname + "' not found.");
|
||||
}
|
||||
return Ref<arrow::ChunkedArray>::make(col);
|
||||
};
|
||||
|
||||
try {
|
||||
return TupleType(
|
||||
get_column(typename FieldTypes::Name().str())
|
||||
.transform(
|
||||
ChunkedArrayIterator<typename FieldTypes::Type, _s>::make)
|
||||
.value()...);
|
||||
} catch (const std::exception& e) {
|
||||
return error(e.what());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/// Variable template holding a MakeChunkedArrayIterators function object
/// for the given named tuple type and serialization type.
template <class NamedTupleType, SerializationType _s>
|
||||
const auto make_chunked_array_iterators =
|
||||
MakeChunkedArrayIterators<NamedTupleType, _s>{};
|
||||
|
||||
} // namespace rfl::parsing::tabular
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user