1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public class MongoDBWriter extends Writer { public static class Job extends Writer.Job { @Override public List<Configuration> split(int mandatoryNumber) { List<Configuration> configList = new ArrayList<Configuration>(); for(int i = 0; i < mandatoryNumber; i++) { configList.add(this.originalConfig.clone()); } return configList; }
@Override public void init() { this.originalConfig = super.getPluginJobConf(); } }
public static class Task extends Writer.Task { @Override public void init() { this.writerSliceConfig = this.getPluginJobConf(); this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig); this.batchSize = BATCH_SIZE; }
@Override public void startWrite(RecordReceiver lineReceiver) { MongoDatabase db = mongoClient.getDatabase(database); MongoCollection<BasicDBObject> col = db.getCollection(this.collection, BasicDBObject.class); List<Record> writerBuffer = new ArrayList<Record>(this.batchSize); Record record = null; while((record = lineReceiver.getFromReader()) != null) { writerBuffer.add(record); if(writerBuffer.size() >= this.batchSize) { doBatchInsert(col,writerBuffer, mongodbColumnMeta); writerBuffer.clear(); } } if(!writerBuffer.isEmpty()) { doBatchInsert(col,writerBuffer, mongodbColumnMeta); writerBuffer.clear(); } } } }
|