Flume Source组件分析

Flume的介绍以及它的架构之前已经分析过。本文分析flume的Source组件。

Flume内置了很多Source,比如Avro Source,Spooling Directory Source,NetCat Source,Kafka Source等。

以源码的角度分析Source组件。

Source接口

Source接口的定义:

public interface Source extends LifecycleAware, NamedComponent {

  public void setChannelProcessor(ChannelProcessor channelProcessor);

  public ChannelProcessor getChannelProcessor();

}

Source接口只有2个方法,分别是setChannelProcessor和getChannelProcessor。ChannelProcessor是一个Channel处理器,会暴露一些操作用来将事件存储到channel中。

Source接口继承了LifecycleAware和NamedComponent接口。

LifecycleAware接口表示实现了该接口的类是具有状态的,有生命周期的特性。

public interface LifecycleAware {

  public void start(); // 启动一个服务或组件

  public void stop(); // 关闭一个服务或组件

  public LifecycleState getLifecycleState(); // 得到当前组件或服务的状态

}

NamedComponent接口是一个可以让组件拥有名字的接口,这样组件可以在配置中被引用。

public interface NamedComponent {

  public void setName(String name);

  public String getName();

}

Source接口的实现类AbstractSource,基本上所有的Source都会继承这个AbstractSource:

AbstractSource属性:

private ChannelProcessor channelProcessor;
private String name;
private LifecycleState lifecycleState;

这3个属性是都是都会在Source接口的方法中被使用到。

@Override
  public synchronized void start() {
    Preconditions.checkState(channelProcessor != null,
        "No channel processor configured");

    lifecycleState = LifecycleState.START;
  }

  @Override
  public synchronized void stop() {
    lifecycleState = LifecycleState.STOP;
  }

  @Override
  public synchronized void setChannelProcessor(ChannelProcessor cp) {
    channelProcessor = cp;
  }

  @Override
  public synchronized ChannelProcessor getChannelProcessor() {
    return channelProcessor;
  }

  @Override
  public synchronized LifecycleState getLifecycleState() {
    return lifecycleState;
  }

  @Override
  public synchronized void setName(String name) {
    this.name = name;
  }

  @Override
  public synchronized String getName() {
    return name;
  }

Flume内置的Source例子

NetCat Source

首先看最简单的NetCat Source。

NetCat Source配置如下:

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

NetCat Source对应的Source类是NetcatSource,定义如下:

public class NetcatSource extends AbstractSource implements Configurable,
        EventDrivenSource

EventDrivenSource接口表示这个Source不需要外部驱动来收集event,而是自身会实现。这个接口只是一个标识,没有任何方法。

Configurable接口提供了一个public void configure(Context context);方法,会使用以下Context进行一些数据的配置。

NetcatSource覆盖了AbstractSource的start和stop方法,也实现了Configurable中的configure方法。

实现configure方法:

读取配置文件中的信息host和port信息(还有其他一些属性的配置,比如max-line-length和ack-every-event)。

覆盖start方法:

内部会使用ServerSocketChannel监听对应地址和端口发来的数据。

覆盖stop方法:

关闭ServerSocketChannel。

start方法内部会用线程池起一个新的进程来监听数据,其中有个processEvents方法,processEvents方法内部会读取Socket发来的数据,有段代码:

Event event = EventBuilder.withBody(body);
// process event
ChannelException ex = null;
try {
  source.getChannelProcessor().processEvent(event);
} catch (ChannelException chEx) {
  ex = chEx;
}

这段代码就会处理socket读取的数据并构造成一个Event,然后放到ChannelProcessor里,这个ChannelProcessor是在AbstractSource中定义的。

Kafka Source

Kafka Source也是Flume内置的一个Source之一,配置如下:

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.zookeeperConnect = localhost:2181
tier1.sources.source1.topic = test1
tier1.sources.source1.groupId = flume
tier1.sources.source1.kafka.consumer.timeout.ms = 100

对应的Source类是KafkaSource:

public class KafkaSource extends AbstractSource
    implements Configurable, PollableSource

PollableSource接口表示Source需要自己去查询数据(poll)。

public interface PollableSource extends Source {

  // process方法的返回值是个Status,有2个状态,分别是READY和BACKOFF
  public Status process() throws EventDeliveryException;

  public static enum Status {
    READY, BACKOFF
  }
}

KafkaSource的configure方法同样是读取配置文件信息。KafkaSource有几个特殊的配置,zookeeperConnect,groupId,topic都是kafka本身需要的配置。batchSize和batchDurationMillis是批处理的配置。batchSize表示批次数,每batchSize个event需要写入到Channel中,默认值是1000。batchDurationMillis是处理Kafka对i读取数据的时间,默认是1000毫秒,比如批次个数没有到1000,但是batchDurationMillis到了的话还是会丢到channel里。

start方法构造Kafka的ConsumerConnector,start方法还会调用父类AbstractSource的start方法,也就是初始化lifecycleState属性,说明这个Source是有生命周期的。

stop方法关闭Kafka的ConsumerConnector,同理stop也会调用父类的stop方法。

process方法是PollableSource接口的方法,KafkaSource需要我们自己去查询数据。

process部分源码如下:

long batchStartTime = System.currentTimeMillis();
long batchEndTime = System.currentTimeMillis() + timeUpperLimit;
try {
  boolean iterStatus = false;
  // 批次个数和处理时间全部符合才会进行处理
  while (eventList.size() < batchUpperLimit &&
          System.currentTimeMillis() < batchEndTime) {
    iterStatus = hasNext();
    if (iterStatus) {
      // kafka队列存在数据的话,提取数据,构造Event,放到eventList里,eventList是KafkaSource里的一个属性
      MessageAndMetadata<byte[], byte[]> messageAndMetadata = it.next();
      kafkaMessage = messageAndMetadata.message();
      kafkaKey = messageAndMetadata.key();

      headers = new HashMap<String, String>();
      headers.put(KafkaSourceConstants.TIMESTAMP,
              String.valueOf(System.currentTimeMillis()));
      headers.put(KafkaSourceConstants.TOPIC, topic);
      headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));

      event = EventBuilder.withBody(kafkaMessage, headers);
      eventList.add(event);
    }

  }
  if (eventList.size() > 0) {
    // eventList列表里有数据的话,将数据丢到channel里,清空eventList。代码执行到这里说明要么读取了1000条数据,要么处理时间到了
    getChannelProcessor().processEventBatch(eventList);
    eventList.clear();

    if (!kafkaAutoCommitEnabled) {
      consumer.commitOffsets();
    }
  }
  // kafka队列没有数据的话返回BACKOFF状态
  if (!iterStatus) {
    return Status.BACKOFF;
  }
  // kafka队列还有数据的话返回READY状态
  return Status.READY;
} catch (Exception e) {
  log.error("KafkaSource EXCEPTION, {}", e);
  return Status.BACKOFF;
}

编写自定义的Source

Flume尽管已经提供了不少Source,但始终无法满足所有的需求。 比如数据库中的数据想使用Flume写入了其他渠道。

编写SQLSource只需要继承AbstractSource,且实现Configurable和PollableSource接口,SQLSource跟KafkaSource一样都需要自身去查询数据,所以都得实现PollableSource接口。

SQLSource在github上已经有人实现过了: https://github.com/keedio/flume-ng-sql-source

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
本文作者:Format
原文链接: http://fangjian0423.github.io/2015/06/21/flume-source/
版权归作者所有,转载请注明出处