Qt 4.8
qtconcurrentreducekernel.h
Go to the documentation of this file.
1 /****************************************************************************
2 **
3 ** Copyright (C) 2014 Digia Plc and/or its subsidiary(-ies).
4 ** Contact: http://www.qt-project.org/legal
5 **
6 ** This file is part of the QtCore module of the Qt Toolkit.
7 **
8 ** $QT_BEGIN_LICENSE:LGPL$
9 ** Commercial License Usage
10 ** Licensees holding valid commercial Qt licenses may use this file in
11 ** accordance with the commercial license agreement provided with the
12 ** Software or, alternatively, in accordance with the terms contained in
13 ** a written agreement between you and Digia. For licensing terms and
14 ** conditions see http://qt.digia.com/licensing. For further information
15 ** use the contact form at http://qt.digia.com/contact-us.
16 **
17 ** GNU Lesser General Public License Usage
18 ** Alternatively, this file may be used under the terms of the GNU Lesser
19 ** General Public License version 2.1 as published by the Free Software
20 ** Foundation and appearing in the file LICENSE.LGPL included in the
21 ** packaging of this file. Please review the following information to
22 ** ensure the GNU Lesser General Public License version 2.1 requirements
23 ** will be met: http://www.gnu.org/licenses/old-licenses/lgpl-2.1.html.
24 **
25 ** In addition, as a special exception, Digia gives you certain additional
26 ** rights. These rights are described in the Digia Qt LGPL Exception
27 ** version 1.1, included in the file LGPL_EXCEPTION.txt in this package.
28 **
29 ** GNU General Public License Usage
30 ** Alternatively, this file may be used under the terms of the GNU
31 ** General Public License version 3.0 as published by the Free Software
32 ** Foundation and appearing in the file LICENSE.GPL included in the
33 ** packaging of this file. Please review the following information to
34 ** ensure the GNU General Public License version 3.0 requirements will be
35 ** met: http://www.gnu.org/copyleft/gpl.html.
36 **
37 **
38 ** $QT_END_LICENSE$
39 **
40 ****************************************************************************/
41 
42 #ifndef QTCONCURRENT_REDUCEKERNEL_H
43 #define QTCONCURRENT_REDUCEKERNEL_H
44 
45 #include <QtCore/qglobal.h>
46 
47 #ifndef QT_NO_CONCURRENT
48 
49 #include <QtCore/qatomic.h>
50 #include <QtCore/qlist.h>
51 #include <QtCore/qmap.h>
52 #include <QtCore/qmutex.h>
53 #include <QtCore/qthread.h>
54 #include <QtCore/qthreadpool.h>
55 #include <QtCore/qvector.h>
56 
59 
60 QT_MODULE(Core)
61 
62 namespace QtConcurrent {
63 
64 #ifndef qdoc
65 
66 /*
67  The ReduceQueueStartLimit and ReduceQueueThrottleLimit constants
68  limit the reduce queue size for MapReduce. When the number of
69  reduce blocks in the queue exceeds ReduceQueueStartLimit,
70  MapReduce won't start any new threads, and when it exceeds
71  ReduceQueueThrottleLimit running threads will be stopped.
72 */
73 enum {
74  ReduceQueueStartLimit = 20,
75  ReduceQueueThrottleLimit = 30
76 };
77 
78 // IntermediateResults holds a block of intermediate results from a
79 // map or filter functor. The begin/end offsets indicates the origin
80 // and range of the block.
81 template <typename T>
82 class IntermediateResults
83 {
84 public:
85  int begin, end;
86  QVector<T> vector;
87 };
88 
89 #endif // qdoc
90 
95  // ParallelReduce = 0x8
96 };
97 Q_DECLARE_FLAGS(ReduceOptions, ReduceOption)
98 Q_DECLARE_OPERATORS_FOR_FLAGS(ReduceOptions)
99 
100 #ifndef qdoc
101 
102 // supports both ordered and out-of-order reduction
103 template <typename ReduceFunctor, typename ReduceResultType, typename T>
104 class ReduceKernel
105 {
106  typedef QMap<int, IntermediateResults<T> > ResultsMap;
107 
108  const ReduceOptions reduceOptions;
109 
110  QMutex mutex;
111  int progress, resultsMapSize, threadCount;
112  ResultsMap resultsMap;
113 
114  bool canReduce(int begin) const
115  {
116  return (((reduceOptions & UnorderedReduce)
117  && progress == 0)
118  || ((reduceOptions & OrderedReduce)
119  && progress == begin));
120  }
121 
122  void reduceResult(ReduceFunctor &reduce,
123  ReduceResultType &r,
124  const IntermediateResults<T> &result)
125  {
126  for (int i = 0; i < result.vector.size(); ++i) {
127  reduce(r, result.vector.at(i));
128  }
129  }
130 
131  void reduceResults(ReduceFunctor &reduce,
132  ReduceResultType &r,
133  ResultsMap &map)
134  {
135  typename ResultsMap::iterator it = map.begin();
136  while (it != map.end()) {
137  reduceResult(reduce, r, it.value());
138  ++it;
139  }
140  }
141 
142 public:
143  ReduceKernel(ReduceOptions _reduceOptions)
144  : reduceOptions(_reduceOptions), progress(0), resultsMapSize(0),
145  threadCount(QThreadPool::globalInstance()->maxThreadCount())
146  { }
147 
148  void runReduce(ReduceFunctor &reduce,
149  ReduceResultType &r,
150  const IntermediateResults<T> &result)
151  {
152  QMutexLocker locker(&mutex);
153  if (!canReduce(result.begin)) {
154  ++resultsMapSize;
155  resultsMap.insert(result.begin, result);
156  return;
157  }
158 
159  if (reduceOptions & UnorderedReduce) {
160  // UnorderedReduce
161  progress = -1;
162 
163  // reduce this result
164  locker.unlock();
165  reduceResult(reduce, r, result);
166  locker.relock();
167 
168  // reduce all stored results as well
169  while (!resultsMap.isEmpty()) {
170  ResultsMap resultsMapCopy = resultsMap;
171  resultsMap.clear();
172 
173  locker.unlock();
174  reduceResults(reduce, r, resultsMapCopy);
175  locker.relock();
176 
177  resultsMapSize -= resultsMapCopy.size();
178  }
179 
180  progress = 0;
181  } else {
182  // reduce this result
183  locker.unlock();
184  reduceResult(reduce, r, result);
185  locker.relock();
186 
187  // OrderedReduce
188  progress += result.end - result.begin;
189 
190  // reduce as many other results as possible
191  typename ResultsMap::iterator it = resultsMap.begin();
192  while (it != resultsMap.end()) {
193  if (it.value().begin != progress)
194  break;
195 
196  locker.unlock();
197  reduceResult(reduce, r, it.value());
198  locker.relock();
199 
200  --resultsMapSize;
201  progress += it.value().end - it.value().begin;
202  it = resultsMap.erase(it);
203  }
204  }
205  }
206 
207  // final reduction
208  void finish(ReduceFunctor &reduce, ReduceResultType &r)
209  {
210  reduceResults(reduce, r, resultsMap);
211  }
212 
213  inline bool shouldThrottle()
214  {
215  return (resultsMapSize > (ReduceQueueThrottleLimit * threadCount));
216  }
217 
218  inline bool shouldStartThread()
219  {
220  return (resultsMapSize <= (ReduceQueueStartLimit * threadCount));
221  }
222 };
223 
224 template <typename Sequence, typename Base, typename Functor1, typename Functor2>
225 struct SequenceHolder2 : public Base
226 {
227  SequenceHolder2(const Sequence &_sequence,
228  Functor1 functor1,
229  Functor2 functor2,
230  ReduceOptions reduceOptions)
231  : Base(_sequence.begin(), _sequence.end(), functor1, functor2, reduceOptions),
232  sequence(_sequence)
233  { }
234 
235  Sequence sequence;
236 
237  void finish()
238  {
239  Base::finish();
240  // Clear the sequence to make sure all temporaries are destroyed
241  // before finished is signaled.
242  sequence = Sequence();
243  }
244 };
245 
246 #endif //qdoc
247 
248 } // namespace QtConcurrent
249 
252 
253 #endif // QT_NO_CONCURRENT
254 
255 #endif
#define QT_END_NAMESPACE
This macro expands to.
Definition: qglobal.h:90
The QMutex class provides access serialization between threads.
Definition: qmutex.h:60
#define QT_MODULE(x)
Definition: qglobal.h:2783
#define QT_BEGIN_HEADER
Definition: qglobal.h:136
#define Q_DECLARE_FLAGS(Flags, Enum)
The Q_DECLARE_FLAGS() macro expands to.
Definition: qglobal.h:2348
#define it(className, varName)
void unlock()
Unlocks this mutex locker.
Definition: qmutex.h:117
The QVector class is a template class that provides a dynamic array.
Definition: qdatastream.h:64
void relock()
Relocks an unlocked mutex locker.
Definition: qmutex.h:125
#define QT_BEGIN_NAMESPACE
This macro expands to.
Definition: qglobal.h:89
#define Q_DECLARE_OPERATORS_FOR_FLAGS(Flags)
The Q_DECLARE_OPERATORS_FOR_FLAGS() macro declares global operator|() functions for Flags...
Definition: qglobal.h:2355
ReduceOption
This enum specifies the order of which results from the map or filter function are passed to the redu...
The QMutexLocker class is a convenience class that simplifies locking and unlocking mutexes...
Definition: qmutex.h:101
The QtConcurrent namespace provides high-level APIs that make it possible to write multi-threaded pro...
static QThreadPool * globalInstance()
Returns the global QThreadPool instance.
static const KeyPair *const end
#define QT_END_HEADER
Definition: qglobal.h:137
The QMap class is a template class that provides a skip-list-based dictionary.
Definition: qdatastream.h:67