一、整体流程
FileChannel主要是由WAL预写日志和内存队列FlumeEventQueue组成。
二、Transaction
/**
 * Contract for a channel transaction: callers begin it, then either commit or
 * roll back, and finally close it.
 */
public interface Transaction {

  /** States a transaction can be in. */
  enum TransactionState { Started, Committed, RolledBack, Closed }

  /** Starts the transaction. */
  void begin();

  /** Commits the work done in the transaction. */
  void commit();

  /** Rolls back the work done in the transaction. */
  void rollback();

  /** Closes the transaction. */
  void close();
}
三、Put()
Put事务发生在source往channel写入数据时。
// After the source has collected events, it hands them to the channel processor.
getChannelProcessor().processEventBatch(events);
public void processEventBatch(List<Event> events) {Preconditions.checkNotNull(events, "Event list must not be null");events = interceptorChain.intercept(events);//必需通道Map<Channel, List<Event>> reqChannelQueue =new LinkedHashMap<Channel, List<Event>>();//可选通道,可选通道是指在定义 SinkGroup 时可以选择添加的通道,用于将数据从 SinkGroup 中的一个 Sink 传输到另一个 Sink//没有配置可选通道可忽略。Map<Channel, List<Event>> optChannelQueue =new LinkedHashMap<Channel, List<Event>>();for (Event event : events) {List<Channel> reqChannels = selector.getRequiredChannels(event);for (Channel ch : reqChannels) {List<Event> eventQueue = reqChannelQueue.get(ch);if (eventQueue == null) {eventQueue = new ArrayList<Event>();reqChannelQueue.put(ch, eventQueue);}eventQueue.add(event);}List<Channel> optChannels = selector.getOptionalChannels(event);for (Channel ch : optChannels) {List<Event> eventQueue = optChannelQueue.get(ch);if (eventQueue == null) {eventQueue = new ArrayList<Event>();optChannelQueue.put(ch, eventQueue);}eventQueue.add(event);}}// Process required channelsfor (Channel reqChannel : reqChannelQueue.keySet()) {Transaction tx = reqChannel.getTransaction();Preconditions.checkNotNull(tx, "Transaction object must not be null");try {tx.begin();List<Event> batch = reqChannelQueue.get(reqChannel);for (Event event : batch) {//进入必选通道,调用put()reqChannel.put(event);}tx.commit();} catch (Throwable t) {tx.rollback();if (t instanceof Error) {LOG.error("Error while writing to required channel: " + reqChannel, t);throw (Error) t;} else if (t instanceof ChannelException) {throw (ChannelException) t;} else {throw new ChannelException("Unable to put batch on required " +"channel: " + reqChannel, t);}} finally {if (tx != null) {tx.close();}}}// Process optional channelsfor (Channel optChannel : optChannelQueue.keySet()) {Transaction tx = optChannel.getTransaction();Preconditions.checkNotNull(tx, "Transaction object must not be null");try {tx.begin();List<Event> batch = optChannelQueue.get(optChannel);for (Event event : batch) 
{optChannel.put(event);}tx.commit();} catch (Throwable t) {tx.rollback();LOG.error("Unable to put batch on optional channel: " + optChannel, t);if (t instanceof Error) {throw (Error) t;}} finally {if (tx != null) {tx.close();}}}}
//org.apache.flume.channel.BasicChannelSemantics.javapublic void put(Event event) throws ChannelException {BasicTransactionSemantics transaction = currentTransaction.get();Preconditions.checkState(transaction != null,"No transaction exists for this thread");//调用transaction对象put()transaction.put(event);}
//org.apache.flume.channel.BasicTransactionSemantics.javaprotected void put(Event event) {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"put() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"put() called when transaction is %s!", state);Preconditions.checkArgument(event != null,"put() called with null event!");try {doPut(event);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new ChannelException(e.toString(), e);}}
// org.apache.flume.channel.file.FileBackedTransaction.java
protected void doPut(Event event) throws InterruptedException {channelCounter.incrementEventPutAttemptCount();if (putList.remainingCapacity() == 0) {throw new ChannelException("Put queue for FileBackedTransaction " +"of capacity " + putList.size() + " full, consider " +"committing more frequently, increasing capacity or " +"increasing thread count. " + channelNameDescriptor);}// this does not need to be in the critical section as it does not// modify the structure of the log or queue.if (!queueRemaining.tryAcquire(keepAlive, TimeUnit.SECONDS)) {throw new ChannelFullException("The channel has reached it's capacity. "+ "This might be the result of a sink on the channel having too "+ "low of batch size, a downstream system running slower than "+ "normal, or that the channel capacity is just too low. "+ channelNameDescriptor);}boolean success = false;//doTake,也会获取该锁,所以doTake和doPut只能操作一个,无法同时操作log.lockShared();try {// transactionID在前面创建transaction对象的时候定义的// 将put事件写入WAL日志,并将数据持久化到磁盘文件FlumeEventPointer ptr = log.put(transactionID, event);// 将event pointer放到内存队列putListPreconditions.checkState(putList.offer(ptr), "putList offer failed "+ channelNameDescriptor);// 将event也放到inflightPuts中,这是临时数据,因为还没有提交// 如果还没有提交就直接放到FlumeEventQueue,那么将提前暴露给sink。queue.addWithoutCommit(ptr, transactionID);success = true;} catch (IOException e) {channelCounter.incrementEventPutErrorCount();throw new ChannelException("Put failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();if (!success) {// release slot obtained in the case// the put fails for any reasonqueueRemaining.release();}}}
// org.apache.flume.channel.file.FlumeEventQueue.javasynchronized void addWithoutCommit(FlumeEventPointer e, long transactionID) {inflightPuts.addEvent(transactionID, e.toLong());}public void addEvent(Long transactionID, Long pointer) {// event放到inflightEventsinflightEvents.put(transactionID, pointer);inflightFileIDs.put(transactionID,FlumeEventPointer.fromLong(pointer).getFileID());syncRequired = true;}
InflightPuts 记录的是已经写入、但所属事务尚未结束的put事件(按事务ID保存event pointer),Flume 使用 InflightPuts 来跟踪这些正在传输中的事件。事务提交或回滚时(两者都会调用queue.completeTransaction),InflightPuts 里才会清空这次事务的所有pointer数据。InflightPuts 和Flume的checkpoint机制密切相关。
先来看下Flume如何将event持久化到磁盘:event会被写入以"log-"为前缀的数据文件中,对应的方法是log.put(transactionID, event):
// org.apache.flume.channel.file.Log.java
FlumeEventPointer put(long transactionID, Event event)throws IOException {Preconditions.checkState(open, "Log is closed");FlumeEvent flumeEvent = new FlumeEvent(event.getHeaders(), event.getBody());//封装Put操作,WAL日志会记录四种操作,分别是Put,Take,Commit和RollbackPut put = new Put(transactionID, WriteOrderOracle.next(), flumeEvent);ByteBuffer buffer = TransactionEventRecord.toByteBuffer(put);//选择数据目录的数据文件,比如log-1int logFileIndex = nextLogWriter(transactionID);long usableSpace = logFiles.get(logFileIndex).getUsableSpace();long requiredSpace = minimumRequiredSpace + buffer.limit();if (usableSpace <= requiredSpace) {throw new IOException("Usable space exhausted, only " + usableSpace +" bytes remaining, required " + requiredSpace + " bytes");}boolean error = true;try {try {// Put事件写入WAL日志文件,Event也就持久化到文件了// logFileIndex就是数据文件ID,比如log-1文件FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);error = false;return ptr;} catch (LogFileRetryableIOException e) {if (!open) {throw e;}roll(logFileIndex, buffer);FlumeEventPointer ptr = logFiles.get(logFileIndex).put(buffer);error = false;return ptr;}} finally {if (error && open) {roll(logFileIndex);}}}
再来看下commit,tx.commit():
// org.apache.flume.channel.BasicTransactionSemantics.javapublic void commit() {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"commit() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"commit() called when transaction is %s!", state);try {doCommit();} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new ChannelException(e.toString(), e);}// 修改transaction状态state = State.COMPLETED;}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected void doCommit() throws InterruptedException {int puts = putList.size();int takes = takeList.size();if (puts > 0) {Preconditions.checkState(takes == 0, "nonzero puts and takes "+ channelNameDescriptor);log.lockShared();try {// commit操作写入WAL日志log.commitPut(transactionID);channelCounter.addToEventPutSuccessCount(puts);synchronized (queue) {while (!putList.isEmpty()) {// 将putList的event pointer放到transaction的queue中// channel的容量就是channel transaction的queue的容量if (!queue.addTail(putList.removeFirst())) {StringBuilder msg = new StringBuilder();msg.append("Queue add failed, this shouldn't be able to ");msg.append("happen. A portion of the transaction has been ");msg.append("added to the queue but the remaining portion ");msg.append("cannot be added. Those messages will be consumed ");msg.append("despite this transaction failing. Please report.");msg.append(channelNameDescriptor);LOG.error(msg.toString());Preconditions.checkState(false, msg.toString());}}//清除事务ID//清空inflightPutsqueue.completeTransaction(transactionID);}} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();}} else if (takes > 0) {//省略代码//......}//清空putList.clear();takeList.clear();channelCounter.setChannelSize(queue.getSize());}
// org.apache.flume.channel.file.FlumeEventQueuesynchronized void completeTransaction(long transactionID) {//清空inflightPutsif (!inflightPuts.completeTransaction(transactionID)) {inflightTakes.completeTransaction(transactionID);}}
// org.apache.flume.channel.file.FlumeEventQueue.InflightEventWrapper// inflightPuts和inflightTakes都是InflightEventWrapper的实例对象public boolean completeTransaction(Long transactionID) {if (!inflightEvents.containsKey(transactionID)) {return false;}//清除inflightPuts的事务ID//清空inflightPuts的inflightEventsinflightEvents.removeAll(transactionID);inflightFileIDs.removeAll(transactionID);syncRequired = true;return true;}
内存队列putList中的event pointer放入到transaction的queue中,并清空了inflightPuts,说明当前put事务已经提交成功了。
四、Take()
Take事务发生在sink从channel拿取数据。
public Status process() throws EventDeliveryException {Status result = Status.READY;Channel channel = getChannel();Transaction transaction = channel.getTransaction();Event event = null;try {transaction.begin();//从channel拿取数据event = channel.take();......transaction.commit();} catch (Exception ex) {transaction.rollback();throw new EventDeliveryException("Failed to process transaction" , ex);} finally {transaction.close();}return result;}
// org.apache.flume.channel.BasicTransactionSemantics.javaprotected Event take() {Preconditions.checkState(Thread.currentThread().getId() == initialThreadId,"take() called from different thread than getTransaction()!");Preconditions.checkState(state.equals(State.OPEN),"take() called when transaction is %s!", state);try {return doTake();} catch (InterruptedException e) {Thread.currentThread().interrupt();return null;}}
// org.apache.flume.channel.file.FileChannel.FileBackedTransaction
protected Event doTake() throws InterruptedException {channelCounter.incrementEventTakeAttemptCount();if (takeList.remainingCapacity() == 0) {throw new ChannelException("Take list for FileBackedTransaction, capacity " +takeList.size() + " full, consider committing more frequently, " +"increasing capacity, or increasing thread count. "+ channelNameDescriptor);}log.lockShared();/** 1. Take an event which is in the queue.* 2. If getting that event does not throw NoopRecordException,* then return it.* 3. Else try to retrieve the next event from the queue* 4. Repeat 2 and 3 until queue is empty or an event is returned.*/try {while (true) {// 从FlumeEventQueue中取出event pointer// 并将event pointer放到inflightTakesFlumeEventPointer ptr = queue.removeHead(transactionID);if (ptr == null) {return null;} else {try {// first add to takeList so that if write to disk// fails rollback actually does it's work//首先将pointer放入takeList中Preconditions.checkState(takeList.offer(ptr),"takeList offer failed "+ channelNameDescriptor);// take操作写入WAL日志log.take(transactionID, ptr); // write take to disk//根据pointer从持久化文件中获取eventEvent event = log.get(ptr);return event;} catch (IOException e) {channelCounter.incrementEventTakeErrorCount();throw new ChannelException("Take failed due to IO error "+ channelNameDescriptor, e);} catch (NoopRecordException e) {LOG.warn("Corrupt record replaced by File Channel Integrity " +"tool found. Will retrieve next event", e);takeList.remove(ptr);} catch (CorruptEventException ex) {channelCounter.incrementEventTakeErrorCount();if (fsyncPerTransaction) {throw new ChannelException(ex);}LOG.warn("Corrupt record found. Event will be " +"skipped, and next event will be read.", ex);takeList.remove(ptr);}}}} finally {log.unlockShared();}}
看一下queue.removeHead(transactionID):
/**
 * Removes the pointer at the head of the queue on behalf of a transaction.
 *
 * @param transactionID id of the taking transaction (passed through to
 *        {@code remove})
 * @return the removed pointer, or {@code null} when the backing store is empty
 */
synchronized FlumeEventPointer removeHead(long transactionID) {
  boolean queueIsEmpty = backingStore.getSize() == 0;
  if (queueIsEmpty) {
    return null;
  }

  long encoded = remove(0, transactionID);
  Preconditions.checkState(encoded != EMPTY, "Empty value " + channelNameDescriptor);

  FlumeEventPointer head = FlumeEventPointer.fromLong(encoded);
  // Adjust the backing store's count for the file this pointer refers to.
  backingStore.decrementFileID(head.getFileID());
  return head;
}
protected synchronized long remove(int index, long transactionID) {if (index < 0 || index > backingStore.getSize() - 1) {throw new IndexOutOfBoundsException("index = " + index+ ", queueSize " + backingStore.getSize() + " " + channelNameDescriptor);}copyCount++;long start = System.currentTimeMillis();long value = get(index);if (queueSet != null) {queueSet.remove(value);}//if txn id = 0, we are recovering from a crash.if (transactionID != 0) {//将queue的event pointer加入到inflightTakes中inflightTakes.addEvent(transactionID, value);}if (index > backingStore.getSize() / 2) {// Move tail part to leftfor (int i = index; i < backingStore.getSize() - 1; i++) {long rightValue = get(i + 1);set(i, rightValue);}set(backingStore.getSize() - 1, EMPTY);} else {// Move head part to rightfor (int i = index - 1; i >= 0; i--) {long leftValue = get(i);set(i + 1, leftValue);}set(0, EMPTY);backingStore.setHead(backingStore.getHead() + 1);if (backingStore.getHead() == backingStore.getCapacity()) {backingStore.setHead(0);}}backingStore.setSize(backingStore.getSize() - 1);copyTime += System.currentTimeMillis() - start;return value;}
拿到event后,再来看commit事务:
protected void doCommit() throws InterruptedException {int puts = putList.size();int takes = takeList.size();if (puts > 0) {//省略代码//......} else if (takes > 0) {log.lockShared();try {log.commitTake(transactionID);//清除事务ID//清空inflightTakesqueue.completeTransaction(transactionID);channelCounter.addToEventTakeSuccessCount(takes);} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();}queueRemaining.release(takes);}putList.clear();takeList.clear();channelCounter.setChannelSize(queue.getSize());}
// org.apache.flume.channel.file.FlumeEventQueuesynchronized void completeTransaction(long transactionID) {if (!inflightPuts.completeTransaction(transactionID)) {//清空inflightTakesinflightTakes.completeTransaction(transactionID);}}
take事务提交时,event pointer早已在doTake阶段从queue(FlumeEventQueue)中取出;commit只需把提交记录写入WAL日志,并清空inflightTakes和takeList。
transaction.rollback():
protected void doRollback() throws InterruptedException {int puts = putList.size();int takes = takeList.size();log.lockShared();try {if (takes > 0) {Preconditions.checkState(puts == 0, "nonzero puts and takes "+ channelNameDescriptor);synchronized (queue) {while (!takeList.isEmpty()) {// 把takeList中的数据放回到transaction的queue中Preconditions.checkState(queue.addHead(takeList.removeLast()),"Queue add failed, this shouldn't be able to happen "+ channelNameDescriptor);}}}putList.clear();takeList.clear();//清除事务ID//清空inflightTakesqueue.completeTransaction(transactionID);channelCounter.setChannelSize(queue.getSize());//rollback操作写入WAL日志log.rollback(transactionID);} catch (IOException e) {throw new ChannelException("Commit failed due to IO error "+ channelNameDescriptor, e);} finally {log.unlockShared();// since rollback is being called, puts will never make it on// to the queue and we need to be sure to release the resourcesqueueRemaining.release(puts);}}
take事务回滚时,takeList中的数据会通过queue.addHead重新放回到queue的头部,并清空这次事务的inflightTakes。