Поиск эффективных решений для обработки асинхронных файловых операций в Node.js

В настоящее время я разрабатываю приложение Node.js, которое требует управления большим объемом асинхронных файловых операций, таких как чтение, запись и удаление. По мере масштабирования этих операций я сталкиваюсь с узкими местами в производительности и случайными ошибками, такими как EMFILE (слишком много открытых файлов).

Текущий подход: Я использую асинхронные методы из модуля fs, такие как fs.readFile и fs.writeFile. Я создал простой механизм очереди, используя async.queue для управления параллелизмом файловых операций.

Несмотря на очередь, время ответа колеблется, и я считаю, что общую пропускную способность можно улучшить. Иногда я сталкиваюсь с ошибками EMFILE во время пиковой нагрузки сервера.

Экспериментировал с Graceful-fs как альтернативой стандартному модулю fs, но к существенному улучшению это не привело.

const async = require('async');

const fileQueue = async.queue((task, callback) => {
    fs.readFile(task.filePath, (err, data) => {
        if (err) return callback(err);
        // business logic for filtration in file you can just add map loop or filter loop here
        callback();
    });
}, 5);

fileQueue.drain = function() {
    console.info('All file operations have been processed.');
};

fileQueue.push({filePath: '/social-directory/file.txt'});

🤔 А знаете ли вы, что...
Node.js поддерживает работу с протоколами HTTP, HTTPS, TCP и UDP.


1
55
1

Ответ:

Решено

Чтобы эффективно обрабатывать большие объемы асинхронных файловых операций и избегать ошибок EMFILE, рассмотрите следующий оптимизированный подход с использованием p-очереди для лучшего управления параллелизмом и использования потоков Node.js для больших файловых операций. Эта стратегия поможет эффективно управлять ресурсами и повысить общую производительность.

Увеличьте лимит дескриптора файла

ulimit -n 4096

Вот оптимизированная реализация с использованием p-очереди и потоков:

1.Установить зависимости

npm install p-queue

2. Реализация кода

import fs from 'fs';
import { createReadStream } from 'fs';
import PQueue from 'p-queue';
import readline from 'readline';

const queue = new PQueue({ concurrency: 5 });

queue.on('idle', () => {
    console.info('All file operations have been processed.');
});

const processFile = async (filePath) => {
    try {
        const data = await fs.promises.readFile(filePath, 'utf8');
        // Business logic for processing the file data
        console.info(`Processing ${filePath}`);
    } catch (error) {
        if (error.code === 'EMFILE') {
            console.error(`EMFILE error encountered. Retrying ${filePath}...`);
            await new Promise((resolve) => setTimeout(resolve, 1000));
            return processFile(filePath);
        }
        console.error(`Error processing ${filePath}:`, error);
    }
};

const processLargeFile = (filePath) => {
    return new Promise((resolve, reject) => {
        const readStream = createReadStream(filePath, { encoding: 'utf8' });
        const rl = readline.createInterface({ input: readStream });

        rl.on('line', (line) => {
            // Process each line
            console.info(`Processing line: ${line}`);
        });

        rl.on('close', () => {
            console.info(`Finished processing ${filePath}`);
            resolve();
        });

        rl.on('error', (err) => {
            console.error(`Error reading ${filePath}:`, err);
            reject(err);
        });
    });
};

const filePaths = ['/social-directory/file1.txt', '/social-directory/file2.txt'];

filePaths.forEach((filePath) => {
    queue.add(() => processFile(filePath));
    // Use processLargeFile for large files
    // queue.add(() => processLargeFile(filePath));
});