大佬教程收集整理的这篇文章主要介绍了Flink(5)之Table API,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
Table API是流处理和批处理通用的关系型API,Table API查询是以Java或Scala中的语言嵌入样式定义。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-plAnner_2.11</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.10.0</version>
</dependency>
import com.flink.SensorReading
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
/**
* @ObjectName tableAPI
* @Description TODO
* @Author life-oss
* @Date 2021/6/22 8:31
* @Version 1.0
* */
object tableAPI {
def main(args: ArraY[String]): Unit = {
//创建执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val inputStream = env.readTextFile("src/main/resources/temp.txt")
val DataStream = inputStream.map(
data => {
val arr = data.split(",")
SensorReading(arr(0), arr(1).toLong, arr(2).toDoublE)
}
)
//创建一个table执行环境
val TableEnv = StreamTableEnvironment.create(env)
//基于表的执行环境,将数据流转换为表的格式
val table = TableEnv.fromDataStream(DataStream)
val tempTable = table.SELEct("id,ts,temp").filter("id == '"sensor_1"'")
/**
* 2.查询
*/
// //注册一张表
// TableEnv.registerTable("tablename", tablE)
// //编写SQL语句
// val tempTable = TableEnv.sqlQuery(
// """
// |
// |SELEct id,ts,temp from tablename where id ='"sensor_1"'
// |""".StripMargin
// )
//将表格式转换为数据流格式
val Ds = tempTable.toAppendStream[(String, Long, DoublE)]
Ds.print()
//执行
env.execute("table api")
}
}
以上是大佬教程为你收集整理的Flink(5)之Table API全部内容,希望文章能够帮你解决Flink(5)之Table API所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。