Данные не упорядочены должным образом в очереди spsc без блокировки

Я пытаюсь написать ограниченную очередь без блокировки с одним производителем и одним потребителем, однако порядок элементов, вставленных в очередь, не совпадает с порядком элементов, удаленных из очереди. Тестовый код в main.cpp добавляет 100 000 000 последовательных целых чисел в очередь в одном потоке и извлекает значения в другой, проверяя, что значения являются последовательными.

В настоящее время при запуске программы она печатает «О нет! Последнее значение было: 219718, но это значение — 220231» (числовые значения различаются в зависимости от запуска программы). Диапазон чисел, по-видимому, зависит от yield_frequency в writer_thread, более низкие значения yield_frequency приводят к тому, что тест (часто, но не всегда) завершается неудачно при числе, превышающем 100.

(Код является незавершенным, поэтому, возможно, какой-то код не соответствует лучшим практикам C++)

spsc_queue.hpp

#pragma once

#include <memory>
#include <utility>
#include <bit>
#include <atomic>
#include <optional>

template <typename T> class ChannelReader;
template <typename T> class ChannelWriter;

// instances of this class cannot be created directly, make_queue returns a pair of ChannelReader and ChannelWriter instances that wrap the queue to prevent reads (or writes) from multiple threads
template <typename T>
class SpscQueue {
    friend ChannelReader<T>;
    friend ChannelWriter<T>;
    public:
    static std::pair<ChannelReader<T>, ChannelWriter<T>> make_queue(size_t size) {
        if (!std::__has_single_bit(size)) throw "";
        std::shared_ptr<SpscQueue<T>> queue_ptr(new SpscQueue<T>(size));
        return std::make_pair(ChannelReader<T>(queue_ptr), ChannelWriter<T>(queue_ptr));
    }
    ~SpscQueue() {
        delete[] data;
    }
    private:
    SpscQueue (size_t _size): data(new T[_size]()), size(_size) {}
    alignas(64) T* data;
    const size_t size;
    alignas(64) std::atomic<size_t> read_idx{0};
    alignas(64) std::atomic<size_t> write_idx{0};
};

// wrapper class that allows ONE thread to read from a SPSC queue
template <typename T>
class ChannelReader {
    friend SpscQueue<T>;
    public:
    // attempt to read from the queue, returning the value if successful, and std::nullopt otherwise
    std::optional<T> try_get_next() {
        size_t read_idx = queue->read_idx;
        size_t write_idx = cached_write_idx;
        if (write_idx <= read_idx) {
            write_idx = queue->write_idx;
            cached_write_idx = write_idx;
        }
        if (write_idx <= read_idx) {
            return std::nullopt;
        } else {
            queue->read_idx++;
            return std::move(queue->data[read_idx % queue->size]);
        }
    }
    ChannelReader(const ChannelReader&) = delete;
    ChannelReader& operator=(const ChannelReader&) = delete;
    ChannelReader(ChannelReader&&) = default;
    ChannelReader& operator=(ChannelReader&&) = default;
    private:
    ChannelReader(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_write_idx{0};
};

// wrapper class that allows ONE thread to write to a SPSC queue
template <typename T>
class ChannelWriter {
    friend SpscQueue<T>;
    public:
    // attempt to write to the queue, returns true if the write was successful, and false otherwise
    bool try_write_next(const T& obj) {
        size_t read_idx = cached_read_idx;
        size_t write_idx = queue->write_idx;
        if (write_idx >= read_idx + queue->size) {
            read_idx = queue->read_idx;
            cached_read_idx = read_idx;          
        }
        if (write_idx >= read_idx + queue->size) {
            return false;
        } else {
            queue->data[write_idx % queue->size] = obj;
            ++queue->write_idx;
            return true;
        }
    }
    ChannelWriter(const ChannelWriter&) = delete;
    ChannelWriter& operator=(const ChannelWriter&) = delete;
    ChannelWriter(ChannelWriter&&) = default;
    ChannelWriter& operator=(ChannelWriter&&) = default;
    private:
    ChannelWriter(std::shared_ptr<SpscQueue<T>> ptr): queue(ptr) {}
    std::shared_ptr<SpscQueue<T>> queue;
    alignas(64) size_t cached_read_idx{0};
};

main.cpp

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

#include "../include/spsc_queue.hpp"

namespace chrono = std::chrono;

int main() {
  std::atomic<bool> latch{false};
  auto [reader, writer] = SpscQueue<int>::make_queue(512);
  std::thread reader_thread([_reader = std::move(reader), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting reader...\n";
    int last_val = -1;
    while (last_val != 100'000'000 - 1) {
      if (auto data = _reader.try_get_next()) {
        if (*data != last_val + 1) {
          std::cerr << "Oh no! last value was: " << last_val
                    << " but this value is " << *data << '\n';
          std::exit(1);
        }
        last_val = *data;
        // std::cerr << last_val << " Read\n";
      }
    }
  });
  std::thread writer_thread([_writer = std::move(writer), &latch]() mutable {
    latch.wait(false);
    std::cerr << "Starting writer...\n";
    for (int i = 0; i < 100'000'000; ++i) {
      for (int j = 0; !_writer.try_write_next(i); ++j) {
        constexpr int yield_frequency = 1 << 0;
        if (j % yield_frequency)
          std::this_thread::yield();
      }
      // std::cerr << "writer wrote value" << i << '\n';
      // if (i == 10'000'000) std::exit(1);
    }
  });
  std::cout << "Start" << std::endl;
  {
    auto start = chrono::steady_clock::now();
    latch = true;
    latch.notify_all();
    reader_thread.join();
    writer_thread.join();
    auto finish = chrono::steady_clock::now();
    auto elapsed_seconds =
        chrono::duration_cast<chrono::duration<double>>(finish - start).count();
    std::cout << elapsed_seconds << std::endl;
  }
  std::cout << "End" << std::endl;
}

🤔 А знаете ли вы, что...
C++ - это многопарадигменный язык программирования, который расширяет язык C.


2
54
1

Ответ:

Решено

220231 - 219718 = 513 и размер кольцевого буфера равен 512, поэтому записывающее устройство и средство чтения состязаются по этому последнему значению.... вы увеличиваете индекс чтения перед чтением значения, записывающее устройство может перезаписать значение, которое собирается прочитать считывающее устройство.

queue->read_idx++; // tell writer data is already read
// writer modifies data here
return std::move(queue->data[read_idx % queue->size]); // read new data

решение состоит в том, чтобы прочитать значение перед увеличением, так же, как и при записи: сначала запишите значение, а затем увеличьте индекс записи.

auto result = std::move(queue->data[read_idx % queue->size]);
++queue->read_idx;
return result;