Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: give control on which CORBA connections are served by which dispatchers #15

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 124 additions & 6 deletions rtt/transports/corba/CorbaDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,130 @@


#include "CorbaDispatcher.hpp"
#include <boost/lexical_cast.hpp>

namespace RTT {
using namespace corba;
CorbaDispatcher::DispatchMap CorbaDispatcher::DispatchI;
os::Mutex CorbaDispatcher::mlock;
using namespace RTT;
using namespace RTT::corba;

int CorbaDispatcher::defaultScheduler = ORO_SCHED_RT;
int CorbaDispatcher::defaultPriority = os::LowestPriority;
CorbaDispatcher::DispatchMap CorbaDispatcher::DispatchI;
os::Mutex CorbaDispatcher::mlock;

int CorbaDispatcher::defaultScheduler = ORO_SCHED_RT;
int CorbaDispatcher::defaultPriority = os::LowestPriority;

CorbaDispatcher::DispatchEntry& CorbaDispatcher::Get(std::string const& name, int scheduler, int priority)
{
DispatchMap::iterator result = DispatchI.find(name);
if ( result != DispatchI.end() )
return result->second;

CorbaDispatcher* dispatcher = new CorbaDispatcher( name, scheduler, priority );
dispatcher->start();
return (DispatchI[name] = DispatchEntry(dispatcher));
}

CorbaDispatcher* CorbaDispatcher::Instance(DataFlowInterface* iface, int scheduler, int priority) {
return Instance(defaultDispatcherName(iface), scheduler, priority);
}

std::string CorbaDispatcher::defaultDispatcherName(DataFlowInterface* iface)
{
std::string name;
if ( iface == 0 || iface->getOwner() == 0)
name = "Global";
else
name = iface->getOwner()->getName();
return name + ".CorbaDispatch." + boost::lexical_cast<std::string>(reinterpret_cast<uint64_t>(iface));
}

/**
* Create a new dispatcher and registers it under a certain name
*
* @param name the dispatcher registration name
* @return
*/
CorbaDispatcher* CorbaDispatcher::Instance(std::string const& name, int scheduler, int priority) {
os::MutexLock lock(mlock);

return Get(name, scheduler, priority).dispatcher;
}


CorbaDispatcher* CorbaDispatcher::Acquire(RTT::DataFlowInterface* interface, int scheduler, int priority) {
return Acquire(defaultDispatcherName(interface), scheduler, priority);
}

CorbaDispatcher* CorbaDispatcher::Acquire(std::string const& name, int scheduler, int priority) {
os::MutexLock lock(mlock);

DispatchEntry& entry = Get(name, scheduler, priority);
entry.refcount.inc();
return entry.dispatcher;
}

void CorbaDispatcher::Release(CorbaDispatcher* dispatcher) {
return Release(dispatcher->getName());
}

void CorbaDispatcher::Release(std::string const& name) {
CorbaDispatcher* dispatcher = nullptr;

{
os::MutexLock lock(mlock);

DispatchMap::iterator result = DispatchI.find(name);
if (result == DispatchI.end()) {
return;
}

if (!result->second.refcount.dec_and_test()) {
return;
}

dispatcher = result->second.dispatcher;
DispatchI.erase(result);
}

delete dispatcher;
}

static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result)
{
result = result || (c0 == c1);
}

void CorbaDispatcher::dispatchChannel( base::ChannelElementBase::shared_ptr chan ) {
bool has_element = false;
RClist.apply(boost::bind(&hasElement, _1, chan, boost::ref(has_element)));
if (!has_element) {
while (!RClist.append( chan )) {
RClist.grow(20);
}
}
this->trigger();
}

void CorbaDispatcher::cancelChannel( base::ChannelElementBase::shared_ptr chan ) {
RClist.erase( chan );
}

bool CorbaDispatcher::initialize() {
log(Info) <<"Started " << this->getName() << "." <<endlog();
do_exit = false;
return true;
}

void CorbaDispatcher::loop() {
while ( !RClist.empty() && !do_exit) {
base::ChannelElementBase::shared_ptr chan = RClist.front();
CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get());
if (rbase)
rbase->transferSamples();
RClist.erase( chan );
}
}

bool CorbaDispatcher::breakLoop() {
do_exit = true;
return true;
}
119 changes: 30 additions & 89 deletions rtt/transports/corba/CorbaDispatcher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,16 @@ namespace RTT {
*/
class CorbaDispatcher : public Activity
{
typedef std::map<DataFlowInterface*,CorbaDispatcher*> DispatchMap;
struct DispatchEntry {
os::AtomicInt refcount;
CorbaDispatcher* dispatcher;

DispatchEntry()
: dispatcher(0) {}
explicit DispatchEntry(CorbaDispatcher* dispatcher)
: dispatcher(dispatcher) {}
};
typedef std::map<std::string, DispatchEntry> DispatchMap;
RTT_CORBA_API static DispatchMap DispatchI;

typedef internal::List<base::ChannelElementBase::shared_ptr> RCList;
Expand All @@ -67,9 +76,6 @@ namespace RTT {
/* Protects DispatchI */
RTT_CORBA_API static os::Mutex mlock;

RTT_CORBA_API static int defaultScheduler;
RTT_CORBA_API static int defaultPriority;

CorbaDispatcher( const std::string& name)
: Activity(defaultScheduler, defaultPriority, 0.0, 0, name),
RClist(20,2),
Expand All @@ -86,99 +92,34 @@ namespace RTT {
this->stop();
}

public:
/**
* Create a new dispatcher for a given data flow interface.
* This method will only lock and allocate when a new dispatcher must be created,
* otherwise, the access is lock-free and real-time.
* One dispatcher per \a iface is created.
* @param iface The interface to dispatch data flow messages for.
* @return
*/
static CorbaDispatcher* Instance(DataFlowInterface* iface, int scheduler = defaultScheduler, int priority = defaultPriority) {
os::MutexLock lock(mlock);
DispatchMap::iterator result = DispatchI.find(iface);
if ( result == DispatchI.end() ) {
std::string name;
if ( iface == 0 || iface->getOwner() == 0)
name = "Global";
else
name = iface->getOwner()->getName();
name += "Corba";
DispatchI[iface] = new CorbaDispatcher( name, scheduler, priority );
DispatchI[iface]->start();
return DispatchI[iface];
}
return result->second;
}

/**
* Releases and cleans up a specific interface from dispatching.
* @param iface
/** Internal access and auto-creation of dispatch entries
*
* It is a helper method, and does NOT acquire the locking mutex.
* Callers MUST acquire it before calling
*/
static void Release(DataFlowInterface* iface) {
os::MutexLock lock(mlock);
DispatchMap::iterator result = DispatchI.find(iface);
if ( result != DispatchI.end() ) {
delete result->second;
DispatchI.erase(result);
}
}
static DispatchEntry& Get(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);

/**
* May be called during program termination to clean up all resources.
*/
static void ReleaseAll() {
os::MutexLock lock(mlock);

DispatchMap::iterator result = DispatchI.begin();
while ( result != DispatchI.end() ) {
delete result->second;
DispatchI.erase(result);
result = DispatchI.begin();
}
}
public:
RTT_CORBA_API static int defaultScheduler;
RTT_CORBA_API static int defaultPriority;

static void hasElement(base::ChannelElementBase::shared_ptr c0, base::ChannelElementBase::shared_ptr c1, bool& result)
{
result = result || (c0 == c1);
}
static std::string defaultDispatcherName(DataFlowInterface* iface);

void dispatchChannel( base::ChannelElementBase::shared_ptr chan ) {
bool has_element = false;
RClist.apply(boost::bind(&CorbaDispatcher::hasElement, _1, chan, boost::ref(has_element)));
if (!has_element) {
while (!RClist.append( chan )) {
RClist.grow(20);
}
}
this->trigger();
}
static CorbaDispatcher* Instance(DataFlowInterface* iface, int scheduler = defaultScheduler, int priority = defaultPriority);
static CorbaDispatcher* Instance(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);
static CorbaDispatcher* Acquire(DataFlowInterface* interface, int scheduler = defaultScheduler, int priority = defaultPriority);
static CorbaDispatcher* Acquire(std::string const& name, int scheduler = defaultScheduler, int priority = defaultPriority);

void cancelChannel( base::ChannelElementBase::shared_ptr chan ) {
RClist.erase( chan );
}
static void Release(std::string const& name);
static void Release(CorbaDispatcher* dispatcher);

bool initialize() {
log(Info) <<"Started " << this->getName() << "." <<endlog();
do_exit = false;
return true;
}
void dispatchChannel( base::ChannelElementBase::shared_ptr chan );
void cancelChannel( base::ChannelElementBase::shared_ptr chan );

void loop() {
while ( !RClist.empty() && !do_exit) {
base::ChannelElementBase::shared_ptr chan = RClist.front();
CRemoteChannelElement_i* rbase = dynamic_cast<CRemoteChannelElement_i*>(chan.get());
if (rbase)
rbase->transferSamples();
RClist.erase( chan );
}
}
bool initialize();

bool breakLoop() {
do_exit = true;
return true;
}
void loop();
bool breakLoop();
};
}
}
Expand Down
8 changes: 7 additions & 1 deletion rtt/transports/corba/CorbaLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,13 @@ namespace RTT {

virtual base::ChannelElementBase* buildDataStorage(ConnPolicy const& policy) const { return 0; }

virtual CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface*, ::PortableServer::POA* poa, bool, bool) const {
virtual CRemoteChannelElement_i* createOutputChannelElement_i(std::string const&, ::PortableServer::POA* poa, bool, bool) const {
Logger::In in("CorbaFallBackProtocol");
log(Error) << "Could create Channel : data type not known to CORBA Transport." <<Logger::endl;
return 0;

}
virtual CRemoteChannelElement_i* createInputChannelElement_i(::PortableServer::POA* poa, bool) const {
Logger::In in("CorbaFallBackProtocol");
log(Error) << "Could create Channel : data type not known to CORBA Transport." <<Logger::endl;
return 0;
Expand Down
7 changes: 5 additions & 2 deletions rtt/transports/corba/CorbaTemplateProtocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ namespace RTT
*/
typedef typename Property<T>::DataSourceType PropertyType;

CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface* sender,PortableServer::POA_ptr poa, bool is_pull, bool is_signalling) const
{ return new RemoteChannelElement<T>(*this, sender, poa, is_pull, is_signalling); }
CRemoteChannelElement_i* createOutputChannelElement_i(std::string const& dispatcherName,PortableServer::POA_ptr poa, bool is_pull, bool is_signalling) const
{ return new RemoteChannelElement<T>(dispatcherName, *this, poa, is_pull, is_signalling); }

CRemoteChannelElement_i* createInputChannelElement_i(PortableServer::POA_ptr poa, bool is_pull) const
{ return new RemoteChannelElement<T>(*this, poa, is_pull); }

/**
* Create an transportable object for a \a protocol which contains the value of \a source.
Expand Down
10 changes: 9 additions & 1 deletion rtt/transports/corba/CorbaTypeTransporter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,15 @@ namespace RTT {
* @param poa The POA to manage the server code.
* @return the created CChannelElement_i.
*/
virtual CRemoteChannelElement_i* createChannelElement_i(DataFlowInterface* sender, ::PortableServer::POA* poa, bool is_pull, bool is_signalling) const = 0;
virtual CRemoteChannelElement_i* createOutputChannelElement_i(std::string const& dispatcherName, ::PortableServer::POA* poa, bool is_pull, bool is_signalling) const = 0;

/**
* Builds a channel element for remote transport in both directions.
* @param sender The data flow interface which will be sending or receiving this channel.
* @param poa The POA to manage the server code.
* @return the created CChannelElement_i.
*/
virtual CRemoteChannelElement_i* createInputChannelElement_i(::PortableServer::POA* poa, bool is_pull) const = 0;

/**
* The CORBA transport does not support creating 'CORBA' streams.
Expand Down
Loading