35 CosEventComm::PushSupplier_ptr pushSupplier)
38 if(CORBA::is_nil(pushSupplier))
45 throw CosEventChannelAdmin::AlreadyConnected();
51 CosEventComm::PushSupplier::_duplicate(pushSupplier)
53 _connections.insert( Connections_t::value_type(oidstr,newConnection) );
57 CORBA::Request_var req =pushSupplier->_request(
"_is_a");
58 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
59 req->set_return_type(CORBA::_tc_boolean);
74 DB(5,
"ProxyPushConsumer_i::disconnect_push_consumer()")
80 CORBA::Request_var req =
81 pos->second->_target->_request(
"disconnect_push_supplier");
82 pos->second->_remove_ref();
93 log.
os<<
"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<
'\n';
97 DB(5,
"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
104 #ifdef OMNIEVENTS_REAL_TIME_PUSH
112 _queue.push_back(
new CORBA::Any(event));
117 PortableServer::POA_ptr p,
118 list<CORBA::Any*>& q,
121 :
Servant(PortableServer::POA::_nil()),
123 _channelName(p->the_name()),
124 _consumerAdmin(consumerAdmin),
126 _useLocalQueue(false)
130 using namespace PortableServer;
141 CORBA::PolicyList policies;
143 policies[0]=p->create_lifespan_policy(PERSISTENT);
144 policies[1]=p->create_id_assignment_policy(USER_ID);
145 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
146 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
147 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
148 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
149 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
154 string poaName =string(
_channelName.in())+
".ProxyPushConsumer";
155 POAManager_var parentManager =p->the_POAManager();
156 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
158 catch(POA::AdapterAlreadyExists&)
160 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
161 "POA::AdapterAlreadyExists")
163 catch(POA::InvalidPolicy& ex)
165 DB(0,
"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
166 "POA::InvalidPolicy: "<<ex.index)
170 for(CORBA::ULong i=0; i<policies.length(); ++i)
171 policies[i]->destroy();
174 _poa->set_servant(
this);
180 DB(20,
"~ProxyPushConsumer_i()")
185 i->second->_remove_ref();
193 CosEventChannelAdmin::ProxyPushConsumer_ptr
196 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
198 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
210 CORBA::Request_var req =
211 curr->second->_target->_request(
"disconnect_push_supplier");
212 curr->second->_remove_ref();
216 req->send_deferred();
225 for(map<string,PersistNode*>::const_iterator i=node.
_child.begin();
229 const char* oidstr =i->first.c_str();
230 string ior( i->second->attrString(
"IOR") );
231 bool isProxy( i->second->attrLong(
"proxy") );
235 using namespace CosEventComm;
236 using namespace CosEventChannelAdmin;
238 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
243 DB(5,
"Reincarnated ProxyPushConsumer: "<<oidstr)
248 DB(15,
"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
251 ProxyPushSupplier_var proxySupp =
252 string_to_<ProxyPushSupplier>(ior.c_str());
253 PortableServer::ObjectId_var objectId =
254 PortableServer::string_to_ObjectId(oidstr);
255 CORBA::Object_var obj =
256 _poa->create_reference_with_id(
258 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
260 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
261 proxySupp->connect_push_consumer(thisCons.in());
262 DB(7,
"Reconnected ProxyPushConsumer: "<<oidstr)
265 catch(CORBA::BAD_PARAM&) {
267 DB(5,
"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
269 catch(CosEventChannelAdmin::AlreadyConnected&){
271 DB(7,
"Remote ProxyPushSupplier already connected: "<<oidstr)
273 catch(CosEventChannelAdmin::TypeError&){
275 DB(2,
"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
277 catch(CORBA::OBJECT_NOT_EXIST&) {}
278 catch(CORBA::TRANSIENT& ) {}
279 catch(CORBA::COMM_FAILURE& ) {}
286 for(Connections_t::const_iterator i=
_connections.begin();
290 i->second->output(os);
300 using namespace PortableServer;
301 ObjectId_var oid =
Orb::inst()._POACurrent->get_object_id();
302 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
303 return string(oidStr.in());
305 catch(PortableServer::Current::NoContext&)
309 catch(CORBA::BAD_PARAM&)
316 throw CORBA::NO_IMPLEMENT();
325 #if OMNIEVENTS__DEBUG_SERVANT
326 int ProxyPushConsumer_i::Connection::_objectCount =0;
330 const char* channelName,
331 const string& oidstr,
332 CosEventComm::PushSupplier_ptr pushSupplier,
337 _target(pushSupplier),
338 _targetIsProxy(isProxy)
340 #if OMNIEVENTS__DEBUG_SERVANT
342 DB(21,
"ProxyPushConsumer_i::Connection::Connection() count="<<_objectCount)
348 #if OMNIEVENTS__DEBUG_SERVANT
350 DB(20,
"ProxyPushConsumer_i::Connection::~Connection() count="<<_objectCount)
352 DB(20,
"ProxyPushConsumer_i::Connection::~Connection()")
360 bool save =_targetIsProxy;
361 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
367 DB(15,
"ProxyPushConsumer is federated.");
372 DB(2,
"ProxyPushConsumer got unexpected callback.");
380 os<<
"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
382 if(!CORBA::is_nil(_target.in()))
384 CORBA::String_var iorstr;
386 os<<
" IOR="<<iorstr.in();