Outsider's Dev Story

Stay Hungry. Stay Foolish. Don't Be Satisfied.
RetroTech 팟캐스트 44BITS 팟캐스트

Node.js MongoDB 드라이버에서 MapReduce 사용하기

개인 프로젝트에서는 MongoDB를 주로 사용하곤 하는데 MongoDB 라이브러리로 이것저것 쓰다가 mongodb-native가 MongoDB의 공식 Node.js 드라이버가 된 뒤로는 mongodb-native를 주로 사용하고 있다. 난 하둡쪽은 별로 다뤄본 적이 없어서 MapReduce에 대해서는 잘 모르지만 최근에 MapReduce를 처음으로 좀 써보게 되었다. MapReduce를 많이는 모르기 때문에 MapReduce에 대한 설명이라기 보다는 mongodb-native에서의 mapReduce 사용방법에 대한 글이다.

> db.example.find()
{ "name" : "outsider", "classOf" : "A", "eng" : 60, "kor" : 70, "math" : 50, "_id" : ObjectId("519877aebb61820000000001") }
{ "name" : "nephilim", "classOf" : "B", "eng" : 95, "kor" : 90, "math" : 100, "_id" : ObjectId("519877aebb61820000000002") }
{ "name" : "arawn", "classOf" : "A", "eng" : 80, "kor" : 80, "math" : 67, "_id" : ObjectId("519877aebb61820000000003") }
{ "name" : "zziuni", "classOf" : "B", "eng" : 70, "kor" : 65, "math" : 73, "_id" : ObjectId("519877aebb61820000000004") }
{ "name" : "fupfin", "classOf" : "A", "eng" : 65, "kor" : 50, "math" : 85, "_id" : ObjectId("519877aebb61820000000005") }
{ "name" : "tenshi", "classOf" : "B", "eng" : 85, "kor" : 70, "math" : 74, "_id" : ObjectId("519877aebb61820000000006") }
{ "name" : "anarcher", "classOf" : "B", "eng" : 70, "kor" : 78, "math" : 87, "_id" : ObjectId("519877aebb61820000000007") }
{ "name" : "nanha", "classOf" : "A", "eng" : 87, "kor" : 84, "math" : 91, "_id" : ObjectId("519877aebb61820000000008") }
{ "name" : "fallroot", "classOf" : "B", "eng" : 93, "kor" : 84, "math" : 84, "_id" : ObjectId("519877aebb61820000000009") }
{ "name" : "dani", "classOf" : "A", "eng" : 88, "kor" : 79, "math" : 77, "_id" : ObjectId("519877aebb6182000000000a") }

예를 들어 위와 같은 데이터가 디비에 들어있다고 하자. 각 학생의 리스트고 classOf가 학생들이 속한 반이고 각 과목에 대한 점수가 들어있다. 각 반별로 과목에 대한 평균을 구한다고 할때 물론 그냥 쿼리로도 충분히 수행이 가능하지만 MapReduce를 사용해보자. 이 MapReduce는 다음과 같이 작성할 수 있다.

var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://localhost:27017/mrtest', function(err, db) {
  var example = db.collection('example');

  var map = function() {
    emit(this.classOf, this);
  };

  var reduce = function(classOf, students) {
    var engTotal = korTotal = mathTotal = 0;

    students.forEach(function(student) {
      engTotal += student.eng
      korTotal += student.kor
      mathTotal += student.math
    });

    return {
      classOf: classOf,
      engAvg: engTotal / students.length,
      korAvg: korTotal / students.length,
      mathAvg: mathTotal / students.length,
    }
  };

  example.mapReduce(
      map,
      reduce,
      { out: 'mrtemp'},
      function(err, coll) {
        coll.find().toArray(function(err, arr) {
          console.log(arr);
        });
      }
  );
});

코드가 약간 길지만(?) 그리 복잡하지는 않다. 앞부분은 디비에 연결하는 부분이고 먼저 사용한 map 함수와 reduce함수를 정의해야 한다. MapReduce 개념에 대해서 여기서 다 설명하기는 좀 무리가 있는데 간단히 말하면 map함수를 이용해서 데이터를 특정 키값의 데이터로 모아줄 수 있고 이를 reduce에서 다시 재가공해서 최종 데이터를 만들게 된다. 차례차례 보자.

var map = function() {
  emit(this.classOf, this);
};

수행한 쿼리의 모든 도큐먼트에 대해서 map함수가 실행되는데 map 함수내에서 this가 해당 도큐먼트를 가리킨다. 이 함수내에서 어떤 처리가 필요하다면 할 수 있고 최종적으로 reduce로 전달하기 위해서 emit()을 실행해야 하는데 파라미터로 key와 value를 전달해야 한다. 여기서는 반별로 평균을 구할 것이므로 키에 classOf의 값을 전달하고 값에는 해당 도큐먼트를 통째로 전달했다.

var reduce = function(classOf, students) {
  var engTotal = korTotal = mathTotal = 0;

  students.forEach(function(student) {
    engTotal += student.eng
    korTotal += student.kor
    mathTotal += student.math
  });

  return {
    classOf: classOf,
    engAvg: engTotal / students.length,
    korAvg: korTotal / students.length,
    mathAvg: mathTotal / students.length,
  }
};

이번엔 Reduce를 위함 함수이다. map에서 전달한 것과 같이 reduce함수는 키와 배열의 두 파라미터를 받는다. 키 값은 map이 전달한 키값이고 배열에는 같은 키를 가진 값(map이 전달한)의 배열이 전달된다. 즉, 여기서는 classOf의 값이 A와 B 두가지 뿐이므로 reduce는 2번 호출되고 A 키값에 A반 학생들객체의 배열, B 키값에는 B반 학생객체의 배열이 전달되게 된다. 이 Reduce 함수내에서 필요한 작업(여기서는 평균값 계산)을 한 뒤에 최종적으로 MongoDB에 넣을 JSON 객체(MongoDB 이므로)를 만들어서 반환하면 된다.

example.mapReduce(
    map,
    reduce,
    { out: 'mrtemp'},
    function(err, coll) {
      coll.find().toArray(function(err, arr) {
        console.log(arr);
      });
    }
);

실제 MapReduce를 수행하는 부분이다. 컬렉션에 mapRecude()함수가 존재하고 처음 두 파라미터로 Map, Reduce함수를 전달해주면 된다. 세번째 파라미터는 옵션값인데 out은 MapReduce를 수행한 결과가 들어갈 컬렉션을 지정한다. 여기서는 reduce가 2개의 문서(A, B)를 생성하므로 mrtemp라는 컬렉션이 두 문서가 들어가게 된다. 여기서는 쿼리에 어떤 조건을 주지 않았지만 쿼리조건이 필요하다면 query 옵션을 사용할 수 있다. 마지막으로 콜백으로 실행결과와 MapReduce  결과가 들어간 컬렉션을 반환하므로 결과를 조회하려면 해당 컬렉션에서 다시 조회를 해서 데이터를 가져와야 한다.


그 외 몇가지 사항들...
  • map, reduce로 전달한 함수는 MongoDB내에서 수행된다. 그러므로 일반적인 자바스크립트처럼 프로그램내의 어떤 변수를 클로저형태로 map, reduce함수로 전달할 수 없다. map, reduce함수는 각각 개별적으로 동작할 수 있어야 한다.
  • map, reduce 과정을 디버깅하려면 MongoDB의 로그를 봐야한다. 당연히 console.log같은건 안 먹히고 map, reduce 함수내에서 출력해 보고 싶다면 print() 함수를 사용하면 MongoDB의 로그에 출력되기 때문에 전달되는 값을 확인하고 싶다면 print()로 디버깅할 수 있다.
  • MongoDB의 내부를 다 파악해 보지는 않았지만 로그상으로 보면 MapReduce를 수행할 때마다 기존의 맵리듀스용 컬렉션(out으로 지정한)을 drop한 뒤에 다시 생성한다.
  • MapReduce과정은 순차적으로 처리가 된다. 동시에 여러 MapReduce를 요청한다고 하더라도 한 MapReduce를 모두 처리한뒤에 나머지를 처리하게 된다. 이는 중간에 서로 간섭이 일어나지 않도록 하기 위함으로 보이므로 동시에 여러 요청이 들어가게 된다면 지연시간이 오래걸리므로 유의해서 사용해야 한다.
  • MapReduce를 잘 몰라서 잘 아는 분께 물어본 봐로는 이러한 동시성 문제나 성능향상을 위한 MapReduce를 Incremental MapReduce라고 부른다고 한다. MongoDB의 Incremental MapReduce는 별로 좋지 않아보이고 데이터의 타임스탬프를 사용해서 직접 처리를 해주어야 하는 것으로 보인다.(사용해 보진 않았다.) 다른 디비에서는 MapReduce 명령차원에서 Incremental 사용여부를 지정할 수 있기도 한데 MongoDB는 그렇지 않아서 이부분은 신경써서 처리해야할듯 싶다.

2013/05/20 01:49 2013/05/20 01:49