Как улучшить производительность pandas-numpy при декодировании больших последовательностей байтов

Мне нужно декодировать и поместить в кадры данных pandas или, по крайней мере, массивы numpy, десятки двоичных файлов, каждый из которых содержит сотни «сигналов», состоящих из ~ 10 ^ 5 образцов по 17 байт каждый (time = uint64, value = float64, flags = uint8 ). Поле времени особенно сложно, поскольку оно представлено как количество единиц по 100 нс от 0001/01/01 00:00:00, поэтому мне нужно преобразовать его в time_t64, прежде чем помещать его в массив np.

В приведенном ниже коде я придумал «current_implementation», но на моей машине на 100 «сигналов» уходит 6,7 секунды, а производительность неудовлетворительная. Я пытаюсь ускорить код с помощью некоторых альтернативных реализаций, но в лучшем случае это 5,5 секунды, но это все равно слишком много.

Есть предложения?

import pandas as pd
import numpy as np
import timeit
from struct import unpack, iter_unpack

# DELTA_EPOCH is the delay in 100 ns units from 0001/01/01 to 1970/01/01.
DELTA_EPOCH = 621355968000000000

# MAGIC_NUMBER is the factor to convert from 100 ns units to seconds.
MAGIC_NUMBER = 1/10000000


def current_implementation():

    # Decode values in a single pass
    fmt = "<" + "QdB" * nsamples
    nums = unpack(fmt, signal)

    # I put the numbers in the arrays so then I can use the np's vectorized operations
    ts = np.array(nums[::3], dtype=np.float64)
    vl = np.array(nums[1::3], dtype=np.float64)
    fl = np.array(nums[2::3], dtype=np.uint8)

    # Transform timestamp from FILEDATE to UNIX64 and finally to datetime64[ns] format
    ts = (ts - DELTA_EPOCH) * MAGIC_NUMBER
    idx = pd.to_datetime(ts, unit='s')
    return idx, vl, fl


def alternative_implementation1():

    # Using iter_unpack I get a list of tuples suitable to be put into a structured array
    fmt = "<QdB"
    tmp = list(iter_unpack(fmt, signal))
    tmp = np.array(tmp, dtype=[('idx', np.uint64), ('vl', np.float64), ('fl', np.uint8)])
    tmp['idx'] = np.array((tmp['idx'] - DELTA_EPOCH) * 100, dtype = 'datetime64[ns]')
    return tmp


def alternative_implementation2():
    fmt = "<" + "QdB" * nsamples
    nums = unpack(fmt, signal)
    tmp = np.array(nums[::3], dtype=np.uint64) - DELTA_EPOCH
    ts = np.array(tmp * 100, dtype='datetime64[ns]')   # 100 to convert from 100ns units to ns
    vl = np.array(nums[1::3], dtype=np.float64)
    fl = np.array(nums[2::3], dtype=np.uint8)
    idx = pd.DatetimeIndex(ts)
    return idx, vl, fl


if __name__ == "__main__":

    # Prepare test signal
    nsamples = 300000
    signal = b'\xcd\xf1\xb9!\x18\xbb\xda\x08\x00\x00\x00\x80\x01\xc84@\x03' * nsamples

    print("\nCurrent implementation ", end = "")
    print(timeit.timeit('current_implementation()', number=100, globals=globals()))

    print("\nalternative_implementation1 ", end = "")
    print(timeit.timeit('alternative_implementation1()', number=100, globals=globals()))

    print("\nalternative_implementation2 ", end = "")
    print(timeit.timeit('alternative_implementation2()', number=100, globals=globals()))

🤔 А знаете ли вы, что...
Python используется в разработке игр с помощью библиотеки Pygame.


1
51
1

Ответ:

Решено

Не нужно делать ничего необычного. Используйте np.frombuffer в alternative_implementation1.

def alternative_implementation1b():
    tmp = np.frombuffer(signal, dtype=[("idx", np.uint64), ("vl", np.float64), ("fl", np.uint8)])
    ts = np.array((tmp["idx"] - DELTA_EPOCH) * 100, dtype = "datetime64[ns]")
    vl = tmp["vl"]
    fl = tmp["fl"]
    return ts, vl, fl

Возможно, необходимо отметить, что vl и fl не являются смежными в памяти и в этой реализации доступны только для чтения. Если это проблема, вы можете просто скопировать их. Это все равно будет достаточно быстро.

def alternative_implementation1b_with_copy():
    tmp = np.frombuffer(signal, dtype=[("idx", np.uint64), ("vl", np.float64), ("fl", np.uint8)])
    ts = np.array((tmp["idx"] - DELTA_EPOCH) * 100, dtype = "datetime64[ns]")
    vl = np.ascontiguousarray(tmp["vl"])
    fl = np.ascontiguousarray(tmp["fl"])
    return ts, vl, fl

Тест:

if __name__ == "__main__":
    # Prepare test signal
    nsamples = 300000
    signal = b"\xcd\xf1\xb9!\x18\xbb\xda\x08\x00\x00\x00\x80\x01\xc84@\x03" * nsamples

    candidates = [
        current_implementation,
        alternative_implementation1,
        alternative_implementation2,
        current_implementation_2,
        alternative_implementation1b,
        alternative_implementation1b_with_copy,
    ]
    name_len = max(len(f.__name__) for f in candidates)

    for f in candidates:
        t = timeit.repeat(f, number=100, repeat=3)
        print(f"{f.__name__:{name_len}}: {min(t)}")

Результат:

current_implementation                : 9.686750392982503
alternative_implementation1           : 15.24386641397723
alternative_implementation2           : 8.135940829990432
current_implementation_2              : 0.8789778979844414
alternative_implementation1b          : 0.05908472600276582
alternative_implementation1b_with_copy: 0.0940033549850341