Это низкоуровневый API обмена сообщениями, на котором могут быть построены более структурированные или ограничительные API. Общая идея состоит в том, что каждая пересылаемая сущность представлена общим типом дескриптора, называемым Tid, который позволяет отправлять сообщения в логические потоки, которые выполняются как в текущем процессе, так и во внешних процессах с использованием одного и того же интерфейса. Это важный аспект масштабируемости, поскольку он позволяет распределять компоненты программы по доступным ресурсам с незначительными изменениями в реальной реализации.
Логический поток - это контекст выполнения, который имеет свой собственный стек и который выполняется асинхронно по отношению к другим логическим потокам. Это могут быть
preemptively
scheduled потоки ядра, нити (совместные потоки в пользовательском пространстве) или некоторые другие концепции с аналогичным поведением.
Тип параллельного выполнения, используемый при создании логических потоков, определяется планировщиком (Scheduler), выбранным во время инициализации. В настоящее время поведение по умолчанию заключается в создании нового потока ядра для каждого порождающего вызова, но существуют другие планировщики, которые мультиплексируют нити внутри основного потока, или используют некоторую комбинацию этих двух подходов.
class
MessageMismatch:
object.Exception;
Исключение, выбрасываемое шаблоном receiveOnly, если отправлено сообщение с типом, отличным от ожидаемого в принимающем потоке.
class
OwnerTerminated:
object.Exception;
Исключение, выбрасываемое шаблоном receive, если поток, породивший принимающий поток, был прекращён, и сообщений больше не существует.
class
LinkTerminated:
object.Exception;
Исключение, выбрасываемое, если связанный поток прекращён.
class
PriorityMessageException:
object.Exception;
Исключение, выбрасываемое, если сообщение было отправлено в поток через
std.concurrency.prioritySend, и у получателя нет обработчика сообщений такого типа.
Посланное сообщение.
class
MailboxFull:
object.Exception;
Исключение, выбрасываемое при переполнении почтового ящика (mailbox), если почтовый ящик настроен с помощью OnCrowding.throwException.
class
TidMissingException:
object.Exception;
Исключение, выбрасываемое в случаем отсутствия Tid, например, когда ownerTid не находит поток-владелец.
Тип, используемый для представления логического потока.
void
toString(scope void delegate(const(char)[])
sink);
Создаёт удобную строку для идентификации этого Tid. Это полезно только для того, чтобы увидеть, являются ли Tid'ы, которые в настоящее время выполняются, одинаковыми или разными, например, для ведения журнала или отладки. Возможна ситуация, при которой Tid, выполняемый в будущем, будет иметь тот же результат toString(), что и другой Tid, который уже завершён.
@property @safe Tid
thisTid();
Возвращает Tid текущего потока.
@property Tid
ownerTid();
Возвращает Tid потока, который породил текущий поток.
Исключения: TidMissingException в случае остутсвия потока-владельца.
Tid
spawn(F, T...)(F
fn, T
args)
if (isSpawnable!(F, T));
Запускает fn(args) в новом логическом потоке.
Выполняет переданную функцию в новом логическом потоке, представленном Tid. Вызывающий поток назначается владельцем нового потока. Когда поток-владелец прекращается, в новый поток будет отправлено сообщение OwnerTerminated, в результате чего будет вызвано исключение OwnerTerminated при вызове receive().
Параметры: F fn |
Выполняемая функция. |
T args |
Аргументы функции. |
Возвращает: Tid, представляющий новый логический поток.
Замечания:
Аргументы args не должны иметь неразделямых псевдонимов. Другими словами, все аргументы, передаваемые в fn должны быть либо разделяемыми (shared), либо неизменяемыми (immutable), или не иметь косвенного доступа через указатель. Это необходимо для обеспечения изоляции между потоками.
Пример:
import std.stdio, std.concurrency;
void f1(string str)
{
writeln(str);
}
void f2(char[] str)
{
writeln(str);
}
void main()
{
auto str = "Hello, world";
auto tid1 = spawn(&f1, str);
auto tid2 = spawn(&f2, str.dup);
spawn({ writeln("This is so great!"); });
}
Tid
spawnLinked(F, T...)(F
fn, T
args)
if (isSpawnable!(F, T));
Запускает fn(args) в логическом потоке и получает сообщение LinkTerminated при завершении операции.
Выполняет переданную функцию в новом логическом потоке, представленном Tid.
Этот новый поток связан с вызывающим потоком, так что если либо он, либо вызывающий поток завершается, другому будет отправлено сообщение LinkTerminated, в результате чего будет выброшено исключение LinkTerminated в receive(). Также сохраняются отношения владелец-потомок от spawn(), поэтому, если связь между потоками нарушена, завершение потока-владельца всё равно приведёт к исключению OwnerTerminated, которое будет выброшено в receive().
Параметры: F fn |
Выполняемая функция. |
T args |
Аргументы функции. |
Возвращает: Tid, представляющий новый поток.
void
send(T...)(Tid
tid, T
vals);
Помещает значения vals в качестве сообщения в конце очереди сообщений для потока tid.
Отправляет заданное значение в поток, представленный
tid. Как и в случае с
std.concurrency.spawn,
T не должен иметь неразделяемых псевдонимов.
void
prioritySend(T...)(Tid
tid, T
vals);
Помещает значения vals в качестве сообщения в начале очереди сообщений для потока tid.
Отправляет сообщение в tid, но помещает его в начало очереди сообщений потока tid, а не в конце. Эта функция обычно используется для экстренной связи, сигналах об исключительной ситуации и т.д.
void
receive(T...)(T
ops);
Получает сообщение из другого потока.
Получает сообщение из другого потока, или блокирует, если нет сообщений указанных типов.
Эта функция работает по шаблону, сопоставляющему сообщение с набором делегатов и выполняющему первое найденное совпадение.
Если делегат, принимающий
std.variant.Variant, включается в качестве последнего аргумента
receive, он будет соответствовать любому сообщению, которое не было сопоставлено с раннее встретившимся делегатом. Если передано более одного аргумента,
Variant будет содержать кортеж
std.typecons.Tuple из всех отправленных значений.
Пример:
import std.stdio;
import std.variant;
import std.concurrency;
void spawnedFunction()
{
receive(
(int i) { writeln("Received an int."); },
(float f) { writeln("Received a float."); },
(Variant v) { writeln("Received some other type."); }
);
}
void main()
{
auto tid = spawn(&spawnedFunction);
send(tid, 42);
}
receiveOnlyRet!T
receiveOnly(T...)();
Получает сообщения только с аргументами типов T.
Исключения: MessageMismatch, если получено сообщение из типов, отличных от T.
Возвращает:
Полученное сообщение. Если
T.length больше единицы,
сообщение будет упаковано в кортеж
std.typecons.Tuple.
Пример:
import std.concurrency;
void spawnedFunc()
{
auto msg = receiveOnly!(int, string)();
assert(msg[0] == 42);
assert(msg[1] == "42");
}
void main()
{
auto tid = spawn(&spawnedFunc);
send(tid, 42, "42");
}
bool
receiveTimeout(T...)(Duration
duration, T
ops);
Попытается получить сообщение, но сдастся, если не появится соответсвия за время
duration. Не будет ждать вообще, если переданное
core.time.Duration отрицательно.
То же, что и
receive, за исключением того, что вместо бесконечного ожидания сообщения, ожидает, пока не получит сообщение или пока не пройдёт переданное время
core.time.Duration. Возвращает
true, если сообщение получено, и
false, если время ожидания закончилось.
Эти типы поведения могут быть указаны на случай переполнения почтового ящика.
Подождать, пока не появится пространство.
Выбросить исключение MailboxFull.
Отменить отправку и вернуться.
void
setMaxMailboxSize(Tid
tid, size_t
messages, OnCrowding
doThis);
Устанавливает максимальный размер почтового ящика.
Устанавливает ограничение на максимальное количество сообщений пользователя, разрешённых в почтовом ящике.
Если этот предел достигнут, вызывающий поток, пытающийся добавить новое сообщение, выполнит поведение, указанное в doThis.
Если messages равно нулю, почтовый ящик неограничен.
Параметры: Tid tid |
Tid потока, для которого этот предел должен быть установлен. |
size_t messages |
Максимальное количество сообщений или ноль, если нет ограничений. |
OnCrowding doThis |
Поведение, выполняемое при отправке сообщения в заполненный почтовый ящик. |
void
setMaxMailboxSize(Tid
tid, size_t
messages, bool function(Tid)
onCrowdingDoThis);
Устанавливает максимальный размер почтового ящика.
Устанавливает ограничение на максимальное количество сообщений пользователя, разрешённых в почтовом ящике.
Если это ограничение достигнуто, вызывающий поток, пытающийся добавить новое сообщение, выполнит onCrowdingDoThis.
Если messages равно нулю, почтовый ящик неограничен.
Параметры: Tid tid |
Tid потока, для которого этот предел должен быть установлен. |
size_t messages |
Максимальное количество сообщений или ноль, если нет ограничений. |
bool function(Tid) onCrowdingDoThis |
Процедура, вызываемая, когда сообщение отправляется в заполненный почтовый ящик.
|
bool
register(string
name, Tid
tid);
Ассоциирует имя name с tid.
Сопоставляет имя name с tid в локальной области видимости процесса.
Когда поток, представленный tid, завершается, любые связанные с ним имена будут автоматически разрегистрированы.
Параметры: string name |
Имя, ассоциируемое с tid. |
Tid tid |
Tid, для которого региструруется имя. |
Возвращает:
true, если имя name доступно, и tid представляет собой неумерший поток.
bool
unregister(string
name);
Удаляет зарегистрированное имя name, связанное с tid.
Параметры: string name |
Разрегиструруемое имя. |
Возвращает: true, если имя name зарегестрировано,
false, если нет.
Получить Tid, связанный с именем name.
Параметры: string name |
Имя для поиска.
|
Возвращает:
Связанный Tid или Tid.init, если имя name не зарегистрировано.
Инкапсулирует все данные уровня реализации, необходимые для планирования.
При определении Scheduler экземпляр этой структуры должен быть связан с каждым логическим потоком.
Он содержит всю информацию уровня реализации, необходимую внутреннему API.
static nothrow @property ref auto
thisInfo();
Получает локальный для потока экземпляр ThreadInfo.
Получает локальный для потока экземпляр ThreadInfo, который должен использоваться как экземпляр по умолчанию,
когда запрашивается информация для потока, не созданного Scheduler'ом.
Очищает эту ThreadInfo.
Нужно вызывать при завершении запланированного потока. Демонтирует систему обмена сообщениями для потока и уведомляет заинтересованные стороны о завершении потока.
Планировщик, управляющий тем, как потоки выполняются при spawn.
Реализация Scheduler позволяет настроить механизм параллелизма,
используемый этим модулем, в соответствии с различными потребностями.
По умолчанию вызов spawn создаст новый поток ядра, который выполнит предоставленную процедуру и уничтожится при её завершении.
Но можно создавать планировщики, которые повторно используют потоки,
которые мультиплексируют нити (сопрограммы) в составе единого потока или любое количество других подходов.
Сделав выбор Scheduler опцией уровня пользователя,
std.concurrency может использоваться для гораздо большего количества типов приложений, чем если бы это поведение было предопределено.
Пример:
import std.concurrency;
import std.stdio;
void main()
{
scheduler = new FiberScheduler;
scheduler.start(
{
writeln("the rest of main goes here");
});
}
У некоторых планировщиков есть цикл диспетчеризации, который должен выполняться,
чтобы они работали должным образом, поэтому для согласованности, при использовании планировщика, start() необходимо вызывать внутри main().
Это передаёт управление планировщику и гарантирует, что все порождённые потоки будут выполняться ожидаемым образом.
abstract void
start(void delegate()
op);
Порождает переданный op и запускает Scheduler.
Предполагается, что он будет вызываться в начале программы, чтобы передать все scheduling (yield all
scheduling) активному экземпляру планировщика.
Это необходимо для планировщиков, которые явно управляют потоками, а не просто полагаются для этого на операционную систему,
и поэтому start всегда нужно вызывать внутри main(),
чтобы начать нормальное выполнение программы.
Параметры: void delegate() op |
Обертка для любого основного потока, который выполнялся бы в отсутствие настраиваемого планировщика.
Scheduler автоматически выполнит его через вызов spawn.
|
abstract void
spawn(void delegate()
op);
Назначает логический поток для выполнения переданного op.
Эта процедура вызывается через spawn.
Ожидается, что он создаст новый логический поток и запустит операцию.
Этот поток должен вызвать thisInfo.cleanup() при завершении, если запланированный поток не является потоком ядра –
все потоки ядра будут автоматически очищаться от ThreadInfo локальным деструктором потока.
Параметры: void delegate() op |
Функция для выполнения. Это может быть настоящей функцией, переданной пользователем для порождения её самой, или может быть оберткой функции.
|
abstract nothrow void
yield();
Обеспечивает выполнение в другом логическом потоке.
Эта процедура вызывается в разных местах в API-интерфейсах, совместимых с параллелизмом,
чтобы предоставить планировщику возможность выполнить yield
при использовании какой-то совместной модели многопоточности.
Если это не подходит, например, когда каждый логический поток поддерживается выделенным потоком ядра, эта процедура может быть отключена.
abstract nothrow @property ref ThreadInfo
thisInfo();
Возвращает соответствующий экземпляр ThreadInfo.
Возвращает экземпляр ThreadInfo, специфичный для логического потока, который вызывает эту процедуру, или,
если вызывающий поток не был создан этим планировщиком, вместо этого возвращает ThreadInfo.thisInfo.
abstract nothrow Condition
newCondition(Mutex
m);
Создает аналог переменной состояния для сигнализации.
Создает новый аналог переменной Condition (состояние), который используется для проверки и сигнализации о добавлении сообщений в очередь сообщений потока.
Как и в случае yield, некоторым планировщикам может потребоваться определить пользовательское поведение,
чтобы вызовы Condition.wait() вместо блокировки передавали управление к другому потоку, если новые сообщения не доступны.
Параметры: Mutex m |
Мьютекс, который будет связан с этим состоянием. Он будет заблокирован до любой операции с этим состоянием,
и поэтому в некоторых случаях планировщику может потребоваться удерживать ссылку на него и разблокировать мьютекс,
прежде чем передать выполнение в другой логический поток.
|
class
ThreadScheduler:
std.concurrency.Scheduler;
Пример планировщика Scheduler, использующий потоки ядра.
Это экземпляр планировщика, который отражает поведение планирования по умолчанию, в котором создаётся однин поток ядра для каждого вызова spawn.
Он полностью функционален, и его можно создавать и использовать, но не является необходимой частью функционирования данного модуля по умолчанию.
void
start(void delegate()
op);
Просто запускает op напрямую, поскольку при таком подходе не требуется никакого реального планирования.
void
spawn(void delegate()
op);
Создает новый поток ядра и назначает его для запуска переданного op.
Этот планировщик не имеет явного мультиплексирования, поэтому это пустая операция.
nothrow @property ref ThreadInfo
thisInfo();
Возвращает ThreadInfo.thisInfo, так как это локальный экземпляр потока ThreadInfo, что является правильным поведением для этого планировщика.
nothrow Condition
newCondition(Mutex
m);
Создает новую переменную Condition. Здесь не нужно никакого нестандартного поведения.
class
FiberScheduler:
std.concurrency.Scheduler;
Экземпляр планировщика с использованием нитей (
Fiber, класс определённый в модуле core.thread – прим.пер.).
Это экземпляр планировщика, который создаёт новую нить при каждом вызове spawn и мультиплексирует выполнение всех нитей в основном потоке.
void
start(void delegate()
op);
Создает новую нить для переданного op, а затем запускает диспетчер.
nothrow void
spawn(void delegate()
op);
Создает новую нить для переданного op и добавляет её в список диспетчеризации.
Если вызывается из планируемой нити, это приведёт к передаче выполнения другой запланированной нити.
nothrow @property ref ThreadInfo
thisInfo();
Возвращает соответствующий экземпляр ThreadInfo.
Возвращает экземпляр ThreadInfo, специфичный для вызывающей нити, если нить была создана этим диспетчером, в противном случае возвращает ThreadInfo.thisInfo.
nothrow Condition
newCondition(Mutex
m);
Возвращает аналог Condition (состояние), который передаётся при вызове wait или notify.
Устанавливает поведение планировщика в программе.
Эта переменная задает поведение планировщика Scheduler в этой программе.
Как правило, при установке планировщика, scheduler.start() следует вызывать в функции main. Выход из этой процедуры не произойдёт, пока не будет завершено выполнение программы.
Если вызывающий объект является нитью Fiber и не является генератором Generator,
эта функция вызовет scheduler.yield() или Fiber.yield(), в зависимости от обстоятельств.
class
Generator(T): Fiber, IsGenerator;
Генератор Generator – это нить,
которая периодически возвращает значения типа T в вызывающий поток с помощью yield. Действует как входной диапазон InputRange.
Пример:
import std.concurrency;
import std.stdio;
void main()
{
auto tid = spawn(
{
while (true)
{
writeln(receiveOnly!int());
}
});
auto r = new Generator!int(
{
foreach (i; 1 .. 10)
yield(i);
});
foreach (e; r)
{
tid.send(e);
}
}
Переместиться к: 2 · 3 · 4
this(void function()
fn);
Инициализирует объект генератора, связанный со статической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры: void function() fn |
Функция, передаваемая в нить. |
In:
fn не должна быть null.
this(void function()
fn, size_t
sz);
Инициализирует объект генератора, связанный со статической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры: void function() fn |
Функция, передаваемая в нить. |
size_t sz |
Размер стека для этой нити. |
In:
fn не должна быть null.
this(void delegate()
dg);
Инициализирует объект генератора, связанный с динамической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры: void delegate() dg |
Функция, передаваемая в нить. |
In:
dg не должен быть null.
this(void delegate()
dg, size_t
sz);
Инициализирует объект генератора, связанный с динамической D-функцией. Функция будет вызываться один раз, чтобы подготовить диапазон для итерации.
Параметры: void delegate() dg |
Функция, передаваемая в нить.. |
size_t sz |
Размер стека для этой нити. |
In:
dg не должен быть null.
final @property bool
empty();
Возвращает true, если генератор пуст.
Получает следующее значение из базовой функции.
final @property T
front();
Возвращает последнее сгенерированное значение.
void
yield(T)(ref T
value);
void
yield(T)(T
value);
Передает значение value типа T потоку, вызвавшему выполняющийся в настоящее время генератор.
Параметры: T value |
Передаваемое значение. |
ref auto
initOnce(alias var)(lazy typeof(var)
init);
Инициализирует var ленивым значением init потоко-безопасным способом.
Реализация гарантирует, что все потоки, одновременно вызывающие initOnce с одним и тем же аргументом, блокируются до полной инициализации var. Все побочные эффекты init глобально видны впоследствии.
Параметры: var |
Инициализируемая переменная |
typeof(var) init |
Ленивое инициализирующее значение |
Возвращает: Ссылку на инициализированную переменную
Примеры:
Типичным вариантом использования является выполнение ленивой, но потокобезопасной инициализации.
static class MySingleton
{
static MySingleton instance()
{
static __gshared MySingleton inst;
return initOnce!inst(new MySingleton);
}
}
assert(MySingleton.instance !is null);
ref auto
initOnce(alias var)(lazy typeof(var)
init, Mutex
mutex);
То же самое, что и предыдущая, но вместо общего доступа ко всем экземплярам initOnce принимает отдельный мьютекс mutex.
Следует использовать, чтобы избежать блокировок, когда выражение init ожидает результата другого потока, который также может вызвать initOnce. Используйте с осторожностью.
Параметры: var |
Инициализируемая переменная |
typeof(var) init |
Ленивое инициализирующее значение |
Mutex mutex |
Мьютекс для предотвращения гонки условий |
Возвращает: Ссылку на инициализированную переменную
Примеры:
Используйте отдельный мьютекс, когда
init блокирует другой поток, который может также вызвать
initOnce.
static shared bool varA, varB;
__gshared Mutex m;
m = new Mutex;
spawn({
initOnce!varB(true, m);
ownerTid.send(true);
});
initOnce!varA(receiveOnly!bool);
assert(varA == true);
assert(varB == true);