Qt 4.8
qthreadpool.cpp
Go to the documentation of this file.
1 /****************************************************************************
2 **
3 ** Copyright (C) 2014 Digia Plc and/or its subsidiary(-ies).
4 ** Contact: http://www.qt-project.org/legal
5 **
6 ** This file is part of the QtCore module of the Qt Toolkit.
7 **
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** Commercial License Usage
10 ** Licensees holding valid commercial Qt licenses may use this file in
11 ** accordance with the commercial license agreement provided with the
12 ** Software or, alternatively, in accordance with the terms contained in
13 ** a written agreement between you and Digia. For licensing terms and
14 ** conditions see http://qt.digia.com/licensing. For further information
15 ** use the contact form at http://qt.digia.com/contact-us.
16 **
17 ** GNU Lesser General Public License Usage
18 ** Alternatively, this file may be used under the terms of the GNU Lesser
19 ** General Public License version 2.1 as published by the Free Software
20 ** Foundation and appearing in the file LICENSE.LGPL included in the
21 ** packaging of this file. Please review the following information to
22 ** ensure the GNU Lesser General Public License version 2.1 requirements
23 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
24 **
25 ** In addition, as a special exception, Digia gives you certain additional
26 ** rights. These rights are described in the Digia Qt LGPL Exception
27 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
28 **
29 ** GNU General Public License Usage
30 ** Alternatively, this file may be used under the terms of the GNU
31 ** General Public License version 3.0 as published by the Free Software
32 ** Foundation and appearing in the file LICENSE.GPL included in the
33 ** packaging of this file. Please review the following information to
34 ** ensure the GNU General Public License version 3.0 requirements will be
35 ** met: http://www.gnu.org/copyleft/gpl.html.
36 **
37 **
38 ** $QT_END_LICENSE$
39 **
40 ****************************************************************************/
41 
42 #include "qthreadpool.h"
43 #include "qthreadpool_p.h"
44 #include "qelapsedtimer.h"
45 
46 #ifndef QT_NO_THREAD
47 
49 
50 inline bool operator<(int priority, const QPair<QRunnable *, int> &p)
51 {
52  return p.second < priority;
53 }
54 inline bool operator<(const QPair<QRunnable *, int> &p, int priority)
55 {
56  return priority < p.second;
57 }
58 
59 Q_GLOBAL_STATIC(QThreadPool, theInstance)
60 
61 /*
62  QThread wrapper, provides synchronization against a ThreadPool
63 */
65 {
66 public:
68  void run();
69  void registerThreadInactive();
70 
74 };
75 
76 /*
77  QThreadPool private class.
78 */
79 
80 
85  :manager(manager), runnable(0)
86 { }
87 
88 /* \internal
89 
90 */
92 {
93  QMutexLocker locker(&manager->mutex);
94  for(;;) {
95  QRunnable *r = runnable;
96  runnable = 0;
97 
98  do {
99  if (r) {
100  const bool autoDelete = r->autoDelete();
101 
102 
103  // run the task
104  locker.unlock();
105 #ifndef QT_NO_EXCEPTIONS
106  try {
107 #endif
108  r->run();
109 #ifndef QT_NO_EXCEPTIONS
110  } catch (...) {
111  qWarning("Qt Concurrent has caught an exception thrown from a worker thread.\n"
112  "This is not supported, exceptions thrown in worker threads must be\n"
113  "caught before control returns to Qt Concurrent.");
115  throw;
116  }
117 #endif
118  locker.relock();
119 
120  if (autoDelete && !--r->ref)
121  delete r;
122  }
123 
124  // if too many threads are active, expire this thread
125  if (manager->tooManyThreadsActive())
126  break;
127 
128  r = !manager->queue.isEmpty() ? manager->queue.takeFirst().first : 0;
129  } while (r != 0);
130 
131  if (manager->isExiting) {
133  break;
134  }
135 
136  // if too many threads are active, expire this thread
137  bool expired = manager->tooManyThreadsActive();
138  if (!expired) {
139  manager->waitingThreads.enqueue(this);
141  // wait for work, exiting after the expiry timeout is reached
142  runnableReady.wait(locker.mutex(), manager->expiryTimeout);
143  ++manager->activeThreads;
144  if (manager->waitingThreads.removeOne(this))
145  expired = true;
146  }
147  if (expired) {
148  manager->expiredThreads.enqueue(this);
150  break;
151  }
152  }
153 }
154 
156 {
157  if (--manager->activeThreads == 0)
158  manager->noActiveThreads.wakeAll();
159 }
160 
161 
162 /* \internal
163 
164 */
166  : isExiting(false),
167  expiryTimeout(30000),
168  maxThreadCount(qAbs(QThread::idealThreadCount())),
169  reservedThreads(0),
170  activeThreads(0)
171 { }
172 
174 {
175  if (allThreads.isEmpty()) {
176  // always create at least one thread
177  startThread(task);
178  return true;
179  }
180 
181  // can't do anything if we're over the limit
183  return false;
184 
185  if (waitingThreads.count() > 0) {
186  // recycle an available thread
187  enqueueTask(task);
188  waitingThreads.takeFirst()->runnableReady.wakeOne();
189  return true;
190  }
191 
192  if (!expiredThreads.isEmpty()) {
193  // restart an expired thread
195  Q_ASSERT(thread->runnable == 0);
196 
197  ++activeThreads;
198 
199  if (task->autoDelete())
200  ++task->ref;
201  thread->runnable = task;
202  thread->start();
203  return true;
204  }
205 
206  // start a new thread
207  startThread(task);
208  return true;
209 }
210 
211 void QThreadPoolPrivate::enqueueTask(QRunnable *runnable, int priority)
212 {
213  if (runnable->autoDelete())
214  ++runnable->ref;
215 
216  // put it on the queue
217  QList<QPair<QRunnable *, int> >::iterator at =
218  qUpperBound(queue.begin(), queue.end(), priority);
219  queue.insert(at, qMakePair(runnable, priority));
220 }
221 
223 {
224  return (allThreads.count()
227  + reservedThreads);
228 }
229 
231 {
232  // try to push tasks on the queue to any available threads
233  while (!queue.isEmpty() && tryStart(queue.first().first))
234  queue.removeFirst();
235 }
236 
238 {
239  const int activeThreadCount = this->activeThreadCount();
240  return activeThreadCount > maxThreadCount && (activeThreadCount - reservedThreads) > 1;
241 }
242 
247 {
249  thread->setObjectName(QLatin1String("Thread (pooled)"));
250  allThreads.insert(thread.data());
251  ++activeThreads;
252 
253  if (runnable->autoDelete())
254  ++runnable->ref;
255  thread->runnable = runnable;
256  thread.take()->start();
257 }
258 
267 {
268  QMutexLocker locker(&mutex);
269  isExiting = true;
270 
271  do {
272  // make a copy of the set so that we can iterate without the lock
273  QSet<QThreadPoolThread *> allThreadsCopy = allThreads;
274  allThreads.clear();
275  locker.unlock();
276 
277  foreach (QThreadPoolThread *thread, allThreadsCopy) {
278  thread->runnableReady.wakeAll();
279  thread->wait();
280  delete thread;
281  }
282 
283  locker.relock();
284  // repeat until all newly arrived threads have also completed
285  } while (!allThreads.isEmpty());
286 
289 
290  isExiting = false;
291 }
292 
294 {
295  QMutexLocker locker(&mutex);
296  if (msecs < 0) {
297  while (!(queue.isEmpty() && activeThreads == 0))
298  noActiveThreads.wait(locker.mutex());
299  } else {
301  timer.start();
302  int t;
303  while (!(queue.isEmpty() && activeThreads == 0) &&
304  ((t = msecs - timer.elapsed()) > 0))
305  noActiveThreads.wait(locker.mutex(), t);
306  }
307  return queue.isEmpty() && activeThreads == 0;
308 }
309 
318 {
319  QMutexLocker locker(&mutex);
320  if (queue.isEmpty())
321  return false;
322 
323  QRunnable *runnable = queue.takeFirst().first;
324  const bool autoDelete = runnable->autoDelete();
325  bool del = autoDelete && !--runnable->ref;
326 
327  locker.unlock();
328  runnable->run();
329  locker.relock();
330 
331  if (del) {
332  delete runnable;
333  }
334 
335  return true;
336 }
337 
348 {
349  if (runnable == 0 || queue.isEmpty())
350  return;
351  bool found = false;
352  {
353  QMutexLocker locker(&mutex);
355  QList<QPair<QRunnable *, int> >::iterator end = queue.end();
356 
357  while (it != end) {
358  if (it->first == runnable) {
359  found = true;
360  queue.erase(it);
361  break;
362  }
363  ++it;
364  }
365  }
366 
367  if (!found)
368  return;
369 
370  const bool autoDelete = runnable->autoDelete();
371  bool del = autoDelete && !--runnable->ref;
372 
373  runnable->run();
374 
375  if (del) {
376  delete runnable;
377  }
378 }
379 
440  : QObject(*new QThreadPoolPrivate, parent)
441 { }
442 
448 {
449  d_func()->waitForDone();
450  d_func()->reset();
451 }
452 
457 {
458  return theInstance();
459 }
460 
476 void QThreadPool::start(QRunnable *runnable, int priority)
477 {
478  if (!runnable)
479  return;
480 
481  Q_D(QThreadPool);
482  QMutexLocker locker(&d->mutex);
483  if (!d->tryStart(runnable)) {
484  d->enqueueTask(runnable, priority);
485 
486  if (!d->waitingThreads.isEmpty())
487  d->waitingThreads.takeFirst()->runnableReady.wakeOne();
488  }
489 }
490 
508 {
509  if (!runnable)
510  return false;
511 
512  Q_D(QThreadPool);
513 
514  QMutexLocker locker(&d->mutex);
515 
516  if (d->allThreads.isEmpty() == false && d->activeThreadCount() >= d->maxThreadCount)
517  return false;
518 
519  return d->tryStart(runnable);
520 }
521 
539 int QThreadPool::expiryTimeout() const
540 {
541  Q_D(const QThreadPool);
542  return d->expiryTimeout;
543 }
544 
546 {
547  Q_D(QThreadPool);
548  if (d->expiryTimeout == expiryTimeout)
549  return;
550  d->expiryTimeout = expiryTimeout;
551 }
552 
567 int QThreadPool::maxThreadCount() const
568 {
569  Q_D(const QThreadPool);
570  return d->maxThreadCount;
571 }
572 
574 {
575  Q_D(QThreadPool);
576  QMutexLocker locker(&d->mutex);
577 
578  if (maxThreadCount == d->maxThreadCount)
579  return;
580 
581  d->maxThreadCount = maxThreadCount;
582  d->tryToStartMoreThreads();
583 }
584 
599 {
600  Q_D(const QThreadPool);
601  QMutexLocker locker(&d->mutex);
602  return d->activeThreadCount();
603 }
604 
618 {
619  Q_D(QThreadPool);
620  QMutexLocker locker(&d->mutex);
621  ++d->reservedThreads;
622 }
623 
637 {
638  Q_D(QThreadPool);
639  QMutexLocker locker(&d->mutex);
640  --d->reservedThreads;
641  d->tryToStartMoreThreads();
642 }
643 
648 {
649  Q_D(QThreadPool);
650  d->waitForDone();
651  d->reset();
652 }
653 
666 {
667  Q_D(QThreadPool);
668  bool rc = d->waitForDone(msecs);
669  if (rc)
670  d->reset();
671  return rc;
672 }
673 
675 
676 #endif
double d
Definition: qnumeric_p.h:62
void releaseThread()
Releases a thread previously reserved by a call to reserveThread().
QRunnable * runnable
Definition: qthreadpool.cpp:73
Q_OUTOFLINE_TEMPLATE RandomAccessIterator qUpperBound(RandomAccessIterator begin, RandomAccessIterator end, const T &value)
Definition: qalgorithms.h:262
bool tooManyThreadsActive() const
QThreadPoolThread(QThreadPoolPrivate *manager)
Definition: qthreadpool.cpp:84
int activeThreadCount() const
#define QT_END_NAMESPACE
This macro expands to.
Definition: qglobal.h:90
T * data() const
Returns the value of the pointer referenced by this object.
#define it(className, varName)
void registerThreadInactive()
#define at(className, varName)
bool isEmpty() const
Definition: qset.h:77
T * take()
Returns the value of the pointer referenced by this object.
void unlock()
Unlocks this mutex locker.
Definition: qmutex.h:117
iterator begin()
Returns an STL-style iterator pointing to the first item in the list.
Definition: qlist.h:267
void insert(int i, const T &t)
Inserts value at index position i in the list.
Definition: qlist.h:575
QLatin1String(DBUS_INTERFACE_DBUS))) Q_GLOBAL_STATIC_WITH_ARGS(QString
int count(const T &t) const
Returns the number of occurrences of value in the list.
Definition: qlist.h:891
EventLoopTimerRef timer
Q_DECL_CONSTEXPR T qAbs(const T &t)
Definition: qglobal.h:1201
#define Q_ASSERT(cond)
Definition: qglobal.h:1823
The QObject class is the base class of all Qt objects.
Definition: qobject.h:111
virtual void run()=0
Implement this pure virtual function in your subclass.
#define Q_D(Class)
Definition: qglobal.h:2482
The QElapsedTimer class provides a fast way to calculate elapsed times.
Definition: qelapsedtimer.h:53
The QRunnable class is the base class for all runnable objects.
Definition: qrunnable.h:52
qint64 elapsed() const
Returns the number of milliseconds since this QElapsedTimer was last started.
bool isEmpty() const
Returns true if the list contains no items; otherwise returns false.
Definition: qlist.h:152
void stealRunnable(QRunnable *)
Searches for runnable in the queue, removes it from the queue and runs it if found.
The QScopedPointer class stores a pointer to a dynamically allocated object, and deletes it upon destruction.
void setObjectName(const QString &name)
Definition: qobject.cpp:1112
void relock()
Relocks an unlocked mutex locker.
Definition: qmutex.h:125
bool removeOne(const T &t)
Removes the first occurrence of value in the list and returns true on success; otherwise returns false.
Definition: qlist.h:796
The QThreadPool class manages a collection of QThreads.
Definition: qthreadpool.h:58
#define QT_BEGIN_NAMESPACE
This macro expands to.
Definition: qglobal.h:89
QQueue< QThreadPoolThread * > expiredThreads
Definition: qthreadpool_p.h:92
bool tryStart(QRunnable *runnable)
Attempts to reserve a thread to run runnable.
void waitForDone()
Waits for each thread to exit and removes all threads from the thread pool.
T takeFirst()
Removes the first item in the list and returns it.
Definition: qlist.h:489
friend class QThreadPoolThread
Definition: qthreadpool_p.h:71
iterator end()
Returns an STL-style iterator pointing to the imaginary item after the last item in the list.
Definition: qlist.h:270
#define Q_GLOBAL_STATIC(TYPE, NAME)
Declares a global static variable with the given type and name.
Definition: qglobal.h:1968
int expiryTimeout() const
void start(QRunnable *runnable, int priority=0)
Reserves a thread and uses it to run runnable, unless this thread will make the current thread count exceed maxThreadCount(). In that case, runnable is added to a run queue instead.
Q_CORE_EXPORT void qWarning(const char *,...)
const_iterator insert(const T &value)
Definition: qset.h:179
QThreadPoolPrivate * manager
Definition: qthreadpool.cpp:72
void clear()
Definition: qset.h:87
void removeFirst()
Removes the first item in the list.
Definition: qlist.h:286
int count() const
Definition: qset.h:178
~QThreadPool()
Destroys the QThreadPool.
void run()
The starting point for the thread.
Definition: qthreadpool.cpp:91
void clear()
Removes all items from the list.
Definition: qlist.h:764
void reset()
Makes all threads exit, waits for each thread to exit and deletes it.
int ref
Definition: qrunnable.h:54
QSet< QThreadPoolThread * > allThreads
Definition: qthreadpool_p.h:90
bool wait(QMutex *mutex, unsigned long time=ULONG_MAX)
void setExpiryTimeout(int expiryTimeout)
T & first()
Returns a reference to the first item in the list.
Definition: qlist.h:282
void enqueue(const T &t)
Adds value t to the tail of the queue.
Definition: qqueue.h:60
iterator erase(iterator pos)
Removes the item associated with the iterator pos from the list, and returns an iterator to the next item in the list (which may be end()).
Definition: qlist.h:464
The QMutexLocker class is a convenience class that simplifies locking and unlocking mutexes.
Definition: qmutex.h:101
int maxThreadCount() const
void start(Priority=InheritPriority)
Begins execution of the thread by calling run().
bool autoDelete() const
Returns true if auto-deletion is enabled; false otherwise.
Definition: qrunnable.h:66
bool wait(unsigned long time=ULONG_MAX)
Blocks the thread until either of these conditions is met:
Q_OUTOFLINE_TEMPLATE QPair< T1, T2 > qMakePair(const T1 &x, const T2 &y)
Definition: qpair.h:102
void tryToStartMoreThreads()
QWaitCondition noActiveThreads
Definition: qthreadpool_p.h:94
static QThreadPool * globalInstance()
Returns the global QThreadPool instance.
QObject * parent
Definition: qobject.h:92
QFuture< T > run(Function function,...)
bool startFrontRunnable()
Pulls a runnable from the front queue and runs it in the current thread.
int activeThreadCount() const
void enqueueTask(QRunnable *task, int priority=0)
QWaitCondition runnableReady
Definition: qthreadpool.cpp:71
QThreadPool(QObject *parent=0)
Constructs a thread pool with the given parent.
T dequeue()
Removes the head item in the queue and returns it.
Definition: qqueue.h:61
#define class
void startThread(QRunnable *runnable=0)
static const KeyPair *const end
QQueue< QThreadPoolThread * > waitingThreads
Definition: qthreadpool_p.h:91
The QThread class provides a platform-independent way to manage threads.
Definition: qthread.h:59
void setMaxThreadCount(int maxThreadCount)
void start()
Starts this timer.
QMutex * mutex() const
Returns a pointer to the mutex that was locked in the constructor.
Definition: qmutex.h:140
QList< QPair< QRunnable *, int > > queue
Definition: qthreadpool_p.h:93
void reserveThread()
Reserves one thread, disregarding activeThreadCount() and maxThreadCount().
The QList class is a template class that provides lists.
Definition: qdatastream.h:62
bool tryStart(QRunnable *task)
bool waitForDone(int msecs=-1)
static int idealThreadCount()
Returns the ideal number of threads that can be run on the system.