一小时完成基于阿里云流计算的实时计算系统搭建

本文涉及的产品
RDS MySQL DuckDB 分析主实例,集群系列 4核8GB
简介: 目前,实时计算越来越被广泛应用,比如 实时ETL、实时报表、实时大屏展示等一些监控预警和在线系统的场景。企业对计算速度和消息更新速度要求越来越高。开源框架中,Storm,Sparks,Flink等在企业生产中大量投入使用,但是开发相对复杂,需要对接各种框架api、sdk等,另外人力成本相对较高。

目前,实时计算越来越被广泛应用,比如 实时ETL、实时报表、实时大屏展示等一些监控预警和在线系统的场景。企业对计算速度和消息更新速度要求越来越高。开源框架中,Storm,Sparks,Flink等在企业生产中大量投入使用,但是开发相对复杂,需要对接各种框架api、sdk等,另外人力成本相对较高。那么有没有一种高效的实时计算平台,只要会写SQL并且可视化的操作就可以快速完成实时计算的业务开发呢

阿里云目前推出产品 阿里云流计算(公测中,预计18年3月份商业化)(StreamCompute)传送门:https://help.aliyun.com/document_detail/62437.html?spm=a2c4g.11186623.6.544.Ed7XzG

阿里云流计算全链路示意图
image.png
可以简单快速的实现仅用SQL就完成流计算的业务链路,下面我们就使用流计算给大家示范。

架构:

以阿里云流计算为核心,从数据流向上我们可以分为数据从哪里,到哪里去。
本次实验架构为:
Logstash+DataHub+阿里云流计算+RDS-mysql
其中,
Logstash :开源框架,用于采集数据
DataHub:阿里云自主研发大型缓存队列(可以理解为类似 开源Kafka )
https://help.aliyun.com/document_detail/47439.html?spm=a2c4g.11186623.6.539.hri7Gy
RDS-mysql:阿里云关系型数据库 mysql版

搭建流程:

第一步:创建DataHub 项目和Topic

具体参考
https://help.aliyun.com/document_detail/47448.html?spm=a2c4g.11186623.6.546.eJLHOm
如图是本次实验创建的:
image.png

第二步:搭建Logstash 参考

https://help.aliyun.com/document_detail/47451.html?spm=5176.product27797.6.588.iFTE4i
配置文件如下

input {
    file {
        path => "/Users/yang/test/stream.csv"
        start_position => "beginning"
    }
}
filter{
    csv {
        columns => ['name', 'age']
    }
}
output {
    datahub {
        access_id => "LTAIu****Ouj87b"
        access_key => "MfY8ONjK6******7OEdyXw4T"
        endpoint => "https://dh-cn-hangzhou.aliyuncs.com/"
        project_name => "M_shangdantest"
        topic_name => "to_stream_topic"
        #shard_id => "0"
        #shard_keys => ["thread_id"]
        dirty_data_continue => true
        dirty_data_file => "/Users/yang/test/dirty.data"
        dirty_data_file_max_size => 1000
    }
}
第三步:创建流计算任务

1,可视化注册刚才创建的DataHub数据源,接入数据。如图
image.png
2,可视化注册数据输出的数据库RDS-mysql
image.png
3,编写从数据来源(DataHub)取数据经过计算写入数据目的源(RDS-mysql)的业务SQL。
如图
image.png
代码附上,具体业务逻辑可以自己通过SQL实现:

CREATE TABLE to_stream_topic (
    `name`             VARCHAR,
    age                BIGINT
) WITH (
    type = 'datahub',
    endPoint = 'http://dh-cn-hangzhou.aliyun-inc.com/',
    roleArn='acs:ram::xxxxx:role/aliyunstreamdefaultrole',
    project = 'shangdantest',
    topic = 'to_stream_topic'
);
INSERT INTO resoult_stream
SELECT name,age from to_stream_topic;
CREATE TABLE resoult_stream (
    `name`             VARCHAR,
    age                BIGINT
) WITH (
    type= 'rds',
    url = 'jdbc:mysql://rm-xxxxxxx.mysql.rds.aliyuncs.com:3306/lptest',
    userName = 'xxxxx',
    password = 'xxxx',
    tableName = 'resoult_stream'
);

更多操作其他数据源参考:
https://help.aliyun.com/document_detail/62509.html?spm=a2c4g.11186623.6.633.DPannE

以上操作完成后可以
1,启动logstash
2,测试流计算,调试非常方便,对线上业务没有影响,自动从DataHub中抽取数据进行测试
image.png
3,启动流计算作业,如图
image.png
4,观察数据是否成功写入RDS-Mysql

到此 流计算一个实时链路搭建完毕,有没有发现很酷炫,只需要写写SQL,加一些数据源的配置即可。整个过程一小时就可以完成,相对开源省去了繁琐的各种环境搭建,代码编写,监控等等。

  
  
  有对大数据技术感兴趣的,可以加笔者的微信 wx4085116.目前笔者已经从阿里离职,博客不代表阿里立场。笔者开了一个大数据培训班。有兴趣的加我。
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
2月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
485 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
2月前
|
存储 运维 分布式计算
零售数据湖的进化之路:滔搏从Lambda架构到阿里云Flink+Paimon统一架构的实战实践
在数字化浪潮席卷全球的今天,传统零售企业面临着前所未有的技术挑战和转型压力。本文整理自 Flink Forward Asia 2025 城市巡回上海站,滔搏技术负责人分享了滔搏从传统 Lambda 架构向阿里云实时计算 Flink 版+Paimon 统一架构转型的完整实战历程。这不仅是一次技术架构的重大升级,更是中国零售企业拥抱实时数据湖仓一体化的典型案例。
230 0
|
6月前
|
Java 调度 流计算
基于Java 17 + Spring Boot 3.2 + Flink 1.18的智慧实验室管理系统核心代码
这是一套基于Java 17、Spring Boot 3.2和Flink 1.18开发的智慧实验室管理系统核心代码。系统涵盖多协议设备接入(支持OPC UA、MQTT等12种工业协议)、实时异常检测(Flink流处理引擎实现设备状态监控)、强化学习调度(Q-Learning算法优化资源分配)、三维可视化(JavaFX与WebGL渲染实验室空间)、微服务架构(Spring Cloud构建分布式体系)及数据湖建设(Spark构建实验室数据仓库)。实际应用中,该系统显著提升了设备调度效率(响应时间从46分钟降至9秒)、设备利用率(从41%提升至89%),并大幅减少实验准备时间和维护成本。
375 0
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
1536 7
阿里云实时计算Flink在多行业的应用和实践
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
1118 2
|
9月前
|
消息中间件 关系型数据库 Kafka
阿里云基于 Flink CDC 的现代数据栈云上实践
阿里云基于 Flink CDC 的现代数据栈云上实践
186 1
|
12月前
|
存储 物联网 大数据
探索阿里云 Flink 物化表:原理、优势与应用场景全解析
阿里云Flink的物化表是流批一体化平台中的关键特性,支持低延迟实时更新、灵活查询性能、无缝流批处理和高容错性。它广泛应用于电商、物联网和金融等领域,助力企业高效处理实时数据,提升业务决策能力。实践案例表明,物化表显著提高了交易欺诈损失率的控制和信贷审批效率,推动企业在数字化转型中取得竞争优势。
478 16
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
18788 54
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
370 1
|
消息中间件 存储 算法
联通实时计算平台问题之亿级标签关联实现且不依赖外部系统要如何操作
联通实时计算平台问题之亿级标签关联实现且不依赖外部系统要如何操作
130 1

热门文章

最新文章