31 #include <sys/select.h>
32 #include <sys/socket.h>
// Composes a name from the literal "fatpipe." followed by the value
// returned by getpid(), formatted through a local ostringstream.
// NOTE(review): the return statement is not visible in this excerpt.
38 std::string G::FatPipe::name()
40 std::ostringstream ss ;
41 ss <<
"fatpipe." << getpid() ;
// Overload taking a size: composes "fatpipe." + getpid() + "." + size
// through a local ostringstream. The private send() further down calls
// this as name(new_size) when it creates a new data segment.
// NOTE(review): the return statement is not visible in this excerpt.
45 std::string G::FatPipe::name(
size_t size )
47 std::ostringstream ss ;
48 ss <<
"fatpipe." << getpid() <<
"." << size ;
// Returns the larger of 'size' and size_t(1024U)*16U (16384), so the
// result is never below that floor.
52 size_t G::FatPipe::sensible(
size_t size )
54 return std::max(
size_t(1024U)*16U , size ) ;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is constructor code - confirm.
// Create a pair of AF_UNIX datagram sockets in m_pipe_fds; a negative
// return from socketpair() is reported by throwing Error.
64 if( ::socketpair( AF_UNIX , SOCK_DGRAM , 0 , m_pipe_fds ) < 0 )
65 throw Error(
"socketpair pipe creation failed" ) ;
// Stamp the control structure with its magic number; the send and copy
// code further down checks mem->magic against ControlMemory::MAGIC.
71 mem->magic = ControlMemory::MAGIC ;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; presumably this is destructor/cleanup code - confirm.
// Condition: m_parent is set and a pipe descriptor is held. The
// statement(s) it guards are not visible here.
77 if( m_parent && m_pipe_fd != -1 )
// Hand both socketpair descriptors to close_().
80 close_( m_pipe_fds[0] ) ;
81 close_( m_pipe_fds[1] ) ;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; fd_p is used as a C string holding a decimal descriptor number.
// 'end' starts at the string's terminator, but strtol() overwrites it
// with the position where parsing stopped.
91 char * end =
const_cast<char*
>( fd_p + std::strlen(fd_p) ) ;
92 long pipe_fd = ::strtol( fd_p , &end , 10 ) ;
// Send the single byte "x" on that descriptor without blocking
// (MSG_DONTWAIT); FatPipeReceiver::flush() sets got_eof on reading 'x'.
94 G::Msg::send( static_cast<int>(pipe_fd) ,
"x" , 1 , MSG_DONTWAIT ) ;
97 void G::FatPipe::close_(
int fd )
// NOTE(review): the enclosing function header is not visible in this
// excerpt; the block name is a guess - confirm.
// Keep end [0] of the socket pair: close end [1], remember [0] as
// m_pipe_fd and mark both array slots as unused (-1).
108 ::close( m_pipe_fds[1] ) ;
109 m_pipe_fd = m_pipe_fds[0] ;
110 m_pipe_fds[0] = m_pipe_fds[1] = -1 ;
// Clear the FD_CLOEXEC flag on m_pipe_fd (read-modify-write of the
// descriptor flags).
// NOTE(review): the result of fcntl(F_GETFD) is used unchecked; on
// failure it is -1 and the masked value is still passed to F_SETFD.
113 ::fcntl( m_pipe_fd , F_SETFD , ::fcntl(m_pipe_fd,F_GETFD) & ~FD_CLOEXEC ) ;
// Cache both descriptor numbers as strings; m_arg_pipefd and
// m_arg_shmemfd are later exposed through c_str().
115 m_arg_shmemfd = tostring(m_shmem.fd()) ;
116 m_arg_pipefd = tostring(m_pipe_fd) ;
// Converts the int 'n' to a std::string; a local ostringstream is
// declared for the purpose.
// NOTE(review): the streaming and return statements are not visible in
// this excerpt.
123 std::string G::FatPipe::tostring(
int n )
125 std::ostringstream ss ;
// Returns the cached pipe-descriptor string as a C string. The pointer
// refers to m_arg_pipefd's own storage.
// NOTE(review): the enclosing function header is not visible in this excerpt.
132 return m_arg_pipefd.c_str() ;
// Returns the cached shared-memory-descriptor string as a C string. The
// pointer refers to m_arg_shmemfd's own storage.
// NOTE(review): the enclosing function header is not visible in this excerpt.
137 return m_arg_shmemfd.c_str() ;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; the block name is a guess - confirm.
// Keep end [1] of the socket pair: close end [0], remember [1] as
// m_pipe_fd and mark both array slots as unused (-1).
143 ::close( m_pipe_fds[0] ) ;
144 m_pipe_fd = m_pipe_fds[1] ;
145 m_pipe_fds[0] = m_pipe_fds[1] = -1 ;
// Function-local static list of strings.
// NOTE(review): how arg_list is used is not visible in this excerpt.
149 static std::list<std::string> arg_list ;
// Single-buffer send (function header not visible in this excerpt).
// A null pointer or a zero length means there is nothing to send.
157 if( data_p ==
nullptr || data_n == 0U ) return ;
// Describe the one buffer in the member arrays m_data_p/m_data_n and
// forward to the multi-chunk send() with data_n total bytes in 1 chunk.
158 m_data_p.resize( 1U ) ;
159 m_data_n.resize( 1U ) ;
160 m_data_p[0] = data_p ;
161 m_data_n[0] = data_n ;
162 send( data_n , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
// Single-container send (function header not visible in this excerpt).
// An empty container means there is nothing to send; this guard also
// makes the &data[0] below safe.
167 if( data.empty() )
return ;
// Describe the container's contents as one chunk in m_data_p/m_data_n
// and forward to the multi-chunk send().
168 m_data_p.resize( 1U ) ;
169 m_data_n.resize( 1U ) ;
170 m_data_p[0] = &data[0] ;
171 m_data_n[0] = data.size() ;
172 send( data.size() , 1U , &m_data_p[0] , &m_data_n[0] , type ) ;
// Sends a list of (pointer,length) chunks as a single message.
//  data - the chunks, taken in order
//  type - passed through unchanged to the lower-level send()
// Nothing is sent when the chunk lengths sum to zero.
175 void G::FatPipe::send(
const std::vector<std::pair<const char *,size_t> > & data ,
const char * type )
// Size the member arrays to one slot per chunk.
177 m_data_p.resize( data.size() ) ;
178 m_data_n.resize( data.size() ) ;
179 size_t data_total = 0U ;
// Split each pair into the parallel pointer/length arrays while
// accumulating the total byte count.
180 for(
size_t i = 0U ; i < data.size() ; i++ )
182 m_data_p[i] = data[i].first ;
183 m_data_n[i] = data[i].second ;
184 data_total += data[i].second ;
// A non-zero total implies at least one chunk, so &m_data_p[0] is valid.
186 if( data_total != 0U )
187 send( data_total , data.size() , &m_data_p[0] , &m_data_n[0] , type ) ;
// Multi-container send (function header not visible in this excerpt).
// Gathers the non-empty elements of 'data' into m_data_p/m_data_n and
// forwards them as one message.
// The arrays are sized for every element, but only the first i_out slots
// are filled because empty elements are skipped.
192 m_data_p.resize( data.size() ) ;
193 m_data_n.resize( data.size() ) ;
194 size_t data_total = 0U ;
// NOTE(review): the declaration of i_out is not visible in this excerpt;
// it is used below as the count of slots filled so far.
196 for(
size_t i_in = 0U ; i_in < data.size() ; i_in++ )
198 if( data[i_in].empty() )
continue ;
199 m_data_p[i_out] = &(data[i_in])[0] ;
200 m_data_n[i_out++] = data[i_in].size() ;
201 data_total += data[i_in].size() ;
// Nothing is sent when every element was empty.
203 if( data_total != 0U )
204 send( data_total , i_out , &m_data_p[0] , &m_data_n[0] , type ) ;
// Core send: copies data_count chunks, totalling data_total bytes, into
// the shared data segment and then signals over the pipe socket.
//  data_total - sum of all chunk lengths
//  data_count - number of chunks
//  data_p_p   - array of chunk pointers
//  data_n_p   - array of chunk lengths
//  type       - optional type string (may be nullptr); at most
//               sizeof(dmem->type)-1 bytes are copied
// Throws Error("pipe write failed",...) - the condition guarding the
// throw is not visible in this excerpt.
// NOTE(review): several lines of this function (braces, locking, the
// declaration of 'time') are not visible in this excerpt.
207 void G::FatPipe::send(
size_t data_total ,
size_t data_count ,
const char ** data_p_p ,
size_t * data_n_p ,
const char * type )
// (Re)create the data segment when there is none yet, or when the
// current one cannot hold the DataMemory header plus the payload.
211 if( m_shmem_data.get() ==
nullptr || (
sizeof(DataMemory)+data_total) > m_shmem_data->size() )
// New size is header + payload + half the payload again.
213 size_t new_size =
sizeof(DataMemory) + data_total + data_total/2U ;
214 G_DEBUG(
"G::FatPipe::send: new data segment: size " << new_size ) ;
215 m_shmem_data.reset(
new SharedMemory( name(new_size) , new_size ) ) ;
// Remember the new segment's descriptor; it is handed to G::Msg::send()
// below as the descriptor to send.
216 m_new_data_fd = m_shmem_data->fd() ;
217 G_DEBUG(
"G::FatPipe::send: new data segment: fd " << m_new_data_fd ) ;
// Unlink the segment's name straight away; the mapping and descriptor
// are still used afterwards.
218 m_shmem_data->unlink() ;
// Initialise the header of the fresh segment: magic number, empty type
// string and zero payload size.
219 DataMemory * dmem =
static_cast<DataMemory*
>(m_shmem_data->ptr()) ;
220 dmem->magic = DataMemory::MAGIC ;
221 dmem->type[0] =
'\0' ;
222 dmem->data_size = 0U ;
// Typed views of the control segment and the (possibly new) data segment.
229 ControlMemory * mem =
static_cast<ControlMemory*
>(m_shmem.ptr()) ;
230 DataMemory * dmem =
static_cast<DataMemory*
>(m_shmem_data->ptr()) ;
// Sanity-check both segments before writing.
234 G_ASSERT( mem->magic == ControlMemory::MAGIC ) ;
235 G_ASSERT( dmem->magic == DataMemory::MAGIC ) ;
// Zero the whole type field first, then copy at most size-1 bytes, so
// the stored type is always NUL-terminated.
236 ::memset( dmem->type , 0 ,
sizeof(dmem->type) ) ;
237 if( type !=
nullptr ) ::strncpy( dmem->type , type ,
sizeof(dmem->type)-1U ) ;
// Copy the chunks back-to-back into dmem->data; 'out' advances by each
// chunk's length while the pointer and length arrays are walked in step.
238 for(
char * out = dmem->data ; data_count ; out += *data_n_p , data_count-- , data_p_p++ , data_n_p++ )
239 ::memcpy( out , *data_p_p , *data_n_p ) ;
// Record the payload size and a timestamp ('time' is declared in lines
// not visible here).
240 dmem->data_size = data_total ;
241 dmem->time_s = time.s ;
242 dmem->time_us = time.us ;
// Signal over the pipe only if we hold a pipe descriptor.
247 if( m_pipe_fd != -1 )
// When a new segment descriptor has to be delivered the send uses flags
// 0 (no MSG_DONTWAIT); otherwise the one-byte "." message is sent with
// MSG_DONTWAIT.
249 bool wait = m_new_data_fd != -1 ;
250 int flags = wait ? 0 : MSG_DONTWAIT ;
// m_new_data_fd goes in as the descriptor to send; the G::Msg::send()
// declaration shows -1 as that argument's default value.
251 ssize_t rc =
G::Msg::send( m_pipe_fd ,
"." , 1 , flags , m_new_data_fd ) ;
// ENOBUFS is classed as a temporary failure; how 'temporary' is then
// used is not visible in this excerpt.
259 const bool temporary = e == ENOBUFS ;
261 throw Error(
"pipe write failed" , G::Process::strerror(e) ) ;
// NOTE(review): the enclosing function header is not visible in this
// excerpt; the block name is a guess - confirm.
// Without a pipe descriptor the result is false.
268 if( m_pipe_fd == -1 )
return false ;
// Probe with a single non-blocking "p" byte; FatPipeReceiver::flush()
// has a dedicated branch for 'p'. How rc is interpreted is not visible
// in this excerpt.
269 ssize_t rc =
G::Msg::send( m_pipe_fd ,
"p" , 1 , MSG_DONTWAIT ) ;
// Reads from pipe_fd without blocking (MSG_DONTWAIT) and reports what
// arrived in an Info structure:
//  got_data - set when a '.' byte was read
//  got_eof  - set when an 'x' byte was read
//  data_fd  - descriptor received alongside a byte, initially -1
// NOTE(review): the loop structure, the return statement and the body of
// the 'p' branch are not visible in this excerpt.
286 G::FatPipeReceiver::Info G::FatPipeReceiver::flush(
int pipe_fd )
// Start from "nothing received".
292 result.got_data = false ;
293 result.got_eof = false ;
294 result.data_fd = -1 ;
// One-byte receive that can also receive a descriptor into 'fd'.
299 ssize_t rc =
G::Msg::recv( pipe_fd , &c , 1 , MSG_DONTWAIT , &fd ) ;
300 G_DEBUG(
"G::FatPipeReceiver::flush: recv() rc=" << rc <<
" c=" <<
int(c) <<
" fd=" << fd ) ;
// Record the received descriptor (possibly under a condition on a line
// not visible here).
302 result.data_fd = fd ;
// Classify the byte that was read.
303 if( rc == 1 && c ==
'.' )
304 result.got_data = true ;
305 else if( rc == 1 && c ==
'x' )
306 result.got_eof = true ;
307 else if( rc == 1 && c ==
'p' )
// Receive path (function header not visible in this excerpt): drains
// the pipe, adopts any newly delivered data segment and copies the
// message out of shared memory.
// NOTE(review): the statements guarded by several of the conditions
// below are not visible in this excerpt.
319 if( type_p !=
nullptr )
// Find out what has arrived on the pipe.
323 Info pipe_info = flush( m_pipe_fd ) ;
324 G_DEBUG(
"G::FatPipeReceiver::receive: got-data=" << pipe_info.got_data <<
" data-fd=" << pipe_info.data_fd ) ;
325 if( pipe_info.got_eof )
// Data was signalled but there is no data segment yet and no descriptor
// for one arrived: log a warning.
329 if( m_shmem_data.get() ==
nullptr && pipe_info.got_data && pipe_info.data_fd == -1 )
331 G_WARNING(
"G::FatPipeReceiver::receive: data segment file descriptor was not received" ) ;
// A descriptor arrived: replace the data segment with a SharedMemory
// object constructed from it.
334 if( pipe_info.data_fd != -1 )
336 G_DEBUG(
"G::FatPipeReceiver::receive: new data segment fd: " << pipe_info.data_fd ) ;
337 m_shmem_data.reset(
new SharedMemory(pipe_info.data_fd) ) ;
341 if( !pipe_info.got_data )
// A data segment must exist by this point.
345 G_ASSERT( m_shmem_data.get() != nullptr ) ;
346 copy( m_shmem , *m_shmem_data.get() , buffer , type_p , time_p ) ;
// copy() assigns the whole fixed-size type field, so cut the string at
// its first NUL character.
349 if( type_p && type_p->find(
'\0') != std::string::npos )
350 type_p->resize( type_p->find(
'\0') ) ;
// Tail of a parameter list (the start of the signature is not visible
// in this excerpt). Copies the payload, type and timestamp out of the
// data segment into the caller's objects.
//  buffer - resized to the payload size and filled with the payload
//  type_p - optional; receives the whole fixed-size type field
//  time_p - optional; receives the stored timestamp
// Throws Error when the control segment's magic number is wrong.
356 std::vector<char> & buffer , std::string * type_p ,
G::EpochTime * time_p )
// Validate the control segment.
358 ControlMemory * mem =
static_cast<ControlMemory*
>( shmem.
ptr() ) ;
359 if( mem->magic != ControlMemory::MAGIC )
360 throw Error(
"magic number mismatch in shared memory" ) ;
362 DataMemory * dmem =
static_cast<DataMemory*
>( shmem_data.
ptr() ) ;
366 G_ASSERT( dmem->magic == DataMemory::MAGIC ) ;
// Size the buffer from the header's data_size and copy the payload.
// NOTE(review): if data_size is 0 the buffer is empty and &buffer[0]
// indexes an empty vector; buffer.data() would avoid that. Confirm that
// no earlier guard exists in the lines not shown.
// NOTE(review): data_size is read from shared memory and is not checked
// against the segment size in the visible lines.
367 buffer.resize( dmem->data_size ) ;
368 ::memcpy( &buffer[0] , dmem->data , buffer.size() ) ;
// The type is only reported when there is a payload. All
// sizeof(dmem->type) bytes are assigned, including any NUL padding.
369 if( dmem->data_size && type_p !=
nullptr )
370 type_p->assign( dmem->type ,
sizeof(dmem->type) ) ;
371 if( time_p !=
nullptr )
372 time_p->s = dmem->time_s , time_p->us = dmem->time_us ;
// Reset the stored size after copying - presumably so the same payload
// is not delivered twice; confirm against the sender.
373 dmem->data_size = 0U ;
// NOTE(review): the enclosing function header, the loop and the
// condition guarding the throw are not visible in this excerpt.
// Watch only 'fd', for readability.
380 FD_ZERO( &read_fds ) ;
381 FD_SET( fd , &read_fds ) ;
// Block in select() with no timeout (the final argument is nullptr).
384 int rc = ::select( fd+1 , &read_fds ,
nullptr ,
nullptr ,
nullptr ) ;
// EINTR is not treated as an error: go round again.
385 if( rc < 0 && errno == EINTR ) continue ;
387 throw Error(
"cannot wait on the pipe" ) ;
394 G::FatPipe::Lock::Lock(
Semaphore * s ) :
400 G::FatPipe::Lock::~Lock()
void * ptr() const
Returns the mapped address.
A subsecond-resolution timestamp based on a time_t.
void decrement()
Decrement-but-block-if-zero operator.
int pipefd() const
Returns the pipe fd.
An empty structure that is used to indicate a signal-safe, reentrant implementation.
void send(const char *data, size_t size, const char *type=nullptr)
Sends a chunk of data to the child process.
static Semaphore * at(storage_type *)
Syntactic sugar to return an object pointer corresponding to the given storage pointer.
void inherit()
Marks fd() as inherited across exec() ie. no-close-on-exec.
void doChild()
To be called from the child process after fork().
static ssize_t send(int, const void *, size_t, int, int fd_to_send=-1)
A send() replacement using sendmsg().
const char * pipefd() const
Returns the pipe file descriptor as a string pointer (suitable for exec()).
static std::string fromInt(int i)
Converts int 'i' to a string.
static EpochTime now()
Returns the current epoch time.
A semaphore class with a posix or sysv implementation chosen at build-time.
const char * shmemfd() const
Returns the shared memory file descriptor as a string pointer (suitable for exec()).
void doParent(bool auto_cleanup=true)
To be called from the parent process after fork().
A POSIX shared memory class.
Shared memory structure for G::FatPipe.
static void wait(int pipe_fd)
A convenience function that sets the pipe fd to be non-blocking and does a non-multiplexed wait for a...
FatPipeReceiver(int shmem_fd, int pipe_fd)
Constructor.
void unlink()
Unlinks the segment from the filesystem.
bool receive(std::vector< char > &buffer, std::string *type_p=nullptr, G::EpochTime *=nullptr)
Reads a message from the fat pipe's shared memory into the supplied buffer.
bool ping()
Returns true if the receiver seems to be there.
static void add(void(*fn)(SignalSafe, const char *), const char *arg)
Adds the given handler to the list of handlers that are to be called when the process terminates abnormally.
static ssize_t recv(int, void *, size_t, int, int *fd_received_p=nullptr)
A recv() replacement using recvmsg().