1/* ---------------------------------------------------------------------------
2** This software is in the public domain, furnished "as is", without technical
3** support, and with no warranty, express or implied, as to its usefulness for
4** any purpose.
5**
6** v4l2DeviceSource.cpp
7**
8** V4L2 Live555 source
9**
10** -------------------------------------------------------------------------*/
11
12#include <fcntl.h>
13#include <iomanip>
14#include <sstream>
15
16// project
17#include "logger.h"
18#include "DeviceSource.h"
19
20// ---------------------------------
21// V4L2 FramedSource Stats
22// ---------------------------------
23int V4L2DeviceSource::Stats::notify(int tv_sec, int framesize)
24{
25 m_fps++;
26 m_size+=framesize;
27 if (tv_sec != m_fps_sec)
28 {
29 LOG(INFO) << m_msg << "tv_sec:" << tv_sec << " fps:" << m_fps << " bandwidth:"<< (m_size/128) << "kbps";
30 m_fps_sec = tv_sec;
31 m_fps = 0;
32 m_size = 0;
33 }
34 return m_fps;
35}
36
37// ---------------------------------
38// V4L2 FramedSource
39// ---------------------------------
40V4L2DeviceSource* V4L2DeviceSource::createNew(UsageEnvironment& env, DeviceInterface * device, int outputFd, unsigned int queueSize, bool useThread)
41{
42 V4L2DeviceSource* source = NULL;
43 if (device)
44 {
45 source = new V4L2DeviceSource(env, device, outputFd, queueSize, useThread);
46 }
47 return source;
48}
49
50// Constructor
51V4L2DeviceSource::V4L2DeviceSource(UsageEnvironment& env, DeviceInterface * device, int outputFd, unsigned int queueSize, bool useThread)
52 : FramedSource(env),
53 m_in("in"),
54 m_out("out") ,
55 m_outfd(outputFd),
56 m_device(device),
57 m_queueSize(queueSize)
58{
59 m_eventTriggerId = envir().taskScheduler().createEventTrigger(V4L2DeviceSource::deliverFrameStub);
60 memset(&m_thid, 0, sizeof(m_thid));
61 memset(&m_mutex, 0, sizeof(m_mutex));
62 if (m_device)
63 {
64 if (useThread)
65 {
66 pthread_mutex_init(&m_mutex, NULL);
67 pthread_create(&m_thid, NULL, threadStub, this);
68 }
69 else
70 {
71 envir().taskScheduler().turnOnBackgroundReadHandling( m_device->getFd(), V4L2DeviceSource::incomingPacketHandlerStub, this);
72 }
73 }
74}
75
76// Destructor
77V4L2DeviceSource::~V4L2DeviceSource()
78{
79 envir().taskScheduler().deleteEventTrigger(m_eventTriggerId);
80 pthread_join(m_thid, NULL);
81 pthread_mutex_destroy(&m_mutex);
82 delete m_device;
83}
84
85// thread mainloop
86void* V4L2DeviceSource::thread()
87{
88 int stop=0;
89 fd_set fdset;
90 FD_ZERO(&fdset);
91 timeval tv;
92
93 LOG(NOTICE) << "begin thread";
94 while (!stop)
95 {
96 int fd = m_device->getFd();
97 FD_SET(fd, &fdset);
98 tv.tv_sec=1;
99 tv.tv_usec=0;
100 int ret = select(fd+1, &fdset, NULL, NULL, &tv);
101 if (ret == 1)
102 {
103 if (FD_ISSET(fd, &fdset))
104 {
105 if (this->getNextFrame() <= 0)
106 {
107 LOG(ERROR) << "error:" << strerror(errno);
108 stop=1;
109 }
110 }
111 }
112 else if (ret == -1)
113 {
114 LOG(ERROR) << "stop " << strerror(errno);
115 stop=1;
116 }
117 }
118 LOG(NOTICE) << "end thread";
119 return NULL;
120}
121
122// getting FrameSource callback
123void V4L2DeviceSource::doGetNextFrame()
124{
125 deliverFrame();
126}
127
128// stopping FrameSource callback
129void V4L2DeviceSource::doStopGettingFrames()
130{
131 LOG(NOTICE) << "V4L2DeviceSource::doStopGettingFrames";
132 FramedSource::doStopGettingFrames();
133}
134
135// deliver frame to the sink
136void V4L2DeviceSource::deliverFrame()
137{
138 if (isCurrentlyAwaitingData())
139 {
140 fDurationInMicroseconds = 0;
141 fFrameSize = 0;
142
143 pthread_mutex_lock (&m_mutex);
144 if (m_captureQueue.empty())
145 {
146 LOG(DEBUG) << "Queue is empty";
147 }
148 else
149 {
150 timeval curTime;
151 gettimeofday(&curTime, NULL);
152 Frame * frame = m_captureQueue.front();
153 m_captureQueue.pop_front();
154
155 m_out.notify(curTime.tv_sec, frame->m_size);
156 if (frame->m_size > fMaxSize)
157 {
158 fFrameSize = fMaxSize;
159 fNumTruncatedBytes = frame->m_size - fMaxSize;
160 }
161 else
162 {
163 fFrameSize = frame->m_size;
164 }
165 timeval diff;
166 timersub(&curTime,&(frame->m_timestamp),&diff);
167
168 LOG(DEBUG) << "deliverFrame\ttimestamp:" << curTime.tv_sec << "." << curTime.tv_usec << "\tsize:" << fFrameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms\tqueue:" << m_captureQueue.size();
169
170 fPresentationTime = frame->m_timestamp;
171 memcpy(fTo, frame->m_buffer, fFrameSize);
172 delete frame;
173 }
174 pthread_mutex_unlock (&m_mutex);
175
176 if (fFrameSize > 0)
177 {
178 // send Frame to the consumer
179 FramedSource::afterGetting(this);
180 }
181 }
182}
183
184// FrameSource callback on read event
185void V4L2DeviceSource::incomingPacketHandler()
186{
187 if (this->getNextFrame() <= 0)
188 {
189 handleClosure(this);
190 }
191}
192
193// read from device
194int V4L2DeviceSource::getNextFrame()
195{
196 timeval ref;
197 gettimeofday(&ref, NULL);
198 char buffer[m_device->getBufferSize()];
199 int frameSize = m_device->read(buffer, m_device->getBufferSize());
200 if (frameSize < 0)
201 {
202 LOG(NOTICE) << "V4L2DeviceSource::getNextFrame errno:" << errno << " " << strerror(errno);
203 }
204 else if (frameSize == 0)
205 {
206 LOG(NOTICE) << "V4L2DeviceSource::getNextFrame no data errno:" << errno << " " << strerror(errno);
207 }
208 else
209 {
210 timeval tv;
211 gettimeofday(&tv, NULL);
212 timeval diff;
213 timersub(&tv,&ref,&diff);
214 m_in.notify(tv.tv_sec, frameSize);
215 LOG(DEBUG) << "getNextFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << frameSize <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
216 processFrame(buffer,frameSize,ref);
217 if (m_outfd != -1)
218 {
219 write(m_outfd, buffer, frameSize);
220 }
221 }
222 return frameSize;
223}
224
225
226void V4L2DeviceSource::processFrame(char * frame, int frameSize, const timeval &ref)
227{
228 timeval tv;
229 gettimeofday(&tv, NULL);
230 timeval diff;
231 timersub(&tv,&ref,&diff);
232
233 std::list< std::pair<unsigned char*,size_t> > frameList = this->splitFrames((unsigned char*)frame, frameSize);
234 while (!frameList.empty())
235 {
236 std::pair<unsigned char*,size_t>& frame = frameList.front();
237 size_t size = frame.second;
238 char* buf = new char[size];
239 memcpy(buf, frame.first, size);
240 queueFrame(buf,size,ref);
241
242 LOG(DEBUG) << "queueFrame\ttimestamp:" << ref.tv_sec << "." << ref.tv_usec << "\tsize:" << size <<"\tdiff:" << (diff.tv_sec*1000+diff.tv_usec/1000) << "ms";
243 frameList.pop_front();
244 }
245}
246
247// post a frame to fifo
248void V4L2DeviceSource::queueFrame(char * frame, int frameSize, const timeval &tv)
249{
250 pthread_mutex_lock (&m_mutex);
251 while (m_captureQueue.size() >= m_queueSize)
252 {
253 LOG(DEBUG) << "Queue full size drop frame size:" << (int)m_captureQueue.size() ;
254 delete m_captureQueue.front();
255 m_captureQueue.pop_front();
256 }
257 m_captureQueue.push_back(new Frame(frame, frameSize, tv));
258 pthread_mutex_unlock (&m_mutex);
259
260 // post an event to ask to deliver the frame
261 envir().taskScheduler().triggerEvent(m_eventTriggerId, this);
262}
263
264// split packet in frames
265std::list< std::pair<unsigned char*,size_t> > V4L2DeviceSource::splitFrames(unsigned char* frame, unsigned frameSize)
266{
267 std::list< std::pair<unsigned char*,size_t> > frameList;
268 if (frame != NULL)
269 {
270 frameList.push_back(std::pair<unsigned char*,size_t>(frame, frameSize));
271 }
272 return frameList;
273}
274
275
276
277