Fletchgen
The Fletcher Design Generator
recordbatch.cc
1 // Copyright 2018-2019 Delft University of Technology
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "fletchgen/srec/recordbatch.h"
16 
17 #include <arrow/api.h>
18 #include <fletcher/common.h>
19 
20 #include <vector>
21 #include <memory>
22 #include <ostream>
23 #include <fstream>
24 
25 #include "fletchgen/srec/srec.h"
26 
27 namespace fletchgen::srec {
28 
/**
 * @brief Round a size up to the nearest multiple of an alignment.
 *
 * @param size       The unpadded size in bytes.
 * @param alignment  The alignment in bytes. An alignment of 0 is treated as "no alignment requirement".
 * @return           The smallest multiple of alignment that is greater than or equal to size, or size itself when
 *                   alignment is 0.
 */
static inline size_t PaddedLength(size_t size, size_t alignment) {
  // Dividing by a zero alignment is undefined behavior; without an alignment requirement no padding is needed.
  if (alignment == 0) {
    return size;
  }
  // Work with the remainder rather than (size + alignment - 1), so the intermediate sum cannot wrap around for
  // sizes close to the maximum of size_t.
  const size_t remainder = size % alignment;
  if (remainder == 0) {
    return size;
  }
  return size + (alignment - remainder);
}
32 
33 void GenerateReadSREC(const std::vector<fletcher::RecordBatchDescription> &meta_in,
34  std::vector<fletcher::RecordBatchDescription> *meta_out,
35  std::ofstream *out,
36  int64_t buffer_align) {
37  // We need to align each buffer into the SREC stream.
38  // We start at offset 0.
39  uint64_t offset = 0;
40  for (const auto &desc_in : meta_in) {
41  fletcher::RecordBatchDescription desc_out = desc_in;
42  // We can only copy data from physically existing recordbatches into the SREC
43  if (!desc_in.is_virtual) {
44  desc_out.fields.clear();
45  for (const auto &f : desc_in.fields) {
46  desc_out.fields.emplace_back(f.type_, f.length, f.null_count);
47  FLETCHER_LOG(DEBUG, "RecordBatch " + desc_in.name + " buffers: \n" + desc_in.ToString());
48  for (const auto &buf : f.buffers) {
49  // May the force be with us
50  auto srec_buf_address = reinterpret_cast<uint8_t *>(offset);
51  // Determine the place of the buffer in the SREC output
52  desc_out.fields.back().buffers.emplace_back(srec_buf_address, buf.size_, buf.desc_, buf.level_);
53 
54  // Print some debug info
55  auto hv = fletcher::HexView(offset);
56  hv.AddData(buf.raw_buffer_, buf.size_);
57  FLETCHER_LOG(DEBUG, fletcher::ToString(buf.desc_) + "\n" + hv.ToString());
58 
59  // Calculate the padded length and calculate the next offset.
60  auto padded_size = PaddedLength(buf.size_, buffer_align);
61  offset = offset + padded_size;
62  }
63  }
64  }
65  meta_out->push_back(desc_out);
66  }
67  // We have now determined the location of every buffer in the SREC file and we know the total size of the resulting
68  // file in bytes. We must now create the actual SREC file. Calloc some space to serialize the Arrow buffers into.
69  auto srec_buffer = static_cast<uint8_t *>(calloc(1, offset));
70 
71  // Copy over every buffer.
72  for (size_t r = 0; r < meta_in.size(); r++) {
73  if (!meta_in[r].is_virtual) {
74  for (size_t f = 0; f < meta_in[r].fields.size(); f++) {
75  for (size_t b = 0; b < meta_in[r].fields[f].buffers.size(); b++) {
76  auto srec_off = reinterpret_cast<size_t>(meta_out->at(r).fields[f].buffers[b].raw_buffer_);
77  auto dest = srec_buffer + srec_off;
78  auto src = meta_in[r].fields[f].buffers[b].raw_buffer_;
79  auto size = meta_in[r].fields[f].buffers[b].size_;
80  // skip empty buffers (typically implicit validity buffers)
81  if (src != nullptr) {
82  memcpy(dest, src, size);
83  }
84  }
85  }
86  }
87  }
88 
89  // Create the SREC file, start at 0
90  srec::File sr(0, srec_buffer, offset);
91  if (out->good()) {
92  sr.write(out);
93  } else {
94  FLETCHER_LOG(ERROR, "Output stream unavailable. SREC was not written.");
95  }
96 
97  free(srec_buffer);
98 }
99 
100 std::vector<std::shared_ptr<arrow::RecordBatch>>
101 ReadRecordBatchesFromSREC(std::istream *input,
102  const std::vector<std::shared_ptr<arrow::Schema>> &schemas,
103  const std::vector<uint64_t> &num_rows,
104  const std::vector<uint64_t> &buf_offsets) {
105  std::vector<std::shared_ptr<arrow::RecordBatch>> ret;
106  // TODO(johanpel): implement this
107  FLETCHER_LOG(ERROR, "SREC to RecordBatch not yet implemented.");
108  return ret;
109 }
110 
111 } // namespace fletchgen::srec