Flink: Consuming Kafka Data and Batch-Writing to Multiple MySQL Tables

> Business scenario:
> All SDK data lands in a single Kafka topic (`topic_sdk_log`) that mixes several event types, such as login, register, and activate. The records need to be split by event type and written to separate MySQL tables. Here, Flink flushes the data into the corresponding MySQL tables every 5 seconds.

Sample data:

```json
{"key":"login","data":{"_game_version":"","_package_id":"12280200","_core_account":"ysdk_oGFjT0XEjdefIOgd7uApwWUX2ccY","_time":"2020-03-23 15:35:11","_time_server":"2020-03-23 15:35:59","_idfa":"","_imei":"","_mac":null,"_imsi":null,"_device_code":"46e0fd235df0b7c167c0cbe82be02a3c"}}
{"key":"activate","data":{"_game_version":"","_package_id":850030,"_time":"2020-03-23 15:25:56","_time_server":"2020-03-23 15:27:42","_idfa":"","_imei":"866092032174377","_mac":"02:00:00:00:00:00","_imsi":"460001960611910","_device_code":"fc9b4de27db42c94433a02c59cd5e2ff"}}
```

### **KafkaToMySQL main class** ###

```scala
import com.alibaba.fastjson.JSON
import com.sm.conf.ConfigurationManager
import com.sm.constants.Constants
import com.sm.sink.MySQLSink
import com.sm.entry.SdkData
import com.sm.util.KafkaUtils
import com.sm.window.SdkWindow
import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * Flink reads from Kafka, aggregates the records every 5 seconds, and batch-writes them to MySQL
  *
  * create by LiuJinHe 2020/3/25
  */
object KafkaToMySQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // restart strategy on failure: up to 10 attempts, 3 ms between attempts
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(10, 3L))
    // env.setParallelism(1)

    // checkpoint interval
    env.enableCheckpointing(10000)
    val config = env.getCheckpointConfig
    // exactly-once mode
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    // minimum pause between checkpoints
    config.setMinPauseBetweenCheckpoints(1000)
    // checkpoint timeout; checkpoints exceeding it are discarded
    config.setCheckpointTimeout(10000)
    // allow only one checkpoint at a time
    config.setMaxConcurrentCheckpoints(1)

    val kafkaProp = ConfigurationManager.load(Constants.KAFKA_PROPERTIES)
    val kafkaConsumer = KafkaUtils.getConsumer(kafkaProp).setStartFromGroupOffsets()
    val dataStream = env.addSource(kafkaConsumer)

    dataStream.map(line => JSON.parseObject(line, classOf[SdkData]))
      .filter(line =>
        line.key == "activate" ||
        line.key == "register" ||
        line.key == "start_app" ||
        line.key == "login"
      )
      .keyBy(data => data.key)
      .timeWindowAll(Time.seconds(5)) // flush every 5 seconds, as described above
      .apply(new SdkWindow)
      .addSink(new MySQLSink)

    env.execute("stream job")
  }
}
```

**ConfigurationManager configuration loader**

```scala
import java.util.Properties

/**
  * Configuration manager: loads a properties file from the classpath and exposes its entries
  *
  * create by LiuJinHe 2020/3/25
  */
object ConfigurationManager {
  private val properties = new Properties

  def load(name: String): Properties = {
    if (name != null && name.nonEmpty) {
      properties.load(ConfigurationManager.getClass.getClassLoader.getResourceAsStream(name))
    }
    properties
  }
}
```

**PropManager configuration access**

The configuration file application.properties lives under resources.

```scala
import com.typesafe.config.{Config, ConfigFactory}

/**
  * create by LiuJinHe 2020/3/25
  */
object PropManager {
  final val config: Config = ConfigFactory.load

  def getProp(key: String): String = config.getString(key).trim
}
```

For example, application.properties:

```properties
driver = com.mysql.jdbc.Driver
url = jdbc:mysql://localhost:3306/test?useSSL=false
username = root
password = 863863
database = test

activate = tb_sdk_active_log
register = tb_sdk_user_reg
start_app = tb_sdk_start_log
login = tb_sdk_user_login
```
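Besides application.properties, the main class also loads a separate Kafka properties file via `ConfigurationManager.load(Constants.KAFKA_PROPERTIES)`, and `KafkaUtils` below reads the topic name from a `regional.topic` key. That file is not shown in the original post; a minimal sketch of what it might contain, assuming the broker address from the producer example and a made-up consumer group id:

```properties
# Hypothetical kafka.properties: bootstrap.servers is taken from the producer example,
# group.id is an assumption
bootstrap.servers = hadoop-master:9092
group.id = flink_sdk_to_mysql
regional.topic = topic_sdk_log
```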
**KafkaUtils custom source**

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

/**
  * Custom source: Kafka consumer
  *
  * create by LiuJinHe 2020/1/6
  */
object KafkaUtils {
  def getConsumer(prop: Properties): FlinkKafkaConsumer[String] = {
    prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    prop.setProperty("auto.offset.reset", "latest")
    val topic = prop.getProperty("regional.topic")
    new FlinkKafkaConsumer[String](topic, new SimpleStringSchema(), prop)
  }
}
```

**SdkData input class**

```scala
/**
  * Input class
  *
  * @param key  event type
  * @param data event payload
  */
case class SdkData(key: String, data: String)
```

**SdkWindow window function**

```scala
import com.sm.entry.SdkData
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
  * Custom window function: emits the whole window content as one batch
  *
  * create by LiuJinHe 2020/3/25
  */
class SdkWindow extends AllWindowFunction[SdkData, Iterable[SdkData], TimeWindow] {
  override def apply(window: TimeWindow, input: Iterable[SdkData], out: Collector[Iterable[SdkData]]): Unit = {
    if (input.nonEmpty) {
      println("Records in this 5-second window: " + input.size)
      out.collect(input)
    }
  }
}
```

**MySQLSink custom sink**

```scala
import java.lang.reflect.Field
import java.sql.{Connection, Statement}

import com.alibaba.fastjson.JSON
import com.sm.entry.SdkData
import com.sm.parse.ParseUtils
import com.sm.util.DBUtils
import org.apache.commons.dbcp2.BasicDataSource
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}

/**
  * Custom MySQL sink: batch-writes data to MySQL
  *
  * create by LiuJinHe 2020/3/25
  */
class MySQLSink extends RichSinkFunction[Iterable[SdkData]] {
  var conn: Connection = _
  var stmt: Statement = _
  var dataSource: BasicDataSource = _

  /**
    * Open the connection; called once when the sink is initialized
    */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)
    dataSource = new BasicDataSource
    conn = DBUtils.getConnection(dataSource)
    conn.setAutoCommit(false)
    println("open: connected to MySQL")
  }

  /**
    * Called once per incoming element, i.e. once per window batch
    */
  override def invoke(value: Iterable[SdkData], context: SinkFunction.Context[_]): Unit = {
    val dataList = value.toList
    stmt = conn.createStatement()
    dataList.foreach(sdkData => {
      val category = sdkData.key
      // "insert ignore into db.table(field1,...) values(" + "v1,...,vn);"
      val sql = ParseUtils.getSqlStr(category)
      sql.append(buildValues(sdkData.data, category))
      stmt.addBatch(sql.toString)
    })
    val count = stmt.executeBatch()
    conn.commit()
    println("Rows written successfully: " + count.sum)
  }

  /**
    * Close the statement and the connection, release resources
    */
  override def close(): Unit = {
    super.close()
    if (stmt != null) {
      stmt.close()
    }
    if (conn != null) {
      conn.close()
    }
  }

  /**
    * Build the "v1,v2,...,vn);" part of the statement from the event JSON via reflection
    * (note: string values are quoted but not escaped)
    */
  def buildValues(data: String, category: String): StringBuffer = {
    val valueStr = new StringBuffer()
    val json = JSON.parseObject(data)
    val obj = ParseUtils.matchCategory(json, category)
    val fields: Array[Field] = obj.getClass.getDeclaredFields
    for (field <- fields) {
      field.setAccessible(true)
      val value = field.get(obj)
      if (field.getType.getName.contains("String")) {
        valueStr.append(s"'$value'").append(",")
      } else {
        valueStr.append(value).append(",")
      }
    }
    // replace the trailing comma with ");"
    valueStr.replace(valueStr.length - 1, valueStr.length, ");")
  }
}
```

**DBUtils database connection pool**

Two pool implementations are configured here (Druid and dbcp2).

```scala
import java.sql.Connection

import com.alibaba.druid.pool.DruidDataSource
import com.sm.conf.PropManager
import org.apache.commons.dbcp2.BasicDataSource

/**
  * Connection pool setup; hands out connections
  *
  * create by LiuJinHe 2020/3/25
  */
object DBUtils {
  /**
    * Druid
    */
  def getConnection(dataSource: DruidDataSource): Connection = {
    dataSource.setDriverClassName(PropManager.getProp("driver"))
    dataSource.setUrl(PropManager.getProp("url"))
    dataSource.setUsername(PropManager.getProp("username"))
    dataSource.setPassword(PropManager.getProp("password"))
    // initial size, max active, min idle
    dataSource.setInitialSize(10)
    dataSource.setMaxActive(50)
    dataSource.setMinIdle(5)
    // return a connection
    val conn = dataSource.getConnection
    println("Created Druid connection pool: " + conn)
    conn
  }

  /**
    * dbcp2
    */
  def getConnection(dataSource: BasicDataSource): Connection = {
    dataSource.setDriverClassName(PropManager.getProp("driver"))
    dataSource.setUrl(PropManager.getProp("url"))
    dataSource.setUsername(PropManager.getProp("username"))
    dataSource.setPassword(PropManager.getProp("password"))
    dataSource.setInitialSize(10)
    dataSource.setMaxTotal(50)
    dataSource.setMaxIdle(5)
    val conn = dataSource.getConnection
    println("Created dbcp2 connection pool: " + conn)
    conn
  }
}
```

**ParseUtils event parsing**

```scala
import com.alibaba.fastjson.JSONObject
import com.sm.conf.PropManager
import com.sm.entry._

/**
  * Parse events
  *
  * create by LiuJinHe 2020/3/30
  */
object ParseUtils {

  /**
    * Build the "insert ignore into db.table(field1,field2,...) values(" prefix for a category
    */
  def getSqlStr(category: String): StringBuffer = {
    val sqlStr = new StringBuffer()
    val database = PropManager.getProp("database")
    val table = PropManager.getProp(category)
    sqlStr.append(s"insert ignore into $database.$table(")

    val sdk = category match {
      case "activate"  => classOf[Active].getDeclaredFields    // device activation
      case "register"  => classOf[Reg].getDeclaredFields       // user registration
      case "login"     => classOf[UserLogin].getDeclaredFields // user login
      case "start_app" => classOf[Start].getDeclaredFields     // app start
    }

    for (field <- sdk) {
      sqlStr.append(s"${field.getName},")
    }
    // replace the trailing comma with ") values("
    sqlStr.replace(sqlStr.length - 1, sqlStr.length, ") values(")
  }

  /**
    * Match the event type to its parser
    */
  def matchCategory(json: JSONObject, category: String): AnyRef = category match {
    case "activate"  => ParseCategory.activeParse(json, category)
    case "register"  => ParseCategory.registerParse(json, category)
    case "login"     => ParseCategory.loginParse(json, category)
    case "start_app" => ParseCategory.startParse(json, category)
  }
}
```
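To make the output of `getSqlStr` plus the sink's value builder concrete: for the login record from the sample data at the top, with the table mapping from application.properties, the statement added to the batch would look roughly like this (a sketch; the exact column order follows the `UserLogin` field order shown further below):

```sql
insert ignore into test.tb_sdk_user_login(package_id,core_account,time,time_server,device_code)
values(12280200,'ysdk_oGFjT0XEjdefIOgd7uApwWUX2ccY','2020-03-23 15:35:11','2020-03-23 15:35:59','46e0fd235df0b7c167c0cbe82be02a3c');
```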
**ParseCategory event parser**

```scala
import com.alibaba.fastjson.JSONObject
import com.mysql.jdbc.StringUtils
import com.sm.entry.{Active, Crash, Payment, Reg, RoleLogin, Start, UserLogin}
import com.sm.util.{DateUtils, MD5Utils}

/**
  * Parse and clean the individual events
  *
  * create by LiuJinHe 2020/3/30
  */
object ParseCategory {
  /**
    * Common fields
    */
  private var package_id: String = _   // game package id
  private var core_account: String = _ // user id (core account)
  private var time: String = _         // time the event occurred
  private var time_server: String = _  // time the event reached the server
  private var device_code: String = _  // device code

  /**
    * Login event
    */
  def loginParse(json: JSONObject, category: String): AnyRef = {
    package_id = json.getString("_package_id").trim
    core_account = if (json.containsKey("_core_account")) json.getString("_core_account").trim else ""
    time = json.getString("_time").trim
    time_server = json.getString("_time_server").trim
    device_code = if (json.containsKey("_device_code")) json.getString("_device_code").trim else ""

    UserLogin(
      if (!StringUtils.isNullOrEmpty(package_id) && StringUtils.isStrictlyNumeric(package_id)) package_id.toInt else 0,
      if (!StringUtils.isNullOrEmpty(core_account)) core_account else "",
      if (StringUtils.isStrictlyNumeric(time)) DateUtils.timestampToTime(time.toLong)
      else if (time != null) time else "",
      if (StringUtils.isStrictlyNumeric(time_server)) DateUtils.timestampToTime(time_server.toLong)
      else if (time_server != null) time_server else "",
      if (!StringUtils.isNullOrEmpty(device_code)) device_code.toLowerCase
      else MD5Utils.MD5Encode(json.getString("_mac") + json.getString("_imei") + json.getString("_imsi")).toLowerCase
    )
  }
}
```
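`ParseCategory` depends on `DateUtils.timestampToTime` and `MD5Utils.MD5Encode`, which are not shown in the original post. A minimal sketch of what they might look like, assuming the SDK reports epoch milliseconds and the usual hex-encoded MD5:

```scala
import java.security.MessageDigest
import java.text.SimpleDateFormat
import java.util.Date

object DateUtils {
  // Assumption: ts is epoch milliseconds; the format matches the "_time" samples above
  def timestampToTime(ts: Long): String =
    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(ts))
}

object MD5Utils {
  // Hex-encoded MD5 of the input string
  def MD5Encode(input: String): String =
    MessageDigest.getInstance("MD5")
      .digest(Option(input).getOrElse("").getBytes("UTF-8"))
      .map("%02x".format(_))
      .mkString
}
```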
**UserLogin event**

```scala
/**
  * SDK user login log
  *
  * create by LiuJinHe 2020/3/25
  */
case class UserLogin(
  package_id: Int,      // game package id
  core_account: String, // user id (core account)
  time: String,         // time the event occurred
  time_server: String,  // time the event reached the server
  device_code: String   // device code
)
```

**KafkaProducer test data producer**

```scala
import java.util.Properties

import org.apache.kafka.clients.producer._

/**
  * Kafka producer that sends local sample data to Kafka
  *
  * create by LiuJinHe 2020/3/25
  */
object KafkaProducer {
  def main(args: Array[String]): Unit = {
    loadToKafka("topic_sdk_log")
  }

  def loadToKafka(topic: String): Unit = {
    val prop = new Properties()
    // prop.put("bootstrap.servers", "cdh-master:9094,cdh-slave01:9094,cdh-slave02:9094")
    prop.put("bootstrap.servers", "hadoop-master:9092")
    prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](prop)
    val bufferSource = io.Source.fromFile("E:\\workspace\\real-time-kafka-to-mysql\\src\\main\\resources\\avtive.txt")
    for (line <- bufferSource.getLines) {
      val record = new ProducerRecord[String, String](topic, line)
      // Thread.sleep(1000)
      producer.send(record)
    }
    bufferSource.close()
    producer.close()
  }
}
```

**Result:**

![Flink job console output](/images/20210923/9475e7d1d895448386149530f3314192.png)