Fletchgen
The Fletcher Design Generator
recordbatch.cc
1 // Copyright 2018-2019 Delft University of Technology
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "fletchgen/recordbatch.h"
16 
17 #include <cerata/api.h>
18 #include <memory>
19 #include <vector>
20 #include <utility>
21 
22 #include "fletchgen/array.h"
23 #include "fletchgen/bus.h"
24 #include "fletchgen/schema.h"
25 
26 namespace fletchgen {
27 
28 using cerata::Instance;
29 using cerata::Component;
30 using cerata::Port;
31 using cerata::Literal;
32 using cerata::intl;
33 using cerata::Term;
34 
35 RecordBatch::RecordBatch(const std::string &name,
36  const std::shared_ptr<FletcherSchema> &fletcher_schema,
37  fletcher::RecordBatchDescription batch_desc)
38  : Component(name),
39  fletcher_schema_(fletcher_schema),
40  mode_(fletcher_schema->mode()),
41  batch_desc_(std::move(batch_desc)) {
42  // Get Arrow Schema
43  auto as = fletcher_schema_->arrow_schema();
44 
45  // Add default port nodes
46  Add(port("bcd", cr(), Port::Dir::IN, bus_cd()));
47  Add(port("kcd", cr(), Port::Dir::IN, kernel_cd()));
48 
49  // Add and connect all array readers and resulting ports
50  AddArrays(fletcher_schema);
51 }
52 
53 void RecordBatch::AddArrays(const std::shared_ptr<FletcherSchema> &fletcher_schema) {
54  // Prepare a rebind map
55  cerata::NodeMap rebinding;
56 
57  // Add Array type generics.
58  auto iw = index_width();
59  auto tw = tag_width();
60  Add({iw, tw});
61 
62  // Iterate over all fields and add ArrayReader/Writer data and control ports.
63  for (const auto &field : fletcher_schema->arrow_schema()->fields()) {
64  // Name prefix for all sorts of stuff.
65  auto prefix = fletcher_schema->name() + "_" + field->name();
66 
67  // Check if we must ignore the field
68  if (fletcher::GetBoolMeta(*field, fletcher::meta::IGNORE, false)) {
69  FLETCHER_LOG(DEBUG, "Ignoring field " + field->name());
70  } else {
71  FLETCHER_LOG(DEBUG, "Instantiating Array" << (mode_ == Mode::READ ? "Reader" : "Writer")
72  << " for schema: " << fletcher_schema->name()
73  << " : " << field->name());
74  // Generate a warning for Writers as they are still experimental.
75  if (mode_ == Mode::WRITE) {
76  FLETCHER_LOG(WARNING, "ArrayWriter implementation is highly experimental. Use with caution! Features that are "
77  "not implemented include:\n"
78  " - dvalid bit is ignored (so you cannot supply handshakes on the values stream for "
79  "empty lists or use empty handshakes to close streams)\n"
80  " - lists of primitives (e.g. strings) values stream last signal must signal the last "
81  "value for all lists, not single lists in the Arrow Array).\n"
82  " - clock domain crossings.");
83  }
84 
85  // Generate the schema-defined Arrow data port for the kernel.
86  // This is the un-concatenated version w.r.t. the streams visible on the Array primitive component.
87  auto kernel_arrow_port = arrow_port(fletcher_schema, field, true, kernel_cd());
88  auto kernel_arrow_type = kernel_arrow_port->type();
89  Add(kernel_arrow_port);
90 
91  // Instantiate an ArrayReader/Writer.
92  auto a = Instantiate(array(mode_), field->name() + "_inst");
93  array_instances_.push_back(a);
94 
95  // Generate and set a configuration string for the ArrayReader.
96  Connect(a->Get<Parameter>("CFG"), GenerateConfigString(*field));
97 
98  // Drive the clocks and resets.
99  Connect(a->prt("kcd"), prt("kcd"));
100  Connect(a->prt("bcd"), prt("bcd"));
101 
102  // Connect some global parameters.
103  a->par("CMD_TAG_WIDTH") <<= tw;
104  a->par(index_width()) <<= iw;
105 
106  // Connect the bus ports.
107  ConnectBusPorts(a, prefix, &rebinding);
108 
109  // Drive the RecordBatch Arrow data port with the ArrayReader/Writer data port, or vice versa.
110  if (mode_ == Mode::READ) {
111  auto a_data_port = a->prt("out");
112  // Rebind the type because now we know the field (also see array()).
113  auto a_data_spec = GetArrayDataSpec(*field);
114  auto a_data_type = array_reader_out(a_data_spec.first, a_data_spec.second);
115  a_data_port->SetType(a_data_type);
116  // Create a mapper between the Arrow port and the Array data port.
117  auto mapper = GetStreamTypeMapper(kernel_arrow_type, a_data_type.get());
118  kernel_arrow_type->AddMapper(mapper);
119  // Connect the ports.
120  kernel_arrow_port <<= a_data_port;
121  } else {
122  auto a_data_port = a->prt("in");
123  // Rebind the type because now we know the field (also see array()).
124  auto a_data_spec = GetArrayDataSpec(*field);
125  auto a_data_type = array_writer_in(a_data_spec.first, a_data_spec.second);
126  a_data_port->SetType(a_data_type);
127  // Create a mapper between the Arrow port and the Array data port.
128  auto mapper = GetStreamTypeMapper(kernel_arrow_type, a_data_type.get());
129  kernel_arrow_type->AddMapper(mapper);
130  // Connect the ports.
131  a_data_port <<= kernel_arrow_port;
132  }
133 
134  // Get the command stream and unlock stream ports and set their real type and connect.
135  auto a_cmd = a->Get<Port>("cmd");
136  auto ct = cmd_type(iw,
137  tw,
138  a->par(bus_addr_width())->shared_from_this() * GetCtrlBufferCount(*field));
139  a_cmd->SetType(ct);
140 
141  auto aw = Get<Parameter>(prefix + "_" + bus_addr_width()->name())->shared_from_this();
142  auto cmd = command_port(fletcher_schema, field, iw, tw, aw, kernel_cd());
143  Connect(a_cmd, cmd);
144  Add(cmd);
145 
146  auto a_unl = a->Get<Port>("unl");
147  auto ut = unlock_type(a->par("CMD_TAG_WIDTH")->shared_from_this());
148  a_unl->SetType(ut);
149 
150  auto unl = unlock_port(fletcher_schema, field, tw, kernel_cd());
151  Connect(unl, a_unl);
152  Add(unl);
153  }
154  }
155 }
156 
157 std::vector<std::shared_ptr<FieldPort>>
158 RecordBatch::GetFieldPorts(const std::optional<FieldPort::Function> &function) const {
159  std::vector<std::shared_ptr<FieldPort>> result;
160  for (const auto &n : objects_) {
161  auto ap = std::dynamic_pointer_cast<FieldPort>(n);
162  if (ap != nullptr) {
163  if ((function && (ap->function_ == *function)) || !function) {
164  result.push_back(ap);
165  }
166  }
167  }
168  return result;
169 }
170 
171 void RecordBatch::ConnectBusPorts(Instance *array, const std::string &prefix, cerata::NodeMap *rebinding) {
172  auto a_bus_ports = array->GetAll<BusPort>();
173  for (const auto &a_bus_port : a_bus_ports) {
174  auto rb_port_prefix = prefix + "_bus";
175  auto a_bus_spec = a_bus_port->spec_;
176  // Create new bus parameters to bind to and prefix it with the bus name.
177  auto rb_bus_params = BusDimParams(this, a_bus_spec.dim.plain, prefix);
178  auto rb_bus_spec = BusSpecParams{rb_bus_params, a_bus_spec.func};
179  // Copy over the ArrayReader/Writer's bus port
180  auto rb_bus_port = bus_port(rb_port_prefix, a_bus_port->dir(), rb_bus_spec);
181  // Add them to the RecordBatch
182  Add(rb_bus_port);
183  // Connect them to the ArrayReader/Writer
184  Connect(rb_bus_port, array->prt("bus"));
185  // Connect the bus parameters. Array bus port has no prefix.
186  ConnectBusParam(array, "", rb_bus_params, rebinding);
187  }
188 }
189 
190 std::shared_ptr<RecordBatch> record_batch(const std::string &name,
191  const std::shared_ptr<FletcherSchema> &fletcher_schema,
192  const fletcher::RecordBatchDescription &batch_desc) {
193  auto rb = new RecordBatch(name, fletcher_schema, batch_desc);
194  auto shared_rb = std::shared_ptr<RecordBatch>(rb);
195  cerata::default_component_pool()->Add(shared_rb);
196  return shared_rb;
197 }
198 
199 std::shared_ptr<FieldPort> arrow_port(const std::shared_ptr<FletcherSchema> &fletcher_schema,
200  const std::shared_ptr<arrow::Field> &field,
201  bool reverse,
202  const std::shared_ptr<ClockDomain> &domain) {
203  auto name = fletcher_schema->name() + "_" + field->name();
204  auto type = GetStreamType(*field, fletcher_schema->mode());
205  Port::Dir dir;
206  if (reverse) {
207  dir = Term::Reverse(mode2dir(fletcher_schema->mode()));
208  } else {
209  dir = mode2dir(fletcher_schema->mode());
210  }
211  // Check if the Arrow data stream should be profiled. This is disabled by default but can be conveyed through
212  // the schema.
213  bool profile = fletcher::GetBoolMeta(*field, fletcher::meta::PROFILE, false);
214 
215  return std::make_shared<FieldPort>(name, FieldPort::ARROW, field, fletcher_schema, type, dir, domain, profile);
216 }
217 
218 std::shared_ptr<FieldPort> command_port(const std::shared_ptr<FletcherSchema> &schema,
219  const std::shared_ptr<arrow::Field> &field,
220  const std::shared_ptr<Node> &index_width,
221  const std::shared_ptr<Node> &tag_width,
222  std::optional<std::shared_ptr<Node>> addr_width,
223  const std::shared_ptr<ClockDomain> &domain) {
224  std::shared_ptr<cerata::Type> type;
225  if (addr_width) {
226  type = cmd_type(index_width, tag_width, *addr_width * GetCtrlBufferCount(*field));
227  } else {
228  type = cmd_type(index_width, tag_width);
229  }
230  auto name = schema->name() + "_" + field->name() + "_cmd";
231 
232  return std::make_shared<FieldPort>(name, FieldPort::COMMAND, field, schema, type, Port::Dir::IN, domain, false);
233 }
234 
235 std::shared_ptr<FieldPort> unlock_port(const std::shared_ptr<FletcherSchema> &schema,
236  const std::shared_ptr<arrow::Field> &field,
237  const std::shared_ptr<Node> &tag_width,
238  const std::shared_ptr<ClockDomain> &domain) {
239  auto type = unlock_type(tag_width);
240  auto name = schema->name() + "_" + field->name() + "_unl";
241 
242  return std::make_shared<FieldPort>(name, FieldPort::UNLOCK, field, schema, type, Port::Dir::OUT, domain, false);
243 }
244 
245 std::shared_ptr<cerata::Object> FieldPort::Copy() const {
246  // Take shared ownership of the type.
247  auto typ = type()->shared_from_this();
248  auto result = std::make_shared<FieldPort>(name(), function_, field_, fletcher_schema_, typ, dir(), domain_, profile_);
249  result->meta = meta;
250  return result;
251 }
252 
253 } // namespace fletchgen
Contains all classes and functions related to Fletchgen.
Definition: array.cc:29
std::shared_ptr< Type > cmd_type(const std::shared_ptr< Node > &index_width, const std::shared_ptr< Node > &tag_width, const std::optional< std::shared_ptr< Node >> &ctrl_width)
Return a Fletcher command stream type.
Definition: array.cc:62
std::shared_ptr< ClockDomain > kernel_cd()
Fletcher accelerator clock domain.
Definition: basic_types.cc:62
std::shared_ptr< TypeMapper > GetStreamTypeMapper(Type *stream_type, Type *other)
Get a type mapper for an Arrow::Field-based stream to an ArrayReader/Writer stream.
Definition: array.cc:323
std::string GenerateConfigString(const arrow::Field &field, int level)
Return the configuration string for a ArrayReader/Writer.
Definition: array.cc:256
std::pair< uint32_t, uint32_t > GetArrayDataSpec(const arrow::Field &arrow_field)
Get the ArrayR/W number of streams and data width from an Arrow Field.
Definition: array.cc:478
std::shared_ptr< Type > cr()
Fletcher clock/reset;.
Definition: basic_types.cc:73
std::shared_ptr< Type > unlock_type(const std::shared_ptr< Node > &tag_width)
Fletcher unlock stream.
Definition: array.cc:80
std::shared_ptr< Type > GetStreamType(const arrow::Field &arrow_field, fletcher::Mode mode, int level)
Convert an Arrow::Field into a stream type.
Definition: array.cc:369
void ConnectBusParam(cerata::Graph *dst, const std::string &prefix, const BusDimParams &src, cerata::NodeMap *rebinding)
Find and connect all prefixed bus params on a graph to the supplied source params,...
Definition: bus.cc:191
std::shared_ptr< Type > array_writer_in(uint32_t num_streams, uint32_t full_width)
Fletcher write data.
Definition: array.cc:94
size_t GetCtrlBufferCount(const arrow::Field &field)
Return the number of buffers for the control field.
Definition: array.cc:51
Component * array(Mode mode)
Return a Cerata component model of an Array(Reader/Writer).
Definition: array.cc:139
std::shared_ptr< FieldPort > arrow_port(const std::shared_ptr< FletcherSchema > &fletcher_schema, const std::shared_ptr< arrow::Field > &field, bool reverse, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived port for Arrow data.
Definition: recordbatch.cc:199
std::shared_ptr< FieldPort > unlock_port(const std::shared_ptr< FletcherSchema > &schema, const std::shared_ptr< arrow::Field > &field, const std::shared_ptr< Node > &tag_width, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived unlock port.
Definition: recordbatch.cc:235
std::shared_ptr< RecordBatch > record_batch(const std::string &name, const std::shared_ptr< FletcherSchema > &fletcher_schema, const fletcher::RecordBatchDescription &batch_desc)
Make a new RecordBatch(Reader/Writer) component, based on a Fletcher schema.
Definition: recordbatch.cc:190
std::shared_ptr< ClockDomain > bus_cd()
Fletcher bus clock domain.
Definition: basic_types.cc:67
std::shared_ptr< BusPort > bus_port(const std::string &name, Port::Dir dir, const BusSpecParams &params)
Make a new port and return a shared pointer to it.
Definition: bus.cc:175
std::shared_ptr< FieldPort > command_port(const std::shared_ptr< FletcherSchema > &schema, const std::shared_ptr< arrow::Field > &field, const std::shared_ptr< Node > &index_width, const std::shared_ptr< Node > &tag_width, std::optional< std::shared_ptr< Node >> addr_width, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived command port.
Definition: recordbatch.cc:218
cerata::Port::Dir mode2dir(fletcher::Mode mode)
Return a Cerata port direction from a Fletcher access mode.
Definition: utils.cc:35
std::shared_ptr< Type > array_reader_out(uint32_t num_streams, uint32_t full_width)
Fletcher read data.
Definition: array.cc:85
Holds bus parameters based on bus dimensions, that has actual nodes representing the dimensions.
Definition: bus.h:68
A port derived from bus parameters.
Definition: bus.h:130
BusSpecParams spec_
The bus spec to which the type generics of the bus port are bound.
Definition: bus.h:139
Holds bus parameters and function based on bus dimensions, that has actual nodes representing the dim...
Definition: bus.h:90
BusFunction func
Bus function.
Definition: bus.h:94
std::shared_ptr< arrow::Field > field_
The Arrow field this port was derived from.
Definition: recordbatch.h:63
std::shared_ptr< FletcherSchema > fletcher_schema_
The Fletcher schema this port was derived from.
Definition: recordbatch.h:61
std::shared_ptr< Object > Copy() const override
Create a deep-copy of the FieldPort.
Definition: recordbatch.cc:245
bool profile_
Whether this field port should be profiled.
Definition: recordbatch.h:65
@ UNLOCK
Port that signals the kernel a command was completed.
Definition: recordbatch.h:57
@ ARROW
Port with Arrow data.
Definition: recordbatch.h:55
@ COMMAND
Port to issue commands to the generated interface.
Definition: recordbatch.h:56
enum fletchgen::FieldPort::Function function_
The function of this FieldPort.
A RecordBatch aggregating ArrayReaders/Writers.
Definition: recordbatch.h:150
std::shared_ptr< FletcherSchema > fletcher_schema_
Fletcher schema implemented by this RecordBatch(Reader/Writer)
Definition: recordbatch.h:178
std::vector< std::shared_ptr< FieldPort > > GetFieldPorts(const std::optional< FieldPort::Function > &function={}) const
Obtain all ports derived from an Arrow field with a specific function.
Definition: recordbatch.cc:158
RecordBatch(const std::string &name, const std::shared_ptr< FletcherSchema > &fletcher_schema, fletcher::RecordBatchDescription batch_desc)
RecordBatch constructor.
Definition: recordbatch.cc:35
std::vector< Instance * > array_instances_
A mapping from ArrayReader/Writer instances to their bus ports.
Definition: recordbatch.h:176
void AddArrays(const std::shared_ptr< FletcherSchema > &fletcher_schema)
Adds all ArrayReaders/Writers, un-concatenates ports and connects it to the top-level of this compone...
Definition: recordbatch.cc:53
Mode mode_
Whether to read or write from/to the in-memoRecordBatch.
Definition: recordbatch.h:180