flume自定义sink àì夳堔傛蜴生んèń 2022-07-14 02:13 179阅读 0赞 java文件: package com.dle; import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class MySink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory.getLogger(MySink.class); private static final String PROP_KEY_ROOTPATH = "fileName"; private String fileName; @Override public void configure(Context context) { // TODO Auto-generated method stub fileName = context.getString(PROP_KEY_ROOTPATH); } @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel ch = getChannel(); Transaction txn = ch.getTransaction(); Event event =null; txn.begin(); while(true){ event = ch.take(); if (event!=null) { break; } } try { logger.debug("Get event."); String body = new String(event.getBody()); String res = body + ":" + System.currentTimeMillis() + "\r\n"; File file = new File(fileName); FileOutputStream fos = null; try { fos = new FileOutputStream(file, true); } catch (FileNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.write(res.getBytes()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { fos.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } txn.commit(); return Status.READY; } catch (Throwable th) { txn.rollback(); if (th instanceof Error) { throw (Error) th; } else { throw new EventDeliveryException(th); } } finally { txn.close(); } } } 配置信息: a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.channels = c1 a1.sources.r1.command = tail -F /tmp/test/a.txt # Describe the sink a1.sinks.k1.type = com.dle.MySink a1.sinks.k1.fileName=/tmp/test/target.txt # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
还没有评论,来说两句吧...