大数据平台(XSailboat)/微服务集群的日志采集设计与实践
我们的大数据平台由众多的前端web服务和中台服务构成(模块多,总数目前有16个左右)。为了方便查看这些服务的日志信息,我们实现了对这些服务的日志采集、集中存储和查看。
1. 背景
我们的大数据平台XSailboat由众多的前端web服务和中台服务构成(模块多,总数目前有16个左右)。为了方便查看这些服务的日志信息,我们实现了对这些服务的日志采集、集中存储和查看。
2. 旧的日志采集方案
LogBack提供了ch.qos.logback.classic.net.SyslogAppender,但是这个Appender在SysLog服务不可用时,会抛出异常,影响程序运行。笔者希望即使SysLog服务不可用,也只是这期间的日志不能正常采集到HBase中集中存储,而不应该干扰到应用的正常运行。所以笔者对它进行了修改。
类:com.cimstech.ms.common.log.SyslogAppender_TCP
package com.cimstech.ms.common.log;
import java.net.SocketException;
import java.net.UnknownHostException;
import com.cimstech.xfront.common.AppContext;
import ch.qos.logback.classic.net.SyslogAppender;
import ch.qos.logback.core.net.SyslogOutputStream;
/**
 * A {@link SyslogAppender} variant that delivers log events over TCP
 * (via {@code SyslogOutputStream_TCP}) instead of the default UDP transport.
 * Messages are always encoded as UTF-8.
 */
public class SyslogAppender_TCP extends SyslogAppender
{
    /** Builds the appender and forces UTF-8 as the outgoing charset. */
    public SyslogAppender_TCP()
    {
        this.setCharset(AppContext.sUTF8);
    }

    /**
     * Supplies the TCP-based stream used to push syslog frames to the server.
     *
     * @return a {@code SyslogOutputStream_TCP} bound to the configured host and port
     * @throws SocketException      if the socket cannot be configured
     * @throws UnknownHostException if the syslog host cannot be resolved
     */
    @Override
    public SyslogOutputStream createOutputStream() throws SocketException, UnknownHostException
    {
        return new SyslogOutputStream_TCP(this.getSyslogHost(), this.getPort());
    }
}
类:com.cimstech.ms.common.log.SyslogOutputStream_TCP
package com.cimstech.ms.common.log;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cimstech.xfront.common.CommonExecutor;
import com.cimstech.xfront.common.JCommon;
import com.cimstech.xfront.common.Wrapper;
import com.cimstech.xfront.common.excep.ExceptionAssist;
import com.cimstech.xfront.common.serial.StreamAssist;
import com.cimstech.xfront.common.time.XTime;
import com.cimstech.xfront.common.time.XTimeUnit;
import ch.qos.logback.core.net.SyslogOutputStream;
/**
 * TCP replacement for logback's UDP-oriented {@link SyslogOutputStream}.
 * <p>
 * Log bytes are buffered in memory by {@link #write(byte[], int, int)} and pushed to the
 * syslog server on {@link #flush()}. When the server is unreachable, flushed payloads are
 * parked in a bounded in-memory queue and a periodic background task retries delivery,
 * so that an unavailable syslog service never breaks the host application.
 * <p>
 * NOTE(review): {@code mSendFaildDatas} is a plain {@code LinkedList} touched both by the
 * (synchronized) {@code flush()} path and by the unsynchronized scheduled retry task; the
 * hand-off relies on the {@code mMonitored} flag plus a 500&nbsp;ms sleep rather than locks —
 * confirm this is acceptable for the deployment's logging volume.
 */
public class SyslogOutputStream_TCP extends SyslogOutputStream
{
    // Buffer is re-allocated (instead of reset) once it grows past this size, to return memory.
    static final int sBufPerfectSize = 102400;
    // Socket send-buffer size, and the chunk size used when writing a payload to the socket.
    static final int sSendBufSize = 1024000;
    // Max number of failed payloads kept for retry; oldest entries are dropped beyond this.
    static final int sSendFaildLogItemSizeLimit = 2000 ;
    // Resolved address of the syslog server.
    private InetAddress address;
    // Lazily (re)connected TCP socket to the syslog server; null after a send failure or close().
    private Socket mSocket;
    // In-memory staging buffer; filled by write(...), drained by flush().
    private ByteArrayOutputStream mBouts = new ByteArrayOutputStream();
    // Syslog server TCP port.
    private int port;
    // Timestamp of the last "queue full" log message.
    // NOTE(review): never assigned anywhere in this class, so the 60s throttle in
    // flushToFialdSendDatas() appears ineffective — verify intent.
    long mLastLogTime = 0 ;
    final Logger mLogger = LoggerFactory.getLogger("SyslogAppender_TCP");
    // Timestamp of the last send-failure error log; used to throttle error logging to
    // at most once per 10 minutes (CAS-guarded in flush()).
    final AtomicLong mLastLogExcepTime = new AtomicLong();
    /**
     * True while the background retry task is (believed to be) running; new flushes then
     * enqueue into the retry queue instead of writing to the socket directly.
     */
    final AtomicBoolean mMonitored = new AtomicBoolean(false);
    // NOTE(review): unused in the visible code — possibly a leftover; confirm before removing.
    final Object mMutex = new Object() ;
    // Payloads that failed to send, awaiting retry by the scheduled task (FIFO, bounded).
    final Queue<byte[]> mSendFaildDatas = new LinkedList<byte[]>() ;

    /**
     * Resolves the syslog host and records the target endpoint; the TCP connection itself
     * is established lazily in {@link #connect()}.
     *
     * @param aSyslogHost syslog server host name or IP
     * @param aPort       syslog server TCP port
     * @throws UnknownHostException if the host cannot be resolved
     * @throws SocketException      propagated from the superclass constructor
     */
    public SyslogOutputStream_TCP(String aSyslogHost, int aPort) throws UnknownHostException, SocketException
    {
        super(aSyslogHost, aPort);
        this.address = InetAddress.getByName(aSyslogHost);
        this.port = aPort;
    }

    /**
     * Buffers a byte range, sanitizing line breaks so each record stays a single TCP
     * syslog frame: embedded '\n' becomes vertical-tab (0x0b) and '\r' becomes a space.
     * (The earlier comment claiming '\n' is replaced by '\r' did not match the code.)
     * <p>
     * NOTE(review): the loop starts at {@code offset + len - 1} with a post-decrement test,
     * so the LAST byte of the range is never rewritten — presumably deliberate, to keep the
     * trailing '\n' as the TCP record delimiter; confirm against the syslog server's framing.
     */
    public void write(byte[] byteArray, int offset, int len) throws IOException
    {
        synchronized (mBouts)
        {
            // Replace embedded '\n' with 0x0b and '\r' with ' ' (see Javadoc above).
            final byte nc = (byte)'\n' ;
            final byte rc = (byte)'\r' ;
            final byte vc = (byte)0x0b ;
            final byte sc = (byte)' ' ;
            int i = offset + len-1 ;
            while(i-->offset)
            {
                if(byteArray[i] == nc)
                    byteArray[i] = vc ;
                else if(byteArray[i] == rc)
                    byteArray[i] = sc ;
            }
            mBouts.write(byteArray, offset, len);
        }
    }

    /**
     * Starts (at most once) a periodic task that probes syslog availability every 10 seconds
     * by retrying the queued failed payloads. Once the queue drains, it clears
     * {@code mMonitored}, waits briefly for in-flight flushes to land in the queue, drains
     * again, and then cancels itself. On {@link IOException} it just logs and stays scheduled
     * for the next 10-second attempt.
     */
    protected void startMonitorSyslogServer()
    {
        // CAS guard: only the first caller actually schedules the task.
        if(mMonitored.compareAndSet(false, true))
        {
            Wrapper<ScheduledFuture<?>> futureWrapper = new Wrapper<>() ;
            futureWrapper.set(CommonExecutor.scheduleWithFixedDelay("定时检查syslog服务可用性", ()->{
                try
                {
                    // First drain: resend queued payloads in FIFO order; poll only after
                    // a successful send so a failure keeps the item at the head.
                    while(!mSendFaildDatas.isEmpty())
                    {
                        byte[] data = mSendFaildDatas.peek() ;
                        sendData(data) ;
                        mSendFaildDatas.poll() ;
                    }
                    mMonitored.set(false) ;
                    // Wait a moment so concurrent flush() calls that observed mMonitored==true
                    // can finish enqueuing — this avoids a heavier synchronization mechanism.
                    JCommon.sleep(500);
                    // Second drain: pick up anything enqueued during the hand-off window.
                    while(!mSendFaildDatas.isEmpty())
                    {
                        byte[] data = mSendFaildDatas.peek() ;
                        sendData(data) ;
                        mSendFaildDatas.poll() ;
                    }
                    mLogger.info("取消了检查syslog服务可用性!") ;
                    // Server is back and the backlog is clear: stop the periodic task.
                    futureWrapper.get().cancel(true) ;
                }
                catch(IOException e)
                {
                    // Server still unavailable; log and let the next scheduled run retry.
                    mLogger.error(ExceptionAssist.getClearMessage(getClass(), e));
                }
            }, 10 , 10 , TimeUnit.SECONDS)) ;
        }
    }

    /**
     * Writes one payload to the syslog server, connecting first if needed, in chunks of at
     * most {@code sSendBufSize} bytes with a flush after each chunk.
     *
     * @param aData complete payload to transmit
     * @throws IOException if connecting or writing fails
     */
    protected void sendData(byte[] aData) throws IOException
    {
        connect();
        OutputStream outs = mSocket.getOutputStream();
        // Serialize concurrent senders on the stream so chunks of different payloads
        // cannot interleave.
        synchronized (outs)
        {
            for (int i = 0, len = 0; i < aData.length; i += len)
            {
                len = Math.min(aData.length - i, sSendBufSize);
                outs.write(aData, i, len);
                outs.flush();
            }
        }
    }

    /**
     * Prepares the staging buffer for the next round: re-allocate if it has grown large
     * (to release memory), otherwise just reset it.
     */
    void resetBufferOutputStream()
    {
        // clean up for next round
        if (mBouts.size() > sBufPerfectSize)
        {
            mBouts = new ByteArrayOutputStream();
        }
        else
        {
            mBouts.reset();
        }
    }

    /**
     * Ships the buffered bytes to the syslog server. While the retry task is active the
     * payload goes straight to the retry queue; otherwise a direct send is attempted, and
     * on failure the payload is queued, the socket is discarded, the error is logged (at
     * most once per 10 minutes, CAS-throttled) and the retry task is started.
     */
    @Override
    public synchronized void flush() throws IOException
    {
        if(mMonitored.get())
        {
            // Server known to be down: don't even try, just queue for the retry task.
            flushToFialdSendDatas();
        }
        else
        {
            try
            {
                sendData(mBouts.toByteArray());
                resetBufferOutputStream();
            }
            catch (IOException e)
            {
                // Send failed: drop the (possibly broken) socket and queue the payload.
                StreamAssist.close(mSocket);
                mSocket = null ;
                flushToFialdSendDatas();
                // Log the failure at most once per 10 minutes; the CAS ensures only one
                // of several concurrent failers wins the right to log.
                long lastTime = mLastLogExcepTime.get();
                if (XTime.pass(mLastLogExcepTime.get(), 10, XTimeUnit.MINUTE)
                        && mLastLogExcepTime.compareAndSet(lastTime, System.currentTimeMillis()))
                {
                    mLogger.error(ExceptionAssist.getClearMessage(getClass(), e));
                }
                // Start the periodic availability-check task; the method itself guards
                // against starting more than one instance.
                startMonitorSyslogServer();
                return;
            }
        }
    }

    /**
     * Ensures {@code mSocket} is an open, connected TCP socket to the syslog server
     * (500&nbsp;ms connect and read timeouts). Any stale socket is closed first.
     * <p>
     * NOTE(review): {@code Socket.isConnected()} stays true after the peer closes the
     * connection, so a dead-but-once-connected socket would be reused here; the failure
     * is then caught by flush()'s error path — confirm that is the intended recovery route.
     */
    void connect() throws IOException
    {
        if(mSocket != null && mSocket.isConnected())
            return ;
        StreamAssist.close(mSocket) ;
        mSocket = new Socket() ;
        mSocket.setSendBufferSize(sSendBufSize);
        mSocket.setSoTimeout(500);
        mSocket.connect(new InetSocketAddress(address, port), 500);
    }

    /**
     * Moves the current buffer contents into the failed-send queue and resets the buffer.
     * If the queue exceeds {@code sSendFaildLogItemSizeLimit}, the oldest entry is dropped
    * and (throttled) an informational message is logged.
     */
    void flushToFialdSendDatas()
    {
        mSendFaildDatas.offer(mBouts.toByteArray()) ;
        resetBufferOutputStream();
        if(mSendFaildDatas.size()>sSendFaildLogItemSizeLimit)
        {
            // NOTE(review): mLastLogTime is never updated, so this throttle likely
            // fires on every overflow — verify intent.
            if(XTime.pass(mLastLogTime , 60_000))
                mLogger.info("缓存日志数量已经达到{}的限制,将移除一条旧日志!" , sSendFaildLogItemSizeLimit);
            mSendFaildDatas.poll() ;
        }
    }

    /** Closes the superclass resources and the TCP socket, and clears the endpoint state. */
    public synchronized void close()
    {
        super.close();
        address = null;
        StreamAssist.close(mSocket) ;
        mSocket = null ;
    }

    /**
     * Buffers a single byte. Unlike the array variant, no line-break sanitizing is applied
     * here — presumably callers always go through the array overload; confirm.
     */
    @Override
    public void write(int b) throws IOException
    {
        synchronized (mBouts)
        {
            mBouts.write(b);
        }
    }
}
用flume运行起来的SysLog服务是单实例的,它的可靠性是不如类似Kafka这种集群设施的(Kafka已经是大数据平台XSailboat的基础设施之一)。另外一点,因为笔者的大数据平台具有强大的实时计算开发能力,如果数据能先写入到Kafka,那么就可以更容易地使用实时计算模块(即将支持Flink CEP)对日志做个性化分析。出于此原因,笔者决定改造大数据平台的日志采集实现方案,采用下面的新方案。
3. 新的日志采集方案
这里使用的KafkaAppender来自:
<dependency>
<groupId>com.github.danielwegener</groupId>
<artifactId>logback-kafka-appender</artifactId>
<version>0.2.0-RC2</version>
</dependency>
4. flume配置
我们的flume配置都是存储在Zookeeper中的,使用平台运维中心的“ZK配置中心”功能进行编辑查看。
更多推荐
所有评论(0)