Flume: custom source, sink, channel, and interceptors
As usual, this is driven by a real need. A reader left the following question on part five of my Flume notes:
"I want to do the following: while a file is being read, put the file's name and the date it was generated into the event header, so that when the events reach HDFS, different events land in different directories. For example, a file named a.log.2014-07-25 should end up under /a/2014-07-25 on HDFS, and a.log.2014-07-26 under /a/2014-07-26; each file gets its own directory. How can this be implemented?"
With that question in mind, I went back through the official documentation and found that the spooling directory source roughly fits: it watches a given directory for new files and, when one appears, hands the file's contents to the sink and then marks the file with a completed suffix (.COMPLETED by default) to show it has been processed. It also provides parameters to add the file name and the file's full path to the event header.
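For reference, a minimal configuration of the stock spooling directory source that puts the file name into the header might look like this (property names as documented in the Flume user guide; agent, source, and channel names are placeholders):

# put the absolute path into the "file" header
agent.sources.src1.type = spooldir
agent.sources.src1.spoolDir = /opt/logs
agent.sources.src1.fileHeader = true
# put the bare file name into the "basename" header
agent.sources.src1.basenameHeader = true
agent.sources.src1.channels = ch1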
The existing functionality does not satisfy the requirement, but it at least points in the right direction: it can put the file name into the header!
I was hoping the source code would not be too complex, so that with a small change we could split the file name and put the pieces into the header, which is exactly the behavior we want.
So I opened the source. Sure enough, it is not complicated and the structure is very clear. Following the idea above, a few changes were enough to get the feature working. Three classes related to the spooling directory source were modified (each keeps the original class name plus an Ext suffix): ReliableSpoolingFileEventExtReader, SpoolDirectorySourceConfigurationExtConstants, and SpoolDirectoryExtSource. The code follows.
First, to implement a custom source against the interfaces Flume NG provides, we need two Flume NG dependencies, flume-ng-core and flume-ng-configuration. The Maven configuration is:
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-configuration</artifactId>
    <version>1.6.0</version>
</dependency>
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package com.besttone.flume;

import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.annotations.InterfaceStability;
import org.apache.flume.client.avro.ReliableEventReader;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.DurablePositionTracker;
import org.apache.flume.serialization.EventDeserializer;
import org.apache.flume.serialization.EventDeserializerFactory;
import org.apache.flume.serialization.PositionTracker;
import org.apache.flume.serialization.ResettableFileInputStream;
import org.apache.flume.serialization.ResettableInputStream;
import org.apache.flume.tools.PlatformDetect;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.io.Files;

/**
 * A {@link ReliableEventReader} which reads log data from files stored in a
 * spooling directory and renames each file once all of its data has been read
 * (through {@link EventDeserializer#readEvent()} calls). The user must
 * {@link #commit()} each read, to indicate that the lines have been fully
 * processed.
 *
 * Read calls will return no data if there are no files left to read. This
 * class, in general, is not thread safe.
 *
 * This reader assumes that files with unique file names are left in the
 * spooling directory and not modified once they are placed there. Any user
 * behavior which violates these assumptions, when detected, will result in a
 * FlumeException being thrown.
 *
 * This class makes the following guarantees, if the above assumptions are met:
 *
 * - Once a log file has been renamed with the {@link #completedSuffix}, all
 *   of its records have been read through the
 *   {@link EventDeserializer#readEvent()} function and {@link #commit()}ed at
 *   least once.
 *
 * - All files in the spooling directory will eventually be opened and
 *   delivered to a {@link #readEvents(int)} caller.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ReliableSpoolingFileEventExtReader implements ReliableEventReader {

  private static final Logger logger = LoggerFactory
      .getLogger(ReliableSpoolingFileEventExtReader.class);

  static final String metaFileName = ".flumespool-main.meta";

  private final File spoolDirectory;
  private final String completedSuffix;
  private final String deserializerType;
  private final Context deserializerContext;
  private final Pattern ignorePattern;
  private final File metaFile;
  private final boolean annotateFileName;
  private final boolean annotateBaseName;
  private final String fileNameHeader;
  private final String baseNameHeader;

  // added parameters - begin
  private final boolean annotateFileNameExtractor;
  private final String fileNameExtractorHeader;
  private final Pattern fileNameExtractorPattern;
  private final boolean convertToTimestamp;
  private final String dateTimeFormat;

  private final boolean splitFileName;
  private final String splitBy;
  private final String splitBaseNameHeader;
  // added parameters - end

  private final String deletePolicy;
  private final Charset inputCharset;
  private final DecodeErrorPolicy decodeErrorPolicy;

  private Optional<FileInfo> currentFile = Optional.absent();
  /** Always contains the last file from which lines have been read. **/
  private Optional<FileInfo> lastFileRead = Optional.absent();
  private boolean committed = true;

  /**
   * Create a ReliableSpoolingFileEventReader to watch the given directory.
   */
  private ReliableSpoolingFileEventExtReader(File spoolDirectory,
      String completedSuffix, String ignorePattern, String trackerDirPath,
      boolean annotateFileName, String fileNameHeader,
      boolean annotateBaseName, String baseNameHeader,
      String deserializerType, Context deserializerContext,
      String deletePolicy, String inputCharset,
      DecodeErrorPolicy decodeErrorPolicy,
      boolean annotateFileNameExtractor, String fileNameExtractorHeader,
      String fileNameExtractorPattern, boolean convertToTimestamp,
      String dateTimeFormat, boolean splitFileName, String splitBy,
      String splitBaseNameHeader) throws IOException {

    // Sanity checks
    Preconditions.checkNotNull(spoolDirectory);
    Preconditions.checkNotNull(completedSuffix);
    Preconditions.checkNotNull(ignorePattern);
    Preconditions.checkNotNull(trackerDirPath);
    Preconditions.checkNotNull(deserializerType);
    Preconditions.checkNotNull(deserializerContext);
    Preconditions.checkNotNull(deletePolicy);
    Preconditions.checkNotNull(inputCharset);

    // validate delete policy
    if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())
        && !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
      throw new IllegalArgumentException("Delete policies other than "
          + "NEVER and IMMEDIATE are not yet supported");
    }

    if (logger.isDebugEnabled()) {
      logger.debug("Initializing {} with directory={}, metaDir={}, "
          + "deserializer={}", new Object[] {
          ReliableSpoolingFileEventExtReader.class.getSimpleName(),
          spoolDirectory, trackerDirPath, deserializerType });
    }

    // Verify directory exists and is readable/writable
    Preconditions.checkState(spoolDirectory.exists(),
        "Directory does not exist: " + spoolDirectory.getAbsolutePath());
    Preconditions.checkState(spoolDirectory.isDirectory(),
        "Path is not a directory: " + spoolDirectory.getAbsolutePath());

    // Do a canary test to make sure we have access to spooling directory
    try {
      File canary = File.createTempFile("flume-spooldir-perm-check-",
          ".canary", spoolDirectory);
      Files.write("testing flume file permissions\n", canary, Charsets.UTF_8);
      List<String> lines = Files.readLines(canary, Charsets.UTF_8);
      Preconditions.checkState(!lines.isEmpty(), "Empty canary file %s",
          canary);
      if (!canary.delete()) {
        throw new IOException("Unable to delete canary file " + canary);
      }
      logger.debug("Successfully created and deleted canary file: {}", canary);
    } catch (IOException e) {
      throw new FlumeException("Unable to read and modify files"
          + " in the spooling directory: " + spoolDirectory, e);
    }

    this.spoolDirectory = spoolDirectory;
    this.completedSuffix = completedSuffix;
    this.deserializerType = deserializerType;
    this.deserializerContext = deserializerContext;
    this.annotateFileName = annotateFileName;
    this.fileNameHeader = fileNameHeader;
    this.annotateBaseName = annotateBaseName;
    this.baseNameHeader = baseNameHeader;
    this.ignorePattern = Pattern.compile(ignorePattern);
    this.deletePolicy = deletePolicy;
    this.inputCharset = Charset.forName(inputCharset);
    this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy);

    // added code - begin
    this.annotateFileNameExtractor = annotateFileNameExtractor;
    this.fileNameExtractorHeader = fileNameExtractorHeader;
    this.fileNameExtractorPattern = Pattern.compile(fileNameExtractorPattern);
    this.convertToTimestamp = convertToTimestamp;
    this.dateTimeFormat = dateTimeFormat;

    this.splitFileName = splitFileName;
    this.splitBy = splitBy;
    this.splitBaseNameHeader = splitBaseNameHeader;
    // added code - end

    File trackerDirectory = new File(trackerDirPath);

    // if relative path, treat as relative to spool directory
    if (!trackerDirectory.isAbsolute()) {
      trackerDirectory = new File(spoolDirectory, trackerDirPath);
    }

    // ensure that meta directory exists
    if (!trackerDirectory.exists()) {
      if (!trackerDirectory.mkdir()) {
        throw new IOException("Unable to mkdir nonexistent meta directory "
            + trackerDirectory);
      }
    }

    // ensure that the meta directory is a directory
    if (!trackerDirectory.isDirectory()) {
      throw new IOException("Specified meta directory is not a directory"
          + trackerDirectory);
    }

    this.metaFile = new File(trackerDirectory, metaFileName);
  }

  /**
   * Return the filename which generated the data from the last successful
   * {@link #readEvents(int)} call. Returns null if called before any file
   * contents are read.
   */
  public String getLastFileRead() {
    if (!lastFileRead.isPresent()) {
      return null;
    }
    return lastFileRead.get().getFile().getAbsolutePath();
  }

  // public interface
  public Event readEvent() throws IOException {
    List<Event> events = readEvents(1);
    if (!events.isEmpty()) {
      return events.get(0);
    } else {
      return null;
    }
  }

  public List<Event> readEvents(int numEvents) throws IOException {
    if (!committed) {
      if (!currentFile.isPresent()) {
        throw new IllegalStateException("File should not roll when "
            + "commit is outstanding.");
      }
      logger.info("Last read was never committed - resetting mark position.");
      currentFile.get().getDeserializer().reset();
    } else {
      // Check if new files have arrived since last call
      if (!currentFile.isPresent()) {
        currentFile = getNextFile();
      }
      // Return empty list if no new files
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
    }

    EventDeserializer des = currentFile.get().getDeserializer();
    List<Event> events = des.readEvents(numEvents);

    /*
     * It's possible that the last read took us just up to a file boundary.
     * If so, try to roll to the next file, if there is one.
     */
    if (events.isEmpty()) {
      retireCurrentFile();
      currentFile = getNextFile();
      if (!currentFile.isPresent()) {
        return Collections.emptyList();
      }
      events = currentFile.get().getDeserializer().readEvents(numEvents);
    }

    if (annotateFileName) {
      String filename = currentFile.get().getFile().getAbsolutePath();
      for (Event event : events) {
        event.getHeaders().put(fileNameHeader, filename);
      }
    }

    if (annotateBaseName) {
      String basename = currentFile.get().getFile().getName();
      for (Event event : events) {
        event.getHeaders().put(baseNameHeader, basename);
      }
    }

    // added code - begin
    // extract part of the file name with a regular expression
    if (annotateFileNameExtractor) {
      Matcher matcher = fileNameExtractorPattern.matcher(currentFile.get()
          .getFile().getName());
      if (matcher.find()) {
        String value = matcher.group();
        if (convertToTimestamp) {
          DateTimeFormatter formatter = DateTimeFormat
              .forPattern(dateTimeFormat);
          DateTime dateTime = formatter.parseDateTime(value);
          value = Long.toString(dateTime.getMillis());
        }
        for (Event event : events) {
          event.getHeaders().put(fileNameExtractorHeader, value);
        }
      }
    }

    // split the file name on a separator
    if (splitFileName) {
      String[] splits = currentFile.get().getFile().getName().split(splitBy);
      for (Event event : events) {
        for (int i = 0; i < splits.length; i++) {
          event.getHeaders().put(splitBaseNameHeader + i, splits[i]);
        }
      }
    }
    // added code - end

    committed = false;
    lastFileRead = currentFile;
    return events;
  }

  @Override
  public void close() throws IOException {
    if (currentFile.isPresent()) {
      currentFile.get().getDeserializer().close();
      currentFile = Optional.absent();
    }
  }

  /** Commit the last lines which were read. */
  @Override
  public void commit() throws IOException {
    if (!committed && currentFile.isPresent()) {
      currentFile.get().getDeserializer().mark();
      committed = true;
    }
  }

  /**
   * Closes currentFile and attempt to rename it.
   *
   * If these operations fail in a way that may cause duplicate log entries,
   * an error is logged but no exceptions are thrown. If these operations fail
   * in a way that indicates potential misuse of the spooling directory, a
   * FlumeException will be thrown.
   *
   * @throws FlumeException if files do not conform to spooling assumptions
   */
  private void retireCurrentFile() throws IOException {
    Preconditions.checkState(currentFile.isPresent());

    File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath());

    currentFile.get().getDeserializer().close();

    // Verify that spooling assumptions hold
    if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
      String message = "File has been modified since being read: "
          + fileToRoll;
      throw new IllegalStateException(message);
    }
    if (fileToRoll.length() != currentFile.get().getLength()) {
      String message = "File has changed size since being read: " + fileToRoll;
      throw new IllegalStateException(message);
    }

    if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
      rollCurrentFile(fileToRoll);
    } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
      deleteCurrentFile(fileToRoll);
    } else {
      // TODO: implement delay in the future
      throw new IllegalArgumentException("Unsupported delete policy: "
          + deletePolicy);
    }
  }

  /**
   * Rename the given spooled file
   *
   * @param fileToRoll
   * @throws IOException
   */
  private void rollCurrentFile(File fileToRoll) throws IOException {

    File dest = new File(fileToRoll.getPath() + completedSuffix);
    logger.info("Preparing to move file {} to {}", fileToRoll, dest);

    // Before renaming, check whether destination file name exists
    if (dest.exists() && PlatformDetect.isWindows()) {
      /*
       * If we are here, it means the completed file already exists. In
       * almost every case this means the user is violating an assumption of
       * Flume (that log files are placed in the spooling directory with
       * unique names). However, there is a corner case on Windows systems
       * where the file was already rolled but the rename was not atomic. If
       * that seems likely, we let it pass with only a warning.
       */
      if (Files.equal(currentFile.get().getFile(), dest)) {
        logger.warn("Completed file " + dest
            + " already exists, but files match, so continuing.");
        boolean deleted = fileToRoll.delete();
        if (!deleted) {
          logger.error("Unable to delete file "
              + fileToRoll.getAbsolutePath()
              + ". It will likely be ingested another time.");
        }
      } else {
        String message = "File name has been re-used with different"
            + " files. Spooling assumptions violated for " + dest;
        throw new IllegalStateException(message);
      }

    // Dest file exists and not on windows
    } else if (dest.exists()) {
      String message = "File name has been re-used with different"
          + " files. Spooling assumptions violated for " + dest;
      throw new IllegalStateException(message);

    // Destination file does not already exist. We are good to go!
    } else {
      boolean renamed = fileToRoll.renameTo(dest);
      if (renamed) {
        logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);

        // now we no longer need the meta file
        deleteMetaFile();
      } else {
        /*
         * If we are here then the file cannot be renamed for a reason other
         * than that the destination file exists (actually, that remains
         * possible w/ small probability due to TOC-TOU conditions).
         */
        String message = "Unable to move " + fileToRoll + " to " + dest
            + ". This will likely cause duplicate events. Please verify that "
            + "flume has sufficient permissions to perform these operations.";
        throw new FlumeException(message);
      }
    }
  }

  /**
   * Delete the given spooled file
   *
   * @param fileToDelete
   * @throws IOException
   */
  private void deleteCurrentFile(File fileToDelete) throws IOException {
    logger.info("Preparing to delete file {}", fileToDelete);
    if (!fileToDelete.exists()) {
      logger.warn("Unable to delete nonexistent file: {}", fileToDelete);
      return;
    }
    if (!fileToDelete.delete()) {
      throw new IOException("Unable to delete spool file: " + fileToDelete);
    }
    // now we no longer need the meta file
    deleteMetaFile();
  }

  /**
   * Find and open the oldest file in the chosen directory. If two or more
   * files are equally old, the file name with lower lexicographical value is
   * returned. If the directory is empty, this will return an absent option.
   */
  private Optional<FileInfo> getNextFile() {
    /* Filter to exclude finished or hidden files */
    FileFilter filter = new FileFilter() {
      public boolean accept(File candidate) {
        String fileName = candidate.getName();
        if ((candidate.isDirectory())
            || (fileName.endsWith(completedSuffix))
            || (fileName.startsWith("."))
            || ignorePattern.matcher(fileName).matches()) {
          return false;
        }
        return true;
      }
    };
    List<File> candidateFiles = Arrays.asList(spoolDirectory
        .listFiles(filter));
    if (candidateFiles.isEmpty()) {
      return Optional.absent();
    } else {
      Collections.sort(candidateFiles, new Comparator<File>() {
        public int compare(File a, File b) {
          int timeComparison = new Long(a.lastModified())
              .compareTo(new Long(b.lastModified()));
          if (timeComparison != 0) {
            return timeComparison;
          } else {
            return a.getName().compareTo(b.getName());
          }
        }
      });
      File nextFile = candidateFiles.get(0);
      try {
        // roll the meta file, if needed
        String nextPath = nextFile.getPath();
        PositionTracker tracker = DurablePositionTracker.getInstance(metaFile,
            nextPath);
        if (!tracker.getTarget().equals(nextPath)) {
          tracker.close();
          deleteMetaFile();
          tracker = DurablePositionTracker.getInstance(metaFile, nextPath);
        }

        // sanity check
        Preconditions.checkState(tracker.getTarget().equals(nextPath),
            "Tracker target %s does not equal expected filename %s",
            tracker.getTarget(), nextPath);

        ResettableInputStream in = new ResettableFileInputStream(nextFile,
            tracker, ResettableFileInputStream.DEFAULT_BUF_SIZE,
            inputCharset, decodeErrorPolicy);
        EventDeserializer deserializer = EventDeserializerFactory.getInstance(
            deserializerType, deserializerContext, in);

        return Optional.of(new FileInfo(nextFile, deserializer));
      } catch (FileNotFoundException e) {
        // File could have been deleted in the interim
        logger.warn("Could not find file: " + nextFile, e);
        return Optional.absent();
      } catch (IOException e) {
        logger.error("Exception opening file: " + nextFile, e);
        return Optional.absent();
      }
    }
  }

  private void deleteMetaFile() throws IOException {
    if (metaFile.exists() && !metaFile.delete()) {
      throw new IOException("Unable to delete old meta file " + metaFile);
    }
  }

  /** An immutable class with information about a file being processed. */
  private static class FileInfo {
    private final File file;
    private final long length;
    private final long lastModified;
    private final EventDeserializer deserializer;

    public FileInfo(File file, EventDeserializer deserializer) {
      this.file = file;
      this.length = file.length();
      this.lastModified = file.lastModified();
      this.deserializer = deserializer;
    }

    public long getLength() {
      return length;
    }

    public long getLastModified() {
      return lastModified;
    }

    public EventDeserializer getDeserializer() {
      return deserializer;
    }

    public File getFile() {
      return file;
    }
  }

  @InterfaceAudience.Private
  @InterfaceStability.Unstable
  static enum DeletePolicy {
    NEVER, IMMEDIATE, DELAY
  }

  /**
   * Special builder class for ReliableSpoolingFileEventExtReader
   */
  public static class Builder {
    private File spoolDirectory;
    private String completedSuffix =
        SpoolDirectorySourceConfigurationExtConstants.SPOOLED_FILE_SUFFIX;
    private String ignorePattern =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_IGNORE_PAT;
    private String trackerDirPath =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_TRACKER_DIR;
    private Boolean annotateFileName =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILE_HEADER;
    private String fileNameHeader =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_HEADER_KEY;
    private Boolean annotateBaseName =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER;
    private String baseNameHeader =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_BASENAME_HEADER_KEY;
    private String deserializerType =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DESERIALIZER;
    private Context deserializerContext = new Context();
    private String deletePolicy =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DELETE_POLICY;
    private String inputCharset =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_INPUT_CHARSET;
    private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy
        .valueOf(SpoolDirectorySourceConfigurationExtConstants.DEFAULT_DECODE_ERROR_POLICY
            .toUpperCase());

    // added code - begin
    private Boolean annotateFileNameExtractor =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR;
    private String fileNameExtractorHeader =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY;
    private String fileNameExtractorPattern =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_PATTERN;
    private Boolean convertToTimestamp =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP;
    private String dateTimeFormat =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT;

    private Boolean splitFileName =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_FILENAME;
    private String splitBy =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLITY_BY;
    private String splitBaseNameHeader =
        SpoolDirectorySourceConfigurationExtConstants.DEFAULT_SPLIT_BASENAME_HEADER;

    public Builder annotateFileNameExtractor(Boolean annotateFileNameExtractor) {
      this.annotateFileNameExtractor = annotateFileNameExtractor;
      return this;
    }

    public Builder fileNameExtractorHeader(String fileNameExtractorHeader) {
      this.fileNameExtractorHeader = fileNameExtractorHeader;
      return this;
    }

    public Builder fileNameExtractorPattern(String fileNameExtractorPattern) {
      this.fileNameExtractorPattern = fileNameExtractorPattern;
      return this;
    }

    public Builder convertToTimestamp(Boolean convertToTimestamp) {
      this.convertToTimestamp = convertToTimestamp;
      return this;
    }

    public Builder dateTimeFormat(String dateTimeFormat) {
      this.dateTimeFormat = dateTimeFormat;
      return this;
    }

    public Builder splitFileName(Boolean splitFileName) {
      this.splitFileName = splitFileName;
      return this;
    }

    public Builder splitBy(String splitBy) {
      this.splitBy = splitBy;
      return this;
    }

    public Builder splitBaseNameHeader(String splitBaseNameHeader) {
      this.splitBaseNameHeader = splitBaseNameHeader;
      return this;
    }
    // added code - end

    public Builder spoolDirectory(File directory) {
      this.spoolDirectory = directory;
      return this;
    }

    public Builder completedSuffix(String completedSuffix) {
      this.completedSuffix = completedSuffix;
      return this;
    }

    public Builder ignorePattern(String ignorePattern) {
      this.ignorePattern = ignorePattern;
      return this;
    }

    public Builder trackerDirPath(String trackerDirPath) {
      this.trackerDirPath = trackerDirPath;
      return this;
    }

    public Builder annotateFileName(Boolean annotateFileName) {
      this.annotateFileName = annotateFileName;
      return this;
    }

    public Builder fileNameHeader(String fileNameHeader) {
      this.fileNameHeader = fileNameHeader;
      return this;
    }

    public Builder annotateBaseName(Boolean annotateBaseName) {
      this.annotateBaseName = annotateBaseName;
      return this;
    }

    public Builder baseNameHeader(String baseNameHeader) {
      this.baseNameHeader = baseNameHeader;
      return this;
    }

    public Builder deserializerType(String deserializerType) {
      this.deserializerType = deserializerType;
      return this;
    }

    public Builder deserializerContext(Context deserializerContext) {
      this.deserializerContext = deserializerContext;
      return this;
    }

    public Builder deletePolicy(String deletePolicy) {
      this.deletePolicy = deletePolicy;
      return this;
    }

    public Builder inputCharset(String inputCharset) {
      this.inputCharset = inputCharset;
      return this;
    }

    public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) {
      this.decodeErrorPolicy = decodeErrorPolicy;
      return this;
    }

    public ReliableSpoolingFileEventExtReader build() throws IOException {
      return new ReliableSpoolingFileEventExtReader(spoolDirectory,
          completedSuffix, ignorePattern, trackerDirPath, annotateFileName,
          fileNameHeader, annotateBaseName, baseNameHeader, deserializerType,
          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
          annotateFileNameExtractor, fileNameExtractorHeader,
          fileNameExtractorPattern, convertToTimestamp, dateTimeFormat,
          splitFileName, splitBy, splitBaseNameHeader);
    }
  }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with this
 * work for additional information regarding copyright ownership. The ASF
 * licenses this file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations
 * under the License.
 */
package com.besttone.flume;

import org.apache.flume.serialization.DecodeErrorPolicy;

public class SpoolDirectorySourceConfigurationExtConstants {
  /** Directory where files are deposited. */
  public static final String SPOOL_DIRECTORY = "spoolDir";

  /** Suffix appended to files when they are finished being sent. */
  public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
  public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";

  /** Header in which to put absolute path filename. */
  public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
  public static final String DEFAULT_FILENAME_HEADER_KEY = "file";

  /** Whether to include absolute path filename in a header. */
  public static final String FILENAME_HEADER = "fileHeader";
  public static final boolean DEFAULT_FILE_HEADER = false;

  /** Header in which to put the basename of file. */
  public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
  public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";

  /** Whether to include the basename of a file in a header. */
  public static final String BASENAME_HEADER = "basenameHeader";
  public static final boolean DEFAULT_BASENAME_HEADER = false;

  /** What size to batch with before sending to ChannelProcessor. */
  public static final String BATCH_SIZE = "batchSize";
  public static final int DEFAULT_BATCH_SIZE = 100;

  /** Maximum number of lines to buffer between commits. */
  @Deprecated
  public static final String BUFFER_MAX_LINES = "bufferMaxLines";
  @Deprecated
  public static final int DEFAULT_BUFFER_MAX_LINES = 100;

  /** Maximum length of line (in characters) in buffer between commits. */
  @Deprecated
  public static final String BUFFER_MAX_LINE_LENGTH = "bufferMaxLineLength";
  @Deprecated
  public static final int DEFAULT_BUFFER_MAX_LINE_LENGTH = 5000;

  /** Pattern of files to ignore */
  public static final String IGNORE_PAT = "ignorePattern";
  public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect

  /** Directory to store metadata about files being processed */
  public static final String TRACKER_DIR = "trackerDir";
  public static final String DEFAULT_TRACKER_DIR = ".flumespool";

  /** Deserializer to use to parse the file data into Flume Events */
  public static final String DESERIALIZER = "deserializer";
  public static final String DEFAULT_DESERIALIZER = "LINE";

  public static final String DELETE_POLICY = "deletePolicy";
  public static final String DEFAULT_DELETE_POLICY = "never";

  /** Character set used when reading the input. */
  public static final String INPUT_CHARSET = "inputCharset";
  public static final String DEFAULT_INPUT_CHARSET = "UTF-8";

  /** What to do when there is a character set decoding error. */
  public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy";
  public static final String DEFAULT_DECODE_ERROR_POLICY =
      DecodeErrorPolicy.FAIL.name();

  public static final String MAX_BACKOFF = "maxBackoff";
  public static final Integer DEFAULT_MAX_BACKOFF = 4000;

  // added code - begin
  public static final String FILENAME_EXTRACTOR = "fileExtractor";
  public static final boolean DEFAULT_FILENAME_EXTRACTOR = false;

  public static final String FILENAME_EXTRACTOR_HEADER_KEY = "fileExtractorHeaderKey";
  public static final String DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY = "fileExtractorHeader";

  public static final String FILENAME_EXTRACTOR_PATTERN = "fileExtractorPattern";
  public static final String DEFAULT_FILENAME_EXTRACTOR_PATTERN = "(.)*";

  public static final String FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = "convertToTimestamp";
  public static final boolean DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP = false;

  public static final String FILENAME_EXTRACTOR_DATETIME_FORMAT = "dateTimeFormat";
  public static final String DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT = "yyyy-MM-dd";

  public static final String SPLIT_FILENAME = "splitFileName";
  public static final boolean DEFAULT_SPLIT_FILENAME = false;

  public static final String SPLITY_BY = "splitBy";
  public static final String DEFAULT_SPLITY_BY = "\\.";

  public static final String SPLIT_BASENAME_HEADER = "splitBaseNameHeader";
  public static final String DEFAULT_SPLIT_BASENAME_HEADER = "fileNameSplit";
  // added code - end
}
package com.besttone.flume;

import static com.besttone.flume.SpoolDirectorySourceConfigurationExtConstants.*;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SourceCounter;
import org.apache.flume.serialization.DecodeErrorPolicy;
import org.apache.flume.serialization.LineDeserializer;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;

public class SpoolDirectoryExtSource extends AbstractSource implements
    Configurable, EventDrivenSource {

  private static final Logger logger = LoggerFactory
      .getLogger(SpoolDirectoryExtSource.class);

  // Delay used when polling for new files
  private static final int POLL_DELAY_MS = 500;

  /* Config options */
  private String completedSuffix;
  private String spoolDirectory;
  private boolean fileHeader;
  private String fileHeaderKey;
  private boolean basenameHeader;
  private String basenameHeaderKey;
  private int batchSize;
  private String ignorePattern;
  private String trackerDirPath;
  private String deserializerType;
  private Context deserializerContext;
  private String deletePolicy;
  private String inputCharset;
  private DecodeErrorPolicy decodeErrorPolicy;
  private volatile boolean hasFatalError = false;

  private SourceCounter sourceCounter;
  ReliableSpoolingFileEventExtReader reader;
  private ScheduledExecutorService executor;
  private boolean backoff = true;
  private boolean hitChannelException = false;
  private int maxBackoff;

  // added code - begin
  private Boolean annotateFileNameExtractor;
  private String fileNameExtractorHeader;
  private String fileNameExtractorPattern;
  private Boolean convertToTimestamp;
  private String dateTimeFormat;

  private boolean splitFileName;
  private String splitBy;
  private String splitBaseNameHeader;
  // added code - end

  @Override
  public synchronized void start() {
    logger.info("SpoolDirectorySource source starting with directory: {}",
        spoolDirectory);

    executor = Executors.newSingleThreadScheduledExecutor();

    File directory = new File(spoolDirectory);
    try {
      reader = new ReliableSpoolingFileEventExtReader.Builder()
          .spoolDirectory(directory).completedSuffix(completedSuffix)
          .ignorePattern(ignorePattern).trackerDirPath(trackerDirPath)
          .annotateFileName(fileHeader).fileNameHeader(fileHeaderKey)
          .annotateBaseName(basenameHeader).baseNameHeader(basenameHeaderKey)
          .deserializerType(deserializerType)
          .deserializerContext(deserializerContext)
          .deletePolicy(deletePolicy).inputCharset(inputCharset)
          .decodeErrorPolicy(decodeErrorPolicy)
          .annotateFileNameExtractor(annotateFileNameExtractor)
          .fileNameExtractorHeader(fileNameExtractorHeader)
          .fileNameExtractorPattern(fileNameExtractorPattern)
          .convertToTimestamp(convertToTimestamp)
          .dateTimeFormat(dateTimeFormat)
          .splitFileName(splitFileName).splitBy(splitBy)
          .splitBaseNameHeader(splitBaseNameHeader).build();
    } catch (IOException ioe) {
      throw new FlumeException("Error instantiating spooling event parser",
          ioe);
    }

    Runnable runner = new SpoolDirectoryRunnable(reader, sourceCounter);
    executor.scheduleWithFixedDelay(runner, 0, POLL_DELAY_MS,
        TimeUnit.MILLISECONDS);

    super.start();
    logger.debug("SpoolDirectorySource source started");
    sourceCounter.start();
  }

  @Override
  public synchronized void stop() {
    executor.shutdown();
    try {
      executor.awaitTermination(10L, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      logger.info("Interrupted while awaiting termination", ex);
    }
    executor.shutdownNow();

    super.stop();
    sourceCounter.stop();
    logger.info("SpoolDir source {} stopped. Metrics: {}", getName(),
        sourceCounter);
  }

  @Override
  public String toString() {
    return "Spool Directory source " + getName() + ": { spoolDir: "
        + spoolDirectory + " }";
  }

  @Override
  public synchronized void configure(Context context) {
    spoolDirectory = context.getString(SPOOL_DIRECTORY);
    Preconditions.checkState(spoolDirectory != null,
        "Configuration must specify a spooling directory");

    completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
        DEFAULT_SPOOLED_FILE_SUFFIX);
    deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
    fileHeader = context.getBoolean(FILENAME_HEADER, DEFAULT_FILE_HEADER);
    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
        DEFAULT_FILENAME_HEADER_KEY);
    basenameHeader = context.getBoolean(BASENAME_HEADER,
        DEFAULT_BASENAME_HEADER);
    basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
        DEFAULT_BASENAME_HEADER_KEY);
    batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);
    inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
    decodeErrorPolicy = DecodeErrorPolicy.valueOf(context.getString(
        DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY).toUpperCase());

    ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
    trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR);

    deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
    deserializerContext = new Context(context.getSubProperties(DESERIALIZER
        + "."));

    // "Hack" to support backwards compatibility with previous generation of
    // spooling directory source, which did not support deserializers
    Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH);
    if (bufferMaxLineLength != null && deserializerType != null
        && deserializerType.equalsIgnoreCase(DEFAULT_DESERIALIZER)) {
      deserializerContext.put(LineDeserializer.MAXLINE_KEY,
          bufferMaxLineLength.toString());
    }

    maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF);
    if (sourceCounter == null) {
      sourceCounter = new SourceCounter(getName());
    }

    // added code - begin
    annotateFileNameExtractor = context.getBoolean(FILENAME_EXTRACTOR,
        DEFAULT_FILENAME_EXTRACTOR);
    fileNameExtractorHeader = context.getString(FILENAME_EXTRACTOR_HEADER_KEY,
        DEFAULT_FILENAME_EXTRACTOR_HEADER_KEY);
    fileNameExtractorPattern = context.getString(FILENAME_EXTRACTOR_PATTERN,
        DEFAULT_FILENAME_EXTRACTOR_PATTERN);
    convertToTimestamp = context.getBoolean(
        FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP,
        DEFAULT_FILENAME_EXTRACTOR_CONVERT_TO_TIMESTAMP);
    dateTimeFormat = context.getString(FILENAME_EXTRACTOR_DATETIME_FORMAT,
        DEFAULT_FILENAME_EXTRACTOR_DATETIME_FORMAT);

    splitFileName = context.getBoolean(SPLIT_FILENAME, DEFAULT_SPLIT_FILENAME);
    splitBy = context.getString(SPLITY_BY, DEFAULT_SPLITY_BY);
    splitBaseNameHeader = context.getString(SPLIT_BASENAME_HEADER,
        DEFAULT_SPLIT_BASENAME_HEADER);
    // added code - end
  }

  @VisibleForTesting
  protected boolean hasFatalError() {
    return hasFatalError;
  }

  /**
   * The class always backs off, this exists only so that we can test without
   * taking a really long time.
   *
   * @param backoff - whether the source should backoff if the channel is full
   */
  @VisibleForTesting
  protected void setBackOff(boolean backoff) {
    this.backoff = backoff;
  }

  @VisibleForTesting
  protected boolean hitChannelException() {
    return hitChannelException;
  }

  @VisibleForTesting
  protected SourceCounter getSourceCounter() {
    return sourceCounter;
  }

  private class SpoolDirectoryRunnable implements Runnable {
    private ReliableSpoolingFileEventExtReader reader;
    private SourceCounter sourceCounter;

    public SpoolDirectoryRunnable(ReliableSpoolingFileEventExtReader reader,
        SourceCounter sourceCounter) {
      this.reader = reader;
      this.sourceCounter = sourceCounter;
    }

    @Override
    public void run() {
      int backoffInterval = 250;
      try {
        while (!Thread.interrupted()) {
          List<Event> events = reader.readEvents(batchSize);
          if (events.isEmpty()) {
            break;
          }
          sourceCounter.addToEventReceivedCount(events.size());
          sourceCounter.incrementAppendBatchReceivedCount();

          try {
            getChannelProcessor().processEventBatch(events);
            reader.commit();
          } catch (ChannelException ex) {
            logger.warn("The channel is full, and cannot write data now. The "
                + "source will try again after "
                + String.valueOf(backoffInterval) + " milliseconds");
            hitChannelException = true;
            if (backoff) {
              TimeUnit.MILLISECONDS.sleep(backoffInterval);
              backoffInterval = backoffInterval << 1;
              backoffInterval = backoffInterval >= maxBackoff ? maxBackoff
                  : backoffInterval;
            }
            continue;
          }
          backoffInterval = 250;
          sourceCounter.addToEventAcceptedCount(events.size());
          sourceCounter.incrementAppendBatchAcceptedCount();
        }
        logger.info("Spooling Directory Source runner has shutdown.");
      } catch (Throwable t) {
        logger.error("FATAL: " + SpoolDirectoryExtSource.this.toString()
            + ": " + "Uncaught exception in SpoolDirectorySource thread. "
            + "Restart or reconfigure Flume to continue processing.", t);
        hasFatalError = true;
        Throwables.propagate(t);
      }
    }
  }
}
The extension adds the following configuration parameters:
tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource    # fully qualified class name of the custom source
tier1.sources.source1.spoolDir=/opt/logs                                 # directory to monitor
tier1.sources.source1.splitFileName=true                                 # whether to split the file name and put the parts into the header; default false
tier1.sources.source1.splitBy=\\.                                        # separator to split on; default is "."
tier1.sources.source1.splitBaseNameHeader=fileNameSplit                  # prefix of the header keys the parts are written to; e.g. a.log.2014-07-31 split on "." gives fileNameSplit0=a, fileNameSplit1=log, fileNameSplit2=2014-07-31
(The extension also includes a feature that extracts content from the file name with a regular expression; it is not needed for this requirement, but a brief sketch follows.)
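For reference only, these are the regex-extraction properties the extended source understands (property names taken from the constants class above; the pattern and date format values here are merely illustrative):

tier1.sources.source1.fileExtractor=true
tier1.sources.source1.fileExtractorHeaderKey=fileExtractorHeader
tier1.sources.source1.fileExtractorPattern=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.convertToTimestamp=true
tier1.sources.source1.dateTimeFormat=yyyy-MM-dd

With a configuration like this, the first match of the pattern in the file name is written to the fileExtractorHeader header, optionally converted to a millisecond timestamp using the given Joda-Time format.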
With this extended source in place, the original requirement becomes straightforward; all that is needed is:
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}
The contents of a.log.2014-07-31 will then be saved under hdfs://master68:8020/flume/events/a/2014-07-31.
Next, let's look at how to deploy this extended custom spooling directory source (on a Cloudera Manager setup).
First, package the three classes above into a JAR: SpoolDirectoryExtSource.jar.
CM's Flume plugin directory is /var/lib/flume-ng/plugins.d.
On each agent that will use this source, create a SpoolDirectoryExtSource directory under /var/lib/flume-ng/plugins.d, with the subdirectories lib, libext, and native: lib holds the plugin JAR, libext holds the plugin's dependency JARs, and native holds any native libraries it uses (if any).
We have no extra dependencies here, so SpoolDirectoryExtSource.jar simply goes into lib. The final layout:
plugins.d/
plugins.d/SpoolDirectoryExtSource/
plugins.d/SpoolDirectoryExtSource/lib/SpoolDirectoryExtSource.jar
plugins.d/SpoolDirectoryExtSource/libext/
plugins.d/SpoolDirectoryExtSource/native/
Restart the Flume agent and it will pick up the plugin automatically; the type property in flume.conf can then be set to the fully qualified class name.
The final flume.conf looks like this:
tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1
tier1.sources.source1.type=com.besttone.flume.SpoolDirectoryExtSource
tier1.sources.source1.spoolDir=/opt/logs
tier1.sources.source1.splitFileName=true
tier1.sources.source1.splitBy=\\.
tier1.sources.source1.splitBaseNameHeader=fileNameSplit
tier1.sources.source1.channels=channel1
tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{fileNameSplit0}/%{fileNameSplit2}
tier1.sinks.sink1.hdfs.round=true
tier1.sinks.sink1.hdfs.roundValue=10
tier1.sinks.sink1.hdfs.roundUnit=minute
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60
tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30
Attached is a screenshot of the resulting output, captured with logger as the sink.
Questions addressed:
1. How do you implement a custom sink on the Flume side that saves logs according to your own rules?
2. How do you configure things so that the value of rootPath is read from the Flume configuration file?
I recently needed Flume to collect logs from remote machines, so I went through some of its most basic usage; this is just a record of that.
The overall approach to remote log collection: the remote side implements a custom log4j appender that sends messages to the Flume agent, and the Flume side implements a custom sink that saves the logs according to our rules.
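The remote side is not the focus here, but as a rough sketch: with Flume's flume-ng-log4jappender artifact on the application's classpath, log4j can be pointed at the agent like this (the host name and port are placeholders, and the agent needs an avro source listening on that port):

log4j.rootLogger=INFO, flume
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=flume-agent-host
log4j.appender.flume.Port=41414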
The custom sink code:
public class LocalFileLogSink extends AbstractSink implements Configurable {
  private static final Logger logger = LoggerFactory
      .getLogger(LocalFileLogSink.class);
  private static final String PROP_KEY_ROOTPATH = "rootPath";
  private String rootPath;

  @Override
  public void configure(Context context) {
    String rootPath = context.getString(PROP_KEY_ROOTPATH);
    setRootPath(rootPath);
  }

  @Override
  public Status process() throws EventDeliveryException {
    logger.debug("Do process");
    // event handling is shown further below
    return Status.READY;
  }
}
Implementing the Configurable interface means the configure method is called at initialization time, and the values of configured parameters can be read from the Context. Here we want to read rootPath, the root directory under which logs are saved, from the Flume configuration file. In flume-conf.properties this is configured as:
agent.sinks = loggerSink
agent.sinks.loggerSink.rootPath = ./logs
loggerSink is the name given to this custom sink; when reading a value, the key is only the part after loggerSink, i.e. rootPath here.
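Note that the same configuration also has to tell the agent which class implements the sink and which channel it drains. A minimal sketch, where the type value stands for the fully qualified name of the LocalFileLogSink class above (whatever package it actually lives in):

agent.sinks.loggerSink.type = com.example.LocalFileLogSink
agent.sinks.loggerSink.channel = memoryChannel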
The actual business logic is implemented by overriding the process method inherited from AbstractSink: get the channel from the base class's getChannel method, then take Events from it and handle them.
Status status;
Channel ch = getChannel();
Transaction txn = ch.getTransaction();
txn.begin();
try {
  logger.debug("Get event.");
  Event event = ch.take();
  // ... write the event out to a file under rootPath ...
  txn.commit();
  status = Status.READY;
  return status;
} catch (Throwable t) {
  // roll back so the event is not lost if writing fails
  txn.rollback();
  status = Status.BACKOFF;
  return status;
} finally {
  logger.info("trx close.");
  txn.close();
}
Below is the pom file for my custom Kafka sink plugin; build it into a JAR, drop it into Flume's lib directory, and it is ready to use.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>flume-sinks</groupId>
  <artifactId>cmcc-kafka-sink</artifactId>
  <name>Flume Kafka Sink</name>
  <version>1.0.0</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-sdk</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-core</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flume</groupId>
      <artifactId>flume-ng-configuration</artifactId>
      <version>1.5.2</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>1.6.1</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.1.1</version>
    </dependency>
  </dependencies>
</project>
The parent POM and the RAT plugin have been removed here, which avoids a common build error: https://issues.apache.org/jira/browse/FLUME-1372
A few constants are defined:
public static final String BATCH_SIZE = "batchSize";
public static final int DEFAULT_BATCH_SIZE = 100;
public static final String PARTITION_KEY_NAME = "cmcc.partition.key";
public static final String ENCODING_KEY_NAME = "cmcc.encoding";
public static final String DEFAULT_ENCODING = "UTF-8";
public static final String CUSTOME_TOPIC_KEY_NAME = "cmcc.topic.name";
public static final String DEFAULT_TOPIC_NAME = "CMCC";
The custom sink extends AbstractSink, implements the Configurable interface, and overrides a few methods, as follows:
package org.apache.flume.cmcc.kafka;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;

public class CmccKafkaSink extends AbstractSink implements Configurable {

  private static final Logger log = LoggerFactory
      .getLogger(CmccKafkaSink.class);

  private Properties parameters;
  private Producer<String, String> producer;
  // private Context context;
  private int batchSize; // number of events per transaction, committed as a whole
  private List<KeyedMessage<String, String>> messageList;
  private SinkCounter sinkCounter;

  @Override
  public Status process() {
    Status result = Status.READY;
    Channel channel = getChannel();
    Transaction transaction = null;
    Event event = null;
    try {
      long processedEvent = 0;
      transaction = channel.getTransaction();
      transaction.begin(); // begin the transaction
      messageList.clear();
      for (; processedEvent < batchSize; processedEvent++) {
        event = channel.take(); // take one event from the channel
        if (event == null) {
          result = Status.BACKOFF;
          break;
        }
        sinkCounter.incrementEventDrainAttemptCount();
        // Map<String, String> headers = event.getHeaders();
        String partitionKey = parameters
            .getProperty(Constants.PARTITION_KEY_NAME);
        String topic = StringUtils.defaultIfEmpty(
            parameters.getProperty(Constants.CUSTOME_TOPIC_KEY_NAME),
            Constants.DEFAULT_TOPIC_NAME);
        String encoding = StringUtils.defaultIfEmpty(
            parameters.getProperty(Constants.ENCODING_KEY_NAME),
            Constants.DEFAULT_ENCODING);
        byte[] eventBody = event.getBody();
        String eventData = new String(eventBody, encoding);
        KeyedMessage<String, String> data = null;
        if (StringUtils.isEmpty(partitionKey)) {
          data = new KeyedMessage<String, String>(topic, eventData);
        } else {
          data = new KeyedMessage<String, String>(topic, partitionKey,
              eventData);
        }
        messageList.add(data);
        log.debug("Add data [" + eventData
            + "] into messageList,position:" + processedEvent);
      }
      if (processedEvent == 0) {
        sinkCounter.incrementBatchEmptyCount();
        result = Status.BACKOFF;
      } else {
        if (processedEvent < batchSize) {
          sinkCounter.incrementBatchUnderflowCount();
        } else {
          sinkCounter.incrementBatchCompleteCount();
        }
        sinkCounter.addToEventDrainAttemptCount(processedEvent);
        producer.send(messageList);
        log.debug("Send MessageList to Kafka: [ message List size = "
            + messageList.size() + ",processedEvent number = "
            + processedEvent + "] ");
      }
      transaction.commit(); // batchSize events processed, commit once
      sinkCounter.addToEventDrainSuccessCount(processedEvent);
      result = Status.READY;
    } catch (Exception e) {
      String errorMsg = "Failed to publish events !";
      log.error(errorMsg, e);
      e.printStackTrace();
      result = Status.BACKOFF;
      if (transaction != null) {
        try {
          transaction.rollback();
          log.debug("transaction rollback success !");
        } catch (Exception ex) {
          log.error(errorMsg, ex);
          throw Throwables.propagate(ex);
        }
      }
      // throw new EventDeliveryException(errorMsg, e);
    } finally {
      if (transaction != null) {
        transaction.close();
      }
    }
    return result;
  }

  @Override
  public synchronized void start() {
    log.info("Starting {}...", this);
    sinkCounter.start();
    super.start();
    ProducerConfig config = new ProducerConfig(this.parameters);
    this.producer = new Producer<String, String>(config);
    sinkCounter.incrementConnectionCreatedCount();
  }

  @Override
  public synchronized void stop() {
    log.debug("Cmcc Kafka sink {} stopping...", getName());
    sinkCounter.stop();
    producer.close();
    sinkCounter.incrementConnectionClosedCount();
  }

  @Override
  public void configure(Context context) {
    ImmutableMap<String, String> props = context.getParameters();
    batchSize = context.getInteger(Constants.BATCH_SIZE,
        Constants.DEFAULT_BATCH_SIZE);
    messageList = new ArrayList<KeyedMessage<String, String>>(batchSize);
    parameters = new Properties();
    for (String key : props.keySet()) {
      String value = props.get(key);
      this.parameters.put(key, value);
    }
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }
  }
}
This sink also hooks into Flume's internal monitoring through SinkCounter.
To improve performance it batches events: up to batchSize events are taken per transaction, which reduces the number of transaction commits.
Naturally, when the channel runs out of events, whatever has been taken so far is committed.
Now the configuration:
a1.sinks.k1.type=org.apache.flume.cmcc.kafka.CmccKafkaSink
a1.sinks.k1.metadata.broker.list=192.168.11.174:9092
a1.sinks.k1.partition.key=0
a1.sinks.k1.partitioner.class=org.apache.flume.cmcc.kafka.CmccPartition
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
a1.sinks.k1.request.required.acks=0
a1.sinks.k1.max.message.size=1000000
a1.sinks.k1.cmcc.encoding=UTF-8
a1.sinks.k1.cmcc.topic.name=CMCC
a1.sinks.k1.producer.type=sync
a1.sinks.k1.batchSize=100
Notice that some of these properties are never defined in our Constants class. How do they get picked up? A look at the (decompiled) Kafka source makes it clear:
private ProducerConfig(VerifiableProperties props)
{
  this.props = props;
  super();
  kafka.producer.async.AsyncProducerConfig.class.$init$(this);
  SyncProducerConfigShared.class.$init$(this);
  brokerList = props.getString("metadata.broker.list");
  partitionerClass = props.getString("partitioner.class", "kafka.producer.DefaultPartitioner");
  producerType = props.getString("producer.type", "sync");
  String prop;
  compressionCodec = liftedTree1$1(prop = props.getString("compression.codec", NoCompressionCodec$.MODULE$.name()));
  Object _tmp = null;
  compressedTopics = Utils$.MODULE$.parseCsvList(props.getString("compressed.topics", null));
  messageSendMaxRetries = props.getInt("message.send.max.retries", 3);
  retryBackoffMs = props.getInt("retry.backoff.ms", 100);
  topicMetadataRefreshIntervalMs = props.getInt("topic.metadata.refresh.interval.ms", 600000);
  ProducerConfig$.MODULE$.validate(this);
}
When ProducerConfig is instantiated, Kafka reads these producer settings from the properties we pass in, so properties not defined in our Constants class simply flow through to the Kafka producer.
4. Custom Channel Development
The official documentation marks this as "to be determined".
Below is Meituan's custom channel work; here is the link:
http://tech.meituan.com/mt-log-system-optimization.html
……
Flume itself ships with MemoryChannel and FileChannel. MemoryChannel is fast, but its buffer is bounded and not persistent; FileChannel is the opposite. We wanted the best of both: use MemoryChannel while the sink keeps up and the channel is not buffering too many logs, and fall back to FileChannel when the sink falls behind and the channel must buffer whatever the applications send. So we developed DualChannel, which switches intelligently between the two.
The core logic is as follows:
/***
 * putToMemChannel indicate put event to memChannel or fileChannel
 * takeFromMemChannel indicate take event from memChannel or fileChannel
 * */
private AtomicBoolean putToMemChannel = new AtomicBoolean(true);
private AtomicBoolean takeFromMemChannel = new AtomicBoolean(true);

void doPut(Event event) {
  if (switchon && putToMemChannel.get()) {
    // write the event to memChannel
    memTransaction.put(event);
    if (memChannel.isFull() || fileChannel.getQueueSize() > 100) {
      putToMemChannel.set(false);
    }
  } else {
    // write the event to fileChannel
    fileTransaction.put(event);
  }
}

Event doTake() {
  Event event = null;
  if (takeFromMemChannel.get()) {
    // take an event from memChannel
    event = memTransaction.take();
    if (event == null) {
      takeFromMemChannel.set(false);
    }
  } else {
    // take an event from fileChannel
    event = fileTransaction.take();
    if (event == null) {
      takeFromMemChannel.set(true);
      putToMemChannel.set(true);
    }
  }
  return event;
}
One note: the official documentation recommends the file channel. It is slower, but it guarantees data integrity; the memory channel is faster but should only be used for workloads that can tolerate some loss or duplication.
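As a reference sketch, the two stock channels are configured roughly like this (channel names and capacities are illustrative; if the file channel directories are omitted, Flume defaults to locations under the Flume user's home directory):

# memory channel: fast and bounded, events are lost if the agent dies
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000

# file channel: slower, but events survive an agent restart
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /data/flume/checkpoint
a1.channels.c2.dataDirs = /data/flume/data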
Back to the requirement from part eight of these notes: this time let's implement it a different way, with an interceptor.
Recall that the spooldir source can write the file name into the event header under the key basename. If an interceptor could grab the event, read the value of that key, split it into three parts, and put each part back into the header, the requirement would be met.
Unfortunately, Flume does not ship with an interceptor that works on headers. It does, however, have one that extracts from the event body: RegexExtractorInterceptor, which looks quite powerful. Here is an example from the official documentation:
If the Flume event body contained 1:2:3.4foobar5 and the following configuration was used
a1.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
a1.sources.r1.interceptors.i1.serializers = s1 s2 s3
a1.sources.r1.interceptors.i1.serializers.s1.name = one
a1.sources.r1.interceptors.i1.serializers.s2.name = two
a1.sources.r1.interceptors.i1.serializers.s3.name = three
The extracted event will contain the same body but the following headers will have been added one=>1, two=>2, three=>3
In other words, with that configuration, if the event body contains something like 1:2:3.4foobar5, the regex pulls out the matching groups and sets them into the header.
So I decided to target this interceptor: a small change to make it read a specific header key instead of the body should be enough. The source turned out to be very tidy and easy to modify. Below is the new interceptor I added, RegexExtractorExtInterceptor:
- package com.besttone.flume;
- import java.util.List;
- import java.util.Map;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
- import org.apache.commons.lang.StringUtils;
- import org.apache.flume.Context;
- import org.apache.flume.Event;
- import org.apache.flume.interceptor.Interceptor;
- import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;
- import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.google.common.base.Charsets;
- import com.google.common.base.Preconditions;
- import com.google.common.base.Throwables;
- import com.google.common.collect.Lists;
- /**
- * Interceptor that extracts matches using a specified regular expression and
- * appends the matches to the event headers using the specified serializers
- * Note that all regular expression matching occurs through Java’s built in
- * java.util.regex package. Properties:
- *
- * regex: The regex to use
- *
- * serializers: Specifies the group the serializer will be applied to, and the
- * name of the header that will be added. If no serializer is specified for a
- * group the default {@link RegexExtractorInterceptorPassThroughSerializer} will
- * be used
- *
- * Sample config:
- *
- * agent.sources.r1.channels = c1
- *
- * agent.sources.r1.type = SEQ
- *
- * agent.sources.r1.interceptors = i1
- *
- * agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR
- *
- * agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)
- *
- * agent.sources.r1.interceptors.i1.serializers = s1 s2
- * agent.sources.r1.interceptors.i1.serializers.s1.type =
- * com.blah.SomeSerializer agent.sources.r1.interceptors.i1.serializers.s1.name
- * = warning agent.sources.r1.interceptors.i1.serializers.s2.type =
- * org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer
- * agent.sources.r1.interceptors.i1.serializers.s2.name = error
- * agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd
- *
- *
- *
- *
- * Example 1:
- *
- * EventBody: 1:2:3.4foobar5 Configuration:
- * agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
- *
- * agent.sources.r1.interceptors.i1.serializers = s1 s2 s3
- * agent.sources.r1.interceptors.i1.serializers.s1.name = one
- * agent.sources.r1.interceptors.i1.serializers.s2.name = two
- * agent.sources.r1.interceptors.i1.serializers.s3.name = three
- *
- * results in an event with the following
- *
- * body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=>3
- *
- * Example 2:
- *
- * EventBody: 1:2:3.4foobar5
- *
- * Configuration: agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)
- *
- * agent.sources.r1.interceptors.i1.serializers = s1 s2
- * agent.sources.r1.interceptors.i1.serializers.s1.name = one
- * agent.sources.r1.interceptors.i1.serializers.s2.name = two
- *
- *
- * results in an event with the following
- *
- * body: 1:2:3.4foobar5 headers: one=>1, two=>2
- *
- */
- public class RegexExtractorExtInterceptor implements Interceptor {
- static final String REGEX = "regex";
- static final String SERIALIZERS = "serializers";
- // begin added code
- static final String EXTRACTOR_HEADER = "extractorHeader";
- static final boolean DEFAULT_EXTRACTOR_HEADER = false;
- static final String EXTRACTOR_HEADER_KEY = "extractorHeaderKey";
- // end added code
- private static final Logger logger = LoggerFactory
- .getLogger(RegexExtractorExtInterceptor.class);
- private final Pattern regex;
- private final List<NameAndSerializer> serializers;
- // begin added code
- private final boolean extractorHeader;
- private final String extractorHeaderKey;
- // end added code
- private RegexExtractorExtInterceptor(Pattern regex,
- List<NameAndSerializer> serializers, boolean extractorHeader,
- String extractorHeaderKey) {
- this.regex = regex;
- this.serializers = serializers;
- this.extractorHeader = extractorHeader;
- this.extractorHeaderKey = extractorHeaderKey;
- }
- @Override
- public void initialize() {
- // NO-OP…
- }
- @Override
- public void close() {
- // NO-OP…
- }
- @Override
- public Event intercept(Event event) {
- String tmpStr;
- if(extractorHeader)
- {
- tmpStr = event.getHeaders().get(extractorHeaderKey);
- }
- else
- {
- tmpStr=new String(event.getBody(),
- Charsets.UTF_8);
- }
- Matcher matcher = regex.matcher(tmpStr);
- Map<String, String> headers = event.getHeaders();
- if (matcher.find()) {
- for (int group = 0, count = matcher.groupCount(); group < count; group++) {
- int groupIndex = group + 1;
- if (groupIndex > serializers.size()) {
- if (logger.isDebugEnabled()) {
- logger.debug(
- "Skipping group {} to {} due to missing serializer",
- group, count);
- }
- break;
- }
- NameAndSerializer serializer = serializers.get(group);
- if (logger.isDebugEnabled()) {
- logger.debug("Serializing {} using {}",
- serializer.headerName, serializer.serializer);
- }
- headers.put(serializer.headerName, serializer.serializer
- .serialize(matcher.group(groupIndex)));
- }
- }
- return event;
- }
- @Override
- public List<Event> intercept(List<Event> events) {
- List<Event> intercepted = Lists.newArrayListWithCapacity(events.size());
- for (Event event : events) {
- Event interceptedEvent = intercept(event);
- if (interceptedEvent != null) {
- intercepted.add(interceptedEvent);
- }
- }
- return intercepted;
- }
- public static class Builder implements Interceptor.Builder {
- private Pattern regex;
- private List<NameAndSerializer> serializerList;
- // begin added code
- private boolean extractorHeader;
- private String extractorHeaderKey;
- // end added code
- private final RegexExtractorInterceptorSerializer defaultSerializer = new RegexExtractorInterceptorPassThroughSerializer();
- @Override
- public void configure(Context context) {
- String regexString = context.getString(REGEX);
- Preconditions.checkArgument(!StringUtils.isEmpty(regexString),
- "Must supply a valid regex string");
- regex = Pattern.compile(regexString);
- regex.pattern();
- regex.matcher("").groupCount();
- configureSerializers(context);
- // begin added code
- extractorHeader = context.getBoolean(EXTRACTOR_HEADER,
- DEFAULT_EXTRACTOR_HEADER);
- if (extractorHeader) {
- extractorHeaderKey = context.getString(EXTRACTOR_HEADER_KEY);
- Preconditions.checkArgument(
- !StringUtils.isEmpty(extractorHeaderKey),
- "extractorHeaderKey must be specified when extractorHeader is true");
- }
- // end added code
- }
- private void configureSerializers(Context context) {
- String serializerListStr = context.getString(SERIALIZERS);
- Preconditions.checkArgument(
- !StringUtils.isEmpty(serializerListStr),
- "Must supply at least one name and serializer");
- String[] serializerNames = serializerListStr.split("\\s+");
- Context serializerContexts = new Context(
- context.getSubProperties(SERIALIZERS + "."));
- serializerList = Lists
- .newArrayListWithCapacity(serializerNames.length);
- for (String serializerName : serializerNames) {
- Context serializerContext = new Context(
- serializerContexts.getSubProperties(serializerName
- + "."));
- String type = serializerContext.getString("type", "DEFAULT");
- String name = serializerContext.getString("name");
- Preconditions.checkArgument(!StringUtils.isEmpty(name),
- "Supplied name cannot be empty.");
- if ("DEFAULT".equals(type)) {
- serializerList.add(new NameAndSerializer(name,
- defaultSerializer));
- } else {
- serializerList.add(new NameAndSerializer(name,
- getCustomSerializer(type, serializerContext)));
- }
- }
- }
- private RegexExtractorInterceptorSerializer getCustomSerializer(
- String clazzName, Context context) {
- try {
- RegexExtractorInterceptorSerializer serializer = (RegexExtractorInterceptorSerializer) Class
- .forName(clazzName).newInstance();
- serializer.configure(context);
- return serializer;
- } catch (Exception e) {
- logger.error("Could not instantiate event serializer.", e);
- Throwables.propagate(e);
- }
- return defaultSerializer;
- }
- @Override
- public Interceptor build() {
- Preconditions.checkArgument(regex != null,
- "Regex pattern was misconfigured");
- Preconditions.checkArgument(serializerList.size() > 0,
- "Must supply a valid group match id list");
- return new RegexExtractorExtInterceptor(regex, serializerList,
- extractorHeader, extractorHeaderKey);
- }
- }
- static class NameAndSerializer {
- private final String headerName;
- private final RegexExtractorInterceptorSerializer serializer;
- public NameAndSerializer(String headerName,
- RegexExtractorInterceptorSerializer serializer) {
- this.headerName = headerName;
- this.serializer = serializer;
- }
- }
- }
A brief explanation of the changes:
Two configuration parameters were added:
extractorHeader: whether to extract from the header rather than the body. Defaults to false, in which case the behavior is identical to the original interceptor and the event body content is extracted.
extractorHeaderKey: the header key whose value is extracted. This parameter must be set when extractorHeader is true.
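Before deploying, the interceptor can be exercised outside a running agent. The following is a minimal sketch of such a check; the configuration values mirror the flume.conf shown later, but the test harness itself is illustrative and not part of the original code:

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import com.besttone.flume.RegexExtractorExtInterceptor;
import com.google.common.base.Charsets;

public class InterceptorSmokeTest {
    public static void main(String[] args) {
        // Configure the interceptor the same way flume.conf would.
        Context context = new Context();
        context.put("regex", "(.*)\\.(.*)\\.(.*)");
        context.put("extractorHeader", "true");
        context.put("extractorHeaderKey", "basename");
        context.put("serializers", "s1 s2 s3");
        context.put("serializers.s1.name", "one");
        context.put("serializers.s2.name", "two");
        context.put("serializers.s3.name", "three");

        Interceptor.Builder builder = new RegexExtractorExtInterceptor.Builder();
        builder.configure(context);
        Interceptor interceptor = builder.build();
        interceptor.initialize();

        // Simulate an event produced by spooldir with basenameHeader=true.
        Map<String, String> headers = new HashMap<String, String>();
        headers.put("basename", "a.log.2014-07-31");
        Event event = EventBuilder.withBody("some log line", Charsets.UTF_8, headers);

        interceptor.intercept(event);
        // Expected: one=a, two=log, three=2014-07-31 alongside the original basename.
        System.out.println(event.getHeaders());
        interceptor.close();
    }
}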
Following the approach from part eight, we packaged this class into a jar and dropped it as a Flume plugin into /var/lib/flume-ng/plugins.d/RegexExtractorExtInterceptor/lib, then restarted Flume so the interceptor is loaded onto the classpath.
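For reference, Flume's plugins.d convention expects roughly the following layout; libext and native are optional and shown only for completeness:

/var/lib/flume-ng/plugins.d/
└── RegexExtractorExtInterceptor/
    ├── lib/       <- the jar containing RegexExtractorExtInterceptor goes here
    ├── libext/    <- optional: third-party dependency jars
    └── native/    <- optional: native libraries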
The final flume.conf is as follows:
- tier1.sources=source1
- tier1.channels=channel1
- tier1.sinks=sink1
- tier1.sources.source1.type=spooldir
- tier1.sources.source1.spoolDir=/opt/logs
- tier1.sources.source1.fileHeader=true
- tier1.sources.source1.basenameHeader=true
- tier1.sources.source1.interceptors=i1
- tier1.sources.source1.interceptors.i1.type=com.besttone.flume.RegexExtractorExtInterceptor$Builder
- tier1.sources.source1.interceptors.i1.regex=(.*)\\.(.*)\\.(.*)
- tier1.sources.source1.interceptors.i1.extractorHeader=true
- tier1.sources.source1.interceptors.i1.extractorHeaderKey=basename
- tier1.sources.source1.interceptors.i1.serializers=s1 s2 s3
- tier1.sources.source1.interceptors.i1.serializers.s1.name=one
- tier1.sources.source1.interceptors.i1.serializers.s2.name=two
- tier1.sources.source1.interceptors.i1.serializers.s3.name=three
- tier1.sources.source1.channels=channel1
- tier1.sinks.sink1.type=hdfs
- tier1.sinks.sink1.channel=channel1
- tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}
- tier1.sinks.sink1.hdfs.round=true
- tier1.sinks.sink1.hdfs.roundValue=10
- tier1.sinks.sink1.hdfs.roundUnit=minute
- tier1.sinks.sink1.hdfs.fileType=DataStream
- tier1.sinks.sink1.hdfs.writeFormat=Text
- tier1.sinks.sink1.hdfs.rollInterval=0
- tier1.sinks.sink1.hdfs.rollSize=10240
- tier1.sinks.sink1.hdfs.rollCount=0
- tier1.sinks.sink1.hdfs.idleTimeout=60
- tier1.channels.channel1.type=memory
- tier1.channels.channel1.capacity=10000
- tier1.channels.channel1.transactionCapacity=1000
- tier1.channels.channel1.keep-alive=30
I switched the source type back to the built-in spooldir rather than the custom source from the previous part, then added an interceptor i1 whose type is the custom interceptor com.besttone.flume.RegexExtractorExtInterceptor$Builder. The regular expression splits the basename on "." into three parts and puts them into the header under the keys one, two and three. For example, after a.log.2014-07-31 passes through the interceptor, three keys are added to the header: one=a, two=log, three=2014-07-31. We then set tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%{one}/%{three}, so each event lands in a directory built from those header values.
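The split can also be verified independently of Flume with plain java.util.regex; a minimal sketch:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class BasenameRegexDemo {
    public static void main(String[] args) {
        // Same pattern as in flume.conf: three dot-separated capture groups.
        Pattern pattern = Pattern.compile("(.*)\\.(.*)\\.(.*)");
        Matcher matcher = pattern.matcher("a.log.2014-07-31");
        if (matcher.find()) {
            System.out.println("one   = " + matcher.group(1)); // a
            System.out.println("two   = " + matcher.group(2)); // log
            System.out.println("three = " + matcher.group(3)); // 2014-07-31
        }
    }
}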
This fulfills exactly the same requirement as in part eight.
As you can also see, the cost of customizing an interceptor is very small, far smaller than writing a custom source: here we added a single class and the feature was done.