Я столкнулся с проблемой в своем приложении, из-за которой тест иногда не проходит. Мне удалось сузить проблему до этого фрагмента кода. Фрагмент содержит две асинхронные последовательности, которые наблюдают и изменяют одно и то же свойство @Published
, fire
.
Я ожидаю, что асинхронные последовательности будут получать все обновления от свойства @Published
, гарантируя, что тест всегда будет успешным. Однако после определенного количества запусков теста (около 100 000) тест зависает и никогда не завершается.
Сначала я подозревал, что проблема связана с гонкой данных, но проблема сохраняется даже после аннотации класса с помощью @MainActor
, чтобы гарантировать, что весь класс работает в одном потоке.
При успешных тестах консоль выводит:
createStream1(): true
createStream2(): true
createStream1(): false
При неудачных тестах консоль печатает:
createStream1(): true
createStream2(): true
createStream1(): true
Фрагмент кода:
final class Tests: XCTestCase {
@MainActor func test_SUTStream1() async {
await SUT().toTest()
}
}
@MainActor
class SUT {
@Published private(set) var fire = false
init() {}
func toTest() async {
let cancelable = Task {
await createStream2()
}
await createStream1()
cancelable.cancel()
}
private func createStream1() async {
fire = true
for await didFire in $fire.values {
print("\(#function): \(didFire)")
if didFire == false {
break
}
}
}
private func createStream2() async {
for await didFire in $fire.values {
print("\(#function): \(didFire)")
if didFire == true {
fire = false
break
}
}
}
}
@Published
?Есть две проблемы:
Проблема в том, что ценности не буферизуют свои значения при наличии обратного давления. Таким образом, если последующие значения будут опубликованы до того, как будет использовано существующее, предыдущее значение будет удалено.
И даже когда они принадлежат одному и тому же актеру, между продолжениями происходит гонка. Как SE-0306 – Актеры предупреждает нас (курсив в оригинале):
Примечание по реализации: на уровне реализации сообщения представляют собой частичные задачи (описанные предложением Структурированный параллелизм ) для асинхронного вызова, и каждый экземпляр актера содержит своего собственного последовательного исполнителя (также в предложении Структурированный параллелизм ). Последовательный исполнитель по умолчанию отвечает за поочередное выполнение частичных задач. Концептуально это похоже на последовательную DispatchQueue, но с важным отличием: выполнение задач, ожидающих актера, не гарантируется в том же порядке, в котором они изначально ожидали этого актера. Система времени выполнения Swift стремится по возможности избегать инверсии приоритетов, используя такие методы, как повышение приоритета. Таким образом, система времени выполнения учитывает приоритет задачи при выборе следующей задачи для запуска на актере из его очереди. В отличие от сериалов
DispatchQueue
, в которых действует принцип «первым пришел — первым вышел». Кроме того, среда выполнения актеров Swift использует более легкую реализацию очереди, чем Dispatch, чтобы в полной мере использовать преимущества функций Swiftasync
.
Таким образом, в целом задачи (и продолжения) с одним и тем же приоритетом имеют тенденцию работать в порядке FIFO, но не совсем так: иногда вы можете увидеть частичные задачи, не соблюдающие порядок FIFO.
Сочетание этих двух факторов означает, что иногда значения могут снижаться.
Чтобы сосредоточиться на первом пункте, рассмотрим следующее, в котором я добавил задержки (чтобы смягчить гонку второго пункта). Это будет последовательно проявлять поведение:
@MainActor
class Foo {
@Published private(set) var value = 0
init() {}
func experiment() async throws {
let publishTask = Task {
for i in 0 ..< 10 {
value = i
try await Task.sleep(for: .milliseconds(300))
}
}
let consumeTask = Task {
for await value in $value.values {
print(value)
if value == 9 { break }
try await Task.sleep(for: .seconds(2))
}
}
try await withTaskCancellationHandler {
try await publishTask.value
try await consumeTask.value
} onCancel: {
publishTask.cancel()
consumeTask.cancel()
}
print("done")
}
}
Вызов experiment
выдаст:
0
6
9
Если вы не хотите, чтобы значения удалялись, вы можете добавить буфер к consumeTask
:
let consumeTask = Task {
let sequence = $value
.buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
.values
for await value in sequence {
print(value)
if value == 9 { break }
try await Task.sleep(for: .seconds(2))
}
}
Это зафиксирует все значения (конечно, вплоть до размера буфера).
Существуют и другие шаблоны, которые можно использовать для поддержки различных типов поведения противодавления, но, будем надеяться, это иллюстрирует фундаментальную проблему.