Flume custom source, sink, channel, and interceptor


Following my usual practice of letting requirements drive learning: a reader left the following question in the comments on part five of my Flume notes:

"I want to implement the following: while reading a file, put the file name and the date embedded in the file name into the event headers, so that when the events reach HDFS, events from different files land in different directories. For example, a file named a.log.2014-07-25 should be stored under /a/2014-07-25 on HDFS, and a.log.2014-07-26 under /a/2014-07-26; in other words, each file gets its own directory. How can this be done?"

With this question in mind, I went back through the official documentation and found that the spooling directory source roughly fits the need: it watches a given directory for newly written files, passes each file's contents on to the sink, and then renames the file with a completed suffix (by default .COMPLETED) to mark it as processed. It also provides parameters for adding the file name and the file's full path to the event headers.
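For reference, the stock source's header options are enabled with configuration along these lines (agent and source names here are placeholders; fileHeader/fileHeaderKey and basenameHeader/basenameHeaderKey are the relevant options):

```properties
tier1.sources.source1.type = spooldir
tier1.sources.source1.spoolDir = /opt/logs
# put the absolute path of the current file into the "file" header
tier1.sources.source1.fileHeader = true
tier1.sources.source1.fileHeaderKey = file
# put just the file name (e.g. a.log.2014-07-25) into the "basename" header
tier1.sources.source1.basenameHeader = true
tier1.sources.source1.basenameHeaderKey = basename
```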

The existing functionality does not fully satisfy our requirement, but it at least points in the right direction: it can put the file name into a header!

I was hoping the source code would not be too complicated, so that with a small tweak we could split the file name and put the pieces into the headers, which is exactly what we want.

So I opened the source code, and indeed it is not complicated; the structure is very clear. With a few small changes I implemented the feature. The modifications touch the three classes related to the spooling directory source: ReliableSpoolingFileEventExtReader, SpoolDirectorySourceConfigurationExtConstants, and SpoolDirectoryExtSource (the original class names with an Ext suffix added). The code follows.

First, to implement a custom source against the interfaces provided by Flume NG, we need to depend on Flume NG itself. We pull in two artifacts, flume-ng-core and flume-ng-configuration; the Maven configuration is as follows:

```xml
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-core</artifactId>
  <version>1.6.0</version>
</dependency>
<dependency>
  <groupId>org.apache.flume</groupId>
  <artifactId>flume-ng-configuration</artifactId>
  <version>1.6.0</version>
</dependency>
```


```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.besttone.flume;

import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.PositionTracker;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.serialization.ResettableInputStream;
import org.apache.flume.tools.PlatformDetect;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;

/**
 * A {@link ReliableEventReader} which reads log data from files stored in a
 * spooling directory and renames each file once all of its data has been read
 * (through {@link EventDeserializer#readEvent()} calls). The user must
 * {@link #commit()} each read, to indicate that the lines have been fully
 * processed.
 *
 * Read calls will return no data if there are no files left to read. This
 * class, in general, is not thread safe.
 *
 * This reader assumes that files with unique file names are left in the
 * spooling directory and not modified once they are placed there. Any user
 * behavior which violates these assumptions, when detected, will result in a
 * FlumeException being thrown.
 *
 * This class makes the following guarantees, if above assumptions are met:
 * <ul>
 * <li>Once a log file has been renamed with the {@link #completedSuffix}, all
 * of its records have been read through the
 * {@link EventDeserializer#readEvent()} function and {@link #commit()}ed at
 * least once.
 * <li>All files in the spooling directory will eventually be opened and
 * delivered to a {@link #readEvents(int)} caller.
 * </ul>
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableSpoolingFileEventExtReader implements ReliableEventReader {

  private static final Logger logger = LoggerFactory
      .getLogger(ReliableSpoolingFileEventExtReader.class);

  static final String metaFileName = ".flumespool-main.meta";

  private final File spoolDirectory;
  private final String completedSuffix;
  private final String deserializerType;
  private final Context deserializerContext;
  private final Pattern ignorePattern;
  private final File metaFile;
  private final boolean annotateFileName;
  private final boolean annotateBaseName;
  private final String fileNameHeader;
  private final String baseNameHeader;

  // added parameters - begin
  private final boolean annotateFileNameExtractor;
  private final String fileNameExtractorHeader;
  private final Pattern fileNameExtractorPattern;
  private final boolean convertToTimestamp;
  private final String dateTimeFormat;
  private final boolean splitFileName;
  private final String splitBy;
  private final String splitBaseNameHeader;
  // added parameters - end

  private final String deletePolicy;
  private final Charset inputCharset;
  private final DecodeErrorPolicy decodeErrorPolicy;

  private Optional<FileInfo> currentFile = Optional.absent();
  /** Always contains the last file from which lines have been read. **/
  private Optional<FileInfo> lastFileRead = Optional.absent();
  private boolean committed = true;

  /**
   * Create a ReliableSpoolingFileEventReader to watch the given directory.
   */
  private ReliableSpoolingFileEventExtReader(File spoolDirectory,
      String completedSuffix, String ignorePattern,
      String trackerDirPath, boolean annotateFileName,
      String fileNameHeader, boolean annotateBaseName,
      String baseNameHeader, String deserializerType,
      Context deserializerContext, String deletePolicy,
      String inputCharset, DecodeErrorPolicy decodeErrorPolicy,
      boolean annotateFileNameExtractor, String fileNameExtractorHeader,
      String fileNameExtractorPattern, boolean convertToTimestamp,
      String dateTimeFormat, boolean splitFileName, String splitBy,
      String splitBaseNameHeader) throws IOException {

    // Sanity checks
    Preconditions.checkNotNull(spoolDirectory);
    Preconditions.checkNotNull(completedSuffix);
    Preconditions.checkNotNull(ignorePattern);
    Preconditions.checkNotNull(trackerDirPath);
    Preconditions.checkNotNull(deserializerType);
    Preconditions.checkNotNull(deserializerContext);
    Preconditions.checkNotNull(deletePolicy);
    Preconditions.checkNotNull(inputCharset);

    // validate delete policy
    if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())
        && !deletePolicy
            .equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
      throw new IllegalArgumentException("Delete policies other than "
          + "NEVER and IMMEDIATE are not yet supported");
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Initializing {} with directory={}, metaDir={}, "
          + "deserializer={}", new Object[] {
          ReliableSpoolingFileEventExtReader.class.getSimpleName(),
          spoolDirectory, trackerDirPath, deserializerType });
    }

    // Verify directory exists and is readable/writable
    Preconditions
        .checkState(
            spoolDirectory.exists(),
            "Directory does not exist: "
                + spoolDirectory.getAbsolutePath());
    Preconditions.checkState(spoolDirectory.isDirectory(),
        "Path is not a directory: " + spoolDirectory.getAbsolutePath());

    // Do a canary test to make sure we have access to spooling directory
    try {
      File canary = File.createTempFile("flume-spooldir-perm-check-",
          ".canary", spoolDirectory);
      Files.write("testing flume file permissions\n", canary,
          Charsets.UTF_8);
      List<String> lines = Files.readLines(canary, Charsets.UTF_8);
      Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s",
          canary);
      if (!canary.delete()) {
        throw new IOException("Unable to delete canary file " + canary);
      }
      logger.debug("Successfully created and deleted canary file: {}",
          canary);
    } catch (IOException e) {
      throw new FlumeException("Unable to read and modify files"
          + " in the spooling directory: " + spoolDirectory, e);
    }

    this.spoolDirectory = spoolDirectory;
    this.completedSuffix = completedSuffix;
    this.deserializerType = deserializerType;
    this.deserializerContext = deserializerContext;
    this.annotateFileName = annotateFileName;
    this.fileNameHeader = fileNameHeader;
    this.annotateBaseName = annotateBaseName;
    this.baseNameHeader = baseNameHeader;
    this.ignorePattern = Pattern.compile(ignorePattern);
    this.deletePolicy = deletePolicy;
    this.inputCharset = Charset.forName(inputCharset);
    this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);

    // added code - begin
    this.annotateFileNameExtractor = annotateFileNameExtractor;
    this.fileNameExtractorHeader = fileNameExtractorHeader;
    this.fileNameExtractorPattern = Pattern
        .compile(fileNameExtractorPattern);
    this.convertToTimestamp = convertToTimestamp;
    this.dateTimeFormat = dateTimeFormat;
    this.splitFileName = splitFileName;
    this.splitBy = splitBy;
    this.splitBaseNameHeader = splitBaseNameHeader;
    // added code - end

    File trackerDirectory = new File(trackerDirPath);

    // if relative path, treat as relative to spool directory
    if (!trackerDirectory.isAbsolute()) {
      trackerDirectory = new File(spoolDirectory, trackerDirPath);
    }

    // ensure that meta directory exists
    if (!trackerDirectory.exists()) {
      if (!trackerDirectory.mkdir()) {
        throw new IOException(
            "Unable to mkdir nonexistent meta directory "
                + trackerDirectory);
      }
    }

    // ensure that the meta directory is a directory
    if (!trackerDirectory.isDirectory()) {
      throw new IOException("Specified meta directory is not a directory"
          + trackerDirectory);
    }

    this.metaFile = new File(trackerDirectory, metaFileName);
  }

  /**
   * Return the filename which generated the data from the last successful
   * {@link #readEvents(int)} call. Returns null if called before any file
   * contents are read.
   */
  public String getLastFileRead() {
    if (!lastFileRead.isPresent()) {
      return null;
    }
    return lastFileRead.get().getFile().getAbsolutePath();
  }

  // public interface
  public Event readEvent() throws IOException {
    List<Event> events = readEvents(1);
    if (!events.isEmpty()) {
      return events.get(0);
    } else {
      return null;
    }
  }

  public List<Event> readEvents(int numEvents) throws IOException {
    if (!committed) {
      if (!currentFile.isPresent()) {
        throw new IllegalStateException("File should not roll when "
            + "commit is outstanding.");
      }
      logger.info("Last read was never committed - resetting mark position.");
      currentFile.get().getDeserializer().reset();
    } else {
      // Check if new files have arrived since last call
      if (!currentFile.isPresent()) {
        currentFile = getNextFile();
      }
      // Return empty list if no new files
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
    }

    EventDeserializer des = currentFile.get().getDeserializer();
    List<Event> events = des.readEvents(numEvents);

    /*
     * It's possible that the last read took us just up to a file boundary.
     * If so, try to roll to the next file, if there is one.
     */
    if (events.isEmpty()) {
      retireCurrentFile();
      currentFile = getNextFile();
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
      events = currentFile.get().getDeserializer().readEvents(numEvents);
    }

    if (annotateFileName) {
      String filename = currentFile.get().getFile().getAbsolutePath();
      for (Event event : events) {
        event.getHeaders().put(fileNameHeader, filename);
      }
    }

    if (annotateBaseName) {
      String basename = currentFile.get().getFile().getName();
      for (Event event : events) {
        event.getHeaders().put(baseNameHeader, basename);
      }
    }

    // added code - begin
    // extract part of the file name with a regular expression
    if (annotateFileNameExtractor) {
      Matcher matcher = fileNameExtractorPattern.matcher(currentFile
          .get().getFile().getName());
      if (matcher.find()) {
        String value = matcher.group();
        if (convertToTimestamp) {
          DateTimeFormatter formatter = DateTimeFormat
              .forPattern(dateTimeFormat);
          DateTime dateTime = formatter.parseDateTime(value);
          value = Long.toString(dateTime.getMillis());
        }
        for (Event event : events) {
          event.getHeaders().put(fileNameExtractorHeader, value);
        }
      }
    }

    // split the file name on the configured separator
    if (splitFileName) {
      String[] splits = currentFile.get().getFile().getName()
          .split(splitBy);
      for (Event event : events) {
        for (int i = 0; i < splits.length; i++) {
          event.getHeaders().put(splitBaseNameHeader + i, splits[i]);
        }
      }
    }
    // added code - end

    committed = false;
    lastFileRead = currentFile;
    return events;
  }

  @Override
  public void close() throws IOException {
    if (currentFile.isPresent()) {
      currentFile.get().getDeserializer().close();
      currentFile = Optional.absent();
    }
  }

  /** Commit the last lines which were read. */
  @Override
  public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      committed = true;
    }
  }

  /**
   * Closes currentFile and attempt to rename it.
   *
   * If these operations fail in a way that may cause duplicate log entries,
   * an error is logged but no exceptions are thrown. If these operations fail
   * in a way that indicates potential misuse of the spooling directory, a
   * FlumeException will be thrown.
   *
   * @throws FlumeException if files do not conform to spooling assumptions
   */
  private void retireCurrentFile() throws IOException {
    Preconditions.checkState(currentFile.isPresent());

    File fileToRoll = new File(currentFile.get().getFile()
        .getAbsolutePath());

    currentFile.get().getDeserializer().close();

    // Verify that spooling assumptions hold
    if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
      String message = "File has been modified since being read: "
          + fileToRoll;
      throw new IllegalStateException(message);
    }
    if (fileToRoll.length() != currentFile.get().getLength()) {
      String message = "File has changed size since being read: "
          + fileToRoll;
      throw new IllegalStateException(message);
    }

    if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
      rollCurrentFile(fileToRoll);
    } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
      deleteCurrentFile(fileToRoll);
    } else {
      // TODO: implement delay in the future
      throw new IllegalArgumentException("Unsupported delete policy: "
          + deletePolicy);
    }
  }

  /**
   * Rename the given spooled file
   *
   * @param fileToRoll
   * @throws IOException
   */
  private void rollCurrentFile(File fileToRoll) throws IOException {

    File dest = new File(fileToRoll.getPath() + completedSuffix);
    logger.info("Preparing to move file {} to {}", fileToRoll, dest);

    // Before renaming, check whether destination file name exists
    if (dest.exists() && PlatformDetect.isWindows()) {
      /*
       * If we are here, it means the completed file already exists. In
       * almost every case this means the user is violating an assumption
       * of Flume (that log files are placed in the spooling directory
       * with unique names). However, there is a corner case on Windows
       * systems where the file was already rolled but the rename was not
       * atomic. If that seems likely, we let it pass with only a warning.
       */
      if (Files.equal(currentFile.get().getFile(), dest)) {
        logger.warn("Completed file " + dest
            + " already exists, but files match, so continuing.");
        boolean deleted = fileToRoll.delete();
        if (!deleted) {
          logger.error("Unable to delete file "
              + fileToRoll.getAbsolutePath()
              + ". It will likely be ingested another time.");
        }
      } else {
        String message = "File name has been re-used with different"
            + " files. Spooling assumptions violated for " + dest;
        throw new IllegalStateException(message);
      }

    // Dest file exists and not on windows
    } else if (dest.exists()) {
      String message = "File name has been re-used with different"
          + " files. Spooling assumptions violated for " + dest;
      throw new IllegalStateException(message);

    // Destination file does not already exist. We are good to go!
    } else {
      boolean renamed = fileToRoll.renameTo(dest);
      if (renamed) {
        logger.debug("Successfully rolled file {} to {}", fileToRoll,
            dest);

        // now we no longer need the meta file
        deleteMetaFile();
      } else {
        /*
         * If we are here then the file cannot be renamed for a reason
         * other than that the destination file exists (actually, that
         * remains possible w/ small probability due to TOC-TOU
         * conditions).
         */
        String message = "Unable to move "
            + fileToRoll
            + " to "
            + dest
            + ". This will likely cause duplicate events. Please verify that "
            + "flume has sufficient permissions to perform these operations.";
        throw new FlumeException(message);
      }
    }
  }

  /**
   * Delete the given spooled file
   *
   * @param fileToDelete
   * @throws IOException
   */
  private void deleteCurrentFile(File fileToDelete) throws IOException {
    logger.info("Preparing to delete file {}", fileToDelete);
    if (!fileToDelete.exists()) {
      logger.warn("Unable to delete nonexistent file: {}", fileToDelete);
      return;
    }
    if (!fileToDelete.delete()) {
      throw new IOException("Unable to delete spool file: "
          + fileToDelete);
    }
    // now we no longer need the meta file
    deleteMetaFile();
  }

  /**
   * Find and open the oldest file in the chosen directory. If two or more
   * files are equally old, the file name with lower lexicographical value is
   * returned. If the directory is empty, this will return an absent option.
   */
  private Optional<FileInfo> getNextFile() {
    /* Filter to exclude finished or hidden files */
    FileFilter filter = new FileFilter() {
      public boolean accept(File candidate) {
        String fileName = candidate.getName();
        if ((candidate.isDirectory())
            || (fileName.endsWith(completedSuffix))
            || (fileName.startsWith("."))
            || ignorePattern.matcher(fileName).matches()) {
          return false;
        }
        return true;
      }
    };
    List<File> candidateFiles = Arrays.asList(spoolDirectory
        .listFiles(filter));
    if (candidateFiles.isEmpty()) {
      return Optional.absent();
    } else {
      Collections.sort(candidateFiles, new Comparator<File>() {
        public int compare(File a, File b) {
          int timeComparison = new Long(a.lastModified())
              .compareTo(new Long(b.lastModified()));
          if (timeComparison != 0) {
            return timeComparison;
          } else {
            return a.getName().compareTo(b.getName());
          }
        }
      });
      File nextFile = candidateFiles.get(0);
      try {
        // roll the meta file, if needed
        String nextPath = nextFile.getPath();
        PositionTracker tracker = DurablePositionTracker.getInstance(
            metaFile, nextPath);
        if (!tracker.getTarget().equals(nextPath)) {
          tracker.close();
          deleteMetaFile();
          tracker = DurablePositionTracker.getInstance(metaFile,
              nextPath);
        }

        // sanity check
        Preconditions
            .checkState(
                tracker.getTarget().equals(nextPath),
                "Tracker target %s does not equal expected filename %s",
                tracker.getTarget(), nextPath);

        ResettableInputStream in = new ResettableFileInputStream(
            nextFile, tracker,
            ResettableFileInputStream.DEFAULT_BUF_SIZE,
            inputCharset, decodeErrorPolicy);
        EventDeserializer deserializer = EventDeserializerFactory
            .getInstance(deserializerType, deserializerContext, in);

        return Optional.of(new FileInfo(nextFile, deserializer));
      } catch (FileNotFoundException e) {
        // File could have been deleted in the interim
        logger.warn("Could not find file: " + nextFile, e);
        return Optional.absent();
      } catch (IOException e) {
        logger.error("Exception opening file: " + nextFile, e);
        return Optional.absent();
      }
    }
  }

  private void deleteMetaFile() throws IOException {
    if (metaFile.exists() && !metaFile.delete()) {
      throw new IOException("Unable to delete old meta file " + metaFile);
    }
  }

  /** An immutable class with information about a file being processed. */
  private static class FileInfo {
    private final File file;
    private final long length;
    private final long lastModified;
    private final EventDeserializer deserializer;

    public FileInfo(File file, EventDeserializer deserializer) {
      this.file = file;
      this.length = file.length();
      this.lastModified = file.lastModified();
      this.deserializer = deserializer;
    }

    public long getLength() {
      return length;
    }

    public long getLastModified() {
      return lastModified;
    }

    public EventDeserializer getDeserializer() {
      return deserializer;
    }

    public File getFile() {
      return file;
    }
  }

  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  static enum DeletePolicy {
    NEVER, IMMEDIATE, DELAY
  }

  /**
   * Special builder class for ReliableSpoolingFileEventReader
   */
  public static class Builder {
    private File spoolDirectory;
    private String completedSuffix = SpoolDirectorySourceConfigurationExtConstants.SPOOLED_FILE_SUFFIX;
    private String ignorePattern = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_IGNORE_PAT;
    private String trackerDirPath = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_TRACKER_DIR;
    private Boolean annotateFileName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILE_HEADER;
    private String fileNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_HEADER_KEY;
    private Boolean annotateBaseName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER;
    private String baseNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER_KEY;
    private String deserializerType = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DESERIALIZER;
    private Context deserializerContext = new Context();
    private String deletePolicy = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DELETE_POLICY;
    private String inputCharset = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_INPUT_CHARSET;
    private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy
        .valueOf(SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DECODE_ERROR_POLICY
            .toUpperCase());

    // added code - begin
    private Boolean annotateFileNameExtractor = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR;
    private String fileNameExtractorHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY;
    private String fileNameExtractorPattern = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_PATTERN;
    private Boolean convertToTimestamp = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP;
    private String dateTimeFormat = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT;
    private Boolean splitFileName = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_FILENAME;
    private String splitBy = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLITY_BY;
    private String splitBaseNameHeader = SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_BASENAME_HEADER;

    public Builder annotateFileNameExtractor(
        Boolean annotateFileNameExtractor) {
      this.annotateFileNameExtractor = annotateFileNameExtractor;
      return this;
    }

    public Builder fileNameExtractorHeader(String fileNameExtractorHeader) {
      this.fileNameExtractorHeader = fileNameExtractorHeader;
      return this;
    }

    public Builder fileNameExtractorPattern(String fileNameExtractorPattern) {
      this.fileNameExtractorPattern = fileNameExtractorPattern;
      return this;
    }

    public Builder convertToTimestamp(Boolean convertToTimestamp) {
      this.convertToTimestamp = convertToTimestamp;
      return this;
    }

    public Builder dateTimeFormat(String dateTimeFormat) {
      this.dateTimeFormat = dateTimeFormat;
      return this;
    }

    public Builder splitFileName(Boolean splitFileName) {
      this.splitFileName = splitFileName;
      return this;
    }

    public Builder splitBy(String splitBy) {
      this.splitBy = splitBy;
      return this;
    }

    public Builder splitBaseNameHeader(String splitBaseNameHeader) {
      this.splitBaseNameHeader = splitBaseNameHeader;
      return this;
    }
    // added code - end

    public Builder spoolDirectory(File directory) {
      this.spoolDirectory = directory;
      return this;
    }

    public Builder completedSuffix(String completedSuffix) {
      this.completedSuffix = completedSuffix;
      return this;
    }

    public Builder ignorePattern(String ignorePattern) {
      this.ignorePattern = ignorePattern;
      return this;
    }

    public Builder trackerDirPath(String trackerDirPath) {
      this.trackerDirPath = trackerDirPath;
      return this;
    }

    public Builder annotateFileName(Boolean annotateFileName) {
      this.annotateFileName = annotateFileName;
      return this;
    }

    public Builder fileNameHeader(String fileNameHeader) {
      this.fileNameHeader = fileNameHeader;
      return this;
    }

    public Builder annotateBaseName(Boolean annotateBaseName) {
      this.annotateBaseName = annotateBaseName;
      return this;
    }

    public Builder baseNameHeader(String baseNameHeader) {
      this.baseNameHeader = baseNameHeader;
      return this;
    }

    public Builder deserializerType(String deserializerType) {
      this.deserializerType = deserializerType;
      return this;
    }

    public Builder deserializerContext(Context deserializerContext) {
      this.deserializerContext = deserializerContext;
      return this;
    }

    public Builder deletePolicy(String deletePolicy) {
      this.deletePolicy = deletePolicy;
      return this;
    }

    public Builder inputCharset(String inputCharset) {
      this.inputCharset = inputCharset;
      return this;
    }

    public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
      this.decodeErrorPolicy = decodeErrorPolicy;
      return this;
    }

    public ReliableSpoolingFileEventExtReader build() throws IOException {
      return new ReliableSpoolingFileEventExtReader(spoolDirectory,
          completedSuffix, ignorePattern, trackerDirPath,
          annotateFileName, fileNameHeader, annotateBaseName,
          baseNameHeader, deserializerType, deserializerContext,
          deletePolicy, inputCharset, decodeErrorPolicy,
          annotateFileNameExtractor, fileNameExtractorHeader,
          fileNameExtractorPattern, convertToTimestamp,
          dateTimeFormat, splitFileName, splitBy, splitBaseNameHeader);
    }
  }
}
```


```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.besttone.flume;

import org.apache.flume.serialization.DecodeErrorPolicy;

public class SpoolDirectorySourceConfigurationExtConstants {
  /** Directory where files are deposited. */
  public static final String SPOOL_DIRECTORY = "spoolDir";

  /** Suffix appended to files when they are finished being sent. */
  public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
  public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";

  /** Header in which to put absolute path filename. */
  public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
  public static final String DEFAULT_FILENAME_HEADER_KEY = "file";

  /** Whether to include absolute path filename in a header. */
  public static final String FILENAME_HEADER = "fileHeader";
  public static final boolean DEFAULT_FILE_HEADER = false;

  /** Header in which to put the basename of file. */
  public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
  public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";

  /** Whether to include the basename of a file in a header. */
  public static final String BASENAME_HEADER = "basenameHeader";
  public static final boolean DEFAULT_BASENAME_HEADER = false;

  /** What size to batch with before sending to ChannelProcessor. */
  public static final String BATCH_SIZE = "batchSize";
  public static final int DEFAULT_BATCH_SIZE = 100;

  /** Maximum number of lines to buffer between commits. */
  @Deprecated
  public static final String BUFFER_MAX_LINES = "bufferMaxLines";
  @Deprecated
  public static final int DEFAULT_BUFFER_MAX_LINES = 100;

  /** Maximum length of line (in characters) in buffer between commits. */
  @Deprecated
  public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
  @Deprecated
  public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;

  /** Pattern of files to ignore */
  public static final String IGNORE_PAT = "ignorePattern";
  public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect

  /** Directory to store metadata about files being processed */
  public static final String TRACKER_DIR = "trackerDir";
  public static final String DEFAULT_TRACKER_DIR = ".flumespool";

  /** Deserializer to use to parse the file data into Flume Events */
  public static final String DESERIALIZER = "deserializer";
  public static final String DEFAULT_DESERIALIZER = "LINE";

  public static final String DELETE_POLICY = "deletePolicy";
  public static final String DEFAULT_DELETE_POLICY = "never";

  /** Character set used when reading the input. */
  public static final String INPUT_CHARSET = "inputCharset";
  public static final String DEFAULT_INPUT_CHARSET = "UTF-8";

  /** What to do when there is a character set decoding error. */
  public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
  public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL
      .name();

  public static final String MAX_BACKOFF = "maxBackoff";
  public static final Integer DEFAULT_MAX_BACKOFF = 4000;

  // added code - begin
  public static final String FILENAME_EXTRACTOR = "fileExtractor";
  public static final boolean DEFAULT_FILENAME_EXTRACTOR = false;

  public static final String FILENAME_EXTRACTOR_HEADER_KEY = "fileExtractorHeaderKey";
  public static final String DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY = "fileExtractorHeader";

  public static final String FILENAME_EXTRACTOR_PATTERN = "fileExtractorPattern";
  public static final String DEFAULT_FILENAME_EXTRACTOR_PATTERN = "(.)*";

  public static final String FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = "convertToTimestamp";
  public static final boolean DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = false;

  public static final String FILENAME_EXTRACTOR_DATETIME_FORMAT = "dateTimeFormat";
  public static final String DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT = "yyyy-MM-dd";

  public static final String SPLIT_FILENAME = "splitFileName";
  public static final boolean DEFAULT_SPLIT_FILENAME = false;

  public static final String SPLITY_BY = "splitBy";
  public static final String DEFAULT_SPLITY_BY = "\\.";

  public static final String SPLIT_BASENAME_HEADER = "splitBaseNameHeader";
  public static final String DEFAULT_SPLIT_BASENAME_HEADER = "fileNameSplit";
  // added code - end
}
```


```java
package com.besttone.flume;

import static com.besttone.flume.SpoolDirectorySourceConfigurationExtConstants.*;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.LineDeserializer;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

public class SpoolDirectoryExtSource extends AbstractSource implements
    Configurable, EventDrivenSource {

  private static final Logger logger = LoggerFactory
      .getLogger(SpoolDirectoryExtSource.class);

  // Delay used when polling for new files
  private static final int POLL_DELAY_MS = 500;

  /* Config options */
  private String completedSuffix;
  private String spoolDirectory;
  private boolean fileHeader;
  private String fileHeaderKey;
  private boolean basenameHeader;
  private String basenameHeaderKey;
  private int batchSize;
  private String ignorePattern;
  private String trackerDirPath;
  private String deserializerType;
  private Context deserializerContext;
  private String deletePolicy;
  private String inputCharset;
  private DecodeErrorPolicy decodeErrorPolicy;
  private volatile boolean hasFatalError = false;

  private SourceCounter sourceCounter;
  ReliableSpoolingFileEventExtReader reader;
  private ScheduledExecutorService executor;
  private boolean backoff = true;
  private boolean hitChannelException = false;
  private int maxBackoff;

  // added code - begin
  private Boolean annotateFileNameExtractor;
  private String fileNameExtractorHeader;
  private String fileNameExtractorPattern;
  private Boolean convertToTimestamp;
  private String dateTimeFormat;
  private boolean splitFileName;
  private String splitBy;
  private String splitBaseNameHeader;
  // added code - end

  @Override
  public synchronized void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    try {
      reader = new ReliableSpoolingFileEventExtReader.Builder()
          .spoolDirectory(directory).completedSuffix(completedSuffix)
          .ignorePattern(ignorePattern)
          .trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader).fileNameHeader(fileHeaderKey)
          .annotateBaseName(basenameHeader)
          .baseNameHeader(basenameHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy).inputCharset(inputCharset)
          .decodeErrorPolicy(decodeErrorPolicy)
          .annotateFileNameExtractor(annotateFileNameExtractor)
          .fileNameExtractorHeader(fileNameExtractorHeader)
          .fileNameExtractorPattern(fileNameExtractorPattern)
          .convertToTimestamp(convertToTimestamp)
          .dateTimeFormat(dateTimeFormat)
          .splitFileName(splitFileName).splitBy(splitBy)
          .splitBaseNameHeader(splitBaseNameHeader).build();
    } catch (IOException ioe) {
      throw new FlumeException(
          "Error instantiating spooling event parser", ioe);
    }

    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS,
        TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("SpoolDirectorySource source started");
    sourceCounter.start();
  }

  @Override
  public synchronized void stop() {
    executor.shutdown();
    try {
      executor.awaitTermination(10L, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      logger.info("Interrupted while awaiting termination", ex);
    }
    executor.shutdownNow();

    super.stop();
    sourceCounter.stop();
    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
        sourceCounter);
  }

  @Override
  public String toString() {
    return "Spool Directory source " + getName() + ": { spoolDir: "
        + spoolDirectory + " }";
  }

  @Override
  public synchronized void configure(Context context) {
    spoolDirectory = context.getString(SPOOL_DIRECTORY);
    Preconditions.checkState(spoolDirectory != null,
        "Configuration must specify a spooling directory");

    completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
        DEFAULT_SPOOLED_FILE_SUFFIX);
    deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
    fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER);
    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
        DEFAULT_FILENAME_HEADER_KEY);
    basenameHeader = context.getBoolean(BASENAME_HEADER,
        DEFAULT_BASENAME_HEADER);
    basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
        DEFAULT_BASENAME_HEADER_KEY);
    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
    decodeErrorPolicy = DecodeErrorPolicy
        .valueOf(context.getString(DECODE_ERROR_POLICY,
            DEFAULT_DECODE_ERROR_POLICY).toUpperCase());

    ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

    deserializerType = context
        .getString(DESERIALIZER, DEFAULT_DESERIALIZER);
    deserializerContext = new Context(context.getSubProperties(DESERIALIZER
        + "."));

    // "Hack" to support backwards compatibility with previous generation of
    // spooling directory source, which did not support deserializers
    Integer bufferMaxLineLength = context
        .getInteger(BUFFER_MAX_LINE_LENGTH);
    if (bufferMaxLineLength != null && deserializerType != null
        && deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
      deserializerContext.put(LineDeserializer.MAXLINE_KEY,
          bufferMaxLineLength.toString());
    }

    maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }

    // added code - begin
    annotateFileNameExtractor = context.getBoolean(FILENAME_EXTRACTOR,
        DEFAULT_FILENAME_EXTRACTOR);
    fileNameExtractorHeader = context.getString(FILENAME_EXTRACTOR_HEADER_KEY,
        DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY);
    fileNameExtractorPattern = context.getString(FILENAME_EXTRACTOR_PATTERN,
        DEFAULT_FILENAME_EXTRACTOR_PATTERN);
    convertToTimestamp = context.getBoolean(
        FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP,
        DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP);
    dateTimeFormat = context.getString(FILENAME_EXTRACTOR_DATETIME_FORMAT,
        DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT);
    splitFileName = context.getBoolean(SPLIT_FILENAME, DEFAULT_SPLIT_FILENAME);
    splitBy = context.getString(SPLITY_BY, DEFAULT_SPLITY_BY);
    splitBaseNameHeader = context.getString(SPLIT_BASENAME_HEADER,
        DEFAULT_SPLIT_BASENAME_HEADER);
    // added code - end
  }

  @VisibleForTesting
  protected boolean hasFatalError() {
    return hasFatalError;
  }

  /**
   * The class always backs off, this exists only so that we can test without
   * taking a really long time.
   *
   * @param backoff - whether the source should backoff if the channel is full
   */
  @VisibleForTesting
  protected void setBackOff(boolean backoff) {
    this.backoff = backoff;
  }

  @VisibleForTesting
  protected boolean hitChannelException() {
    return hitChannelException;
  }

  @VisibleForTesting
  protected SourceCounter getSourceCounter() {
    return sourceCounter;
  }

  private class SpoolDirectoryRunnable implements Runnable {
    private ReliableSpoolingFileEventExtReader reader;
    private SourceCounter sourceCounter;

    public SpoolDirectoryRunnable(
        ReliableSpoolingFileEventExtReader reader,
        SourceCounter sourceCounter) {
      this.reader = reader;
      this.sourceCounter = sourceCounter;
    }

    @Override
    public void run() {
      int backoffInterval = 250;
      try {
        while (!Thread.interrupted()) {
          List<Event> events = reader.readEvents(batchSize);
          if (events.isEmpty()) {
            break;
          }
          sourceCounter.addToEventReceivedCount(events.size());
          sourceCounter.incrementAppendBatchReceivedCount();

          try {
            getChannelProcessor().processEventBatch(events);
            reader.commit();
          } catch (ChannelException ex) {
            logger.warn("The channel is full, and cannot write data now. The "
                + "source will try again after "
                + String.valueOf(backoffInterval)
                + " milliseconds");
            hitChannelException = true;
            if (backoff) {
              TimeUnit.MILLISECONDS.sleep(backoffInterval);
              backoffInterval = backoffInterval << 1;
              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff
                  : backoffInterval;
            }
            continue;
          }
          backoffInterval = 250;
          sourceCounter.addToEventAcceptedCount(events.size());
          sourceCounter.incrementAppendBatchAcceptedCount();
        }
        logger.info("Spooling Directory Source runner has shutdown.");
      } catch (Throwable t) {
        logger.error(
            "FATAL: "
                + SpoolDirectoryExtSource.this.toString()
                + ": "
                + "Uncaught exception in SpoolDirectorySource thread. "
                + "Restart or reconfigure Flume to continue processing.",
            t);
        hasFatalError = true;
        Throwables.propagate(t);
      }
    }
  }
}
```

The extended source mainly adds the following configuration parameters:

tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource    # fully qualified class name of the custom source
tier1.sources.source1.spoolDir=/opt/logs                                 # directory to watch
tier1.sources.source1.splitFileName=true                                 # whether to split the file name and put the parts into headers; default false
tier1.sources.source1.splitBy=\\.                                        # separator to split on; default "."
tier1.sources.source1.splitBaseNameHeader=fileNameSplit                  # prefix of the header keys that receive the parts

For example, for a.log.2014-07-31 split on ".", the headers will contain fileNameSplit0=a, fileNameSplit1=log, and fileNameSplit2=2014-07-31.

(The extension also includes a feature for extracting part of the file name with a regular expression. We do not need it for this requirement, so I will not describe it in detail; a configuration sketch follows for completeness.)
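The parameter names below come from the constants class above; the pattern and values are my own illustration, not taken from the original post:

```properties
# enable the regex extractor and pull the date out of a name like a.log.2014-07-31
tier1.sources.source1.fileExtractor=true
tier1.sources.source1.fileExtractorPattern=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.fileExtractorHeaderKey=fileExtractorHeader
# optionally convert the extracted value to a millisecond timestamp using dateTimeFormat
tier1.sources.source1.convertToTimestamp=true
tier1.sources.source1.dateTimeFormat=yyyy-MM-dd
```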

With this extended source in place, the original requirement becomes easy to satisfy; all that is needed is:

tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}

The contents of a.log.2014-07-31 will then be saved under hdfs://master68:8020/flume/events/a/2014-07-31.

Next, let's look at how to deploy this extended spooling directory source (on an installation managed by Cloudera Manager).

First, package the three classes above into a JAR: SpoolDirectoryExtSource.jar.
The Flume plugin directory used by CM is /var/lib/flume-ng/plugins.d.

Then, on every agent that needs this source, create a SpoolDirectoryExtSource directory under /var/lib/flume-ng/plugins.d with the subdirectories lib, libext, and native: lib holds the plugin JAR, libext holds the plugin's dependency JARs, and native holds any native libraries the plugin needs.

We have no extra dependencies here, so simply placing SpoolDirectoryExtSource.jar in the lib directory is enough. The final directory layout is:

```
plugins.d/
plugins.d/SpoolDirectoryExtSource/
plugins.d/SpoolDirectoryExtSource/lib/SpoolDirectoryExtSource.jar
plugins.d/SpoolDirectoryExtSource/libext/
plugins.d/SpoolDirectoryExtSource/native/
```
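On the agent host, the layout can be created with a few shell commands; this is a sketch that assumes the built JAR sits in the current directory:

```bash
# create the plugin directories and drop the JAR into lib/
mkdir -p /var/lib/flume-ng/plugins.d/SpoolDirectoryExtSource/{lib,libext,native}
cp SpoolDirectoryExtSource.jar /var/lib/flume-ng/plugins.d/SpoolDirectoryExtSource/lib/
```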

Restart the Flume agent and Flume will automatically load the plugin; the fully qualified class name can then be used as the type property in flume.conf.

The final flume.conf looks like this:


```properties
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource
tier1.sources.source1.spoolDir=/opt/logs
tier1.sources.source1.splitFileName=true
tier1.sources.source1.splitBy=\\.
tier1.sources.source1.splitBaseNameHeader=fileNameSplit
tier1.sources.source1.channels=channel1
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}
tier1.sinks.sink1.hdfs.round=true
tier1.sinks.sink1.hdfs.roundValue=10
tier1.sinks.sink1.hdfs.roundUnit=minute
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30
```

(Screenshot omitted: the log output seen when a logger sink is used to inspect the events.)

Guiding questions:
1. How do we implement a custom sink on the Flume side that saves logs according to our own rules?
2. How do we read the value of rootPath from the Flume configuration file?


Recently I needed to use Flume to collect logs from remote machines, so I learned some Flume basics; this section is simply a record of that.

The overall idea for remote log collection is: on the remote side, a custom log4j appender sends messages to Flume; on the Flume side, a custom sink saves the logs according to our rules.
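The original post does not show the application side. One option is Flume's own log4j appender from the flume-ng-log4jappender artifact; a minimal log4j.properties sketch, assuming an avro source listening on flume-host:44444 (host and port are placeholders), might look like this:

```properties
# route application logs to a Flume avro source
log4j.rootLogger=INFO, flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=flume-host
log4j.appender.flume.Port=44444
```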

Custom sink code:

```java
public class LocalFileLogSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory
            .getLogger(LocalFileLogSink.class);

    private static final String PROP_KEY_ROOTPATH = "rootPath";
    private String rootPath;

    @Override
    public void configure(Context context) {
        // read the root path for saving logs from the Flume configuration
        this.rootPath = context.getString(PROP_KEY_ROOTPATH);
    }

    @Override
    public Status process() throws EventDeliveryException {
        logger.debug("Do process");
        // take events from the channel and persist them (see the snippet below)
        return Status.READY;
    }
}
```


By implementing the Configurable interface, the sink can read its configuration parameters from the context in the configure method at initialization time. Here we want to read rootPath, the root path under which logs are saved, from the Flume configuration file. It is configured in flume-conf.properties as follows:

```properties
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs
```


loggerSink is the name given to the custom sink; when reading a value, the key we pass is only the part after the sink name, i.e. rootPath here.

The actual business logic is implemented by overriding the process method of AbstractSink: obtain the channel from the base class's getChannel method, take Events from it, and process them.

```java
Status status;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
    logger.debug("Get event.");
    Event event = ch.take();
    // ... write the event to a file under rootPath here ...
    txn.commit();
    status = Status.READY;
    return status;
} finally {
    logger.info("trx close.");
    txn.close();
}
```

Below is the pom file for my custom Kafka sink plugin; compile it into a JAR and drop it into Flume's lib directory to use it.


```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>flume-sinks</groupId>
  <artifactId>cmcc-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>
</project>
```

The parent POM and the RAT plugin have been removed here, which avoids a common build error: https://issues.apache.org/jira/browse/FLUME-1372

A few constants are defined:


```java
public static final String BATCH_SIZE = "batchSize";
public static final int DEFAULT_BATCH_SIZE = 100;
public static final String PARTITION_KEY_NAME = "cmcc.partition.key";
public static final String ENCODING_KEY_NAME = "cmcc.encoding";
public static final String DEFAULT_ENCODING = "UTF-8";
public static final String CUSTOME_TOPIC_KEY_NAME = "cmcc.topic.name";
public static final String DEFAULT_TOPIC_NAME = "CMCC";
```

A custom sink needs to extend AbstractSink, implement the Configurable interface, and override a few methods, as follows:


  1. package org.apache.flume.cmcc.kafka;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import java.util.Properties;
  5. import kafka.javaapi.producer.Producer;
  6. import kafka.producer.KeyedMessage;
  7. import kafka.producer.ProducerConfig;
  8. import org.apache.commons.lang.StringUtils;
  9. import org.apache.flume.Channel;
  10. import org.apache.flume.Context;
  11. import org.apache.flume.Event;
  12. import org.apache.flume.Transaction;
  13. import org.apache.flume.conf.Configurable;
  14. import org.apache.flume.instrumentation.SinkCounter;
  15. import org.apache.flume.sink.AbstractSink;
  16. import org.slf4j.Logger;
  17. import org.slf4j.LoggerFactory;
  18. import com.google.common.base.Throwables;
  19. import com.google.common.collect.ImmutableMap;
  20. public class CmccKafkaSink extends AbstractSink implements Configurable {
  21. private static final Logger log = LoggerFactory
  22. .getLogger(CmccKafkaSink.class);
  23. private Properties parameters;
  24. private Producer producer;
  25. // private Context context;
  26. private int batchSize;// 一次事务的event数量,整体提交
  27. private List> messageList;
  28. private SinkCounter sinkCounter;
  29. @Override
  30. public Status process() {
  31. // TODO Auto-generated method stub
  32. Status result = Status.READY;
  33. Channel channel = getChannel();
  34. Transaction transaction = null;
  35. Event event = null;
  36. try {
  37. long processedEvent = 0;
  38. transaction = channel.getTransaction();
  39. transaction.begin();// 事务开始
  40. messageList.clear();
  41. for (; processedEvent < batchSize; processedEvent++) {
  42. event = channel.take();// 从channel取出一个事件
  43. if (event == null) {
  44. result = Status.BACKOFF;
  45. break;
  46. }
  47. sinkCounter.incrementEventDrainAttemptCount();
  48. // Map headers = event.getHeaders();
  49. String partitionKey = parameters
  50. .getProperty(Constants.PARTITION_KEY_NAME);
  51. String topic = StringUtils.defaultIfEmpty(parameters
  52. .getProperty(Constants.CUSTOME_TOPIC_KEY_NAME),
  53. Constants.DEFAULT_TOPIC_NAME);
  54. String encoding = StringUtils.defaultIfEmpty(
  55. parameters.getProperty(Constants.ENCODING_KEY_NAME),
  56. Constants.DEFAULT_ENCODING);
  57. byte[] eventBody = event.getBody();
  58. String eventData = new String(eventBody, encoding);
  59. KeyedMessage data = null;
  60. if (StringUtils.isEmpty(partitionKey)) {
  61. data = new KeyedMessage(topic, eventData);
  62. } else {
  63. data = new KeyedMessage(topic,
  64. partitionKey, eventData);
  65. }
  66. messageList.add(data);
  67. log.debug(“Add data [“ + eventData
    • “] into messageList,position:” + processedEvent);
  68. }
  69. if (processedEvent == 0) {
  70. sinkCounter.incrementBatchEmptyCount();
  71. result = Status.BACKOFF;
  72. } else {
  73. if (processedEvent < batchSize) {
  74. sinkCounter.incrementBatchUnderflowCount();
  75. } else {
  76. sinkCounter.incrementBatchCompleteCount();
  77. }
  78. sinkCounter.addToEventDrainAttemptCount(processedEvent);
  79. producer.send(messageList);
  80. log.debug(“Send MessageList to Kafka: [ message List size = “
    • messageList.size() + “,processedEvent number = “
    • processedEvent + “] “);
  81. }
  82. transaction.commit();// batchSize个事件处理完成,一次事务提交
  83. sinkCounter.addToEventDrainSuccessCount(processedEvent);
  84. result = Status.READY;
  85. } catch (Exception e) {
  86. String errorMsg = “Failed to publish events !”;
  87. log.error(errorMsg, e);
  88. e.printStackTrace();
  89. result = Status.BACKOFF;
  90. if (transaction != null) {
  91. try {
  92. transaction.rollback();
  93. log.debug(“transaction rollback success !”);
  94. } catch (Exception ex) {
  95. log.error(errorMsg, ex);
  96. throw Throwables.propagate(ex);
  97. }
  98. }
  99. // throw new EventDeliveryException(errorMsg, e);
  100. } finally {
  101. if (transaction != null) {
  102. transaction.close();
  103. }
  104. }
  105. return result;
  106. }
  107. @Override
  108. public synchronized void start() {
  109. // TODO Auto-generated method stub
  110. log.info(“Starting {}…”, this);
  111. sinkCounter.start();
  112. super.start();
  113. ProducerConfig config = new ProducerConfig(this.parameters);
  114. this.producer = new Producer<String, String>(config);
  115. sinkCounter.incrementConnectionCreatedCount();
  116. }
  117. @Override
  118. public synchronized void stop() {
  119. // TODO Auto-generated method stub
  120. log.debug("Cmcc Kafka sink {} stopping...", getName());
  121. sinkCounter.stop();
  122. producer.close();
  123. sinkCounter.incrementConnectionClosedCount();
  124. }
  125. @Override
  126. public void configure(Context context) {
  127. // TODO Auto-generated method stub
  128. ImmutableMap<String, String> props = context.getParameters();
  129. batchSize = context.getInteger(Constants.BATCH_SIZE,
  130. Constants.DEFAULT_BATCH_SIZE);
  131. messageList = new ArrayList<KeyedMessage<String, String>>(batchSize);
  132. parameters = new Properties();
  133. for (String key : props.keySet()) {
  134. String value = props.get(key);
  135. this.parameters.put(key, value);
  136. }
  137. if (sinkCounter == null) {
  138. sinkCounter = new SinkCounter(getName());
  139. }
  140. }
  141. }

This sink also hooks into Flume's built-in monitoring via SinkCounter.

To improve throughput it batches events: up to batchSize events are taken from the channel and sent to Kafka within a single transaction, which reduces the number of commits.

If the channel runs dry before a full batch has been collected, the events taken so far are sent and committed anyway.

Here is the configuration:


  1. a1.sinks.k1.type=org.apache.flume.cmcc.kafka.CmccKafkaSink
  2. a1.sinks.k1.metadata.broker.list=192.168.11.174:9092
  3. a1.sinks.k1.partition.key=0
  4. a1.sinks.k1.partitioner.class=org.apache.flume.cmcc.kafka.CmccPartition
  5. a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
  6. a1.sinks.k1.request.required.acks=0
  7. a1.sinks.k1.max.message.size=1000000
  8. a1.sinks.k1.cmcc.encoding=UTF-8
  9. a1.sinks.k1.cmcc.topic.name=CMCC
  10. a1.sinks.k1.producer.type=sync
  11. a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
  12. a1.sinks.k1.batchSize=100

Notice that some of these properties are never defined in our Constants class at all. How do they get picked up? A look at the (decompiled) Kafka source for ProducerConfig makes it clear:


  1. private ProducerConfig(VerifiableProperties props)
  2. {
  3. this.props = props;
  4. super();
  5. kafka.producer.async.AsyncProducerConfig.class.$init$(this);
  6. SyncProducerConfigShared.class.$init$(this);
  7. brokerList = props.getString("metadata.broker.list");
  8. partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner");
  9. producerType = props.getString("producer.type", "sync");
  10. String prop;
  11. compressionCodec = liftedTree1$1(prop = props.getString("compression.codec", NoCompressionCodec$.MODULE$.name()));
  12. Object _tmp = null;
  13. compressedTopics = Utils$.MODULE$.parseCsvList(props.getString("compressed.topics", null));
  14. messageSendMaxRetries = props.getInt("message.send.max.retries", 3);
  15. retryBackoffMs = props.getInt("retry.backoff.ms", 100);
  16. topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000);
  17. ProducerConfig$.MODULE$.validate(this);
  18. }

So when ProducerConfig is instantiated, Kafka itself reads the producer settings straight out of the Properties object we pass in; that is why properties such as request.required.acks work even though the sink code never references them.
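As a quick illustration, here is a minimal, standalone sketch against the same old Kafka 0.8 producer API used by the sink (the broker address, topic and message are made-up placeholder values, not taken from the article): the producer picks its settings up from the Properties object, exactly as the sink does.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerConfigDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            // These keys are read by Kafka's ProducerConfig, not by our sink code:
            props.put("metadata.broker.list", "192.168.11.174:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "0");
            props.put("producer.type", "sync");

            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);
            // Send one message to a hypothetical topic
            producer.send(new KeyedMessage<String, String>("CMCC", "hello kafka"));
            producer.close();
        }
    }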

4. Custom Channel Development

The official documentation still lists this topic as "to be determined".

Below is Meituan's writeup on their custom channel development; here is the link:

http://tech.meituan.com/mt-log-system-optimization.html

……

Flume itself provides MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is limited and nothing is persisted; FileChannel is exactly the opposite. We wanted the advantages of both: use MemoryChannel while the sink keeps up and the channel is not buffering too many logs, and switch to FileChannel when the sink falls behind and the channel needs to buffer what the application side is sending. So we developed DualChannel, which switches intelligently between the two.

Its core logic is as follows:


  1. /**
  2. * putToMemChannel indicates whether events are put into memChannel or fileChannel;
  3. * takeFromMemChannel indicates whether events are taken from memChannel or fileChannel.
  4. */
  5. private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
  6. private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);
  7. void doPut(Event event) {
  8. if (switchon && putToMemChannel.get()) {
  9. // write the event to memChannel
  10. memTransaction.put(event);
  11. if ( memChannel.isFull() || fileChannel.getQueueSize() > 100) {
  12. putToMemChannel.set(false);
  13. }
  14. } else {
  15. // write the event to fileChannel
  16. fileTransaction.put(event);
  17. }
  18. }
  19. Event doTake() {
  20. Event event = null;
  21. if ( takeFromMemChannel.get() ) {
  22. // take an event from memChannel
  23. event = memTransaction.take();
  24. if (event == null) {
  25. takeFromMemChannel.set(false);
  26. }
  27. } else {
  28. // take an event from fileChannel
  29. event = fileTransaction.take();
  30. if (event == null) {
  31. takeFromMemChannel.set(true);
  32. putToMemChannel.set(true);
  33. }
  34. }
  35. return event;
  36. }

One note: the official documentation recommends the file channel. It is slower, but it guarantees data integrity; the memory channel is faster, but it should only be used for workloads that are not too sensitive to data loss or duplication.
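For reference, a typical durable FileChannel configuration looks roughly like this (the directory paths and capacities below are placeholder values to adapt to your environment, not from the article):

    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /var/flume/checkpoint
    a1.channels.c1.dataDirs = /var/flume/data
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.transactionCapacity = 1000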

Now back to the requirement from part eight: this time let's implement it a different way, with an interceptor.

Recall that the spooldir source can write the file name into the event header under the key basename. Now imagine an interceptor that intercepts the event, reads the value of that header key, splits it into three parts, and puts each part back into the header: that would satisfy the requirement.

Unfortunately, Flume does not provide an interceptor that operates on headers. It does, however, ship with one that extracts from the body: RegexExtractorInterceptor, which looks quite powerful. Here is an example from the official documentation:

If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used

a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3

Roughly speaking, with this configuration, if the event body contains something like 1:2:3.4foobar5, the regex extracts the matched groups and puts them into the header.

So I decided to build on this interceptor: with a small change it could extract from a specific header key instead of the body. The source code turned out to be very tidy and easy to modify. Below is the new interceptor I added, RegexExtractorExtInterceptor:


  1. package com.besttone.flume;
  2. import java.util.List;
  3. import java.util.Map;
  4. import java.util.regex.Matcher;
  5. import java.util.regex.Pattern;
  6. import org.apache.commons.lang.StringUtils;
  7. import org.apache.flume.Context;
  8. import org.apache.flume.Event;
  9. import org.apache.flume.interceptor.Interceptor;
  10. import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
  11. import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import com.google.common.base.Charsets;
  15. import com.google.common.base.Preconditions;
  16. import com.google.common.base.Throwables;
  17. import com.google.common.collect.Lists;
/**
 * Interceptor that extracts matches using a specified regular expression and
 * appends the matches to the event headers using the specified serializers.
 * Note that all regular expression matching occurs through Java's built-in
 * java.util.regex package.
 *
 * Properties:
 *
 *   regex: the regex to use
 *
 *   serializers: specifies the group the serializer will be applied to, and
 *   the name of the header that will be added. If no serializer is specified
 *   for a group, the default {@link RegexExtractorInterceptorPassThroughSerializer}
 *   will be used.
 *
 * Sample config:
 *
 *   agent.sources.r1.channels = c1
 *   agent.sources.r1.type = SEQ
 *   agent.sources.r1.interceptors = i1
 *   agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR
 *   agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)
 *   agent.sources.r1.interceptors.i1.serializers = s1 s2
 *   agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer
 *   agent.sources.r1.interceptors.i1.serializers.s1.name = warning
 *   agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer
 *   agent.sources.r1.interceptors.i1.serializers.s2.name = error
 *   agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd
 *
 * Example 1:
 *
 *   EventBody: 1:2:3.4foobar5
 *   Configuration:
 *     agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
 *     agent.sources.r1.interceptors.i1.serializers = s1 s2 s3
 *     agent.sources.r1.interceptors.i1.serializers.s1.name = one
 *     agent.sources.r1.interceptors.i1.serializers.s2.name = two
 *     agent.sources.r1.interceptors.i1.serializers.s3.name = three
 *   results in an event with the same body and headers: one=>1, two=>2, three=>3
 *
 * Example 2:
 *
 *   EventBody: 1:2:3.4foobar5
 *   Configuration:
 *     agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
 *     agent.sources.r1.interceptors.i1.serializers = s1 s2
 *   results in an event with the same body and headers: one=>1, two=>2
 */
  84. public class RegexExtractorExtInterceptor implements Interceptor {
  85. static final String REGEX = "regex";
  86. static final String SERIALIZERS = "serializers";
  87. // added code: begin
  88. static final String EXTRACTOR_HEADER = "extractorHeader";
  89. static final boolean DEFAULT_EXTRACTOR_HEADER = false;
  90. static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";
  91. // added code: end
  92. private static final Logger logger = LoggerFactory
  93. .getLogger(RegexExtractorExtInterceptor.class);
  94. private final Pattern regex;
  95. private final List<NameAndSerializer> serializers;
  96. // added code: begin
  97. private final boolean extractorHeader;
  98. private final String extractorHeaderKey;
  99. // added code: end
  100. private RegexExtractorExtInterceptor(Pattern regex,
  101. List<NameAndSerializer> serializers, boolean extractorHeader,
  102. String extractorHeaderKey) {
  103. this.regex = regex;
  104. this.serializers = serializers;
  105. this.extractorHeader = extractorHeader;
  106. this.extractorHeaderKey = extractorHeaderKey;
  107. }
  108. @Override
  109. public void initialize() {
  110. // NO-OP…
  111. }
  112. @Override
  113. public void close() {
  114. // NO-OP…
  115. }
  116. @Override
  117. public Event intercept(Event event) {
  118. String tmpStr;
  119. if(extractorHeader)
  120. {
  121. tmpStr = event.getHeaders().get(extractorHeaderKey);
  122. }
  123. else
  124. {
  125. tmpStr=new String(event.getBody(),
  126. Charsets.UTF_8);
  127. }
  128. Matcher matcher = regex.matcher(tmpStr);
  129. Map<String, String> headers = event.getHeaders();
  130. if (matcher.find()) {
  131. for (int group = 0, count = matcher.groupCount(); group < count; group++) {
  132. int groupIndex = group + 1;
  133. if (groupIndex > serializers.size()) {
  134. if (logger.isDebugEnabled()) {
  135. logger.debug(
  136. "Skipping group {} to {} due to missing serializer",
  137. group, count);
  138. }
  139. break;
  140. }
  141. NameAndSerializer serializer = serializers.get(group);
  142. if (logger.isDebugEnabled()) {
  143. logger.debug("Serializing {} using {}",
  144. serializer.headerName, serializer.serializer);
  145. }
  146. headers.put(serializer.headerName, serializer.serializer
  147. .serialize(matcher.group(groupIndex)));
  148. }
  149. }
  150. return event;
  151. }
  152. @Override
  153. public List<Event> intercept(List<Event> events) {
  154. List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
  155. for (Event event : events) {
  156. Event interceptedEvent = intercept(event);
  157. if (interceptedEvent != null) {
  158. intercepted.add(interceptedEvent);
  159. }
  160. }
  161. return intercepted;
  162. }
  163. public static class Builder implements Interceptor.Builder {
  164. private Pattern regex;
  165. private List<NameAndSerializer> serializerList;
  166. // added code: begin
  167. private boolean extractorHeader;
  168. private String extractorHeaderKey;
  169. // added code: end
  170. private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
  171. @Override
  172. public void configure(Context context) {
  173. String regexString = context.getString(REGEX);
  174. Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
  175. "Must supply a valid regex string");
  176. regex = Pattern.compile(regexString);
  177. regex.pattern();
  178. regex.matcher("").groupCount();
  179. configureSerializers(context);
  180. // added code: begin
  181. extractorHeader = context.getBoolean(EXTRACTOR_HEADER,
  182. DEFAULT_EXTRACTOR_HEADER);
  183. if (extractorHeader) {
  184. extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
  185. Preconditions.checkArgument(
  186. !StringUtils.isEmpty(extractorHeaderKey),
  187. "extractorHeaderKey must be specified when extractorHeader is true");
  188. }
  189. // added code: end
  190. }
  191. private void configureSerializers(Context context) {
  192. String serializerListStr = context.getString(SERIALIZERS);
  193. Preconditions.checkArgument(
  194. !StringUtils.isEmpty(serializerListStr),
  195. "Must supply at least one name and serializer");
  196. String[] serializerNames = serializerListStr.split("\\s+");
  197. Context serializerContexts = new Context(
  198. context.getSubProperties(SERIALIZERS + "."));
  199. serializerList = Lists
  200. .newArrayListWithCapacity(serializerNames.length);
  201. for (String serializerName : serializerNames) {
  202. Context serializerContext = new Context(
  203. serializerContexts.getSubProperties(serializerName + "."));
  204. String type = serializerContext.getString("type", "DEFAULT");
  205. String name = serializerContext.getString("name");
  206. Preconditions.checkArgument(!StringUtils.isEmpty(name),
  207. "Supplied name cannot be empty.");
  208. if ("DEFAULT".equals(type)) {
  209. serializerList.add(new NameAndSerializer(name,
  210. defaultSerializer));
  211. } else {
  212. serializerList.add(new NameAndSerializer(name,
  213. getCustomSerializer(type, serializerContext)));
  214. }
  215. }
  216. }
  217. private RegexExtractorInterceptorSerializer getCustomSerializer(
  218. String clazzName, Context context) {
  219. try {
  220. RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
  221. .forName(clazzName).newInstance();
  222. serializer.configure(context);
  223. return serializer;
  224. } catch (Exception e) {
  225. logger.error("Could not instantiate event serializer.", e);
  226. Throwables.propagate(e);
  227. }
  228. return defaultSerializer;
  229. }
  230. @Override
  231. public Interceptor build() {
  232. Preconditions.checkArgument(regex != null,
  233. "Regex pattern was misconfigured");
  234. Preconditions.checkArgument(serializerList.size() > 0,
  235. "Must supply a valid group match id list");
  236. return new RegexExtractorExtInterceptor(regex, serializerList,
  237. extractorHeader, extractorHeaderKey);
  238. }
  239. }
  240. static class NameAndSerializer {
  241. private final String headerName;
  242. private final RegexExtractorInterceptorSerializer serializer;
  243. public NameAndSerializer(String headerName,
  244. RegexExtractorInterceptorSerializer serializer) {
  245. this.headerName = headerName;
  246. this.serializer = serializer;
  247. }
  248. }
  249. }

A quick summary of the changes:

Two configuration parameters were added:

extractorHeader: whether to extract from the header rather than the event body. Defaults to false, i.e. the same behaviour as the original interceptor, which extracts from the body.

extractorHeaderKey: the header key whose value is extracted. It must be specified when extractorHeader is true.

Following the approach from part eight, we package the class into a jar and place it, as a Flume plugin, under /var/lib/flume-ng/plugins.d/RegexExtractorExtInterceptor/lib, then restart Flume so the interceptor is loaded onto the classpath.
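For reference, Flume's plugins.d convention expects each plugin in its own directory with roughly this layout (the plugin directory name is arbitrary; libext and native can be omitted if empty):

    /var/lib/flume-ng/plugins.d/
        RegexExtractorExtInterceptor/
            lib/      <- the plugin jar itself
            libext/   <- the plugin's dependency jars, if any
            native/   <- native libraries, if any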

The final flume.conf is as follows:


  1. tier1.sources=source1
  2. tier1.channels=channel1
  3. tier1.sinks=sink1
  4. tier1.sources.source1.type=spooldir
  5. tier1.sources.source1.spoolDir=/opt/logs
  6. tier1.sources.source1.fileHeader=true
  7. tier1.sources.source1.basenameHeader=true
  8. tier1.sources.source1.interceptors=i1
  9. tier1.sources.source1.interceptors.i1.type=com.besttone.flume.RegexExtractorExtInterceptor$Builder
  10. tier1.sources.source1.interceptors.i1.regex=(.*)\\.(.*)\\.(.*)
  11. tier1.sources.source1.interceptors.i1.extractorHeader=true
  12. tier1.sources.source1.interceptors.i1.extractorHeaderKey=basename
  13. tier1.sources.source1.interceptors.i1.serializers=s1 s2 s3
  14. tier1.sources.source1.interceptors.i1.serializers.s1.name=one
  15. tier1.sources.source1.interceptors.i1.serializers.s2.name=two
  16. tier1.sources.source1.interceptors.i1.serializers.s3.name=three
  17. tier1.sources.source1.channels=channel1
  18. tier1.sinks.sink1.type=hdfs
  19. tier1.sinks.sink1.channel=channel1
  20. tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}
  21. tier1.sinks.sink1.hdfs.round=true
  22. tier1.sinks.sink1.hdfs.roundValue=10
  23. tier1.sinks.sink1.hdfs.roundUnit=minute
  24. tier1.sinks.sink1.hdfs.fileType=DataStream
  25. tier1.sinks.sink1.hdfs.writeFormat=Text
  26. tier1.sinks.sink1.hdfs.rollInterval=0
  27. tier1.sinks.sink1.hdfs.rollSize=10240
  28. tier1.sinks.sink1.hdfs.rollCount=0
  29. tier1.sinks.sink1.hdfs.idleTimeout=60
  30. tier1.channels.channel1.type=memory
  31. tier1.channels.channel1.capacity=10000
  32. tier1.channels.channel1.transactionCapacity=1000
  33. tier1.channels.channel1.keep-alive=30

I switched the source type back to the built-in spooldir (rather than the custom source from the previous part) and added an interceptor i1 whose type is the custom interceptor com.besttone.flume.RegexExtractorExtInterceptor$Builder. The regex splits on "." and extracts three parts into the header keys one, two and three. For a file named a.log.2014-07-31, after the interceptor runs the header contains one=a, two=log, three=2014-07-31, which the HDFS sink then substitutes into tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}.
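As a quick, standalone sanity check of that regex (this is just a demo class, not part of the interceptor), you can verify the three groups like this:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RegexCheck {
        public static void main(String[] args) {
            // Same regex as in the interceptor config: three dot-separated groups
            Pattern p = Pattern.compile("(.*)\\.(.*)\\.(.*)");
            Matcher m = p.matcher("a.log.2014-07-31");
            if (m.find()) {
                // prints: one=a, two=log, three=2014-07-31
                System.out.println("one=" + m.group(1)
                        + ", two=" + m.group(2)
                        + ", three=" + m.group(3));
            }
        }
    }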

This reproduces exactly the same requirement as in part eight.

It also shows that a custom interceptor is much cheaper to modify than a custom source: we only had to add a single class to implement the feature.
