VideoTools
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
gpublisher.cpp
1 //
2 // Copyright (C) 2017 Graeme Walker
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 // ===
17 //
18 // gpublisher.cpp
19 //
20 
21 #include "gdef.h"
22 #include "gpublisher.h"
23 #include "glocalsocket.h"
24 #include "gprocess.h"
25 #include "gmsg.h"
26 #include "gfile.h"
27 #include "gdatetime.h"
28 #include "gstr.h"
29 #include "groot.h"
30 #include "gpath.h"
31 #include "garg.h"
32 #include "gcleanup.h"
33 #include "gassert.h"
34 #include "glog.h"
35 #include <sstream>
36 #include <algorithm>
37 #include <cstring>
38 #include <sys/socket.h>
39 #include <sys/un.h>
40 #include <string.h>
41 
42 namespace G
43 {
44  /// \namespace G::PublisherImp
45  /// A private scope used in the implementation of G::Publisher and friends.
46  ///
47  namespace PublisherImp
48  {
49  struct DataMemory ;
50  struct ControlMemory ;
51  struct Slot ;
52  struct InfoSlot ;
53  struct Lock ;
54  struct SocketHolder ;
55  struct Snapshot ;
56  enum { ERRORS = 5 } ;
57  enum { SLOTS = 10 } ;
58  enum { MAGIC = 0xa5a5 } ;
59  enum { e_connect , e_unlink , e_send } ; // error[] index
60  enum { socket_path_size = 200 } ;
61  struct path_t /// A character array for storing filesystem paths in shared memory.
62  {
63  char s[socket_path_size] ;
64  } ;
65 
66  static void initialise( SharedMemory & , const std::string & , const G::Item & , bool ) ;
67  static void check( SharedMemory & , const std::string & ) ;
68  static std::string check( const std::string & ) ;
69  static std::string checkName( const std::string & ) ;
70  static void deactivate( SignalSafe , void * ) ;
71  static void nop( SignalSafe , void * ) ;
72  static void releaseSlots( SignalSafe signal_safe , void * ) ;
73  static void closeAllSlots( SignalSafe , ControlMemory * mem ) ;
74  static void closeSlot( SignalSafe , Slot & ) ;
75  static void clearSlot( SignalSafe , Slot & ) ;
76  static void notifyAll( SignalSafe , ControlMemory * , PublisherInfo & ) ;
77  static void notifyOne( SignalSafe , Slot & , InfoSlot & ) ;
78  static void cleanupControlMemory( SignalSafe , const char * ) ;
79  static void cleanupDataMemory( SignalSafe , const char * ) ;
80  static void cleanupProcess( SignalSafe signal_safe , const char * ) ;
81  static Snapshot snapshot( const std::string & ) ;
82  static void createSocket( SocketHolder & holder , const std::string & path_prefix ) ;
83  static size_t findFreeSlot( ControlMemory & ) ;
84  static void claimSlot( Slot & , const std::string & , pid_t ) ;
85  static void publish( SharedMemory & , unique_ptr<SharedMemory> & , PublisherInfo & , const std::string & ,
86  size_t data_total , size_t data_count , const char ** data_p_p , size_t * data_n_p , const char * type ) ;
87  static size_t subscribe( SharedMemory & shmem_control , SocketHolder & , const std::string & ) ;
88  static void releaseSlot( SharedMemory & shmem_control , size_t slot_id ) ;
89  static void releaseSlot( SignalSafe , Lock & , Slot & ) ;
90  static size_t flush( int socket_fd ) ;
91  static bool getch( int socket_fd ) ;
92  static bool receive( SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
93  const std::string & , size_t slot_id , int socket_fd , std::vector<char> & buffer ,
94  std::string * type_p , G::EpochTime * time_p ) ;
95  static std::vector<std::string> list( std::vector<std::string> * ) ;
96  static G::Item info( const std::string & channel_name , bool detail , bool all_slots ) ;
97  static void purge( const std::string & channel_name ) ;
98  static std::string delete_( const std::string & channel_name ) ;
99  static char * strdup_ignore_leak( const char * ) ;
100  static void strset( char * p , size_t n , const std::string & s_in ) ;
101  static void strget( const char * p_in , size_t n_in , std::string & s_out ) ;
102  static std::string strget( const char * p_in , size_t n_in ) ;
103  static size_t morethan( size_t ) ;
104  }
105 } ;
106 
107 /// \class G::PublisherImp::Slot
108 /// A shared-memory structure used by G::Publisher.
109 ///
111 {
112  bool in_use ; // subscriber lock
113  bool failed ; // publisher failure
114  pid_t pid ; // subscriber's pid
115  unsigned long seq ;
116  int socket_fd ; // publisher's fd
117  int error[ERRORS] ; // publisher errno set
118  path_t socket_path ;
119 } ;
120 
121 /// \class G::PublisherImp::ControlMemory
122 /// A shared-memory structure used by G::Publisher.
123 ///
125 {
126  int magic ;
128  pid_t publisher_pid ;
129  char publisher_info[2048] ; // eg. json
130  unsigned long seq ;
131  Slot slot[SLOTS] ;
132 } ;
133 
134 /// \class G::PublisherImp::DataMemory
135 /// A shared-memory structure used by G::Publisher.
136 ///
138 {
139  size_t size_limit ;
140  char type[60] ;
141  time_t time_s ;
142  g_uint32_t time_us ;
143  size_t data_size ;
144  char data[1] ;
145 } ;
146 
147 /// \class G::PublisherImp::Lock
148 /// A RAII semaphore lock class used by G::PublisherImp.
149 ///
151 {
152  explicit Lock( Semaphore * s ) ;
153  ~Lock() ;
154  //
155  Semaphore * m_s ;
156 } ;
157 
158 /// \class G::PublisherImp::SocketHolder
159 /// A private implementation class used by G::PublisherImp.
160 ///
162 {
163  explicit SocketHolder( int = -1 ) ;
164  int release() ;
165  ~SocketHolder() ;
166  //
167  int fd ;
168  std::string path ;
169 } ;
170 
171 /// \class G::PublisherImp::Snapshot
172 /// A private implementation class used by G::PublisherImp.
173 ///
175 {
176  ControlMemory control ;
177  bool has_data ;
178  DataMemory data ;
179 } ;
180 
181 /// \class G::PublisherImp::InfoSlot
182 /// A structure used by G::Publisher, having a sub-set of G::PublisherImp::Slot
183 /// members, used for diagnostics and error reporting. This structure is
184 /// modified by low-level code while the shared-memory is locked, but used
185 /// for error reporting without any locking.
186 ///
188 {
189  InfoSlot() ;
190  int e() const ;
191  std::string path() const ;
192  void report() ;
193  //
194  bool failed ;
195  int error[ERRORS] ;
196  path_t socket_path ;
197 } ;
198 
199 /// \class G::PublisherInfo
200 /// A structure that holds diagnostic information for each G::Publisher slot.
201 ///
203 {
204  void clear( SignalSafe ) ;
205  //
206  PublisherImp::InfoSlot slot[PublisherImp::SLOTS] ;
207 } ;
208 
209 // ==
210 
211 G::Publisher::Publisher( const std::string & name , bool auto_cleanup ) :
212  m_name(PublisherImp::checkName(name)) ,
213  m_shmem_control(name,sizeof(PublisherImp::ControlMemory),SharedMemory::Control()) ,
214  m_info(new PublisherInfo)
215 {
217  info.add( "type" , G::Path(G::Arg::v0()).basename() ) ;
218  PublisherImp::initialise( m_shmem_control , name , info , auto_cleanup ) ;
219 }
220 
221 G::Publisher::Publisher( const std::string & name , G::Item info , bool auto_cleanup ) :
222  m_name(PublisherImp::checkName(name)) ,
223  m_shmem_control(name,sizeof(PublisherImp::ControlMemory),SharedMemory::Control()) ,
224  m_info(new PublisherInfo)
225 {
226  if( info.type() == G::Item::t_map && !info.has("type") )
227  info.add( "type" , G::Path(G::Arg::v0()).basename() ) ;
228  PublisherImp::initialise( m_shmem_control , name , info , auto_cleanup ) ;
229 }
230 
232 {
233  PublisherImp::deactivate( SignalSafe() , m_shmem_control.ptr() ) ;
234 }
235 
236 void G::Publisher::publish( const char * data_p , size_t data_n , const char * type )
237 {
238  if( data_p == nullptr || data_n == 0U ) return ;
239  m_data_p.resize( 1U ) ;
240  m_data_n.resize( 1U ) ;
241  m_data_p[0] = data_p ;
242  m_data_n[0] = data_n ;
243  PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_n , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
244 }
245 
246 void G::Publisher::publish( const std::vector<char> & data , const char * type )
247 {
248  if( data.empty() ) return ;
249  m_data_p.resize( 1U ) ;
250  m_data_n.resize( 1U ) ;
251  m_data_p[0] = &data[0] ;
252  m_data_n[0] = data.size() ;
253  PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data.size() , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
254 }
255 
256 void G::Publisher::publish( const std::vector<std::pair<const char *,size_t> > & data , const char * type )
257 {
258  m_data_p.resize( data.size() ) ;
259  m_data_n.resize( data.size() ) ;
260  size_t data_total = 0U ;
261  for( size_t i = 0U ; i < data.size() ; i++ )
262  {
263  m_data_p[i] = data[i].first ;
264  m_data_n[i] = data[i].second ;
265  data_total += data[i].second ;
266  }
267  if( data_total != 0U )
268  PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_total , data.size() , &m_data_p[0] , &m_data_n[0] , type ) ;
269 }
270 
271 void G::Publisher::publish( const std::vector<std::vector<char> > & data , const char * type )
272 {
273  m_data_p.resize( data.size() ) ;
274  m_data_n.resize( data.size() ) ;
275  size_t data_total = 0U ;
276  size_t i_out = 0U ;
277  for( size_t i_in = 0U ; i_in < data.size() ; i_in++ )
278  {
279  if( data[i_in].empty() ) continue ;
280  m_data_p[i_out] = &(data[i_in])[0] ;
281  m_data_n[i_out++] = data[i_in].size() ;
282  data_total += data[i_in].size() ;
283  }
284  if( data_total != 0U )
285  PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_total , i_out , &m_data_p[0] , &m_data_n[0] , type ) ;
286 }
287 
288 std::vector<std::string> G::Publisher::list( std::vector<std::string> * others )
289 {
290  return PublisherImp::list( others ) ;
291 }
292 
293 G::Item G::Publisher::info( const std::string & channel_name , bool all_slots )
294 {
295  return PublisherImp::info( channel_name , true , all_slots ) ;
296 }
297 
298 void G::Publisher::purge( const std::string & channel_name )
299 {
300  PublisherImp::purge( channel_name ) ;
301 }
302 
303 std::string G::Publisher::delete_( const std::string & channel_name )
304 {
305  return PublisherImp::delete_( channel_name ) ;
306 }
307 
308 // ==
309 
310 G::PublisherChannel::PublisherChannel( const std::string & name , const std::string & path_prefix ) :
311  m_name(PublisherImp::checkName(name)) ,
312  m_shmem_control(name,SharedMemory::Control()) ,
313  m_path_prefix(path_prefix)
314 {
315  if( m_path_prefix.empty() )
316  m_path_prefix = std::string("/tmp/") + SharedMemory::prefix() + name ; // "/tmp/xx-name"
317 
318  PublisherImp::check( m_shmem_control , name ) ;
319  Cleanup::add( PublisherImp::cleanupProcess ,
320  PublisherImp::strdup_ignore_leak(SharedMemory::osName(m_name).c_str()) ) ;
321 }
322 
324 {
325 }
326 
328 {
329  PublisherImp::SocketHolder socket_holder ;
330  PublisherImp::createSocket( socket_holder , m_path_prefix ) ;
331  size_t slot_id = PublisherImp::subscribe( m_shmem_control , socket_holder , m_name ) ;
332  return new PublisherSubscriber( *this , slot_id , socket_holder.release() ) ;
333 }
334 
335 void G::PublisherChannel::releaseSlot( size_t slot_id )
336 {
337  PublisherImp::releaseSlot( m_shmem_control , slot_id ) ;
338 }
339 
340 bool G::PublisherChannel::receive( size_t slot_id , int socket_fd , std::vector<char> & buffer ,
341  std::string * type_p , G::EpochTime * time_p )
342 {
343  return PublisherImp::receive( m_shmem_control , m_shmem_data , m_name , slot_id , socket_fd , buffer , type_p , time_p ) ;
344 }
345 
346 std::string G::PublisherChannel::name() const
347 {
348  return m_name ;
349 }
350 
351 // ==
352 
353 G::PublisherSubscriber::PublisherSubscriber( PublisherChannel & channel , size_t slot_id , int socket_fd ) :
354  m_channel(channel) ,
355  m_slot_id(slot_id) ,
356  m_socket_fd(socket_fd)
357 {
358  G_ASSERT( slot_id < PublisherImp::SLOTS ) ;
359  G_ASSERT( socket_fd >= 0 ) ;
360 }
361 
363 {
364  m_channel.releaseSlot( m_slot_id ) ;
365  ::close( m_socket_fd ) ;
366 }
367 
369 {
370  return m_slot_id ;
371 }
372 
374 {
375  return m_socket_fd ;
376 }
377 
378 bool G::PublisherSubscriber::receive( std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
379 {
380  return m_channel.receive( m_slot_id , m_socket_fd , buffer , type_p , time_p ) ;
381 }
382 
383 bool G::PublisherSubscriber::peek( std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
384 {
385  try
386  {
387  bool ok = m_channel.receive( m_slot_id , -1 , buffer , type_p , time_p ) ;
388  return ok && !buffer.empty() ;
389  }
390  catch( SharedMemory::Error & )
391  {
392  // eg. nothing ever published, so no data segment
393  return false ;
394  }
395 }
396 
397 // ==
398 
399 G::PublisherSubscription::PublisherSubscription( const std::string & channel_name , bool lazy ) :
400  m_channel_name(channel_name) ,
401  m_lazy(lazy) ,
402  m_closed(false)
403 {
404  m_channel_name = Path(channel_name).basename() ;
405  m_path_prefix = Path(channel_name).dirname().str() ;
406 
407  // reject eg. "/tmp/channel" -- must be eg. "/tmp/xx-name/channel"
408  if( !m_path_prefix.empty() &&
409  ( Path(m_path_prefix).dirname() == Path() || Path(m_path_prefix).dirname().dirname() == Path() ) )
410  throw PublisherError( "not enough directory parts in channel path prefix: [" + m_path_prefix + "]" ) ;
411 
412  if( !lazy )
413  {
414  try
415  {
416  m_channel.reset( new PublisherChannel(m_channel_name,m_path_prefix) ) ;
417  m_subscriber.reset( m_channel->subscribe() ) ;
418  m_channel_info = PublisherImp::info(m_channel_name,false,false).str() ;
419  }
420  catch( SharedMemory::Error & e )
421  {
422  throw PublisherError( "cannot subscribe to channel [" + channel_name + "]: " + std::string(e.what()) ) ;
423  }
424  }
425 }
426 
428 {
429  bool opened = false ;
430  if( !m_channel_name.empty() && m_channel.get() == nullptr )
431  {
432  try
433  {
434  m_channel.reset( new PublisherChannel(m_channel_name,m_path_prefix) ) ;
435  m_subscriber.reset( m_channel->subscribe() ) ;
436  m_channel_info = PublisherImp::info(m_channel_name,false,false).str() ;
437  opened = true ;
438  }
439  catch( PublisherError & e )
440  {
441  }
442  catch( SharedMemory::Error & e )
443  {
444  }
445  if( !opened )
446  {
447  G_ASSERT( m_subscriber.get() == nullptr ) ;
448  m_channel.reset() ;
449  }
450  }
451  return opened ;
452 }
453 
455 {
456  return m_subscriber ? m_subscriber->fd() : -1 ;
457 }
458 
459 bool G::PublisherSubscription::receive( std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
460 {
461  return m_subscriber ? m_subscriber->receive(buffer,type_p,time_p) : false ;
462 }
463 
464 bool G::PublisherSubscription::peek( std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
465 {
466  return m_subscriber ? m_subscriber->peek(buffer,type_p,time_p) : false ;
467 }
468 
470 {
471  return m_channel_name ;
472 }
473 
475 {
476  return m_channel_info ;
477 }
478 
479 unsigned int G::PublisherSubscription::age() const
480 {
481  std::vector<char> buffer ;
482  G::EpochTime time( 0 ) ;
484  if( const_cast<PublisherSubscription*>(this)->peek(buffer,nullptr,&time) && now > time )
485  return static_cast<unsigned int>((now-time).s) ;
486  else
487  return 0U ;
488 }
489 
491 {
492  m_closed = true ;
493  m_subscriber.reset() ; // first
494  m_channel.reset() ; // second
495 }
496 
497 // ==
498 
499 void G::PublisherInfo::clear( SignalSafe )
500 {
501  for( size_t i = 0U ; i < PublisherImp::SLOTS ; i++ )
503 }
504 
505 // ==
506 
507 void G::PublisherImp::initialise( SharedMemory & shmem_control , const std::string & channel_name ,
508  const G::Item & info , bool auto_cleanup )
509 {
510  // initialise the shared memory (without locking)
511  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
512  {
513  mem->publisher_pid = ::getpid() ;
514  strset( mem->publisher_info , sizeof(mem->publisher_info) , info.str() ) ;
515  mem->seq = 1 ;
516  for( size_t i = 0U ; i < SLOTS ; i++ )
517  clearSlot( SignalSafe() , mem->slot[i] ) ;
518  {
519  Root claim_root ;
520  new (&mem->mutex) Semaphore ;
521  }
522  mem->magic = MAGIC ; // last
523  }
524 
525  // add signal handlers for cleanup
526  if( auto_cleanup )
527  {
528  Cleanup::add( cleanupControlMemory , strdup_ignore_leak(SharedMemory::osName(channel_name).c_str()) ) ;
529  Cleanup::add( cleanupDataMemory , strdup_ignore_leak(SharedMemory::osName(channel_name+".d").c_str()) ) ;
530  }
531 }
532 
533 std::string G::PublisherImp::checkName( const std::string & name )
534 {
535  if( G::Str::printable(name) != name || name.find_first_of("/\\*?") != std::string::npos || name.find('_') == 0U )
536  throw PublisherError( "invalid channel name: special characters disallowed" , G::Str::printable(name) ) ;
537  return name ;
538 }
539 
540 void G::PublisherImp::check( SharedMemory & shmem_control , const std::string & name )
541 {
542  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
543  if( shmem_control.size() != sizeof(ControlMemory) || mem->magic != MAGIC )
544  {
545  G_DEBUG( "G::PublisherImp::check: not valid as a publisher control segment: [" << name << "] (" << shmem_control.size() << ")" ) ;
546  throw PublisherError( "invalid shared memory segment: [" + SharedMemory::osName(name) + "]" ) ;
547  }
548 }
549 
550 void G::PublisherImp::cleanupControlMemory( SignalSafe signal_safe , const char * name )
551 {
552  SharedMemory::cleanup( signal_safe , name , deactivate ) ;
553 }
554 
555 void G::PublisherImp::cleanupDataMemory( SignalSafe signal_safe , const char * name )
556 {
557  SharedMemory::cleanup( signal_safe , name , nop ) ;
558 }
559 
560 void G::PublisherImp::cleanupProcess( SignalSafe signal_safe , const char * name )
561 {
562  SharedMemory::trap( signal_safe , name , releaseSlots ) ;
563 }
564 
565 void G::PublisherImp::nop( SignalSafe , void * )
566 {
567 }
568 
569 void G::PublisherImp::releaseSlots( SignalSafe signal_safe , void * p )
570 {
571  pid_t pid = ::getpid() ;
572  ControlMemory * mem = static_cast<ControlMemory*>(p) ;
573  {
574  Lock lock( Semaphore::at(&mem->mutex) ) ;
575  for( size_t i = 0U ; i < SLOTS ; i++ )
576  {
577  if( mem->slot[i].in_use && mem->slot[i].pid == pid )
578  {
579  mem->slot[i].in_use = false ;
580  mem->slot[i].pid = 0 ;
581  mem->slot[i].socket_path.s[0] = '\0' ;
582  }
583  }
584  }
585 }
586 
587 void G::PublisherImp::deactivate( SignalSafe signal_safe , void * p )
588 {
589  static PublisherInfo info ;
590  ControlMemory * mem = static_cast<ControlMemory*>(p) ;
591  {
592  Lock lock( Semaphore::at(&mem->mutex) ) ;
593  mem->magic = 0 ; // mark as defunct
594  notifyAll( signal_safe , mem , info ) ;
595  closeAllSlots( signal_safe , mem ) ;
596  }
597 }
598 
599 void G::PublisherImp::closeAllSlots( SignalSafe signal_safe , ControlMemory * mem )
600 {
601  if( mem->publisher_pid == ::getpid() )
602  {
603  for( size_t i = 0U ; i < SLOTS ; i++ )
604  closeSlot( signal_safe , mem->slot[i] ) ;
605  }
606 }
607 
608 void G::PublisherImp::closeSlot( SignalSafe , Slot & slot )
609 {
610  if( slot.socket_fd != -1 )
611  {
612  ::close( slot.socket_fd ) ;
613  slot.socket_fd = -1 ;
614  }
615 }
616 
617 void G::PublisherImp::publish( SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
618  PublisherInfo & info , const std::string & channel_name ,
619  size_t data_total , size_t data_count , const char ** data_p_p , size_t * data_n_p ,
620  const char * type )
621 {
622  // lazy construction of the data segment once we know an appropriate size --
623  // subscribers will not try to access the data segment until we have sent
624  // out the first event
625  //
626  if( shmem_data.get() == nullptr )
627  {
628  const size_t size_limit = morethan( data_total ) ;
629  shmem_data.reset( new SharedMemory(channel_name+".d",size_limit+sizeof(DataMemory)) ) ;
630  DataMemory * dmem = static_cast<DataMemory*>(shmem_data->ptr()) ;
631  dmem->size_limit = size_limit ;
632  dmem->type[0] = '\0' ;
633  dmem->data_size = 0U ;
634  dmem->time_s = 0 ;
635  dmem->time_us = 0U ;
636  }
637 
638  // publish to all subscribers -- copy the payload into the data segment and notify-all
639  //
640  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
641  {
642  G::EpochTime time = G::DateTime::now() ;
643  Lock lock( Semaphore::at(&mem->mutex) ) ;
644  DataMemory * dmem = static_cast<DataMemory*>(shmem_data->ptr()) ;
645  if( data_total > dmem->size_limit )
646  {
647  const size_t new_size_limit = morethan( data_total ) ;
648  shmem_data->remap( new_size_limit + sizeof(DataMemory) , true ) ;
649  dmem = static_cast<DataMemory*>(shmem_data->ptr()) ;
650  dmem->size_limit = new_size_limit ;
651  }
652  mem->seq++ ; if( mem->seq == 0U ) mem->seq = 1UL ;
653  dmem->data_size = data_total ;
654  dmem->time_s = time.s ;
655  dmem->time_us = time.us ;
656  ::memset( dmem->type , 0 , sizeof(dmem->type) ) ;
657  if( type != nullptr ) ::strncpy( dmem->type , type , sizeof(dmem->type)-1U ) ;
658  for( char * out = dmem->data ; data_count ; out += *data_n_p , data_count-- , data_p_p++ , data_n_p++ )
659  ::memcpy( out , *data_p_p , *data_n_p ) ;
660  notifyAll( SignalSafe() , mem , info ) ;
661  }
662 
663  // report errors
664  //
665  for( size_t i = 0U ; i < SLOTS ; i++ )
666  {
667  if( info.slot[i].failed )
668  info.slot[i].report() ;
669  }
670 }
671 
672 void G::PublisherImp::notifyAll( SignalSafe signal_safe , ControlMemory * mem , PublisherInfo & info )
673 {
674  info.clear( signal_safe ) ;
675  for( size_t i = 0U ; i < SLOTS ; i++ )
676  {
677  if( mem->slot[i].in_use && !mem->slot[i].failed )
678  {
679  notifyOne( signal_safe , mem->slot[i] , info.slot[i] ) ;
680  }
681  else if( mem->slot[i].socket_fd != -1 )
682  {
683  // tidy up after subscriber has gone away
684  ::close( mem->slot[i].socket_fd ) ;
685  mem->slot[i].socket_fd = -1 ;
686  clearSlot( signal_safe , mem->slot[i] ) ;
687  }
688  }
689 }
690 
691 void G::PublisherImp::notifyOne( SignalSafe signal_safe , Slot & slot , InfoSlot & info_slot )
692 {
693  // connect to the subscriber if necessary
694  if( slot.socket_fd == -1 && !slot.failed ) // ie. newly in_use
695  {
696  // connect to the new subscriber
697  slot.socket_fd = ::socket( AF_UNIX , SOCK_DGRAM , 0 ) ;
698  slot.socket_path.s[sizeof(slot.socket_path.s)-1U] = '\0' ;
699  LocalSocket::Address address( signal_safe , slot.socket_path.s ) ;
700  Identity identity = Root::start( signal_safe ) ;
701 
702  int rc = ::connect( slot.socket_fd , address.p() , address.size() ) ;
703  if( rc < 0 )
704  {
705  int e = errno ;
706  slot.error[e_connect] = info_slot.error[e_connect] = e ;
707  }
708 
709  rc = ::unlink( slot.socket_path.s ) ;
710  if( rc < 0 )
711  {
712  int e = errno ;
713  slot.error[e_unlink] = info_slot.error[e_unlink] = e ;
714  }
715 
716  Root::stop( signal_safe , identity ) ;
717  }
718 
719  // poke the subscriber - dont use G::Msg::send() in case it is not signal-handler-safe
720  char msg = '.' ;
721  int rc = ::send( slot.socket_fd , &msg , 1 , MSG_NOSIGNAL | MSG_DONTWAIT ) ;
722  if( rc < 0 )
723  {
724  int e = errno ;
725  slot.error[e_send] = info_slot.error[e_send] = e ;
726  }
727 
728  // record fatal errors -- keep the slot in_use because it is the subscriber's
729  // responsibility to release it
730  if( slot.error[e_connect] || G::Msg::fatal(slot.error[e_send]) )
731  {
732  slot.failed = info_slot.failed = true ;
733  info_slot.socket_path = slot.socket_path ;
734  ::close( slot.socket_fd ) ;
735  slot.socket_fd = -1 ;
736  }
737 }
738 
739 void G::PublisherImp::createSocket( SocketHolder & holder , const std::string & path_prefix )
740 {
741  // build the socket path, including the pid
742  std::stringstream ss ;
743  ss << path_prefix << "." << ::getpid() ;
744  holder.path = ss.str() ;
745  if( holder.path.length() >= PublisherImp::socket_path_size )
746  throw PublisherError( "socket path too long" ) ;
747 
748  // create the socket
749  holder.fd = ::socket( AF_UNIX , SOCK_DGRAM , 0 ) ;
750  if( holder.fd < 0 )
751  throw PublisherError( "cannot create socket" ) ;
752 
753  // bind the socket path
754  LocalSocket::Address address( holder.path ) ;
755  int rc = 0 ;
756  {
757  Root claim_root ;
758  rc = ::bind( holder.fd , address.p() , address.size() ) ;
759  }
760  if( rc != 0 )
761  throw PublisherError( "cannot bind socket path" , holder.path ) ;
762 
763  G_DEBUG( "G::PublisherImp::createSocket: new socket [" << holder.path << "]" ) ;
764 }
765 
766 size_t G::PublisherImp::subscribe( SharedMemory & shmem_control , SocketHolder & socket_holder , const std::string & name )
767 {
768  // find a free slot and grab it
769  //
770  size_t slot_id = SLOTS ;
771  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
772  {
773  Lock lock( Semaphore::at(&mem->mutex) ) ;
774  slot_id = findFreeSlot( *mem ) ;
775  if( slot_id < SLOTS )
776  claimSlot( mem->slot[slot_id] , socket_holder.path , ::getpid() ) ;
777  }
778  if( slot_id == SLOTS )
779  throw PublisherError( "no free slots in channel [" + name + "]" ) ; // probably need to kill failed subscribers
780 
781  return slot_id ;
782 }
783 
784 void G::PublisherImp::releaseSlot( SharedMemory & shmem_control , size_t slot_id )
785 {
786  G_ASSERT( slot_id < SLOTS ) ;
787  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
788  {
789  Lock lock( Semaphore::at(&mem->mutex) ) ;
790  releaseSlot( SignalSafe() , lock , mem->slot[slot_id] ) ;
791  }
792 }
793 
794 void G::PublisherImp::releaseSlot( SignalSafe signal_safe , Lock & , Slot & slot )
795 {
796  slot.in_use = false ;
797  // leave the rest of the slot alone so that the publisher can clean up too
798 }
799 
800 size_t G::PublisherImp::flush( int socket_fd )
801 {
802  char c ;
803  size_t count = 0U ;
804  while( 1 == ::recv(socket_fd,&c,1,MSG_DONTWAIT) )
805  count++ ;
806  if( count > 0U )
807  G_DEBUG( "G::PublisherImp::flush: event backlog: " << count ) ;
808  return count ;
809 }
810 
811 bool G::PublisherImp::getch( int socket_fd )
812 {
813  char c ;
814  return 1 == ::recv( socket_fd , &c , 1 , 0 ) ;
815 }
816 
817 bool G::PublisherImp::receive( SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
818  const std::string & channel_name , size_t slot_id , int socket_fd , std::vector<char> & buffer ,
819  std::string * type_p , G::EpochTime * time_p )
820 {
821  bool peek = socket_fd == -1 ;
822  if( time_p != nullptr )
823  time_p->s = 0 , time_p->us = 0U ;
824 
825  // check the publisher is there
826  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
827  if( mem->magic != MAGIC ) // no-mutex read of constant data
828  return false ;
829 
830  // possibly-blocking wait for socket
831  if( !peek )
832  {
833  size_t flushed = flush( socket_fd ) ;
834  if( flushed == 0U && !getch(socket_fd) )
835  {
836  buffer.clear() ;
837  if( type_p != nullptr ) type_p->clear() ;
838  throw PublisherError( "socket receive error" ) ;
839  }
840  G_DEBUG( "G::PublisherImp::receive: got publication event" ) ;
841  }
842 
843  // lazy attach to the data segment -- this can fail if peeking because
844  // the data segment will not exist if nothing has been published
845  if( shmem_data.get() == nullptr )
846  {
847  shmem_data.reset( new SharedMemory(channel_name+".d") ) ;
848  G_DEBUG( "G::PublisherImp::receive: data segment size: " << shmem_data->size() ) ;
849  }
850 
851  // check the publisher again
852  if( mem->magic != MAGIC )
853  return false ;
854 
855  // copy the data payload into the caller's buffer
856  unsigned long seq = 0UL ;
857  {
858  if( type_p != nullptr )
859  type_p->resize( sizeof(((DataMemory*)(nullptr))->type)+1U ) ;
860 
861  // lock the shared memory -- could do better wrt. serialising readers -- use two
862  // mutexes and a reader-count for a 'readers-writer' lock
863  Lock lock( Semaphore::at(&mem->mutex) ) ;
864  G_ASSERT( shmem_data.get() != nullptr ) ;
865 
866  // if the publisher has grown the data segment then we follow suit
867  DataMemory * dmem = static_cast<DataMemory*>(shmem_data->ptr()) ;
868  G_ASSERT( dmem != nullptr ) ;
869  if( (dmem->size_limit+sizeof(DataMemory)) > shmem_data->size() )
870  shmem_data->remap( dmem->size_limit+sizeof(DataMemory) , true ) ;
871  dmem = static_cast<DataMemory*>(shmem_data->ptr()) ;
872 
873  // check that we have not seen the current data already
874  Slot & slot = mem->slot[slot_id] ;
875  seq = slot.seq ;
876  bool ok = slot.in_use && ( peek || slot.seq == 0UL || slot.seq != mem->seq ) ;
877  if( ok )
878  {
879  // update the sequence number
880  if( !peek )
881  slot.seq = mem->seq ;
882 
883  // copy out the payload
884  buffer.resize( dmem->data_size ) ;
885  ::memcpy( &buffer[0] , dmem->data , buffer.size() ) ;
886 
887  // copy out the type
888  if( type_p != nullptr )
889  strget( dmem->type , sizeof(dmem->type) , *type_p ) ;
890 
891  // copy out the publication time
892  if( time_p != nullptr )
893  time_p->s = dmem->time_s , time_p->us = dmem->time_us ;
894  }
895  else
896  {
897  buffer.clear() ;
898  if( type_p != nullptr ) type_p->clear() ;
899  }
900  }
901  G_DEBUG( "G::PublisherImp::receive: got message [" << seq << "]" ) ;
902 
903  return true ;
904 }
905 
906 size_t G::PublisherImp::findFreeSlot( ControlMemory & mem )
907 {
908  for( size_t i = 0U ; i < SLOTS ; i++ )
909  {
910  if( !mem.slot[i].in_use && mem.slot[i].socket_fd == -1 )
911  return i ;
912  }
913  return SLOTS ;
914 }
915 
916 void G::PublisherImp::claimSlot( Slot & slot , const std::string & socket_path , pid_t pid )
917 {
918  clearSlot( SignalSafe() , slot ) ;
919  slot.in_use = true ;
920  slot.pid = pid ;
921  ::strncpy( slot.socket_path.s , socket_path.c_str() , sizeof(slot.socket_path.s)-1U ) ;
922 }
923 
924 void G::PublisherImp::clearSlot( SignalSafe , Slot & slot )
925 {
926  slot.in_use = false ;
927  slot.failed = false ;
928  slot.pid = 0 ;
929  slot.seq = 0UL ;
930  slot.socket_fd = -1 ;
931  ::memset( slot.error , 0 , sizeof(slot.error) ) ;
932  ::memset( slot.socket_path.s , 0 , sizeof(slot.socket_path.s) ) ;
933 }
934 
935 G::PublisherImp::Snapshot G::PublisherImp::snapshot( const std::string & channel_name )
936 {
937  Snapshot snapshot ;
938  snapshot.has_data = false ;
939 
940  SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
941  check( shmem_control , channel_name ) ;
942 
943  try
944  {
945  SharedMemory shmem_data( channel_name+".d" ) ;
946  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
947  {
948  Lock lock( Semaphore::at(&mem->mutex) ) ;
949  DataMemory * dmem = static_cast<DataMemory*>(shmem_data.ptr()) ;
950  snapshot.control = *mem ;
951  snapshot.has_data = true ;
952  snapshot.data = *dmem ;
953  }
954  }
955  catch( G::SharedMemory::Error & )
956  {
957  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
958  {
959  Lock lock( Semaphore::at(&mem->mutex) ) ;
960  snapshot.control = *mem ;
961  snapshot.has_data = false ;
962  }
963  }
964 
965  return snapshot ;
966 }
967 
968 std::string G::PublisherImp::check( const std::string & channel_name )
969 {
970  std::string reason ;
971  try
972  {
973  SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
974  check( shmem_control , channel_name ) ;
975  }
976  catch( G::SharedMemory::Error & e )
977  {
978  reason = e.what() ;
979  }
980  catch( G::PublisherError & e )
981  {
982  reason = e.what() ;
983  }
984  return reason ;
985 }
986 
987 std::vector<std::string> G::PublisherImp::list( std::vector<std::string> * others )
988 {
989  typedef std::vector<std::string> List ;
990  List list = G::SharedMemory::list() ;
991  for( List::iterator p = list.begin() ; p != list.end() ; )
992  {
993  std::string name = *p ;
994  std::string reason = check( name ) ;
995  G_DEBUG( "G::PublisherImp::list: name=[" << name << "] reason=[" << reason << "]" ) ;
996  if( reason.empty() )
997  {
998  ++p ;
999  }
1000  else
1001  {
1002  p = list.erase( p ) ;
1003 
1004  // heuristics -- only report if permission denied -- could do better
1005  const bool report =
1006  name.find('.') == std::string::npos &&
1007  reason.find("permission denied") != std::string::npos ;
1008 
1009  if( others != nullptr && report )
1010  others->push_back( name ) ;
1011  }
1012  }
1013  return list ;
1014 }
1015 
1016 G::Item G::PublisherImp::info( const std::string & channel_name , bool detail , bool all_slots )
1017 {
1018  Snapshot s = snapshot( channel_name ) ;
1019 
1020  Item publisher = Item::map() ;
1021  std::string publisher_info_str = strget( s.control.publisher_info , sizeof(s.control.publisher_info) ) ;
1022  if( !G::Item::parse( publisher_info_str , publisher ) )
1023  publisher = Item::map() ;
1024  publisher.add( "pid" , Str::fromInt(s.control.publisher_pid) ) ;
1025 
1026  if( detail )
1027  {
1028  Item item = Item::map() ;
1029  item.add( "publisher" , publisher ) ;
1030 
1031  item.add( "seq" , Str::fromULong(s.control.seq) ) ;
1032  item.add( "data" , Item::map() ) ;
1033  if( s.has_data )
1034  {
1035  item["data"].add( "type" , Str::printable(strget(s.data.type,sizeof(s.data.type))) ) ;
1036  item["data"].add( "size" , Str::fromULong(s.data.data_size) ) ;
1037  item["data"].add( "limit" , Str::fromULong(s.data.size_limit) ) ;
1038  item["data"].add( "time" , Str::fromULong(s.data.time_s) ) ; // long vs. time_t
1039  }
1040  item.add( "slots" , Item::list() ) ;
1041  for( size_t i = 0U ; i < SLOTS ; i++ )
1042  {
1043  const Slot & slot = s.control.slot[i] ;
1044  if( all_slots || ( slot.in_use && !slot.failed ) )
1045  {
1046  int error = slot.error[0] ? slot.error[0] : ( slot.error[1] ? slot.error[1] : slot.error[2] ) ;
1047  std::string strerror = error ? G::Process::strerror(error) : std::string() ;
1048  Item slot_info = Item::map() ;
1049  slot_info.add( "index" , Str::fromUInt(i) ) ;
1050  slot_info.add( "in_use" , slot.in_use ? "1" : "0" ) ;
1051  slot_info.add( "failed" , slot.failed ? "1" : "0" ) ;
1052  slot_info.add( "pid" , Str::fromUInt(slot.pid) ) ;
1053  slot_info.add( "seq" , Str::fromUInt(slot.seq) ) ;
1054  slot_info.add( "socket_fd" , Str::fromInt(slot.socket_fd) ) ;
1055  slot_info.add( "error" , strerror ) ;
1056  slot_info.add( "socket" , Str::printable(slot.socket_path.s) ) ;
1057  item["slots"].add( slot_info ) ;
1058  }
1059  }
1060  return item ;
1061  }
1062  else
1063  {
1064  return publisher ;
1065  }
1066 }
1067 
1068 void G::PublisherImp::purge( const std::string & channel_name )
1069 {
1070  SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
1071  ControlMemory * mem = static_cast<ControlMemory*>(shmem_control.ptr()) ;
1072  {
1073  Lock lock( Semaphore::at(&mem->mutex) ) ;
1074  for( size_t i = 0U ; i < SLOTS ; i++ )
1075  {
1076  Slot & slot = mem->slot[i] ;
1077  if( slot.in_use && slot.failed && slot.seq < mem->seq && (mem->seq-slot.seq) > 10 )
1078  {
1079  clearSlot( SignalSafe() , slot ) ;
1080  }
1081  }
1082  }
1083 }
1084 
1085 std::string G::PublisherImp::delete_( const std::string & channel_name )
1086 {
1087  std::string e1 = SharedMemory::delete_( channel_name + ".d" , false ) ;
1088  std::string e2 = SharedMemory::delete_( channel_name , true ) ;
1089  const char * sep = !e1.empty() && !e2.empty() ? ": " : "" ;
1090  return e1 == e2 ? e1 : ( e1 + sep + e2 ) ;
1091 }
1092 
1093 char * G::PublisherImp::strdup_ignore_leak( const char * p )
1094 {
1095  return ::strdup( p ) ;
1096 }
1097 
1098 void G::PublisherImp::strset( char * p , size_t n , const std::string & s )
1099 {
1100  if( p == nullptr || n == 0U ) return ;
1101  size_t x = std::min( s.size() , n-1U ) ;
1102  if( x ) ::memcpy( p , s.data() , x ) ;
1103  p[x] = '\0' ;
1104 }
1105 
1106 void G::PublisherImp::strget( const char * p , size_t n , std::string & s )
1107 {
1108  s.assign( p , n ) ;
1109  if( s.find('\0') != std::string::npos )
1110  s.resize( s.find('\0') ) ;
1111 }
1112 
1113 std::string G::PublisherImp::strget( const char * p , size_t n )
1114 {
1115  std::string result ;
1116  strget( p , n , result ) ;
1117  return result ;
1118 }
1119 
1120 size_t G::PublisherImp::morethan( size_t n )
1121 {
1122  return n + n/2U + 10U ;
1123 }
1124 
1125 // ==
1126 
1127 G::PublisherImp::Lock::Lock( Semaphore * s ) :
1128  m_s(s)
1129 {
1130  s->decrement() ;
1131 }
1132 
1133 G::PublisherImp::Lock::~Lock()
1134 {
1135  m_s->increment() ;
1136 }
1137 
1138 // ==
1139 
1140 G::PublisherImp::SocketHolder::SocketHolder( int fd_ ) :
1141  fd(fd_)
1142 {
1143 }
1144 
1145 int G::PublisherImp::SocketHolder::release()
1146 {
1147  int fd_ = fd ;
1148  fd = -1 ;
1149  return fd_ ;
1150 }
1151 
1152 G::PublisherImp::SocketHolder::~SocketHolder()
1153 {
1154  if( fd != -1 )
1155  ::close(fd) ;
1156 }
1157 
1158 // ==
1159 
1160 G::PublisherImp::InfoSlot::InfoSlot()
1161 {
1162  failed = false ;
1163  for( size_t i = 0 ; i < ERRORS ; i++ )
1164  error[i] = 0 ;
1165  socket_path.s[0] = '\0' ;
1166 }
1167 
1168 std::string G::PublisherImp::InfoSlot::path() const
1169 {
1170  std::string path( socket_path.s , sizeof(socket_path.s) ) ;
1171  return Str::printable( path.c_str() ) ;
1172 }
1173 
1174 int G::PublisherImp::InfoSlot::e() const
1175 {
1176  for( size_t i = 0 ; i < ERRORS ; i++ )
1177  {
1178  if( error[i] )
1179  return error[i] ;
1180  }
1181  return 0 ;
1182 }
1183 
1184 void G::PublisherImp::InfoSlot::report()
1185 {
1186  std::ostringstream ss ;
1187  ss
1188  << "cannot publish to subscriber: socket path [" << path() << "] "
1189  << "error [" << G::Process::strerror(e()) << "]" ;
1190 
1191  G_ERROR( "G::Publisher::publish: " << ss.str() ) ;
1192 }
1193 
PublisherSubscriber * subscribe()
Subscribes to the publisher returning a new()ed object.
Definition: gpublisher.cpp:327
void * ptr() const
Returns the mapped address.
std::string str() const
Returns the path string.
Definition: gpath.cpp:290
PublisherSubscriber(PublisherChannel &, size_t slot_id, int socket_fd)
Pseudo-private constructor, used by G::PublisherChannel.
Definition: gpublisher.cpp:353
A shared-memory structure used by G::Publisher.
Definition: gpublisher.cpp:110
A subsecond-resolution timestamp based on a time_t.
Definition: gdatetime.h:39
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
Definition: gpublisher.cpp:459
static std::string printable(const std::string &in, char escape= '\\')
Returns a printable represention of the given input string.
Definition: gstr.cpp:663
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
Definition: gpublisher.cpp:383
A private implementation class used by G::PublisherImp.
Definition: gpublisher.cpp:174
A character array for storing filesystem paths in shared memory.
Definition: gpublisher.cpp:61
A publication-channel subscriber endpoint.
Definition: gpublisher.h:191
bool open()
Tries to initialise the object after lazy construction or after close().
Definition: gpublisher.cpp:427
static std::string delete_(const std::string &channel_name)
Deletes the channel.
Definition: gpublisher.cpp:303
An empty structure that is used to indicate a signal-safe, reentrant implementation.
Definition: gsignalsafe.h:36
Publisher(const std::string &channel_name, bool auto_cleanup=true)
Constructor for a publisher.
Definition: gpublisher.cpp:211
static Semaphore * at(storage_type *)
Syntactic sugar to return an object pointer corresponding to the given storage pointer.
static std::string prefix()
Returns the prefix, as set by by a call to the other overload.
std::string basename() const
Returns the rightmost part of the path, ignoring "." parts.
Definition: gpath.cpp:310
std::string name() const
Returns the channel name, as passed in to the constructor.
Definition: gpublisher.cpp:469
static Item map()
Factory function for a map item.
Definition: gitem.cpp:33
static std::vector< std::string > list(std::vector< std::string > *others=nullptr)
Returns a list of channel names.
Definition: gpublisher.cpp:288
PublisherChannel(const std::string &channel_name, const std::string &socket_path_prefix=std::string())
Constructor.
Definition: gpublisher.cpp:310
A private implementation class used by G::PublisherImp.
Definition: gpublisher.cpp:161
static std::string delete_(const std::string &name, bool control_segment)
Unlinks the segment from the filesystem.
std::string name() const
Returns the channel name, as passed in to the constructor.
Definition: gpublisher.cpp:346
static Identity start(SignalSafe)
A signal-safe alternative to construction.
Definition: groot.cpp:80
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
Definition: gpublisher.cpp:464
static std::vector< std::string > list(bool os_names=false)
Returns a list of possible control segment names.
A shared-memory structure used by G::Publisher.
Definition: gpublisher.cpp:137
static std::string fromInt(int i)
Converts int 'i' to a string.
Definition: gstr.cpp:294
int fd() const
Returns the named socket file descriptor.
Definition: gpublisher.cpp:373
static void stop(SignalSafe, Identity)
A signal-safe alternative to destruction.
Definition: groot.cpp:86
A shared-memory structure used by G::Publisher.
Definition: gpublisher.cpp:124
static EpochTime now()
Returns the current epoch time.
int fd() const
Returns the subscriber's file descriptor or minus one.
Definition: gpublisher.cpp:454
A RAII semaphore lock class used by G::PublisherImp.
Definition: gpublisher.cpp:150
size_t id() const
Returns the slot id.
Definition: gpublisher.cpp:368
A semaphore class with a posix or sysv implementation chosen at build-time.
Definition: gsemaphore.h:47
A named publisher channel that can be subscribed to.
Definition: gpublisher.h:139
std::string str(int indent=-1) const
Returns a string representation, using a json-like format.
Definition: gitem.cpp:67
PublisherSubscription(const std::string &channel_name, bool lazy=false)
Constructor.
Definition: gpublisher.cpp:399
~PublisherChannel()
Destructor.
Definition: gpublisher.cpp:323
static std::string fromUInt(unsigned int ui)
Converts unsigned int 'ui' to a string.
Definition: gstr.cpp:315
void releaseSlot(size_t slot_id)
Used by PublisherSubscriber.
Definition: gpublisher.cpp:335
static Item parse(const std::string &)
Parses the string representation. Throws on error.
Definition: gitem.cpp:43
static Item list()
Factory function for a list item.
Definition: gitem.cpp:28
bool remap(size_t, bool may_move, bool no_throw=false)
Resizes and remaps the shared memory.
Slot0 slot(T &object, void(T::*fn)())
A slot factory function overloaded for a zero-parameter callback.
Definition: gslot.h:209
static void trap(SignalSafe, const char *os_name, void(*fn)(SignalSafe, void *))
An exit handler that calls the user's function on the shared memory contents.
A POSIX shared memory class.
Definition: gsharedmemory.h:41
static void purge(const std::string &channel_name)
Tries to clean up any failed slots in the named channel.
Definition: gpublisher.cpp:298
A structure used by G::Publisher, having a sub-set of G::PublisherImp::Slot members, used for diagnostics and error reporting.
Definition: gpublisher.cpp:187
void publish(const char *p, size_t n, const char *type=nullptr)
Publishes a chunk of data to subscribers.
Definition: gpublisher.cpp:236
Path dirname() const
Returns the path without the rightmost part, ignoring "." parts.
Definition: gpath.cpp:318
A variant class holding a string, an item map keyed by name, or an ordered list of items...
Definition: gitem.h:41
~Publisher()
Destructor.
Definition: gpublisher.cpp:231
void add(const std::string &s)
Adds a string item to this list item.
Definition: gitem.cpp:108
static Item info(const std::string &channel_name, bool all_slots=true)
Returns a variant containing information about the state of the channel.
Definition: gpublisher.cpp:293
static void cleanup(SignalSafe, const char *os_name, void(*fn)(SignalSafe, void *))
An exit handler that unlinks the named shared memory segment and calls the user's function for the me...
semaphore storage
Definition: gsemaphore.h:51
bool receive(size_t, int, std::vector< char > &, std::string *=nullptr, G::EpochTime *=nullptr)
Used by PublisherSubscriber.
Definition: gpublisher.cpp:340
bool has(const std::string &k) const
Returns true if this map item has the named key.
Definition: gitem.cpp:146
static std::string osName(const std::string &name)
Converts the segment name to an internal form that can be passed to cleanup() or trap().
static std::string fromULong(unsigned long ul)
Converts unsigned long 'ul' to a string.
Definition: gstr.cpp:322
A structure that holds diagnostic information for each G::Publisher slot.
Definition: gpublisher.cpp:202
static std::string v0()
Returns a copy of argv[0] from the first call to the constructor that takes argc/argv.
Definition: garg.cpp:88
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
Definition: gpublisher.cpp:378
unsigned int age() const
Returns the age of the latest data in seconds, or zero.
Definition: gpublisher.cpp:479
A Path object represents a file system path.
Definition: gpath.h:72
static void add(void(*fn)(SignalSafe, const char *), const char *arg)
Adds the given handler to the list of handlers that are to be called when the process terminates abno...
std::string info() const
Returns some channel metadata as a short json string (see G::Item::str()).
Definition: gpublisher.cpp:474
~PublisherSubscriber()
Destructor.
Definition: gpublisher.cpp:362
void close()
Closes the channel subscription, releasing resources and becoming inactive.
Definition: gpublisher.cpp:490
static bool fatal(int error)
Returns true if the error value indicates a permanent problem with the socket.
Definition: gmsg.cpp:119
size_t size() const
Returns the segment size. This is affected by remap().
Type type() const
Returns the item type (string, list or map).
Definition: gitem.cpp:85