VideoTools
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
gpublisher.h
Go to the documentation of this file.
1 //
2 // Copyright (C) 2017 Graeme Walker
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 // ===
17 ///
18 /// \file gpublisher.h
19 ///
20 
21 #ifndef G_PUBLISHER__H
22 #define G_PUBLISHER__H
23 
24 #include "gdef.h"
25 #include "gsharedmemory.h"
26 #include "gdatetime.h"
27 #include "gitem.h"
28 #include "gsemaphore.h"
29 #include "gexception.h"
30 #include "gsignalsafe.h"
31 #include <utility> // pair
32 #include <string>
33 #include <vector>
34 
35 namespace G
36 {
37  class Publisher ;
38  class PublisherChannel ;
39  class PublisherSubscription ;
40  class PublisherSubscriber ;
41  class PublisherInfo ;
42  G_EXCEPTION( PublisherError , "publish/subscribe error" ) ;
43 }
44 
45 /// \class G::Publisher
46 /// A broadcast communication channel between unrelated processes using shared
47 /// memory. The shared memory name is public so that subscribers can find it
48 /// (see shm_open()). Subscribers are then notified via individual unix-domain
49 /// sockets.
50 ///
51 /// Communication is message-based and unreliable; newer messages will overwrite
52 /// older ones if not consumed. Messages have an associated type name.
53 ///
54 /// There are two shared memory segments per channel; one control and one data.
55 /// This allows the data segment to be discarded and replaced if it is too small.
56 ///
57 /// The named sockets are created by the subscribers, their name is put into the
58 /// shared memory segment, and they are unlinked by the publisher on first use.
59 /// Consequently, sockets exist in the filesystem in the interval between
60 /// binding by the subscriber and the next publish event in the publisher.
61 ///
62 /// As the publisher goes away it marks the shared memory as defunct, notifies
63 /// subscribers one last time, and unlinks the shared memory from the
64 /// filesystem. It also unlinks any sockets that have not been previously
65 /// unlinked.
66 ///
67 class G::Publisher
68 {
69 public:
70  explicit Publisher( const std::string & channel_name , bool auto_cleanup = true ) ;
71  ///< Constructor for a publisher.
72 
73  Publisher( const std::string & channel_name , Item publisher_info , bool auto_cleanup = true ) ;
74  ///< A constructor overload including arbitrary "info" metadata that
75  ///< is stored in the channel (subject to buffer size limits).
76  ///< The metadata is enriched with a "type" field that is
77  ///< initialised with the basename of argv0.
78 
79  ~Publisher() ;
80  ///< Destructor. Marks the shared memory as defunct and sends out
81  ///< notification to subscribers causing their receive()s to return
82  ///< false.
83  ///<
84  ///< Any recent subscribers that have never been publish()ed to before
85  ///< will get a notification and as a result their named sockets will
86  ///< be unlink()d. This means that the subscribers do not have to do
87  ///< any special filesystem cleanup themselves.
88 
89  void publish( const char * p , size_t n , const char * type = nullptr ) ;
90  ///< Publishes a chunk of data to subscribers.
91 
92  void publish( const std::vector<char> & , const char * type = nullptr ) ;
93  ///< Publishes a chunk of data to subscribers.
94 
95  void publish( const std::vector<std::pair<const char*,size_t> > & , const char * type ) ;
96  ///< Publishes segmented data to subscribers.
97 
98  void publish( const std::vector<std::vector<char> > & , const char * type ) ;
99  ///< Publishes chunked data to subscribers.
100 
101  static std::vector<std::string> list( std::vector<std::string> * others = nullptr ) ;
102  ///< Returns a list of channel names. Optionally returns by reference
103  ///< a list of visible-but-unreadable channels.
104 
105  static void purge( const std::string & channel_name ) ;
106  ///< Tries to clean up any failed slots in the named channel.
107  ///< This should only be run administratively because it is the
108  ///< subscribers' job to clean up after themselves.
109 
110  static std::string delete_( const std::string & channel_name ) ;
111  ///< Deletes the channel. This should only be run administratively.
112  ///< Returns a failure reason on error.
113 
114  static Item info( const std::string & channel_name , bool all_slots = true ) ;
115  ///< Returns a variant containing information about the state
116  ///< of the channel. The 'publisher' sub-item is the metadata
117  ///< from the constructor, enriched with a "pid" field that
118  ///< is the publisher's process id.
119 
120 private:
121  Publisher( const Publisher & ) ;
122  void operator=( const Publisher & ) ;
123  void publishStart() ;
124  void publishPart() ;
125  void publishEnd() ;
126 
127 private:
128  std::string m_name ;
129  SharedMemory m_shmem_control ;
130  unique_ptr<SharedMemory> m_shmem_data ;
131  unique_ptr<PublisherInfo> m_info ;
132  std::vector<const char*> m_data_p ;
133  std::vector<size_t> m_data_n ;
134 } ;
135 
136 /// \class G::PublisherChannel
137 /// A named publisher channel that can be subscribed to.
138 ///
139 class G::PublisherChannel
140 {
141 public:
142  explicit PublisherChannel( const std::string & channel_name ,
143  const std::string & socket_path_prefix = std::string() ) ;
144  ///< Constructor.
145  ///<
146  ///< The socket path prefix should be an absolute path with
147  ///< a basename; the actual path will have ".<pid>" added.
148  ///< The default is "/tmp/<channel_name>".
149  ///<
150  ///< The implementation maps the publisher's 'control'
151  ///< memory segment.
152 
153  ~PublisherChannel() ;
154  ///< Destructor.
155 
156  PublisherSubscriber * subscribe() ;
157  ///< Subscribes to the publisher returning a new()ed object.
158  ///< Notification is via a named socket. The actual socket
159  ///< name is the given prefix (or some sensible default)
160  ///< plus ".<pid>". The lifetime of the subscriber object
161  ///< must be less than the lifetime of the channel.
162  ///<
163  ///< The implementation creates a socket, grabs a free slot
164  ///< in the control memory segment, and installs the socket
165  ///< file descriptor in the slot. The slot and the socket
166  ///< are freed in the PublisherSubscriber's destructor.
167 
168  bool receive( size_t , int , std::vector<char> & , std::string * = nullptr , G::EpochTime * = nullptr ) ;
169  ///< Used by PublisherSubscriber.
170 
171  void releaseSlot( size_t slot_id ) ;
172  ///< Used by PublisherSubscriber.
173 
174  std::string name() const ;
175  ///< Returns the channel name, as passed in to the constructor.
176 
177 private:
178  PublisherChannel( const PublisherChannel & ) ;
179  void operator=( const PublisherChannel & ) ;
180 
181 private:
182  std::string m_name ;
183  SharedMemory m_shmem_control ;
184  unique_ptr<SharedMemory> m_shmem_data ;
185  std::string m_path_prefix ;
186 } ;
187 
188 /// \class G::PublisherSubscriber
189 /// A publication-channel subscriber endpoint.
190 ///
191 class G::PublisherSubscriber
192 {
193 public:
194  PublisherSubscriber( PublisherChannel & , size_t slot_id , int socket_fd ) ;
195  ///< Pseudo-private constructor, used by G::PublisherChannel.
196 
197  ~PublisherSubscriber() ;
198  ///< Destructor.
199 
200  size_t id() const ;
201  ///< Returns the slot id.
202 
203  int fd() const ;
204  ///< Returns the named socket file descriptor.
205 
206  bool receive( std::vector<char> & buffer , std::string * type_p = nullptr , G::EpochTime * time_p = nullptr ) ;
207  ///< Does a read for new publish()ed data. Blocks if there is nothing
208  ///< to read. Returns false if the publisher has gone away. Returns
209  ///< an empty buffer in the case of an ignorable event.
210 
211  bool peek( std::vector<char> & buffer , std::string * type_p = nullptr , G::EpochTime * time_p = nullptr ) ;
212  ///< Does a receive() but without requiring a publication event.
213  ///< Returns false if no data is available.
214 
215 private:
216  PublisherSubscriber( const PublisherSubscriber & ) ;
217  void operator=( const PublisherSubscriber & ) ;
218 
219 private:
220  PublisherChannel & m_channel ;
221  size_t m_slot_id ;
222  int m_socket_fd ;
223 } ;
224 
225 /// \class G::PublisherSubscription
226 /// An easy-to-use combination of a G::PublisherChannel object
227 /// and a single G::PublisherSubscriber.
228 ///
229 class G::PublisherSubscription
230 {
231 public:
232  explicit PublisherSubscription( const std::string & channel_name , bool lazy = false ) ;
233  ///< Constructor. The channel is normally subscribed to immediately,
234  ///< with errors resulting in exceptions; but if the lazy option is
235  ///< used then initialisation happens in open() and the constructor
236  ///< will not throw.
237  ///<
238  ///< Channel names are simple names (eg. "foo"), but at this interface
239  ///< they can be overloaded with the socket prefix, eg. "/var/tmp/foo.s/foo".
240 
241  bool open() ;
242  ///< Tries to initialise the object after lazy construction or
243  ///< after close(). Returns true when successfully initialised.
244  ///< Does not throw.
245 
246  int fd() const ;
247  ///< Returns the subscriber's file descriptor or minus one.
248 
249  bool receive( std::vector<char> & buffer , std::string * type_p = nullptr , G::EpochTime * time_p = nullptr ) ;
250  ///< Does a read for new publish()ed data. Blocks if there is
251  ///< nothing to read. Returns false if the publisher has gone
252  ///< away. Returns an empty buffer in the case of an ignorable
253  ///< event.
254 
255  bool peek( std::vector<char> & buffer , std::string * type_p = nullptr , G::EpochTime * time_p = nullptr ) ;
256  ///< Does a receive() but without requiring a publication event.
257 
258  std::string name() const ;
259  ///< Returns the channel name, as passed in to the constructor.
260 
261  std::string info() const ;
262  ///< Returns some channel metadata as a short json string (see
263  ///< G::Item::str()). This is the 'publisher' subset of Publisher::info(),
264  ///< so originally passed in to the Publisher constructor, plus some
265  ///< enrichment with "type" and "pid".
266 
267  unsigned int age() const ;
268  ///< Returns the age of the latest data in seconds, or zero.
269  ///< The implementation uses peek().
270 
271  void close() ;
272  ///< Closes the channel subscription, releasing resources and
273  ///< becoming inactive. The fd() method will return -1,
274  ///< although name() remains valid.
275 
276 private:
277  PublisherSubscription( const PublisherSubscription & ) ;
278  void operator=( const PublisherSubscription & ) ;
279 
280 private:
281  std::string m_channel_name ;
282  std::string m_channel_info ;
283  bool m_lazy ;
284  bool m_closed ;
285  std::string m_path_prefix ;
286  unique_ptr<PublisherChannel> m_channel ; // first
287  unique_ptr<PublisherSubscriber> m_subscriber ; // second
288 } ;
289 
290 #endif
PublisherSubscriber * subscribe()
Subscribes to the publisher returning a new()ed object.
Definition: gpublisher.cpp:327
PublisherSubscriber(PublisherChannel &, size_t slot_id, int socket_fd)
Pseudo-private constructor, used by G::PublisherChannel.
Definition: gpublisher.cpp:353
A subsecond-resolution timestamp based on a time_t.
Definition: gdatetime.h:39
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
Definition: gpublisher.cpp:459
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
Definition: gpublisher.cpp:383
A publication-channel subscriber endpoint.
Definition: gpublisher.h:191
bool open()
Tries to initialise the object after lazy construction or after close().
Definition: gpublisher.cpp:427
static std::string delete_(const std::string &channel_name)
Deletes the channel.
Definition: gpublisher.cpp:303
Publisher(const std::string &channel_name, bool auto_cleanup=true)
Constructor for a publisher.
Definition: gpublisher.cpp:211
std::string name() const
Returns the channel name, as passed in to the constructor.
Definition: gpublisher.cpp:469
static std::vector< std::string > list(std::vector< std::string > *others=nullptr)
Returns a list of channel names.
Definition: gpublisher.cpp:288
PublisherChannel(const std::string &channel_name, const std::string &socket_path_prefix=std::string())
Constructor.
Definition: gpublisher.cpp:310
std::string name() const
Returns the channel name, as passed in to the constructor.
Definition: gpublisher.cpp:346
An easy-to-use combination of a G::PublisherChannel object and a single G::PublisherSubscriber.
Definition: gpublisher.h:229
bool peek(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a receive() but without requiring a publication event.
Definition: gpublisher.cpp:464
int fd() const
Returns the named socket file descriptor.
Definition: gpublisher.cpp:373
int fd() const
Returns the subscriber's file descriptor or minus one.
Definition: gpublisher.cpp:454
size_t id() const
Returns the slot id.
Definition: gpublisher.cpp:368
A named publisher channel that can be subscribed to.
Definition: gpublisher.h:139
PublisherSubscription(const std::string &channel_name, bool lazy=false)
Constructor.
Definition: gpublisher.cpp:399
~PublisherChannel()
Destructor.
Definition: gpublisher.cpp:323
void releaseSlot(size_t slot_id)
Used by PublisherSubscriber.
Definition: gpublisher.cpp:335
A POSIX shared memory class.
Definition: gsharedmemory.h:41
static void purge(const std::string &channel_name)
Tries to clean up any failed slots in the named channel.
Definition: gpublisher.cpp:298
void publish(const char *p, size_t n, const char *type=nullptr)
Publishes a chunk of data to subscribers.
Definition: gpublisher.cpp:236
A variant class holding a string, an item map keyed by name, or an ordered list of items...
Definition: gitem.h:41
A broadcast communication channel between unrelated processes using shared memory.
Definition: gpublisher.h:67
~Publisher()
Destructor.
Definition: gpublisher.cpp:231
static Item info(const std::string &channel_name, bool all_slots=true)
Returns a variant containing information about the state of the channel.
Definition: gpublisher.cpp:293
bool receive(size_t, int, std::vector< char > &, std::string *=nullptr, G::EpochTime *=nullptr)
Used by PublisherSubscriber.
Definition: gpublisher.cpp:340
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *time_p=nullptr)
Does a read for new publish()ed data.
Definition: gpublisher.cpp:378
unsigned int age() const
Returns the age of the latest data in seconds, or zero.
Definition: gpublisher.cpp:479
std::string info() const
Returns some channel metadata as a short json string (see G::Item::str()).
Definition: gpublisher.cpp:474
~PublisherSubscriber()
Destructor.
Definition: gpublisher.cpp:362
void close()
Closes the channel subscription, releasing resources and becoming inactive.
Definition: gpublisher.cpp:490