VideoTools
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
gvrtpserver.cpp
Go to the documentation of this file.
1 //
2 // Copyright (C) 2017 Graeme Walker
3 //
4 // This program is free software: you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation, either version 3 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program. If not, see <http://www.gnu.org/licenses/>.
16 // ===
17 //
18 // gvrtpserver.cpp
19 //
20 
21 #include "gdef.h"
22 #include "gnet.h"
23 #include "gvrtpserver.h"
24 #include "gvmulticast.h"
25 #include "groot.h"
26 #include "gaddress.h"
27 #include "grjpeg.h"
28 #include "gstr.h"
29 #include "gfile.h"
30 #include "gtest.h"
31 #include "ghexdump.h"
32 #include "glog.h"
33 #include <fstream>
34 #include <sstream>
35 #include <algorithm>
36 
37 Gv::RtpServer::RtpServer( RtpServerHandler & handler , int scale , bool monochrome , GNet::Address bind_address ,
38  const std::string & group_address , unsigned int packet_type , const G::Path & fmtp_file ,
39  int jpeg_fudge_factor , const std::string & filter_spec , unsigned int source_stale_timeout ) :
40  m_handler(handler) ,
41  m_scale(scale) ,
42  m_monochrome(monochrome) ,
43  m_packet_type(packet_type) ,
44  m_source_id(0) ,
45  m_source_time(0) ,
46  m_source_stale_timeout(10U) ,
47  m_fmtp_file(fmtp_file) ,
48  m_jpeg_fudge_factor(jpeg_fudge_factor) ,
49  m_filter_spec(filter_spec) ,
50  m_socket(bind_address.domain()) ,
51  m_packet_buffer(1500U) ,
52  m_packet_stream(jpeg_fudge_factor) ,
53  m_seq_old(0U) ,
54  m_test_timer(*this,&RtpServer::onTimer,*this) ,
55  m_test_index(0)
56 {
57  {
58  G::Root claim_root ;
59  m_socket.bind( bind_address ) ;
60  }
61 
62  m_socket.addReadHandler( *this ) ;
63 
64  if( !group_address.empty() )
65  join( group_address ) ;
66 
67  if( G::Test::enabled("rtpserver-replay") )
68  m_test_timer.startTimer( 2U ) ;
69 
70  {
71  // TODO rtp packet filtering expressions
72  G::StringArray list ;
73  G::Str::splitIntoTokens( m_filter_spec , list , "," ) ;
74  for( G::StringArray::iterator p = list.begin() ; p != list.end() ; ++p )
75  {
76  if( G::Str::isNumeric(*p) )
77  m_filter_list.push_back( G::Str::toUInt(*p) ) ;
78  }
79  }
80 }
81 
83 {
84 }
85 
86 void Gv::RtpServer::join( const std::string & group )
87 {
88  Gv::Multicast::join( m_socket.fd() , group ) ;
89 }
90 
91 void Gv::RtpServer::onTimer()
92 {
93  // if we have seen no network packets in the initial startup time period then
94  // look for replay files called "test-1.dat" etc.
95  G_DEBUG( "Gv::RtpServer::onTimer: timeout: " << m_seq_old << " " << m_test_index ) ;
96  if( ( m_seq_old == 0U && m_test_index == 0 ) || m_test_index > 0 )
97  {
98  std::string test_file = "test-" + G::Str::fromInt(++m_test_index) + ".dat" ;
99  if( G::File::exists(test_file) )
100  {
101  G_LOG_S( "Server::onTimer: replaying test data file [" << test_file << "]" ) ;
102  std::ifstream f( test_file.c_str() ) ;
103  std::stringstream ss ;
104  ss << f.rdbuf() ;
105  std::string s = ss.str() ;
106  onData( s.data() , s.size() ) ;
107  m_test_timer.startTimer( 0U ) ;
108  }
109  }
110 }
111 
112 void Gv::RtpServer::readEvent()
113 {
114  G_ASSERT( m_packet_buffer.size() == 1500U ) ;
115  GNet::Socket::ssize_type n = m_socket.read( &m_packet_buffer[0] , m_packet_buffer.size() ) ;
116  if( n != 0U )
117  onData( &m_packet_buffer[0] , n ) ;
118 }
119 
120 void Gv::RtpServer::onData( const char * p , std::string::size_type n )
121 {
122  #if 0
123  if( G::Test::enabled("rtpserver-save") )
124  {
125  static int seq = 0 ;
126  std::ostringstream ss ;
127  ss << "test-" << ++seq << ".dat" ;
128  std::ofstream file( ss.str().c_str() ) ;
129  file << std::string(p,n) ;
130  }
131  #endif
132 
133  processRtpData( p , n ) ;
134 }
135 
136 void Gv::RtpServer::processRtpData( const char * p , std::string::size_type n )
137 {
138  // parse as RTP
139  //
140  if( n < Gv::RtpPacket::smallest() )
141  {
142  G_WARNING( "Server::processRtpData: ignoring invalid rtp packet: too small" ) ;
143  return ;
144  }
145  Gv::RtpPacket rtp_packet( p , n ) ;
146  if( !rtp_packet.valid() )
147  {
148  G_WARNING( "Server::processRtpData: ignoring invalid rtp packet: " << rtp_packet.reason() ) ;
149  return ;
150  }
151 
152  // only process packets of one payload type
153  if( m_packet_type == 0U )
154  {
155  G_LOG( "Gv::RtpServer::processRtpData: processing packets of type " << rtp_packet.type() ) ;
156  m_packet_type = rtp_packet.type() ;
157  }
158  if( m_packet_type != 0U && rtp_packet.type() != m_packet_type )
159  {
160  G_WARNING_ONCE( "Gv::RtpServer::processRtpData: ignoring unwanted rtp packet type: " << rtp_packet.type() << " != " << m_packet_type ) ;
161  G_DEBUG( "Gv::RtpServer::processRtpData: ignoring unwanted rtp packet type: " << rtp_packet.type() << " != " << m_packet_type ) ;
162  return ;
163  }
164 
165  // only process packets from one source
166  if( m_source_id == 0U )
167  {
168  G_DEBUG( "Gv::RtpServer::processRtpData: source id: " << rtp_packet.src() ) ;
169  m_source_id = rtp_packet.src() ;
170  }
171  if( rtp_packet.src() == m_source_id )
172  {
173  m_source_time = ::time(nullptr) ;
174  }
175  else
176  {
177  if( (m_source_time+m_source_stale_timeout) < ::time(nullptr) )
178  {
179  G_LOG( "Gv::RtpServer::processRtpData: switching source: " << m_source_id << " -> " << rtp_packet.src() ) ;
180  m_source_id = rtp_packet.src() ;
181  m_source_time = ::time(nullptr) ;
182  }
183  else
184  {
185  G_WARNING( "Gv::RtpServer::processRtpData: ignoring unknown rtp packet source: " << rtp_packet.src() << " != " << m_source_id ) ;
186  return ;
187  }
188  }
189 
190  // report missing packets
191  if( m_seq_old != 0U && rtp_packet.seq() != (m_seq_old+1U) && rtp_packet.seq() != 0U )
192  {
193  G_WARNING( "Gv::RtpServer::processRtpData: missing packet(s): " << (m_seq_old+1U) << "-" << (rtp_packet.seq()-1U) ) ;
194  }
195  m_seq_old = rtp_packet.seq() ;
196 
197  // type-specific processing
198  //
199  if( rtp_packet.typeJpeg() ) // 26 = "JPEG/9000"
200  {
201  // parse as RTP JPEG
202 
203  if( rtp_packet.size() < Gv::RtpJpegPacket::smallest() )
204  {
205  G_WARNING( "Gv::RtpServer::processRtpData: invalid rtp-jpeg packet: too small" ) ;
206  return ;
207  }
208 
209  Gv::RtpJpegPacket jpeg_packet( rtp_packet.begin() , rtp_packet.end() ) ;
210  if( !jpeg_packet.valid() )
211  {
212  G_WARNING( "Gv::RtpServer::processRtpData: invalid rtp-jpeg packet: " << jpeg_packet.reason() ) ;
213  return ;
214  }
215 
216  if( G::Log::at(G::Log::s_LogVerbose) )
217  G_LOG( "Gv::RtpPacketStream::add: rtp-jpeg packet: " << rtp_packet.str() << " ; " << jpeg_packet.str() ) ;
218 
219  if( !filter(jpeg_packet) )
220  m_packet_stream.add( rtp_packet , jpeg_packet ) ;
221 
222  while( m_packet_stream.more() )
223  processJpegPayload( m_packet_stream.get() ) ;
224  }
225  else if( rtp_packet.typeDynamic() ) // dynamic mapping using sdp
226  {
227  // parse as RTP H264
228 
229  if( rtp_packet.size() < Gv::RtpAvcPacket::smallest() )
230  {
231  G_WARNING( "Gv::RtpServer::processRtpData: invalid rtp-avc packet: too small" ) ;
232  return ;
233  }
234 
235  Gv::RtpAvcPacket avc_packet( rtp_packet.ubegin() , rtp_packet.uend() ) ;
236  if( !avc_packet.valid() )
237  {
238  G_WARNING( "Gv::RtpServer::processRtpData: invalid rtp-avc packet: " << avc_packet.reason() ) ;
239  return ;
240  }
241 
242  if( G::Log::at(G::Log::s_LogVerbose) )
243  G_LOG( "Gv::RtpServer::processRtpAvcPacket: rtp-avc packet: " << rtp_packet.str() << " ; " << avc_packet.str(G::Log::at(G::Log::s_Debug)) ) ;
244 
245  if( !filter(avc_packet) )
246  m_packet_stream.add( rtp_packet , avc_packet ) ;
247 
248  while( m_packet_stream.more() )
249  processAvcPayload( m_packet_stream.get() ) ;
250  }
251 }
252 
253 bool Gv::RtpServer::filter( const Gv::RtpJpegPacket & ) const
254 {
255  return false ;
256 }
257 
258 bool Gv::RtpServer::filter( const Gv::RtpAvcPacket & avc_packet ) const
259 {
260  if( !m_filter_list.empty() )
261  {
262  //unsigned int packet_level = avc_packet.nri() ; // 0..3
263  unsigned int nalu_type = avc_packet.type_is_fu() ? avc_packet.fu_type() : avc_packet.type() ;
264  bool match = std::find( m_filter_list.begin() , m_filter_list.end() , nalu_type ) != m_filter_list.end() ;
265  return !match ; // keep on match
266  }
267  else
268  {
269  return false ; // keep
270  }
271 }
272 
273 void Gv::RtpServer::processJpegPayload( const std::vector<char> & payload )
274 {
275  // parse out the image size
276  Gr::JpegInfo jpeg_info( &payload[0] , payload.size() ) ;
277  Gr::ImageType image_type = Gr::ImageType::jpeg( jpeg_info.dx() , jpeg_info.dy() , jpeg_info.channels() ) ;
278  G_DEBUG( "Gv::RtpServer::processJpegFrame: processing jpeg image: " << image_type ) ;
279 
280  int scale = autoscale( m_scale , jpeg_info.dx() ) ;
281  if( scale > 1 || m_monochrome )
282  {
283  m_jpeg_reader.setup( scale , m_monochrome ) ;
284  m_jpeg_reader.decode( m_jpeg_image_data , &payload[0] , payload.size() ) ;
285  m_jpeg_writer.encode( m_jpeg_image_data , m_jpeg_buffer ) ;
286  m_handler.onImage( m_jpeg_buffer , Gr::ImageType::jpeg(image_type,scale,m_monochrome) , true/*keyframe*/ ) ;
287  }
288  else
289  {
290  m_handler.onImage( payload , image_type , true/*keyframe*/ ) ;
291  }
292 }
293 
294 void Gv::RtpServer::processAvcPayload( const std::vector<char> & payload )
295 {
296  // lazy decoding of the fmtp into the avcc and lazy construction of the decoder reader stream
297  if( m_avcc.get() == nullptr && m_fmtp_file != G::Path() )
298  {
299  G_DEBUG( "Gv::RtpServer::processAvcPayload: fmtp processing" ) ;
300  std::string fmtp = readFmtpFile( m_fmtp_file ) ;
301  if( !fmtp.empty() )
302  {
304  if( !avcc.valid() )
305  throw InvalidFmtp( avcc.reason() ) ;
306 
307  m_avcc.reset( new Gr::Avc::Configuration(avcc) ) ;
308 
309  if( m_avc_reader_stream.get() == nullptr )
310  m_avc_reader_stream.reset( new Gv::AvcReaderStream( *m_avcc.get() ) ) ;
311  }
312  }
313 
314  // if no fmtp then create a decoder reader stream without the avcc and rely on SPS and PPS NALUs
315  if( m_avc_reader_stream.get() == nullptr )
316  {
317  G_DEBUG( "Gv::RtpServer::processAvcFrame: initialising avc decoder without fmtp" ) ;
318  m_avc_reader_stream.reset( new Gv::AvcReaderStream ) ;
319  }
320 
321  // decode
322  G_DEBUG( "Gv::RtpServer::processAvcFrame: avc decode: " << G::hexdump<16>(payload.begin(),payload.end()) ) ;
323  Gv::AvcReader reader( *m_avc_reader_stream.get() , &payload[0] , payload.size() ) ;
324  if( reader.valid() )
325  {
326  m_output_buffer.clear() ;
327  Gr::ImageType image_type = reader.fill( m_output_buffer , autoscale(m_scale,reader.dx()) , m_monochrome ) ;
328  m_handler.onImage( m_output_buffer , image_type , reader.keyframe() ) ;
329  }
330  else
331  {
332  G_DEBUG( "Gv::RtpServer::processAvcFrame: avc decode failed" ) ;
333  }
334 }
335 
336 int Gv::RtpServer::autoscale( int s , int dx )
337 {
338  if( s == -1 ) // "auto"
339  {
340  s = 1 ;
341  dx = std::max( 0 , dx ) ;
342  while( (dx/s) > 800 )
343  s++ ;
344  }
345  return s ;
346 }
347 
348 void Gv::RtpServer::onException( std::exception & e )
349 {
350  G_DEBUG( "Gv::RtpServer::onException: " << e.what() ) ;
351  throw ;
352 }
353 
354 std::string Gv::RtpServer::readFmtpFile( const G::Path & fmtp_file )
355 {
356  std::ifstream f ;
357  {
358  G::Root claim_root ;
359  f.open( fmtp_file.str().c_str() ) ;
360  }
361  std::string line ;
362  do
363  {
364  line.clear() ;
365  std::getline( f , line ) ;
366  } while( line.find("#") == 0U ) ;
367  return line ;
368 }
369 
370 // ==
371 
372 Gv::RtpServerHandler::~RtpServerHandler()
373 {
374 }
375 
376 /// \file gvrtpserver.cpp
std::string str() const
Returns the path string.
Definition: gpath.cpp:290
static size_t smallest()
The smallest parsable packet.
A decoder for an AVC (aka H.264) video packet.
Definition: gvavcreader.h:79
static size_t smallest()
Returns the smallest valid packet size.
Definition: gvrtppacket.cpp:57
RtpServer(RtpServerHandler &, int scale, bool monochrome, GNet::Address bind_address, const std::string &group_address, unsigned int packet_type, const G::Path &fmtp_file, int jpeg_fudge_factor, const std::string &filter_spec, unsigned int source_stale_timeout)
Constructor.
Definition: gvrtpserver.cpp:37
An RTP payload parser for the jpeg payload type.
static bool at(Severity)
Returns true if G::LogOutput::output() would log at the given level.
Definition: glog.cpp:43
static void join(SOCKET, const std::string &)
Joins the socket to the multicast group. IPv4 only. Throws on error.
Definition: gvmulticast.cpp:31
static ImageType jpeg(int dx, int dy, int channels=3)
Factory function for a jpeg image type.
Synopsis:
Contains AVC configuration parameters, initialised from an "avcC" file segment or from an SDP "fmtp" ...
Definition: gravc.h:93
The GNet::Address class encapsulates a TCP/UDP transport address.
Definition: gaddress.h:55
std::vector< std::string > StringArray
A std::vector of std::strings.
Definition: gstrings.h:33
An encapsulation of image type, including width, height and number of channels, with support for a st...
Definition: grimagetype.h:43
A class which acquires the process's special privileges on construction and releases them on destruct...
Definition: groot.h:49
void addReadHandler(EventHandler &handler)
Adds this socket to the event source list so that the given handler receives read events...
Definition: gsocket.cpp:222
static void splitIntoTokens(const std::string &in, StringArray &out, const std::string &ws)
Splits the string into 'ws'-delimited tokens.
Definition: gstr.cpp:868
unsigned int type() const
Returns the RTP-AVC packet type, matching the Type enum.
static bool isNumeric(const std::string &s, bool allow_minus_sign=false)
Returns true if every character is a decimal digit.
Definition: gstr.cpp:228
virtual ~RtpServer()
Destructor.
Definition: gvrtpserver.cpp:82
static std::string fromInt(int i)
Converts int 'i' to a string.
Definition: gstr.cpp:294
An RTP server class.
Definition: gvrtpserver.h:52
void bind(const Address &)
Binds the socket with the given address.
Definition: gsocket.cpp:93
static unsigned int toUInt(const std::string &s)
Converts string 's' to an unsigned int.
Definition: gstr.cpp:450
static bool enabled()
Returns true if test features are enabled.
Definition: gtest.cpp:49
static Configuration fromFmtp(const std::string &fmtp)
Factory function taking a SDP (Session Description Protocol) "fmtp" attribute string, something like "profile-level-id=...; ...; sprop-parameters-sets=Z00AKZpmA8==,aO48gA==".
Definition: gravc.cpp:50
static size_t smallest()
The smallest parsable packet.
std::string reason() const
Returns the in-valid() reason.
Definition: gravc.cpp:251
static bool exists(const Path &file)
Returns true if the file (directory, device etc.) exists.
Definition: gfile.cpp:132
An RTP packet parser, as per RFC 3550 (section 5).
Definition: gvrtppacket.h:44
bool valid() const
Returns true if a usable object.
Definition: gravc.cpp:246
unsigned int fu_type() const
Returns the type of the fragmented NALU.
Holds state for an AVC (aka H.264) decoder.
Definition: gvavcreader.h:42
Provides some basic information about a jpeg image without full decoding.
Definition: grjpeg.h:63
An RTP payload parser for the "H264" payload type.
A interface that Gv::RtpServer uses to deliver image data.
Definition: gvrtpserver.h:118
bool type_is_fu() const
Returns true if type() is FU_A or FU_B.
A Path object represents a file system path.
Definition: gpath.h:72