Flume开发中常用组件source,channel,sink等配置 绝地灬酷狼 2022-06-02 11:39 139阅读 0赞 实际开发中Flume中常用的source源,sink,channel,intercepertor等介绍,而实际上关于agent代理中设置选项有很多,具体要到官网去查看。 <table> <tbody> <tr> <td> <p><br> </p> </td> <td> <p>组件</p> </td> <td> <p>使用介绍</p> </td> </tr> <tr> <td> <p> </p> <p> </p> <p> </p> <p> </p> <p> </p> <p>source</p> <p>数据源</p> </td> <td> <p><strong><span style="color:#FF0000">Avro</span></strong></p> </td> <td> <p>监听由Avro sink 或Flume SDK 通过Avro RPC发送的事件所抵达的端口</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">Exec</span></strong></p> </td> <td> <p>运行一个Unix命令(例如 tail -F /path/to/file),并且把从标准输出上读取的行转化为事件。但是要注意,此source不一定能保证把事件传送到channel,更好的选择可以参考spooling directory source 或者Flume SDK.</p> </td> </tr> <tr> <td> <p>HTTP</p> </td> <td> <p>监听一个端口,并且使用可插拔句柄,比如JSON处理程序或者二进制数据处理程序,把HTTP请求转换成事件</p> </td> </tr> <tr> <td> <p>JMS</p> </td> <td> <p>读取来自JMS Queue或者Topic的消息并将其转换成事件</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">Spooling directory</span></strong></p> <p><strong><span style="color:#FF0000">/spooldir</span></strong></p> </td> <td> <p>按行读取保存在缓冲目录中的文件,并将其转换为事件。</p> </td> </tr> <tr> <td> <p><span style="color:#FF0000">Netcat</span></p> </td> <td> <p>监听一个端口,并把每行文本转换为一个事件</p> </td> </tr> <tr> <td> <p>Syslog</p> </td> <td> <p>从日志中读取行,并将其转换为事件</p> </td> </tr> <tr> <td> <p>Thrift</p> </td> <td> <p>监听由Thrift sink或Flume SDK通过Thrift RPC发送的事件所抵达的窗口</p> </td> </tr> <tr> <td> <p>Squence genetartor</p> </td> <td> <p>依据增量计数器来生成事件,主要用来测试用</p> </td> </tr> <tr> <td> <p><span style="color:#FF0000">kafka</span></p> </td> <td> <p>监听kafka的Topic,当成事件</p> </td> </tr> <tr> <td> <p><br> </p> </td> <td> <p>组件</p> </td> <td> <p>使用介绍</p> </td> </tr> <tr> <td> <p> </p> <p> </p> <p> </p> <p> </p> <p> </p> <p>sink</p> <p>目地</p> </td> <td> <p><strong><span style="color:#FF0000">HDFS</span></strong></p> </td> <td> <p>以文本,序列化文件,Avro或定制格式写入到HDFS中。</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">HBASE</span></strong></p> </td> <td> <p>使用某种序列化工具将数据写入到HBASE中</p> </td> </tr> <tr> <td> <p>Logger</p> </td> <td> <p>使用SLF4J记录INFO级别的事件,主要用来测试</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">kafka</span></strong></p> </td> <td> <p>将事件写入到kafka消息缓存对列</p> </td> </tr> <tr> <td> <p>Elasticcsearch</p> </td> <td> <p>使用Logstash格式将事件写入到Elasticsearch集群中</p> </td> </tr> <tr> <td> <p>NULL</p> </td> <td> <p>丢弃所有事件</p> </td> </tr> <tr> <td> <p>Avro</p> </td> <td> <p>通过Avro RPC发送事件到一个Avro source群中</p> </td> </tr> <tr> <td> <p>File roll</p> </td> <td> <p>将事件写入到本地文件系统</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">Hive</span></strong></p> </td> <td> <p>将事件按固定的格式导入到hive表中或者对应的分区里。</p> </td> </tr> <tr> <td> <p>IRC</p> </td> <td> <p>将事件发送给IRC通道</p> </td> </tr> <tr> <td> <p><br> </p> </td> <td> <p>组件</p> </td> <td> <p>使用介绍</p> </td> </tr> <tr> <td> <p><br> </p> <p>channel</p> <p>通道</p> </td> <td> <p><strong><span style="color:#FF0000">file</span></strong></p> </td> <td> <p><span style="color:#FF0000">将事件存储在一个本地文件系统上的事务日志中。这种类型的channel</span>具有持久性:只要事件被写入channel,即使使用了代理,代理重新启动,事件也不会丢失。agent1.channnels.channel1.type=file </p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">memory</span></strong></p> </td> <td> <p><span style="color:#ff0000">将事件缓存到内存中,因此不具有持久性存储能力。所以采用memory channel时,如果代理重新启动,事件就会丢失。在有些时候这种情况允许。和file channel相比,memory channel 速度快,吞吐量较高</span></p> <p><span style="color:#FF0000">agent1.channnels.channel1.type=memory</span></p> </td> </tr> <tr> <td> <p> jdbc</p> </td> <td> <p>将事件存储到数据库中。</p> </td> </tr> <tr> <td> <p><br> </p> </td> <td> <p>组件</p> </td> <td> <p>使用介绍</p> </td> </tr> <tr> <td> <p> </p> <p> </p> <p> </p> <p> </p> <p>拦截器</p> <p>interce-ptor</p> <p> </p> </td> <td> <p><strong><span style="color:#FF0000">Timestamp</span></strong></p> </td> <td> <p>给特定source传递过来的事件加一个时间戳header.其中包含的是代理处理事件的时间,以ms为单位。具体参考博客Flume拦截器那篇</p> <p> agent1.sources.souce1.interceptors = interceptor1</p> <p> agent1.sources.source1.interceptors.interceptor1.type = timestamp。</p> </td> </tr> <tr> <td> <p><strong><span style="color:#FF0000">UUID</span></strong></p> </td> <td> <p><span style="color:#2F5597">在所有的事件上设置一个ID header</span>,它是一个全局的唯一标识符,对将来删除重复数据有用。</p> <p><span style="color:#2F5597">agent1.sources.source1.interceptors.interceptor1.type</span></p> <p><span style="color:#2F5597">=</span></p> <p>org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder</p> </td> </tr> <tr> <td> <p> static</p> </td> <td> <p>将所有的事件上设置一个固定的header及其值。具体参考官网</p> </td> </tr> <tr> <td> <p> Host</p> </td> <td> <p>将所有事件上设置一个包含代理主机名或IP地址的主机header.</p> </td> </tr> </tbody> </table>
还没有评论,来说两句吧...