在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

如何快速掌握Fink SQL

111次阅读
没有评论

共计 4216 个字符,预计需要花费 11 分钟才能阅读完成。

这篇文章主要讲解了“如何快速掌握 Fink SQL”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着丸趣 TV 小编的思路慢慢深入,一起来研究和学习“如何快速掌握 Fink SQL”吧!

1、导入所需要的的依赖包

dependency   groupId org.apache.flink /groupId   artifactId flink-table-planner_2.12 /artifactId   version 1.10.1 /version   /dependency   dependency   groupId org.apache.flink /groupId   artifactId flink-table-api-scala-bridge_2.12 /artifactId   version 1.10.1 /version   /dependency   dependency   groupId org.apache.flink /groupId   artifactId flink-csv /artifactId   version 1.10.1 /version   /dependency

flink-table-planner:planner 计划器,是 table API 最主要的部分,提供了运行时环境和生成程序执行计划的  planner; flink-table-api-scala-bridge:bridge 桥接器,主要负责 table API 和  DataStream/DataSet API 的连接支持,按照语言分 java 和 scala。

这里的两个依赖,是 IDE 环境下运行需要添加的; 如果是生产环境,lib 目录下默认已经有了 planner,就只需要有 bridge 就可以了。

当然,如果想使用用户自定义函数,或是跟 kafka 做连接,需要有一个 SQL client,这个包含在 flink-table-common   里。

2、两种 planner(old blink)的区别

鸿蒙官方战略合作共建——HarmonyOS 技术社区

批流统一:Blink 将批处理作业,视为流式处理的特殊情况。所以,blink 不支持表和 DataSet 之间的转换,批处理作业将不转换为 DataSet   应用程序,而是跟流处理一样,转换为 DataStream 程序来处理。

因 为 批 流 统 一,Blink planner 也 不 支 持 BatchTableSource,而 使 用 有 界 的

Blink planner 只支持全新的目录,不支持已弃用的 ExternalCatalog。

旧 planner 和 Blink planner 的 FilterableTableSource 实现不兼容。旧的 planner   会把 PlannerExpressions 下推到 filterableTableSource 中,而 blink planner 则会把 Expressions   下推。

基于字符串的键值配置选项仅适用于 Blink planner。

PlannerConfig 在两个 planner 中的实现不同。

Blink planner 会将多个 sink 优化在一个 DAG 中(仅在 TableEnvironment 上受支持,而在  StreamTableEnvironment 上不受支持)。而旧 planner 的优化总是将每一个 sink 放在一个新的 DAG 中,其中所有 DAG   彼此独立。

旧的 planner 不支持目录统计,而 Blink planner 支持。

3、表 (Table) 的概念

TableEnvironment 可以注册目录 Catalog,并可以基于 Catalog 注册表。它会维护一个 Catalog-Table 表之间的  map。表 (Table) 是由一个标识符来指定的,由 3 部分组成:Catalog   名、数据库 (database) 名和对象名(表名)。如果没有指定目录或数据库,就使用当前的默认值。

4、连接到文件系统(Csv 格式)

连接外部系统在 Catalog 中注册表,直接调用 tableEnv.connect()就可以,里面参数要传入一个  ConnectorDescriptor,也就是 connector 描述器。对于文件系统的 connector 而言,flink 内部已经提供了,就叫做  FileSystem()。

5、测试案例 (新)

需求:将一个 txt 文本文件作为输入流读取数据过滤 id 不等于 sensor_1 的数据实现思路:  首先我们先构建一个 table 的 env 环境通过 connect 提供的方法来读取数据然后设置表结构将数据注册为一张表就可进行我们的数据过滤了(使用 sql 或者流处理方式进行解析)

准备数据

sensor_1,1547718199,35.8 sensor_6,1547718201,15.4 sensor_7,1547718202,6.7 sensor_10,1547718205,38.1 sensor_1,1547718206,32 sensor_1,1547718208,36.2 sensor_1,1547718210,29.7 sensor_1,1547718213,30.9

代码实现

import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.{DataTypes} import org.apache.flink.table.api.scala._ import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema} /** * @Package * @author  大数据老哥  * @date 2020/12/12 21:22 * @version V1.0 *  第一个 Flinksql 测试案例  */ object FlinkSqlTable { def main(args: Array[String]): Unit = { //  构建运行流处理的运行环境  val env = StreamExecutionEnvironment.getExecutionEnvironment //  构建 table 环境  val tableEnv = StreamTableEnvironment.create(env) // 通过  connect  读取数据  tableEnv.connect(new FileSystem().path(D:\\d12\\Flink\\FlinkSql\\src\\main\\resources\\sensor.txt)) .withFormat(new Csv()) // 设置类型  .withSchema(new Schema() //  给数据添加元数信息  .field(id , DataTypes.STRING()) .field(time , DataTypes.BIGINT()) .field(temperature , DataTypes.DOUBLE()) ).createTemporaryTable(inputTable) //  创建一个临时表  val resTable = tableEnv.from(inputTable) .select(*).filter(id ===  sensor_1) //  使用 sql 的方式查询数据  var resSql = tableEnv.sqlQuery(select * from inputTable where id= sensor_1) //  将数据转为流进行输出  resTable.toAppendStream[(String, Long, Double)].print(resTable) resSql.toAppendStream[(String, Long, Double)].print(resSql) env.execute(FlinkSqlWrodCount) } }

6、TableEnvironment 的作用

注册 catalog

在内部 catalog 中注册表

执行 SQL 查询

注册用户自定义函数

注册用户自定义函数

保存对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用

在创建 TableEnv 的时候,可以多传入一个 EnvironmentSettings 或者 TableConfig 参数,可以用来配置  TableEnvironment 的一些特性。

7、老版本创建流处理批处理

7.1 老版本流处理

val settings = EnvironmentSettings.newInstance() .useOldPlanner() //  使用老版本  planner .inStreamingMode() //  流处理模式  .build() val tableEnv = StreamTableEnvironment.create(env, settings)

7.2 老版本批处理

val batchEnv = ExecutionEnvironment.getExecutionEnvironment val batchTableEnv = BatchTableEnvironment.create(batchEnv)

7.3 blink 版本的流处理环境

val bsSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

7.4 blink 版本的批处理环境

val bbSettings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inBatchMode().build() val bbTableEnv = TableEnvironment.create(bbSettings)

感谢各位的阅读,以上就是“如何快速掌握 Fink SQL”的内容了,经过本文的学习后,相信大家对如何快速掌握 Fink SQL 这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是丸趣 TV,丸趣 TV 小编将为大家推送更多相关知识点的文章,欢迎关注!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-07-27发表,共计4216字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 最新精品国偷自产在线美女足 | 亚洲精品第一第二区 | 男女后式激烈动态图片 | 日韩在线欧美在线 | 亚洲熟女综合一区二区三区 | 国产亚洲欧美日韩在线观看不卡 | 性情中人中文网 | 欧美日韩免费一区二区三区 | 宝贝腿开大点我添添公视频免费 | 国产美女视频 | 91香蕉视频官网 | 激情做人爱免费视频 | 国产精品亚洲精品一区二区三区 | 成人午夜在线 | 亚洲va在线va天堂va偷拍 | 午夜爱爱影院 | 在线看国产视频 | 欧美成本人视频免费播放 | 国产成人va亚洲电影 | 亚洲国产一区二区精品无码 | 少数民族美乳国产在线 | 国产亚洲福利一区二区免费看 | 国产图片一区 | 国产中文字幕在线 | 久久久久亚洲精品中文字幕 | 四虎永久在线精品视频播放 | 千人斩欧美图区 | 亚洲成av人片一区二区密柚 | www一级毛片| 久久国产色av | 男人阁久久 | 国产午夜精品一区二区三区软件 | 男女肉粗暴进来动态图 | 18禁无遮挡羞羞污污污污网站 | 亚洲天堂久久精品 | 亚洲成人免费在线视频 | 97精品国产一区二区三区 | 成人无码www免费视频 | 黄色的视频在线免费观看 | 内射无码专区久久亚洲 | 亚洲色偷拍另类无码专区 |