VideoTools
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
gfatpipe.cpp
Go to the documentation of this file.
1 //
2 // Copyright (C) 2017 Graeme Walker
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 // ===
17 //
18 // gfatpipe.cpp
19 //
20 
21 #include "gdef.h"
22 #include "gfatpipe.h"
23 #include "gmsg.h"
24 #include "gcleanup.h"
25 #include "gprocess.h"
26 #include "gstr.h"
27 #include "gassert.h"
28 #include "glog.h"
29 #include <stdexcept>
30 #include <sstream>
31 #include <sys/select.h>
32 #include <sys/socket.h>
33 #include <sys/un.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <unistd.h>
37 
38 std::string G::FatPipe::name()
39 {
40  std::ostringstream ss ;
41  ss << "fatpipe." << getpid() ;
42  return ss.str() ;
43 }
44 
45 std::string G::FatPipe::name( size_t size )
46 {
47  std::ostringstream ss ;
48  ss << "fatpipe." << getpid() << "." << size ;
49  return ss.str() ;
50 }
51 
52 size_t G::FatPipe::sensible( size_t size )
53 {
54  return std::max( size_t(1024U)*16U , size ) ;
55 }
56 
58  m_child(false) ,
59  m_parent(false) ,
60  m_shmem(name(),sizeof(ControlMemory),SharedMemory::Control()) ,
61  m_pipe_fd(-1) ,
62  m_new_data_fd(-1)
63 {
64  if( ::socketpair( AF_UNIX , SOCK_DGRAM , 0 , m_pipe_fds ) < 0 )
65  throw Error( "socketpair pipe creation failed" ) ;
66 
67  m_shmem.inherit() ; // inherit fd across exec()
68  m_shmem.unlink() ;
69 
70  ControlMemory * mem = static_cast<ControlMemory*>(m_shmem.ptr()) ;
71  mem->magic = ControlMemory::MAGIC ;
72  new (&mem->mutex) G::Semaphore ; // placement new
73 }
74 
76 {
77  if( m_parent && m_pipe_fd != -1 )
78  G::Msg::send( m_pipe_fd , "x" , 1 , MSG_DONTWAIT ) ;
79 
80  close_( m_pipe_fds[0] ) ;
81  close_( m_pipe_fds[1] ) ;
82  close_( m_pipe_fd ) ;
83 
84  //ControlMemory * mem = static_cast<ControlMemory*>(m_shmem.ptr()) ;
85  //Semaphore * sem = G::Semaphore::at(&mem->mutex) ;
86  //sem->~Semaphore() ; // placement delete
87 }
88 
89 void G::FatPipe::cleanup( G::SignalSafe , const char * fd_p )
90 {
91  char * end = const_cast<char*>( fd_p + std::strlen(fd_p) ) ;
92  long pipe_fd = ::strtol( fd_p , &end , 10 ) ;
93  if( pipe_fd >= 0L )
94  G::Msg::send( static_cast<int>(pipe_fd) , "x" , 1 , MSG_DONTWAIT ) ;
95 }
96 
97 void G::FatPipe::close_( int fd )
98 {
99  if( fd != -1 )
100  ::close( fd ) ;
101 }
102 
104 {
105  try
106  {
107  m_child = true ;
108  ::close( m_pipe_fds[1] ) ; // close write end
109  m_pipe_fd = m_pipe_fds[0] ; // read end
110  m_pipe_fds[0] = m_pipe_fds[1] = -1 ;
111 
112  // no close on exec
113  ::fcntl( m_pipe_fd , F_SETFD , ::fcntl(m_pipe_fd,F_GETFD) & ~FD_CLOEXEC ) ;
114 
115  m_arg_shmemfd = tostring(m_shmem.fd()) ;
116  m_arg_pipefd = tostring(m_pipe_fd) ;
117  }
118  catch(...)
119  {
120  }
121 }
122 
123 std::string G::FatPipe::tostring( int n )
124 {
125  std::ostringstream ss ;
126  ss << n ;
127  return ss.str() ;
128 }
129 
130 const char * G::FatPipe::pipefd() const
131 {
132  return m_arg_pipefd.c_str() ;
133 }
134 
135 const char * G::FatPipe::shmemfd() const
136 {
137  return m_arg_shmemfd.c_str() ;
138 }
139 
140 void G::FatPipe::doParent( bool auto_cleanup )
141 {
142  m_parent = true ;
143  ::close( m_pipe_fds[0] ) ; // close read end
144  m_pipe_fd = m_pipe_fds[1] ; // write end
145  m_pipe_fds[0] = m_pipe_fds[1] = -1 ;
146 
147  if( auto_cleanup )
148  {
149  static std::list<std::string> arg_list ;
150  arg_list.push_back( G::Str::fromInt(m_pipe_fd) ) ;
151  G::Cleanup::add( cleanup , arg_list.back().c_str() ) ;
152  }
153 }
154 
155 void G::FatPipe::send( const char * data_p , size_t data_n , const char * type )
156 {
157  if( data_p == nullptr || data_n == 0U ) return ;
158  m_data_p.resize( 1U ) ;
159  m_data_n.resize( 1U ) ;
160  m_data_p[0] = data_p ;
161  m_data_n[0] = data_n ;
162  send( data_n , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
163 }
164 
165 void G::FatPipe::send( const std::vector<char> & data , const char * type )
166 {
167  if( data.empty() ) return ;
168  m_data_p.resize( 1U ) ;
169  m_data_n.resize( 1U ) ;
170  m_data_p[0] = &data[0] ;
171  m_data_n[0] = data.size() ;
172  send( data.size() , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
173 }
174 
175 void G::FatPipe::send( const std::vector<std::pair<const char *,size_t> > & data , const char * type )
176 {
177  m_data_p.resize( data.size() ) ;
178  m_data_n.resize( data.size() ) ;
179  size_t data_total = 0U ;
180  for( size_t i = 0U ; i < data.size() ; i++ )
181  {
182  m_data_p[i] = data[i].first ;
183  m_data_n[i] = data[i].second ;
184  data_total += data[i].second ;
185  }
186  if( data_total != 0U )
187  send( data_total , data.size() , &m_data_p[0] , &m_data_n[0] , type ) ;
188 }
189 
190 void G::FatPipe::send( const std::vector<std::vector<char> > & data , const char * type )
191 {
192  m_data_p.resize( data.size() ) ;
193  m_data_n.resize( data.size() ) ;
194  size_t data_total = 0U ;
195  size_t i_out = 0U ;
196  for( size_t i_in = 0U ; i_in < data.size() ; i_in++ )
197  {
198  if( data[i_in].empty() ) continue ;
199  m_data_p[i_out] = &(data[i_in])[0] ;
200  m_data_n[i_out++] = data[i_in].size() ;
201  data_total += data[i_in].size() ;
202  }
203  if( data_total != 0U )
204  send( data_total , i_out , &m_data_p[0] , &m_data_n[0] , type ) ;
205 }
206 
207 void G::FatPipe::send( size_t data_total , size_t data_count , const char ** data_p_p , size_t * data_n_p , const char * type )
208 {
209  // create the data segment on first use, or resize it if it's too small
210  //
211  if( m_shmem_data.get() == nullptr || (sizeof(DataMemory)+data_total) > m_shmem_data->size() )
212  {
213  size_t new_size = sizeof(DataMemory) + data_total + data_total/2U ;
214  G_DEBUG( "G::FatPipe::send: new data segment: size " << new_size ) ;
215  m_shmem_data.reset( new SharedMemory( name(new_size) , new_size ) ) ;
216  m_new_data_fd = m_shmem_data->fd() ;
217  G_DEBUG( "G::FatPipe::send: new data segment: fd " << m_new_data_fd ) ;
218  m_shmem_data->unlink() ;
219  DataMemory * dmem = static_cast<DataMemory*>(m_shmem_data->ptr()) ;
220  dmem->magic = DataMemory::MAGIC ;
221  dmem->type[0] = '\0' ;
222  dmem->data_size = 0U ;
223  dmem->time_s = 0 ;
224  dmem->time_us = 0U ;
225  }
226 
227  // lock the control segment and copy the payload into the data segment
228  //
229  ControlMemory * mem = static_cast<ControlMemory*>(m_shmem.ptr()) ;
230  DataMemory * dmem = static_cast<DataMemory*>(m_shmem_data->ptr()) ;
231  {
232  G::EpochTime time = G::DateTime::now() ;
233  Lock lock( G::Semaphore::at(&mem->mutex) ) ;
234  G_ASSERT( mem->magic == ControlMemory::MAGIC ) ;
235  G_ASSERT( dmem->magic == DataMemory::MAGIC ) ;
236  ::memset( dmem->type , 0 , sizeof(dmem->type) ) ;
237  if( type != nullptr ) ::strncpy( dmem->type , type , sizeof(dmem->type)-1U ) ;
238  for( char * out = dmem->data ; data_count ; out += *data_n_p , data_count-- , data_p_p++ , data_n_p++ )
239  ::memcpy( out , *data_p_p , *data_n_p ) ;
240  dmem->data_size = data_total ;
241  dmem->time_s = time.s ;
242  dmem->time_us = time.us ;
243  }
244 
245  // poke the child, sending it the new data segment fd if necessary
246  //
247  if( m_pipe_fd != -1 )
248  {
249  bool wait = m_new_data_fd != -1 ;
250  int flags = wait ? 0 : MSG_DONTWAIT ;
251  ssize_t rc = G::Msg::send( m_pipe_fd , "." , 1 , flags , m_new_data_fd ) ;
252  if( rc == 1 )
253  {
254  m_new_data_fd = -1 ; // transferred ok
255  }
256  else
257  {
258  int e = errno ;
259  const bool temporary = e == ENOBUFS ; // not used on linux
260  if( !temporary )
261  throw Error( "pipe write failed" , G::Process::strerror(e) ) ;
262  }
263  }
264 }
265 
267 {
268  if( m_pipe_fd == -1 ) return false ;
269  ssize_t rc = G::Msg::send( m_pipe_fd , "p" , 1 , MSG_DONTWAIT ) ;
270  return rc == 1 ;
271 }
272 
273 // ==
274 
275 G::FatPipeReceiver::FatPipeReceiver( int shmem_fd , int pipe_fd ) :
276  m_pipe_fd(pipe_fd) ,
277  m_shmem(shmem_fd)
278 {
279 }
280 
282 {
283  return m_pipe_fd ;
284 }
285 
286 G::FatPipeReceiver::Info G::FatPipeReceiver::flush( int pipe_fd )
287 {
288  // read whatever's in the pipe -- see if there's a "."
289  // payload and/or a new data segment fd
290  //
291  Info result ;
292  result.got_data = false ;
293  result.got_eof = false ;
294  result.data_fd = -1 ;
295  for(;;)
296  {
297  char c = '\0' ;
298  int fd = -1 ;
299  ssize_t rc = G::Msg::recv( pipe_fd , &c , 1 , MSG_DONTWAIT , &fd ) ;
300  G_DEBUG( "G::FatPipeReceiver::flush: recv() rc=" << rc << " c=" << int(c) << " fd=" << fd ) ;
301  if( fd != -1 )
302  result.data_fd = fd ;
303  if( rc == 1 && c == '.' )
304  result.got_data = true ;
305  else if( rc == 1 && c == 'x' )
306  result.got_eof = true ;
307  else if( rc == 1 && c == 'p' )
308  ; // ping
309  else
310  break ;
311  }
312  return result ;
313 }
314 
315 bool G::FatPipeReceiver::receive( std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
316 {
317  // clear the outputs
318  buffer.clear() ;
319  if( type_p != nullptr )
320  type_p->clear() ;
321 
322  // read everything in the pipe
323  Info pipe_info = flush( m_pipe_fd ) ;
324  G_DEBUG( "G::FatPipeReceiver::receive: got-data=" << pipe_info.got_data << " data-fd=" << pipe_info.data_fd ) ;
325  if( pipe_info.got_eof )
326  return false ;
327 
328  // maintain the data segment
329  if( m_shmem_data.get() == nullptr && pipe_info.got_data && pipe_info.data_fd == -1 )
330  {
331  G_WARNING( "G::FatPipeReceiver::receive: data segment file descriptor was not received" ) ;
332  return false ;
333  }
334  if( pipe_info.data_fd != -1 )
335  {
336  G_DEBUG( "G::FatPipeReceiver::receive: new data segment fd: " << pipe_info.data_fd ) ;
337  m_shmem_data.reset( new SharedMemory(pipe_info.data_fd) ) ;
338  }
339 
340  // return an empty buffer if no data
341  if( !pipe_info.got_data )
342  return true ;
343 
344  // read the data
345  G_ASSERT( m_shmem_data.get() != nullptr ) ;
346  copy( m_shmem , *m_shmem_data.get() , buffer , type_p , time_p ) ;
347 
348  // trim padding from the type string
349  if( type_p && type_p->find('\0') != std::string::npos )
350  type_p->resize( type_p->find('\0') ) ;
351 
352  return true ;
353 }
354 
355 void G::FatPipeReceiver::copy( SharedMemory & shmem , SharedMemory & shmem_data ,
356  std::vector<char> & buffer , std::string * type_p , G::EpochTime * time_p )
357 {
358  ControlMemory * mem = static_cast<ControlMemory*>( shmem.ptr() ) ;
359  if( mem->magic != ControlMemory::MAGIC ) // no-mutex read of constant data
360  throw Error( "magic number mismatch in shared memory" ) ;
361 
362  DataMemory * dmem = static_cast<DataMemory*>( shmem_data.ptr() ) ;
363  G::Semaphore * mutex = G::Semaphore::at( &mem->mutex ) ;
364  {
365  Lock lock( mutex ) ;
366  G_ASSERT( dmem->magic == DataMemory::MAGIC ) ;
367  buffer.resize( dmem->data_size ) ;
368  ::memcpy( &buffer[0] , dmem->data , buffer.size() ) ;
369  if( dmem->data_size && type_p != nullptr )
370  type_p->assign( dmem->type , sizeof(dmem->type) ) ; // inc. padding
371  if( time_p != nullptr )
372  time_p->s = dmem->time_s , time_p->us = dmem->time_us ;
373  dmem->data_size = 0U ;
374  }
375 }
376 
378 {
379  fd_set read_fds ;
380  FD_ZERO( &read_fds ) ;
381  FD_SET( fd , &read_fds ) ;
382  for(;;)
383  {
384  int rc = ::select( fd+1 , &read_fds , nullptr , nullptr , nullptr ) ;
385  if( rc < 0 && errno == EINTR ) continue ;
386  if( rc != 1 )
387  throw Error( "cannot wait on the pipe" ) ;
388  break ;
389  }
390 }
391 
392 // ==
393 
394 G::FatPipe::Lock::Lock( Semaphore * s ) :
395  m_s(s)
396 {
397  s->decrement() ;
398 }
399 
400 G::FatPipe::Lock::~Lock()
401 {
402  m_s->increment() ;
403 }
404 
405 /// \file gfatpipe.cpp
void * ptr() const
Returns the mapped address.
A subsecond-resolution timestamp based on a time_t.
Definition: gdatetime.h:39
void decrement()
Decrement-but-block-if-zero operator.
int pipefd() const
Returns the pipe fd.
Definition: gfatpipe.cpp:281
An empty structure that is used to indicate a signal-safe, reentrant implementation.
Definition: gsignalsafe.h:36
void send(const char *data, size_t size, const char *type=nullptr)
Sends a chunk of data to the child process.
Definition: gfatpipe.cpp:155
static Semaphore * at(storage_type *)
Syntactic sugar to return an object pointer corresponding to the given storage pointer.
void inherit()
Marks fd() as inherited across exec() ie. no-close-on-exec.
void doChild()
To be called from the child process after fork().
Definition: gfatpipe.cpp:103
static ssize_t send(int, const void *, size_t, int, int fd_to_send=-1)
A send() replacement using sendmsg().
Definition: gmsg.cpp:32
const char * pipefd() const
Returns the pipe file descriptor as a string pointer (suitable for exec()).
Definition: gfatpipe.cpp:130
static std::string fromInt(int i)
Converts int 'i' to a string.
Definition: gstr.cpp:294
FatPipe()
Constructor.
Definition: gfatpipe.cpp:57
~FatPipe()
Destructor.
Definition: gfatpipe.cpp:75
static EpochTime now()
Returns the current epoch time.
A semaphore class with a posix or sysv implementation chosen at build-time.
Definition: gsemaphore.h:47
const char * shmemfd() const
Returns the shared memory file descriptor as a string pointer (suitable for exec()).
Definition: gfatpipe.cpp:135
void doParent(bool auto_cleanup=true)
To be called from the parent process after fork().
Definition: gfatpipe.cpp:140
A POSIX shared memory class.
Definition: gsharedmemory.h:41
Shared memory structure for G::FatPipe.
Definition: gfatpipe.h:133
static void wait(int pipe_fd)
A convenience function that sets the pipe fd to be non-blocking and does a non-multiplexed wait for a...
Definition: gfatpipe.cpp:377
FatPipeReceiver(int shmem_fd, int pipe_fd)
Constructor.
Definition: gfatpipe.cpp:275
void unlink()
Unlinks the segment from the filesystem.
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *=nullptr)
Reads a message from the fat pipe's shared memory into the supplied buffer.
Definition: gfatpipe.cpp:315
bool ping()
Returns true if the receiver seems to be there.
Definition: gfatpipe.cpp:266
static void add(void(*fn)(SignalSafe, const char *), const char *arg)
Adds the given handler to the list of handlers that are to be called when the process terminates abno...
static ssize_t recv(int, void *, size_t, int, int *fd_received_p=nullptr)
A recv() replacement using recvmsg().
Definition: gmsg.cpp:76