chunkedseq
container library for large in-memory data sets
Parallel processing

The containers of chunkedseq are well suited to applications that use fork-join parallelism: thanks to the logarithmic-time split operations, chunkedseq containers can be divided efficiently, and thanks to the logarithmic-time concatenate operations, they can be merged efficiently. Moreover, chunkedseq containers can be processed efficiently in a sequential fashion, which enables a programming style that freely combines sequential and parallel processing. The following example programs demonstrate this style.

Remarks
The data structures of the chunkedseq package are not concurrent data structures, or, put differently, chunkedseq data structures admit only single-threaded update operations.
The following examples are evidence that this single-threading restriction does not necessarily limit parallelism.

Example: pkeep_if

To see how our deque can be used for parallel processing, consider the following program, which constructs a subsequence of a given sequence by keeping the items selected by a client-supplied predicate function. Assuming fork-join parallel constructs, such as Cilk's spawn and sync, the selection and build process of pkeep_if can achieve a large amount of parallelism (in fact, one that grows without bound as the input grows), because the span of the computation is logarithmic in the size of the input sequence. Moreover, pkeep_if is work efficient, because the algorithm takes time linear in the size of the input sequence (assuming, of course, that the client-supplied predicate takes constant time).

#include <iostream>
#include <assert.h>
#include "chunkedseq.hpp"

// alias assumed here: its declaration is missing from this listing
using cbdeque = pasl::data::chunkedseq::bootstrapped::deque<long>;

// moves items which satisfy a given predicate p from src to dst
// preserving original order of items in src
template <class Predicate_function>
void pkeep_if(cbdeque& dst, cbdeque& src, const Predicate_function& p) {
  const int cutoff = 8096;
  long sz = src.size();
  if (sz <= cutoff) {
    // compute result in a sequential fashion
    while (sz-- > 0) {
      long item = src.pop_back();
      if (p(item))
        dst.push_front(item);
    }
  } else {
    cbdeque src2;
    cbdeque dst2;
    // divide the input evenly in two halves
    size_t mid = sz / 2;
    src.split(mid, src2);
    // recurse on subproblems
    // calls can execute in parallel
    pkeep_if(dst, src, p);
    pkeep_if(dst2, src2, p);
    // combine results (after parallel calls complete)
    dst.concat(dst2);
  }
}

int main(int argc, const char * argv[]) {
  cbdeque src;
  cbdeque dst;
  const long n = 1000000;
  // fill the source container with [1, ..., 2n]
  for (long i = 1; i <= 2*n; i++)
    src.push_back(i);
  // leave src empty and dst = [1, 3, 5, ..., 2n-1]
  pkeep_if(dst, src, [] (long x) { return x%2 == 1; });
  assert(src.empty());
  assert(dst.size() == n);
  // calculate the sum
  long sum = 0;
  while (! dst.empty())
    sum += dst.pop_front();
  // the sum of the first n odd integers equals n^2
  assert(sum == n*n);
  std::cout << "sum = " << sum << std::endl;
  return 0;
}

Output

sum = 1000000000000
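
The listing above marks where the recursive calls can execute in parallel but leaves the fork-join construct itself implicit. As a rough illustration only, the following sketch expresses the same divide, recurse, and combine structure with std::async from the C++ standard library in place of Cilk's spawn and sync. The name pkeep_if_sketch and the use of std::vector are assumptions made for this illustration and are not part of chunkedseq; because std::vector splits and concatenates in linear time, the sketch shows the shape of the computation but not the logarithmic-time costs that chunkedseq provides. It requires C++14 and thread support at link time (for example, -pthread with GCC or Clang).

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <utility>
#include <vector>

// Hypothetical stand-in for pkeep_if, built on std::vector instead of a
// chunkedseq container. Splitting and concatenating a std::vector takes
// linear time, so this sketch shows only the fork-join structure.
template <class Predicate>
std::vector<long> pkeep_if_sketch(std::vector<long> src, const Predicate& p,
                                  std::size_t cutoff = 8192) {
  assert(cutoff >= 1);  // a zero cutoff would never reach the base case
  std::vector<long> dst;
  if (src.size() <= cutoff) {
    // small input: filter sequentially, preserving order
    for (long item : src)
      if (p(item))
        dst.push_back(item);
    return dst;
  }
  // split: move the second half of src into its own container
  const std::size_t mid = src.size() / 2;
  std::vector<long> src2(src.begin() + mid, src.end());
  src.resize(mid);
  // fork: the second half runs as a separate task that owns src2
  auto right = std::async(std::launch::async,
                          [&p, cutoff, s = std::move(src2)]() mutable {
                            return pkeep_if_sketch(std::move(s), p, cutoff);
                          });
  // the first half runs in the current thread
  dst = pkeep_if_sketch(std::move(src), p, cutoff);
  // join: wait for the forked task, then concatenate the two results
  std::vector<long> dst2 = right.get();
  dst.insert(dst.end(), dst2.begin(), dst2.end());
  return dst;
}
```

Each task updates only the containers it owns, and the results are merged only after the join, so no container is ever updated by two threads at once. Note that this sketch starts one thread per fork, roughly the input size divided by the cutoff in total, whereas a work-stealing scheduler such as Cilk's would multiplex the tasks over a fixed pool of workers.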

Example: pcopy

This algorithm implements a parallel version of std::copy. Note, however, that the two versions differ slightly: in our version, the destination parameter is a reference to the destination container, whereas in std::copy it is an iterator that points to the beginning of the destination range.

#include <iostream>
#include <string>
#include <assert.h>
#include "chunkedseq.hpp"

template <class Chunkedseq>
void pcopy(typename Chunkedseq::iterator first,
           typename Chunkedseq::iterator last,
           Chunkedseq& destination) {
  using iterator = typename Chunkedseq::iterator;
  using ptr = typename Chunkedseq::const_pointer;
  const long cutoff = 8192;
  long sz = last.size() - first.size();
  if (sz <= cutoff) {
    // compute result in a sequential fashion
    Chunkedseq::for_each_segment(first, last, [&] (ptr lo, ptr hi) {
      destination.pushn_back(lo, hi-lo);
    });
  } else {
    // select split position to be the median
    iterator mid = first + (sz/2);
    Chunkedseq destination2;
    // recurse on subproblems
    // calls can execute in parallel
    pcopy(first, mid, destination);
    pcopy(mid, last, destination2);
    // combine results
    destination.concat(destination2);
  }
}

int main(int argc, const char * argv[]) {
  const int chunk_size = 2;
  // alias assumed here: its declaration is missing from this listing
  using mydeque_type = pasl::data::chunkedseq::bootstrapped::deque<int, chunk_size>;
  mydeque_type mydeque = { 0, 1, 2, 3, 4, 5 };
  mydeque_type mydeque2;
  pcopy(mydeque.begin(), mydeque.end(), mydeque2);
  std::cout << "mydeque2 contains:";
  auto p = mydeque2.begin();
  while (p != mydeque2.end())
    std::cout << " " << *p++;
  std::cout << std::endl;
  return 0;
}

Output

mydeque2 contains: 0 1 2 3 4 5
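
To make the interface difference concrete, the following sketch contrasts the two styles using only the C++ standard library. The names copy_to_iterator and copy_to_container are hypothetical and chosen for this illustration, and std::vector stands in for a chunkedseq container, so the concatenation step here takes linear time. One plausible reading of the design is that a container reference lets each recursive call fill a fresh container that is later concatenated as a whole, which an output iterator alone does not allow.

```cpp
#include <algorithm>
#include <cassert>
#include <iterator>
#include <vector>

// std::copy style: the destination is an output iterator, so the callee can
// only write items one after another through it.
template <class InputIt, class OutputIt>
OutputIt copy_to_iterator(InputIt first, InputIt last, OutputIt out) {
  return std::copy(first, last, out);
}

// pcopy style (hypothetical stand-in): the destination is a container
// reference, so each half can be built separately and then concatenated.
template <class Container>
void copy_to_container(typename Container::const_iterator first,
                       typename Container::const_iterator last,
                       Container& destination, long cutoff = 4) {
  assert(cutoff >= 1);  // a zero cutoff would never reach the base case
  const long sz = static_cast<long>(std::distance(first, last));
  if (sz <= cutoff) {
    // small range: append the items sequentially
    destination.insert(destination.end(), first, last);
    return;
  }
  // split the range at its midpoint
  auto mid = std::next(first, sz / 2);
  Container destination2;
  // the two calls write to different containers
  copy_to_container(first, mid, destination, cutoff);
  copy_to_container(mid, last, destination2, cutoff);
  // combine results by appending the second container to the first
  destination.insert(destination.end(), destination2.begin(),
                     destination2.end());
}
```

A call such as copy_to_container(v.cbegin(), v.cend(), out) deduces the container type from the third argument alone, in the same way that pcopy deduces Chunkedseq from its destination parameter.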

Example: pcopy_if

This algorithm implements a parallel version of std::copy_if. As with pcopy, the third parameter is a reference to the destination container rather than an output iterator as in the STL version.

#include <iostream>
#include <string>
#include <assert.h>
#include "chunkedseq.hpp"

template <class Chunkedseq, class UnaryPredicate>
void pcopy_if(typename Chunkedseq::iterator first,
              typename Chunkedseq::iterator last,
              Chunkedseq& destination,
              const UnaryPredicate& pred) {
  using iterator = typename Chunkedseq::iterator;
  using value_type = typename Chunkedseq::value_type;
  using ptr = typename Chunkedseq::const_pointer;
  const long cutoff = 8192;
  long sz = last.size() - first.size();
  if (sz <= cutoff) {
    // compute result in a sequential fashion
    Chunkedseq::for_each_segment(first, last, [&] (ptr lo, ptr hi) {
      for (ptr p = lo; p < hi; p++) {
        value_type v = *p;
        if (pred(v))
          destination.push_back(v);
      }
    });
  } else {
    // select split position to be the median
    iterator mid = first + (sz/2);
    Chunkedseq destination2;
    // recurse on subproblems
    // calls can execute in parallel
    pcopy_if(first, mid, destination, pred);
    pcopy_if(mid, last, destination2, pred);
    // combine results
    destination.concat(destination2);
  }
}

int main(int argc, const char * argv[]) {
  const int chunk_size = 2;
  // alias assumed here: its declaration is missing from this listing
  using mydeque_type = pasl::data::chunkedseq::bootstrapped::deque<int, chunk_size>;
  mydeque_type mydeque = { 0, 1, 2, 3, 4, 5 };
  mydeque_type mydeque2;
  pcopy_if(mydeque.begin(), mydeque.end(), mydeque2, [] (int i) { return i%2==0; });
  std::cout << "mydeque2 contains:";
  auto p = mydeque2.begin();
  while (p != mydeque2.end())
    std::cout << " " << *p++;
  std::cout << std::endl;
  return 0;
}

Output

mydeque2 contains: 0 2 4