Description
Search before asking
- I had searched in the issues and found no similar issues.
Version
- Doris 2.1.8
- Flink 1.17.2
- MongoDB 4.4.18
What's Wrong?
I am using Flink to sync data from MongoDB to Doris in real time. To test the sync latency, I created a new Doris table with the following DDL:
CREATE TABLE test_sync_perf_test (
    id VARCHAR(36) NOT NULL,
    source_timestamp DATETIME(3),
    flink_write_time DATETIME(3),
    doris_current_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    device_id INT,
    sensor_code VARCHAR(24),
    temperature DECIMAL(5,2),
    humidity DECIMAL(5,2),
    pressure DECIMAL(7,2),
    status TINYINT,
    voltage FLOAT,
    `current` FLOAT,
    gps_lng DECIMAL(10,6),
    gps_lat DECIMAL(10,6),
    firmware_ver VARCHAR(16),
    error_code SMALLINT,
    data_quality TINYINT,
    checksum BIGINT,
    rand_val INT,
    reserved1 VARCHAR(32),
    reserved2 BIGINT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "storage_format" = "V2",
    "enable_unique_key_merge_on_write" = "true"
);
Of these columns, three are the point of the test; the rest are just filler:
- source_timestamp: the time the record is written to MongoDB
- flink_write_time: the time the record passes through flink-doris-connector
- doris_current_time: the time the row lands in Doris, which is supposed to be captured automatically via the DEFAULT CURRENT_TIMESTAMP(3) column default (see the sanity-check sketch below)
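To confirm that the column default itself behaves as expected, a row can be inserted by hand and its stored default compared with the session clock. A minimal sketch, assuming `NOW(3)` is available for DATETIME(3) in Doris 2.1; the id `manual-check-1` is made up:

```sql
-- doris_current_time is omitted, so Doris should fill it from
-- DEFAULT CURRENT_TIMESTAMP(3) at ingestion time.
INSERT INTO test_sync_perf_test (id, source_timestamp, flink_write_time)
VALUES ('manual-check-1', NOW(3), NOW(3));

SELECT id, source_timestamp, doris_current_time
FROM test_sync_perf_test
WHERE id = 'manual-check-1';
```

For a direct INSERT like this the two timestamps should essentially coincide; the anomaly described below was observed on data arriving through the connector.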
I used the following Java program to generate data and insert it into MongoDB:
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.math.BigDecimal;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MongoDataGenerator {

    // Default parameter values
    private static final int DEFAULT_TOTAL = 1_000_000;
    private static final int DEFAULT_BATCH_SIZE = 500;
    private static final int DEFAULT_RATE = 5000; // records/second
    private static final int DEFAULT_THREADS = 1;

    // Monotonically increasing record id; only touched on the single scheduler thread
    private static int incr = 0;
    public static void main(String[] args) {
        // Parse command-line arguments
        Map<String, String> params = parseArguments(args);
        if (params.containsKey("-h")) {
            printHelp();
            return;
        }

        // Read parameter values
        int total = getIntParam(params, "--total", DEFAULT_TOTAL);
        int batchSize = getIntParam(params, "--batch-size", DEFAULT_BATCH_SIZE);
        int ratePerSecond = getIntParam(params, "--rate", DEFAULT_RATE);
        int threads = getIntParam(params, "--threads", DEFAULT_THREADS);
        validateParameters(total, batchSize, ratePerSecond, threads);

        System.out.printf("Starting data generator [total=%d] [batchSize=%d] [rateLimit=%d records/s] [threads=%d]%n",
                total, batchSize, ratePerSecond, threads);

        // Number of batches actually needed, and the inter-batch delay that yields the target rate
        int totalBatches = (int) Math.ceil((double) total / batchSize);
        long delayMicroseconds = (long) (1_000_000.0 / (ratePerSecond / (double) batchSize));

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        MongoClient client = createMongoClient();
        MongoCollection<Document> collection = client.getDatabase("xlsms")
                .getCollection("test_sync_perf_test");
        AtomicLong counter = new AtomicLong(0);
        long startTime = System.currentTimeMillis();
        Random random = new Random();

        // Scheduled thread pool used for rate control
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Progress-monitoring task
        scheduler.scheduleAtFixedRate(() -> {
            long count = counter.get();
            if (count >= total) return;
            long elapsed = System.currentTimeMillis() - startTime;
            System.out.printf("Progress: %d/%d (%.1f%%), current rate: %.2f records/s%n",
                    count, total,
                    (count * 100.0 / total),
                    (count * 1000.0 / (elapsed == 0 ? 1 : elapsed)));
        }, 1, 1, TimeUnit.SECONDS);

        // Data-generation tasks, spaced delayMicroseconds apart to honor the rate limit
        for (int i = 0; i < totalBatches; i++) {
            final int currentBatch = i;
            scheduler.schedule(() -> {
                int actualBatchSize = Math.min(batchSize, total - (currentBatch * batchSize));
                if (actualBatchSize <= 0) return;
                List<Document> batch = generateBatch(actualBatchSize, random);
                executor.submit(() -> {
                    collection.insertMany(batch);
                    counter.addAndGet(actualBatchSize);
                });
            }, i * delayMicroseconds, TimeUnit.MICROSECONDS);
        }

        // Release resources once the last batch has been scheduled
        scheduler.schedule(() -> {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                    System.err.println("Tasks did not finish within the allotted time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            client.close();
            System.out.println("Data generation finished");
            scheduler.shutdown();
        }, totalBatches * delayMicroseconds, TimeUnit.MICROSECONDS);
    }
    private static Map<String, String> parseArguments(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                    params.put(args[i], args[i + 1]);
                    i++;
                } else {
                    params.put(args[i], "");
                }
            } else if (args[i].equals("-h")) {
                params.put("-h", "");
            }
        }
        return params;
    }

    private static int getIntParam(Map<String, String> params, String key, int defaultValue) {
        return params.containsKey(key) ? Integer.parseInt(params.get(key)) : defaultValue;
    }

    private static void validateParameters(int total, int batchSize, int rate, int threads) {
        if (total <= 0) throw new IllegalArgumentException("total must be greater than 0");
        if (batchSize <= 0) throw new IllegalArgumentException("batch size must be greater than 0");
        if (rate <= 0) throw new IllegalArgumentException("rate limit must be greater than 0");
        if (threads <= 0) throw new IllegalArgumentException("thread count must be greater than 0");
        if (batchSize > rate) System.err.println("Warning: batch size exceeds the rate limit; the actual rate may not reach the configured limit");
    }

    private static MongoClient createMongoClient() {
        return MongoClients.create("mongodb://xinling:2&[email protected]:27017," +
                "10.250.250.143:27017,10.250.250.218:27017/?replicaSet=rep1");
        // return MongoClients.create("mongodb://stuser:[email protected]:27017," +
        //         "192.168.10.103:27017,192.168.10.104:27017/?replicaSet=rs0");
    }
    private static List<Document> generateBatch(int batchSize, Random random) {
        // One timestamp per batch; every record in the batch shares it
        ZonedDateTime beijingTime = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"));
        List<Document> batch = new ArrayList<>(batchSize);
        for (int j = 0; j < batchSize; j++) {
            Document doc = new Document()
                    .append("_id", new ObjectId())
                    .append("id", incr)
                    .append("device_id", ThreadLocalRandom.current().nextInt(1, 1000))
                    .append("sensor_code", "SENSOR-" + String.format("%04d", j))
                    .append("temperature", BigDecimal.valueOf(20 + ThreadLocalRandom.current().nextDouble(15)))
                    .append("source_timestamp", Date.from(beijingTime.toInstant()))
                    .append("humidity", 30 + random.nextDouble() * 30)
                    .append("pressure", 1000 + random.nextDouble() * 50)
                    .append("status", random.nextInt(2))
                    .append("voltage", 3 + random.nextDouble() * 2)
                    .append("current", random.nextDouble())
                    .append("gps_lng", 116 + random.nextDouble() * 2)
                    .append("gps_lat", 39 + random.nextDouble() * 2)
                    .append("firmware_ver", "v" + (random.nextInt(5) + 1) + "." + random.nextInt(10) + "." + random.nextInt(10))
                    .append("error_code", random.nextInt(5))
                    .append("data_quality", random.nextInt(3))
                    .append("checksum", random.nextLong())
                    .append("rand_val", random.nextInt(100))
                    .append("reserved1", "backup-" + random.nextInt(100))
                    .append("reserved2", random.nextLong());
            // System.out.println(incr + ":" + doc);
            incr += 1;
            batch.add(doc);
        }
        return batch;
    }

    private static void printHelp() {
        System.out.println("MongoDB stress-test data generator");
        System.out.println("Options:");
        System.out.println("  --total <number>       total number of records (default: " + DEFAULT_TOTAL + ")");
        System.out.println("  --batch-size <number>  records per batch (default: " + DEFAULT_BATCH_SIZE + ")");
        System.out.println("  --rate <number>        maximum write rate in records/s (default: " + DEFAULT_RATE + ")");
        System.out.println("  --threads <number>     number of writer threads (default: " + DEFAULT_THREADS + ")");
        System.out.println("  -h                     show this help");
    }
}
However, after all the data had been synced to Doris, I noticed that for many records doris_current_time was earlier than source_timestamp, with the gap approaching Flink's checkpoint interval (5 s in my job; see the command below).
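The scale of the discrepancy can be measured directly in Doris. A sketch of a query that surfaces it, using Doris's MySQL-compatible TIMESTAMPDIFF at SECOND granularity:

```sql
-- Count affected rows and summarize how far doris_current_time falls
-- behind source_timestamp. TIMESTAMPDIFF is end minus start, so the
-- lag values are negative for the affected rows.
SELECT
    COUNT(*) AS affected_rows,
    MIN(TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time)) AS worst_lag_s,
    AVG(TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time)) AS avg_lag_s
FROM test_sync_perf_test
WHERE doris_current_time < source_timestamp;
```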
The clocks of my MongoDB, Flink, and Doris servers differ by less than one second, so this is not clock skew. Reviewing MongoDB's oplog suggests the problem does not originate in MongoDB, and the flink_write_time values look normal (ordinary pipeline latency), so I suspect the issue lies within Doris.
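For reference, the clock comparison on the Doris side can be done along these lines (a sketch; Doris exposes MySQL-style session variables, and `NOW(3)` is assumed available):

```sql
-- Check the FE's current time and configured time zone, then compare
-- them with the wall clocks on the MongoDB and Flink hosts.
SELECT NOW(3);
SHOW VARIABLES LIKE '%time_zone%';
```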
Could this be a bug, or some mechanism specific to Doris, for example the DEFAULT CURRENT_TIMESTAMP(3) value being evaluated once per load batch rather than once per row? Please advise.
What You Expected?
I would expect doris_current_time to be at least no earlier than source_timestamp. Failing that, I would like to understand why this happens, if only to rule Doris in or out as the cause.
How to Reproduce?
Here is the Flink sync command I used:
./bin/flink run \
    -Dexecution.checkpointing.interval=5s \
    -Dparallelism.default=2 \
    -Dpipeline.operator-chaining=false \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    ./lib/flink-doris-connector-1.17-25.1.0-SNAPSHOT.jar \
    mongodb-sync-database \
    --job-name mongo2Doris \
    --database xlsms \
    --table-prefix db_ \
    --schema-change-mode sql_parser \
    --use-new-schema-change true \
    --mongodb-conf hosts=192.168.10.102:27017,192.168.10.103:27017,192.168.10.104:27017 \
    --mongodb-conf username=stuser \
    --mongodb-conf password=stpw \
    --mongodb-conf database=xlsms \
    --mongodb-conf scan.startup.mode=initial \
    --mongodb-conf schema.sample-percent=0.2 \
    --including-tables ".*" \
    --sink-conf fenodes=192.168.10.102:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://192.168.10.102:9030 \
    --sink-conf sink.label-prefix=sms \
    --sink-conf sink.enable-2pc=false \
    --single-sink true \
    --table-conf replication_num=1
Anything Else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct