15 #include "fletchgen/srec/recordbatch.h"
17 #include <arrow/api.h>
18 #include <fletcher/common.h>
25 #include "fletchgen/srec/srec.h"
27 namespace fletchgen::srec {
// Rounds `size` up to the nearest multiple of `alignment`.
//
// An `alignment` of 0 is treated as "no alignment" and returns `size`
// unchanged. Without this guard the division below would divide by zero
// (undefined behavior). GenerateReadSREC forwards its `buffer_align` argument
// here, so a zero setting would otherwise crash the generator.
//
// Note: `size + alignment - 1` wraps around when `size` is within
// `alignment - 1` of SIZE_MAX; callers are expected to pass realistic buffer
// sizes.
static constexpr size_t PaddedLength(size_t size, size_t alignment) {
  if (alignment == 0) {
    return size;
  }
  return ((size + alignment - 1) / alignment) * alignment;
}
// Packs the buffers of every non-virtual record batch in `meta_in` into one
// contiguous, zero-initialized memory image and wraps that image in an
// srec::File.
//
// Layout: buffers are placed back to back in iteration order (batch, field,
// buffer). Each buffer starts at the running `offset`. After each buffer, the
// offset advances by the buffer size rounded up to `buffer_align` via
// PaddedLength.
//
// Parameters:
//   meta_in      - input record batch descriptions. Virtual batches are not
//                  packed into the image.
//   meta_out     - receives one description per entry of `meta_in`
//                  (push_back). For non-virtual batches, the fields are rebuilt
//                  and each buffer's raw pointer holds that buffer's byte
//                  offset within the image. This is an integer stored in a
//                  pointer, not a real address. Virtual batches are copied
//                  unchanged.
//   buffer_align - per-buffer alignment in bytes. PaddedLength takes size_t,
//                  so a value of 0 reaches a division by zero there, and a
//                  negative value becomes a very large alignment.
//
// NOTE(review): this excerpt omits several original lines:
//   - a parameter between `meta_out` and `buffer_align`, presumably the output
//     stream the SREC is written to, given the "Output stream unavailable" log
//     below (confirm);
//   - the declaration and initial value of `offset`;
//   - the code around and after the error log, e.g. whether `srec_buffer` is
//     released.
// Those parts are not documented here.
33 void GenerateReadSREC(
const std::vector<fletcher::RecordBatchDescription> &meta_in,
34 std::vector<fletcher::RecordBatchDescription> *meta_out,
36 int64_t buffer_align) {
// First pass: compute the image layout and build the output descriptions.
40 for (
const auto &desc_in : meta_in) {
// Start from a copy of the input; virtual batches are emitted as-is.
41 fletcher::RecordBatchDescription desc_out = desc_in;
43 if (!desc_in.is_virtual) {
// Rebuild each field with the same type, length and null count but no
// buffers; the buffers are re-added below with their image offsets.
44 desc_out.fields.clear();
45 for (
const auto &f : desc_in.fields) {
46 desc_out.fields.emplace_back(f.type_, f.length, f.null_count);
// NOTE(review): this sits inside the field loop, so the whole batch is
// logged once per field rather than once per batch.
47 FLETCHER_LOG(DEBUG,
"RecordBatch " + desc_in.name +
" buffers: \n" + desc_in.ToString());
48 for (
const auto &buf : f.buffers) {
// The "address" is the buffer's byte offset into the image, carried in a
// pointer-typed field. The copy loop below casts it back to an offset.
50 auto srec_buf_address =
reinterpret_cast<uint8_t *
>(offset);
52 desc_out.fields.back().buffers.emplace_back(srec_buf_address, buf.size_, buf.desc_, buf.level_);
// Build a hex view of the buffer contents for the DEBUG log below.
55 auto hv = fletcher::HexView(offset);
56 hv.AddData(buf.raw_buffer_, buf.size_);
57 FLETCHER_LOG(DEBUG, fletcher::ToString(buf.desc_) +
"\n" + hv.ToString());
// Advance to the next aligned start position.
60 auto padded_size = PaddedLength(buf.size_, buffer_align);
61 offset = offset + padded_size;
// One output description per input batch, virtual or not.
65 meta_out->push_back(desc_out);
// Allocate the image. calloc zero-fills, so any alignment padding between
// buffers is zero.
// NOTE(review): the result is not checked for nullptr before the memcpy
// calls below.
69 auto srec_buffer =
static_cast<uint8_t *
>(calloc(1, offset));
// Second pass: copy each non-virtual input buffer to its recorded offset.
// NOTE(review): meta_out->at(r) assumes meta_out was empty on entry, so that
// index r refers to the entry pushed for meta_in[r]; confirm with callers.
72 for (
size_t r = 0; r < meta_in.size(); r++) {
73 if (!meta_in[r].is_virtual) {
74 for (
size_t f = 0; f < meta_in[r].fields.size(); f++) {
75 for (
size_t b = 0; b < meta_in[r].fields[f].buffers.size(); b++) {
// Recover the image offset stored in the output description's pointer.
76 auto srec_off =
reinterpret_cast<size_t>(meta_out->at(r).fields[f].buffers[b].raw_buffer_);
77 auto dest = srec_buffer + srec_off;
78 auto src = meta_in[r].fields[f].buffers[b].raw_buffer_;
79 auto size = meta_in[r].fields[f].buffers[b].size_;
82 memcpy(dest, src, size);
// Wrap the packed image (`offset` bytes) in an SREC file object. The leading
// 0 is presumably the base address; confirm against srec::File.
90 srec::File sr(0, srec_buffer, offset);
// Logged when the SREC cannot be written; the guarding condition is outside
// this excerpt.
94 FLETCHER_LOG(ERROR,
"Output stream unavailable. SREC was not written.");
// Conversion from an SREC stream back to Arrow record batches.
//
// In the visible code this is a stub. It declares an empty result vector and
// logs an ERROR saying the conversion is not yet implemented. None of the
// parameters (`input`, `schemas`, `num_rows`, `buf_offsets`) are read in the
// lines shown.
//
// NOTE(review): the return statement lies outside this excerpt. Presumably the
// empty `ret` is returned; confirm.
100 std::vector<std::shared_ptr<arrow::RecordBatch>>
101 ReadRecordBatchesFromSREC(std::istream *input,
102 const std::vector<std::shared_ptr<arrow::Schema>> &schemas,
103 const std::vector<uint64_t> &num_rows,
104 const std::vector<uint64_t> &buf_offsets) {
105 std::vector<std::shared_ptr<arrow::RecordBatch>> ret;
// Not implemented: report it instead of producing any record batches.
107 FLETCHER_LOG(ERROR,
"SREC to RecordBatch not yet implemented.");