PacMan 2.0(r200)
Runs on digital PAC measurement computers

PacQueue.h

Go to the documentation of this file.
00001 /***************************************************************************
00002  *   Copyright (C) 2008-2010 by Matthias Nagl                              *
00003  *   mnagl@uni-goettingen.de                                               *
00004  *                                                                         *
00005  *   This program is free software; you can redistribute it and/or modify  *
00006  *   it under the terms of the GNU General Public License as published by  *
00007  *   the Free Software Foundation; either version 2 of the License, or     *
00008  *   (at your option) any later version.                                   *
00009  *                                                                         *
00010  *   This program is distributed in the hope that it will be useful,       *
00011  *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
00012  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
00013  *   GNU General Public License for more details.                          *
00014  *                                                                         *
00015  *   You should have received a copy of the GNU General Public License     *
00016  *   along with this program; if not, write to the                         *
00017  *   Free Software Foundation, Inc.,                                       *
00018  *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
00019  ***************************************************************************/
00020 
00021 #ifndef PACQUEUE_H
00022 #define PACQUEUE_H
00023 
00024 #include <QSemaphore>
00025 #include <QtDebug>
00026 #include "pacDataTypes.h"
00027 
00033 template <class T>
00034 class PacQueue
00035 {
00036   public:
00037     PacQueue(int buffersize=0);
00038     ~PacQueue();
00039     void setMaxLength(const int maxlength);
00040     int maxLength() const { return buffersize; };
00041 
00043     inline Pac::QueueStatus enqueue(T *w)
00044     {
00045       if (freefields.tryAcquire(1)) {
00046         qint32 cfield = writepos.fetchAndAddOrdered(1);
00047         buffer[cfield % buffersize] = w;
00048         usedfields.release(1);
00049         // reset counter carefully if necessary
00050         if (cfield >= buffersize)
00051           writepos.testAndSetOrdered(cfield, cfield % buffersize);
00052         // update counter
00053         enqueuedchunks++;
00054         return Pac::EnqueueSuccess;
00055       }
00056       droppedchunks++;
00057       return Pac::QueueIsFull;
00058     }
00059 
00061     inline T * dequeue()
00062     {
00063       usedfields.acquire(1);
00064       qint32 cfield = readpos.fetchAndAddOrdered(1);
00065       T *w = buffer[cfield % buffersize];
00066       freefields.release(1);
00067       // reset counter carefully if necessary
00068       if (cfield >= buffersize)
00069         readpos.testAndSetOrdered(cfield, cfield % buffersize);
00070       return w;
00071     }
00072 
00077     inline bool nonblockingDequeue(T ** dq)
00078     {
00079       if (!dq)
00080         return false;
00081       if (usedfields.tryAcquire(1)) {
00082         qint32 cfield = readpos.fetchAndAddOrdered(1);
00083         *dq = buffer[cfield % buffersize];
00084         freefields.release(1);
00085         // reset counter carefully if necessary
00086         if (cfield >= buffersize)
00087           readpos.testAndSetOrdered(cfield, cfield % buffersize);
00088         return true;
00089       }
00090       *dq = 0;
00091       return false;
00092     }
00093 
00094     inline T ** head() const
00095     {
00096       if (usedfields.available() > 0) {
00097         return &buffer[readpos % buffersize];
00098       }
00099       return 0;
00100     }
00101 
00103     inline qlonglong getHeadSeqNum() const
00104     {
00105       T **h = head();
00106       if (h == 0)
00107         return -1;
00108       else if (*h == 0)
00109         return 0;
00110       else
00111         return **h;
00112     }
00113 
00114     void getStats(int *enqueuedchunks, int *droppedchunks);
00115     void clear();
00116 
00117   private:
00118     T **buffer;
00119     mutable QSemaphore usedfields, freefields;
00120     qint32 buffersize;
00121     QAtomicInt readpos, writepos; 
00122     int enqueuedchunks; 
00123     int droppedchunks; 
00124 };
00125 
00126 
00127 template <class T> PacQueue<T>::PacQueue(int buffersize)
00128     : buffer(0), usedfields(0), freefields(buffersize), buffersize(buffersize), readpos(0), writepos(0), enqueuedchunks(0), droppedchunks(0)
00129 {
00130   buffer = new T*[buffersize];
00131 }
00132 
00133 
00134 template <class T> PacQueue<T>::~PacQueue()
00135 {
00136   delete [] buffer;
00137 }
00138 
00139 template <class T> void PacQueue<T>::setMaxLength(const int maxlength)
00140 {
00141   if (usedfields.available() > 0)
00142     qDebug() << "WARNING: Queues should be empty before resizing!";
00143   clear();
00144   buffersize = maxlength;
00145   delete [] buffer;
00146   buffer = new T*[buffersize];
00147   freefields.acquire(freefields.available());
00148   freefields.release(buffersize);
00149   readpos = 0;
00150   writepos = 0;
00151 }
00152 
00153 template <class T> void PacQueue<T>::getStats(int *enqueuedchunks, int *droppedchunks)
00154 {
00155   *enqueuedchunks = this->enqueuedchunks;
00156   this->enqueuedchunks=0;
00157   *droppedchunks = this->droppedchunks;
00158   this->droppedchunks=0;
00159 }
00160 
00161 template <class T> void PacQueue<T>::clear()
00162 {
00163   T *tmp;
00164   while (nonblockingDequeue(&tmp))
00165     delete tmp;
00166 }
00167 
00168 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Properties Friends