DataX二次开发

代码

1
2
3
4
5
cd /Users/kevin/Workspace

git clone git@git.nuozhilin.site:yuanlin/datax.git

git clone git@git.nuozhilin.site:yuanlin/datax-source.git

配置

配置基于IntelliJ IDEA

  • IDE Open => datax-source

  • Edit Configurations => Application “datax”

  • Main class => com.alibaba.datax.core.Engine

  • VM options => -Ddatax.home=/Users/kevin/Workspace/datax

  • Program arguments => -mode standalone -jobid -1 -job /Users/kevin/Workspace/datax/job/local2ots.json

  • Use classpath of module => datax-core

  • JRE => 1.8

启动调试Debug “datax”

框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
(*) Main Thread
全局 Engine main()
入口 Engine engine()
启动 JobContainer start()
切分 JobContainer split() => channel个数 = TaskGroup个数
调度 AbstractScheduler schedule()
执行 ProcessInnerScheduler startAllTaskGroup()
(*) TaskGroup Thread => 线程个数 = TaskGroup个数
执行 TaskGroupContainer start()
读取 ReaderRunner readerRunner
(*) Reader Thread
通道 channel => TaskGroup线程内存堆空间
com.alibaba.datax.core.transport.channel.memory.MemoryChannel
流控 ArrayBlockingQueue
写入 WriterRunner writerRunner
(*) Writer Thread
报告 TaskGroupContainer reportTaskGroupCommunication()

高性能的两个方面

  • 任务基于并行多子任务 (split channel TaskGroup)

  • 读写基于异步独立线程 (ReaderThread WriterThread)

插件

  • Plugin MongoReader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * DataX reader plugin for MongoDB (abridged excerpt).
 *
 * <p>Job: receives the job-level configuration and splits it into per-task
 * slices. Task: opens a Mongo cursor and streams each document to the
 * framework via {@link RecordSender}.
 */
public class MongoDBReader extends Reader {

    public static class Job extends Reader.Job {

        /** Capture the job configuration and open the shared Mongo client used for splitting. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(originalConfig);
        }

        /**
         * Split the source collection into {@code adviceNumber} task slices.
         *
         * @param adviceNumber number of slices suggested by the framework
         * @return one Configuration per Task
         */
        @Override
        public List<Configuration> split(int adviceNumber) {
            return CollectionSplitUtil.doSplit(originalConfig, adviceNumber, mongoClient);
        }
    }

    public static class Task extends Reader.Task {
        private MongoClient mongoClient;

        /** Each Task owns its own client, built from its slice of the configuration. */
        @Override
        public void init() {
            mongoClient = MongoUtil.initMongoClient(readerSliceConfig);
        }

        /**
         * Iterate the query cursor and push every document to the writer side.
         *
         * @param recordSender channel handle used to emit records to the framework
         */
        @Override
        public void startRead(RecordSender recordSender) {
            MongoDatabase db = mongoClient.getDatabase(database);
            MongoCollection col = db.getCollection(this.collection);
            dbCursor = col.find(filter).iterator();
            // BUG FIX: the original called sendToWriter(record) once AFTER this
            // loop, with `record` never assigned — every fetched `item` was
            // silently discarded. A record must be built and sent per document,
            // inside the iteration.
            while (dbCursor.hasNext()) {
                Document item = dbCursor.next();
                Record record = recordSender.createRecord();
                // TODO(review): map `item` fields into `record` columns here
                // according to the configured column metadata — conversion code
                // was elided in this excerpt; confirm against the full plugin.
                recordSender.sendToWriter(record);
            }
        }
    }
}
  • Plugin MongoDBWriter
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
 * DataX writer plugin for MongoDB (abridged excerpt).
 *
 * <p>Job: clones the job configuration once per mandated task. Task: drains
 * records from the framework and inserts them into the target collection in
 * batches of {@code batchSize}.
 */
public class MongoDBWriter extends Writer {

    public static class Job extends Writer.Job {

        /**
         * Produce {@code mandatoryNumber} identical configuration slices,
         * one for each Task the framework will run.
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> slices = new ArrayList<Configuration>();
            for (int idx = 0; idx < mandatoryNumber; idx++) {
                slices.add(this.originalConfig.clone());
            }
            return slices;
        }

        /** Capture the job-level configuration handed down by the framework. */
        @Override
        public void init() {
            this.originalConfig = super.getPluginJobConf();
        }
    }

    public static class Task extends Writer.Task {

        /** Build this Task's Mongo client from its configuration slice. */
        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.mongoClient = MongoUtil.initMongoClient(this.writerSliceConfig);
            this.batchSize = BATCH_SIZE;
        }

        /**
         * Drain the record channel, buffering up to {@code batchSize} records
         * before each bulk insert; a final partial batch is flushed at the end.
         *
         * @param lineReceiver channel handle records are pulled from
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            MongoDatabase db = mongoClient.getDatabase(database);
            MongoCollection<BasicDBObject> col =
                    db.getCollection(this.collection, BasicDBObject.class);
            List<Record> pending = new ArrayList<Record>(this.batchSize);
            for (Record rec = lineReceiver.getFromReader();
                    rec != null;
                    rec = lineReceiver.getFromReader()) {
                pending.add(rec);
                if (pending.size() >= this.batchSize) {
                    flush(col, pending);
                }
            }
            // Flush whatever is left once the reader side has closed the channel.
            if (!pending.isEmpty()) {
                flush(col, pending);
            }
        }

        /** Bulk-insert the buffered records, then reset the buffer for reuse. */
        private void flush(MongoCollection<BasicDBObject> col, List<Record> pending) {
            doBatchInsert(col, pending, mongodbColumnMeta);
            pending.clear();
        }
    }
}

参考