2025-02-27 17:59:18 +08:00

261 lines
6.4 KiB
C++
Executable File

/*
This is an example of how you can create a single poll thread for
multiple transmitters and sessions, using a single pre-created
RTPAbortDescriptors instance
*/
#include "rtpconfig.h"
#include <iostream>
using namespace std;
#ifdef RTP_SUPPORT_THREAD
#include "rtpsession.h"
#include "rtpudpv4transmitter.h"
#include "rtpipv4address.h"
#include "rtpsessionparams.h"
#include "rtperrors.h"
#include "rtplibraryversion.h"
#include "rtpsourcedata.h"
#include "rtpabortdescriptors.h"
#include "rtpselect.h"
#include "rtprandom.h"
#include <jthread/jthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string>
#include <vector>
using namespace jrtplib;
using namespace jthread;
inline void checkerror(int rtperr)
{
if (rtperr < 0)
{
cerr << "ERROR: " << RTPGetErrorString(rtperr) << std::endl;
exit(-1);
}
}
class MyRTPSession : public RTPSession
{
public:
MyRTPSession(RTPRandom *rnd) : RTPSession(rnd) { }
~MyRTPSession() { }
protected:
void OnValidatedRTPPacket(RTPSourceData *srcdat, RTPPacket *rtppack, bool isonprobation, bool *ispackethandled)
{
printf("SSRC %x Got packet in OnValidatedRTPPacket from source 0x%04x!\n", GetLocalSSRC(), srcdat->GetSSRC());
DeletePacket(rtppack);
*ispackethandled = true;
}
void OnRTCPSDESItem(RTPSourceData *srcdat, RTCPSDESPacket::ItemType t, const void *itemdata, size_t itemlength)
{
char msg[1024];
memset(msg, 0, sizeof(msg));
if (itemlength >= sizeof(msg))
itemlength = sizeof(msg)-1;
memcpy(msg, itemdata, itemlength);
printf("SSRC %x Received SDES item (%d): %s from SSRC %x\n", GetLocalSSRC(), (int)t, msg, srcdat->GetSSRC());
}
};
class MyPollThread : public JThread
{
public:
MyPollThread(const vector<SocketType> &sockets, const vector<RTPSession *> &sessions)
: m_sockets(sockets), m_sessions(sessions)
{
if (m_mutex.Init() < 0)
{
cerr << "ERROR: unable to initialize mutex" << endl;
exit(-1);
}
m_stop = false;
}
void SignalStop()
{
m_mutex.Lock();
m_stop = true;
m_mutex.Unlock();
}
private:
void *Thread()
{
JThread::ThreadStarted();
vector<int8_t> flags(m_sockets.size());
bool done = false;
m_mutex.Lock();
done = m_stop;
m_mutex.Unlock();
while (!done)
{
double minInt = 10.0; // wait at most 10 secs
for (int i = 0 ; i < m_sessions.size() ; i++)
{
double nextInt = m_sessions[i]->GetRTCPDelay().GetDouble();
if (nextInt > 0 && nextInt < minInt)
minInt = nextInt;
else if (nextInt <= 0) // call the Poll function to make sure that RTCP packets are sent
{
//cout << "RTCP packet should be sent, calling Poll" << endl;
m_sessions[i]->Poll();
}
}
RTPTime waitTime(minInt);
//cout << "Waiting at most " << minInt << " seconds in select" << endl;
int status = RTPSelect(&m_sockets[0], &flags[0], m_sockets.size(), waitTime);
checkerror(status);
if (status > 0) // some descriptors were set
{
for (int i = 0 ; i < m_sockets.size() ; i++)
{
if (flags[i])
{
int idx = i/2; // two sockets per session
if (idx < m_sessions.size())
m_sessions[idx]->Poll();
}
}
}
m_mutex.Lock();
done = m_stop;
m_mutex.Unlock();
}
return 0;
}
JMutex m_mutex;
bool m_stop;
vector<SocketType> m_sockets;
vector<RTPSession *> m_sessions;
};
int main(void)
{
#ifdef RTP_SOCKETTYPE_WINSOCK
WSADATA dat;
WSAStartup(MAKEWORD(2,2),&dat);
#endif // RTP_SOCKETTYPE_WINSOCK
RTPAbortDescriptors abortDesc;
vector<SocketType> pollSockets;
checkerror(abortDesc.Init());
int numTrans = 5;
int portbaseBase = 6000;
vector<RTPUDPv4Transmitter *> transmitters;
for (int i = 0 ; i < numTrans ; i++)
{
RTPUDPv4TransmissionParams transParams;
transParams.SetPortbase(portbaseBase + i*2);
transParams.SetCreatedAbortDescriptors(&abortDesc);
RTPUDPv4Transmitter *pTrans = new RTPUDPv4Transmitter(0);
checkerror(pTrans->Init(true)); // We'll need thread safety
checkerror(pTrans->Create(64000, &transParams));
transmitters.push_back(pTrans);
}
vector<uint16_t> portBases;
vector<RTPSession *> sessions;
RTPRandom *rnd = RTPRandom::CreateDefaultRandomNumberGenerator();
for (int i = 0 ; i < transmitters.size() ; i++)
{
RTPUDPv4Transmitter *pTrans = transmitters[i];
RTPUDPv4TransmissionInfo *pInfo = static_cast<RTPUDPv4TransmissionInfo *>(pTrans->GetTransmissionInfo());
pollSockets.push_back(pInfo->GetRTPSocket());
pollSockets.push_back(pInfo->GetRTCPSocket());
portBases.push_back(pInfo->GetRTPPort());
pTrans->DeleteTransmissionInfo(pInfo);
RTPSession *pSess = new MyRTPSession(rnd); // make them all use the same random number generator
RTPSessionParams sessParams;
// We're going to use our own poll thread!
// IMPORTANT: don't use a single external RTPAbortDescriptors instance
// with the internal poll thread! It will cause threads to
// hang!
sessParams.SetUsePollThread(false);
sessParams.SetOwnTimestampUnit(1.0/8000.0);
checkerror(pSess->Create(sessParams, pTrans));
sessions.push_back(pSess);
}
// First, the pollSockets array will contain two sockets per session,
// and an extra entry will be added for the abort socket
pollSockets.push_back(abortDesc.GetAbortSocket());
// Let each session send to the next
uint8_t localHost[4] = { 127, 0, 0, 1 };
for (int i = 0 ; i < sessions.size() ; i++)
{
uint16_t destPortbase = portBases[(i+1)%portBases.size()];
checkerror(sessions[i]->AddDestination(RTPIPv4Address(localHost, destPortbase)));
}
MyPollThread myPollThread(pollSockets, sessions);
if (myPollThread.Start() < 0)
{
cerr << "ERROR: couldn't start own poll thread" << endl;
return -1;
}
cout << "Own poll thread started" << endl;
cout << "Main thread will sleep for 30 seconds (the sessions should still send RTCP packets to each other)" << endl;
RTPTime::Wait(RTPTime(30.0));
myPollThread.SignalStop(); // will cause the thread to end after an iteration
while (myPollThread.IsRunning())
{
abortDesc.SendAbortSignal(); // will make sure the thread isn't waiting for incoming data
RTPTime::Wait(RTPTime(0.01));
}
cout << "Own poll thread ended, cleaning up..." << endl;
for (int i = 0 ; i < sessions.size() ; i++)
delete sessions[i];
for (int i = 0 ; i < transmitters.size() ; i++)
delete transmitters[i];
delete rnd;
#ifdef RTP_SOCKETTYPE_WINSOCK
WSACleanup();
#endif // RTP_SOCKETTYPE_WINSOCK
cout << "Done" << endl;
return 0;
}
#else
int main(void)
{
cout << "Thread support was not enabled at compile time" << endl;
return 0;
}
#endif // RTP_SUPPORT_THREAD