Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Remove index from collection message #1641

Draft
wants to merge 7 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions examples/hello_world/hello_world.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,22 @@
*/

#include <vt/transport.h>
#include <vt/vrt/collection/index/untyped.h>

struct HelloMsg : vt::Message {
HelloMsg(vt::NodeType in_from) : from(in_from) { }

vt::NodeType from = 0;
struct IndexMessage : vt::Message {
vt::VirtualProxyType proxy = vt::no_vrt_proxy;
vt::vrt::collection::index::UntypedIndex<48> u;
};

static void hello_world(HelloMsg* msg) {
vt::NodeType this_node = vt::theContext()->getNode();
fmt::print("{}: Hello from node {}\n", this_node, msg->from);
void testHandler(IndexMessage* msg) {
vt::vrt::collection::index::registry::getDispatch(msg->u.idx_)(&msg->u);
}

template <typename IndexT>
void sendIndex(IndexT idx) {
auto m = vt::makeMessage<IndexMessage>();
m->u = vt::vrt::collection::index::UntypedIndex<48>{idx};
vt::theMsg()->sendMsg<IndexMessage, testHandler>(1, m);
}

int main(int argc, char** argv) {
Expand All @@ -65,8 +71,8 @@ int main(int argc, char** argv) {
}

if (this_node == 0) {
auto msg = vt::makeMessage<HelloMsg>(this_node);
vt::theMsg()->broadcastMsg<HelloMsg, hello_world>(msg);
sendIndex(vt::Index3D{10, -20, 55});
sendIndex(vt::Index2D{293, 222});
}

vt::finalize();
Expand Down
33 changes: 16 additions & 17 deletions examples/hello_world/hello_world_collection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,19 @@

#include <vt/transport.h>

/// [Hello world collection]
struct Hello : vt::Collection<Hello, vt::Index1D> {
Hello() = default;

virtual ~Hello() {
vtAssert(counter_ == 1, "Must be equal");
}
struct TestMsg : vt::vrt::collection::IndexMessage {

using TestMsg = vt::CollectionMessage<Hello>;
};

/// [Hello world collection]
struct Hello : vt::Collection<Hello, vt::Index3D> {
void doWork(TestMsg* msg) {
fmt::print("Hello from {}\n", this->getIndex());
counter_++;
}
};

struct TestMsg2 : vt::CollectionMessage<Hello> {

private:
int32_t counter_ = 0;
};

int main(int argc, char** argv) {
Expand All @@ -73,13 +69,16 @@ int main(int argc, char** argv) {
num_elms = atoi(argv[1]);
}

fmt::print("IndexMessage={} CollectionMessage={} index size={}\n", sizeof(TestMsg), sizeof(TestMsg2), sizeof(vt::Index3D));

auto range = vt::Index3D(num_elms, 1, 1);
auto proxy = vt::makeCollection<Hello>()
.bounds(range)
.bulkInsert()
.wait();

if (this_node == 0) {
auto range = vt::Index1D(num_elms);
auto proxy = vt::makeCollectionRooted<Hello>()
.bounds(range)
.bulkInsert()
.wait();
proxy.broadcast<Hello::TestMsg,&Hello::doWork>();
proxy[vt::Index3D{num_elms-1, 0, 0}].send<TestMsg, &Hello::doWork>();
}

vt::finalize();
Expand Down
6 changes: 6 additions & 0 deletions src/vt/messaging/collection_chain_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,12 @@ class CollectionChainSet final {
theTerm()->finishedEpoch(epoch);
}

void nextStepCollective(
std::string const& label, EpochType epoch
) {
chain.add(epoch, PendingSend{});
}

/**
* \brief The next collective step to execute for each index that is added
* to the CollectionChainSet on each node.
Expand Down
21 changes: 21 additions & 0 deletions src/vt/runnable/make_runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,27 @@ struct RunnableMaker {
return std::move(*this);
}


/**
* \brief Add a collection; sets up the handler and collection contexts
*
* \param[in] elm the collection element pointer
*/
RunnableMaker&& withCollection(void* elm, std::unique_ptr<ctx::Base> ptr) {
impl_->template addExistingContext(std::move(ptr));
set_handler_ = true;

if (handler_ != uninitialized_handler) {
// Be careful with type casting here..convert to typeless before
// reinterpreting the pointer so the compiler does not produce the wrong
// offset
auto col = reinterpret_cast<vrt::collection::UntypedCollection*>(elm);
impl_->setupHandlerElement(col, handler_);
}

return std::move(*this);
}

/**
* \brief Add LB stats for instrumentation
*
Expand Down
11 changes: 11 additions & 0 deletions src/vt/runnable/runnable.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ struct RunnableNew {
contexts_.emplace_back(std::make_unique<T>(std::forward<Args>(args)...));
}

/**
* \brief Add a new context for this handler
*
* \param[in] args arguments to build the context, forwarded to constructor of
* \c T
*/
template <typename T>
void addExistingContext(std::unique_ptr<T> ptr) {
contexts_.emplace_back(std::move(ptr));
}

/**
* \brief Set up a handler to run on an collection object
*
Expand Down
37 changes: 4 additions & 33 deletions src/vt/topos/location/location.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,10 @@ struct EntityLocationCoord : LocationCoord {
*
* \param[in] id the entity ID
* \param[in] home_node home node for entity
* \param[in] m pointer to the message
*/
template <typename MessageT, ActiveTypedFnType<MessageT> *f>
void routeMsgHandler(
EntityID const& id, NodeType const& home_node, MessageT *m
);

/**
* \brief Route a serialized message with a custom handler
*
* \param[in] id the entity ID
* \param[in] home_node home node for entity
* \param[in] msg pointer to the message
*/
template <typename MessageT, ActiveTypedFnType<MessageT> *f>
void routeMsgSerializeHandler(
void routeMsgHandler(
EntityID const& id, NodeType const& home_node, MsgSharedPtr<MessageT> msg
);

Expand All @@ -254,28 +242,14 @@ struct EntityLocationCoord : LocationCoord {
* \param[in] id the entity ID
* \param[in] home_node home node for the entity
* \param[in] msg pointer to the message
* \param[in] serialize_msg whether it should be serialized (optional)
* \param[in] from_node the sending node (optional)
*/
template <typename MessageT>
void routeMsg(
EntityID const& id, NodeType const& home_node, MsgSharedPtr<MessageT> msg,
bool const serialize_msg = false,
NodeType from_node = uninitialized_destination
);

/**
* \brief Route a message to the default handler
*
* \param[in] id the entity ID
* \param[in] home_node home node for the entity
* \param[in] msg pointer to the message
*/
template <typename MessageT>
void routeMsgSerialize(
EntityID const& id, NodeType const& home_node, MsgSharedPtr<MessageT> msg
);

/**
* \internal \brief Route a message with non-eager protocol
*
Expand Down Expand Up @@ -392,30 +366,27 @@ struct EntityLocationCoord : LocationCoord {
/**
* \internal \brief Route a message to destination with eager protocol
*
* \param[in] is_serialized whether it is serialized
* \param[in] id the entity ID
* \param[in] home_node the home node
* \param[in] msg the message to route
*/
template <typename MessageT>
void routeMsgEager(
bool const is_serialized, EntityID const& id, NodeType const& home_node,
MsgSharedPtr<MessageT> msg
EntityID const& id, NodeType const& home_node, MsgSharedPtr<MessageT> msg
);

/**
* \internal \brief Route a message to destination with rendezvous protocol
*
* \param[in] is_serialized whether it is serialized
* \param[in] id the entity ID
* \param[in] home_node the home node
* \param[in] to_node destination node
* \param[in] msg the message to route
*/
template <typename MessageT>
void routeMsgNode(
bool const is_serialized, EntityID const& id, NodeType const& home_node,
NodeType const& to_node, MsgSharedPtr<MessageT> msg
EntityID const& id, NodeType const& home_node, NodeType const& to_node,
MsgSharedPtr<MessageT> msg
);

/**
Expand Down
Loading