1. 背景

我们的大数据平台XSailboat由众多的前端web服务和中台服务构成(模块多,总数目前有16个左右)。为了方便查看这些服务的日志信息,我们实现了对这些服务的日志采集、集中存储和查看。
应用日志查看

2. 旧的日志采集方案

日志采集方案(旧)
LogBack提供了ch.qos.logback.classic.net.SyslogAppender,但是这个Appender在SysLog服务不可用时,会抛出异常,影响程序运行。笔者希望即使SysLog服务不可用,也只是这期间的日志不能正常采集到HBase中集中存储,而不应该干扰到应用的正常运行。所以我对它进行了修改。
类:com.cimstech.ms.common.log.SyslogAppender_TCP

package com.cimstech.ms.common.log;

import java.net.SocketException;
import java.net.UnknownHostException;

import com.cimstech.xfront.common.AppContext;

import ch.qos.logback.classic.net.SyslogAppender;
import ch.qos.logback.core.net.SyslogOutputStream;

/**
 * A {@link SyslogAppender} variant that ships log events over TCP through
 * {@link SyslogOutputStream_TCP}, which tolerates an unavailable syslog
 * service instead of throwing and disrupting the host application.
 */
public class SyslogAppender_TCP extends SyslogAppender
{
	/**
	 * Creates the appender and forces UTF-8 encoding for outgoing messages.
	 */
	public SyslogAppender_TCP()
	{
		// The implicit super() call suffices; only the charset needs overriding.
		setCharset(AppContext.sUTF8);
	}

	/**
	 * Supplies the fault-tolerant TCP output stream in place of logback's default.
	 *
	 * @return a {@link SyslogOutputStream_TCP} bound to the configured host and port
	 * @throws UnknownHostException if the configured syslog host cannot be resolved
	 * @throws SocketException      declared to match the overridden signature
	 */
	@Override
	public SyslogOutputStream createOutputStream() throws SocketException, UnknownHostException
	{
		return new SyslogOutputStream_TCP(getSyslogHost(), getPort());
	}
}

类:com.cimstech.ms.common.log.SyslogOutputStream_TCP

package com.cimstech.ms.common.log;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cimstech.xfront.common.CommonExecutor;
import com.cimstech.xfront.common.JCommon;
import com.cimstech.xfront.common.Wrapper;
import com.cimstech.xfront.common.excep.ExceptionAssist;
import com.cimstech.xfront.common.serial.StreamAssist;
import com.cimstech.xfront.common.time.XTime;
import com.cimstech.xfront.common.time.XTimeUnit;

import ch.qos.logback.core.net.SyslogOutputStream;

/**
 * TCP transport for logback's syslog output that keeps the host application
 * running when the syslog service is unavailable: payloads that fail to send
 * are queued in memory (bounded to {@link #sSendFaildLogItemSizeLimit} items)
 * and retried by a scheduled background task until delivery succeeds.
 * <p>
 * Thread-safety: {@code write(...)} synchronizes on the internal byte buffer,
 * {@code flush()}/{@code close()} are {@code synchronized}, and the retry task
 * runs on the shared {@code CommonExecutor} scheduler.
 */
public class SyslogOutputStream_TCP extends SyslogOutputStream
{
	// Buffer capacity above which the buffer is re-allocated (to release memory)
	// instead of merely reset.
	static final int sBufPerfectSize = 102400;
	// Socket send-buffer size, also the maximum chunk written per socket write.
	static final int sSendBufSize = 1024000;
	// Maximum number of failed payloads kept in memory before dropping the oldest.
	static final int sSendFaildLogItemSizeLimit = 2000 ;
	private InetAddress address;
	private Socket mSocket;
	private ByteArrayOutputStream mBouts = new ByteArrayOutputStream();
	private int port;
	
	// Last time (epoch millis) the queue-overflow message was logged; used to
	// throttle that message to at most once per minute.
	long mLastLogTime = 0 ;

	final Logger mLogger = LoggerFactory.getLogger("SyslogAppender_TCP");
	// Last time (epoch millis) a send failure was logged; throttles the error
	// log to at most once per 10 minutes.
	final AtomicLong mLastLogExcepTime = new AtomicLong();
	/**
	 * true while the background retry task is scheduled; during that time new
	 * payloads are queued for asynchronous delivery instead of being sent inline.
	 */
	final AtomicBoolean mMonitored = new AtomicBoolean(false);
	// NOTE(review): appears unused in this class — kept for binary compatibility; confirm before removing.
	final Object mMutex = new Object() ;
	// Payloads whose delivery failed, awaiting retry, oldest first.
	final Queue<byte[]> mSendFaildDatas = new LinkedList<byte[]>() ;

	/**
	 * @param aSyslogHost host name of the syslog service
	 * @param aPort       TCP port of the syslog service
	 * @throws UnknownHostException if the host cannot be resolved
	 * @throws SocketException      propagated from the superclass constructor
	 */
	public SyslogOutputStream_TCP(String aSyslogHost, int aPort) throws UnknownHostException, SocketException
	{
		super(aSyslogHost, aPort);
		this.address = InetAddress.getByName(aSyslogHost);
		this.port = aPort;
	}

	/**
	 * Buffers the given bytes, first neutralising embedded line breaks so a
	 * multi-line event stays a single syslog frame: '\n' becomes the vertical
	 * tab 0x0b and '\r' becomes a space.
	 * NOTE(review): the final byte of the range is deliberately not rewritten —
	 * presumably to preserve the trailing '\n' that delimits TCP syslog
	 * messages; confirm against the framing expected by the collector.
	 */
	public void write(byte[] byteArray, int offset, int len) throws IOException
	{
		synchronized (mBouts)
		{
			// Replace embedded '\n' with 0x0b and '\r' with ' ' (skips the last byte).
			final byte nc = (byte)'\n' ;
			final byte rc = (byte)'\r' ;
			final byte vc = (byte)0x0b ;
			final byte sc = (byte)' ' ;
			int i = offset + len-1 ;
			while(i-->offset)
			{
				if(byteArray[i] == nc)
					byteArray[i] = vc ;
				else if(byteArray[i] == rc)
					byteArray[i] = sc ;
			}
			mBouts.write(byteArray, offset, len);
		}
	}

	/**
	 * Starts (at most once) a periodic task that drains the failed-payload queue
	 * once the syslog service is reachable again, then cancels itself. The second
	 * drain after clearing {@link #mMonitored} catches payloads enqueued by a
	 * concurrent {@code flush()} during the hand-over, avoiding extra locking.
	 */
	protected void startMonitorSyslogServer()
	{
		if(mMonitored.compareAndSet(false, true))
		{
			Wrapper<ScheduledFuture<?>> futureWrapper = new Wrapper<>() ;
			futureWrapper.set(CommonExecutor.scheduleWithFixedDelay("定时检查syslog服务可用性", ()->{
				try
				{
					// First drain: only poll() after a successful send so a failure
					// (IOException) leaves the payload at the head for the next run.
					while(!mSendFaildDatas.isEmpty())
					{
						byte[] data = mSendFaildDatas.peek() ;
						sendData(data) ;
						mSendFaildDatas.poll() ;
					}
					mMonitored.set(false) ;
					// Brief pause so flush() calls racing with the flag change land
					// in the queue; lets us skip explicit synchronization here.
					JCommon.sleep(500);
					// Second drain for anything enqueued during the hand-over window.
					while(!mSendFaildDatas.isEmpty())
					{
						byte[] data = mSendFaildDatas.peek() ;
						sendData(data) ;
						mSendFaildDatas.poll() ;
					}
					mLogger.info("取消了检查syslog服务可用性!") ;
					// Delivery fully caught up: this task cancels itself.
					futureWrapper.get().cancel(true) ;
				}
				catch(IOException e)
				{
					// Service still down; keep the schedule and retry next period.
					mLogger.error(ExceptionAssist.getClearMessage(getClass(), e));
				}
			}, 10 , 10 , TimeUnit.SECONDS)) ;
		}
	}
	

	/**
	 * Sends the payload over the (lazily opened) socket in chunks of at most
	 * {@link #sSendBufSize} bytes, flushing after each chunk.
	 *
	 * @throws IOException if connecting or writing fails
	 */
	protected void sendData(byte[] aData) throws IOException
	{
		connect();
		OutputStream outs = mSocket.getOutputStream();
		synchronized (outs)
		{
			
			for (int i = 0, len = 0; i < aData.length; i += len)
			{
				len = Math.min(aData.length - i, sSendBufSize);
				outs.write(aData, i, len);
				outs.flush();
			}
		}
	}
	
	/**
	 * Prepares the buffer for the next round: re-allocate if it grew past
	 * {@link #sBufPerfectSize} (to give the memory back), otherwise just reset.
	 */
	void resetBufferOutputStream()
	{
		// clean up for next round
		if (mBouts.size() > sBufPerfectSize)
		{
			mBouts = new ByteArrayOutputStream();
		}
		else
		{
			mBouts.reset();
		}
	}

	/**
	 * Delivers the buffered bytes. While the retry task is active the payload is
	 * queued instead of sent inline; on a send failure the socket is discarded,
	 * the payload is queued, the error is logged (throttled to once per 10
	 * minutes) and the retry task is started.
	 */
	@Override
	public synchronized void flush() throws IOException
	{
		if(mMonitored.get())
		{
			flushToFialdSendDatas();
		}
		else
		{
			try
			{
				sendData(mBouts.toByteArray());
				resetBufferOutputStream();
			}
			catch (IOException e)
			{
				StreamAssist.close(mSocket);
				mSocket = null ;
				flushToFialdSendDatas();
				// Throttle the error log: only the thread that wins the CAS within
				// the 10-minute window actually logs.
				long lastTime = mLastLogExcepTime.get();
				if (XTime.pass(mLastLogExcepTime.get(), 10, XTimeUnit.MINUTE)
						&& mLastLogExcepTime.compareAndSet(lastTime, System.currentTimeMillis()))
				{
					mLogger.error(ExceptionAssist.getClearMessage(getClass(), e));
				}
				// Start the availability-monitor task; the method itself guards
				// against starting more than one.
				startMonitorSyslogServer();
				return;
			}
		}
	}
	
	/**
	 * Opens the TCP connection if not already connected (500 ms connect and read
	 * timeout, {@link #sSendBufSize} send buffer).
	 * NOTE(review): {@code Socket.isConnected()} stays true after the peer closes
	 * the connection; a dead socket is only replaced after a send failure nulls
	 * {@code mSocket} in {@code flush()} — confirm this is the intended recovery path.
	 */
	void connect() throws IOException
	{
		if(mSocket != null && mSocket.isConnected())
			return ;
		StreamAssist.close(mSocket) ;
		mSocket = new Socket() ;
		mSocket.setSendBufferSize(sSendBufSize);
		mSocket.setSoTimeout(500);
		mSocket.connect(new InetSocketAddress(address, port), 500);
	}
	
	/**
	 * Moves the buffered bytes into the failed-payload queue and, when the queue
	 * exceeds {@link #sSendFaildLogItemSizeLimit}, drops the oldest entry,
	 * logging that fact at most once per minute.
	 */
	void flushToFialdSendDatas()
	{
		mSendFaildDatas.offer(mBouts.toByteArray()) ;
		resetBufferOutputStream();
		if(mSendFaildDatas.size()>sSendFaildLogItemSizeLimit)
		{
			if(XTime.pass(mLastLogTime , 60_000))
			{
				mLogger.info("缓存日志数量已经达到{}的限制,将移除一条旧日志!" , sSendFaildLogItemSizeLimit);
				// Fix: stamp the time, otherwise the once-per-minute throttle never
				// engages and the message is logged on every overflow.
				mLastLogTime = System.currentTimeMillis() ;
			}
			mSendFaildDatas.poll() ;
		}
	}

	/**
	 * Closes the superclass stream and the TCP socket, releasing all references.
	 */
	@Override
	public synchronized void close()
	{
		super.close();
		address = null;
		StreamAssist.close(mSocket) ;
		mSocket = null ;
	}

	/**
	 * Buffers a single byte; actual delivery happens in {@link #flush()}.
	 */
	@Override
	public void write(int b) throws IOException
	{
		synchronized (mBouts)
		{
			mBouts.write(b);
		}
		
	}
}

用flume运行起来的SysLog服务是单实例的,它的可靠性是不如类似Kafka这种集群设施的(Kafka已经是大数据平台XSailboat的基础设施之一)。另外一点,因为笔者的大数据平台具有强大的实时计算开发能力,如果数据能先写入到Kafka,那么就可以更容易地使用实时计算模块(即将支持Flink CEP)对日志进行个性化分析。出于此原因,笔者决定改造大数据平台的日志采集实现方案,采用下面的新方案。

3. 新的日志采集方案

日志采集方案(新)
这里使用的KafkaAppender来自:

<dependency>
	<groupId>com.github.danielwegener</groupId>
	<artifactId>logback-kafka-appender</artifactId>
	<version>0.2.0-RC2</version>
</dependency>

4. flume配置

我们的flume配置都是存储在Zookeeper中的,使用平台运维中心的“ZK配置中心”功能进行编辑查看。
ZK配置中心,编辑flume配置

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐