Flume各种拦截器(二)——正则和自定义拦截器

2022年3月2日23:13:38 发表评论 785 views

查询替换拦截器

 

search.conf

#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Shalltest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = search_replace

#遇到数字改成shalltest,A123会替换为Ashalltest
a1.sources.r1.interceptors.i1.searchPattern = [0-9]+
a1.sources.r1.interceptors.i1.replaceString = shalltest
a1.sources.r1.interceptors.i1.charset = UTF-8

#3 sink
a1.sinks.k1.type = logger

#4 Chanel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#5 bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -c conf/ -f jobconf/search.conf -n a1 -Dflume.root.logger=INFO,console

正则过滤拦截器

filter.conf

#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Shalltest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_filter
a1.sources.r1.interceptors.i1.regex = ^A.*
#如果excludeEvents设为false,表示过滤掉不是以A开头的events。如果excludeEvents设为true,则表示过滤掉以A开头的events。
a1.sources.r1.interceptors.i1.excludeEvents = true

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

bin/flume-ng agent -c conf/ -f jobconf/filter.conf -n a1 -Dflume.root.logger=INFO,console

正则抽取拦截器

extractor.conf


#1 agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#2 source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /opt/Shalltest
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = hostname is (.*?) ip is (.*)
a1.sources.r1.interceptors.i1.serializers = s1 s2
a1.sources.r1.interceptors.i1.serializers.s1.name = hostname
a1.sources.r1.interceptors.i1.serializers.s2.name = ip

a1.sinks.k1.type = logger

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -c conf/ -f jobconf/extractor.conf -n a1 -Dflume.root.logger=INFO,console

自定义拦截器

maven配置文件

<dependencies>
        <!-- flume核心依赖 -->
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
    <build>
    <plugins>
    <!-- 打包插件 -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
            <archive>
                <manifest>
                    <addClasspath>true</addClasspath>
                    <classpathPrefix>lib/</classpathPrefix>
                    <mainClass></mainClass>
                </manifest>
            </archive>
        </configuration>
    </plugin>
    <!-- 编译插件 -->
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <source>1.8</source>
            <target>1.8</target>
            <encoding>utf-8</encoding>
        </configuration>
    </plugin>
    </plugins>
    </build>

Java代码:

package ToUpCase;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class MyInterceptor implements Interceptor {
    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        byte[] body = event.getBody();
        event.setBody(new String(body).toUpperCase().getBytes());
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        List<Event> events = new ArrayList<>();
        for (Event event : list) {
            events.add(intercept(event));
        }
        return events;
    }



    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder{

        @Override
        public Interceptor build() {
            return new MyInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }

}

ToUpCase.conf


#1.agent 
a1.sources = r1
a1.sinks =k1
a1.channels = c1


# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/Shalltest
a1.sources.r1.interceptors = i1
#全类名$Builder
a1.sources.r1.interceptors.i1.type = ToUpCase.MyInterceptor$Builder

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /ToUpCase1
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

 

启动,即可看到HDFS里面有了文件夹,并已经转化成大写

bin/flume-ng agent -c conf/ -n a1 -f jar/ToUpCase.conf -C jar/vip.isok-1.0-SNAPSHOT.jar -Dflume.root.logger=DEBUG,console

 

 

 

 

 

 

 

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: