261 lines
6.4 KiB
C++
Executable File
261 lines
6.4 KiB
C++
Executable File
/*
   This is an example of how you can create a single poll thread for
   multiple transmitters and sessions, using a single pre-created
   RTPAbortDescriptors instance.
*/
|
|
|
|
#include "rtpconfig.h"
|
|
#include <iostream>
|
|
|
|
using namespace std;
|
|
|
|
#ifdef RTP_SUPPORT_THREAD
|
|
|
|
#include "rtpsession.h"
|
|
#include "rtpudpv4transmitter.h"
|
|
#include "rtpipv4address.h"
|
|
#include "rtpsessionparams.h"
|
|
#include "rtperrors.h"
|
|
#include "rtplibraryversion.h"
|
|
#include "rtpsourcedata.h"
|
|
#include "rtpabortdescriptors.h"
|
|
#include "rtpselect.h"
|
|
#include "rtprandom.h"
|
|
#include <jthread/jthread.h>
|
|
#include <stdlib.h>
|
|
#include <stdio.h>
|
|
#include <string>
|
|
#include <vector>
|
|
|
|
using namespace jrtplib;
|
|
using namespace jthread;
|
|
|
|
inline void checkerror(int rtperr)
|
|
{
|
|
if (rtperr < 0)
|
|
{
|
|
cerr << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
|
|
exit(-1);
|
|
}
|
|
}
|
|
|
|
class MyRTPSession : public RTPSession
|
|
{
|
|
public:
|
|
MyRTPSession(RTPRandom *rnd) : RTPSession(rnd) { }
|
|
~MyRTPSession() { }
|
|
protected:
|
|
void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
|
|
{
|
|
printf("SSRC %x Got packet in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), srcdat->GetSSRC());
|
|
DeletePacket(rtppack);
|
|
*ispackethandled = true;
|
|
}
|
|
|
|
void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
|
|
{
|
|
char msg[1024];
|
|
|
|
memset(msg, 0, sizeof(msg));
|
|
if (itemlength >= sizeof(msg))
|
|
itemlength = sizeof(msg)-1;
|
|
|
|
memcpy(msg, itemdata, itemlength);
|
|
printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
|
|
}
|
|
};
|
|
|
|
class MyPollThread : public JThread
|
|
{
|
|
public:
|
|
MyPollThread(const vector<SocketType> &sockets, const vector<RTPSession *> &sessions)
|
|
: m_sockets(sockets), m_sessions(sessions)
|
|
{
|
|
if (m_mutex.Init() < 0)
|
|
{
|
|
cerr << "ERROR: unable to initialize mutex" << endl;
|
|
exit(-1);
|
|
}
|
|
m_stop = false;
|
|
}
|
|
|
|
void SignalStop()
|
|
{
|
|
m_mutex.Lock();
|
|
m_stop = true;
|
|
m_mutex.Unlock();
|
|
}
|
|
private:
|
|
void *Thread()
|
|
{
|
|
JThread::ThreadStarted();
|
|
|
|
vector<int8_t> flags(m_sockets.size());
|
|
bool done = false;
|
|
m_mutex.Lock();
|
|
done = m_stop;
|
|
m_mutex.Unlock();
|
|
|
|
while (!done)
|
|
{
|
|
double minInt = 10.0; // wait at most 10 secs
|
|
for (int i = 0 ; i < m_sessions.size() ; i++)
|
|
{
|
|
double nextInt = m_sessions[i]->GetRTCPDelay().GetDouble();
|
|
|
|
if (nextInt > 0 && nextInt < minInt)
|
|
minInt = nextInt;
|
|
else if (nextInt <= 0) // call the Poll function to make sure that RTCP packets are sent
|
|
{
|
|
//cout << "RTCP packet should be sent, calling Poll" << endl;
|
|
m_sessions[i]->Poll();
|
|
}
|
|
}
|
|
|
|
RTPTime waitTime(minInt);
|
|
//cout << "Waiting at most " << minInt << " seconds in select" << endl;
|
|
|
|
int status = RTPSelect(&m_sockets[0], &flags[0], m_sockets.size(), waitTime);
|
|
checkerror(status);
|
|
|
|
if (status > 0) // some descriptors were set
|
|
{
|
|
for (int i = 0 ; i < m_sockets.size() ; i++)
|
|
{
|
|
if (flags[i])
|
|
{
|
|
int idx = i/2; // two sockets per session
|
|
if (idx < m_sessions.size())
|
|
m_sessions[idx]->Poll();
|
|
}
|
|
}
|
|
}
|
|
|
|
m_mutex.Lock();
|
|
done = m_stop;
|
|
m_mutex.Unlock();
|
|
}
|
|
|
|
return 0;
|
|
}
|
|
|
|
JMutex m_mutex;
|
|
bool m_stop;
|
|
vector<SocketType> m_sockets;
|
|
vector<RTPSession *> m_sessions;
|
|
};
|
|
|
|
int main(void)
|
|
{
|
|
#ifdef RTP_SOCKETTYPE_WINSOCK
|
|
WSADATA dat;
|
|
WSAStartup(MAKEWORD(2,2),&dat);
|
|
#endif // RTP_SOCKETTYPE_WINSOCK
|
|
|
|
RTPAbortDescriptors abortDesc;
|
|
vector<SocketType> pollSockets;
|
|
|
|
checkerror(abortDesc.Init());
|
|
|
|
int numTrans = 5;
|
|
int portbaseBase = 6000;
|
|
vector<RTPUDPv4Transmitter *> transmitters;
|
|
for (int i = 0 ; i < numTrans ; i++)
|
|
{
|
|
RTPUDPv4TransmissionParams transParams;
|
|
transParams.SetPortbase(portbaseBase + i*2);
|
|
transParams.SetCreatedAbortDescriptors(&abortDesc);
|
|
|
|
RTPUDPv4Transmitter *pTrans = new RTPUDPv4Transmitter(0);
|
|
checkerror(pTrans->Init(true)); // We'll need thread safety
|
|
checkerror(pTrans->Create(64000, &transParams));
|
|
|
|
transmitters.push_back(pTrans);
|
|
}
|
|
|
|
vector<uint16_t> portBases;
|
|
vector<RTPSession *> sessions;
|
|
|
|
RTPRandom *rnd = RTPRandom::CreateDefaultRandomNumberGenerator();
|
|
|
|
for (int i = 0 ; i < transmitters.size() ; i++)
|
|
{
|
|
RTPUDPv4Transmitter *pTrans = transmitters[i];
|
|
RTPUDPv4TransmissionInfo *pInfo = static_cast<RTPUDPv4TransmissionInfo *>(pTrans->GetTransmissionInfo());
|
|
|
|
pollSockets.push_back(pInfo->GetRTPSocket());
|
|
pollSockets.push_back(pInfo->GetRTCPSocket());
|
|
portBases.push_back(pInfo->GetRTPPort());
|
|
|
|
pTrans->DeleteTransmissionInfo(pInfo);
|
|
|
|
RTPSession *pSess = new MyRTPSession(rnd); // make them all use the same random number generator
|
|
RTPSessionParams sessParams;
|
|
|
|
// We're going to use our own poll thread!
|
|
// IMPORTANT: don't use a single external RTPAbortDescriptors instance
|
|
// with the internal poll thread! It will cause threads to
|
|
// hang!
|
|
sessParams.SetUsePollThread(false);
|
|
sessParams.SetOwnTimestampUnit(1.0/8000.0);
|
|
checkerror(pSess->Create(sessParams, pTrans));
|
|
|
|
sessions.push_back(pSess);
|
|
}
|
|
|
|
// First, the pollSockets array will contain two sockets per session,
|
|
// and an extra entry will be added for the abort socket
|
|
pollSockets.push_back(abortDesc.GetAbortSocket());
|
|
|
|
// Let each session send to the next
|
|
uint8_t localHost[4] = { 127, 0, 0, 1 };
|
|
for (int i = 0 ; i < sessions.size() ; i++)
|
|
{
|
|
uint16_t destPortbase = portBases[(i+1)%portBases.size()];
|
|
checkerror(sessions[i]->AddDestination(RTPIPv4Address(localHost, destPortbase)));
|
|
}
|
|
|
|
MyPollThread myPollThread(pollSockets, sessions);
|
|
if (myPollThread.Start() < 0)
|
|
{
|
|
cerr << "ERROR: couldn't start own poll thread" << endl;
|
|
return -1;
|
|
}
|
|
|
|
cout << "Own poll thread started" << endl;
|
|
|
|
cout << "Main thread will sleep for 30 seconds (the sessions should still send RTCP packets to each other)" << endl;
|
|
RTPTime::Wait(RTPTime(30.0));
|
|
|
|
myPollThread.SignalStop(); // will cause the thread to end after an iteration
|
|
while (myPollThread.IsRunning())
|
|
{
|
|
abortDesc.SendAbortSignal(); // will make sure the thread isn't waiting for incoming data
|
|
RTPTime::Wait(RTPTime(0.01));
|
|
}
|
|
|
|
cout << "Own poll thread ended, cleaning up..." << endl;
|
|
|
|
for (int i = 0 ; i < sessions.size() ; i++)
|
|
delete sessions[i];
|
|
for (int i = 0 ; i < transmitters.size() ; i++)
|
|
delete transmitters[i];
|
|
|
|
delete rnd;
|
|
#ifdef RTP_SOCKETTYPE_WINSOCK
|
|
WSACleanup();
|
|
#endif // RTP_SOCKETTYPE_WINSOCK
|
|
cout << "Done" << endl;
|
|
return 0;
|
|
}
|
|
|
|
#else
|
|
|
|
// Entry point used when RTP_SUPPORT_THREAD is not defined: the example
// cannot run, so it only prints a notice and returns 0.
int main(void)
{
	std::cout << "Thread support was not enabled at compile time" << std::endl;

	return 0;
}
|
|
|
|
#endif // RTP_SUPPORT_THREAD
|