flume之自定义sink组件

阳光穿透心脏的1/2处 2021-06-24 15:57 683阅读 0赞

Flume内置了很多种sink,如:logger、file_roll、avro、hdfs、kafka、elasticsearch等,可以直接把event数据写入本地磁盘或其他第三方存储。但有时内置sink无法满足特殊需求,这时就需要自定义sink。本文介绍如何开发一个自定义sink,将event数据存储到MySQL。

1、pom.xml

  <!--
    Build for the custom Flume MySQL sink.
    `mvn package` produces target/ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar,
    which is then copied into $FLUME_HOME/lib of the agent.
  -->
  <project xmlns="http://maven.apache.org/POM/4.0.0"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
      <groupId>com.abc</groupId>
      <artifactId>ttbrain-log</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </parent>
    <groupId>com.abc</groupId>
    <artifactId>ttbrain-log-flume</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ttbrain-log-flume</name>

    <properties>
      <version.flume>1.7.0</version.flume>
    </properties>

    <dependencies>
      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <scope>test</scope>
      </dependency>

      <!--
        "provided": these are already on the Flume agent's classpath ($FLUME_HOME/lib).
        Keeping them out of the jar-with-dependencies avoids duplicate (and possibly
        version-conflicting) copies of Flume classes and a second SLF4J binding.
      -->
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${version.flume}</version>
        <scope>provided</scope>
      </dependency>
      <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-configuration</artifactId>
        <version>${version.flume}</version>
        <scope>provided</scope>
      </dependency>

      <!-- Connection pools and the MySQL JDBC driver are not part of Flume, so they are bundled. -->
      <dependency>
        <groupId>c3p0</groupId>
        <artifactId>c3p0</artifactId>
      </dependency>
      <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>druid</artifactId>
      </dependency>
      <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
      </dependency>
    </dependencies>

    <profiles>
      <profile>
        <id>dev</id>
        <properties>
          <profile.env.name>dev</profile.env.name>
        </properties>
        <activation>
          <activeByDefault>true</activeByDefault>
        </activation>
      </profile>
      <profile>
        <id>test</id>
        <properties>
          <profile.env.name>test</profile.env.name>
        </properties>
      </profile>
      <profile>
        <id>product</id>
        <properties>
          <profile.env.name>product</profile.env.name>
        </properties>
      </profile>
    </profiles>

    <build>
      <finalName>ttbrain-log-flume-MysqlSink</finalName>
      <filters>
        <!-- Location of the per-profile filter properties file (filter-dev/test/product). -->
        <filter>${basedir}/filters/filter-${profile.env.name}.properties</filter>
      </filters>
      <resources>
        <resource>
          <directory>src/main/resources</directory>
          <!-- Enable ${...} placeholder substitution using the filter file above. -->
          <filtering>true</filtering>
          <includes>
            <include>**/*.xml</include>
            <include>conf/*.properties</include>
            <include>**/*.properties</include>
            <include>**/*.json</include>
          </includes>
        </resource>
      </resources>
      <plugins>
        <!--
          Builds the jar-with-dependencies during `package`. No Main-Class is declared:
          the sink is loaded by the Flume agent, not launched with `java -jar`.
        -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>2.4</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
      </plugins>
    </build>
  </project>

2、开发自定义sink,继承AbstractSink

  package com.abc.ttbrain.log.flume.sink;

  import java.nio.charset.StandardCharsets;
  import java.sql.Connection;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  import org.apache.commons.lang.StringUtils;
  import org.apache.flume.Channel;
  import org.apache.flume.Context;
  import org.apache.flume.Event;
  import org.apache.flume.EventDeliveryException;
  import org.apache.flume.Transaction;
  import org.apache.flume.conf.Configurable;
  import org.apache.flume.sink.AbstractSink;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;

  import com.google.common.collect.Lists;
  import com.abc.ttbrain.log.flume.sink.db.DataSourceUtils;
  20. /**
  21. * MysqlSink
  22. * @author kevinliu
  23. *
  24. */
  25. public class MysqlSink extends AbstractSink implements Configurable {
  26. private static final Logger logger = LoggerFactory.getLogger(MysqlSink.class);
  27. private String tableName = "ttengine_history";
  28. private int batchSize = 100;
  29. private Set<String> deviceKeySet = new HashSet<>();
  30. private String sql = "";
  31. public MysqlSink() {
  32. logger.info("MysqlSink start...");
  33. }
  34. @Override
  35. public void start() {
  36. super.start();
  37. deviceKeySet.add("0B466578-F542-4928-A66B-B12FCFA6DA23");
  38. deviceKeySet.add("CB13721AE4668DEA151F3F72877A2256");
  39. deviceKeySet.add("033C179D-8987-4D08-AF4F-94070F8632E0");
  40. deviceKeySet.add("0CD570CE-4B62-45AA-B86A-7828CFE4F195");
  41. deviceKeySet.add("D9157D45-C17E-4CF9-9C28-EE573F8CAD19");
  42. deviceKeySet.add("3ACE3422774FEE190304604277E389F9");
  43. deviceKeySet.add("2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35");
  44. deviceKeySet.add("A1AD2FAF-86DD-45DE-B85D-7FA23CCDE4C4");
  45. logger.info("deviceKey:"+deviceKeySet.toString());
  46. StringBuilder sb = new StringBuilder();
  47. sb.append("insert into ").append(tableName)
  48. .append(" (uid,ppuid,ch_id,f_num,cost,usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,"
  49. + "rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15"
  50. + ",p_16,p_17,p_18,p_19,p_20,p_21,p_22,p_23,p_24,p_25,p_26,p_27,p_28,p_29,p_30,p_31,p_32"
  51. + ",p_33,p_34,p_35,p_36,p_37,p_38,p_39,p_40,p_41,p_42,p_43,p_44,p_45,p_46,p_47,p_48,p_49,p_50"
  52. + ") values")
  53. .append(" (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,"
  54. + "?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?"
  55. + ",?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");
  56. sql = sb.toString();
  57. }
  58. @Override
  59. public void stop() {
  60. super.stop();
  61. //DataSourceUtils.closeDs();
  62. }
  63. @Override
  64. public Status process() throws EventDeliveryException {
  65. Status result = Status.READY;
  66. Transaction transaction = null;
  67. Event event = null;
  68. String content = "";
  69. List<String> actions = Lists.newArrayList();
  70. PreparedStatement preparedStatement = null;
  71. Connection conn = null;
  72. try {
  73. //db
  74. conn = DataSourceUtils.getConnection();
  75. preparedStatement = conn.prepareStatement(sql);
  76. //flume
  77. Channel channel = getChannel();
  78. transaction = channel.getTransaction();
  79. transaction.begin();
  80. for (int i = 0; i < batchSize; i++) {
  81. event = channel.take();
  82. if (event == null) {
  83. result = Status.BACKOFF;
  84. break;
  85. } else {
  86. content = new String(event.getBody(),"UTF-8");
  87. String[] split = content.split("\t");
  88. if (split.length < 37) {
  89. continue;
  90. }
  91. String uid = split[0];
  92. boolean contains = deviceKeySet.contains(uid);
  93. if (!contains) {
  94. continue;
  95. }
  96. //logger.info(content);
  97. actions.add(content);
  98. }
  99. }
  100. if (actions.size() > 0) {
  101. conn.setAutoCommit(false);
  102. preparedStatement.clearBatch();
  103. //logger.info(actions.get(0));
  104. for (String line : actions) {
  105. String[] split = line.split("\t");
  106. //uid,ppuid,ch_id,f_num,cost,
  107. preparedStatement.setString(1, split[0]);//uid
  108. preparedStatement.setString(2, split[1]);//ppuid
  109. preparedStatement.setString(3, split[2]);//ch_id
  110. preparedStatement.setInt(4, Integer.parseInt(split[3]));//f_num
  111. preparedStatement.setInt(5, Integer.parseInt(split[4]));//cost
  112. //usg,prior,req_id,vers,rg,rh,pg,ph,sg,sh,time_stamp,host,
  113. preparedStatement.setInt(6, Integer.parseInt(split[5]));//usg
  114. preparedStatement.setString(7, split[6]);//prior
  115. preparedStatement.setString(8, split[7]);//req_id
  116. preparedStatement.setString(9, split[8]);//vers
  117. preparedStatement.setString(10, split[9]);//rg
  118. preparedStatement.setInt(11, Integer.parseInt(split[10]));//rh
  119. preparedStatement.setString(12, split[11]);//pg
  120. preparedStatement.setInt(13, Integer.parseInt(split[12]));//ph
  121. preparedStatement.setString(14, split[13]);//sg
  122. preparedStatement.setInt(15, Integer.parseInt(split[14]));//sh
  123. String time_stamp = split[15];
  124. if ("null".equals(time_stamp) || StringUtils.isBlank(time_stamp)) {
  125. time_stamp = "0";
  126. }
  127. preparedStatement.setLong(16, Long.parseLong(time_stamp));//time_stamp
  128. preparedStatement.setString(17, split[16]);//host
  129. //rec_feed_ids,txt,vedio,gallery,p_1,p_2,p_3,p_4,p_5,p_6,p_7,p_8,p_9,p_10,p_11,p_12,p_13,p_14,p_15
  130. preparedStatement.setString(18, split[17]);//rec_feed_ids
  131. preparedStatement.setString(19, split[18]);//txt
  132. preparedStatement.setString(20, split[19]);//vedio
  133. preparedStatement.setString(21, split[20]);//gallery
  134. preparedStatement.setString(22, split[21]);//p_1
  135. preparedStatement.setString(23, split[22]);//p_1
  136. preparedStatement.setString(24, split[23]);//p_1
  137. preparedStatement.setString(25, split[24]);//p_1
  138. preparedStatement.setString(26, split[25]);//p_1
  139. preparedStatement.setString(27, split[26]);//p_1
  140. preparedStatement.setString(28, split[27]);//p_1
  141. preparedStatement.setString(29, split[28]);//p_1
  142. preparedStatement.setString(30, split[29]);//p_1
  143. preparedStatement.setString(31, split[30]);//p_1
  144. preparedStatement.setString(32, split[31]);//p_1
  145. preparedStatement.setString(33, split[32]);//p_1
  146. preparedStatement.setString(34, split[33]);//p_1
  147. preparedStatement.setString(35, split[34]);//p_1
  148. preparedStatement.setString(36, split[35]);//p_1
  149. preparedStatement.setString(37, split[36]);
  150. preparedStatement.setString(38, split[37]);
  151. preparedStatement.setString(39, split[38]);
  152. preparedStatement.setString(40, split[39]);
  153. preparedStatement.setString(41, split[40]);
  154. preparedStatement.setString(42, split[41]);
  155. preparedStatement.setString(43, split[42]);
  156. preparedStatement.setString(44, split[43]);
  157. preparedStatement.setString(45, split[44]);
  158. preparedStatement.setString(46, split[45]);
  159. preparedStatement.setString(47, split[46]);
  160. preparedStatement.setString(48, split[47]);
  161. preparedStatement.setString(49, split[48]);
  162. preparedStatement.setString(50, split[49]);
  163. preparedStatement.setString(51, split[50]);
  164. preparedStatement.setString(52, split[51]);
  165. preparedStatement.setString(53, split[52]);
  166. preparedStatement.setString(54, split[53]);
  167. preparedStatement.setString(55, split[54]);
  168. preparedStatement.setString(56, split[55]);
  169. preparedStatement.setString(57, split[56]);
  170. preparedStatement.setString(58, split[57]);
  171. preparedStatement.setString(59, split[58]);
  172. preparedStatement.setString(60, split[59]);
  173. preparedStatement.setString(61, split[60]);
  174. preparedStatement.setString(62, split[61]);
  175. preparedStatement.setString(63, split[62]);
  176. preparedStatement.setString(64, split[63]);
  177. preparedStatement.setString(65, split[64]);
  178. preparedStatement.setString(66, split[65]);
  179. preparedStatement.setString(67, split[66]);
  180. preparedStatement.setString(68, split[67]);
  181. preparedStatement.setString(69, split[68]);
  182. preparedStatement.setString(70, split[69]);
  183. preparedStatement.setString(71, split[70]);
  184. preparedStatement.addBatch();
  185. logger.info("device:{}",split[0]);
  186. }
  187. preparedStatement.executeBatch();
  188. conn.commit();
  189. }
  190. transaction.commit();
  191. } catch (Throwable e) {
  192. try {
  193. if (transaction != null) {
  194. transaction.rollback();
  195. }
  196. } catch (Exception e2) {
  197. logger.error("flume transaction rollback error.", e2);
  198. }
  199. logger.error("Failed to commit flume transaction," +"Transaction rolled back.", e);
  200. //Throwables.propagate(e);
  201. } finally {
  202. if (transaction != null) {
  203. transaction.close();
  204. }
  205. if (preparedStatement != null) {
  206. try {
  207. preparedStatement.close();
  208. } catch (SQLException e) {
  209. logger.error("statement close error.", e);
  210. }
  211. }
  212. if (conn != null) {
  213. try {
  214. conn.close();
  215. } catch (SQLException e) {
  216. logger.error("connection close error.", e);
  217. }
  218. }
  219. }
  220. return result;
  221. }
  222. @Override
  223. public void configure(Context context) {
  224. String deviceKeys = context.getString("deviceKeys");
  225. if (StringUtils.isNotBlank(deviceKeys)) {
  226. String[] split = deviceKeys.split(",");
  227. for (String deviceKey : split) {
  228. deviceKeySet.add(deviceKey);
  229. }
  230. logger.info("sink configure deivceKeys:"+deviceKeys);
  231. } else {
  232. logger.info("sink configure deivceKeys is empty...");
  233. }
  234. }
  235. }

说明:

1)在configure方法中,可以读取flume配置文件中为该sink设置的配置项。配置文件修改后,flume框架会自动重新加载配置并重新创建组件,这时会再次调用configure方法;

2)start和stop方法分别在sink启动和停止时执行,其中start在configure之后被调用;

3、打包:

使用 mvn package 将代码打包,在target目录下生成 ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar。

4、部署:

1)配置flume配置文件:

  # Flume agent "agent1": exec source (ngrinder) -> file channel (mc2) -> custom MySQL sink (mysql).
  agent1.sources = ngrinder
  agent1.channels = mc2
  agent1.sinks = mysql

  # ---------------------------------------------------------------- source
  # Follow the recommend-api log; tail -F keeps following the file by name across rotation.
  # NOTE(review): the Flume user guide warns that the exec source cannot guarantee delivery if the
  # channel is full or the agent dies; the TAILDIR source (since 1.7) tracks file offsets instead.
  agent1.sources.ngrinder.type = exec
  agent1.sources.ngrinder.command = tail -F /data/logs/ttbrain/ttbrain-recommend-api.log
  agent1.sources.ngrinder.channels = mc2

  # Interceptor chain, applied in the listed order:
  #   filt1 - keep only lines matching .*recId.*
  #   filt2 - add a "hostname" header holding the agent host's IP address
  #   filt3 - add a "timestamp" header
  #   filt4 - project-specific interceptor (MyInterceptor)
  agent1.sources.ngrinder.interceptors = filt1 filt2 filt3 filt4
  agent1.sources.ngrinder.interceptors.filt1.type = regex_filter
  agent1.sources.ngrinder.interceptors.filt1.regex = .*recId.*
  agent1.sources.ngrinder.interceptors.filt2.type = host
  agent1.sources.ngrinder.interceptors.filt2.hostHeader = hostname
  agent1.sources.ngrinder.interceptors.filt2.useIP = true
  agent1.sources.ngrinder.interceptors.filt3.type = timestamp
  agent1.sources.ngrinder.interceptors.filt4.type = com.abc.ttbrain.log.flume.interceptor.MyInterceptor$Builder

  # ---------------------------------------------------------------- channel
  # Durable file channel: checkpoint and event data live on local disk.
  agent1.channels.mc2.type = file
  agent1.channels.mc2.checkpointDir = /data/flume/ckdir/mc2_ck
  agent1.channels.mc2.dataDirs = /data/flume/datadir/mc2_data

  # ---------------------------------------------------------------- sink
  # Custom sink class (fully qualified name); deviceKeys is read in MysqlSink.configure().
  agent1.sinks.mysql.type = com.abc.ttbrain.log.flume.sink.MysqlSink
  agent1.sinks.mysql.deviceKeys = 2DCFE0C8-2DD6-4FB7-A2E6-1A210F7C7C07,3F4DA241-B827-4FF8-BB3F-624CFDEDA58D,89C574A2-1E44-468F-9AB4-96737D2FF7F2,f614ca8a42bd121a8bb971d89a078a08267b4df2,A78DB69352E74B744EDD15DE2B91BE40,2C4B1E38-2629-4AF9-BA0E-2DA17C5F3A35,B9D0A82EE8A4BB9B20CEEBB0977A9CFC,3BD40EB0-8171-4481-8B6E-002DB8C6924D,81F37B3C-5E29-40BC-A030-424E137268C2,E59909BC-D62E-4393-9D06-41BF03F81DA9
  agent1.sinks.mysql.channel = mc2

说明:

A、agent1.sinks.mysql.type 指定自定义sink类全路径;

B、agent1.sinks.mysql.deviceKeys 是自定义配置信息,可以在自定义sink中的configure方法中获取该信息。

2)将ttbrain-log-flume-MysqlSink-jar-with-dependencies.jar放到$FLUME_HOME/lib目录下;

3)启动flume:

  # Start agent "agent1" in the background; the agent's own logging goes through its log4j config.
  nohup flume-ng agent \
    --conf /usr/local/apache-flume-1.7.0-bin/conf \
    --conf-file /usr/local/apache-flume-1.7.0-bin/conf/engine-api-log.conf \
    --name agent1 \
    > /dev/null 2>&1 &

发表评论

表情:
评论列表 (有 0 条评论,683人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flume-定义Sink

    自定义Sink Sink不断地轮询Channel中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或被发送到另一个Flume Agent; Sink是完全