[Bug] Exception in Obtaining Current Time in Doris During Flink Synchronization #49968

Open

kwonder0926 opened this issue Apr 10, 2025 · 0 comments

Search before asking

  • I have searched the issues and found no similar issues.

Version

doris-2.1.8
flink-1.17.2
mongoDB-4.4.18

What's Wrong?

I am using Flink to sync data from MongoDB to Doris in real time. For a sync-latency test, I created a new Doris table with the following DDL:

CREATE TABLE test_sync_perf_test (
    id VARCHAR(36) NOT NULL,
    source_timestamp DATETIME(3),
    flink_write_time DATETIME(3),
    doris_current_time DATETIME(3) DEFAULT CURRENT_TIMESTAMP(3),
    device_id INT,
    sensor_code VARCHAR(24),
    temperature DECIMAL(5,2),
    humidity DECIMAL(5,2),
    pressure DECIMAL(7,2),
    status TINYINT,
    voltage FLOAT,
    `current` FLOAT,
    gps_lng DECIMAL(10,6),
    gps_lat DECIMAL(10,6),
    firmware_ver VARCHAR(16),
    error_code SMALLINT,
    data_quality TINYINT,
    checksum BIGINT,
    rand_val INT,
    reserved1 VARCHAR(32),
    reserved2 BIGINT
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES (
    "replication_num" = "1",
    "storage_format" = "V2",
    "enable_unique_key_merge_on_write" = "true"
);

Of these, three fields are the ones that matter for the latency test; the rest are just filler:

  • source_timestamp: the time the record was written to MongoDB
  • flink_write_time: the time the record was processed by flink-doris-connector
  • doris_current_time: the Doris ingestion time, which is supposed to be filled in automatically when the row lands in Doris (a quick manual check is sketched below)
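
Since doris_current_time relies on the column default, its behavior can be sanity-checked outside the Flink pipeline first. Below is a minimal sketch (assuming a direct INSERT over the MySQL protocol fills omitted columns with their declared defaults; the id value is made up):

-- Insert one row by hand, with no Flink involved.
INSERT INTO test_sync_perf_test (id, source_timestamp, flink_write_time)
VALUES ('manual-check-1', NOW(), NOW());

-- If the default behaves as expected, diff_s should be about 0 for this row.
SELECT id,
       source_timestamp,
       doris_current_time,
       TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time) AS diff_s
FROM test_sync_perf_test
WHERE id = 'manual-check-1';
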
I used the following Java code to generate test data and insert it into MongoDB automatically:
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import org.bson.Document;
import org.bson.types.ObjectId;

import java.math.BigDecimal;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class MongoDataGenerator {

    // Default parameter values
    private static final int DEFAULT_TOTAL = 1_000_000;
    private static final int DEFAULT_BATCH_SIZE = 500;
    private static final int DEFAULT_RATE = 5000; // records per second
    private static final int DEFAULT_THREADS = 1;

    private static int incr = 0;

    public static void main(String[] args) {
        // Parse command-line arguments
        Map<String, String> params = parseArguments(args);
        if (params.containsKey("-h")) {
            printHelp();
            return;
        }

        // Read parameter values
        int total = getIntParam(params, "--total", DEFAULT_TOTAL);
        int batchSize = getIntParam(params, "--batch-size", DEFAULT_BATCH_SIZE);
        int ratePerSecond = getIntParam(params, "--rate", DEFAULT_RATE);
        int threads = getIntParam(params, "--threads", DEFAULT_THREADS);

        validateParameters(total, batchSize, ratePerSecond, threads);

        System.out.printf("Starting data generator [total=%d] [batch size=%d] [rate limit=%d records/s] [threads=%d]%n",
                total, batchSize, ratePerSecond, threads);

        // Compute the number of batches actually needed
        int totalBatches = (int) Math.ceil((double) total / batchSize);
        long delayMicroseconds = (long) (1_000_000.0 / (ratePerSecond / (double) batchSize));

        ExecutorService executor = Executors.newFixedThreadPool(threads);
        MongoClient client = createMongoClient();
        MongoCollection<Document> collection = client.getDatabase("xlsms")
                .getCollection("test_sync_perf_test");

        AtomicLong counter = new AtomicLong(0);
        long startTime = System.currentTimeMillis();
        Random random = new Random();

        // Scheduled thread pool used for rate control
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        // Progress-monitoring task
        scheduler.scheduleAtFixedRate(() -> {
            long count = counter.get();
            if (count >= total) return;

            long elapsed = System.currentTimeMillis() - startTime;
            System.out.printf("Progress: %d/%d (%.1f%%), current rate: %.2f records/s%n",
                    count, total,
                    (count * 100.0 / total),
                    (count * 1000.0 / (elapsed == 0 ? 1 : elapsed)));
        }, 1, 1, TimeUnit.SECONDS);

        // Data-generation tasks
        for (int i = 0; i < totalBatches; i++) {
            final int currentBatch = i;
            scheduler.schedule(() -> {
                int actualBatchSize = Math.min(batchSize, total - (currentBatch * batchSize));
                if (actualBatchSize <= 0) return;

                List<Document> batch = generateBatch(actualBatchSize, random);
                executor.submit(() -> {
                    collection.insertMany(batch);
                    counter.addAndGet(actualBatchSize);
                });
            }, i * delayMicroseconds, TimeUnit.MICROSECONDS);
        }

        // Release resources once all batches have been scheduled
        scheduler.schedule(() -> {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(1, TimeUnit.HOURS)) {
                    System.err.println("Tasks did not finish within the allotted time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            client.close();
            System.out.println("Data generation finished");
            scheduler.shutdown();
        }, totalBatches * delayMicroseconds, TimeUnit.MICROSECONDS);
    }

    private static Map<String, String> parseArguments(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                if (i + 1 < args.length && !args[i + 1].startsWith("-")) {
                    params.put(args[i], args[i + 1]);
                    i++;
                } else {
                    params.put(args[i], "");
                }
            } else if (args[i].equals("-h")) {
                params.put("-h", "");
            }
        }
        return params;
    }

    private static int getIntParam(Map<String, String> params, String key, int defaultValue) {
        return params.containsKey(key) ? Integer.parseInt(params.get(key)) : defaultValue;
    }

    private static void validateParameters(int total, int batchSize, int rate, int threads) {
        if (total <= 0) throw new IllegalArgumentException("Total record count must be greater than 0");
        if (batchSize <= 0) throw new IllegalArgumentException("Batch size must be greater than 0");
        if (rate <= 0) throw new IllegalArgumentException("Rate limit must be greater than 0");
        if (threads <= 0) throw new IllegalArgumentException("Thread count must be greater than 0");
        if (batchSize > rate) System.err.println("Warning: batch size exceeds the rate limit, so the actual rate may never reach the limit");
    }

    private static MongoClient createMongoClient() {
        return MongoClients.create("mongodb://xinling:2&[email protected]:27017," +
                "10.250.250.143:27017,10.250.250.218:27017/?replicaSet=rep1");
//        return MongoClients.create("mongodb://stuser:[email protected]:27017," +
//                "192.168.10.103:27017,192.168.10.104:27017/?replicaSet=rs0");

    }

    private static List<Document> generateBatch(int batchSize, Random random) {
        ZonedDateTime beijingTime = ZonedDateTime.now(ZoneId.of("Asia/Shanghai"));
        List<Document> batch = new ArrayList<>(batchSize);
        for (int j = 0; j < batchSize; j++) {
            Document doc = new Document()
                    .append("_id", new ObjectId())
                    .append("id", incr)
                    .append("device_id", ThreadLocalRandom.current().nextInt(1, 1000))
                    .append("sensor_code", "SENSOR-" + String.format("%04d", j))
                    .append("temperature", BigDecimal.valueOf(20 + ThreadLocalRandom.current().nextDouble(15)))
                    .append("source_timestamp", Date.from(beijingTime.toInstant()))
                    .append("humidity", 30 + random.nextDouble() * 30)
                    .append("pressure", 1000 + random.nextDouble() * 50)
                    .append("status", random.nextInt(2))
                    .append("voltage", 3 + random.nextDouble() * 2)
                    .append("current", random.nextDouble())
                    .append("gps_lng", 116 + random.nextDouble() * 2)
                    .append("gps_lat", 39 + random.nextDouble() * 2)
                    .append("firmware_ver", "v" + (random.nextInt(5) + 1) + "." + (random.nextInt(10)) + "." + (random.nextInt(10)))
                    .append("error_code", random.nextInt(5))
                    .append("data_quality", random.nextInt(3))
                    .append("checksum", random.nextLong())
                    .append("rand_val", random.nextInt(100))
                    .append("reserved1", "backup-" + random.nextInt(100))
                    .append("reserved2", random.nextLong());
//            System.out.println(incr + ":" + doc);
            incr += 1;
            batch.add(doc);
        }
        return batch;
    }

    private static void printHelp() {
        System.out.println("MongoDB stress-test data generator");
        System.out.println("Options:");
        System.out.println("  --total <number>      total number of records (default: " + DEFAULT_TOTAL + ")");
        System.out.println("  --batch-size <number> records written per batch (default: " + DEFAULT_BATCH_SIZE + ")");
        System.out.println("  --rate <number>       maximum write rate in records/s (default: " + DEFAULT_RATE + ")");
        System.out.println("  --threads <number>    number of writer threads (default: " + DEFAULT_THREADS + ")");
        System.out.println("  -h                    show this help message");
    }
}

However, after all the data had been synced to Doris, I noticed that for many records doris_current_time was earlier than source_timestamp, and the gap (in seconds) was close to Flink's checkpoint interval.
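
A query along these lines makes the discrepancy easy to quantify (a sketch against the table above, using Doris's MySQL-compatible TIMESTAMPDIFF):

-- How many rows got a default earlier than the source time, and how big is the gap?
SELECT
    COUNT(*) AS total_rows,
    SUM(CASE WHEN doris_current_time < source_timestamp THEN 1 ELSE 0 END) AS rows_defaulted_earlier,
    MIN(TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time)) AS min_diff_s,
    MAX(TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time)) AS max_diff_s
FROM test_sync_perf_test;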

I can confirm that the clock skew among my MongoDB, Flink, and Doris servers is under 1 second. Reviewing MongoDB's oplog shows the problem does not come from MongoDB, and the flink_write_time values look normal, reflecting the expected latency. I therefore suspect the issue lies in Doris.

Is this a bug or some specific mechanism in Doris? Please advise.

Image

What You Expected?

I expect this timestamp to be more reasonable, i.e. at least later than source_timestamp. Otherwise, I would like to understand why this happens, so that a problem in Doris can at least be ruled out.

How to Reproduce?

Here is the Flink sync command I used:

./bin/flink run \
    -Dexecution.checkpointing.interval=5s \
    -Dparallelism.default=2 \
    -Dpipeline.operator-chaining=false \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    ./lib/flink-doris-connector-1.17-25.1.0-SNAPSHOT.jar \
    mongodb-sync-database \
    --job-name mongo2Doris \
    --database xlsms \
    --table-prefix db_ \
    --schema-change-mode sql_parser \
    --use-new-schema-change true \
    --mongodb-conf hosts=192.168.10.102:27017,192.168.10.103:27017,192.168.10.104:27017 \
    --mongodb-conf username=stuser \
    --mongodb-conf password=stpw \
    --mongodb-conf database=xlsms \
    --mongodb-conf scan.startup.mode=initial \
    --mongodb-conf schema.sample-percent=0.2 \
    --including-tables ".*" \
    --sink-conf fenodes=192.168.10.102:8030 \
    --sink-conf username=root \
    --sink-conf password=123456 \
    --sink-conf jdbc-url=jdbc:mysql://192.168.10.102:9030 \
    --sink-conf sink.label-prefix=sms \
    --sink-conf sink.enable-2pc=false \
    --single-sink true \
    --table-conf replication_num=1
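
After the sync finishes, the gap can also be bucketed to see how its distribution lines up with the 5-second checkpoint interval configured above (again only a sketch against the same test table):

-- Distribution of the (doris_current_time - source_timestamp) gap in whole seconds.
SELECT
    TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time) AS diff_s,
    COUNT(*) AS row_cnt
FROM test_sync_perf_test
GROUP BY TIMESTAMPDIFF(SECOND, source_timestamp, doris_current_time)
ORDER BY diff_s;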

Anything Else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
