Implement command queue.

This commit is contained in:
Andrius Štikonas 2015-02-07 17:29:12 +00:00
parent daf7213a05
commit b64663177e
4 changed files with 45 additions and 13 deletions

View File

@ -28,6 +28,5 @@ int main(int argc, char *argv[])
Server server(sockets); Server server(sockets);
ConsoleReader *reader = new ConsoleReader(sockets); ConsoleReader *reader = new ConsoleReader(sockets);
qWarning() << "event loop";
return app.exec(); return app.exec();
} }

View File

@ -29,12 +29,13 @@ Server::Server ( std::vector<Socket*> *sockets_vector )
udpSocketSend->connectToHost ( QHostAddress::Broadcast, 10000 ); udpSocketSend->connectToHost ( QHostAddress::Broadcast, 10000 );
udpSocketGet->bind ( QHostAddress::Any, 10000); udpSocketGet->bind ( QHostAddress::Any, 10000);
connect ( udpSocketGet, &QUdpSocket::readyRead, this, &Server::readPendingDatagrams);
udpSocketSend->write ( discover );
udpSocketSend->write ( discover ); udpSocketSend->write ( discover );
udpSocketSend->disconnectFromHost(); udpSocketSend->disconnectFromHost();
delete udpSocketSend; delete udpSocketSend;
connect ( udpSocketGet, &QUdpSocket::readyRead, this, &Server::readPendingDatagrams);
qWarning() << "starting server";
start(); start();
} }
@ -64,6 +65,7 @@ void Server::readPendingDatagrams()
{ {
if ( reply.mid ( 4, 2 ) == QByteArray::fromHex ( "71 61" ) ) // Reply to discover packet if ( reply.mid ( 4, 2 ) == QByteArray::fromHex ( "71 61" ) ) // Reply to discover packet
{ {
qWarning() << "Discover";
bool duplicate = false; bool duplicate = false;
for ( std::vector<Socket*>::const_iterator i = sockets->begin() ; i != sockets->end(); ++i ) for ( std::vector<Socket*>::const_iterator i = sockets->begin() ; i != sockets->end(); ++i )
{ {

View File

@ -50,12 +50,15 @@ Socket::Socket ( QHostAddress IPaddress, QByteArray reply )
udpSocket = new QUdpSocket(); udpSocket = new QUdpSocket();
udpSocket->connectToHost ( ip, 10000 ); udpSocket->connectToHost ( ip, 10000 );
connect (this, &Socket::datagramQueued, this, &Socket::listen);
subscribed = false; subscribed = false;
subscribeTimer = new QTimer(this); subscribeTimer = new QTimer(this);
subscribeTimer->setInterval(200); // increase later, debug subscribeTimer->setInterval(2*60*1000); // increase later, debug
subscribeTimer->setSingleShot(false); subscribeTimer->setSingleShot(false);
connect(subscribeTimer, &QTimer::timeout, this, &Socket::subscribe); connect(subscribeTimer, &QTimer::timeout, this, &Socket::subscribe);
subscribeTimer->start(); subscribeTimer->start();
subscribe();
tableData();
} }
Socket::~Socket() Socket::~Socket()
@ -64,9 +67,24 @@ Socket::~Socket()
delete udpSocket; delete udpSocket;
} }
void Socket::sendDatagram ( Datagram d )
{
commands.enqueue(d);
Q_EMIT datagramQueued();
}
void Socket::run()
{
while ( commands.size() > 0 )
{
qWarning() << "sending datagram" << commands.head() << " subscribed = " << subscribed;
udpSocket->write ( datagram[commands.head()] );
QThread::msleep(100);
}
}
void Socket::subscribe() void Socket::subscribe()
{ {
qWarning() << "subscribing";
sendDatagram ( Subscribe ); sendDatagram ( Subscribe );
} }
@ -93,12 +111,6 @@ void Socket::tableData()
sendDatagram ( TimingData ); sendDatagram ( TimingData );
} }
void Socket::sendDatagram ( Datagram d )
{
qWarning() << "sending datagram" << d << " subscribed = " << subscribed;
udpSocket->write ( datagram[d] );
}
bool Socket::parseReply ( QByteArray reply ) bool Socket::parseReply ( QByteArray reply )
{ {
qWarning() << "parsing datagram"; qWarning() << "parsing datagram";
@ -144,6 +156,10 @@ bool Socket::parseReply ( QByteArray reply )
powered = reply.right ( 1 ) == one; powered = reply.right ( 1 ) == one;
if (powered != poweredOld) if (powered != poweredOld)
Q_EMIT stateChanged(); Q_EMIT stateChanged();
if ( datagram == PowerOff && powered == true )
{
datagram = PowerOn;
}
break; break;
} }
case TableData: case TableData:
@ -169,6 +185,15 @@ bool Socket::parseReply ( QByteArray reply )
default: default:
return false; return false;
} }
if (commands.size() > 0)
{
if ( datagram == commands.head() )
{
commands.dequeue();
}
}
return true; return true;
} }

View File

@ -20,18 +20,21 @@
#include <QByteArray> #include <QByteArray>
#include <QHostAddress> #include <QHostAddress>
#include <QQueue>
#include <QTimer> #include <QTimer>
#include <QThread>
class QUdpSocket; class QUdpSocket;
const QByteArray magicKey = QByteArray::fromHex ( "68 64" ); // recognize datagrams from the socket const QByteArray magicKey = QByteArray::fromHex ( "68 64" ); // recognize datagrams from the socket
class Socket : public QObject class Socket : public QThread
{ {
Q_OBJECT Q_OBJECT
Q_SIGNALS: Q_SIGNALS:
void stateChanged(); void stateChanged();
void datagramQueued();
public: public:
Socket ( QHostAddress, QByteArray ); Socket ( QHostAddress, QByteArray );
@ -52,12 +55,14 @@ private:
void sendDatagram ( Datagram ); void sendDatagram ( Datagram );
QByteArray fromIP ( unsigned char, unsigned char, unsigned char, unsigned char ); QByteArray fromIP ( unsigned char, unsigned char, unsigned char, unsigned char );
void subscribe(); void subscribe();
void listen() { start(); }
void run();
QByteArray commandID[MaxCommands]; QByteArray commandID[MaxCommands];
QByteArray datagram[MaxCommands]; QByteArray datagram[MaxCommands];
QByteArray rmac; // Reveresed mac QByteArray rmac; // Reveresed mac
QByteArray versionID; QByteArray versionID;
QByteArray socketTableNumber, socketTableVersion, timingTableNumber, timingTableVersion; QByteArray socketTableNumber, socketTableVersion, timingTableNumber, timingTableVersion; // FIXME: not used yet
const QByteArray twenties = QByteArray::fromHex ( "20 20 20 20 20 20" ); // mac address padding const QByteArray twenties = QByteArray::fromHex ( "20 20 20 20 20 20" ); // mac address padding
const QByteArray zeros = QByteArray::fromHex ( "00 00 00 00" ); const QByteArray zeros = QByteArray::fromHex ( "00 00 00 00" );
@ -67,6 +72,7 @@ private:
QUdpSocket *udpSocket; QUdpSocket *udpSocket;
QTimer *subscribeTimer; QTimer *subscribeTimer;
bool subscribed; bool subscribed;
QQueue<Datagram> commands;
}; };