今夏最火综艺《青春有你2》里面刘雨昕从默默无闻一直冲到第二名,让众人感叹,只有在唱、跳、舞蹈、Rap等多方面能力俱佳,才是真偶像担当。在看了5月份初发布的CarbonData 2.0 RC2之后,笔者不禁感觉CarbonData在新版本中所体现的全方面能力,和刘雨昕一样具有”C位出道”的巨大潜力。CarbonData作为目前为数不多由中国公司贡献的Apache顶级项目,从16年正式”出道”以来,一直努力践行着实践和探索的开源精神。笔者在看到CarbonData的成长、ASF中华人力量的崛起之后,也相信国内的开源力量可以继续”不忘初心、砥砺前行”,借用青春有你2里面的一句歌词:“OpenSource Made in China,出发,多远都可以到达!” 随着数据量的增大,如果基于传统的数据存储解决方案,即使用不同的数据库产品来分别处理不同业务类型,势必会带来成本和运维的巨大压力。以互联网行业用户行为数据为例,随着5G+AI的到来,数据量正呈现快速增长趋势,目前对于一个日活1000万的APP应用来说,平均每天约产生500亿条用户行为数据,一年的数据存储量约10PB。PB级别数据可以满足不同类型业务的需求,如明细查询类业务:用户行为查询、订单查询、交易查询;聚合分析类业务:A/B Test、聚合分析;ELT类业务:热销榜、PV/UV等。下表中,我们分别给出了明细查询、聚合分析、ETL业务的典型查询、可选数据存储方案和方案痛点。 具体来说,(1) 成本高:以HBase为例,单台RegionServer可维护不超过10TB的数据,面对10PB的数据存储时,需要100台计算节点部署RegionServer,每台计算节点500元/月(4U16G),计算成本共计50万/月,每PB存储的云硬盘成本为70万/月,总成本=120万/月;(2) 计算慢:Spark on Parquet可以将数据存储在对象存储中,成本大大降低,每PB存储的对象存储成本为8万/月,上层的100台计算节点假设每天开机8小时,计算成本15万/月,总成本约23万/月,成本可以降低5倍。但是由于无索引,只能通过暴力扫描的方式进行查询和计算,在暴力计算时系统往往受限于对象存储带宽,假设对象存储带宽为20GB/s,查询需要14个小时。 由上可见,Nosql数据库虽然具有较好的数据索引机制,但是“太贵”,传统的Hadoop生态数据仓库将数据放在对象存储上,但是“太慢”,这两者各自的局限性,使得我们进行EB级别数据仓库选型时,面临着这一个鱼与熊掌不可兼得的选择题。 可以像NoSQL数据库一样,构建高效索引,又可以和Spark/Hive一样,享受高度可扩展性的数据并行处理,并能利用近乎无限的低成本的对象存储资源,满足这种“又方便又快又便宜”的任性就是CarbonData的使命。 接下来的内容,我们通过演示介绍CarbonData 2.0 RC2中的数据湖构建方法。。 过去的几个月,Apache CarbonData 2.0一直在努力实现一种低成本全场景数据湖,力求做到”一份数据到处使用”的愿景。下图给出了一个基于CarbonData数据湖系统架构图,数据首先由Kafka完成数据收集,并由Flink完成数据清洗、预处理 。其次,Flink将数据直接写入对象存储。最后,CarbonData对对象存储的数据构建索引,并供上层多种计算引擎计算和查询,用户可使用Spark或者Presto对CarbonData数据进行明细查询和聚合分析类业务,使用Spark或者Hive对CarbonData数据进行聚合分析和ETL类业务。 从数据存储的角度看,数据主要包含三要素:元数据、数据、索引,这三要素同样也是构建数据湖的关键。下面,我们主要演示基于CarbonData的数据湖如何解决这几个问题: 如何构建元数据? 如何写入数据? 如何建索引? 下面,我们首先演示如何一键式搭建Kafka+Flink+CarbonData的开发环境,其次介绍CarbonData数据湖中构建元数据、写入数据、建索引的方法。 接下来,我们演示如果使用Flink消费Kafka中的数据,并将数据写入CarbonData数据湖。 Spark中创建CarbonData表可以快速构建元数据,元数据存储路径为${table location}/Metadata/schema。 这里,TBLPROPERTIES(‘sort_scope’=‘GLOBAL_SORT’,‘sort_columns’=‘id, age’)代表着数据将按照(id, age)有序存储,当在处理id详单查询或者id、age联合查询时,可以通过二分查找的方式进行数据的快速定位和筛选,相比无排序时,数据查询的时间复杂度从O(N)下降为O(logN)。 下面给出了基于Flink消费Kafka的数据,并将CarbonData数据落盘的代码示例。下载代码 当Flink成功消费Kafka数据之后,在{$tablePath}/stage_data/,可以看到成功落盘的CarbonData数据。 在Flink目录中中放置了示例JAR文件,可以直接启动作业尝试Flink入库CarbonData表数据,作业启动示例如下所示: 就此,我们完成了Kafka到Flink到CarbonData的端到端数据湖的简单构建。 本文主要介绍了如何快速构建Kafka到Flink到CarbonData的端到端数据湖DataPipe。后续,我们将继续介绍:(1)如何在数据湖中使用Spark、Hive、Presto访问同一份数据;(2)如何将Mysql等关系型数据库数据同步到CarbonData数据湖中;(3)如何在TensorFlow等AI计算引擎中使用CarbonData。敬请关注。 欢迎大家添加微信ID:xu601450868,加入CarbonData技术交流群。
从CarbonData社区发布的版本信息看,CarbonData 2.0提供了一种新的融合数据存储方案,以一份数据同时支持多种应用场景:明细查询、交互分析、数据实时同步和更新、ETL、AI、时序聚合、空间检索等,实现EB级别数据规模,查询性能秒级响应,并通过计算存储分离优化,大大降低了数据湖成本。本文笔者将着重介绍如何快速构建一个明细查询、交互分析、ETL的数据湖,后续将会大家带来CarbonData在数据实时同步和更新、AI、时序时空聚合等场景中的应用,敬请关注。前言
类型
典型查询
可选数据库
痛点
明细查询
select * from fact where userid IS ‘1323’
HBase、ES等
成本高、运维难
聚合分析
select * from fact GROUPBY abtestid
ClickHouse、Oracle等
不支持横向扩展
ETL
select * from facttable JOIN dimensiontable
Spark on Parquet、Hive on ORC等
无索引,速度慢
一、CarbonData数据湖架构
二、一键式搭建Kafka+Flink+CarbonData演示环境
curl -k -O https://carbondata-publish.obs.myhuaweicloud.com/quick_start_kafka_flink_carbondata.sh
source quick_start_kafka_flink_carbondata.sh
创建Topic kafka_2.12-2.5.0/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test 写入数据 kafka_2.12-2.5.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test >c001,23,china,2016-02-23 09:01:30,china sz,computer design >c003,23,japan,2016-02-23 08:01:30,japan to,sport speech >c002,23,india,2016-02-23 07:01:30,india mm,draw write
三、如何构建元数据?
CREATE TABLE person(id STRING, age INT, country STRING, timestamp timestamp, address STRING, skill STRING) STORED AS carbondata TBLPROPERTIES('sort_scope'='GLOBAL_SORT','sort_columns'='id, age');
四、如何写入数据?
object FlinkToCarbonDemo { def main(args: Array[String]): Unit = { // 1. kafka地址和Topic名称 val parameter = ParameterTool.fromArgs(args) val kafkaTopic = parameter.get("TOPIC") val kafkaBootstrapServices = parameter.get("KAFKASERVICES") val kafkaProperties = new Properties(); kafkaProperties.put("bootstrap.servers", kafkaBootstrapServices) kafkaProperties.put("auto.commit.interval.ms", "3000") // 2. 设置表明database名称、表名、表存储位置、临时文件本地存储目录 val tableName = parameter.get("TABLENAME") val tablePath = parameter.get("TABLEPATH") val tempPath = parameter.get("TEMPPATH") // writerProperties代表写操作鉴权信息,当写数据到S3时,需要在writerProperties中配置桶名、AKSK val writerProperties = newWriterProperties(tempPath) // carbonProperties代表CarbonDataSDK属性信息,支持配置sdk内存、数据自定义格式等 val carbonProperties = newCarbonProperties() // Flink定时将数据上传到CarbonData表空间目录是,时间周期阈值为Flink Checkpoint时间 val environment = StreamExecutionEnvironment.getExecutionEnvironment environment.enableCheckpointing(180000) // 3. 配置source stream. 这里主要是基于自定义DeserializeSchema完成Kafka数据解析。 // Flink中集成的CarbonDataSDK默认输入数据为字符数组,因此需要将Record解析为Array[AnyRef]或者Array[String] val stream: DataStream[Array[AnyRef]] = environment.addSource(new FlinkKafkaConsumer011[Array[AnyRef]]( kafkaTopic, new DeserializeSchema(), kafkaProperties )) // 4. 配置stream sink,这里首先构建carbondatasdk,可选类型为Local和S3. val factory = CarbonWriterFactory.builder("Local").build( "default", tableName, tablePath, new Properties, writerProperties, carbonProperties) val streamSink = StreamingFileSink.forBulkFormat( new Path(ProxyFileSystem.DEFAULT_URI), factory ).build() stream.addSink(streamSink) // Execute the environment environment.execute() streamSink.close() } private def newWriterProperties(dataTempPath: String) = { val properties = new Properties properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath) properties } private def newCarbonProperties() = { val properties = new Properties properties } } // 基于自定义DeserializeSchema完成Kafka数据解析示例。例如Kafka中Record为"c001,23,china,2016-02-23 09:01:30,china sz,computer design"时,解析形式为['c001','23','china']的字符数组 class DeserializeSchema extends KafkaDeserializationSchema[Array[AnyRef]] { override def isEndOfStream(t: Array[AnyRef]): Boolean = { false } override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): Array[AnyRef] = { new String(consumerRecord.value()).split(",").map(_.trim).asInstanceOf[Array[AnyRef]] } override def getProducedType: TypeInformation[Array[AnyRef]] = TypeInformation.of(new TypeHint[Array[AnyRef]] {}) }
bin/flink run -d -p 1 -c org.apache.carbon.flink.FlinkToCarbonDemo examples/flinktocarbondemo-1.0.jar --TOPIC test --KAFKASERVICES localhost:9092 --TABLENAME person --TABLEPATH "/user/hive/warehouse/person" --TEMPPATH "/tmp"
五、如何构建索引?
INSERT INTO person STAGE;
SELECT * FROM person;
结语
本网页所有视频内容由 imoviebox边看边下-网页视频下载, iurlBox网页地址收藏管理器 下载并得到。
ImovieBox网页视频下载器 下载地址: ImovieBox网页视频下载器-最新版本下载
本文章由: imapbox邮箱云存储,邮箱网盘,ImageBox 图片批量下载器,网页图片批量下载专家,网页图片批量下载器,获取到文章图片,imoviebox网页视频批量下载器,下载视频内容,为您提供.
阅读和此文章类似的: 全球云计算