]> Some of my projects - aniplayer-old.git/commitdiff
- Make hashing threads work.
authorAPTX <APTX@.(none)>
Fri, 21 Aug 2009 17:58:31 +0000 (19:58 +0200)
committerAPTX <APTX@.(none)>
Fri, 21 Aug 2009 17:58:31 +0000 (19:58 +0200)
lib/anidbudpclient/circularbuffer.h
lib/anidbudpclient/hash.cpp
lib/anidbudpclient/hash.h
lib/anidbudpclient/hashconsumer.cpp
lib/anidbudpclient/hashconsumer.h
lib/anidbudpclient/hashproducer.cpp
lib/anidbudpclient/hashproducer.h
lib/anidbudpclient/mylistaddcommand.cpp
lib/anidbudpclient/mylistaddcommand.h
src/videowindow.cpp

index dd9a9da40814348c0059bd7c4d6b1bbf25a2164e..1be3bc006daa2bd9c8033fb5dc84dcd6dbfa80e2 100644 (file)
@@ -17,26 +17,29 @@ public:
                m_end = false;
        }
 
-       void put(T data, bool last = false)
+       bool put(T data, bool last = false, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT)
        {
-               if (m_end) return;
+               if (m_end) return false;
 
-               free.acquire();
+               if (!free.tryAcquire(1, timeout))
+                       return false;
                buffer[w] = data;
                m_end = last;
                used.release();
                w++;
                w %= SIZE;
+               return true;
        }
 
-       T get()
+       bool get(T *data, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT)
        {
-               used.acquire();
-               T data = buffer[r];
+               if (!used.tryAcquire(1, timeout))
+                       return false;
+               *data = buffer[r];
                free.release();
                r++;
                r %= SIZE;
-               return data;
+               return true;
        }
 
        bool end() const
@@ -62,6 +65,8 @@ private:
        int r;
        int w;
        bool m_end;
+
+       static const int SEMAPHORE_ACQUIRE_TIMEOUT = 100;
 };
 
 typedef CircularBuffer<QByteArray, 2> Buffer;
index bd5dc0bcae87b858ed2ec1a2c64fa908e29418f1..9141bbb18ab497da8a53d5c9959984cb46a8e250 100644 (file)
@@ -22,11 +22,13 @@ void Hash::hashFile(const QFileInfo &file)
 {
 qDebug() << "Hash::hashFile";
        fileQueue.enqueue(file);
+       totalFileSize += file.size();
 
        if (hashing)
                return;
 
-       emit startHashing(fileQueue.first().absoluteFilePath());
+       totalTime.start();
+       startHashing();
 }
 
 void Hash::endHashing(const QByteArray &hash)
@@ -34,33 +36,54 @@ void Hash::endHashing(const QByteArray &hash)
 qDebug() << "Hash::endHashing";
        QFileInfo f = fileQueue.dequeue();
 
+       int fileElapsed = fileTime.elapsed();
+
+       emit fileHashed(f, hash);
+qDebug() << "File:" << f.fileName() << "Hash:" << hash << "Time:" << fileElapsed;
+
+
        if (!fileQueue.isEmpty())
        {
-               emit startHashing(fileQueue.first().absoluteFilePath());
+               startHashing();
        }
        else
        {
                hashing = false;
+               int totalElapsed = totalTime.elapsed();
+               emit finished();
+qDebug() << "Total time:" << totalElapsed;
+               hashedFileSize = totalFileSize = 0;
        }
-       emit fileHashed(f, hash);
-qDebug() << "FILE" << f.fileName() << "HASH" << hash;
+
+}
+
+void Hash::reportProgress(qint64 read, qint64 total)
+{
+       emit fileProgress((read * 100) / total);
+       hashedFileSize += fileQueue.first().size() - read;
+       emit progress((hashedFileSize * 100) / totalFileSize);
 }
 
+void Hash::startHashing()
+{
+       QString file = fileQueue.first().absoluteFilePath();
+
+       fileTime.start();
+
+       producer->readFile(file);
+       consumer->hashFile(file);
+}
 
 void Hash::setUp()
 {
        if (producer || consumer || buffer)
                return;
-
+qDebug() << "MAIN thread id is: " << QThread::currentThreadId();
        buffer = new HashPrivate::Buffer;
        producer = new HashPrivate::HashProducer(buffer, this);
        consumer = new HashPrivate::HashConsumer(buffer, this);
-       connect(this, SIGNAL(startHashing(QString)), consumer, SLOT(hashFile(QString)), Qt::QueuedConnection);
-       connect(this, SIGNAL(startHashing(QString)), producer, SLOT(readFile(QString)), Qt::QueuedConnection);
-       connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray)), Qt::QueuedConnection);
-
-       producer->start();
-       consumer->start();
+       connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray)));
+       connect(consumer, SIGNAL(progress(qint64,qint64)), this, SLOT(reportProgress(qint64,qint64)));
 }
 
 void Hash::tearDown()
@@ -68,11 +91,6 @@ void Hash::tearDown()
        if (!producer || !consumer || !buffer)
                return;
 
-       producer->stop();
-       consumer->stop();
-       producer->wait();
-       consumer->wait();
-
        delete producer;
        delete consumer;
        delete buffer;
index 38eaf3200e9fdf4d869b5a5e556e8362a9a13df2..b950162aba918b0b2ff9e65eeaa7ecf06ce2afae 100644 (file)
@@ -6,6 +6,7 @@
 #include <QQueue>
 #include <QMap>
 #include <QFileInfo>
+#include <QTime>
 
 #include "hashproducer.h"
 #include "hashconsumer.h"
@@ -24,13 +25,18 @@ public:
        void hashFile(const QFileInfo &file);
 
 signals:
-       void startHashing(const QString &file);
        void fileHashed(const QFileInfo &file, const QByteArray &hash);
+       void fileProgress(int percent);
+       void progress(int percent);
+       void finished();
 
 private slots:
        void endHashing(const QByteArray &hash);
 
+       void reportProgress(qint64 read, qint64 total);
+
 private:
+       void startHashing();
        void setUp();
        void tearDown();
 
@@ -42,6 +48,12 @@ private:
        QMap<QFileInfo, QByteArray> hashedFiles;
 
        bool hashing;
+
+       qint64 hashedFileSize;
+       qint64 totalFileSize;
+
+       QTime fileTime;
+       QTime totalTime;
 };
 
 } // namesapce AniDBUdpClient
index f5dc40f4e54c12b7f6004aba0d0a53a9f46767f5..b3b1cf22e2a3bf82f76491833b7d6a2be96245ae 100644 (file)
@@ -9,54 +9,76 @@ HashConsumer::HashConsumer(Buffer *buffer, QObject *parent) : QThread(parent)
 {
        this->buffer = buffer;
        hash = new QCryptographicHash(QCryptographicHash::Md4);
-       connect(this, SIGNAL(startHashing()), this, SLOT(doHash()));
+
+       restart = false;
+       abort = false;
 }
 
 HashConsumer::~HashConsumer()
 {
+       mutex.lock();
+       abort = true;
+       condition.wakeOne();
+       mutex.unlock();
+
+       wait();
        delete hash;
 }
 
 void HashConsumer::hashFile(const QString &file)
 {
+       QMutexLocker locker(&mutex);
 qDebug() << "hashFile()";
        fileSize = QFileInfo(file).size();
 
-       emit startHashing();
-}
-
-void HashConsumer::stop()
-{
-       m_stop = true;
-       quit();
+       if (!isRunning())
+               start();
+       else
+               condition.wakeOne();
 }
 
 void HashConsumer::run()
 {
-       exec();
-}
+qDebug() << "Starting thread consumer";
+qDebug() << "Thread consumer id is: " << QThread::currentThreadId();
 
-void HashConsumer::doHash()
-{
-       while (!buffer->end())
+       forever
        {
-qDebug() << "doHash()->while(" << buffer->end() << ")";
-               hashSome();
-       }
-       buffer->reset();
-       hash->reset();
-}
+               mutex.lock();
+               qint64 totalSize = fileSize;
+               qint64 read = 0;
+               mutex.unlock();
 
-void HashConsumer::hashSome()
-{
-       QByteArray data = buffer->get();
+               while (!(buffer->end() || abort))
+               {
+//                     qDebug() << "hash->while(" << buffer->end() << ")";
+                       QByteArray data;
+
+                       while (!(buffer->get(&data) || abort));
 
-       hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4));
+                       hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4));
 
-       if (buffer->end())
-               emit finishedHashing(hash->result());
+                       read += data.size();
+                       emit progress(read, totalSize);
+               }
+               bool r = buffer->reset();
+qDebug() << "buffer reset" << r;
 
-qDebug() << "hashSome()";
+               if (abort)
+                       return;
+
+               mutex.lock();
+               if (!restart)
+               {
+                       emit finishedHashing(hash->result().toHex());
+                       condition.wait(&mutex);
+               }
+               restart = false;
+               mutex.unlock();
+
+               hash->reset();
+       }
+qDebug() << "Thread consumer is stopping";
 }
 
 } // namespace HashPrivate
index 69c6c0f16c656d4518856bca071177582d0e1594..c167fc15bf363a6a72fce0290568cb76e7b55ad0 100644 (file)
@@ -6,6 +6,8 @@
 #include <QCryptographicHash>
 #include <QFile>
 #include <QFileInfo>
+#include <QMutex>
+#include <QWaitCondition>
 
 #include "circularbuffer.h"
 
@@ -22,26 +24,24 @@ public:
 public slots:
        void hashFile(const QString &file);
 
-       void stop();
-
 protected:
        void run();
 
 signals:
-       void startHashing();
+       void progress(qint64 done, qint64 total);
        void finishedHashing(QByteArray hash);
 
-private slots:
-       void doHash();
-
 private:
-       void hashSome();
-
        Buffer *buffer;
        QCryptographicHash *hash;
        qint64 fileSize;
 
        bool m_stop;
+       bool restart;
+       bool abort;
+
+       QMutex mutex;
+       QWaitCondition condition;
 };
 
 } // namespace HashPrivate
index d4c435873df591f40b507fe5246ece921049502e..a9376a9924dab030ecde2e4d54367b76fc83a861 100644 (file)
@@ -1,5 +1,7 @@
 #include "hashproducer.h"
 
+#include <QTimer>
+
 #include <QDebug>
 
 namespace AniDBUdpClient {
@@ -8,51 +10,74 @@ namespace HashPrivate {
 HashProducer::HashProducer(Buffer *buffer, QObject *parent) : QThread(parent)
 {
        this->buffer = buffer;
-       connect(this, SIGNAL(startReading()), this, SLOT(doRead()));
+       restart = false;
+       abort = false;
+}
+
+HashProducer::~HashProducer()
+{
+       mutex.lock();
+       abort = true;
+       condition.wakeOne();
+       mutex.unlock();
+
+       wait();
 }
 
 void HashProducer::readFile(const QString &file)
 {
 qDebug() << "readFile";
-       this->file.setFileName(file);
+qDebug() << "Thread id is: " << QThread::currentThreadId();
 
-       fileSize = file.size();
+       QMutexLocker locker(&mutex);
 
-       if (!this->file.open(QIODevice::ReadOnly))
-       {
-qDebug() << "Failed toopen file" << this->file.fileName();
-               return;
-       }
+       fileName = file;
 
-       emit startReading();
-}
+       if (!isRunning())
+               start();
+       else
+               condition.wakeOne();
 
-void HashProducer::stop()
-{
-       m_stop = true;
-       quit();
 }
 
 void HashProducer::run()
 {
-       exec();
-}
+qDebug() << "Starting thread producer";
+qDebug() << "Thread producer id is: " << QThread::currentThreadId();
 
-void HashProducer::doRead()
-{
-       while (!this->file.atEnd())
+       forever
        {
-qDebug() << "doRead->while(" << (!this->file.atEnd()) << ")";
-               readSome();
-       }
-       this->file.close();
-}
+               mutex.lock();
+qDebug() << "Obtaining new file name";
+               QFile file(fileName);
+               mutex.unlock();
 
-void HashProducer::readSome()
-{
-       QByteArray data = file.read(ED2K_PART_SIZE);
-qDebug() << "readSome";
-       buffer->put(data, file.atEnd());
+               if (file.exists())
+               {
+                       qDebug() << "File exists, opening";
+                       if (file.open(QIODevice::ReadOnly))
+                       {
+                               while (!file.atEnd())
+                               {
+                                       if (abort)
+                                               return;
+//                                     qDebug() << "read->while(" << (!file.atEnd()) << ")";
+                                       QByteArray data = file.read(ED2K_PART_SIZE);
+                                       while (!(buffer->put(data, file.atEnd()) || abort));
+                               }
+                       }
+               }
+
+               if (abort)
+                       return;
+
+               mutex.lock();
+               if (!restart)
+                       condition.wait(&mutex);
+               restart = false;
+               mutex.unlock();
+       }
+qDebug() << "Thread producer is stopping";
 }
 
 } // namespace HashPrivate
index 0bde20345dfbb097920bfc7b53046220da152c74..3b0ddab9561d8d7bab776e611b967f176f289ae3 100644 (file)
@@ -5,6 +5,8 @@
 #include <QThread>
 #include <QFile>
 #include <QFileInfo>
+#include <QMutex>
+#include <QWaitCondition>
 
 #include "circularbuffer.h"
 
@@ -17,29 +19,27 @@ class HashProducer : public QThread
 
 public:
        HashProducer(Buffer *buffer, QObject *parent = 0);
+       ~HashProducer();
 
 public slots:
        void readFile(const QString &file);
 
-       void stop();
 protected:
        void run();
 
-signals:
-       void startReading();
-       void finishedReading();
-
-private slots:
-       void doRead();
-
 private:
-       void readSome();
-
        Buffer *buffer;
+
+       QString fileName;
        QFile file;
        qint64 fileSize;
 
        bool m_stop;
+       bool restart;
+       bool abort;
+
+       QMutex mutex;
+       QWaitCondition condition;
 };
 
 } // namespace HashPrivate
index e11a9534ad5c95763e86198a1c2141a3d5b49646..972e017d4c75b3c8df1a0cf6a20e9e0e3dd8b48d 100644 (file)
@@ -128,6 +128,7 @@ qDebug() << "FAILED to read Mylist ID";
 
 void MylistAddCommand::hash()
 {
+       t.start();
        future = QtConcurrent::run(this, &MylistAddCommand::doHash, m_file);
        futureWatcher.setFuture(future);
 }
@@ -141,6 +142,7 @@ qDebug() << "WTF?";
        }
        m_ed2k = QByteArray(future);
        emit hashComplete();
+       qDebug() << "Time:" << t.elapsed();
 }
 
 QByteArray MylistAddCommand::doHash(QString file)
@@ -159,7 +161,7 @@ qDebug() << "hash thread init";
        {
                size = f.read(data, ED2K_PART_SIZE);
                ed2k.addData(QCryptographicHash::hash(QByteArray(data, size), QCryptographicHash::Md4));
-qDebug() << "hashing...";
+//qDebug() << "hashing...";
        }
        f.close();
        delete[] data;
index 18556cb2635b2b41e29dccd9392f0387628f4f34..169f202a55f0e9fd207d64ca730ae76cfe0baa4b 100644 (file)
@@ -5,6 +5,7 @@
 
 #include <QFuture>
 #include <QFutureWatcher>
+#include <QTime>
 
 namespace AniDBUdpClient {
 
@@ -51,6 +52,8 @@ private:
        int mylistId;
 
        static const qint64 ED2K_PART_SIZE = 9728000;
+
+       QTime t;
 };
 
 } // namespace AniDBUdpClient
index ebcde19782ce236a84ac0a2176546946879b263e..f418e2f2901c7c511828567168baf2e8655f4b51 100644 (file)
@@ -246,8 +246,8 @@ void VideoWindow::open(const QString &file, bool closeOnStop)
                return;
        }
 
-//     AniDBUdpClient::Hash *h = new AniDBUdpClient::Hash(this);
-//     h->hashFile(fileInfo);
+       AniDBUdpClient::Hash *h = new AniDBUdpClient::Hash(this);
+       h->hashFile(fileInfo);
 
 
        playlist->setDirectory(fileInfo.absoluteDir());