java

Flink(四十)FlinkSql 简单Demo以及具体使用

ゞ 浴缸里的玫瑰 2022-09-02 00:50 339阅读 0赞

之前FlinkSql用的比较少,今天开始简单介绍一下。

首先导入依赖:

```xml
<!-- 引入flink table相关依赖-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.10.1</version>
</dependency>
```

然后我们的数据源是读取文件:

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70][]

POJO类:

```java
package beans;

//传感器温度读数的数据类型
public class SensorReading {
    //属性 id,时间戳,温度值
    private String id;
    private Long timestamp;
    private Double temperature;

    public SensorReading() {
    }

    public SensorReading(String id, Long timestamp, Double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    public Double getTemperature() {
        return temperature;
    }

    public void setTemperature(Double temperature) {
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "SensorReading{" +
                "id='" + id + '\'' +
                ", timestamp=" + timestamp +
                ", temperature=" + temperature +
                '}';
    }
}
```

具体使用DEMO:

```java
package table;

import beans.SensorReading;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Table1 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> inputStream = env.readTextFile("D:\\idle\\FlinkTest\\src\\main\\resources\\sensor.txt");

        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });

        //创建表的执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //创建表
        Table dataTable = tableEnv.fromDataStream(dataStream);

        //调用table API进行转换操作
        Table resultTable = dataTable.select("id, temperature")
                .where("id = 'sensor_1'");

        //执行sql
        //dataTable注册进环境
        tableEnv.createTemporaryView("sensor",dataTable);
        String sql = "select id, temperature from sensor where id='sensor_1'";
        Table resultSqlTable = tableEnv.sqlQuery(sql);

        //打印结果
        tableEnv.toAppendStream(resultTable, Row.class).print("result");
        tableEnv.toAppendStream(resultSqlTable, Row.class).print("sql");

        env.execute();
    }
}
```

查看结果

![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 1][]

[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70]: /images/20220829/9f055b8b200548fc9c2da285bbb9de0b.png
[watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70 1]: /images/20220829/30606cf5c0a94accb942622a9d951ae2.png
还没有评论,来说两句吧...