Swift Debounce или задержка с дискриминатором

Мое приложение включает в себя различные виджеты, указывающие значения в реальном времени (температура, загрузка процессора, память и т. д.). Эти виджеты обновляются на основе сообщений JSON, полученных через один веб-сокет.

На данный момент обновления обрабатываются в режиме реального времени, поэтому я обновляю свой виджет, как только будет получено новое значение.

Сообщения следующие: {widget: "widgetId", value: 150}.

Моя проблема заключается в том, что иногда для одного и того же виджета происходит несколько обновлений в секунду, что приводит к миганию экрана и бесполезному снижению производительности. Я хочу уменьшить это, просто отложив обновление пользовательского интерфейса, чтобы в течение определенного периода времени обновлялось только последнее полученное значение.

Я хотел бы изучить два следующих варианта:

  1. Обновляйте все виджеты одновременно раз в X секунд (сохраняя только самые последние обновления для каждого виджета и отправляя их все одновременно в пользовательский интерфейс).

  2. Обновляйте виджет не чаще одного раза в Х секунд (отказ по каналу).

Мой обработчик сокета выглядит так:

socket.receive(completionHandler: { [weak self] result in
    switch result {
        case .success(let message):
            switch message {
                case .data(let data):
                    guard let update = try? WidgetUpdate(serializedData: data) else {
                         print("Failed to deserialize data")
                         return
                     }
                     DispatchQueue.main.async {
                         self?.updateWidget(update: update)
                     }
                     @unknown default:
                         print("unknown")
            }
        }
    }
)

До сих пор я пробовал использовать PassthroughSubject следующим образом:

let subject = PassthroughSubject<WidgetUpdate, Never>()
cancellable = subject
    .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
    .sink { update in
        self?.updateWidget(update: update)
    }

// And to publish inside the publisher from the socket handler like this
subject.send(update)

Проблема в том, что он явно отключает «все» обновления, а не виджет.
Я хотел бы избежать создания одного PassthroughSubject с помощью виджета, поскольку виджет можно добавлять или удалять динамически, и я не хочу управлять созданием/удалением связанных издателей.


52
1

Ответ:

Решено

Я думаю, что вам поможет оператор ReactiveX GroupBy . К сожалению, в Joint нет встроенного GroupBy, но его можно реализовать с помощью сканирования:

import Combine
import Foundation

extension Publisher {
    public func grouped<Key: Hashable> (
        by key: @escaping (Self.Output) -> Key
    ) -> some Publisher<CurrentValueSubject<Self.Output, Never>, Self.Failure> {
        scan(([:], nil)) { (accum: ([Key: CurrentValueSubject<Self.Output, Never>], CurrentValueSubject<Self.Output, Never>?), next) in
            var (accumPubs, _) = accum
            if let existingSubject = accumPubs[key(next)] {
                existingSubject.send(next)
                
                return (accumPubs, nil)
            } else {
                let newSubject = CurrentValueSubject<Self.Output, Never>(next)
                accumPubs[key(next)] = newSubject
                
                return (accumPubs, newSubject)
            }
        }.compactMap { $0.1 }
    }
}

Как только вы это получите, я считаю, что что-то вроде этого должно решить вашу проблему (это основано на вашем исходном примере):

struct Element {
    let channel: String
    let updateNum: Int
}
var subscriptions: Set<AnyCancellable> = Set()
withExtendedLifetime(subscriptions) {
    let pub1 = Just(Element(channel: "chan1", updateNum: 1)).delay(for: .seconds(0.1), scheduler: RunLoop.main)
    let pub2 = Just(Element(channel: "chan2", updateNum: 1)).delay(for: .seconds(0.2), scheduler: RunLoop.main)
    let pub3 = Just(Element(channel: "chan1", updateNum: 2)).delay(for: .seconds(0.2), scheduler: RunLoop.main)
    let pub4 = Just(Element(channel: "chan1", updateNum: 3)).delay(for: .seconds(0.3), scheduler: RunLoop.main)
    let pub5 = Just(Element(channel: "chan2", updateNum: 2)).delay(for: .seconds(0.4), scheduler: RunLoop.main)
    let pub6 = Just(Element(channel: "chan2", updateNum: 3)).delay(for: .seconds(0.5), scheduler: RunLoop.main)
    let pub7 = Just(Element(channel: "chan1", updateNum: 4)).delay(for: .seconds(0.6), scheduler: RunLoop.main)
    let pub = Publishers.MergeMany(pub1, pub2, pub3, pub4, pub5, pub6, pub7)
    pub
        .grouped { $0.channel }
        .flatMap {
            $0.debounce(for: .seconds(0.3), scheduler: RunLoop.main)
            // Or:
            // $0.throttle(for: .seconds(0.3), scheduler: RunLoop.main, latest: true)
        }
        .sink { result in print(result) }
        .store(in: &subscriptions)
}
RunLoop.main.run()

И вот что-то, основанное на вашей реальной проблеме с WebSockets:

let subject = PassthroughSubject<WidgetUpdate, Never>()
cancellable = subject
    .grouped { $0.widget }
    .flatMap {
        $0.debounce(for: .seconds(0.5), scheduler: RunLoop.main)
        // Or:
        // $0.throttle(for: .seconds(0.5), scheduler: RunLoop.main, latest: true)
    }
    .sink { update in
        self?.updateWidget(update: update)
    }

// And to publish inside the publisher from the socket handler like this
subject.send(update)

Кроме того, мне кажется, что на самом деле вы хотите дросселировать , а не уменьшать дребезг, хотя в вашем случае большинство пользователей, вероятно, не заметят разницы между ними. Пока ваша группа — ваши издатели, вы можете выбрать либо регулирование, либо откат.