m_end = false;
}
- void put(T data, bool last = false)
+ bool put(T data, bool last = false, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT)
{
- if (m_end) return;
+ if (m_end) return false;
- free.acquire();
+ if (!free.tryAcquire(1, timeout))
+ return false;
buffer[w] = data;
m_end = last;
used.release();
w++;
w %= SIZE;
+ return true;
}
- T get()
+ bool get(T *data, int timeout = SEMAPHORE_ACQUIRE_TIMEOUT)
{
- used.acquire();
- T data = buffer[r];
+ if (!used.tryAcquire(1, timeout))
+ return false;
+ *data = buffer[r];
free.release();
r++;
r %= SIZE;
- return data;
+ return true;
}
bool end() const
int r;
int w;
bool m_end;
+
+ static const int SEMAPHORE_ACQUIRE_TIMEOUT = 100;
};
typedef CircularBuffer<QByteArray, 2> Buffer;
{
qDebug() << "Hash::hashFile";
fileQueue.enqueue(file);
+ totalFileSize += file.size();
if (hashing)
return;
- emit startHashing(fileQueue.first().absoluteFilePath());
+ totalTime.start();
+ startHashing();
}
void Hash::endHashing(const QByteArray &hash)
qDebug() << "Hash::endHashing";
QFileInfo f = fileQueue.dequeue();
+ int fileElapsed = fileTime.elapsed();
+
+ emit fileHashed(f, hash);
+qDebug() << "File:" << f.fileName() << "Hash:" << hash << "Time:" << fileElapsed;
+
+
if (!fileQueue.isEmpty())
{
- emit startHashing(fileQueue.first().absoluteFilePath());
+ startHashing();
}
else
{
hashing = false;
+ int totalElapsed = totalTime.elapsed();
+ emit finished();
+qDebug() << "Total time:" << totalElapsed;
+ hashedFileSize = totalFileSize = 0;
}
- emit fileHashed(f, hash);
-qDebug() << "FILE" << f.fileName() << "HASH" << hash;
+
+}
+
+void Hash::reportProgress(qint64 read, qint64 total)
+{
+ emit fileProgress((read * 100) / total);
+ hashedFileSize += fileQueue.first().size() - read;
+ emit progress((hashedFileSize * 100) / totalFileSize);
}
+void Hash::startHashing()
+{
+ QString file = fileQueue.first().absoluteFilePath();
+
+ fileTime.start();
+
+ producer->readFile(file);
+ consumer->hashFile(file);
+}
void Hash::setUp()
{
if (producer || consumer || buffer)
return;
-
+qDebug() << "MAIN thread id is: " << QThread::currentThreadId();
buffer = new HashPrivate::Buffer;
producer = new HashPrivate::HashProducer(buffer, this);
consumer = new HashPrivate::HashConsumer(buffer, this);
- connect(this, SIGNAL(startHashing(QString)), consumer, SLOT(hashFile(QString)), Qt::QueuedConnection);
- connect(this, SIGNAL(startHashing(QString)), producer, SLOT(readFile(QString)), Qt::QueuedConnection);
- connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray)), Qt::QueuedConnection);
-
- producer->start();
- consumer->start();
+ connect(consumer, SIGNAL(finishedHashing(QByteArray)), this, SLOT(endHashing(QByteArray)));
+ connect(consumer, SIGNAL(progress(qint64,qint64)), this, SLOT(reportProgress(qint64,qint64)));
}
void Hash::tearDown()
if (!producer || !consumer || !buffer)
return;
- producer->stop();
- consumer->stop();
- producer->wait();
- consumer->wait();
-
delete producer;
delete consumer;
delete buffer;
#include <QQueue>
#include <QMap>
#include <QFileInfo>
+#include <QTime>
#include "hashproducer.h"
#include "hashconsumer.h"
void hashFile(const QFileInfo &file);
signals:
- void startHashing(const QString &file);
void fileHashed(const QFileInfo &file, const QByteArray &hash);
+ void fileProgress(int percent);
+ void progress(int percent);
+ void finished();
private slots:
void endHashing(const QByteArray &hash);
+ void reportProgress(qint64 read, qint64 total);
+
private:
+ void startHashing();
void setUp();
void tearDown();
QMap<QFileInfo, QByteArray> hashedFiles;
bool hashing;
+
+ qint64 hashedFileSize;
+ qint64 totalFileSize;
+
+ QTime fileTime;
+ QTime totalTime;
};
} // namesapce AniDBUdpClient
{
this->buffer = buffer;
hash = new QCryptographicHash(QCryptographicHash::Md4);
- connect(this, SIGNAL(startHashing()), this, SLOT(doHash()));
+
+ restart = false;
+ abort = false;
}
HashConsumer::~HashConsumer()
{
+ mutex.lock();
+ abort = true;
+ condition.wakeOne();
+ mutex.unlock();
+
+ wait();
delete hash;
}
void HashConsumer::hashFile(const QString &file)
{
+ QMutexLocker locker(&mutex);
qDebug() << "hashFile()";
fileSize = QFileInfo(file).size();
- emit startHashing();
-}
-
-void HashConsumer::stop()
-{
- m_stop = true;
- quit();
+ if (!isRunning())
+ start();
+ else
+ condition.wakeOne();
}
void HashConsumer::run()
{
- exec();
-}
+qDebug() << "Starting thread consumer";
+qDebug() << "Thread consumer id is: " << QThread::currentThreadId();
-void HashConsumer::doHash()
-{
- while (!buffer->end())
+ forever
{
-qDebug() << "doHash()->while(" << buffer->end() << ")";
- hashSome();
- }
- buffer->reset();
- hash->reset();
-}
+ mutex.lock();
+ qint64 totalSize = fileSize;
+ qint64 read = 0;
+ mutex.unlock();
-void HashConsumer::hashSome()
-{
- QByteArray data = buffer->get();
+ while (!(buffer->end() || abort))
+ {
+// qDebug() << "hash->while(" << buffer->end() << ")";
+ QByteArray data;
+
+ while (!(buffer->get(&data) || abort));
- hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4));
+ hash->addData(QCryptographicHash::hash(data, QCryptographicHash::Md4));
- if (buffer->end())
- emit finishedHashing(hash->result());
+ read += data.size();
+ emit progress(read, totalSize);
+ }
+ bool r = buffer->reset();
+qDebug() << "buffer reset" << r;
-qDebug() << "hashSome()";
+ if (abort)
+ return;
+
+ mutex.lock();
+ if (!restart)
+ {
+ emit finishedHashing(hash->result().toHex());
+ condition.wait(&mutex);
+ }
+ restart = false;
+ mutex.unlock();
+
+ hash->reset();
+ }
+qDebug() << "Thread consumer is stopping";
}
} // namespace HashPrivate
#include <QCryptographicHash>
#include <QFile>
#include <QFileInfo>
+#include <QMutex>
+#include <QWaitCondition>
#include "circularbuffer.h"
public slots:
void hashFile(const QString &file);
- void stop();
-
protected:
void run();
signals:
- void startHashing();
+ void progress(qint64 done, qint64 total);
void finishedHashing(QByteArray hash);
-private slots:
- void doHash();
-
private:
- void hashSome();
-
Buffer *buffer;
QCryptographicHash *hash;
qint64 fileSize;
bool m_stop;
+ bool restart;
+ bool abort;
+
+ QMutex mutex;
+ QWaitCondition condition;
};
} // namespace HashPrivate
#include "hashproducer.h"
+#include <QTimer>
+
#include <QDebug>
namespace AniDBUdpClient {
HashProducer::HashProducer(Buffer *buffer, QObject *parent) : QThread(parent)
{
this->buffer = buffer;
- connect(this, SIGNAL(startReading()), this, SLOT(doRead()));
+ restart = false;
+ abort = false;
+}
+
+HashProducer::~HashProducer()
+{
+ mutex.lock();
+ abort = true;
+ condition.wakeOne();
+ mutex.unlock();
+
+ wait();
}
void HashProducer::readFile(const QString &file)
{
qDebug() << "readFile";
- this->file.setFileName(file);
+qDebug() << "Thread id is: " << QThread::currentThreadId();
- fileSize = file.size();
+ QMutexLocker locker(&mutex);
- if (!this->file.open(QIODevice::ReadOnly))
- {
-qDebug() << "Failed toopen file" << this->file.fileName();
- return;
- }
+ fileName = file;
- emit startReading();
-}
+ if (!isRunning())
+ start();
+ else
+ condition.wakeOne();
-void HashProducer::stop()
-{
- m_stop = true;
- quit();
}
void HashProducer::run()
{
- exec();
-}
+qDebug() << "Starting thread producer";
+qDebug() << "Thread producer id is: " << QThread::currentThreadId();
-void HashProducer::doRead()
-{
- while (!this->file.atEnd())
+ forever
{
-qDebug() << "doRead->while(" << (!this->file.atEnd()) << ")";
- readSome();
- }
- this->file.close();
-}
+ mutex.lock();
+qDebug() << "Obtaining new file name";
+ QFile file(fileName);
+ mutex.unlock();
-void HashProducer::readSome()
-{
- QByteArray data = file.read(ED2K_PART_SIZE);
-qDebug() << "readSome";
- buffer->put(data, file.atEnd());
+ if (file.exists())
+ {
+ qDebug() << "File exists, opening";
+ if (file.open(QIODevice::ReadOnly))
+ {
+ while (!file.atEnd())
+ {
+ if (abort)
+ return;
+// qDebug() << "read->while(" << (!file.atEnd()) << ")";
+ QByteArray data = file.read(ED2K_PART_SIZE);
+ while (!(buffer->put(data, file.atEnd()) || abort));
+ }
+ }
+ }
+
+ if (abort)
+ return;
+
+ mutex.lock();
+ if (!restart)
+ condition.wait(&mutex);
+ restart = false;
+ mutex.unlock();
+ }
+qDebug() << "Thread producer is stopping";
}
} // namespace HashPrivate
#include <QThread>
#include <QFile>
#include <QFileInfo>
+#include <QMutex>
+#include <QWaitCondition>
#include "circularbuffer.h"
public:
HashProducer(Buffer *buffer, QObject *parent = 0);
+ ~HashProducer();
public slots:
void readFile(const QString &file);
- void stop();
protected:
void run();
-signals:
- void startReading();
- void finishedReading();
-
-private slots:
- void doRead();
-
private:
- void readSome();
-
Buffer *buffer;
+
+ QString fileName;
QFile file;
qint64 fileSize;
bool m_stop;
+ bool restart;
+ bool abort;
+
+ QMutex mutex;
+ QWaitCondition condition;
};
} // namespace HashPrivate
void MylistAddCommand::hash()
{
+ t.start();
future = QtConcurrent::run(this, &MylistAddCommand::doHash, m_file);
futureWatcher.setFuture(future);
}
}
m_ed2k = QByteArray(future);
emit hashComplete();
+ qDebug() << "Time:" << t.elapsed();
}
QByteArray MylistAddCommand::doHash(QString file)
{
size = f.read(data, ED2K_PART_SIZE);
ed2k.addData(QCryptographicHash::hash(QByteArray(data, size), QCryptographicHash::Md4));
-qDebug() << "hashing...";
+//qDebug() << "hashing...";
}
f.close();
delete[] data;
#include <QFuture>
#include <QFutureWatcher>
+#include <QTime>
namespace AniDBUdpClient {
int mylistId;
static const qint64 ED2K_PART_SIZE = 9728000;
+
+ QTime t;
};
} // namespace AniDBUdpClient
return;
}
-// AniDBUdpClient::Hash *h = new AniDBUdpClient::Hash(this);
-// h->hashFile(fileInfo);
+ AniDBUdpClient::Hash *h = new AniDBUdpClient::Hash(this);
+ h->hashFile(fileInfo);
playlist->setDirectory(fileInfo.absoluteDir());