std.parallelism

Переместиться к: defaultPoolThreads · parallel · scopedTask · Task · task · TaskPool · taskPool · totalCPUs

std.parallelism реализует примитивы высокого уровня для параллельного выполнения на системах с симметричной мультипроцессорностью (SMP). К ним относятся параллельный foreach (цикл), параллельный reduce (свёртка), жадный параллельный map (отображение), параллельный pipelining (конвейерная обработка), и параллелизм future/promise (будущие результаты/обещание). std.parallelism рекомендуется, когда одна и та же операция должна выполняться параллельно на различных данных, или когда функция должна выполняться в фоновом потоке, и ее результат возвращается в чётко заданный главный поток. Для использования сообщений между произвольными потоками смотрите модуль std.concurrency.
std.parallelism основан на концепции Task (задача). Задача Task – это объект, который представляет основную единицу работы в этой библиотеке и может выполняться параллельно с любой другой задачей. Непосредственное использование Task позволяет программировать с помощью парадигмы future/promise. Все остальные поддерживаемые парадигмы параллельности (параллельные foreach, map, reduce, pipelining) представляют собой дополнительный уровень абстракции над Task. Они автоматически создают один или несколько объектов Task или тесно связанных типов, которые концептуально идентичны, но не являются частью общедоступного API.
После создания задача Task может быть выполнена в новом потоке или отправлена в TaskPool (пул задач) для выполнения. TaskPool формирует очередь задач и рабочие потоки. Его цель – эффективное отображение большого количества задач на меньшее количество потоков. Очередь задач – это FIFO-очередь объектов Task, которые были отправлены в TaskPool и ждут выполнения. Рабочий поток – это поток, который связан только с одной очередью задач. Он выполняет задачу Task из начала своей очереди, когда в очереди имеется работа, или спит, когда нет работы. Каждая очередь задач связана с нулём или более рабочими потоками. Если результат задачи необходим до того, как рабочий поток станет доступным, задачу Task можно удалить из очереди задач и немедленно выполнить в потоке, в котором требуется этот результат.

Предупреждение: Артефакты этого модуля, если они не отмечены как @trusted или @safe, допускают неявный обмен данными между потоками и не могут гарантировать, что клиентский код будет свободен от низкоуровневых гонок данных.

Исходный код: std/parallelism.d

Автор: David Simcha

Лицензия:
Boost License 1.0
Примеры:
import std.algorithm : map;
import std.range : iota;
import std.math : approxEqual;
import std.parallelism : taskPool;

// Параллельный reduce можно комбинировать с
// std.algorithm.map для получения интересного эффекта.
// Следующий пример (спасибо Russel Winder)
// вычисляет pi через квадратуру с использованием
// std.algorithm.map и TaskPool.reduce.
// getTerm вычисляется параллельным образом по мере 
// необходимости в TaskPool.reduce.
//
// Тайминги на машине с четырёхядерным Intel i5-3450
// для n = 1_000_000_000:
//
// TaskPool.reduce:       1.067 s
// std.algorithm.reduce:  4.011 s

enum n = 1_000_000;
enum delta = 1.0 / n;

alias getTerm = (int i)
{
    immutable x = ( i - 0.5 ) * delta;
    return delta / ( 1.0 + x * x ) ;
};

immutable pi = 4.0 * taskPool.reduce!"a + b"(n.iota.map!getTerm);

assert(pi.approxEqual(3.1415926));

Переместиться к: args · done · executeInNewThread · ReturnType · spinForce · workForce · yieldForce

struct Task(alias fun, Args...);
Task представляет собой фундаментальную единицу работы. Задача Task может выполняться параллельно с любой другой задачей. Использование этой структуры напрямую предоставляет параллелизм типа future/promise. В этой парадигме функция (или делегат, или что-либо другое с возможностью вызова) выполняется в потоке, отличном от того, в которым он был вызван. Вызывающий поток не блокируется во время выполнения функции. Вызов workForce, yieldForce или spinForce используется для гарантирования завершения Task и получения возвращаемого значения, если таковое имеется. Эти функции, а также done, действуют как полные барьеры памяти (full memory barriers), что означает, что любые результаты операций записи в память, произведённых в потоке, выполняющем задачу Task, гарантированно будут видны в вызывающем потоке после возвращения одной из этих функций.
Для создания экземпляра этой структуры можно использовать функции std.parallelism.task и std.parallelism.scopedTask. Смотрите примеры использования в task.
Результаты функции возвращаются из yieldForce, spinForce и workForce по ссылке. Если fun возвращает ref, ссылка указывает на возвращаемую ref-ссылку функции fun. В противном случае он укажет на поле в этой структуре.
Копирование этой структуры отключено, так как она не предоставляет никакой полезной семантики. Если вы хотите передать эту структуру, вы должны сделать это по ссылке или через указатель.
Недостатки:
Изменения аргументов ref и out не распространяются на вызывающую сторону, а только на args в этой структуре.
alias args = _args[1 .. __dollar];
Аргументы, с которыми была вызвана функция. Изменения аргументов, связанные с out и ref будут видны здесь.
alias ReturnType = typeof(fun(_args));
Тип возвращаемого значения функции, вызываемой этой задачей. Может быть void.
@property ref @trusted ReturnType spinForce();
Если задача Task ещё не запущена, выполняет её в текущем потоке. Если она выполнена, вернёт её возвращаемое значение, если оно есть. Если задача выполняется, будет продолжать крутиться до тех пор, пока она не будет выполнена, а затем вернёт возвращаемое значение. Если в ней брошено исключение, это исключение перебрасывается.
Эта функция должна использоваться, когда вы ожидаете, что результат Task будет доступен за время, меньшее, чем переключение контекста операционной системы.
@property ref @trusted ReturnType yieldForce();
Если задача Task ещё не запущена, выполняет её в текущем потоке. Если она выполнена, вернёт её возвращаемое значение, если оно есть. Если задача выполняется, ждёт переменную состояния. Если в ней брошено исключение, это исключение перебрасывается.
Эта функция должна использоваться для дорогих функций, так как ожидание переменной состояния вводит латентность, но позволяет избежать траты циклов процессора.
@property ref @trusted ReturnType workForce();
Если задача Task ещё не запущена, выполняет её в текущем потоке. Если она выполнена, вернёт её результат. Если задача выполняется, выполняет любую другую задачу Task из экземпляра TaskPool, в который была отправлена эта задача, до тех пор, пока эта задача не будет завершена. Если в ней брошено исключение, это исключение перебрасывается. Если другие задачи недоступны, или эта задача была выполнена с использованием executeInNewThread, ждёт переменную состояния.
@property @trusted bool done();
Возвращает true, если задача Task завершена.
Исключения:
Перебрасывает любое исключение, возникшее во время выполнения задачи.
@trusted void executeInNewThread();

@trusted void executeInNewThread(int priority);
Создаёт новый поток для выполнения этой задачи Task, выполняет её во вновь созданном потоке, а затем завершает поток. Это можно использовать для параллелизма future/promise. Задаче можно задать явный приоритет priority. Если он предоставлен, его значение пересылается в core.thread.Thread.priority. См. пример использования в std.parallelism.task.

Переместиться к: 2 · 3

auto task(alias fun, Args...)(Args args);
Создает Task в куче, управляемой сборщиком мусора, которая вызывает псевдоним. Её можно выполнить через Task.executeInNewThread или путем отправки в std.parallelism.TaskPool. Глобально доступный экземпляр TaskPool предоставляется функцией std.parallelism.taskPool.
Возвращает:
Указатель на задачу Task.

Пример:

// Одновременно читает два файла в память.
import std.file;

void main()
{
    // Создание и выполнение задачи чтения файла
    // foo.txt.
    auto file1Task = task!read("foo.txt");
    file1Task.executeInNewThread();

    // Параллельное чтение файла bar.txt.
    auto file2Data = read("bar.txt");

    // Получить результат чтения foo.txt.
    auto file1Data = file1Task.yieldForce;
}
// Сортировка массива с использованием параллельного алгоритма быстрой сортировки.
// Первая часть выполняется последовательно. Обе 
// рекуррентные ветви выполняются параллельно.
//
// Тайминги сортировки массива из 1 000 000 элементов типа double 
// на двухъядерной машине Athlon 64 X2:
//
// Эта реализация:                             176 milliseconds.
// Эквивалентная последовательная реализация:  280 milliseconds
void parallelSort(T)(T[] data)
{
    // Маленькие подмассивы сортируются последовательно.
    if (data.length < 100)
    {
         std.algorithm.sort(data);
         return;
    }

    // Разделение массива.
    swap(data[$ / 2], data[$ - 1]);
    auto pivot = data[$ - 1];
    bool lessThanPivot(T elem) { return elem < pivot; }

    auto greaterEqual = partition!lessThanPivot(data[0..$ - 1]);
    swap(data[$ - greaterEqual.length - 1], data[$ - 1]);

    auto less = data[0..$ - greaterEqual.length - 1];
    greaterEqual = data[$ - greaterEqual.length..$];

    // Параллельное выполнение обеих рекурсивных ветвей.
    auto recurseTask = task!parallelSort(greaterEqual);
    taskPool.put(recurseTask);
    parallelSort(less);
    recurseTask.yieldForce;
}

auto task(F, Args...)(F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);
Создает задачу Task в куче, управляемой сборщиком мусора, которая вызывает указатель на функцию, делегат или класс/структуру с перегруженным методом opCall.

Пример:

// Читает два файла одновременно, но на этот раз 
// использует указатель на функцию вместо 
// псевдонима для представления std.file.read.
import std.file;

void main()
{
    // Создание и выполнение задачи чтения файла
    // foo.txt.
    auto file1Task = task(&read, "foo.txt");
    file1Task.executeInNewThread();

    // Параллельное чтение файла bar.txt.
    auto file2Data = read("bar.txt");

    // Получение результата чтения foo.txt.
    auto file1Data = file1Task.yieldForce;
}

Замечания: Эта функция принимает делегата, не относящегося к области видимости, то есть её можно использовать с замыканием. Если вы не можете распределить замыкание из-за объектов в стеке, которые have scoped destruction, смотрите вариант scopedTask, который принимает делегат области видимости.
This function takes a non-scope delegate, meaning it can be used with closures. If you can't allocate a closure due to objects on the stack that have scoped destruction, see scopedTask, which takes a scope delegate.

@trusted auto task(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);
Версия task, которую можно использовать из @safe-кода. Механизм использования идентичен не-@safe варианту, но безопасность вводит некоторые ограничения:
1. fun должна быть @safe или @trusted.
2. F не должен иметь никакого неразделяемого (unshared) псевдонима, как это определено std.traits.hasUnsharedAliasing. Это означает, что он не может быть неразделяемым делегатом или неразделяемым классом или структурой с перегруженным методом opCall. Также исключается возможность принятия параметров-псевдонимов шаблона.
3. Args не должны иметь никаких неразделяемых псевдонимов.
4. fun не должна возвращать по ссылке.
5. Тип возвращаемого значения не должен иметь неразделяемого псевдонима, если только fun не является pure, или если Task не выполняется через executeInNewThread вместо использования TaskPool.
auto scopedTask(alias fun, Args...)(Args args);

auto scopedTask(F, Args...)(scope F delegateOrFp, Args args)
if (is(typeof(delegateOrFp(args))) && !isSafeTask!F);

@trusted auto scopedTask(F, Args...)(F fun, Args args)
if (is(typeof(fun(args))) && isSafeTask!F);
Эти функции позволяют создавать объекты задач Task в стеке, а не в куче, управляемой сборщиком мусора. Время жизни Task, созданной scopedTask, не может превышать время жизни области видимости, в которой она была создана.
scopedTask может быть предпочтительнее task в следующих случаях:
1. Когда создаётся задача, вызывающая делегат, и замыкание не может быть распределено из-за объектов в стеке, которые have scoped destruction. Перегрузка scopedTask, относящаяся к делегатам, принимает scope delegate.
2. Как микро-оптимизация, чтобы избежать распределения памяти в куче, связанного с задачей или с созданием замыкания.
В остальном использование идентично task.

Замечания: Объекты задач Task, созданные с помощью scopedTask, автоматически вызовут метод Task.yieldForce в своем деструкторе, если необходимо, чтобы задача была завершена до того, как кадр стека, в котором они находятся, будет уничтожен.

immutable uint totalCPUs;
Общее количество ядер ЦП, доступных на текущем компьютере, как об этом сообщает операционная система.

Переместиться к: amap · asyncBuf · finish · isDaemon · map · parallel · priority · put · reduce · size · stop · this · workerIndex · WorkerLocalStorage · workerLocalStorage · WorkerLocalStorageRange

class TaskPool;
Этот класс инкапсулирует очередь задач и набор рабочих потоков. Его цель - эффективно отображать большое количество задач на меньшее количество потоков. Очередь задач – это FIFO-очередь объектов Task, которые были отправлены в TaskPool и ждут выполнения. Рабочий поток – это поток, который выполняет задачу Task из начала своей очереди, когда в очереди имеется задача, и спит, когда очередь пуста.
Этот класс, как правило, следует использовать с помощью глобальной копии, доступной через свойство std.parallelism.taskPool. Иногда полезно явно создавать экземпляр TaskPool:
1. Если вам нужны экземпляры TaskPool с несколькими приоритетами, например, пул с низким приоритетом и пул с высоким приоритетом.
2. Когда потоки в глобальном пуле задач ждут примитив синхронизации (например, мьютекс), и вы хотите распараллелить код, который должен быть запущен, до того, как эти потоки могут быть возобновлены.

Переместиться к: 2

@trusted this();
Конструктор по умолчанию, который инициализирует TaskPool с totalCPUs - 1 рабочими потоками. Минус 1 включен, потому что основной поток также будет доступен для работы.

Замечание: На одноядерных машинах примитивы, предоставляемые TaskPool, работают прозрачно в однопоточном режиме.

@trusted this(size_t nWorkers);
Позволяет настраивать количество рабочих потоков.
ParallelForeach!R parallel(R)(R range, size_t workUnitSize);

ParallelForeach!R parallel(R)(R range);
Реализует параллельный цикл foreach по диапазону range. Это работает путем неявного создания и отправки одной задачи Task в TaskPool для каждого рабочего потока. Рабочий блок (work unit) представляет собой набор последовательных элементов диапазона, которые должны обрабатываться рабочим потоком между взаимодействием с любым другим потоком. Количество элементов, обрабатываемых в рабочем блоке, контролируется параметром workUnitSize. Меньшие рабочие блоки обеспечивают лучшую балансировку нагрузки, но большие рабочие блоки позволяют избегать накладных расходов, связанных со взаимодействием с другими потоками, зачастую для получения следующего рабочего блока. Большие рабочие блоки также позволяют избегать false sharing (понятие, связанное с кешем отдельных ядер процессора; вот здесь расписывается более-менее обстоятельно: habrahabr.ru/company/intel/blog/143446/ – прим. пер.) в случаях, когда диапазон изменяется. Чем меньше времени занимает одна итерация цикла, тем больше должно быть значение workUnitSize. Для очень длительных итераций цикла значение workUnitSize должно быть равно 1. Также доступна перегрузка, которая выбирает размер рабочего блока по умолчанию.

Пример:

// Поиск логарифма каждого числа от 1 до
// 10_000_000 параллельно.
auto logs = new double[10_000_000];

// Параллельный foreach работает с индексной переменной или без нее.
// Это может быть итерация по ref, если range.front
// возвращает ref.

// Итерация по логарифмированию с использованием рабочих блоков размером 100. 
foreach (i, ref elem; taskPool.parallel(logs, 100))
{
    elem = log(i + 1.0);
}

// То же самое, но с использованием размера рабочего блока по умолчанию.
//
// Тайминги на двухъядерной машине Athlon 64 X2:
//
// Параллельный foreach:  388 миллисекунд
// Обычный foreach:       619 миллисекунд
foreach (i, ref elem; taskPool.parallel(logs))
{
    elem = log(i + 1.0);
}

Замечания: Использование памяти в этой реализации гарантируется константой в range.length.

Прерывание параллельного цикла foreach через операторы break, break с меткой, continue с меткой, return или goto, приведёт к исключению ParallelForeachError.
В случае диапазона без произвольного доступа, параллельный foreach лениво буферизует его в массив размера workUnitSize перед выполнением части параллельного цикла. Исключением является случай, если параллельный foreach выполняется по диапазону, возвращаемому asyncBuf или map, копирование не производится, и буферы просто меняются местами. В этом случае workUnitSize игнорируется, а размер рабочего блока устанавливается в размер буфера диапазона range.
Гарантируется, что при выходе из цикла будет выполняться барьер памяти, так что результаты, полученные всеми потоками, будут видны в вызывающем потоке.
Обработка исключений:
Если по крайней мере одно исключение выброшено из параллельного цикла foreach, подача дополнительных объектов Task прекращается как можно скорее, недетерминированным образом. Все исполняемые или выполненные рабочие блоки могут быть завершены. Затем, все исключения, которые были брошены любым рабочим блоком сцепляются (chained) с использованием Throwable.next и выбрасываются повторно. Порядок цепочки исключений не детерминирован.

Переместиться к: amap

template amap(functions...)
auto amap(Args...)(Args args)
if (isRandomAccessRange!(Args[0]));
Жадный (eager) параллельный map (отображение). Жадность этой функции означает, что у неё меньше накладных расходов, чем у лениво вычисляемой TaskPool.map, и является более предпочтительной, когда жадность приемлема по потребности в памяти. functions – это функции, подлежащие вычислению, переданные в качестве параметров-псевдонимов шаблонов в стиле, аналогичном std.algorithm.iteration.map. Первым аргументом должен быть диапазон с произвольным доступом. По соображениям производительности, amap будет считать элементы диапазона ещё не инициализированными. Элементы будут перезаписаны без вызова деструктора или выполнения присвоения. Таким образом, диапазон не должен содержать значимых данных: либо неинициализированные объекты, либо объекты в их состоянии .init.

Eager parallel map. The eagerness of this function means it has less overhead than the lazily evaluated TaskPool.map and should be preferred where the memory requirements of eagerness are acceptable. functions are the functions to be evaluated, passed as template alias parameters in a style similar to std.algorithm.iteration.map. The first argument must be a random access range. For performance reasons, amap will assume the range elements have not yet been initialized. Elements will be overwritten without calling a destructor nor doing an assignment. As such, the range must not contain meaningful

data: either un-initialized objects, or objects in their .init state.

auto numbers = iota(100_000_000.0);

// Поиск квадратных корней чисел.
//
// Тайминги на двухъядерной машине Athlon 64 X2:
//
// Параллельный жадный map:                   0.802 s
// Эквивалентная последовательная реализация: 1.768 s
auto squareRoots = taskPool.amap!sqrt(numbers);
Сразу после аргумента диапазона может быть предоставлен необязательный аргумент размера рабочего блока. Рабочие блоки, используемые в amap, идентичны тем, которые определены для параллельного foreach. Если размер рабочего блока не задан, используется размер рабочего блока по умолчанию.
// То же самое, но задаёт размер рабочего блока в 100. 
auto squareRoots = taskPool.amap!sqrt(numbers, 100);
В качестве последнего аргумента может быть предоставлен выходной диапазон для возврата результатов. Если он не предоставлен, будет выделен массив соответствующего типа в куче, управляемой сборщиком мусора. Если он предоставлен, он должен быть диапазоном с произвольным доступом с присваиваемыми элементами, должен иметь ссылочную семантику относительно присвоения его элементам, и должен иметь ту же длину, что и входной диапазон. Запись в соседние элементы из разных потоков должна быть безопасной.
// То же самое, но явно выделяется массив 
// для возврата результатов. Тип элемента
// может быть либо в точности тем же типом, 
// что и возвращаемое значение функций, либо 
// целью неявного преобразования.
auto squareRoots = new float[numbers.length];
taskPool.amap!sqrt(numbers, squareRoots);

// Несколько функций, явный выходной диапазон и
// явный размер рабочего блока.
auto results = new Tuple!(float, real)[numbers.length];
taskPool.amap!(sqrt, log)(numbers, 100, results);

Замечание: Гарантируется, что барьер памяти будет выполнен после того, как все результаты будут записаны, но перед возвратом, чтобы результаты, полученные всеми потоками, были видны в вызывающем потоке.

Советы: Чтобы выполнить map-операцию на-месте, укажите один и тот же диапазон для ввода и вывода.

Чтобы распараллелить копирование диапазона с дорогостоящими для вычисления элементами в массив, передайте в amap тождественную функцию (функцию, которая просто возвращает любой аргумент, переданный в неё).
Обработка исключений:
Если по крайней мере одно исключение выброшено из параллельного цикла foreach, подача дополнительных объектов Task прекращается как можно скорее, недетерминированным образом. Все исполняемые или выполненные рабочие блоки могут быть завершены. Затем, все исключения, которые были брошены любым рабочим блоком сцепляются (chained) с использованием Throwable.next и выбрасываются повторно. Порядок цепочки исключений не детерминирован.

Переместиться к: map

template map(functions...)
auto map(S)(S source, size_t bufSize = 100, size_t workUnitSize = size_t.max)
if (isInputRange!S);
Наполовину-ленивый параллельный map (отображение), который может использоваться для конвейерной обработки (pipelining). map-функции вычисляются для первых bufSize элементов, их результаты сохраняются в буфере, и становятся доступными для popFront. Тем временем на заднем плане заполняется второй буфер такого же размера. Когда первый буфер исчерпан, он заменяется вторым буфером и заполняется, пока считываются значения из того, что изначально было вторым буфером. Эта реализация позволяет записывать элементы в буфер без необходимости выполнять атомарные операции или синхронизацию при записи каждого элемента, и позволяет эффективно выполнять параллельное вычисление map-функции.
У map больше накладных расходов, чем у простой процедуры, используемой в amap, но она позволяет избежать одновременного удержания всех результатов в памяти и работает с диапазонами без произвольного доступа.
Параметры:
S source Входной диапазон для отображения. Если source не имеет произвольного доступа, он будет лениво буферизирован в массив размера bufSize до того, как будет вычисляться map-функция. (Исключения из этого правила см. в Примечаниях.)
size_t bufSize Размер буфера для хранения вычисленных элементов.
size_t workUnitSize Количество элементов для вычисления в одной задаче Task. Должен быть меньше или равен bufSize и должен быть делителем bufSize, чтобы можно было использовать все рабочие потоки. Если используется значение по умолчанию size_t.max, workUnitSize будет установлен по умолчанию в ширину пула.
Возвращает:
Входной диапазон, представляющий результаты отображения. Этот диапазон имеет параметр длины length, если источник source его имеет.

Замечания: Если диапазон возвращён из map или asyncBuf, и используется как входные данные для map, то в качестве оптимизации выполняется копирование из выходного буфера первого диапазона во входной буфер второго диапазона, хотя диапазоны, возвращённые из map и asyncBuf могут являться диапазонами без произвольного доступа. Это означает, что параметр bufSize, переданный в текущем вызове map, будет игнорироваться, а размер буфера будет размером буфера источника source.

Пример:

// Конвеерное (pipeline) чтение файла, 
// преобразование каждой строки в число,
// взятие логарифмов чисел и выполнение сложения -
// нахождение суммы логарифмов.

auto lineRange = File("numberList.txt").byLine();
auto dupedLines = std.algorithm.map!"a.idup"(lineRange);
auto nums = taskPool.map!(to!double)(dupedLines);
auto logs = taskPool.map!log10(nums);

double sum = 0;
foreach (elem; logs)
{
    sum += elem;
}
Обработка исключений:
Любые исключения, возникающие при итерации по источнику source или при вычислении map-функции, перебрасываются при вызове popFront или, если брошены во время построения, просто перенаправляются вызывающему. В случае исключений, возникающих при вычислении map-функции, исключения включаются в цепочку, как в TaskPool.amap.

Переместиться к: 2

auto asyncBuf(S)(S source, size_t bufSize = 100)
if (isInputRange!S);
Принимает дорогостоящий для итерации диапазон-источник source, возвращает входной диапазон, который асинхронно буферизует содержимое источника в буфер из bufSize элементов в рабочем потоке, одновременно делая ранее буферизованные элементы из второго буфера, также размером bufSize, доступными через интерфейс диапазона возвращаемого объекта. Возвращаемый диапазон имеет свойство длины, если S имеет свойство длины (т.е., если hasLength!S). asyncBuf полезен, например, при выполнении дорогостоящих операций над элементами диапазонов, которые представляют данные на диске или в сети.

Пример:

import std.conv, std.stdio;

void main()
{
    // Извлечение строк из файла в фоновом потоке 
    // при обработке ранее полученных строк,
    // где из буфера byLine жадно 
    // дублируется каждая строка.
    auto lines = File("foo.txt").byLine();
    auto duped = std.algorithm.map!"a.idup"(lines);

    // Извлечение других строк в фоновом режиме, в то время 
    // как мы преобразуем строки, уже прочитанные в память,
    // в матрицу, состоящую из double.
    double[][] matrix;
    auto asyncReader = taskPool.asyncBuf(duped);

    foreach (line; asyncReader)
    {
        auto ls = line.split("\t");
        matrix ~= to!(double[])(ls);
    }
}
Обработка исключений:
Любые исключения, возникающие при итерации по источнику source, перебрасываются при вызове popFront или, если брошены во время построения, просто перенаправляются вызывающему.

auto asyncBuf(C1, C2)(C1 next, C2 empty, size_t initialBufSize = 0, size_t nBuffers = 100)
if (is(typeof(C2.init()) : bool) && Parameters!C1.length == 1 && Parameters!C2.length == 0 && isArray!(Parameters!C1[0]));
Принимает вызываемый объект next (следующий), который пишет в предоставленный пользователем буфер, и второй вызываемый объект empty (пусто), который определяет, доступны ли ещё данные для записи через next, возвращает входной диапазон, который асинхронно вызывает next с набором буферов размером nBuffers и делает результаты доступными в том порядке, в котором они были получены через интерфейс входного диапазона возвращаемого объекта. Аналогично перегрузке asyncBuf с входным диапазоном, первая половина буферов становится доступной через интерфейс диапазона, в то время как вторая половина заполняется и наоборот.
Параметры:
C1 next Вызываемый объект, который принимает один аргумент, который должен быть массивом с изменяемыми элементами. При вызове next записывает данные в массив, предоставленный вызывающим.
C2 empty Вызываемый объект, который не принимает аргументов и возвращает тип, неявно конвертируемый в bool. Он используется для обозначения того, что данные, получаемые при вызове next, более не доступны.
size_t initialBufSize Исходный размер каждого буфера. Если next принимает свой массив по ссылке, он может изменить размер буферов.
size_t nBuffers Количество буферов для перебора при вызове next.

Пример:

// Получение строки файла в фоновом потоке 
// при одновременной обработке ранее
// выбранных строк без их дублирования.
auto file = File("foo.txt");

void next(ref char[] buf)
{
    file.readln(buf);
}

// Извлечение следующих строк в фоновом режиме, 
// в то время как мы преобразуем строки,
// уже прочитанные в память, в матрицу,
// состоящую из double.
double[][] matrix;
auto asyncReader = taskPool.asyncBuf(&next, &file.eof);

foreach (line; asyncReader)
{
    auto ls = line.split("\t");
    matrix ~= to!(double[])(ls);
}
Обработка исключений:
Любые исключения, возникающие при итерировании по диапазону, перебрасываются при вызове popFront.

Предупреждение: Нельзя использовать диапазон, возвращаемый этой функцией, в параллельном цикле foreach, потому что буферы могут быть перезаписаны в то время, когда обрабатывающая их задача находится в очереди. Это проверяется во время компиляции и приведёт к static assertion failure.

Переместиться к: reduce

template reduce(functions...)
auto reduce(Args...)(Args args);
Параллельный reduce над диапазоном с произвольным доступом. Если не указано иное, использование похоже на std.algorithm.iteration.reduce. Эта функция работает, разбивая диапазон, к которому нужно применить reduce, на рабочие блоки, которые обрабатываются параллельно. Как только результаты всех рабочих блоков вычислены, для этих результатов выполняется окончательный последовательный reduce для вычисления окончательного ответа. Поэтому необходимо осторожно выбирать значение семени.
Поскольку reduce выполняется параллельно, функции functions должны быть ассоциативными. Для простоты нотации, пусть # – инфиксный оператор, представляющий functions. Тогда (a # b) # c должно быть равно a # (b # c). Сложение чисел с плавающей точкой не является ассоциативным, в отличие от сложения в точной арифметике. Сложение чисел с плавающей точкой с использованием этой функции может давать отличающиеся результаты в сравнении с последовательным сложением. Однако для большинства практических целей сложение чисел с плавающей запятой можно рассматривать как ассоциативное.
Заметим, что, поскольку функции functions предполагаются ассоциативными, в последовательную часть алгоритма добавляются дополнительные оптимизации. Они используют параллелизм уровня инструкций современных процессоров, в дополнение к параллелизму уровня потоков, который используется везде в этом модуле. Это может привести к улучшению линейных ускорений относительно std.algorithm.iteration.reduce, особенно для fine-grained тестов, таких как скалярное произведение.
В качестве первого аргумента можно предоставить явное семя. Если оно предоставлено, оно используется в качестве семени для всех рабочих блоков и для окончательного reduce результатов от всех рабочих блоков. Поэтому, если оно не является значением идентичности для выполняемой операции, результаты могут отличаться от результатов, генерируемых std.algorithm.iteration.reduce или в зависимости от того, сколько рабочих блоков используется. Следующим аргументом должен быть диапазон, к которому нужно применить reduce.
// Параллельное нахождение суммы квадратов диапазона, 
// с использованием явного семени.
//
// Тайминги на двухъядерной машине Athlon 64 X2:
//
// Параллельный reduce:                         72 milliseconds
// Использование std.algorithm.reduce instead:  181 milliseconds
auto nums = iota(10_000_000.0f);
auto sumSquares = taskPool.reduce!"a + b"(
    0.0, std.algorithm.map!"a * a"(nums)
);
Если явное семя не предоставляется, первый элемент каждого рабочего блока используется в качестве семени. Для окончательного reduce в качестве семени используется результат работы первого рабочего блока.
// Параллельный поиск суммы диапазона, с использованием 
// первого элемента каждого рабочего блока в качестве семени.
auto sum = taskPool.reduce!"a + b"(nums);
В качестве последнего аргумента можно явно указать размер рабочего блока. Задание слишком малого размера рабочего блока даст эффективную сериализацию reduce, а окончательный reduce результатов каждого рабочего блока будет доминировать во времени вычисления. Если TaskPool.size для этого экземпляра равен нулю, этот параметр игнорируется и используется один рабочий блок.
// Использование рабочего блока с размером 100.
auto sum2 = taskPool.reduce!"a + b"(nums, 100);

// Рабочий блок с размером 100 и явное семя.
auto sum3 = taskPool.reduce!"a + b"(0.0, nums, 100);
Параллельный reduce поддерживает множественные функции, подобно std.algorithm.reduce.
// Нахождение сразу минимума и максимума диапазона nums.
auto minMax = taskPool.reduce!(min, max)(nums);
assert(minMax[0] == reduce!min(nums));
assert(minMax[1] == reduce!max(nums));
Обработка исключений:
После завершения этой функции любые выброшенные исключения будут сгруппированы в цепочку с помощью Throwable.next и повторно выброшены. Порядок цепочки не детерминирован.
const nothrow @property @safe size_t workerIndex();
Возвращает индекс текущего потока относительно этого TaskPool. Любой поток не в этом пуле получит индекс 0. Рабочие потоки в этом пуле получат уникальные индексы от 1 до this.size.
Эта функция полезна для поддержания локальных для рабочего потока ресурсов.

Пример:

// Выполняет цикл, параллельно вычисляющий наибольший 
// общий делитель каждого из чисел от 0 до 999
// с 42. Записывает результаты в набор файлов,
// по одному для каждого потока. Это позволяет
// выводить результаты без какой-либо синхронизации.

import std.conv, std.range, std.numeric, std.stdio;

void main()
{
    auto filesHandles = new File[taskPool.size + 1];
    scope(exit) {
        foreach (ref handle; fileHandles)
        {
            handle.close();
        }
    }

    foreach (i, ref handle; fileHandles)
    {
        handle = File("workerResults" ~ to!string(i) ~ ".txt");
    }

    foreach (num; parallel(iota(1_000)))
    {
        auto outHandle = fileHandles[taskPool.workerIndex];
        outHandle.writeln(num, '\t', gcd(num, 42));
    }
}

Переместиться к: get · toRange

struct WorkerLocalStorage(T);
Структура для создания хранилища worker-local. Хранилище worker-local – это локальное для потока хранилище, которое существует только для рабочих потоков в данном пуле TaskPool плюс один поток вне пула. Он выделяется в куче, управляемой сборщиком мусора, таким образом, чтобы избежать false sharing, и не требует иметь глобальную область видимости в любом потоке. Он может быть доступен из любого рабочего потока в TaskPool, который его создал, и одного потока вне этого TaskPool. Все потоки вне пула, который создал данный экземпляр worker-local хранилища, используют один слот.
Поскольку нижележащие данные для этой структуры распределены в куче, эта структура имеет ссылочную семантику при передаче между функциями.
Основными вариантами использования WorkerLocalStorageStorage являются:
1. Выполнение параллельных reductions в императивном, а не функциональном, стиле программирования. В этом случае полезно рассматривать WorkerLocalStorageStorage как локальный для каждого потока только для одной части параллельного алгоритма.
2. Утилизация временных буферов сквозь (?? across) итерации параллельного цикла foreach.

Пример:

// Вычисление pi, как в нашем примере в начале модуля, 
// но с использованием императивного стиля вместо функционального.
immutable n = 1_000_000_000;
immutable delta = 1.0L / n;

auto sums = taskPool.workerLocalStorage(0.0L);
foreach (i; parallel(iota(n)))
{
    immutable x = ( i - 0.5L ) * delta;
    immutable toAdd = delta / ( 1.0 + x * x );
    sums.get += toAdd;
}

// Сложение результатов от каждого рабочего потока.
real pi = 0;
foreach (threadResult; sums.toRange)
{
    pi += 4.0L * threadResult;
}

Переместиться к: 2

@property ref T get();
Получение экземпляра текущего потока. Возвращает по ссылке (by ref). Обратите внимание, что вызов get из любого потока вне TaskPool, создавшего этот экземпляр, вернет ту же ссылку, поэтому экземпляр хранилища worker-local должен быть доступен только из одного потока за пределами созданного пула. Нарушение этого правила приведет к неопределенному поведению.
Если включены утверждения (assertions) и вызывается toRange, то этот экземпляр WorkerLocalStorageStorage больше не является worker-local, и при вызове этого метода возникает ошибка утверждения (assertion failure). Проверка не производится, если утверждения отключены по соображениям производительности.
@property void get(T val);
Присвоение значения экземпляру текущего потока. Для этой функции действуют все те же предостережения, что и для её перегрузки.
@property WorkerLocalStorageRange!T toRange();
Возвращает значения для всех потоков в виде диапазона, который можно использовать для дальнейшей обработки результатов каждого потока после выполнения параллельной части вашего алгоритма. Не используйте этот метод в параллельной части вашего алгоритма.
Вызов этой функции устанавливает флаг, указывающий, что эта структура больше не является worker-local, а попытка снова использовать метод get приведет к ошибке утверждения (assertion failure), если утверждения включены.
struct WorkerLocalStorageRange(T);
Диапазон примитивов для хранилища worker-local. Его целью является доступ к результатам работы каждого рабочего потока из единого потока, после того, как хранилище worker-local больше не используется из нескольких потоков. Не используйте эту структуру в параллельной части вашего алгоритма.
Правильный способ создания экземпляра этого объекта – вызвать WorkerLocalStorage.toRange. После создания экземпляра этот объект ведет себя как конечный диапазон с произвольным доступом с приваиваемыми, lvalue элементами и длиной, равной количеству рабочих потоков в TaskPool, создавшем их, плюс 1.
WorkerLocalStorage!T workerLocalStorage(T)(lazy T initialVal = T.init);
Создает экземпляр хранилища worker-local, инициализированный заданным значением. Значение является ленивым, так что вы можете, например, легко создать один экземпляр класса для каждого обработчика. Смотрите пример использования в описании структуры WorkerLocalStorage.
@trusted void stop();
Посылает сигналы всем рабочим потокам завершаться, как только они закончат с их текущей задачей Task, или немедленно, если они не выполняют задачу. Задачи, которые были в очереди, не будут выполняться, если к их выполнению не приведет вызов Task.workForce, Task.yieldForce или Task.spinForce.
Используйте только в том случае, если вы выполняли ожидание в каждой Task и, следовательно, знаете, что очередь пуста, или если вы спекулятивно выполнили некоторые задачи и больше не нуждаетесь в результатах. Use only if you have waited on every Task and therefore know the queue is empty, or if you speculatively executed some tasks and no longer need the results.
@trusted void finish(bool blocking = false);
Сигнализирует рабочим потокам о завершении работы, когда очередь становится пустой.
Если аргумент blocking установлен в true, ждёт завершения всех рабочих потоков перед возвратом. Этот параметр может использоваться в приложениях, где результаты задачи никогда не потребляются – например, когда TaskPool используется как рудиментарный планировщик для задач, которые обмениваются посредством чего-то, отличного от возвращаемых значений.

Предупреждение: Вызов этой функции с параметром blocking = true из рабочего потока, который является членом этого же TaskPool, у которого вызван метод finish, приведет к тупиковой ситуации (deadlock).

const pure nothrow @property @safe size_t size();
Возвращает количество рабочих потоков в пуле.
void put(alias fun, Args...)(ref Task!(fun, Args) task)
if (!isSafeReturn!(typeof(task)));

void put(alias fun, Args...)(Task!(fun, Args)* task)
if (!isSafeReturn!(typeof(*task)));
Помещает объект задачи Task в конец очереди задач. Объект Task можно передать указателем или ссылкой.

Пример:

import std.file;

// Создать задачу.
auto t = task!read("foo.txt");

// Добавить её в очередь для выполнения.
taskPool.put(t);

Замечания: @trusted-перегрузки этой функции вызываются для задач Task, если для возвращаемого типа задачи значение std.traits.hasUnsharedAliasing является ложным, или если функция, выполняемая задачей, является pure (чистой). Объекты Task, которые удовлетворяют всем остальным требованиям, указанным в @trusted-перегрузках task и scopedTask, можно создавать и выполнять из @safe-кода через Task.executeInNewThread, но не через TaskPool.

Хотя эта функция принимает адрес переменных, которые могут находиться в стеке, некоторые перегрузки отмечены как @trusted. Задача Task включает деструктор, ожидающий завершения задачи, прежде чем уничтожить фрейм стека, на который он назначен. Поэтому невозможно, чтобы кадр стека был уничтожен до завершения задачи и больше не ссылался на TaskPool.

@property @trusted bool isDaemon();

@property @trusted void isDaemon(bool newVal);
Эти свойства определяют, являются ли рабочие потоки потоками-демонами. Поток-демон автоматически прекращается, когда прекращаются все потоки-не-демоны. Поток-не-демон предотвратит завершение программы, пока он не завершится.
Если какой-либо объект TaskPool с потоками-не-демонами активен, у него необходимо вызвать либо stop, либо finish, прежде чем программа сможет завершить работу.
Рабочие потоки в экземпляре TaskPool, возвращаемом свойством taskPool, по умолчанию являются демонами. Рабочие потоки создаваемых вручную пулов задач по умолчанию не являются демонами.

Замечание: Для пула нулевого размера, геттер произвольно возвращает значение true, а сеттер не действует.

@property @trusted int priority();

@property @trusted void priority(int newPriority);
Эти функции позволяют получать и устанавливать приоритет планирования операционной системы (OS scheduling priority) для рабочих потоков в этом экземпляре TaskPool. Они перенаправляются в core.thread.Thread.priority, поэтому данное значение приоритета priority здесь означает то же самое, что и идентичное значение priority в core.thread.

Замечание: Для пула нулевого размера, геттер произвольно возвращает значение core.thread.Thread.PRIORITY_MIN, а сеттер не действует.

@property @trusted TaskPool taskPool();
Возвращает лениво инициализируемый глобальный экземпляр TaskPool. Эту функцию можно безопасно вызывать одновременно из нескольких потоков, не являющихся рабочими. Рабочие потоки в этом пуле являются потоками-демонами, что означает, что нет необходимости вызывать TaskPool.stop или TaskPool.finish перед завершением основного потока.
@property @trusted uint defaultPoolThreads();

@property @trusted void defaultPoolThreads(uint newVal);
Эти свойства получают и устанавливают количество рабочих потоков в экземпляре TaskPool, возвращаемом свойством taskPool. Значение по умолчанию равно totalCPUs - 1. Вызов сеттера после первого вызова taskPool не изменяет количество рабочих потоков в экземпляре, возвращаемом taskPool.
ParallelForeach!R parallel(R)(R range);

ParallelForeach!R parallel(R)(R range, size_t workUnitSize);
Удобные функции, которые перенаправляются в taskPool.parallel. Из целью является сделать синтаксис параллельного foreach короче и удобнее для чтения.

Пример:

// Параллельный поиск логарифмов каждого 
// из чисел от 1 до 1_000_000, с использованием
// экземпляра TaskPool по умолчанию.
auto logs = new double[1_000_000];

foreach (i, ref elem; parallel(logs))
{
    elem = log(i + 1.0);
}