java Flink(三十二)Flink的异步IO访问外部数据 谁践踏了优雅 2022-08-30 05:25 147阅读 0赞 ### ** Flink 异步 I/O 的官方介绍:** ### 当我们使用 Flink 在与外部系统进行交互时(例如:使用存储在数据库中的数据来丰富流事件),这时便需要注意 `Flink系统`与`外部系统`的通信延迟了。 我们使用 MapFunction() 的方式与外部数据库交互时,使用的是同步交互的方式。`即:将请求发送到数据库,并MapFunction等待直到收到响应。在许多情况下,这种等待占据了功能的绝大部分时间。` 与数据库的异步交互,意味着单个并行函数实例可以同时处理许多请求并同时接收响应。这样,等待时间便可以与发送其他请求和接收响应重叠。至少,等待时间将被分摊到多个请求上。在大多数情况下,会使系统`有更高的吞吐量。` ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70][] ### ** 我们这里用异步的http请求(异步的前提是外部数据支持异步操作)来记录这个内容** ### ![20210719153101928.png][] 异步请求主程序(其中 DataLocation 为位置信息 POJO 类): import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.util.EntityUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Collections; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class HttpAsyncMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); //创建pro配置 Properties pro = new Properties(); //添加集群配置信息 pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx:9092,xx:9092,xx:9092"); pro.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("locations", new SimpleStringSchema(), pro); DataStreamSource<String> kafkaStream = env.addSource(kafkaConsumer); //开启异步处理数据 同时进行中的异步请求超过10个就会反压;超时时间必须为正值(Flink不支持用0表示不超时),这里设为5秒 DataStream<DataLocation> resultStream = AsyncDataStream.unorderedWait(kafkaStream, new AsyncDatabaseRequest(), 5000, TimeUnit.MILLISECONDS, 10); resultStream.print("result:"); env.execute(); } //异步请求第三方API public static class AsyncDatabaseRequest extends RichAsyncFunction<String, DataLocation> { private transient CloseableHttpAsyncClient httpAsyncClient = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //初始化异步HttpClient RequestConfig requestConfig = RequestConfig.custom() .setSocketTimeout(3000) //设置socket超时时间 .setConnectTimeout(3000) //设置连接超时时间 .build(); httpAsyncClient = HttpAsyncClients.custom() .setMaxConnTotal(20) //设置最大连接数 .setDefaultRequestConfig(requestConfig).build(); httpAsyncClient.start(); //必须赋值给成员变量并调用start(),否则asyncInvoke中httpAsyncClient为null且无法发起请求 } @Override public void close() throws Exception { super.close(); if (httpAsyncClient != null) { httpAsyncClient.close(); //释放连接资源,避免泄漏 } } @Override public void asyncInvoke(String input, final ResultFuture<DataLocation> resultFuture) throws Exception { String[] fields = input.split(","); String id = fields[0]; String name = fields[1]; String date = fields[2]; String lng = fields[3]; String lat = fields[4]; //API 地址(需填入实际的逆地理编码接口地址,并将lng、lat拼接为请求参数) String url = ""; //执行查询 final HttpGet request1 = new HttpGet(url); final Future<HttpResponse> future = httpAsyncClient.execute(request1, null); CompletableFuture.supplyAsync(new Supplier<String>() { public String get() { try { HttpResponse response = future.get(); String province = null; if (response.getStatusLine().getStatusCode() == 200) { //获取请求的Json字符串 String result = EntityUtils.toString(response.getEntity()); //将Json字符串转成Json对象 JSONObject jsonObject = 
JSON.parseObject(result); //获取位置信息 JSONObject regeocode = jsonObject.getJSONObject("regeocode"); if (regeocode != null && !regeocode.isEmpty()) { JSONObject address = regeocode.getJSONObject("addressComponent"); //获取省份信息并赋值给外层province(原文声明了新局部变量province1,导致返回值恒为null) province = address.getString("province"); } } return province; } catch (Exception e) { return null; } } }).thenAccept((String dbResult) -> { resultFuture.complete(Collections.singleton(DataLocation.of(id, name, date, dbResult))); }); } } } 异步请求主程序: import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.util.EntityUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; import java.util.Collections; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class HttpAsyncMain { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //创建pro配置 Properties pro = new Properties(); //添加集群配置信息 
pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xx:9092,xx:9092,xx:9092"); pro.put(ConsumerConfig.GROUP_ID_CONFIG, "group"); pro.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("locations", new SimpleStringSchema(), pro); DataStreamSource<String> kafkaStream = env.addSource(kafkaConsumer); //开启异步处理数据 同时进行中的异步请求超过10个就会反压;超时时间必须为正值(Flink不支持用0表示不超时),这里设为5秒 DataStream<DataLocation> resultStream = AsyncDataStream.unorderedWait(kafkaStream, new AsyncDatabaseRequest(), 5000, TimeUnit.MILLISECONDS, 10); resultStream.print("result:"); env.execute(); } //异步请求第三方API public static class AsyncDatabaseRequest extends RichAsyncFunction<String, DataLocation> { private transient CloseableHttpAsyncClient httpAsyncClient = null; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //初始化异步HttpClient RequestConfig requestConfig = RequestConfig.custom() .setSocketTimeout(3000) //设置socket超时时间 .setConnectTimeout(3000) //设置连接超时时间 .build(); httpAsyncClient = HttpAsyncClients.custom() .setMaxConnTotal(20) //设置最大连接数 .setDefaultRequestConfig(requestConfig).build(); httpAsyncClient.start(); //必须赋值给成员变量并调用start(),否则asyncInvoke中httpAsyncClient为null且无法发起请求 } @Override public void close() throws Exception { super.close(); if (httpAsyncClient != null) { httpAsyncClient.close(); //释放连接资源,避免泄漏 } } @Override public void asyncInvoke(String input, final ResultFuture<DataLocation> resultFuture) throws Exception { String[] fields = input.split(","); String id = fields[0]; String name = fields[1]; String date = fields[2]; String lng = fields[3]; String lat = fields[4]; //API 地址(需填入实际的逆地理编码接口地址,并将lng、lat拼接为请求参数) String url = ""; //执行查询 final HttpGet request1 = new HttpGet(url); final Future<HttpResponse> future = httpAsyncClient.execute(request1, null); CompletableFuture.supplyAsync(new Supplier<String>() { public String get() { try { HttpResponse response = future.get(); String province = null; if (response.getStatusLine().getStatusCode() == 200) { //获取请求的Json字符串 String result = EntityUtils.toString(response.getEntity()); //将Json字符串转成Json对象 JSONObject jsonObject = JSON.parseObject(result); //获取位置信息 JSONObject regeocode = jsonObject.getJSONObject("regeocode"); if (regeocode != null && 
!regeocode.isEmpty()) { JSONObject address = regeocode.getJSONObject("addressComponent"); //获取省份信息并赋值给外层province(原文声明了新局部变量province1,导致返回值恒为null) province = address.getString("province"); } } return province; } catch (Exception e) { return null; } } }).thenAccept((String dbResult) -> { resultFuture.complete(Collections.singleton(DataLocation.of(id, name, date, dbResult))); }); } } } [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQwNzcxNTY3_size_16_color_FFFFFF_t_70]: /images/20220829/23199c669bf94e81bf26a809c94bbe34.png [20210719153101928.png]: /images/20220829/a8fbd99d071a48009f267763374b166f.png
还没有评论,来说两句吧...