OmniEvents
ProxyPushSupplier.cc
Go to the documentation of this file.
1 // Package : omniEvents
2 // ProxyPushSupplier.cc Created : 2003/12/04
3 // Author : Alex Tingle
4 //
5 // Copyright (C) 2003,2005 Alex Tingle.
6 //
7 // This file is part of the omniEvents application.
8 //
9 // omniEvents is free software; you can redistribute it and/or
10 // modify it under the terms of the GNU Lesser General Public
11 // License as published by the Free Software Foundation; either
12 // version 2.1 of the License, or (at your option) any later version.
13 //
14 // omniEvents is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 // Lesser General Public License for more details.
18 //
19 // You should have received a copy of the GNU Lesser General Public
20 // License along with this library; if not, write to the Free Software
21 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
22 //
23 
24 #include "ProxyPushSupplier.h"
25 #include "Orb.h"
26 #include "omniEventsLog.h"
27 #include "PersistNode.h"
28 #include <assert.h>
29 
30 namespace OmniEvents {
31 
36  omni_mutex& mutex;
37 public:
38  omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
39  ~omni_mutex_kcol(void) { mutex.lock(); }
40 private:
41  // dummy copy constructor and operator= to prevent copying
44 };
45 
46 
47 //
48 // ProxyPushSupplierManager
49 //
50 
51 PortableServer::Servant
53  const PortableServer::ObjectId& oid,
54  PortableServer::POA_ptr poa
55 )
56 {
58  PauseThenWake p(this);
59  _servants.insert(result);
60  return result;
61 }
62 
63 void
65  const PortableServer::ObjectId& oid,
66  PortableServer::POA_ptr adapter,
67  PortableServer::Servant serv,
68  CORBA::Boolean cleanup_in_progress,
69  CORBA::Boolean remaining_activations
70 )
71 {
72  // This etherealize method needs a special implementation because
73  // ProxyPushSupplier_i objects are freed with _remove_ref() rather than
74  // delete.
75  // Otherwise, this method strongly resembles ProxyManager::etherealize().
76  omni_mutex_lock pause(_lock);
77  ProxyPushSupplier_i* narrowed =dynamic_cast<ProxyPushSupplier_i*>(serv);
78  assert(narrowed!=NULL);
79  set<Proxy*>::iterator pos =_servants.find(narrowed);
80  if(pos!=_servants.end())
81  {
82  _servants.erase(pos);
83  narrowed->_remove_ref();
84  }
85  else
86  {
87  DB(1,"\t\teh? - POA attempted to etherealize unknown servant.");
88  }
89 }
90 
92  PortableServer::POA_ptr parentPoa,
93  EventQueue& q
94 )
95 : ProxyManager(parentPoa),
96  omni_thread(NULL,PRIORITY_HIGH),
97  _queue(q),
98  _lock(),_condition(&_lock),
99  _refCount(1)
100 {
101  ProxyManager::activate("ProxyPushSupplier");
102  start_undetached();
103 }
104 
106 {
107  DB(20,"~ProxyPushSupplierManager()")
108 }
109 
110 CosEventChannelAdmin::ProxyPushSupplier_ptr
112 {
113  return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
114  _managedPoa.in(),
115  CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
116  );
117 }
118 
120 {
121  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
122  {
123  Proxy* p =*i; // Sun's CC requires this temporary.
124  ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
125  // We are in the EventChannel's thread.
126  // Make sure all calls go though the ProxyPushSupplier POA.
127  CosEventChannelAdmin::ProxyPushSupplier_var ppsv =pps->_this();
128  ppsv->disconnect_push_supplier();
129  }
130 }
131 
132 void*
134 {
135  // This loop repeatedly triggers all of the servants in turn. As long as
136  // something happens each time, then we loop as fast as we can.
137  // As soon as activity dries up, we start to wait longer and longer between
138  // loops (up to a maximum). When there is no work to do, just block until
139  // a new event arrives.
140  //
141  // Rationale: The faster we loop the more events we can deliver to each
142  // consumer per second. However, when nothing is happening, this busy loop
143  // just soaks up CPU and kills performance. The optimum sleep time varies
144  // wildly from platform to platform, and also depends upon the typical ping
145  // time to the consumers.
146  //
147  // This dynamic approach should deliver reasonable performance when things
148  // are hectic, but not soak up too much CPU when not much is happening.
149  //
150  const unsigned long sleepTimeNanosec0 =0x8000; // 33us (doubled before use)
151  const unsigned long maxSleepNanosec =0x800000; // 8.4ms
152  unsigned long sleepTimeNanosec =sleepTimeNanosec0;
153 
154  omni_mutex_lock conditionLock(_lock);
155  while(true)
156  {
157  try {
158  if(_refCount<1)
159  break;
160 
161  bool busy=false;
162  bool waiting=false;
163 
164  // Trigger each servant in turn.
165  for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
166  {
167  Proxy* p =*i; // Sun's CC requires this temporary.
168  ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
169  pps->trigger(busy,waiting);
170  }
171 
172  if(busy)
173  {
174  // Something happened last time round. So we'll be optimistic and
175  // immediately go round for another go. Briefly unlock the mutex first,
176  // just to let the other kids get in if they need to.
177  omni_mutex_kcol l(_lock); // 'lock' reversed!
178  // Reset the sleep time.
179  sleepTimeNanosec=sleepTimeNanosec0;
180  }
181  else if(waiting)
182  {
183  // Nothing happened, so we'll wait for a bit and then give it another
184  // go. Each time we wait for twice as long, up to the maximum.
185  if(sleepTimeNanosec<maxSleepNanosec)
186  sleepTimeNanosec<<=1; // (multiply by 2)
187  unsigned long sec,nsec;
188  omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
189  _condition.timedwait(sec,nsec);
190  }
191  else
192  {
193  // There is nothing to do, so block until a new event arrives.
194  _condition.wait();
195  }
196 
197  }
198  catch (CORBA::SystemException& ex) {
199  DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
200  IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
201  }
202  catch (CORBA::Exception& ex) {
203  DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
204  IF_OMNIORB4(": "<<ex._name()<<) ".")
205  }
206  catch(...) {
207  DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
208  break;
209  }
210  }
211  return NULL;
212 }
213 
215 {
216 #if OMNIEVENTS__DEBUG_REF_COUNTS
217  DB(20,"ProxyPushSupplierManager::_add_ref()")
218 #endif
219  omni_mutex_lock pause(_lock);
220  ++_refCount;
221 }
222 
224 {
225 #if OMNIEVENTS__DEBUG_REF_COUNTS
226  DB(20,"ProxyPushSupplierManager::_remove_ref()")
227 #endif
228  int myref;
229  {
230  PauseThenWake p(this);
231  myref = --_refCount;
232  }
233  if(myref<0)
234  {
235  DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
236  }
237  else if(myref==0)
238  {
239  DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
240  join(NULL);
241  }
242 }
243 
244 
245 //
246 // ProxyPushSupplier_i
247 //
248 
250  CosEventComm::PushConsumer_ptr pushConsumer)
251 {
252  if(CORBA::is_nil(pushConsumer))
253  throw CORBA::BAD_PARAM();
254  if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
255  throw CosEventChannelAdmin::AlreadyConnected();
256  _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
257 
258  // Test to see whether pushSupplier is a ProxyPushSupplier.
259  // If so, then we will aggressively try to reconnect, when we are reincarnated
260  CORBA::Request_var req =_target->_request("_is_a");
261  req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
262  req->set_return_type(CORBA::_tc_boolean);
263  req->send_deferred();
264  Orb::inst().deferredRequest(req._retn(),this); // Register for callback
265 
267  {
268  WriteLock log;
269  output(log.os);
270  }
271 }
272 
273 
275 {
276  DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
277  eraseKey("ConsumerAdmin/ProxyPushSupplier");
279  if(CORBA::is_nil(_target))
280  {
281  throw CORBA::OBJECT_NOT_EXIST(
282  IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
283  CORBA::COMPLETED_NO
284  );
285  }
286  else
287  {
288  CORBA::Request_var req=_target->_request("disconnect_push_consumer");
289  _target=CosEventComm::PushConsumer::_nil();
290  req->send_deferred();
291  Orb::inst().deferredRequest(req._retn());
292  }
293 }
294 
295 
297  PortableServer::POA_ptr poa,
298  EventQueue& q
299 )
300 : Proxy(poa),
301  EventQueue::Reader(q),
302  _target(CosEventComm::PushConsumer::_nil()),
303  _targetIsProxy(false)
304 {
305  // pass
306 }
307 
309 {
310  DB(20,"~ProxyPushSupplier_i()")
311 }
312 
314 
315 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
316 {
317  if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
318  {
319  CORBA::Environment_ptr env=_req->env(); // No need to free environment.
320  if(!CORBA::is_nil(env) && env->exception())
321  {
322  // Shut down the connection
323  CORBA::Exception* ex =env->exception(); // No need to free exception.
324  DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
325  Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
326  _req=CORBA::Request::_nil();
327 
328  // Try to notify the Consumer that the connection is closing.
329  CORBA::Request_var req=_target->_request("disconnect_push_consumer");
330  req->send_deferred();
331  Orb::inst().deferredRequest(req._retn());
332 
333  _target=CosEventComm::PushConsumer::_nil(); // disconnected.
334  eraseKey("ConsumerAdmin/ProxyPushSupplier");
335  deactivateObject();
336  return; // No more work to do
337  }
338  _req=CORBA::Request::_nil();
339  busy=true;
340  }
341  if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
342  {
343  _req=_target->_request("push");
344  _req->add_in_arg() <<= *(nextEvent());
345  _req->send_deferred();
346  busy=true;
347  }
348  if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
349  waiting=true;
350 }
351 
352 
353 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
354 {
355  if(_targetIsProxy)
356  {
357  // There should only ever be one of these callbacks per proxy,
358  // because each proxy should only be connected once.
359  DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
360  }
361  else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
362  {
364  {
365  WriteLock log;
366  output(log.os);
367  DB(15,"ProxyPushSupplier is federated.");
368  }
369  }
370  else
371  {
372  DB(2,"ProxyPushSupplier got unexpected callback.");
373  _targetIsProxy=false; // Reset it just to be sure.
374  }
375 }
376 
377 
379  const string& oid,
380  const PersistNode& node
381 )
382 {
383  try
384  {
385  using namespace CosEventChannelAdmin;
386 
387  string ior( node.attrString("IOR").c_str() );
388  CosEventComm::PushConsumer_var pushConsumer =
389  string_to_<CosEventComm::PushConsumer>(ior.c_str());
390  // Do not activate until we know that we have read a valid target.
391  activateObjectWithId(oid.c_str());
392  _remove_ref();
393  _target=pushConsumer._retn();
394  _targetIsProxy=bool(node.attrLong("proxy"));
395 
396  // If pushConsumer is a proxy, then try to reconnect.
397  if(_targetIsProxy)
398  {
399  DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
400  // This will only work if the proxy is implemented in the same way as
401  // omniEvents, so connect_() automatically creates a proxy.
402  ProxyPushConsumer_var proxyCons =
403  string_to_<ProxyPushConsumer>(ior.c_str());
404  CosEventComm::PushSupplier_var thisSupp =_this();
405  proxyCons->connect_push_supplier(thisSupp);
406  DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
407  }
408  }
409  catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
410  // The supplier doesn't need to be reconnected.
411  DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
412  }
413  catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
414  // Don't know what to make of this...
415  DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
416  }
417  catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
418  catch(CORBA::TRANSIENT& ) {} // object 'pushConsumer' not responding.
419  catch(CORBA::COMM_FAILURE& ) {} // object 'pushConsumer' not responding.
420 }
421 
422 
424 {
425  basicOutput(
426  os,"ConsumerAdmin/ProxyPushSupplier",
427  _target.in(),
428  _targetIsProxy? " proxy=1": NULL
429  );
430 }
431 
432 
433 }; // end namespace OmniEvents
OmniEvents::ProxyManager::_servants
set< Proxy * > _servants
The set of all active Proxies in this object's _managedPoa.
Definition: ProxyManager.h:90
PersistNode.h
OmniEvents::ProxyPushSupplier_i::ProxyPushSupplier_i
ProxyPushSupplier_i(PortableServer::POA_ptr poa, EventQueue &q)
Definition: ProxyPushSupplier.cc:296
NP_MINORSTRING
#define NP_MINORSTRING(systemException)
Definition: Orb.h:52
OmniEvents::ProxyPushSupplierManager::run_undetached
void * run_undetached(void *)
Definition: ProxyPushSupplier.cc:133
OmniEvents::ProxyPushSupplier_i
Definition: ProxyPushSupplier.h:101
OmniEvents::PersistNode::attrLong
long attrLong(const string &key, long fallback=0) const
Definition: PersistNode.cc:163
OmniEvents::Orb::reportObjectFailure
void reportObjectFailure(const char *here, CORBA::Object_ptr obj, CORBA::Exception *ex)
Called by omniEvents when an object has failed (fatal exception).
Definition: Orb.cc:204
OmniEvents::Servant::deactivateObject
void deactivateObject()
Calls deactivate_object() to deactivate this servant in its POA.
Definition: Servant.cc:160
Orb.h
OMNIEVENTS__DEBUG_REF_COUNTS__DEFN
#define OMNIEVENTS__DEBUG_REF_COUNTS__DEFN(C)
Defines debug versions of _add/remove_ref() for class C.
Definition: Servant.h:70
OmniEvents::ProxyManager
Base class for ServantActivator classes that manage Proxy servants.
Definition: ProxyManager.h:57
OmniEvents::omni_mutex_kcol::mutex
omni_mutex & mutex
Definition: ProxyPushSupplier.cc:36
OmniEvents::omni_mutex_kcol::operator=
omni_mutex_kcol & operator=(const omni_mutex_kcol &)
IF_OMNIORB4
#define IF_OMNIORB4(omniORB4_code)
Definition: Orb.h:46
OmniEvents::Proxy::basicOutput
void basicOutput(ostream &os, const char *name, CORBA::Object_ptr target=CORBA::Object::_nil(), const char *extraAttributes=NULL)
Helper method for constructing persistency output.
Definition: ProxyManager.cc:201
DB
#define DB(l, x)
Definition: Orb.h:49
ProxyPushSupplier.h
OmniEvents::ProxyPushSupplierManager::incarnate
PortableServer::Servant incarnate(const PortableServer::ObjectId &oid, PortableServer::POA_ptr poa)
Definition: ProxyPushSupplier.cc:52
OmniEvents::ProxyPushSupplierManager::disconnect
void disconnect()
Send disconnect_push_consumer() to all connected PushConsumers.
Definition: ProxyPushSupplier.cc:119
OmniEvents::ProxyPushSupplierManager::_add_ref
void _add_ref()
Definition: ProxyPushSupplier.cc:214
OmniEvents::ProxyPushSupplier_i::connect_push_consumer
void connect_push_consumer(CosEventComm::PushConsumer_ptr pushConsumer)
Definition: ProxyPushSupplier.cc:249
OmniEvents::ProxyManager::activate
void activate(const char *name)
Creates the Proxy-type's POA, and registers this object as its ServantManager.
Definition: ProxyManager.cc:103
OmniEvents::Servant::activateObjectWithId
void activateObjectWithId(const char *oidStr)
Calls activate_object_with_id() to activate this servant in its POA.
Definition: Servant.cc:125
OmniEvents::Orb::inst
static Orb & inst()
Definition: Orb.h:81
OmniEvents::Proxy::_req
CORBA::Request_var _req
Definition: ProxyManager.h:128
OmniEvents::ProxyPushSupplierManager::_refCount
int _refCount
Definition: ProxyPushSupplier.h:97
OmniEvents::ProxyPushSupplierManager::etherealize
void etherealize(const PortableServer::ObjectId &oid, PortableServer::POA_ptr adapter, PortableServer::Servant serv, CORBA::Boolean cleanup_in_progress, CORBA::Boolean remaining_activations)
Pauses the thread, and then calls the parent's implementation.
Definition: ProxyPushSupplier.cc:64
OmniEvents::ProxyPushSupplier_i::disconnect_push_supplier
void disconnect_push_supplier()
Definition: ProxyPushSupplier.cc:274
OmniEvents::PersistNode
Definition: PersistNode.h:48
OmniEvents::ProxyPushSupplier_i::trigger
OMNIEVENTS__DEBUG_REF_COUNTS__DECL void trigger(bool &busy, bool &waiting)
Sets 'busy' if some work was done.
Definition: ProxyPushSupplier.cc:315
OmniEvents::ProxyPushSupplier_i::_targetIsProxy
bool _targetIsProxy
TRUE if _target is a ProxyPushConsumer.
Definition: ProxyPushSupplier.h:126
OmniEvents::ProxyPushSupplierManager::~ProxyPushSupplierManager
~ProxyPushSupplierManager()
Definition: ProxyPushSupplier.cc:105
OmniEvents::WriteLock
Obtains an output stream to the active persistancy logfile, and locks it for exclusive access.
Definition: omniEventsLog.h:242
OmniEvents::ProxyPushSupplierManager::ProxyPushSupplierManager
ProxyPushSupplierManager(PortableServer::POA_ptr parentPoa, EventQueue &q)
Definition: ProxyPushSupplier.cc:91
OmniEvents::ProxyPushSupplier_i::~ProxyPushSupplier_i
~ProxyPushSupplier_i()
Definition: ProxyPushSupplier.cc:308
OmniEvents::omni_mutex_kcol
The opposite of omni_mutex_lock, unlocks the mutex upon construction and re-locks it upon destruction...
Definition: ProxyPushSupplier.cc:35
OmniEvents::Proxy
Base class for three of the four Proxy servants.
Definition: ProxyManager.h:104
OmniEvents::omniEventsLog::exists
static bool exists()
Library code may create Event Service objects without the need for persistency.
Definition: omniEventsLog.h:144
HERE
#define HERE
Generates a string literal that describes the filename and line number.
Definition: daemon_windows.cc:44
OmniEvents
Definition: Callback.h:39
OmniEvents::ProxyPushSupplierManager::_remove_ref
void _remove_ref()
Shutdown the thread when refCount reaches zero.
Definition: ProxyPushSupplier.cc:223
OmniEvents::ProxyPushSupplierManager::_lock
omni_mutex _lock
Definition: ProxyPushSupplier.h:78
omniEventsLog.h
OmniEvents::Proxy::eraseKey
void eraseKey(const char *name)
Helper method for constructing persistency output.
Definition: ProxyManager.cc:189
OmniEvents::ProxyPushSupplierManager::createObject
CosEventChannelAdmin::ProxyPushSupplier_ptr createObject()
Definition: ProxyPushSupplier.cc:111
OmniEvents::ProxyManager::_managedPoa
PortableServer::POA_var _managedPoa
The POA owned & managed by this object.
Definition: ProxyManager.h:95
OmniEvents::ProxyPushSupplierManager::PauseThenWake
Helper class that locks ProxyPushSupplier upon construction, and wakes it up on destruction.
Definition: ProxyPushSupplier.h:85
IFELSE_OMNIORB4
#define IFELSE_OMNIORB4(omniORB4_code, default_code)
Definition: Orb.h:45
OmniEvents::omni_mutex_kcol::omni_mutex_kcol
omni_mutex_kcol(omni_mutex &m)
Definition: ProxyPushSupplier.cc:38
OmniEvents::ProxyPushSupplier_i::_target
CosEventComm::PushConsumer_var _target
Definition: ProxyPushSupplier.h:125
OmniEvents::ProxyPushSupplier_i::reincarnate
void reincarnate(const string &oid, const PersistNode &node)
Re-create a servant from information saved in the log file.
Definition: ProxyPushSupplier.cc:378
OmniEvents::ProxyPushSupplierManager::_condition
omni_condition _condition
Definition: ProxyPushSupplier.h:79
OmniEvents::PersistNode::attrString
string attrString(const string &key, const string &fallback="") const
Definition: PersistNode.cc:155
OmniEvents::ProxyPushSupplier_i::output
void output(ostream &os)
Save this object's state to a stream.
Definition: ProxyPushSupplier.cc:423
OmniEvents::ProxyPushSupplier_i::callback
void callback(CORBA::Request_ptr req)
Sets _targetIsProxy, if it is.
Definition: ProxyPushSupplier.cc:353
OmniEvents::WriteLock::os
ostream & os
Definition: omniEventsLog.h:254
OmniEvents::Orb::deferredRequest
void deferredRequest(CORBA::Request_ptr req, Callback *callback=NULL)
Adopts the request and then stores it in _deferredRequests.
Definition: Orb.cc:187
OmniEvents::omni_mutex_kcol::~omni_mutex_kcol
~omni_mutex_kcol(void)
Definition: ProxyPushSupplier.cc:39
OmniEvents::ProxyPushSupplierManager::_queue
EventQueue & _queue
Definition: ProxyPushSupplier.h:96
OmniEvents::EventQueue
The EventQueue is a circular buffer, that contains _size-1 events.
Definition: EventQueue.h:56