15 #include "fletchgen/recordbatch.h"
17 #include <cerata/api.h>
22 #include "fletchgen/array.h"
23 #include "fletchgen/bus.h"
24 #include "fletchgen/schema.h"
28 using cerata::Instance;
29 using cerata::Component;
31 using cerata::Literal;
36 const std::shared_ptr<FletcherSchema> &fletcher_schema,
37 fletcher::RecordBatchDescription batch_desc)
39 fletcher_schema_(fletcher_schema),
40 mode_(fletcher_schema->mode()),
41 batch_desc_(std::move(batch_desc)) {
46 Add(port(
"bcd",
cr(), Port::Dir::IN,
bus_cd()));
55 cerata::NodeMap rebinding;
58 auto iw = index_width();
59 auto tw = tag_width();
63 for (
const auto &field : fletcher_schema->arrow_schema()->fields()) {
65 auto prefix = fletcher_schema->name() +
"_" + field->name();
68 if (fletcher::GetBoolMeta(*field, fletcher::meta::IGNORE,
false)) {
69 FLETCHER_LOG(DEBUG,
"Ignoring field " + field->name());
71 FLETCHER_LOG(DEBUG,
"Instantiating Array" << (
mode_ == Mode::READ ?
"Reader" :
"Writer")
72 <<
" for schema: " << fletcher_schema->name()
73 <<
" : " << field->name());
75 if (
mode_ == Mode::WRITE) {
76 FLETCHER_LOG(WARNING,
"ArrayWriter implementation is highly experimental. Use with caution! Features that are "
77 "not implemented include:\n"
78 " - dvalid bit is ignored (so you cannot supply handshakes on the values stream for "
79 "empty lists or use empty handshakes to close streams)\n"
80 " - lists of primitives (e.g. strings) values stream last signal must signal the last "
81 "value for all lists, not single lists in the Arrow Array).\n"
82 " - clock domain crossings.");
88 auto kernel_arrow_type = kernel_arrow_port->type();
89 Add(kernel_arrow_port);
92 auto a = Instantiate(
array(
mode_), field->name() +
"_inst");
99 Connect(a->prt(
"kcd"), prt(
"kcd"));
100 Connect(a->prt(
"bcd"), prt(
"bcd"));
103 a->par(
"CMD_TAG_WIDTH") <<= tw;
104 a->par(index_width()) <<= iw;
107 ConnectBusPorts(a, prefix, &rebinding);
110 if (
mode_ == Mode::READ) {
111 auto a_data_port = a->prt(
"out");
115 a_data_port->SetType(a_data_type);
118 kernel_arrow_type->AddMapper(mapper);
120 kernel_arrow_port <<= a_data_port;
122 auto a_data_port = a->prt(
"in");
125 auto a_data_type =
array_writer_in(a_data_spec.first, a_data_spec.second);
126 a_data_port->SetType(a_data_type);
129 kernel_arrow_type->AddMapper(mapper);
131 a_data_port <<= kernel_arrow_port;
135 auto a_cmd = a->Get<Port>(
"cmd");
141 auto aw = Get<Parameter>(prefix +
"_" + bus_addr_width()->name())->shared_from_this();
146 auto a_unl = a->Get<Port>(
"unl");
147 auto ut =
unlock_type(a->par(
"CMD_TAG_WIDTH")->shared_from_this());
157 std::vector<std::shared_ptr<FieldPort>>
159 std::vector<std::shared_ptr<FieldPort>> result;
160 for (
const auto &n : objects_) {
161 auto ap = std::dynamic_pointer_cast<FieldPort>(n);
163 if ((
function && (ap->function_ == *
function)) || !
function) {
164 result.push_back(ap);
171 void RecordBatch::ConnectBusPorts(Instance *
array,
const std::string &prefix, cerata::NodeMap *rebinding) {
173 for (
const auto &a_bus_port : a_bus_ports) {
174 auto rb_port_prefix = prefix +
"_bus";
175 auto a_bus_spec = a_bus_port->
spec_;
177 auto rb_bus_params =
BusDimParams(
this, a_bus_spec.dim.plain, prefix);
180 auto rb_bus_port =
bus_port(rb_port_prefix, a_bus_port->dir(), rb_bus_spec);
184 Connect(rb_bus_port,
array->prt(
"bus"));
191 const std::shared_ptr<FletcherSchema> &fletcher_schema,
192 const fletcher::RecordBatchDescription &batch_desc) {
193 auto rb =
new RecordBatch(name, fletcher_schema, batch_desc);
194 auto shared_rb = std::shared_ptr<RecordBatch>(rb);
195 cerata::default_component_pool()->Add(shared_rb);
199 std::shared_ptr<FieldPort>
arrow_port(
const std::shared_ptr<FletcherSchema> &fletcher_schema,
200 const std::shared_ptr<arrow::Field> &field,
202 const std::shared_ptr<ClockDomain> &domain) {
203 auto name = fletcher_schema->name() +
"_" + field->name();
207 dir = Term::Reverse(
mode2dir(fletcher_schema->mode()));
209 dir =
mode2dir(fletcher_schema->mode());
213 bool profile = fletcher::GetBoolMeta(*field, fletcher::meta::PROFILE,
false);
215 return std::make_shared<FieldPort>(name,
FieldPort::ARROW, field, fletcher_schema, type, dir, domain, profile);
218 std::shared_ptr<FieldPort>
command_port(
const std::shared_ptr<FletcherSchema> &schema,
219 const std::shared_ptr<arrow::Field> &field,
220 const std::shared_ptr<Node> &index_width,
221 const std::shared_ptr<Node> &tag_width,
222 std::optional<std::shared_ptr<Node>> addr_width,
223 const std::shared_ptr<ClockDomain> &domain) {
224 std::shared_ptr<cerata::Type> type;
228 type =
cmd_type(index_width, tag_width);
230 auto name = schema->name() +
"_" + field->name() +
"_cmd";
232 return std::make_shared<FieldPort>(name,
FieldPort::COMMAND, field, schema, type, Port::Dir::IN, domain,
false);
235 std::shared_ptr<FieldPort>
unlock_port(
const std::shared_ptr<FletcherSchema> &schema,
236 const std::shared_ptr<arrow::Field> &field,
237 const std::shared_ptr<Node> &tag_width,
238 const std::shared_ptr<ClockDomain> &domain) {
240 auto name = schema->name() +
"_" + field->name() +
"_unl";
242 return std::make_shared<FieldPort>(name,
FieldPort::UNLOCK, field, schema, type, Port::Dir::OUT, domain,
false);
247 auto typ = type()->shared_from_this();
Contains all classes and functions related to Fletchgen.
std::shared_ptr< Type > cmd_type(const std::shared_ptr< Node > &index_width, const std::shared_ptr< Node > &tag_width, const std::optional< std::shared_ptr< Node >> &ctrl_width)
Return a Fletcher command stream type.
std::shared_ptr< ClockDomain > kernel_cd()
Fletcher accelerator clock domain.
std::shared_ptr< TypeMapper > GetStreamTypeMapper(Type *stream_type, Type *other)
Get a type mapper for an Arrow::Field-based stream to an ArrayReader/Writer stream.
std::string GenerateConfigString(const arrow::Field &field, int level)
Return the configuration string for an ArrayReader/Writer.
std::pair< uint32_t, uint32_t > GetArrayDataSpec(const arrow::Field &arrow_field)
Get the ArrayR/W number of streams and data width from an Arrow Field.
std::shared_ptr< Type > cr()
Fletcher clock/reset.
std::shared_ptr< Type > unlock_type(const std::shared_ptr< Node > &tag_width)
Fletcher unlock stream.
std::shared_ptr< Type > GetStreamType(const arrow::Field &arrow_field, fletcher::Mode mode, int level)
Convert an Arrow::Field into a stream type.
void ConnectBusParam(cerata::Graph *dst, const std::string &prefix, const BusDimParams &src, cerata::NodeMap *rebinding)
Find and connect all prefixed bus params on a graph to the supplied source params,...
std::shared_ptr< Type > array_writer_in(uint32_t num_streams, uint32_t full_width)
Fletcher write data.
size_t GetCtrlBufferCount(const arrow::Field &field)
Return the number of buffers for the control field.
Component * array(Mode mode)
Return a Cerata component model of an Array(Reader/Writer).
std::shared_ptr< FieldPort > arrow_port(const std::shared_ptr< FletcherSchema > &fletcher_schema, const std::shared_ptr< arrow::Field > &field, bool reverse, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived port for Arrow data.
std::shared_ptr< FieldPort > unlock_port(const std::shared_ptr< FletcherSchema > &schema, const std::shared_ptr< arrow::Field > &field, const std::shared_ptr< Node > &tag_width, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived unlock port.
std::shared_ptr< RecordBatch > record_batch(const std::string &name, const std::shared_ptr< FletcherSchema > &fletcher_schema, const fletcher::RecordBatchDescription &batch_desc)
Make a new RecordBatch(Reader/Writer) component, based on a Fletcher schema.
std::shared_ptr< ClockDomain > bus_cd()
Fletcher bus clock domain.
std::shared_ptr< BusPort > bus_port(const std::string &name, Port::Dir dir, const BusSpecParams &params)
Make a new port and return a shared pointer to it.
std::shared_ptr< FieldPort > command_port(const std::shared_ptr< FletcherSchema > &schema, const std::shared_ptr< arrow::Field > &field, const std::shared_ptr< Node > &index_width, const std::shared_ptr< Node > &tag_width, std::optional< std::shared_ptr< Node >> addr_width, const std::shared_ptr< ClockDomain > &domain)
Construct a field-derived command port.
cerata::Port::Dir mode2dir(fletcher::Mode mode)
Return a Cerata port direction from a Fletcher access mode.
std::shared_ptr< Type > array_reader_out(uint32_t num_streams, uint32_t full_width)
Fletcher read data.
Holds bus parameters based on bus dimensions, that has actual nodes representing the dimensions.
A port derived from bus parameters.
BusSpecParams spec_
The bus spec to which the type generics of the bus port are bound.
Holds bus parameters and function based on bus dimensions, that has actual nodes representing the dim...
BusFunction func
Bus function.
std::shared_ptr< arrow::Field > field_
The Arrow field this port was derived from.
std::shared_ptr< FletcherSchema > fletcher_schema_
The Fletcher schema this port was derived from.
std::shared_ptr< Object > Copy() const override
Create a deep-copy of the FieldPort.
bool profile_
Whether this field port should be profiled.
@ UNLOCK
Port that signals the kernel a command was completed.
@ ARROW
Port with Arrow data.
@ COMMAND
Port to issue commands to the generated interface.
enum fletchgen::FieldPort::Function function_
The function of this FieldPort.
A RecordBatch aggregating ArrayReaders/Writers.
std::shared_ptr< FletcherSchema > fletcher_schema_
Fletcher schema implemented by this RecordBatch(Reader/Writer)
std::vector< std::shared_ptr< FieldPort > > GetFieldPorts(const std::optional< FieldPort::Function > &function={}) const
Obtain all ports derived from an Arrow field with a specific function.
RecordBatch(const std::string &name, const std::shared_ptr< FletcherSchema > &fletcher_schema, fletcher::RecordBatchDescription batch_desc)
RecordBatch constructor.
std::vector< Instance * > array_instances_
The ArrayReader/Writer instances held by this RecordBatch (stored as a plain list of instance pointers).
void AddArrays(const std::shared_ptr< FletcherSchema > &fletcher_schema)
Adds all ArrayReaders/Writers, un-concatenates ports and connects it to the top-level of this compone...
Mode mode_
Whether to read or write from/to the in-memory RecordBatch.