programing

nodejs로 MongoDB 쿼리 결과를 스트리밍하는 방법은 무엇입니까?

i4 2023. 7. 20. 21:42
반응형

nodejs로 MongoDB 쿼리 결과를 스트리밍하는 방법은 무엇입니까?

MongoDB 쿼리 결과를 nodejs 클라이언트로 스트리밍하는 방법에 대한 예를 찾고 있습니다.지금까지 찾은 모든 솔루션은 쿼리 결과를 한 번에 읽은 후 서버로 다시 전송하는 것 같습니다.

대신, 저는 (분명히) 쿼리 메서드에 콜백을 제공하고 MongoDB가 결과 집합의 다음 청크를 사용할 수 있을 때 콜백을 호출하도록 하고 싶습니다.

몽구스를 보고 있는데 다른 드라이버를 사용해야 할까요?

node-mongodb-driver (모든 mongoDB 클라이언트가 nodejs에서 사용하는 기본 계층) 다른 사람들이 언급한 커서 API를 제외하고는 멋진 스트림 API(#458)를 가집니다.안타깝게도 다른 곳에서는 문서화되지 않았습니다.

업데이트: 문서가 있습니다.

다음과 같이 사용할 수 있습니다.

var stream = collection.find().stream()
stream.on('error', function (err) {
  console.error(err)
})
stream.on('data', function (doc) {
  console.log(doc)
})

실제로 ReadableStream 인터페이스를 구현하므로 모든 이점(일시 중지/재개 등)이 있습니다.

Mongoose에서의 스트리밍은 다음 질문을 올린 지 3개월 후에 등장한 버전 2.4.0에서 사용할 수 있게 되었습니다.

Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);

자세한 예제는 설명서 페이지에서 확인할 수 있습니다.

mongoose실제로는 "드라이버"가 아니라 MongoDB node-mongodb-native드라이버() 주변의 ORM 래퍼입니다.

있는 을 보세요..find그리고..each다음은 예제의 몇 가지 코드입니다.

// Find all records. find() returns a cursor
collection.find(function(err, cursor) {
  sys.puts("Printing docs from Cursor Each")
  cursor.each(function(err, doc) {
    if(doc != null) sys.puts("Doc from Each " + sys.inspect(doc));
  })                    
});

결과를 스트리밍하기 위해, 기본적으로 당신은 그것을 대체합니다.sys.puts사용자의 "스트림" 기능으로.결과를 어떻게 스트리밍할 계획인지 확실하지 않습니다.는 이 할 수 합니다.response.write() + response.flush()하지만 당신은 또한 체크아웃을 원할 수도 있습니다.socket.io.

여기 제가 찾은 해결책이 있습니다(만약 그것이 잘못된 방법이라면 아무나 고쳐주세요): (또한 잘못된 코딩을 용서하십시오 - 제가 지금 이것을 예쁘게 하기에는 너무 늦었습니다).

var sys = require('sys')
var http = require("http");

var Db = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Db,
  Connection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Connection,
  Collection = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Collection,
  Server = require('/usr/local/src/npm/node_modules/mongodb/lib/mongodb').Server;

var db = new Db('test', new Server('localhost',Connection.DEFAULT_PORT , {}));

var products;

db.open(function (error, client) {
  if (error) throw error;
  products = new Collection(client, 'products');
});

function ProductReader(collection) {
        this.collection = collection;
}

ProductReader.prototype = new process.EventEmitter();

ProductReader.prototype.do = function() {
        var self = this;

        this.collection.find(function(err, cursor) {
                if (err) {
                        self.emit('e1');
                        return;

                }
                sys.puts("Printing docs from Cursor Each");

                self.emit('start');
                cursor.each(function(err, doc) {
                        if (!err) {
                                self.emit('e2');
                                self.emit('end');
                                return;
                        }

                        if(doc != null) {
                                sys.puts("doc:" + doc.name);
                                self.emit('doc',doc);
                        } else {
                                self.emit('end');
                        }
                })
        });
};
http.createServer(function(req,res){
        pr = new ProductReader(products);
        pr.on('e1',function(){
                sys.puts("E1");
                res.writeHead(400,{"Content-Type": "text/plain"});
                res.write("e1 occurred\n");
                res.end();
        });
        pr.on('e2',function(){
                sys.puts("E2");
                res.write("ERROR\n");
        });

        pr.on('start',function(){
                sys.puts("START");
                res.writeHead(200,{"Content-Type": "text/plain"});
                res.write("<products>\n");
        });

        pr.on('doc',function(doc){
                sys.puts("A DOCUMENT" + doc.name);
                res.write("<product><name>" + doc.name + "</name></product>\n");
        });

        pr.on('end',function(){
                sys.puts("END");
                res.write("</products>");
                res.end();
        });

        pr.do();

  }).listen(8000);

저는 몽고드브 스트림을 직접 연구해 왔지만, 저는 당신이 찾는 전체 답을 가지고 있지는 않지만, 일부는 가지고 있습니다.socket.io 스트림을 설정할 수 있습니다.

이것은 자바스크립트 socket.io 와 NPM에서 사용할 수 있는 socket.io -mongodb 또한 데이터베이스에 사용할 수 있습니다. 왜냐하면 문제가 있는 40년 된 데이터베이스를 사용하는 것은 부정확하기 때문입니다. 또한 40년 된 데이터베이스를 현대화하는 시간은 SQL이고 SQL은 제가 알기로는 스트림을 하지 않습니다.

따라서 서버에서 클라이언트로 이동하는 데이터에 대해서만 질문하셨지만, 검색 시 데이터를 어디에서도 찾을 수 없고 스트림을 통해 전송 및 수신 요소를 모두 한 곳에 설정하여 모든 사용자가 빠르게 파악할 수 있도록 하고 싶었기 때문에 클라이언트에서 서버로 이동하는 것도 제 대답으로 알고 싶습니다.

스트리밍을 통해 데이터를 서버로 전송하는 클라이언트 측

stream = ss.createStream();
blobstream=ss.createBlobReadStream(data);
blobstream.pipe(stream);
ss(socket).emit('data.stream',stream,{},function(err,successful_db_insert_id){
 //if you get back the id it went into the db and everything worked
});

서버는 클라이언트 측에서 스트림을 수신한 후 완료되면 응답합니다.

ss(socket).on('data.stream.out',function(stream,o,c){
 buffer=[];
 stream.on('data',function(chunk){buffer.push(chunk);});
 stream.on('end',function(){
  buffer=Buffer.concat(buffer);
  db.insert(buffer,function(err,res){
   res=insertedId[0];
   c(null,res);
  });
 });
});

//데이터를 가져와서 클라이언트로 스트리밍하는 작업의 나머지 절반입니다.

클라이언트 측 서버에서 스트림 데이터 요청 및 수신

stream=ss.createStream();
binarystring='';
stream.on('data',function(chunk){ 
 for(var I=0;i<chunk.length;i++){
  binarystring+=String.fromCharCode(chunk[i]); 
 }
});
stream.on('end',function(){ data=window.btoa(binarystring); c(null,data); });
ss(socket).emit('data.stream.get,stream,o,c);

서버 측 스트리밍 데이터 요청에 회신

ss(socket).on('data.stream.get',function(stream,o,c){
 stream.on('end',function(){
  c(null,true);
 });
 db.find().stream().pipe(stream);
});

가장 마지막에 있는 것은 제가 아직 시도하지 않았기 때문에 제 엉덩이에서 그것을 꺼내는 유일한 것입니다. 하지만 그것은 효과가 있을 것입니다.저는 실제로 비슷한 작업을 하지만 하드 드라이브에 파일을 쓴 다음 fs.createReadStream을 사용하여 클라이언트에 스트리밍합니다.그래서 100% 확실하지는 않지만 제가 읽은 바로는 테스트를 해본 후에 다시 연락드리겠습니다.

추신. 누구든지 저의 구어체 말투에 대해 저를 괴롭히고 싶은 사람, 저는 캐나다 사람이고, 저는 "에"라고 말하는 것을 좋아합니다. 당신의 포옹으로 저에게 와서 형제들을 때립니다. :D

언급URL : https://stackoverflow.com/questions/7372626/how-to-stream-mongodb-query-results-with-nodejs

반응형