在Node.js中parsing巨大的日志文件 – 逐行读取

我需要在Javascript / Node.js(我正在使用多维数据集)parsing大型(5-10 Gb)日志文件。

logline看起来像这样:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS". 

我们需要读取每一行,做一些parsing(例如,删除7SUCCESS ),然后使用他们的JS客户端将这些数据抽取到Cube( https://github.com/square/cube )中。

首先,Node中的规范方法是逐行读入文件?

这似乎是相当普遍的问题在线:

  • http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js
  • 在node.js中一次读取一行文件?

很多答案似乎指向了一堆第三方模块:

  • https://github.com/nickewing/line-reader
  • https://github.com/jahewson/node-byline
  • https://github.com/pkrumins/node-lazy
  • https://github.com/Gagle/Node-BufferedReader

但是,这似乎是一个相当基本的任务 – 当然,stdlib中有一个简单的方法来逐行读取文本文件?

其次,我需要处理每一行(例如将时间戳转换为Date对象,并提取有用的字段)。

什么是最好的方式来做到这一点,最大限度地提高吞吐量? 是否有某种方法不会在每行中读取或者将其发送到Cube?

第三 – 我猜测使用string拆分,包含(IndexOf!= -1?)的JS等价物将比正则expression式快很多? 有没有人在Node.js中parsing大量的文本数据有很多经验?

干杯,维克多

我search了一个解决scheme来parsing非常大的文件(gbs)逐行使用stream。 所有的第三方库和示例都不适合我的需要,因为他们不是逐行处理文件(如1,2,3,4 …)或者将整个文件读到内存

以下解决scheme可以使用stream&pipe逐行parsing非常大的文件。 为了testing,我使用了一个2.1 GB的文件与17.000.000logging。 公羊使用量不超过60 mb。

 var fs = require('fs') , util = require('util') , stream = require('stream') , es = require('event-stream'); var lineNr = 0; var s = fs.createReadStream('very-large-file.csv') .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); lineNr += 1; // process line here and call s.resume() when rdy // function below was for logging memory usage logMemoryUsage(lineNr); // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(err){ console.log('Error while reading file.', err); }) .on('end', function(){ console.log('Read entire file.') }) ); 

在这里输入图像说明

请让我知道它是怎么回事!

您可以使用内置的readline包,请参阅文档。 我使用stream创build一个新的输出stream。

 var fs = require('fs'), readline = require('readline'), stream = require('stream'); var instream = fs.createReadStream('/path/to/file'); var outstream = new stream; outstream.readable = true; outstream.writable = true; var rl = readline.createInterface({ input: instream, output: outstream, terminal: false }); rl.on('line', function(line) { console.log(line); //Do your stuff ... //Then write to outstream rl.write(cubestuff); }); 

大文件需要一些时间来处理。 请告诉它是否有效。

我真的很喜欢@gerard答案,这实际上应该是这里的正确答案。 我做了一些改进:

  • 代码是在一个类(模块化)
  • 包括parsing
  • 如果有asynchronous作业被链接到读取CSV(如插入到数据库)或HTTP请求
  • 读取用户可以声明的块/大小。 我也照顾在stream中的编码,以防万一你有不同的编码文件。

代码如下:

 'use strict' const fs = require('fs'), util = require('util'), stream = require('stream'), es = require('event-stream'), parse = require("csv-parse"), iconv = require('iconv-lite'); class CSVReader { constructor(filename, batchSize, columns) { this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8')) this.batchSize = batchSize || 1000 this.lineNumber = 0 this.data = [] this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true} } read(callback) { this.reader .pipe(es.split()) .pipe(es.mapSync(line => { ++this.lineNumber parse(line, this.parseOptions, (err, d) => { this.data.push(d[0]) }) if (this.lineNumber % this.batchSize === 0) { callback(this.data) } }) .on('error', function(){ console.log('Error while reading file.') }) .on('end', function(){ console.log('Read entirefile.') })) } continue () { this.data = [] this.reader.resume() } } module.exports = CSVReader 

所以基本上,你将如何使用它:

 let reader = CSVReader('path_to_file.csv') reader.read(() => reader.continue()) 

我用一个35GB的CSV文件testing了这个文件,它为我工作,这就是为什么我select在@gerard的答案build立它,欢迎反馈。

我使用https://www.npmjs.com/package/line-by-line从文本文件中读取了100多万行。; 在这种情况下,RAM的占用容量大约是50-60兆字节。

  const LineByLineReader = require('line-by-line'), lr = new LineByLineReader('big_file.txt'); lr.on('error', function (err) { // 'err' contains error object }); lr.on('line', function (line) { // pause emitting of lines... lr.pause(); // ...do your asynchronous line processing.. setTimeout(function () { // ...and continue emitting lines. lr.resume(); }, 100); }); lr.on('end', function () { // All lines are read, file is closed now. }); 

我还有同样的问题。 比较好几个似乎有这个function的模块后,我决定自己做,这比我想象的要简单。

要点: https : //gist.github.com/deemstone/8279565

 var fetchBlock = lineByline(filepath, onEnd); fetchBlock(function(lines, start){ ... }); //lines{array} start{int} lines[0] No. 

它覆盖了在一个闭包中打开的文件,返回的fetchBlock()将从文件中获取一个块,结束拆分为数组(将处理来自上次获取的段)。

我已经设置块大小为1024每个读取操作。 这可能有错误,但代码逻辑是显而易见的,请亲自尝试。

节点逐行使用stream,所以我宁愿那一个为您的巨大的文件。

对于你的date转换,我会使用moment.js 。

为了最大限度地提高吞吐量,您可以考虑使用软件集群。 有一些很好的模块能很好地包装节点本地集群模块。 我喜欢isaacs的cluster master 。 例如你可以创build一个x工作集群,它们都可以计算一个文件。

基准分割与正则expression式使用benchmark.js 。 我还没有testing过,直到现在。 benchmark.js可用作节点模块

我做了一个节点模块读取大文件asynchronous文本或JSON。 testing大文件。

 var fs = require('fs') , util = require('util') , stream = require('stream') , es = require('event-stream'); module.exports = FileReader; function FileReader(){ } FileReader.prototype.read = function(pathToFile, callback){ var returnTxt = ''; var s = fs.createReadStream(pathToFile) .pipe(es.split()) .pipe(es.mapSync(function(line){ // pause the readstream s.pause(); //console.log('reading line: '+line); returnTxt += line; // resume the readstream, possibly from a callback s.resume(); }) .on('error', function(){ console.log('Error while reading file.'); }) .on('end', function(){ console.log('Read entire file.'); callback(returnTxt); }) ); }; FileReader.prototype.readJSON = function(pathToFile, callback){ try{ this.read(pathToFile, function(txt){callback(JSON.parse(txt));}); } catch(err){ throw new Error('json file is not valid! '+err.stack); } }; 

只需将文件保存为file-reader.js,然后像这样使用它:

 var FileReader = require('./file-reader'); var fileReader = new FileReader(); fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/}); 

除了逐行读取大文件外,还可以按块读取它。 更多的参考这篇文章

 var offset = 0; var chunkSize = 2048; var chunkBuffer = new Buffer(chunkSize); var fp = fs.openSync('filepath', 'r'); var bytesRead = 0; while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) { offset += bytesRead; var str = chunkBuffer.slice(0, bytesRead).toString(); var arr = str.split('\n'); if(bytesRead = chunkSize) { // the last item of the arr may be not a full line, leave it to the next chunk offset -= arr.pop().length; } lines.push(arr); } console.log(lines); 

基于这个问题的答案我实现了一个类,你可以用它来逐行同步读取一个文件fs.readSync() 。 你可以通过使用Q promise来实现这个“暂停”和“恢复”( jQuery似乎需要一个DOM,所以不能用nodejs运行):

 var fs = require('fs'); var Q = require('q'); var lr = new LineReader(filenameToLoad); lr.open(); var promise; workOnLine = function () { var line = lr.readNextLine(); promise = complexLineTransformation(line).then( function() {console.log('ok');workOnLine();}, function() {console.log('error');} ); } workOnLine(); complexLineTransformation = function (line) { var deferred = Q.defer(); // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error)); return deferred.promise; } function LineReader (filename) { this.moreLinesAvailable = true; this.fd = undefined; this.bufferSize = 1024*1024; this.buffer = new Buffer(this.bufferSize); this.leftOver = ''; this.read = undefined; this.idxStart = undefined; this.idx = undefined; this.lineNumber = 0; this._bundleOfLines = []; this.open = function() { this.fd = fs.openSync(filename, 'r'); }; this.readNextLine = function () { if (this._bundleOfLines.length === 0) { this._readNextBundleOfLines(); } this.lineNumber++; var lineToReturn = this._bundleOfLines[0]; this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany) return lineToReturn; }; this.getLineNumber = function() { return this.lineNumber; }; this._readNextBundleOfLines = function() { var line = ""; while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver this.idxStart = 0 while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver line = this.leftOver.substring(this.idxStart, this.idx); this._bundleOfLines.push(line); this.idxStart = this.idx + 1; } this.leftOver = this.leftOver.substring(this.idxStart); if (line !== "") { break; } } }; }