38 #include <sys/socket.h>
47 namespace PublisherImp
50 struct ControlMemory ;
58 enum { MAGIC = 0xa5a5 } ;
59 enum { e_connect , e_unlink , e_send } ;
60 enum { socket_path_size = 200 } ;
63 char s[socket_path_size] ;
67 static void check(
SharedMemory & ,
const std::string & ) ;
68 static std::string check(
const std::string & ) ;
69 static std::string checkName(
const std::string & ) ;
70 static void deactivate(
SignalSafe ,
void * ) ;
72 static void releaseSlots(
SignalSafe signal_safe ,
void * ) ;
78 static void cleanupControlMemory(
SignalSafe ,
const char * ) ;
79 static void cleanupDataMemory(
SignalSafe ,
const char * ) ;
80 static void cleanupProcess(
SignalSafe signal_safe ,
const char * ) ;
81 static Snapshot snapshot(
const std::string & ) ;
82 static void createSocket(
SocketHolder & holder ,
const std::string & path_prefix ) ;
84 static void claimSlot(
Slot & ,
const std::string & , pid_t ) ;
86 size_t data_total ,
size_t data_count ,
const char ** data_p_p ,
size_t * data_n_p ,
const char * type ) ;
88 static void releaseSlot(
SharedMemory & shmem_control ,
size_t slot_id ) ;
90 static size_t flush(
int socket_fd ) ;
91 static bool getch(
int socket_fd ) ;
92 static bool receive(
SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
93 const std::string & ,
size_t slot_id ,
int socket_fd , std::vector<char> & buffer ,
95 static std::vector<std::string> list( std::vector<std::string> * ) ;
96 static G::Item info(
const std::string & channel_name ,
bool detail ,
bool all_slots ) ;
97 static void purge(
const std::string & channel_name ) ;
98 static std::string delete_(
const std::string & channel_name ) ;
99 static char * strdup_ignore_leak(
const char * ) ;
100 static void strset(
char * p ,
size_t n ,
const std::string & s_in ) ;
101 static void strget(
const char * p_in ,
size_t n_in , std::string & s_out ) ;
102 static std::string strget(
const char * p_in ,
size_t n_in ) ;
103 static size_t morethan(
size_t ) ;
128 pid_t publisher_pid ;
129 char publisher_info[2048] ;
191 std::string path()
const ;
212 m_name(PublisherImp::checkName(name)) ,
213 m_shmem_control(name,sizeof(PublisherImp::ControlMemory),
SharedMemory::Control()) ,
218 PublisherImp::initialise( m_shmem_control , name , info , auto_cleanup ) ;
222 m_name(PublisherImp::checkName(name)) ,
223 m_shmem_control(name,sizeof(PublisherImp::ControlMemory),
SharedMemory::Control()) ,
226 if( info.
type() == G::Item::t_map && !info.
has(
"type") )
228 PublisherImp::initialise( m_shmem_control , name , info , auto_cleanup ) ;
233 PublisherImp::deactivate(
SignalSafe() , m_shmem_control.ptr() ) ;
238 if( data_p ==
nullptr || data_n == 0U ) return ;
239 m_data_p.resize( 1U ) ;
240 m_data_n.resize( 1U ) ;
241 m_data_p[0] = data_p ;
242 m_data_n[0] = data_n ;
243 PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_n , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
248 if( data.empty() )
return ;
249 m_data_p.resize( 1U ) ;
250 m_data_n.resize( 1U ) ;
251 m_data_p[0] = &data[0] ;
252 m_data_n[0] = data.size() ;
253 PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data.size() , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
258 m_data_p.resize( data.size() ) ;
259 m_data_n.resize( data.size() ) ;
260 size_t data_total = 0U ;
261 for(
size_t i = 0U ; i < data.size() ; i++ )
263 m_data_p[i] = data[i].first ;
264 m_data_n[i] = data[i].second ;
265 data_total += data[i].second ;
267 if( data_total != 0U )
268 PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_total , data.size() , &m_data_p[0] , &m_data_n[0] , type ) ;
273 m_data_p.resize( data.size() ) ;
274 m_data_n.resize( data.size() ) ;
275 size_t data_total = 0U ;
277 for(
size_t i_in = 0U ; i_in < data.size() ; i_in++ )
279 if( data[i_in].empty() )
continue ;
280 m_data_p[i_out] = &(data[i_in])[0] ;
281 m_data_n[i_out++] = data[i_in].size() ;
282 data_total += data[i_in].size() ;
284 if( data_total != 0U )
285 PublisherImp::publish( m_shmem_control , m_shmem_data , *m_info.get() , m_name , data_total , i_out , &m_data_p[0] , &m_data_n[0] , type ) ;
290 return PublisherImp::list( others ) ;
295 return PublisherImp::info( channel_name ,
true , all_slots ) ;
300 PublisherImp::purge( channel_name ) ;
305 return PublisherImp::delete_( channel_name ) ;
311 m_name(PublisherImp::checkName(name)) ,
313 m_path_prefix(path_prefix)
315 if( m_path_prefix.empty() )
318 PublisherImp::check( m_shmem_control , name ) ;
330 PublisherImp::createSocket( socket_holder , m_path_prefix ) ;
331 size_t slot_id = PublisherImp::subscribe( m_shmem_control , socket_holder , m_name ) ;
337 PublisherImp::releaseSlot( m_shmem_control , slot_id ) ;
343 return PublisherImp::receive( m_shmem_control , m_shmem_data , m_name , slot_id , socket_fd , buffer , type_p , time_p ) ;
356 m_socket_fd(socket_fd)
358 G_ASSERT( slot_id < PublisherImp::SLOTS ) ;
359 G_ASSERT( socket_fd >= 0 ) ;
364 m_channel.releaseSlot( m_slot_id ) ;
365 ::close( m_socket_fd ) ;
380 return m_channel.receive( m_slot_id , m_socket_fd , buffer , type_p , time_p ) ;
387 bool ok = m_channel.receive( m_slot_id , -1 , buffer , type_p , time_p ) ;
388 return ok && !buffer.empty() ;
390 catch( SharedMemory::Error & )
400 m_channel_name(channel_name) ,
408 if( !m_path_prefix.empty() &&
410 throw PublisherError(
"not enough directory parts in channel path prefix: [" + m_path_prefix +
"]" ) ;
417 m_subscriber.reset( m_channel->subscribe() ) ;
418 m_channel_info = PublisherImp::info(m_channel_name,
false,
false).str() ;
420 catch( SharedMemory::Error & e )
422 throw PublisherError(
"cannot subscribe to channel [" + channel_name +
"]: " + std::string(e.what()) ) ;
429 bool opened = false ;
430 if( !m_channel_name.empty() && m_channel.get() == nullptr )
435 m_subscriber.reset( m_channel->subscribe() ) ;
436 m_channel_info = PublisherImp::info(m_channel_name,
false,
false).str() ;
439 catch( PublisherError & e )
442 catch( SharedMemory::Error & e )
447 G_ASSERT( m_subscriber.get() == nullptr ) ;
456 return m_subscriber ? m_subscriber->fd() : -1 ;
461 return m_subscriber ? m_subscriber->receive(buffer,type_p,time_p) : false ;
466 return m_subscriber ? m_subscriber->peek(buffer,type_p,time_p) : false ;
471 return m_channel_name ;
476 return m_channel_info ;
481 std::vector<char> buffer ;
484 if( const_cast<PublisherSubscription*>(
this)->peek(buffer,
nullptr,&time) && now > time )
485 return static_cast<unsigned int>((now-time).s) ;
493 m_subscriber.reset() ;
501 for(
size_t i = 0U ; i < PublisherImp::SLOTS ; i++ )
// Fills in the publisher's control segment: records the publishing process
// id and the serialised 'info' item, resets every subscriber slot and
// constructs the semaphore in place inside the segment.
// NOTE(review): this listing omits some original lines of the body, so the
// use of 'channel_name' and 'auto_cleanup' cannot be seen from here.
507 void G::PublisherImp::initialise( SharedMemory & shmem_control ,
const std::string & channel_name ,
508 const G::Item & info ,
bool auto_cleanup )
511 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
// identify the publisher: pid plus the string form of the info item
513 mem->publisher_pid = ::getpid() ;
514 strset( mem->publisher_info ,
sizeof(mem->publisher_info) , info.
str() ) ;
// start with every slot cleared
516 for(
size_t i = 0U ; i < SLOTS ; i++ )
517 clearSlot( SignalSafe() , mem->slot[i] ) ;
// placement-new: the Semaphore object lives at the address of mem->mutex
520 new (&mem->mutex) Semaphore ;
// Validates a channel name. Throws PublisherError if the name is changed by
// G::Str::printable() (ie. it is not already printable), if it contains any
// of the characters / \ * ? or if its first character is an underscore.
// NOTE(review): the return statement is not visible in this listing.
533 std::string G::PublisherImp::checkName(
const std::string & name )
535 if(
G::Str::printable(name) != name || name.find_first_of(
"/\\*?") != std::string::npos || name.find(
'_') == 0U )
536 throw PublisherError(
"invalid channel name: special characters disallowed" ,
G::Str::printable(name) ) ;
// Tests whether a mapped segment looks like a publisher control segment:
// its size must equal sizeof(ControlMemory) and its 'magic' field must
// equal MAGIC. A mismatch is logged at debug level with the segment name
// and size.
// NOTE(review): whatever follows the debug line (presumably a throw) is
// not visible in this listing.
540 void G::PublisherImp::check( SharedMemory & shmem_control ,
const std::string & name )
542 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
543 if( shmem_control.size() !=
sizeof(ControlMemory) || mem->magic != MAGIC )
545 G_DEBUG(
"G::PublisherImp::check: not valid as a publisher control segment: [" << name <<
"] (" << shmem_control.size() <<
")" ) ;
550 void G::PublisherImp::cleanupControlMemory( SignalSafe signal_safe ,
const char * name )
555 void G::PublisherImp::cleanupDataMemory( SignalSafe signal_safe ,
const char * name )
560 void G::PublisherImp::cleanupProcess( SignalSafe signal_safe ,
const char * name )
565 void G::PublisherImp::nop( SignalSafe ,
void * )
// Frees every slot in the control memory at 'p' that is in use and owned
// by the calling process: the slot is marked unused, its pid is zeroed and
// its socket path is emptied.
// NOTE(review): lines between the cast and the loop are missing from this
// listing, so any locking done here cannot be confirmed.
569 void G::PublisherImp::releaseSlots( SignalSafe signal_safe ,
void * p )
571 pid_t pid = ::getpid() ;
572 ControlMemory * mem =
static_cast<ControlMemory*
>(p) ;
575 for(
size_t i = 0U ; i < SLOTS ; i++ )
// only touch slots that belong to this process
577 if( mem->slot[i].in_use && mem->slot[i].pid == pid )
579 mem->slot[i].in_use = false ;
580 mem->slot[i].pid = 0 ;
581 mem->slot[i].socket_path.s[0] =
'\0' ;
// Runs notifyAll() and then closeAllSlots() on the control memory at 'p'.
// A function-local static PublisherInfo is handed to notifyAll() to receive
// the per-slot diagnostics.
// NOTE(review): the lines between the cast and the notifyAll() call are
// not visible in this listing.
587 void G::PublisherImp::deactivate( SignalSafe signal_safe ,
void * p )
589 static PublisherInfo info ;
590 ControlMemory * mem =
static_cast<ControlMemory*
>(p) ;
594 notifyAll( signal_safe , mem , info ) ;
595 closeAllSlots( signal_safe , mem ) ;
// Calls closeSlot() on every slot, but only when the calling process is the
// one recorded as the publisher in the control memory.
599 void G::PublisherImp::closeAllSlots( SignalSafe signal_safe , ControlMemory * mem )
601 if( mem->publisher_pid == ::getpid() )
603 for(
size_t i = 0U ; i < SLOTS ; i++ )
604 closeSlot( signal_safe , mem->slot[i] ) ;
// Closes the slot's socket file descriptor if it has one, and resets the
// descriptor to -1 so that it reads as closed.
608 void G::PublisherImp::closeSlot( SignalSafe , Slot &
slot )
610 if( slot.socket_fd != -1 )
612 ::close( slot.socket_fd ) ;
613 slot.socket_fd = -1 ;
// Publishes one message made up of 'data_count' chunks totalling
// 'data_total' bytes: the chunks are copied back-to-back into the data
// segment, the channel sequence number is advanced, the type string and
// timestamp are stored, and subscribers are then notified via notifyAll().
// The data segment (named channel_name + ".d") is created on first use and
// enlarged when the message would not fit. Slots that notifyAll() flagged
// as failed are reported at the end.
// NOTE(review): this listing omits several original lines, including the
// final 'type' parameter, the origin of 'time' and any locking.
617 void G::PublisherImp::publish( SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
618 PublisherInfo & info ,
const std::string & channel_name ,
619 size_t data_total ,
size_t data_count ,
const char ** data_p_p ,
size_t * data_n_p ,
// first publication: create the data segment with some headroom
626 if( shmem_data.get() == nullptr )
628 const size_t size_limit = morethan( data_total ) ;
629 shmem_data.reset(
new SharedMemory(channel_name+
".d",size_limit+
sizeof(DataMemory)) ) ;
630 DataMemory * dmem =
static_cast<DataMemory*
>(shmem_data->
ptr()) ;
631 dmem->size_limit = size_limit ;
632 dmem->type[0] =
'\0' ;
633 dmem->data_size = 0U ;
640 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
644 DataMemory * dmem =
static_cast<DataMemory*
>(shmem_data->
ptr()) ;
// grow the data segment if this message is bigger than its current limit
645 if( data_total > dmem->size_limit )
647 const size_t new_size_limit = morethan( data_total ) ;
648 shmem_data->
remap( new_size_limit +
sizeof(DataMemory) ,
true ) ;
// the remap is allowed to move the mapping, so re-read the pointer
649 dmem =
static_cast<DataMemory*
>(shmem_data->
ptr()) ;
650 dmem->size_limit = new_size_limit ;
// advance the sequence number, skipping zero on wrap-around
652 mem->seq++ ;
if( mem->seq == 0U ) mem->seq = 1UL ;
653 dmem->data_size = data_total ;
654 dmem->time_s = time.s ;
655 dmem->time_us = time.us ;
// zero the type buffer first so the size-1 copy below stays terminated
656 ::memset( dmem->type , 0 ,
sizeof(dmem->type) ) ;
657 if( type !=
nullptr ) ::strncpy( dmem->type , type ,
sizeof(dmem->type)-1U ) ;
// concatenate the chunks into dmem->data, advancing by each chunk's size
658 for(
char * out = dmem->data ; data_count ; out += *data_n_p , data_count-- , data_p_p++ , data_n_p++ )
659 ::memcpy( out , *data_p_p , *data_n_p ) ;
660 notifyAll( SignalSafe() , mem , info ) ;
// report any slots that failed during notification
665 for(
size_t i = 0U ; i < SLOTS ; i++ )
667 if( info.slot[i].failed )
668 info.slot[i].report() ;
// Walks all slots after resetting 'info'. A slot that is in use and not
// failed is notified through notifyOne(), which also fills in the matching
// info slot. Any other slot that still holds an open socket has the socket
// closed and is then cleared.
672 void G::PublisherImp::notifyAll( SignalSafe signal_safe , ControlMemory * mem , PublisherInfo & info )
674 info.clear( signal_safe ) ;
675 for(
size_t i = 0U ; i < SLOTS ; i++ )
677 if( mem->slot[i].in_use && !mem->slot[i].failed )
679 notifyOne( signal_safe , mem->slot[i] , info.slot[i] ) ;
681 else if( mem->slot[i].socket_fd != -1 )
684 ::close( mem->slot[i].socket_fd ) ;
685 mem->slot[i].socket_fd = -1 ;
686 clearSlot( signal_safe , mem->slot[i] ) ;
// Sends a one-byte datagram to one subscriber slot. If the slot has no
// socket yet (and has not failed) a unix-domain datagram socket is created
// and connected to the slot's socket path, and the path is unlinked.
// Error values from connect, unlink and send are stored in both the slot
// and the diagnostic 'info_slot', indexed by e_connect/e_unlink/e_send.
// A connect error, or a send error that G::Msg::fatal() classes as fatal,
// marks the slot as failed and closes its socket.
// NOTE(review): the error tests and the definitions of 'e' and 'msg' are on
// lines missing from this listing, so the exact conditions guarding each
// assignment cannot be confirmed from here.
691 void G::PublisherImp::notifyOne( SignalSafe signal_safe , Slot & slot , InfoSlot & info_slot )
// lazily connect to the subscriber's socket
694 if( slot.socket_fd == -1 && !slot.failed )
697 slot.socket_fd = ::socket( AF_UNIX , SOCK_DGRAM , 0 ) ;
// force termination of the path before using it as a C string
698 slot.socket_path.s[
sizeof(slot.socket_path.s)-1U] =
'\0' ;
699 LocalSocket::Address address( signal_safe , slot.socket_path.s ) ;
702 int rc = ::connect( slot.socket_fd , address.p() , address.size() ) ;
706 slot.error[e_connect] = info_slot.error[e_connect] = e ;
709 rc = ::unlink( slot.socket_path.s ) ;
713 slot.error[e_unlink] = info_slot.error[e_unlink] = e ;
// the notification itself: one byte, no SIGPIPE, never blocking
721 int rc = ::send( slot.socket_fd , &msg , 1 , MSG_NOSIGNAL | MSG_DONTWAIT ) ;
725 slot.error[e_send] = info_slot.error[e_send] = e ;
// give up on this slot after a connect error or a fatal send error
730 if( slot.error[e_connect] ||
G::Msg::fatal(slot.error[e_send]) )
732 slot.failed = info_slot.failed = true ;
733 info_slot.socket_path = slot.socket_path ;
734 ::close( slot.socket_fd ) ;
735 slot.socket_fd = -1 ;
// Creates the subscriber's unix-domain datagram socket and binds it to a
// path built as "<path_prefix>.<pid>". The path and descriptor are stored
// in 'holder'. Throws PublisherError if the path would not fit in
// socket_path_size characters, or with the messages "cannot create socket"
// and "cannot bind socket path" shown below.
// NOTE(review): the tests guarding the second and third throws, and the
// declaration of 'rc', are on lines missing from this listing.
739 void G::PublisherImp::createSocket( SocketHolder & holder ,
const std::string & path_prefix )
// the pid suffix distinguishes the sockets of different processes
742 std::stringstream ss ;
743 ss << path_prefix <<
"." << ::getpid() ;
744 holder.path = ss.str() ;
// '>=' leaves room for a terminator in the fixed-size path array
745 if( holder.path.length() >= PublisherImp::socket_path_size )
746 throw PublisherError(
"socket path too long" ) ;
749 holder.fd = ::socket( AF_UNIX , SOCK_DGRAM , 0 ) ;
751 throw PublisherError(
"cannot create socket" ) ;
754 LocalSocket::Address address( holder.path ) ;
758 rc = ::bind( holder.fd , address.p() , address.size() ) ;
761 throw PublisherError(
"cannot bind socket path" , holder.path ) ;
763 G_DEBUG(
"G::PublisherImp::createSocket: new socket [" << holder.path <<
"]" ) ;
// Claims a free slot in the control memory for the calling process, storing
// the holder's socket path in it. Throws PublisherError if findFreeSlot()
// finds nothing, which is signalled by a slot id equal to SLOTS.
// NOTE(review): the return statement and any locking or hand-over of the
// socket descriptor are on lines missing from this listing.
766 size_t G::PublisherImp::subscribe( SharedMemory & shmem_control , SocketHolder & socket_holder ,
const std::string & name )
// SLOTS doubles as the 'no slot' value
770 size_t slot_id = SLOTS ;
771 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
774 slot_id = findFreeSlot( *mem ) ;
775 if( slot_id < SLOTS )
776 claimSlot( mem->slot[slot_id] , socket_holder.path , ::getpid() ) ;
778 if( slot_id == SLOTS )
779 throw PublisherError(
"no free slots in channel [" + name +
"]" ) ;
// Releases the slot with the given id by delegating to the three-argument
// releaseSlot() overload. The id must be less than SLOTS (asserted).
// NOTE(review): the declaration of 'lock' is on a line missing from this
// listing.
784 void G::PublisherImp::releaseSlot( SharedMemory & shmem_control ,
size_t slot_id )
786 G_ASSERT( slot_id < SLOTS ) ;
787 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
790 releaseSlot( SignalSafe() , lock , mem->slot[slot_id] ) ;
// Marks the slot as no longer in use. The unnamed Lock parameter is not
// used in the visible code; presumably it exists to make callers hold the
// lock -- TODO confirm.
// NOTE(review): further statements may be missing from this listing.
794 void G::PublisherImp::releaseSlot( SignalSafe signal_safe , Lock & , Slot & slot )
796 slot.in_use = false ;
// Drains pending notification bytes from the socket by reading one byte at
// a time with MSG_DONTWAIT until recv() no longer returns exactly one byte.
// The debug line logs 'count' as the event backlog.
// NOTE(review): the declarations of 'c' and 'count', the loop body and the
// return statement are on lines missing from this listing; the result is
// presumably the number of bytes drained -- TODO confirm.
800 size_t G::PublisherImp::flush(
int socket_fd )
804 while( 1 == ::recv(socket_fd,&c,1,MSG_DONTWAIT) )
807 G_DEBUG(
"G::PublisherImp::flush: event backlog: " << count ) ;
// Reads a single byte from the socket with no recv() flags and returns true
// only if exactly one byte was received.
// NOTE(review): the declaration of 'c' is on a line missing from this
// listing.
811 bool G::PublisherImp::getch(
int socket_fd )
814 return 1 == ::recv( socket_fd , &c , 1 , 0 ) ;
// Reads the latest published message for the given slot into 'buffer',
// optionally returning its type string and timestamp. A socket_fd of -1
// selects 'peek' mode. Otherwise the socket is drained with flush() and, if
// nothing was pending, read with getch(); a failed read throws
// PublisherError. The data segment (channel_name + ".d") is opened on first
// use and remapped if the publisher has grown it.
// NOTE(review): this listing omits many original lines, including the
// 'type_p' and 'time_p' parameters, the branches taken when the magic
// number does not match, any locking and the return statements. The
// conditions guarding several of the statements below therefore cannot be
// confirmed from here.
817 bool G::PublisherImp::receive( SharedMemory & shmem_control , unique_ptr<SharedMemory> & shmem_data ,
818 const std::string & channel_name ,
size_t slot_id ,
int socket_fd , std::vector<char> & buffer ,
// no socket means peek: look at the data without consuming an event
821 bool peek = socket_fd == -1 ;
// default the returned time to zero
822 if( time_p !=
nullptr )
823 time_p->s = 0 , time_p->us = 0U ;
826 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
827 if( mem->magic != MAGIC )
// consume queued events; wait on the socket only if none were pending
833 size_t flushed = flush( socket_fd ) ;
834 if( flushed == 0U && !getch(socket_fd) )
837 if( type_p !=
nullptr ) type_p->clear() ;
838 throw PublisherError(
"socket receive error" ) ;
840 G_DEBUG(
"G::PublisherImp::receive: got publication event" ) ;
// open the data segment on first use
845 if( shmem_data.get() == nullptr )
847 shmem_data.reset(
new SharedMemory(channel_name+
".d") ) ;
848 G_DEBUG(
"G::PublisherImp::receive: data segment size: " << shmem_data->
size() ) ;
852 if( mem->magic != MAGIC )
856 unsigned long seq = 0UL ;
// size the type string from the 'type' array in DataMemory
858 if( type_p !=
nullptr )
859 type_p->resize(
sizeof(((DataMemory*)(
nullptr))->type)+1U ) ;
864 G_ASSERT( shmem_data.get() != nullptr ) ;
867 DataMemory * dmem =
static_cast<DataMemory*
>(shmem_data->
ptr()) ;
868 G_ASSERT( dmem !=
nullptr ) ;
// the recorded limit exceeds our mapping: remap to the new size
869 if( (dmem->size_limit+
sizeof(DataMemory)) > shmem_data->
size() )
870 shmem_data->
remap( dmem->size_limit+
sizeof(DataMemory) ,
true ) ;
// re-read the pointer since the remap is allowed to move the mapping
871 dmem =
static_cast<DataMemory*
>(shmem_data->
ptr()) ;
874 Slot & slot = mem->slot[slot_id] ;
// usable if peeking, if the slot has no seq yet, or if the seq differs
876 bool ok = slot.in_use && ( peek || slot.seq == 0UL || slot.seq != mem->seq ) ;
881 slot.seq = mem->seq ;
// copy the payload out of shared memory
884 buffer.resize( dmem->data_size ) ;
885 ::memcpy( &buffer[0] , dmem->data , buffer.size() ) ;
888 if( type_p !=
nullptr )
889 strget( dmem->type ,
sizeof(dmem->type) , *type_p ) ;
892 if( time_p !=
nullptr )
893 time_p->s = dmem->time_s , time_p->us = dmem->time_us ;
898 if( type_p !=
nullptr ) type_p->clear() ;
901 G_DEBUG(
"G::PublisherImp::receive: got message [" << seq <<
"]" ) ;
// Scans the slots for one that is neither in use nor holding an open
// socket descriptor.
// NOTE(review): the return statements are on lines missing from this
// listing. The caller, subscribe(), treats a result equal to SLOTS as
// 'none free'.
906 size_t G::PublisherImp::findFreeSlot( ControlMemory & mem )
908 for(
size_t i = 0U ; i < SLOTS ; i++ )
910 if( !mem.slot[i].in_use && mem.slot[i].socket_fd == -1 )
// Resets the slot and stores the subscriber's socket path in it.
// NOTE(review): the lines that presumably record 'pid' and mark the slot as
// in use are missing from this listing.
916 void G::PublisherImp::claimSlot( Slot & slot ,
const std::string & socket_path , pid_t pid )
918 clearSlot( SignalSafe() , slot ) ;
// clearSlot() zeroed the path array, so this size-1 copy stays terminated
921 ::strncpy( slot.socket_path.s , socket_path.c_str() ,
sizeof(slot.socket_path.s)-1U ) ;
// Returns a slot to its empty state: not in use, not failed, no socket
// descriptor, all error values zero and an all-zero socket path.
// NOTE(review): two original lines between 'failed' and 'socket_fd' are
// missing from this listing.
924 void G::PublisherImp::clearSlot( SignalSafe , Slot & slot )
926 slot.in_use = false ;
927 slot.failed = false ;
930 slot.socket_fd = -1 ;
931 ::memset( slot.error , 0 ,
sizeof(slot.error) ) ;
932 ::memset( slot.socket_path.s , 0 ,
sizeof(slot.socket_path.s) ) ;
938 snapshot.has_data = false ;
940 SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
941 check( shmem_control , channel_name ) ;
945 SharedMemory shmem_data( channel_name+
".d" ) ;
946 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
949 DataMemory * dmem =
static_cast<DataMemory*
>(shmem_data.ptr()) ;
950 snapshot.control = *mem ;
951 snapshot.has_data = true ;
952 snapshot.data = *dmem ;
955 catch( G::SharedMemory::Error & )
957 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
960 snapshot.control = *mem ;
961 snapshot.has_data = false ;
// Opens the named control segment and runs the two-argument check() on it,
// catching G::SharedMemory::Error and G::PublisherError.
// NOTE(review): the handler bodies and return statements are on lines
// missing from this listing. list() uses the returned string as a failure
// 'reason', so presumably it is empty on success -- TODO confirm.
968 std::string G::PublisherImp::check(
const std::string & channel_name )
973 SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
974 check( shmem_control , channel_name ) ;
976 catch( G::SharedMemory::Error & e )
980 catch( G::PublisherError & e )
// Builds the list of channel names. Each candidate name is run through
// check(); the visible code erases entries from the list and, when 'others'
// is supplied, appends names flagged by 'report' to it. 'report' requires a
// name without a '.' whose check() reason mentions "permission denied".
// NOTE(review): the population of 'list', the condition that leads to the
// erase, the declaration of 'report' and the return statement are on lines
// missing from this listing.
987 std::vector<std::string> G::PublisherImp::list( std::vector<std::string> * others )
989 typedef std::vector<std::string> List ;
// no increment in the loop header: erase() supplies the next iterator
991 for( List::iterator p = list.begin() ; p != list.end() ; )
993 std::string name = *p ;
994 std::string reason = check( name ) ;
995 G_DEBUG(
"G::PublisherImp::list: name=[" << name <<
"] reason=[" << reason <<
"]" ) ;
1002 p = list.erase( p ) ;
1006 name.find(
'.') == std::string::npos &&
1007 reason.find(
"permission denied") != std::string::npos ;
1009 if( others !=
nullptr && report )
1010 others->push_back( name ) ;
// Builds a G::Item describing the channel from a snapshot of its shared
// memory: a "publisher" entry (including the pid), "data" entries for the
// type and size limit, and a "slots" list with one entry per reported slot.
// A slot is reported if 'all_slots' is set or if it is in use and not
// failed.
// NOTE(review): many original lines are missing from this listing,
// including the declarations of 'item', 'publisher' and 'slot_info', the
// use of 'detail' and 'publisher_info_str', and the return statement.
1016 G::Item G::PublisherImp::info(
const std::string & channel_name ,
bool detail ,
bool all_slots )
1018 Snapshot s = snapshot( channel_name ) ;
1021 std::string publisher_info_str = strget( s.control.publisher_info ,
sizeof(s.control.publisher_info) ) ;
1024 publisher.add(
"pid" ,
Str::fromInt(s.control.publisher_pid) ) ;
1029 item.add(
"publisher" , publisher ) ;
1035 item[
"data"].add(
"type" ,
Str::printable(strget(s.data.type,
sizeof(s.data.type))) ) ;
1037 item[
"data"].add(
"limit" ,
Str::fromULong(s.data.size_limit) ) ;
1041 for(
size_t i = 0U ; i < SLOTS ; i++ )
1043 const Slot & slot = s.control.slot[i] ;
1044 if( all_slots || ( slot.in_use && !slot.failed ) )
// first non-zero error wins, in index order (connect, unlink, send)
1046 int error = slot.error[0] ? slot.error[0] : ( slot.error[1] ? slot.error[1] : slot.error[2] ) ;
1047 std::string strerror = error ? G::Process::strerror(error) : std::string() ;
1050 slot_info.add(
"in_use" , slot.in_use ?
"1" :
"0" ) ;
1051 slot_info.add(
"failed" , slot.failed ?
"1" :
"0" ) ;
1054 slot_info.add(
"socket_fd" ,
Str::fromInt(slot.socket_fd) ) ;
1055 slot_info.add(
"error" , strerror ) ;
1057 item[
"slots"].add( slot_info ) ;
// Clears stale failed slots in the named channel: a slot is cleared when it
// is in use, flagged as failed and its sequence number is more than ten
// behind the channel's.
// NOTE(review): lines between the cast and the loop are missing from this
// listing, so any validation or locking cannot be confirmed.
1068 void G::PublisherImp::purge(
const std::string & channel_name )
1070 SharedMemory shmem_control( channel_name , SharedMemory::Control() ) ;
1071 ControlMemory * mem =
static_cast<ControlMemory*
>(shmem_control.ptr()) ;
1074 for(
size_t i = 0U ; i < SLOTS ; i++ )
1076 Slot & slot = mem->slot[i] ;
// 'slot.seq < mem->seq' guards the unsigned subtraction against wrapping
1077 if( slot.in_use && slot.failed && slot.seq < mem->seq && (mem->seq-slot.seq) > 10 )
1079 clearSlot( SignalSafe() , slot ) ;
// Returns a combined error string built from two results, 'e1' and 'e2':
// if they are equal just one is returned, otherwise they are concatenated,
// separated by ": " when both are non-empty.
// NOTE(review): the lines that produce 'e1' and 'e2' are missing from this
// listing; presumably they come from deleting the control and data segments
// -- TODO confirm.
1085 std::string G::PublisherImp::delete_(
const std::string & channel_name )
1089 const char * sep = !e1.empty() && !e2.empty() ?
": " :
"" ;
1090 return e1 == e2 ? e1 : ( e1 + sep + e2 ) ;
// Thin wrapper round ::strdup(), returning a malloc()ed copy of 'p'. Going
// by the name the copy is deliberately never freed -- TODO confirm against
// the callers.
1093 char * G::PublisherImp::strdup_ignore_leak(
const char * p )
1095 return ::strdup( p ) ;
// Copies string 's' into the fixed-size buffer 'p' of size 'n', truncating
// to at most n-1 bytes. Does nothing if 'p' is null or 'n' is zero.
// NOTE(review): the line after the copy is missing from this listing;
// presumably it writes the terminator at p[x] -- TODO confirm.
1098 void G::PublisherImp::strset(
char * p ,
size_t n ,
const std::string & s )
1100 if( p ==
nullptr || n == 0U ) return ;
// leave one byte spare at the end of the buffer
1101 size_t x = std::min( s.size() , n-1U ) ;
1102 if( x ) ::memcpy( p , s.data() , x ) ;
// Reads a string out of a fixed-size character buffer into 's'. The visible
// code truncates 's' at its first embedded NUL character, if any.
// NOTE(review): the line that loads 's' from 'p' and 'n' is missing from
// this listing.
1106 void G::PublisherImp::strget(
const char * p ,
size_t n , std::string & s )
1109 if( s.find(
'\0') != std::string::npos )
1110 s.resize( s.find(
'\0') ) ;
// Convenience overload that runs the three-argument strget() on a local
// string.
// NOTE(review): the return statement is on a line missing from this
// listing.
1113 std::string G::PublisherImp::strget(
const char * p ,
size_t n )
1115 std::string result ;
1116 strget( p , n , result ) ;
// Returns a size somewhat larger than 'n': n plus half of n plus ten.
// publish() uses this to size the data segment with headroom.
1120 size_t G::PublisherImp::morethan(
size_t n )
1122 return n + n/2U + 10U ;
1127 G::PublisherImp::Lock::Lock( Semaphore * s ) :
1133 G::PublisherImp::Lock::~Lock()
1140 G::PublisherImp::SocketHolder::SocketHolder(
int fd_ ) :
1145 int G::PublisherImp::SocketHolder::release()
1152 G::PublisherImp::SocketHolder::~SocketHolder()
// Constructor. Loops over the ERRORS error entries and sets the socket path
// to the empty string.
// NOTE(review): the loop body and any member initialisers are on lines
// missing from this listing.
1160 G::PublisherImp::InfoSlot::InfoSlot()
1163 for(
size_t i = 0 ; i < ERRORS ; i++ )
1165 socket_path.s[0] =
'\0' ;
// Returns the slot's socket path as a std::string. The string is first
// built from the whole fixed-size array, so at this point it can contain
// embedded NUL characters.
// NOTE(review): the remaining lines, presumably trimming at the first NUL
// and returning, are missing from this listing.
1168 std::string G::PublisherImp::InfoSlot::path()
const
1170 std::string path( socket_path.s ,
sizeof(socket_path.s) ) ;
1174 int G::PublisherImp::InfoSlot::e()
const
1176 for(
size_t i = 0 ; i < ERRORS ; i++ )
// Logs an error for a subscriber that could not be notified, giving its
// socket path and the text of the error value returned by e().
// NOTE(review): the line that names the stream at the start of the insert
// expression is missing from this listing.
1184 void G::PublisherImp::InfoSlot::report()
1186 std::ostringstream ss ;
1188 <<
"cannot publish to subscriber: socket path [" << path() <<
"] "
1189 <<
"error [" << G::Process::strerror(e()) <<
"]" ;
1191 G_ERROR(
"G::Publisher::publish: " << ss.str() ) ;
PublisherSubscriber * subscribe()
Subscribes to the publisher returning a new()ed object.
void * ptr() const
Returns the mapped address.
std::string str() const
Returns the path string.
PublisherSubscriber(PublisherChannel &, size_t slot_id, int socket_fd)
Pseudo-private constructor, used by G::PublisherChannel.
A shared-memory structure used by G::Publisher.
A subsecond-resolution timestamp based on a time_t.
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
static std::string printable(const std::string &in, char escape= '\\')
Returns a printable representation of the given input string.
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
A private implementation class used by G::PublisherImp.
A character array for storing filesystem paths in shared memory.
A publication-channel subscriber endpoint.
bool open()
Tries to initialise the object after lazy construction or after close().
static std::string delete_(const std::string &channel_name)
Deletes the channel.
An empty structure that is used to indicate a signal-safe, reentrant implementation.
Publisher(const std::string &channel_name, bool auto_cleanup=true)
Constructor for a publisher.
static Semaphore * at(storage_type *)
Syntactic sugar to return an object pointer corresponding to the given storage pointer.
static std::string prefix()
Returns the prefix, as set by a call to the other overload.
std::string basename() const
Returns the rightmost part of the path, ignoring "." parts.
std::string name() const
Returns the channel name, as passed in to the constructor.
static Item map()
Factory function for a map item.
static std::vector< std::string > list(std::vector< std::string > *others=nullptr)
Returns a list of channel names.
PublisherChannel(const std::string &channel_name, const std::string &socket_path_prefix=std::string())
Constructor.
A private implementation class used by G::PublisherImp.
static std::string delete_(const std::string &name, bool control_segment)
Unlinks the segment from the filesystem.
std::string name() const
Returns the channel name, as passed in to the constructor.
static Identity start(SignalSafe)
A signal-safe alternative to construction.
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
static std::vector< std::string > list(bool os_names=false)
Returns a list of possible control segment names.
A shared-memory structure used by G::Publisher.
static std::string fromInt(int i)
Converts int 'i' to a string.
int fd() const
Returns the named socket file descriptor.
static void stop(SignalSafe, Identity)
A signal-safe alternative to destruction.
A shared-memory structure used by G::Publisher.
static EpochTime now()
Returns the current epoch time.
int fd() const
Returns the subscriber's file descriptor or minus one.
A RAII semaphore lock class used by G::PublisherImp.
size_t id() const
Returns the slot id.
A semaphore class with a posix or sysv implementation chosen at build-time.
A named publisher channel that can be subscribed to.
std::string str(int indent=-1) const
Returns a string representation, using a json-like format.
PublisherSubscription(const std::string &channel_name, bool lazy=false)
Constructor.
~PublisherChannel()
Destructor.
static std::string fromUInt(unsigned int ui)
Converts unsigned int 'ui' to a string.
void releaseSlot(size_t slot_id)
Used by PublisherSubscriber.
static Item parse(const std::string &)
Parses the string representation. Throws on error.
static Item list()
Factory function for a list item.
bool remap(size_t, bool may_move, bool no_throw=false)
Resizes and remaps the shared memory.
Slot0 slot(T &object, void(T::*fn)())
A slot factory function overloaded for a zero-parameter callback.
static void trap(SignalSafe, const char *os_name, void(*fn)(SignalSafe, void *))
An exit handler that calls the user's function on the shared memory contents.
A POSIX shared memory class.
static void purge(const std::string &channel_name)
Tries to clean up any failed slots in the named channel.
A structure used by G::Publisher, having a sub-set of G::PublisherImp::Slot members, used for diagnostics and error reporting.
void publish(const char *p, size_t n, const char *type=nullptr)
Publishes a chunk of data to subscribers.
Path dirname() const
Returns the path without the rightmost part, ignoring "." parts.
A variant class holding a string, an item map keyed by name, or an ordered list of items...
void add(const std::string &s)
Adds a string item to this list item.
static Item info(const std::string &channel_name, bool all_slots=true)
Returns a variant containing information about the state of the channel.
static void cleanup(SignalSafe, const char *os_name, void(*fn)(SignalSafe, void *))
An exit handler that unlinks the named shared memory segment and calls the user's function for the me...
bool receive(size_t, int, std::vector< char > &, std::string *=nullptr, G::EpochTime *=nullptr)
Used by PublisherSubscriber.
bool has(const std::string &k) const
Returns true if this map item has the named key.
static std::string osName(const std::string &name)
Converts the segment name to an internal form that can be passed to cleanup() or trap().
static std::string fromULong(unsigned long ul)
Converts unsigned long 'ul' to a string.
A structure that holds diagnostic information for each G::Publisher slot.
static std::string v0()
Returns a copy of argv[0] from the first call to the constructor that takes argc/argv.
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
unsigned int age() const
Returns the age of the latest data in seconds, or zero.
A Path object represents a file system path.
static void add(void(*fn)(SignalSafe, const char *), const char *arg)
Adds the given handler to the list of handlers that are to be called when the process terminates abno...
std::string info() const
Returns some channel metadata as a short json string (see G::Item::str()).
~PublisherSubscriber()
Destructor.
void close()
Closes the channel subscription, releasing resources and becoming inactive.
static bool fatal(int error)
Returns true if the error value indicates a permanent problem with the socket.
size_t size() const
Returns the segment size. This is affected by remap().
Type type() const
Returns the item type (string, list or map).