В настоящее время я разрабатываю приложение Node.js, которое требует управления большим объемом асинхронных файловых операций, таких как чтение, запись и удаление. По мере масштабирования этих операций я сталкиваюсь с узкими местами в производительности и случайными ошибками, такими как EMFILE (слишком много открытых файлов).
Текущий подход: Я использую асинхронные методы из модуля fs, такие как fs.readFile и fs.writeFile. Я создал простой механизм очереди, используя async.queue для управления параллелизмом файловых операций.
Несмотря на очередь, время ответа колеблется, и я считаю, что общую пропускную способность можно улучшить. Иногда я сталкиваюсь с ошибками EMFILE во время пиковой нагрузки сервера.
Экспериментировал с Graceful-fs как альтернативой стандартному модулю fs, но к существенному улучшению это не привело.
const async = require('async');
const fileQueue = async.queue((task, callback) => {
fs.readFile(task.filePath, (err, data) => {
if (err) return callback(err);
// business logic for filtration in file you can just add map loop or filter loop here
callback();
});
}, 5);
fileQueue.drain = function() {
console.info('All file operations have been processed.');
};
fileQueue.push({filePath: '/social-directory/file.txt'});
🤔 А знаете ли вы, что...
Node.js поддерживает работу с протоколами HTTP, HTTPS, TCP и UDP.
Чтобы эффективно обрабатывать большие объемы асинхронных файловых операций и избегать ошибок EMFILE, рассмотрите следующий оптимизированный подход с использованием p-очереди для лучшего управления параллелизмом и использования потоков Node.js для больших файловых операций. Эта стратегия поможет эффективно управлять ресурсами и повысить общую производительность.
Увеличьте лимит дескриптора файла
ulimit -n 4096
Вот оптимизированная реализация с использованием p-очереди и потоков:
1.Установить зависимости
npm install p-queue
2. Реализация кода
import fs from 'fs';
import { createReadStream } from 'fs';
import PQueue from 'p-queue';
import readline from 'readline';
const queue = new PQueue({ concurrency: 5 });
queue.on('idle', () => {
console.info('All file operations have been processed.');
});
const processFile = async (filePath) => {
try {
const data = await fs.promises.readFile(filePath, 'utf8');
// Business logic for processing the file data
console.info(`Processing ${filePath}`);
} catch (error) {
if (error.code === 'EMFILE') {
console.error(`EMFILE error encountered. Retrying ${filePath}...`);
await new Promise((resolve) => setTimeout(resolve, 1000));
return processFile(filePath);
}
console.error(`Error processing ${filePath}:`, error);
}
};
const processLargeFile = (filePath) => {
return new Promise((resolve, reject) => {
const readStream = createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({ input: readStream });
rl.on('line', (line) => {
// Process each line
console.info(`Processing line: ${line}`);
});
rl.on('close', () => {
console.info(`Finished processing ${filePath}`);
resolve();
});
rl.on('error', (err) => {
console.error(`Error reading ${filePath}:`, err);
reject(err);
});
});
};
const filePaths = ['/social-directory/file1.txt', '/social-directory/file2.txt'];
filePaths.forEach((filePath) => {
queue.add(() => processFile(filePath));
// Use processLargeFile for large files
// queue.add(() => processLargeFile(filePath));
});