Commit 9b3ddcc4 by ethan

CAT_1.4.0 原始版本

0 parents
Showing 1000 changed files with 5036 additions and 0 deletions

Too many changes to show.

To preserve performance only 1000 of 1000+ files are displayed.

language: java
install:
mvn install -Dmaven.test.skip -B -fae
script:
mvn test -B -fae
This diff is collapsed. Click to expand it.
Dianping CAT
Copyright 2013, DianPing Inc.
Third party libraries
---------------------
This product includes software developed by The Apache Software
Foundation (http://www.apache.org/),including, but not limit to:
- Apache Hadoop
- Apache Commons
- Apache Maven
- Apache log4j
This product includes the several frameworks developped by
unidal.org (https://github.com/unidal/),including:
- maven-plugins (https://github.com/unidal/maven-plugins)
- frameworks (https://github.com/unidal/frameworks)
Copyright 2011-2013 unidal.org
This product includes the Jetty HTTP server
(http://jetty.codehaus.org/jetty/).
Copyright 1995-2006 Mort Bay Consulting Pty Ltd
Netty
(https://netty.io/)
Copyright (C) 2011 The Netty Project
MySQL Connector/J
(http://www.mysql.com)
distributed under GPL v2 license with MySQL FOSS exception (LICENSE-MYSQL)
Copyright (c) 2000, 2011, Oracle and/or its affiliates
Gson
(http://code.google.com/p/google-gson)
distributed under the Apache Software License, Version 2.0.
Copyright (c) 2010 Google Inc.
javaparser
(https://code.google.com/p/javaparser/)
distributed under GNU Lesser GPL
freemarker
(http://freemarker.org/)
distributed under BSD-style license
cobar
(http://code.alibabatech.com/wiki/display/cobar/Home)
Copyright (c) 2010 Alibaba Inc.
JUnit
(http://www.junit.org/)
distributed under the Common Public License v1.0
CAT
[![Build Status](https://travis-ci.org/dianping/cat.png?branch=master)](https://travis-ci.org/dianping/cat)
[![GitHub stars](https://img.shields.io/github/stars/dianping/cat.svg?style=social&label=Star&)](https://github.com/dianping/cat/stargazers)
[![GitHub forks](https://img.shields.io/github/forks/dianping/cat.svg?style=social&label=Fork&)](https://github.com/dianping/cat/fork)
===
[![Join the chat at https://gitter.im/dianping/cat](https://badges.gitter.im/dianping/cat.svg)](https://gitter.im/dianping/cat?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
##### CAT基于Java开发的实时应用监控平台,包括实时应用监控,业务监控。[2013-01-06]
##### CAT支持的监控消息类型包括:
+ **Transaction** 适合记录跨越系统边界的程序访问行为,比如远程调用,数据库调用,也适合执行时间较长的业务逻辑监控,Transaction用来记录一段代码的执行时间和次数。
+ **Event** 用来记录一件事发生的次数,比如记录系统异常,它和transaction相比缺少了时间的统计,开销比transaction要小。
+ **Heartbeat** 表示程序内定期产生的统计信息, 如CPU%, MEM%, 连接池状态, 系统负载等。
+ **Metric** 用于记录业务指标、指标可能包含对一个指标记录次数、记录平均值、记录总和,业务指标最低统计粒度为1分钟。
+ **Trace** 用于记录基本的trace信息,类似于log4j的info信息,这些信息仅用于查看一些相关信息
消息树
===
CAT监控系统将每次URL、Service的请求内部执行情况都封装为一个完整的消息树、消息树可能包括Transaction、Event、Heartbeat、Metric和Trace信息。
完整的消息树
---------------------
![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logviewAll01.png)
可视化消息树
---------------------
![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logviewAll02.png)
分布式消息树【一台机器调用另外一台机器】
---------------------
![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logviewAll03.png)
Requirements
---------------------
* Linux 2.6以及之上(2.6内核才可以支持epoll),Mac以及Windows环境可以作为开发环境
* Java 6,7,8
* Maven 3.2.3+
* MySQL 5.6
我司的环境配置如下
```
Distributor ID: CentOS
Description: CentOS release 6.5 (Final)
Release: 6.5
Codename: Final
Server version: Apache Tomcat/8.0.30
Server built: Dec 1 2015 22:30:46 UTC
Server number: 8.0.30.0
OS Name: Linux
OS Version: 2.6.32-431.el6.x86_64
Architecture: amd64
JVM Version: 1.8.0_111-b14
JVM Vendor: Oracle Corporation
Maven 3.3.3
Mysql 5.6
```
Quick Started
---------------------
##### 1、在CAT目录下,用maven构建项目
mvn clean install -DskipTests
如果下载有问题,可以尝试翻墙后下载,可以 git clone git@github.com:dianping/cat.git mvn-repo 下载到本地,这个分支是cat编译需要的依赖的一些jar ,将这些jar放入本地的maven仓库文件夹中。
##### 2、配置CAT的环境
mvn cat:install
Note:
* Linux\Mac 需要对/data/appdatas/cat和/data/applogs/cat有读写权限
* Windows 则是对系统运行盘下的/data/appdatas/cat和/data/applogs/cat有读写权限,如果cat服务运行在e盘的tomcat中,则需要对e:/data/appdatas/cat和e:/data/applogs/cat有读写权限
*
此步骤是配置一些cat启动需要的基本数据库配置
##### 3、(Optional)如果安装了hadoop集群,需到/data/appdatas/cat/server.xml中配置对应hadoop信息。将localmode设置为false,默认情况下,CAT在开发模式(localmode=true)下工作。
##### 4、启动的cat单机版本基本步骤
* 检查下/data/appdatas/cat/ 下面需要的几个配置文件,配置文件在源码script 。
* 在cat目录下执行 mvn install -DskipTests 。
* cat-home打包出来的war包,重新命名为cat.war, 并放入tomcat的webapps 。
* 启动tomcat
* 访问 http://localhost:8080/cat/r
* 具体详细的还可以参考 http://unidal.org/cat/r/home?op=view&docName=deploy
##### 5、遇到jar不能下载的情况
* cat jar在cat的mvn-repo分支下,可以download到本地,在copy至本地的仓库目录
* git clone https://github.com/dianping/cat.git
* cd cat
* git checkout mvn-repo
* cp -R * ~/.m2/repository
##### 6、导入eclipse发现找不到类
* 请先执行mvn eclipse:eclipse 会自动生成相关的类文件
* 作为普通项目导入eclipse,不要用作为maven项目导入eclipse
##### 7、可以参考script目录下详细资料
Copyright and license
---------------------
Copyright 2013 DianPing, Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this work except in compliance with the License. You may obtain a copy of the License in the LICENSE file, or at:
<http://www.apache.org/licenses/LICENSE-2.0>
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
### 支持的话可以扫一扫,支持CAT官网 http://unidal.org/ 的建设。
<img src="https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/img/weixin.jpg" width="50%"/>
CAT接入公司
===
![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/dianping.png)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/ctrip.png)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/lufax.png)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/ly.png)
![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/liepin.png)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/qipeipu.jpg)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/shangping.jpg)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/zhenlv.png)![Alt text](https://raw.github.com/dianping/cat/master/cat-home/src/main/webapp/images/logo/oppo.png)
更多接入公司,欢迎在<https://github.com/dianping/cat/issues/753>登记
No preview for this file type
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<parent>
<groupId>com.dianping.cat</groupId>
<artifactId>parent</artifactId>
<version>1.4.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cat-client</artifactId>
<name>cat-client</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.unidal.framework</groupId>
<artifactId>foundation-service</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<optional>true</optional>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.unidal.framework</groupId>
<artifactId>test-framework</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.unidal.maven.plugins</groupId>
<artifactId>codegen-maven-plugin</artifactId>
<executions>
<execution>
<id>generate data model</id>
<phase>generate-sources</phase>
<goals>
<goal>dal-model</goal>
</goals>
<configuration>
<manifest>
${basedir}/src/main/resources/META-INF/dal/model/client-manifest.xml,
${basedir}/src/main/resources/META-INF/dal/model/status-manifest.xml,</manifest>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.unidal.maven.plugins</groupId>
<artifactId>plexus-maven-plugin</artifactId>
<executions>
<execution>
<id>generate plexus component descriptor</id>
<phase>process-classes</phase>
<goals>
<goal>plexus</goal>
</goals>
<configuration>
<className>com.dianping.cat.build.ComponentsConfigurator</className>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
package com.dianping.cat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.LockSupport;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.AbstractThreadListener;
import org.unidal.initialization.AbstractModule;
import org.unidal.initialization.DefaultModuleContext;
import org.unidal.initialization.Module;
import org.unidal.initialization.ModuleContext;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.internal.MilliSecondTimer;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.status.StatusUpdateTask;
public class CatClientModule extends AbstractModule {
public static final String ID = "cat-client";
@Override
protected void execute(final ModuleContext ctx) throws Exception {
ctx.info("Current working directory is " + System.getProperty("user.dir"));
// initialize milli-second resolution level timer
MilliSecondTimer.initialize();
// tracking thread start/stop
Threads.addListener(new CatThreadListener(ctx));
// warm up Cat
Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());
// bring up TransportManager
ctx.lookup(TransportManager.class);
ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("cat").start(statusUpdateTask);
LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms
// MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
// Threads.forGroup("cat").start(mmapReaderTask);
}
}
@Override
public Module[] getDependencies(ModuleContext ctx) {
return null; // no dependencies
}
public static final class CatThreadListener extends AbstractThreadListener {
private final ModuleContext m_ctx;
private CatThreadListener(ModuleContext ctx) {
m_ctx = ctx;
}
@Override
public void onThreadGroupCreated(ThreadGroup group, String name) {
m_ctx.info(String.format("Thread group(%s) created.", name));
}
@Override
public void onThreadPoolCreated(ExecutorService pool, String name) {
m_ctx.info(String.format("Thread pool(%s) created.", name));
}
@Override
public void onThreadStarting(Thread thread, String name) {
m_ctx.info(String.format("Starting thread(%s) ...", name));
}
@Override
public void onThreadStopping(Thread thread, String name) {
m_ctx.info(String.format("Stopping thread(%s).", name));
}
@Override
public boolean onUncaughtException(Thread thread, Throwable e) {
m_ctx.error(String.format("Uncaught exception thrown out of thread(%s)", thread.getName()), e);
return true;
}
}
}
package com.dianping.cat;
public class CatConstants {
/**
* Cat Json length
*/
public static final int MAX_LENGTH = 1000;
public static final int MAX_ITEM_LENGTH = 50;
/**
* Cat instrument attribute names
*/
public static final String CAT_STATE = "cat-state";
public static final String CAT_PAGE_URI = "cat-page-uri";
public static final String CAT_PAGE_TYPE = "cat-page-type";
/**
* Pigeon Transation Type
*/
public static final String TYPE_CALL = "Call";
public static final String TYPE_RESULT = "Result";
public static final String TYPE_TimeOut = "PigeonTimeOut";
public static final String TYPE_SERVICE = "Service";
public static final String TYPE_REMOTE_CALL = "RemoteCall";
public static final String TYPE_REQUEST = "Request";
public static final String TYPE_RESPONSE = "Respone";
/**
* Pigeon Event Type, it is used to record the param
*/
public static final String TYPE_PIGEON_REQUEST = "PigeonRequest";
public static final String TYPE_PIGEON_RESPONSE = "PigeonRespone";
/**
* Pigeon Event name
*/
public static final String NAME_REQUEST = "PigeonRequest";
public static final String NAME_RESPONSE = "PigeonRespone";
public static final String NAME_TIME_OUT = "ClientTimeOut";
/**
* Pigeon Context Info
*/
public static final String PIGEON_ROOT_MESSAGE_ID = "RootMessageId";
public static final String PIGEON_CURRENT_MESSAGE_ID = "CurrentMessageId";
public static final String PIGEON_SERVER_MESSAGE_ID = "ServerMessageId";
public static final String PIGEON_RESPONSE_MESSAGE_ID = "ResponseMessageId";
public static final String TYPE_SQL = "SQL";
public static final String TYPE_SQL_PARAM = "SQL.PARAM";
public static final String TYPE_URL = "URL";
public static final String TYPE_URL_FORWARD = "URL.Forward";
public static final String TYPE_ACTION = "Action";
public static final String TYPE_METRIC = "MetricType";
public static final String TYPE_TRACE = "TraceMode";
public static final int ERROR_COUNT = 100;
public static final int SUCCESS_COUNT = 1000;
}
\ No newline at end of file \ No newline at end of file
package com.dianping.cat.build;
import java.util.ArrayList;
import java.util.List;
import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.codec.BufferWriter;
import com.dianping.cat.message.spi.codec.EscapingBufferWriter;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
class CodecComponentConfigurator extends AbstractResourceConfigurator {
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(BufferWriter.class, EscapingBufferWriter.ID, EscapingBufferWriter.class));
all.add(C(MessageCodec.class, PlainTextMessageCodec.ID, PlainTextMessageCodec.class) //
.req(BufferWriter.class, EscapingBufferWriter.ID));
return all;
}
}
package com.dianping.cat.build;
import java.util.ArrayList;
import java.util.List;
import org.unidal.initialization.Module;
import org.unidal.lookup.configuration.AbstractResourceConfigurator;
import org.unidal.lookup.configuration.Component;
import com.dianping.cat.CatClientModule;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.configuration.DefaultClientConfigManager;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultMessageProducer;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.DefaultTransportManager;
import com.dianping.cat.message.io.TcpSocketSender;
import com.dianping.cat.message.io.TransportManager;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageStatistics;
import com.dianping.cat.status.StatusUpdateTask;
public class ComponentsConfigurator extends AbstractResourceConfigurator {
public static void main(String[] args) {
generatePlexusComponentsXmlFile(new ComponentsConfigurator());
}
@Override
public List<Component> defineComponents() {
List<Component> all = new ArrayList<Component>();
all.add(C(ClientConfigManager.class, DefaultClientConfigManager.class));
all.add(C(MessageIdFactory.class));
all.add(C(MessageManager.class, DefaultMessageManager.class) //
.req(ClientConfigManager.class, TransportManager.class, MessageIdFactory.class));
all.add(C(MessageProducer.class, DefaultMessageProducer.class) //
.req(MessageManager.class, MessageIdFactory.class));
all.add(C(TcpSocketSender.class) //
.req(ClientConfigManager.class, MessageIdFactory.class) //
.req(MessageStatistics.class, "default", "m_statistics") //
.req(MessageCodec.class, PlainTextMessageCodec.ID, "m_codec"));
all.add(C(TransportManager.class, DefaultTransportManager.class) //
.req(ClientConfigManager.class, TcpSocketSender.class));
all.add(C(MessageStatistics.class, DefaultMessageStatistics.class));
all.add(C(StatusUpdateTask.class) //
.req(MessageStatistics.class, ClientConfigManager.class));
all.add(C(Module.class, CatClientModule.ID, CatClientModule.class));
all.addAll(new CodecComponentConfigurator().defineComponents());
return all;
}
}
package com.dianping.cat.configuration;
import java.io.File;
import java.util.List;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.configuration.client.entity.Server;
public interface ClientConfigManager {
public Domain getDomain();
public int getMaxMessageLength();
public String getServerConfigUrl();
public List<Server> getServers();
public int getTaggedTransactionCacheSize();
public void initialize(File configFile) throws Exception;
public boolean isCatEnabled();
public boolean isDumpLocked();
}
\ No newline at end of file \ No newline at end of file
package com.dianping.cat.configuration;
import java.util.Stack;
import com.dianping.cat.configuration.client.entity.ClientConfig;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.configuration.client.entity.Property;
import com.dianping.cat.configuration.client.transform.DefaultMerger;
public class ClientConfigMerger extends DefaultMerger {
public ClientConfigMerger(ClientConfig config) {
super(config);
}
@Override
protected void mergeDomain(Domain old, Domain domain) {
if (domain.getIp() != null) {
old.setIp(domain.getIp());
}
if (domain.getEnabled() != null) {
old.setEnabled(domain.getEnabled());
}
if (domain.getMaxMessageSize() > 0) {
old.setMaxMessageSize(domain.getMaxMessageSize());
}
}
@Override
protected void visitConfigChildren(ClientConfig to, ClientConfig from) {
if (to != null) {
Stack<Object> objs = getObjects();
// if servers is configured, then override it instead of merge
if (!from.getServers().isEmpty()) {
to.getServers().clear();
to.getServers().addAll(from.getServers());
}
// only configured domain in client configure will be merged
for (Domain source : from.getDomains().values()) {
Domain target = to.findDomain(source.getId());
if (target == null) {
target = new Domain(source.getId());
to.addDomain(target);
}
if (to.getDomains().containsKey(source.getId())) {
objs.push(target);
source.accept(this);
objs.pop();
}
}
for (Property source : from.getProperties().values()) {
Property target = to.findProperty(source.getName());
if (target == null) {
target = new Property(source.getName());
to.addProperty(target);
}
objs.push(target);
source.accept(this);
objs.pop();
}
}
}
}
package com.dianping.cat.configuration;
import java.text.MessageFormat;
import java.util.Date;
import com.dianping.cat.configuration.client.entity.ClientConfig;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.configuration.client.entity.Server;
import com.dianping.cat.configuration.client.transform.DefaultValidator;
public class ClientConfigValidator extends DefaultValidator {
private ClientConfig m_config;
private String getLocalAddress() {
return NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
}
private void log(String severity, String message) {
MessageFormat format = new MessageFormat("[{0,date,MM-dd HH:mm:ss.sss}] [{1}] [{2}] {3}");
System.out.println(format.format(new Object[] { new Date(), severity, "cat", message }));
}
@Override
public void visitConfig(ClientConfig config) {
config.setMode("client");
if (config.getServers().size() == 0) {
config.setEnabled(false);
log("WARN", "CAT client was disabled due to no CAT servers configured!");
} else if (!config.isEnabled()) {
log("WARN", "CAT client was globally disabled!");
}
m_config = config;
super.visitConfig(config);
if (m_config.isEnabled()) {
for (Domain domain : m_config.getDomains().values()) {
if (!domain.isEnabled()) {
m_config.setEnabled(false);
log("WARN", "CAT client was disabled in domain(" + domain.getId() + ") explicitly!");
}
break; // for first domain only
}
}
}
@Override
public void visitDomain(Domain domain) {
super.visitDomain(domain);
// set default values
if (domain.getEnabled() == null) {
domain.setEnabled(true);
}
if (domain.getIp() == null) {
domain.setIp(getLocalAddress());
}
}
@Override
public void visitServer(Server server) {
super.visitServer(server);
// set default values
if (server.getPort() == null) {
server.setPort(2280);
}
if (server.getEnabled() == null) {
server.setEnabled(true);
}
}
}
package com.dianping.cat.configuration;
import java.io.File;
import java.io.InputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.helper.Files;
import com.dianping.cat.Cat;
import com.dianping.cat.configuration.client.entity.ClientConfig;
import com.dianping.cat.configuration.client.entity.Domain;
import com.dianping.cat.configuration.client.entity.Server;
import com.dianping.cat.configuration.client.transform.DefaultSaxParser;
public class DefaultClientConfigManager implements LogEnabled, ClientConfigManager, Initializable {
private static final String CAT_CLIENT_XML = "/META-INF/cat/client.xml";
private static final String PROPERTIES_CLIENT_XML = "/META-INF/app.properties";
private static final String XML = "/data/appdatas/cat/client.xml";
private Logger m_logger;
private ClientConfig m_config;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public Domain getDomain() {
Domain domain = null;
if (m_config != null) {
Map<String, Domain> domains = m_config.getDomains();
domain = domains.isEmpty() ? null : domains.values().iterator().next();
}
if (domain != null) {
return domain;
} else {
return new Domain("UNKNOWN").setEnabled(false);
}
}
@Override
public int getMaxMessageLength() {
if (m_config == null) {
return 5000;
} else {
return getDomain().getMaxMessageSize();
}
}
@Override
public String getServerConfigUrl() {
if (m_config == null) {
return null;
} else {
List<Server> servers = m_config.getServers();
for (Server server : servers) {
Integer httpPort = server.getHttpPort();
if (httpPort == null || httpPort == 0) {
httpPort = 8080;
}
return String.format("http://%s:%d/cat/s/router?domain=%s&ip=%s&op=json", server.getIp().trim(), httpPort,
getDomain().getId(), NetworkInterfaceManager.INSTANCE.getLocalHostAddress());
}
}
return null;
}
@Override
public List<Server> getServers() {
if (m_config == null) {
return Collections.emptyList();
} else {
return m_config.getServers();
}
}
@Override
public int getTaggedTransactionCacheSize() {
return 1024;
}
@Override
public boolean isCatEnabled() {
if (m_config == null) {
return false;
} else {
return m_config.isEnabled();
}
}
@Override
public boolean isDumpLocked() {
if (m_config == null) {
return false;
} else {
return m_config.isDumpLocked();
}
}
private ClientConfig loadConfigFromEnviroment() {
String appName = loadProjectName();
if (appName != null) {
ClientConfig config = new ClientConfig();
config.addDomain(new Domain(appName));
return config;
}
return null;
}
private ClientConfig loadConfigFromXml() {
InputStream in = null;
try {
in = Thread.currentThread().getContextClassLoader().getResourceAsStream(CAT_CLIENT_XML);
if (in == null) {
in = Cat.class.getResourceAsStream(CAT_CLIENT_XML);
}
if (in != null) {
String xml = Files.forIO().readFrom(in, "utf-8");
m_logger.info(String.format("Resource file(%s) found.", Cat.class.getResource(CAT_CLIENT_XML)));
return DefaultSaxParser.parse(xml);
}
return null;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
}
}
}
return null;
}
private String loadProjectName() {
String appName = null;
InputStream in = null;
try {
in = Thread.currentThread().getContextClassLoader().getResourceAsStream(PROPERTIES_CLIENT_XML);
if (in == null) {
in = Cat.class.getResourceAsStream(PROPERTIES_CLIENT_XML);
}
if (in != null) {
Properties prop = new Properties();
prop.load(in);
appName = prop.getProperty("app.name");
if (appName != null) {
m_logger.info(String.format("Find domain name %s from app.properties.", appName));
} else {
m_logger.info(String.format("Can't find app.name from app.properties."));
return null;
}
} else {
m_logger.info(String.format("Can't find app.properties in %s", PROPERTIES_CLIENT_XML));
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (in != null) {
try {
in.close();
} catch (Exception e) {
}
}
}
return appName;
}
@Override
public void initialize() throws InitializationException {
File configFile = new File(XML);
initialize(configFile);
}
@Override
public void initialize(File configFile) throws InitializationException {
try {
ClientConfig globalConfig = null;
ClientConfig clientConfig = null;
if (configFile != null) {
if (configFile.exists()) {
String xml = Files.forIO().readFrom(configFile.getCanonicalFile(), "utf-8");
globalConfig = DefaultSaxParser.parse(xml);
m_logger.info(String.format("Global config file(%s) found.", configFile));
} else {
m_logger.warn(String.format("Global config file(%s) not found, IGNORED.", configFile));
}
}
// load the client configure from Java class-path
clientConfig = loadConfigFromEnviroment();
if (clientConfig == null) {
clientConfig = loadConfigFromXml();
}
// merge the two configures together to make it effected
if (globalConfig != null && clientConfig != null) {
globalConfig.accept(new ClientConfigMerger(clientConfig));
}
if (clientConfig != null) {
clientConfig.accept(new ClientConfigValidator());
}
m_config = clientConfig;
} catch (Exception e) {
throw new InitializationException(e.getMessage(), e);
}
}
}
package com.dianping.cat.configuration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class KVConfig {
private Map<String, String> m_kvs = new HashMap<String, String>();
public Set<String> getKeys() {
return m_kvs.keySet();
}
public Map<String, String> getKvs() {
return m_kvs;
}
public String getValue(String key) {
return m_kvs.get(key);
}
public void setKvs(Map<String, String> kvs) {
m_kvs = kvs;
}
}
package com.dianping.cat.configuration;
import org.unidal.helper.Inets;
public enum NetworkInterfaceManager {
INSTANCE;
private NetworkInterfaceManager() {
}
public String getLocalHostAddress() {
return Inets.IP4.getLocalHostAddress();
}
public String getLocalHostName() {
return Inets.IP4.getLocalHostName();
}
}
package com.dianping.cat.log4j;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.apache.log4j.AppenderSkeleton;
import org.apache.log4j.Level;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.log4j.spi.ThrowableInformation;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Trace;
public class CatAppender extends AppenderSkeleton {
@Override
protected void append(LoggingEvent event) {
boolean isTraceMode = Cat.getManager().isTraceMode();
Level level = event.getLevel();
if (level.isGreaterOrEqual(Level.ERROR)) {
logError(event);
} else if (isTraceMode) {
logTrace(event);
}
}
private String buildExceptionStack(Throwable exception) {
if (exception != null) {
StringWriter writer = new StringWriter(2048);
exception.printStackTrace(new PrintWriter(writer));
return writer.toString();
} else {
return "";
}
}
@Override
public void close() {
}
private void logError(LoggingEvent event) {
ThrowableInformation info = event.getThrowableInformation();
if (info != null) {
Throwable exception = info.getThrowable();
Object message = event.getMessage();
if (message != null) {
Cat.logError(String.valueOf(message), exception);
} else {
Cat.logError(exception);
}
}
}
private void logTrace(LoggingEvent event) {
String type = "Log4j";
String name = event.getLevel().toString();
Object message = event.getMessage();
String data;
if (message instanceof Throwable) {
data = buildExceptionStack((Throwable) message);
} else {
data = event.getMessage().toString();
}
ThrowableInformation info = event.getThrowableInformation();
if (info != null) {
data = data + '\n' + buildExceptionStack(info.getThrowable());
}
Cat.logTrace(type, name, Trace.SUCCESS, data);
}
@Override
public boolean requiresLayout() {
return false;
}
}
package com.dianping.cat.message;
/**
* <p>
* <code>Event</code> is used to log anything interesting happens at a specific time. Such as an exception thrown, a
* review added by user, a new user registered, an user logged into the system etc.
* </p>
*
* <p>
* However, if it could be failure, or last for a long time, such as a remote API call, database call or search engine
* call etc. It should be logged as a <code>Transaction</code>
* </p>
*
* <p>
* All CAT message will be constructed as a message tree and send to back-end for further analysis, and for monitoring.
* Only <code>Transaction</code> can be a tree node, all other message will be the tree leaf. The transaction without
* other messages nested is an atomic transaction.
* </p>
*
* @author Frankie Wu
*/
public interface Event extends Message {
}
package com.dianping.cat.message;
public interface ForkedTransaction extends Transaction {
public void fork();
public String getForkedMessageId();
}
package com.dianping.cat.message;
/**
* <p>
* <code>Heartbeat</code> is used to log data that happens in a regular intervals, for example once per second, such as
* system load, CPU percentage, memory usage, thread pool statistics, cache hit/miss rate, service manifest etc., and
* even some configuration could be carried by <code>Heartbeat</code>. There could be some good use cases, for example
* health checker and load balancer, that make good use of it.
* </p>
*
* <p>
* <code>Heartbeat</code> should never be used per request since the request is not regular predictable, instead it
* could be logged in a daemon background thread, or something like a Timer.
* </p>
*
* <p>
* All CAT message will be constructed as a message tree and send to back-end for further analysis, and for monitoring.
* Only <code>Transaction</code> can be a tree node, all other message will be the tree leaf. The transaction without
* other messages nested is an atomic transaction.
* </p>
*
* @author Frankie Wu
*/
public interface Heartbeat extends Message {
}
package com.dianping.cat.message;
/**
* <p>
* Message represents data collected during application runtime. It will be sent to back-end system asynchronous for
* further processing.
* </p>
*
* <p>
* Super interface of <code>Event</code>, <code>Heartbeat</code> and <code>Transaction</code>.
* </p>
*
* @see Event, Heartbeat, Transaction
* @author Frankie Wu
*/
public interface Message {
public static final String SUCCESS = "0";
/**
* add one or multiple key-value pairs to the message.
*
* @param keyValuePairs
* key-value pairs like 'a=1&b=2&...'
*/
public void addData(String keyValuePairs);
/**
* add one key-value pair to the message.
*
* @param key
* @param value
*/
public void addData(String key, Object value);
/**
* Complete the message construction.
*/
public void complete();
/**
* @return key value pairs data
*/
public Object getData();
/**
* Message name.
*
* @return message name
*/
public String getName();
/**
* Get the message status.
*
* @return message status. "0" means success, otherwise error code.
*/
public String getStatus();
/**
* The time stamp the message was created.
*
* @return message creation time stamp in milliseconds
*/
public long getTimestamp();
/**
* Message type.
*
* <p>
* Typical message types are:
* <ul>
* <li>URL: maps to one method of an action</li>
* <li>Service: maps to one method of service call</li>
* <li>Search: maps to one method of search call</li>
* <li>SQL: maps to one SQL statement</li>
* <li>Cache: maps to one cache access</li>
* <li>Error: maps to java.lang.Throwable (java.lang.Exception and java.lang.Error)</li>
* </ul>
* </p>
*
* @return message type
*/
public String getType();
/**
* If the complete() method was called or not.
*
* @return true means the complete() method was called, false otherwise.
*/
public boolean isCompleted();
/**
* @return
*/
public boolean isSuccess();
/**
* Set the message status.
*
* @param status
* message status. "0" means success, otherwise error code.
*/
public void setStatus(String status);
/**
* Set the message status with exception class name.
*
* @param e
* exception.
*/
public void setStatus(Throwable e);
}
package com.dianping.cat.message;
/**
* <p>
* Message factory is used to create new transaction,event and/or heartbeat.
* </p>
*
* <p>
* Normally, application code logs message in following ways, for example:
* <ul>
* <li>Event
*
* <pre>
* public class MyClass {
* public static MessageFactory CAT = Cat.getFactory();
*
* public void bizMethod() {
* Event event = CAT.newEvent("Review", "New");
*
* event.addData("id", 12345);
* event.addData("user", "john");
* ...
* event.setStatus("0");
* event.complete();
* }
* ...
* }
* </pre>
*
* </li>
* <li>Heartbeat
*
* <pre>
* public class MyClass {
* public static MessageFactory CAT = Cat.getFactory();
*
* public void bizMethod() {
* Heartbeat event = CAT.newHeartbeat("System", "Status");
*
* event.addData("ip", "192.168.10.111");
* event.addData("host", "host-1");
* event.addData("load", "2.1");
* event.addData("cpu", "0.12,0.10");
* event.addData("memory.total", "2G");
* event.addData("memory.free", "456M");
* event.setStatus("0");
* event.complete();
* }
* ...
* }
* </pre>
*
* </li>
* <li>Transaction
*
* <pre>
* public class MyClass {
* public static MessageFactory CAT = Cat.getFactory();
*
* public void bizMethod() {
* Transaction t = CAT.newTransaction("URL", "MyPage");
*
* try {
* // do your business here
* t.addData("k1", "v1");
* t.addData("k2", "v2");
* t.addData("k3", "v3");
* Thread.sleep(30);
*
* t.setStatus("0");
* } catch (Exception e) {
* t.setStatus(e);
* } finally {
* t.complete();
* }
* }
* ...
* }
* </pre>
*
* </li>
* </ul>
*
* or logs event or heartbeat in one shot, for example:
* <ul>
* <li>Event
*
* <pre>
* public class MyClass {
* public static MessageFactory CAT = Cat.getFactory();
*
* public void bizMethod() {
* CAT.logEvent("Review", "New", "0", "id=12345&user=john");
* }
* ...
* }
* </pre>
*
* </li>
* <li>Heartbeat
*
* <pre>
* public class MyClass {
* public static MessageFactory CAT = Cat.getFactory();
*
* public void bizMethod() {
* CAT.logHeartbeat("System", "Status", "0", "ip=192.168.10.111&host=host-1&load=2.1&cpu=0.12,0.10&memory.total=2G&memory.free=456M");
* }
* ...
* }
* </pre>
*
* </li>
* </ul>
* </p>
*
* @author Frankie Wu
*/
public interface MessageProducer {
/**
* Create a new message id.
*
* @return new message id
*/
public String createMessageId();
/**
* Check if the CAT client is enabled for current domain.
*
* @return true if CAT client is enabled, false means CAT client is disabled.
*/
public boolean isEnabled();
/**
* Log an error.
*
* @param cause
* root cause exception
*/
public void logError(Throwable cause);
/**
* Log an error.
*
* @param cause
* root cause exception
*/
public void logError(String message, Throwable cause);
/**
* Log an event in one shot with SUCCESS status.
*
* @param type
* event type
* @param name
* event name
*/
public void logEvent(String type, String name);
/**
* Log an trace in one shot with SUCCESS status.
*
* @param type
* trace type
* @param name
* trace name
*/
public void logTrace(String type, String name);
/**
* Log an event in one shot.
*
* @param type
* event type
* @param name
* event name
* @param status
* "0" means success, otherwise means error code
* @param nameValuePairs
* name value pairs in the format of "a=1&b=2&..."
*/
public void logEvent(String type, String name, String status, String nameValuePairs);
/**
* Log an trace in one shot.
*
* @param type
* trace type
* @param name
* trace name
* @param status
* "0" means success, otherwise means error code
* @param nameValuePairs
* name value pairs in the format of "a=1&b=2&..."
*/
public void logTrace(String type, String name, String status, String nameValuePairs);
/**
* Log a heartbeat in one shot.
*
* @param type
* heartbeat type
* @param name
* heartbeat name
* @param status
* "0" means success, otherwise means error code
* @param nameValuePairs
* name value pairs in the format of "a=1&b=2&..."
*/
public void logHeartbeat(String type, String name, String status, String nameValuePairs);
/**
* Log a metric in one shot.
*
* @param name
* metric name
* @param status
* "0" means success, otherwise means error code
* @param nameValuePairs
* name value pairs in the format of "a=1&b=2&..."
*/
public void logMetric(String name, String status, String nameValuePairs);
/**
* Create a new event with given type and name.
*
* @param type
* event type
* @param name
* event name
*/
public Event newEvent(String type, String name);
/**
* Create a new trace with given type and name.
*
* @param type
* trace type
* @param name
* trace name
*/
public Trace newTrace(String type, String name);
/**
* Create a new heartbeat with given type and name.
*
* @param type
* heartbeat type
* @param name
* heartbeat name
*/
public Heartbeat newHeartbeat(String type, String name);
/**
* Create a new metric with given type and name.
*
* @param type
* metric type
* @param name
* metric name
*/
public Metric newMetric(String type, String name);
/**
* Create a new transaction with given type and name.
*
* @param type
* transaction type
* @param name
* transaction name
*/
public Transaction newTransaction(String type, String name);
/**
* Create a forked transaction for child thread.
*
* @param type
* transaction type
* @param name
* transaction name
* @return forked transaction
*/
public ForkedTransaction newForkedTransaction(String type, String name);
/**
* Create a tagged transaction for another process or thread.
*
* @param type
* transaction type
* @param name
* transaction name
* @param tag
* tag applied to the transaction
* @return tagged transaction
*/
public TaggedTransaction newTaggedTransaction(String type, String name, String tag);
}
package com.dianping.cat.message;
/**
* <p>
* <code>Metric</code> is used to log business data point happens at a specific time. Such as an exception thrown, a
* review added by user, a new user registered, an user logged into the system etc.
* </p>
*
* <p>
* However, if it could be failure, or last for a long time, such as a remote API call, database call or search engine
* call etc. It should be logged as a <code>Transaction</code>
* </p>
*
* <p>
* All CAT message will be constructed as a message tree and send to back-end for further analysis, and for monitoring.
* Only <code>Transaction</code> can be a tree node, all other message will be the tree leaf. The transaction without
* other messages nested is an atomic transaction.
* </p>
*
* @author Frankie Wu
*/
public interface Metric extends Message {
}
package com.dianping.cat.message;
public interface TaggedTransaction extends Transaction {
public void bind(String tag, String childMessageId, String title);
public String getParentMessageId();
public String getRootMessageId();
public String getTag();
public void start();
}
package com.dianping.cat.message;
/**
* <p>
* <code>Trace</code> is used to log anything for trace message info happens at a specific time. Such as an debug or info message.
* </p>
*
* <p>
* All CAT message will be constructed as a message tree and send to back-end for further analysis, and for monitoring.
* Only <code>Transaction</code> can be a tree node, all other message will be the tree leaf. The transaction without
* other messages nested is an atomic transaction.
* </p>
*
* @author Frankie Wu
*/
public interface Trace extends Message {
}
package com.dianping.cat.message;
import java.util.List;
/**
* <p>
* <code>Transaction</code> is any interesting unit of work that takes time to complete and may fail.
* </p>
*
* <p>
* Basically, all data access across the boundary needs to be logged as a <code>Transaction</code> since it may fail and
* time consuming. For example, URL request, disk IO, JDBC query, search query, HTTP request, 3rd party API call etc.
* </p>
*
* <p>
* Sometime if A needs call B which is owned by another team, although A and B are deployed together without any
* physical boundary. To make the ownership clear, there could be some <code>Transaction</code> logged when A calls B.
* </p>
*
* <p>
* Most of <code>Transaction</code> should be logged in the infrastructure level or framework level, which is
* transparent to the application.
* </p>
*
* <p>
* All CAT message will be constructed as a message tree and send to back-end for further analysis, and for monitoring.
* Only <code>Transaction</code> can be a tree node, all other message will be the tree leaf. The transaction without
* other messages nested is an atomic transaction.
* </p>
*
* @author Frankie Wu
*/
public interface Transaction extends Message {
/**
* Add one nested child message to current transaction.
*
* @param message
* to be added
*/
public Transaction addChild(Message message);
/**
* Get all children message within current transaction.
*
* <p>
* Typically, a <code>Transaction</code> can nest other <code>Transaction</code>s, <code>Event</code>s and
* <code>Heartbeat</code> s, while an <code>Event</code> or <code>Heartbeat</code> can't nest other messages.
* </p>
*
* @return all children messages, empty if there is no nested children.
*/
public List<Message> getChildren();
/**
* How long the transaction took from construction to complete. Time unit is microsecond.
*
* @return duration time in microsecond
*/
public long getDurationInMicros();
/**
* How long the transaction took from construction to complete. Time unit is millisecond.
*
* @return duration time in millisecond
*/
public long getDurationInMillis();
/**
* Has children or not. An atomic transaction does not have any children message.
*
* @return true if child exists, else false.
*/
public boolean hasChildren();
/**
* Check if the transaction is stand-alone or belongs to another one.
*
* @return true if it's an root transaction.
*/
public boolean isStandalone();
}
package com.dianping.cat.message.internal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.Charset;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
public abstract class AbstractMessage implements Message {
private String m_type;
private String m_name;
private String m_status = "unset";
private long m_timestampInMillis;
private CharSequence m_data;
private boolean m_completed;
public AbstractMessage(String type, String name) {
m_type = String.valueOf(type);
m_name = String.valueOf(name);
m_timestampInMillis = MilliSecondTimer.currentTimeMillis();
}
@Override
public void addData(String keyValuePairs) {
if (m_data == null) {
m_data = keyValuePairs;
} else if (m_data instanceof StringBuilder) {
((StringBuilder) m_data).append('&').append(keyValuePairs);
} else {
StringBuilder sb = new StringBuilder(m_data.length() + keyValuePairs.length() + 16);
sb.append(m_data).append('&');
sb.append(keyValuePairs);
m_data = sb;
}
}
@Override
public void addData(String key, Object value) {
if (m_data instanceof StringBuilder) {
((StringBuilder) m_data).append('&').append(key).append('=').append(value);
} else {
String str = String.valueOf(value);
int old = m_data == null ? 0 : m_data.length();
StringBuilder sb = new StringBuilder(old + key.length() + str.length() + 16);
if (m_data != null) {
sb.append(m_data).append('&');
}
sb.append(key).append('=').append(str);
m_data = sb;
}
}
@Override
public CharSequence getData() {
if (m_data == null) {
return "";
} else {
return m_data;
}
}
@Override
public String getName() {
return m_name;
}
@Override
public String getStatus() {
return m_status;
}
@Override
public long getTimestamp() {
return m_timestampInMillis;
}
@Override
public String getType() {
return m_type;
}
@Override
public boolean isCompleted() {
return m_completed;
}
@Override
public boolean isSuccess() {
return Message.SUCCESS.equals(m_status);
}
public void setCompleted(boolean completed) {
m_completed = completed;
}
public void setName(String name) {
m_name = name;
}
@Override
public void setStatus(String status) {
m_status = status;
}
@Override
public void setStatus(Throwable e) {
m_status = e.getClass().getName();
}
public void setTimestamp(long timestamp) {
m_timestampInMillis = timestamp;
}
public void setType(String type) {
m_type = type;
}
@Override
public String toString() {
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
codec.encodeMessage(this, buf);
codec.reset();
return buf.toString(Charset.forName("utf-8"));
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultEvent extends AbstractMessage implements Event {
private MessageManager m_manager;
public DefaultEvent(String type, String name) {
super(type, name);
}
public DefaultEvent(String type, String name, MessageManager manager) {
super(type, name);
m_manager = manager;
}
@Override
public void complete() {
setCompleted(true);
if (m_manager != null) {
m_manager.add(this);
}
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.Cat;
import com.dianping.cat.message.ForkedTransaction;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultForkedTransaction extends DefaultTransaction implements ForkedTransaction {
private String m_rootMessageId;
private String m_parentMessageId;
private String m_forkedMessageId;
public DefaultForkedTransaction(String type, String name, MessageManager manager) {
super(type, name, manager);
setStandalone(false);
MessageTree tree = manager.getThreadLocalMessageTree();
if (tree != null) {
m_rootMessageId = tree.getRootMessageId();
m_parentMessageId = tree.getMessageId();
// Detach parent transaction and this forked transaction, by calling linkAsRunAway(), at this earliest moment,
// so that thread synchronization is not needed at all between them in the future.
m_forkedMessageId = Cat.createMessageId();
}
}
@Override
public void fork() {
MessageManager manager = getManager();
manager.setup();
manager.start(this, false);
MessageTree tree = manager.getThreadLocalMessageTree();
if (tree != null) {
// Override tree.messageId to be forkedMessageId of current forked transaction, which is created in the parent
// thread.
tree.setMessageId(m_forkedMessageId);
tree.setRootMessageId(m_rootMessageId == null ? m_parentMessageId : m_rootMessageId);
tree.setParentMessageId(m_parentMessageId);
}
}
@Override
public String getForkedMessageId() {
return m_forkedMessageId;
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultHeartbeat extends AbstractMessage implements Heartbeat {
private MessageManager m_manager;
public DefaultHeartbeat(String type, String name) {
super(type, name);
}
public DefaultHeartbeat(String type, String name, MessageManager manager) {
super(type, name);
m_manager = manager;
}
@Override
public void complete() {
setCompleted(true);
if (m_manager != null) {
m_manager.add(this);
}
}
}
package com.dianping.cat.message.internal;
import java.io.PrintWriter;
import java.io.StringWriter;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.ForkedTransaction;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.MessageProducer;
import com.dianping.cat.message.Metric;
import com.dianping.cat.message.TaggedTransaction;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageProducer implements MessageProducer {
@Inject
private MessageManager m_manager;
@Inject
private MessageIdFactory m_factory;
@Override
public String createMessageId() {
return m_factory.getNextId();
}
@Override
public boolean isEnabled() {
return m_manager.isMessageEnabled();
}
@Override
public void logError(String message, Throwable cause) {
if (Cat.getManager().isCatEnabled()) {
if (shouldLog(cause)) {
m_manager.getThreadLocalMessageTree().setSample(false);
StringWriter writer = new StringWriter(2048);
if (message != null) {
writer.write(message);
writer.write(' ');
}
cause.printStackTrace(new PrintWriter(writer));
String detailMessage = writer.toString();
if (cause instanceof Error) {
logEvent("Error", cause.getClass().getName(), "ERROR", detailMessage);
} else if (cause instanceof RuntimeException) {
logEvent("RuntimeException", cause.getClass().getName(), "ERROR", detailMessage);
} else {
logEvent("Exception", cause.getClass().getName(), "ERROR", detailMessage);
}
}
} else {
cause.printStackTrace();
}
}
@Override
public void logError(Throwable cause) {
logError(null, cause);
}
@Override
public void logEvent(String type, String name) {
logEvent(type, name, Message.SUCCESS, null);
}
@Override
public void logEvent(String type, String name, String status, String nameValuePairs) {
Event event = newEvent(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
event.addData(nameValuePairs);
}
event.setStatus(status);
event.complete();
}
@Override
public void logHeartbeat(String type, String name, String status, String nameValuePairs) {
Heartbeat heartbeat = newHeartbeat(type, name);
heartbeat.addData(nameValuePairs);
heartbeat.setStatus(status);
heartbeat.complete();
}
@Override
public void logMetric(String name, String status, String nameValuePairs) {
String type = "";
Metric metric = newMetric(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
metric.addData(nameValuePairs);
}
metric.setStatus(status);
metric.complete();
}
@Override
public void logTrace(String type, String name) {
logTrace(type, name, Message.SUCCESS, null);
}
@Override
public void logTrace(String type, String name, String status, String nameValuePairs) {
if (m_manager.isTraceMode()) {
Trace trace = newTrace(type, name);
if (nameValuePairs != null && nameValuePairs.length() > 0) {
trace.addData(nameValuePairs);
}
trace.setStatus(status);
trace.complete();
}
}
@Override
public Event newEvent(String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
DefaultEvent event = new DefaultEvent(type, name, m_manager);
return event;
} else {
return NullMessage.EVENT;
}
}
public Event newEvent(Transaction parent, String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled() && parent != null) {
DefaultEvent event = new DefaultEvent(type, name);
parent.addChild(event);
return event;
} else {
return NullMessage.EVENT;
}
}
@Override
public ForkedTransaction newForkedTransaction(String type, String name) {
// this enable CAT client logging cat message without explicit setup
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
MessageTree tree = m_manager.getThreadLocalMessageTree();
if (tree.getMessageId() == null) {
tree.setMessageId(createMessageId());
}
DefaultForkedTransaction transaction = new DefaultForkedTransaction(type, name, m_manager);
if (m_manager instanceof DefaultMessageManager) {
((DefaultMessageManager) m_manager).linkAsRunAway(transaction);
}
m_manager.start(transaction, true);
return transaction;
} else {
return NullMessage.TRANSACTION;
}
}
@Override
public Heartbeat newHeartbeat(String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name, m_manager);
m_manager.getThreadLocalMessageTree().setSample(false);
return heartbeat;
} else {
return NullMessage.HEARTBEAT;
}
}
@Override
public Metric newMetric(String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
DefaultMetric metric = new DefaultMetric(type == null ? "" : type, name, m_manager);
m_manager.getThreadLocalMessageTree().setSample(false);
return metric;
} else {
return NullMessage.METRIC;
}
}
@Override
public TaggedTransaction newTaggedTransaction(String type, String name, String tag) {
// this enable CAT client logging cat message without explicit setup
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
MessageTree tree = m_manager.getThreadLocalMessageTree();
if (tree.getMessageId() == null) {
tree.setMessageId(createMessageId());
}
DefaultTaggedTransaction transaction = new DefaultTaggedTransaction(type, name, tag, m_manager);
m_manager.start(transaction, true);
return transaction;
} else {
return NullMessage.TRANSACTION;
}
}
@Override
public Trace newTrace(String type, String name) {
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
DefaultTrace trace = new DefaultTrace(type, name, m_manager);
return trace;
} else {
return NullMessage.TRACE;
}
}
@Override
public Transaction newTransaction(String type, String name) {
// this enable CAT client logging cat message without explicit setup
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled()) {
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
m_manager.start(transaction, false);
return transaction;
} else {
return NullMessage.TRANSACTION;
}
}
public Transaction newTransaction(Transaction parent, String type, String name) {
// this enable CAT client logging cat message without explicit setup
if (!m_manager.hasContext()) {
m_manager.setup();
}
if (m_manager.isMessageEnabled() && parent != null) {
DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
parent.addChild(transaction);
transaction.setStandalone(false);
return transaction;
} else {
return NullMessage.TRANSACTION;
}
}
private boolean shouldLog(Throwable e) {
if (m_manager instanceof DefaultMessageManager) {
return ((DefaultMessageManager) m_manager).shouldLog(e);
} else {
return true;
}
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Metric;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultMetric extends AbstractMessage implements Metric {
private MessageManager m_manager;
public DefaultMetric(String type, String name) {
super(type, name);
}
public DefaultMetric(String type, String name, MessageManager manager) {
super(type, name);
m_manager = manager;
}
@Override
public void complete() {
setCompleted(true);
if (m_manager != null) {
m_manager.add(this);
}
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.TaggedTransaction;
import com.dianping.cat.message.spi.MessageManager;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultTaggedTransaction extends DefaultTransaction implements TaggedTransaction {
private String m_rootMessageId;
private String m_parentMessageId;
private String m_tag;
public DefaultTaggedTransaction(String type, String name, String tag, MessageManager manager) {
super(type, name, manager);
m_tag = tag;
setStandalone(false);
MessageTree tree = manager.getThreadLocalMessageTree();
if (tree != null) {
m_rootMessageId = tree.getRootMessageId();
m_parentMessageId = tree.getMessageId();
}
}
@Override
public void bind(String tag, String childMessageId, String title) {
DefaultEvent event = new DefaultEvent("RemoteCall", "Tagged");
if (title == null) {
title = getType() + ":" + getName();
}
event.addData(childMessageId, title);
event.setTimestamp(getTimestamp());
event.setStatus(Message.SUCCESS);
event.setCompleted(true);
addChild(event);
}
@Override
public String getParentMessageId() {
return m_parentMessageId;
}
@Override
public String getRootMessageId() {
return m_rootMessageId;
}
@Override
public String getTag() {
return m_tag;
}
@Override
public void start() {
MessageTree tree = getManager().getThreadLocalMessageTree();
if (tree != null && tree.getRootMessageId() == null) {
tree.setParentMessageId(m_parentMessageId);
tree.setRootMessageId(m_rootMessageId);
}
}
}
package com.dianping.cat.message.internal;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultTrace extends AbstractMessage implements Trace {
private MessageManager m_manager;
public DefaultTrace(String type, String name) {
super(type, name);
}
public DefaultTrace(String type, String name, MessageManager manager) {
super(type, name);
m_manager = manager;
}
@Override
public void complete() {
setCompleted(true);
if (m_manager != null) {
m_manager.add(this);
}
}
}
package com.dianping.cat.message.internal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import com.dianping.cat.Cat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.spi.MessageManager;
public class DefaultTransaction extends AbstractMessage implements Transaction {
private long m_durationInMicro = -1; // must be less than 0
private List<Message> m_children;
private MessageManager m_manager;
private boolean m_standalone;
private long m_durationStart;
public DefaultTransaction(String type, String name, MessageManager manager) {
super(type, name);
m_manager = manager;
m_standalone = true;
m_durationStart = System.nanoTime();
}
@Override
public DefaultTransaction addChild(Message message) {
if (m_children == null) {
m_children = new ArrayList<Message>();
}
if (message != null) {
m_children.add(message);
} else {
Cat.logError(new Exception("null child message"));
}
return this;
}
@Override
public void complete() {
try {
if (isCompleted()) {
// complete() was called more than once
DefaultEvent event = new DefaultEvent("cat", "BadInstrument");
event.setStatus("TransactionAlreadyCompleted");
event.complete();
addChild(event);
} else {
m_durationInMicro = (System.nanoTime() - m_durationStart) / 1000L;
setCompleted(true);
if (m_manager != null) {
m_manager.end(this);
}
}
} catch (Exception e) {
// ignore
}
}
@Override
public List<Message> getChildren() {
if (m_children == null) {
return Collections.emptyList();
}
return m_children;
}
@Override
public long getDurationInMicros() {
if (m_durationInMicro >= 0) {
return m_durationInMicro;
} else { // if it's not completed explicitly
long duration = 0;
int len = m_children == null ? 0 : m_children.size();
if (len > 0) {
Message lastChild = m_children.get(len - 1);
if (lastChild instanceof Transaction) {
DefaultTransaction trx = (DefaultTransaction) lastChild;
duration = (trx.getTimestamp() - getTimestamp()) * 1000L;
} else {
duration = (lastChild.getTimestamp() - getTimestamp()) * 1000L;
}
}
return duration;
}
}
@Override
public long getDurationInMillis() {
return getDurationInMicros() / 1000L;
}
protected MessageManager getManager() {
return m_manager;
}
@Override
public boolean hasChildren() {
return m_children != null && m_children.size() > 0;
}
@Override
public boolean isStandalone() {
return m_standalone;
}
public void setDurationInMicros(long duration) {
m_durationInMicro = duration;
}
public void setDurationInMillis(long duration) {
m_durationInMicro = duration * 1000L;
}
public void setStandalone(boolean standalone) {
m_standalone = standalone;
}
public void setDurationStart(long durationStart) {
m_durationStart = durationStart;
}
}
package com.dianping.cat.message.internal;
import java.util.List;
import org.unidal.helper.Splitters;
public class MessageId {
private static final long VERSION1_THRESHOLD = 1325347200000L; // Jan. 1 2012
private String m_domain;
private String m_ipAddressInHex;
private long m_timestamp;
private int m_index;
public static MessageId parse(String messageId) {
List<String> list = Splitters.by('-').split(messageId);
int len = list.size();
if (len >= 4) {
String ipAddressInHex = list.get(len - 3);
long timestamp = Long.parseLong(list.get(len - 2));
int index = Integer.parseInt(list.get(len - 1));
String domain;
if (len > 4) { // allow domain contains '-'
StringBuilder sb = new StringBuilder();
for (int i = 0; i < len - 3; i++) {
if (i > 0) {
sb.append('-');
}
sb.append(list.get(i));
}
domain = sb.toString();
} else {
domain = list.get(0);
}
return new MessageId(domain, ipAddressInHex, timestamp, index);
}
throw new RuntimeException("Invalid message id format: " + messageId);
}
MessageId(String domain, String ipAddressInHex, long timestamp, int index) {
m_domain = domain;
m_ipAddressInHex = ipAddressInHex;
m_timestamp = timestamp;
m_index = index;
}
public String getDomain() {
return m_domain;
}
public int getIndex() {
return m_index;
}
public String getIpAddress() {
StringBuilder sb = new StringBuilder();
String local = m_ipAddressInHex;
int length = local.length();
for (int i = 0; i < length; i += 2) {
char first = local.charAt(i);
char next = local.charAt(i + 1);
int temp = 0;
if (first >= '0' && first <= '9') {
temp += (first - '0') << 4;
} else {
temp += ((first - 'a') + 10) << 4;
}
if (next >= '0' && next <= '9') {
temp += next - '0';
} else {
temp += (next - 'a') + 10;
}
if (sb.length() > 0) {
sb.append('.');
}
sb.append(temp);
}
return sb.toString();
}
public String getIpAddressInHex() {
return m_ipAddressInHex;
}
public long getTimestamp() {
if (m_timestamp > VERSION1_THRESHOLD) {
return m_timestamp;
} else {
return m_timestamp * 3600 * 1000L;
}
}
public int getVersion() {
if (m_timestamp > VERSION1_THRESHOLD) {
return 1;
} else {
return 2;
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(m_domain.length() + 32);
sb.append(m_domain);
sb.append('-');
sb.append(m_ipAddressInHex);
sb.append('-');
sb.append(m_timestamp);
sb.append('-');
sb.append(m_index);
return sb.toString();
}
}
package com.dianping.cat.message.internal;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.unidal.helper.Splitters;
import com.dianping.cat.configuration.NetworkInterfaceManager;
public class MessageIdFactory {
private volatile long m_timestamp = getTimestamp();
private volatile AtomicInteger m_index;
private String m_domain;
private String m_ipAddress;
private MappedByteBuffer m_byteBuffer;
private RandomAccessFile m_markFile;
private static final long HOUR = 3600 * 1000L;
private BlockingQueue<String> m_reusedIds = new LinkedBlockingQueue<String>(100000);
public void close() {
try {
m_markFile.close();
} catch (Exception e) {
// ignore it
}
}
private File createMarkFile(String domain) {
File mark = new File("/data/appdatas/cat/", "cat-" + domain + ".mark");
if (!mark.exists()) {
boolean success = true;
try {
success = mark.createNewFile();
} catch (Exception e) {
success = false;
}
if (!success) {
mark = createTempFile(domain);
}
} else if (!mark.canWrite()) {
mark = createTempFile(domain);
}
return mark;
}
private File createTempFile(String domain) {
String tmpDir = System.getProperty("java.io.tmpdir");
File mark = new File(tmpDir, "cat-" + domain + ".mark");
return mark;
}
public String getNextId() {
String id = m_reusedIds.poll();
if (id != null) {
return id;
} else {
long timestamp = getTimestamp();
if (timestamp != m_timestamp) {
m_index = new AtomicInteger(0);
m_timestamp = timestamp;
}
int index = m_index.getAndIncrement();
StringBuilder sb = new StringBuilder(m_domain.length() + 32);
sb.append(m_domain);
sb.append('-');
sb.append(m_ipAddress);
sb.append('-');
sb.append(timestamp);
sb.append('-');
sb.append(index);
return sb.toString();
}
}
protected long getTimestamp() {
long timestamp = MilliSecondTimer.currentTimeMillis();
return timestamp / HOUR; // version 2
}
public void initialize(String domain) throws IOException {
m_domain = domain;
if (m_ipAddress == null) {
String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();
List<String> items = Splitters.by(".").noEmptyItem().split(ip);
byte[] bytes = new byte[4];
for (int i = 0; i < 4; i++) {
bytes[i] = (byte) Integer.parseInt(items.get(i));
}
StringBuilder sb = new StringBuilder(bytes.length / 2);
for (byte b : bytes) {
sb.append(Integer.toHexString((b >> 4) & 0x0F));
sb.append(Integer.toHexString(b & 0x0F));
}
m_ipAddress = sb.toString();
}
File mark = createMarkFile(domain);
m_markFile = new RandomAccessFile(mark, "rw");
m_byteBuffer = m_markFile.getChannel().map(MapMode.READ_WRITE, 0, 20);
if (m_byteBuffer.limit() > 0) {
int index = m_byteBuffer.getInt();
long lastTimestamp = m_byteBuffer.getLong();
if (lastTimestamp == m_timestamp) { // for same hour
m_index = new AtomicInteger(index + 10000);
} else {
m_index = new AtomicInteger(0);
}
}
saveMark();
}
protected void resetIndex() {
m_index.set(0);
}
public void reuse(String id) {
m_reusedIds.offer(id);
}
public void saveMark() {
try {
m_byteBuffer.rewind();
m_byteBuffer.putInt(m_index.get());
m_byteBuffer.putLong(m_timestamp);
} catch (Exception e) {
// ignore it
}
}
public void setDomain(String domain) {
m_domain = domain;
}
public void setIpAddress(String ipAddress) {
m_ipAddress = ipAddress;
}
}
package com.dianping.cat.message.internal;
import java.util.concurrent.locks.LockSupport;
/**
* This timer provides milli-second precise system time.
*/
public class MilliSecondTimer {
private static long m_baseTime;
private static long m_startNanoTime;
private static boolean m_isWindows = false;
public static long currentTimeMillis() {
if (m_isWindows) {
if (m_baseTime == 0) {
initialize();
}
long elipsed = (long) ((System.nanoTime() - m_startNanoTime) / 1e6);
return m_baseTime + elipsed;
} else {
return System.currentTimeMillis();
}
}
public static void initialize() {
String os = System.getProperty("os.name");
if (os.startsWith("Windows")) {
m_isWindows = true;
m_baseTime = System.currentTimeMillis();
while (true) {
LockSupport.parkNanos(100000); // 0.1 ms
long millis = System.currentTimeMillis();
if (millis != m_baseTime) {
m_baseTime = millis;
m_startNanoTime = System.nanoTime();
break;
}
}
} else {
m_baseTime = System.currentTimeMillis();
m_startNanoTime = System.nanoTime();
}
}
}
package com.dianping.cat.message.internal;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Metric;
import com.dianping.cat.message.Transaction;
public abstract class MockMessageBuilder {
private Stack<TransactionHolder> m_stack = new Stack<TransactionHolder>();
public final Message build() {
try {
return define().build();
} finally {
m_stack.clear();
}
}
public abstract MessageHolder define();
protected EventHolder e(String type, String name) {
EventHolder e = new EventHolder(type, name);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return e;
}
protected EventHolder e(String type, String name, String data) {
EventHolder e = new EventHolder(type, name, data);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return e;
}
protected HeartbeatHolder h(String type, String name) {
HeartbeatHolder h = new HeartbeatHolder(type, name);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
h.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return h;
}
protected MetricHolder m(String type, String name) {
MetricHolder e = new MetricHolder(type, name);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return e;
}
protected MetricHolder m(String type, String name, String data) {
MetricHolder e = new MetricHolder(type, name, data);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
e.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
return e;
}
protected TransactionHolder t(String type, String name, long durationInMillis) {
TransactionHolder t = new TransactionHolder(type, name, durationInMillis);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
t.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
m_stack.push(t);
return t;
}
protected TransactionHolder t(String type, String name, String data, long durationInMillis) {
TransactionHolder t = new TransactionHolder(type, name, data, durationInMillis);
TransactionHolder parent = m_stack.isEmpty() ? null : m_stack.peek();
if (parent != null) {
t.setTimestampInMicros(parent.getCurrentTimestampInMicros());
}
m_stack.push(t);
return t;
}
protected static abstract class AbstractMessageHolder implements MessageHolder {
private String m_type;
private String m_name;
private String m_data;
private long m_timestampInMicros;
private String m_status = "0";
public AbstractMessageHolder(String type, String name) {
m_type = type;
m_name = name;
}
public AbstractMessageHolder(String type, String name, String data) {
m_type = type;
m_name = name;
m_data = data;
}
public void addData(String key, String value) {
if (m_data == null) {
m_data = key + "=" + value;
} else {
m_data = m_data + "&" + key + "=" + value;
}
}
public String getData() {
return m_data;
}
public String getName() {
return m_name;
}
public String getStatus() {
return m_status;
}
@Override
public long getTimestampInMicros() {
return m_timestampInMicros;
}
public long getTimestampInMillis() {
return m_timestampInMicros / 1000;
}
public String getType() {
return m_type;
}
public void setStatus(String status) {
m_status = status;
}
@Override
public void setTimestampInMicros(long timestampInMicros) {
m_timestampInMicros = timestampInMicros;
}
}
public static class EventHolder extends AbstractMessageHolder {
private DefaultEvent m_event;
public EventHolder(String type, String name) {
super(type, name);
}
public EventHolder(String type, String name, String data) {
super(type, name, data);
}
@Override
public Event build() {
m_event = new DefaultEvent(getType(), getName(), null);
m_event.setTimestamp(getTimestampInMillis());
m_event.setStatus(getStatus());
m_event.addData(getData());
m_event.complete();
return m_event;
}
public EventHolder status(String status) {
setStatus(status);
return this;
}
}
protected static class HeartbeatHolder extends AbstractMessageHolder {
private DefaultHeartbeat m_heartbeat;
public HeartbeatHolder(String type, String name) {
super(type, name);
}
@Override
public Heartbeat build() {
m_heartbeat = new DefaultHeartbeat(getType(), getName());
m_heartbeat.setTimestamp(getTimestampInMillis());
m_heartbeat.setStatus(getStatus());
m_heartbeat.complete();
return m_heartbeat;
}
public HeartbeatHolder status(String status) {
setStatus(status);
return this;
}
}
protected static interface MessageHolder {
public Message build();
public long getTimestampInMicros();
public void setTimestampInMicros(long timestampInMicros);
}
protected static class MetricHolder extends AbstractMessageHolder {
private DefaultMetric m_metric;
public MetricHolder(String type, String name) {
super(type, name);
}
public MetricHolder(String type, String name, String data) {
super(type, name, data);
}
@Override
public Metric build() {
m_metric = new DefaultMetric(getType(), getName());
m_metric.setTimestamp(getTimestampInMillis());
m_metric.setStatus(getStatus());
m_metric.addData(getData());
m_metric.complete();
return m_metric;
}
public MetricHolder status(String status) {
setStatus(status);
return this;
}
}
protected class TransactionHolder extends AbstractMessageHolder {
private long m_durationInMicros;
private long m_currentTimestampInMicros;
private List<MessageHolder> m_children = new ArrayList<MessageHolder>();
private DefaultTransaction m_transaction;
private long m_markTimestampInMicros;
public TransactionHolder(String type, String name, long durationInMicros) {
super(type, name);
m_durationInMicros = durationInMicros;
}
public TransactionHolder(String type, String name, String data, long durationInMicros) {
super(type, name, data);
m_durationInMicros = durationInMicros;
}
public TransactionHolder after(long periodInMicros) {
m_currentTimestampInMicros += periodInMicros;
return this;
}
public TransactionHolder at(long timestampInMillis) {
m_currentTimestampInMicros = timestampInMillis * 1000;
setTimestampInMicros(m_currentTimestampInMicros);
return this;
}
@Override
public Transaction build() {
m_transaction = new DefaultTransaction(getType(), getName(), null);
m_transaction.setTimestamp(getTimestampInMillis());
for (MessageHolder child : m_children) {
m_transaction.addChild(child.build());
}
m_transaction.setStatus(getStatus());
m_transaction.addData(getData());
m_transaction.complete();
m_transaction.setDurationInMicros(m_durationInMicros);
return m_transaction;
}
public TransactionHolder child(MessageHolder child) {
if (child instanceof TransactionHolder) {
m_currentTimestampInMicros += ((TransactionHolder) child).getDurationInMicros();
m_stack.pop();
}
m_children.add(child);
return this;
}
public TransactionHolder data(String key, String value) {
addData(key, value);
return this;
}
public long getCurrentTimestampInMicros() {
return m_currentTimestampInMicros;
}
public long getDurationInMicros() {
return m_durationInMicros;
}
public TransactionHolder mark() {
m_markTimestampInMicros = m_currentTimestampInMicros;
return this;
}
public TransactionHolder reset() {
m_currentTimestampInMicros = m_markTimestampInMicros;
return this;
}
@Override
public void setTimestampInMicros(long timestampInMicros) {
super.setTimestampInMicros(timestampInMicros);
m_currentTimestampInMicros = timestampInMicros;
}
public TransactionHolder status(String status) {
setStatus(status);
return this;
}
}
}
package com.dianping.cat.message.internal;
import java.util.Collections;
import java.util.List;
import com.dianping.cat.message.Event;
import com.dianping.cat.message.ForkedTransaction;
import com.dianping.cat.message.Heartbeat;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Metric;
import com.dianping.cat.message.TaggedTransaction;
import com.dianping.cat.message.Trace;
import com.dianping.cat.message.Transaction;
public enum NullMessage implements Transaction, Event, Metric, Trace, Heartbeat, ForkedTransaction, TaggedTransaction {
TRANSACTION,
EVENT,
METRIC,
TRACE,
HEARTBEAT;
@Override
public Transaction addChild(Message message) {
return this;
}
@Override
public void addData(String keyValuePairs) {
}
@Override
public void addData(String key, Object value) {
}
@Override
public void bind(String tag, String childMessageId, String title) {
}
@Override
public void complete() {
}
@Override
public void fork() {
}
@Override
public List<Message> getChildren() {
return Collections.emptyList();
}
@Override
public Object getData() {
return null;
}
@Override
public long getDurationInMicros() {
return 0;
}
@Override
public long getDurationInMillis() {
return 0;
}
@Override
public String getForkedMessageId() {
throw new UnsupportedOperationException();
}
@Override
public String getName() {
throw new UnsupportedOperationException();
}
public String getParentMessageId() {
return null;
}
public String getRootMessageId() {
return null;
}
@Override
public String getStatus() {
throw new UnsupportedOperationException();
}
@Override
public String getTag() {
throw new UnsupportedOperationException();
}
@Override
public long getTimestamp() {
throw new UnsupportedOperationException();
}
@Override
public String getType() {
throw new UnsupportedOperationException();
}
@Override
public boolean hasChildren() {
return false;
}
@Override
public boolean isCompleted() {
return true;
}
@Override
public boolean isStandalone() {
return true;
}
@Override
public boolean isSuccess() {
return true;
}
@Override
public void setStatus(String status) {
}
@Override
public void setStatus(Throwable e) {
}
@Override
public void start() {
}
}
package com.dianping.cat.message.io;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageQueue implements MessageQueue {
private BlockingQueue<MessageTree> m_queue;
private AtomicInteger m_count = new AtomicInteger();
public DefaultMessageQueue(int size) {
m_queue = new LinkedBlockingQueue<MessageTree>(size);
}
@Override
public boolean offer(MessageTree tree) {
return m_queue.offer(tree);
}
@Override
public boolean offer(MessageTree tree, double sampleRatio) {
if (tree.isSample() && sampleRatio < 1.0) {
if (sampleRatio > 0) {
int count = m_count.incrementAndGet();
if (count % (1 / sampleRatio) == 0) {
return offer(tree);
}
}
return false;
} else {
return offer(tree);
}
}
@Override
public MessageTree peek() {
return m_queue.peek();
}
@Override
public MessageTree poll() {
try {
return m_queue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
return null;
}
}
@Override
public int size() {
return m_queue.size();
}
}
package com.dianping.cat.message.io;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.configuration.client.entity.Server;
public class DefaultTransportManager implements TransportManager, Initializable, LogEnabled {
@Inject
private ClientConfigManager m_configManager;
@Inject
private TcpSocketSender m_tcpSocketSender;
private Logger m_logger;
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public MessageSender getSender() {
return m_tcpSocketSender;
}
@Override
public void initialize() throws InitializationException {
List<Server> servers = m_configManager.getServers();
if (!m_configManager.isCatEnabled()) {
m_tcpSocketSender = null;
m_logger.warn("CAT was DISABLED due to not initialized yet!");
} else {
List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
for (Server server : servers) {
if (server.isEnabled()) {
addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
}
}
m_logger.info("Remote CAT servers: " + addresses);
if (addresses.isEmpty()) {
throw new RuntimeException("All servers in configuration are disabled!\r\n" + servers);
} else {
m_tcpSocketSender.setServerAddresses(addresses);
m_tcpSocketSender.initialize();
}
}
}
}
package com.dianping.cat.message.io;
import com.dianping.cat.message.spi.MessageTree;
public interface MessageSender {
public void initialize();
public void send(MessageTree tree);
public void shutdown();
}
package com.dianping.cat.message.io;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.helper.Threads.Task;
import org.unidal.lookup.annotation.Inject;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
public class TcpSocketSender implements Task, MessageSender, LogEnabled {
public static final String ID = "tcp-socket-sender";
public static final int SIZE = 5000;
@Inject
private MessageCodec m_codec;
@Inject
private MessageStatistics m_statistics;
@Inject
private ClientConfigManager m_configManager;
@Inject
private MessageIdFactory m_factory;
private MessageQueue m_queue = new DefaultMessageQueue(SIZE);
private MessageQueue m_atomicTrees = new DefaultMessageQueue(SIZE);
private List<InetSocketAddress> m_serverAddresses;
private ChannelManager m_manager;
private Logger m_logger;
private transient boolean m_active;
private AtomicInteger m_errors = new AtomicInteger();
private AtomicInteger m_attempts = new AtomicInteger();
private static final int MAX_CHILD_NUMBER = 200;
private static final long HOUR = 1000 * 60 * 60L;
private boolean checkWritable(ChannelFuture future) {
boolean isWriteable = false;
Channel channel = future.channel();
if (future != null && channel.isOpen()) {
if (channel.isActive() && channel.isWritable()) {
isWriteable = true;
} else {
int count = m_attempts.incrementAndGet();
if (count % 1000 == 0 || count == 1) {
m_logger.error("Netty write buffer is full! Attempts: " + count);
}
}
}
return isWriteable;
}
@Override
public void enableLogging(Logger logger) {
m_logger = logger;
}
@Override
public String getName() {
return "TcpSocketSender";
}
@Override
public void initialize() {
m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);
Threads.forGroup("cat").start(this);
Threads.forGroup("cat").start(m_manager);
Threads.forGroup("cat").start(new MergeAtomicTask());
}
private boolean isAtomicMessage(MessageTree tree) {
Message message = tree.getMessage();
if (message instanceof Transaction) {
String type = message.getType();
if (type.startsWith("Cache.") || "SQL".equals(type)) {
return true;
} else {
return false;
}
} else {
return true;
}
}
private void logQueueFullInfo(MessageTree tree) {
if (m_statistics != null) {
m_statistics.onOverflowed(tree);
}
int count = m_errors.incrementAndGet();
if (count % 1000 == 0 || count == 1) {
m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
}
tree = null;
}
private MessageTree mergeTree(MessageQueue trees) {
int max = MAX_CHILD_NUMBER;
DefaultTransaction tran = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);
MessageTree first = trees.poll();
tran.setStatus(Transaction.SUCCESS);
tran.setCompleted(true);
tran.addChild(first.getMessage());
tran.setTimestamp(first.getMessage().getTimestamp());
long lastTimestamp = 0;
long lastDuration = 0;
while (max >= 0) {
MessageTree tree = trees.poll();
if (tree == null) {
tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
break;
}
lastTimestamp = tree.getMessage().getTimestamp();
if(tree.getMessage() instanceof DefaultTransaction){
lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
} else {
lastDuration = 0;
}
tran.addChild(tree.getMessage());
m_factory.reuse(tree.getMessageId());
max--;
}
((DefaultMessageTree) first).setMessage(tran);
return first;
}
@Override
public void run() {
m_active = true;
while (m_active) {
ChannelFuture channel = m_manager.channel();
if (channel != null && checkWritable(channel)) {
try {
MessageTree tree = m_queue.poll();
if (tree != null) {
sendInternal(tree);
tree.setMessage(null);
}
} catch (Throwable t) {
m_logger.error("Error when sending message over TCP socket!", t);
}
} else {
long current = System.currentTimeMillis();
long oldTimestamp = current - HOUR;
while (true) {
try {
MessageTree tree = m_queue.peek();
if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
MessageTree discradTree = m_queue.poll();
if (discradTree != null) {
m_statistics.onOverflowed(discradTree);
}
} else {
break;
}
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
break;
}
}
try {
Thread.sleep(5);
} catch (Exception e) {
// ignore it
m_active = false;
}
}
}
}
@Override
public void send(MessageTree tree) {
if (isAtomicMessage(tree)) {
boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
if (!result) {
logQueueFullInfo(tree);
}
} else {
boolean result = m_queue.offer(tree, m_manager.getSample());
if (!result) {
logQueueFullInfo(tree);
}
}
}
private void sendInternal(MessageTree tree) {
ChannelFuture future = m_manager.channel();
ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
m_codec.encode(tree, buf);
int size = buf.readableBytes();
Channel channel = future.channel();
channel.writeAndFlush(buf);
if (m_statistics != null) {
m_statistics.onBytes(size);
}
}
public void setServerAddresses(List<InetSocketAddress> serverAddresses) {
m_serverAddresses = serverAddresses;
}
private boolean shouldMerge(MessageQueue trees) {
MessageTree tree = trees.peek();
if (tree != null) {
long firstTime = tree.getMessage().getTimestamp();
int maxDuration = 1000 * 30;
if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {
return true;
}
}
return false;
}
@Override
public void shutdown() {
m_active = false;
m_manager.shutdown();
}
public class MergeAtomicTask implements Task {
@Override
public String getName() {
return "merge-atomic-task";
}
@Override
public void run() {
while (true) {
if (shouldMerge(m_atomicTrees)) {
MessageTree tree = mergeTree(m_atomicTrees);
boolean result = m_queue.offer(tree);
if (!result) {
logQueueFullInfo(tree);
}
} else {
try {
Thread.sleep(5);
} catch (InterruptedException e) {
break;
}
}
}
}
@Override
public void shutdown() {
}
}
}
package com.dianping.cat.message.io;
public interface TransportManager {
public MessageSender getSender();
}
package com.dianping.cat.message.spi;
import io.netty.buffer.ByteBuf;
public interface MessageCodec {
public MessageTree decode(ByteBuf buf);
public void decode(ByteBuf buf, MessageTree tree);
public void encode(MessageTree tree, ByteBuf buf);
}
package com.dianping.cat.message.spi;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
/**
* Message manager to help build CAT message.
* <p>
*
* Notes: This method is reserved for internal usage only. Application developer should never call this method directly.
*/
public interface MessageManager {
public void add(Message message);
/**
* Be triggered when a transaction ends, whatever it's the root transaction or nested transaction. However, if it's
* the root transaction then it will be flushed to back-end CAT server asynchronously.
* <p>
*
* @param transaction
*/
public void end(Transaction transaction);
/**
* Get peek transaction for current thread.
*
* @return peek transaction for current thread, null if no transaction there.
*/
public Transaction getPeekTransaction();
/**
* Get thread local message information.
*
* @return message tree, null means current thread is not setup correctly.
*/
public MessageTree getThreadLocalMessageTree();
/**
* Check if the thread context is setup or not.
*
* @return true if the thread context is setup, false otherwise
*/
public boolean hasContext();
/**
* Check if current context logging is enabled or disabled.
*
* @return true if current context is enabled
*/
public boolean isMessageEnabled();
/**
* Check if CAT logging is enabled or disabled.
*
* @return true if CAT is enabled
*/
public boolean isCatEnabled();
/**
* Check if CAT trace mode is enabled or disabled.
*
* @return true if CAT is trace mode
*/
public boolean isTraceMode();
/**
* Do cleanup for current thread environment in order to release resources in thread local objects.
*/
public void reset();
/**
* Set CAT trace mode.
*
*/
public void setTraceMode(boolean traceMode);
/**
* Do setup for current thread environment in order to prepare thread local objects.
*/
public void setup();
/**
* Be triggered when a new transaction starts, whatever it's the root transaction or nested transaction.
*
* @param transaction
* @param forked
*/
public void start(Transaction transaction, boolean forked);
/**
* Binds the current message tree to the transaction tagged with <code>tag</code>.
*
* @param tag
* tag name of the tagged transaction
* @param title
* title shown in the logview
*/
public void bind(String tag, String title);
/**
* get domain
*
*/
public String getDomain();
}
\ No newline at end of file \ No newline at end of file
package com.dianping.cat.message.spi;
public interface MessageQueue {
public boolean offer(MessageTree tree);
public boolean offer(MessageTree tree, double sampleRatio);
public MessageTree peek();
public MessageTree poll();
// the current size of the queue
public int size();
}
package com.dianping.cat.message.spi;
public interface MessageStatistics {
public long getBytes();
public long getOverflowed();
public long getProduced();
public void onBytes(int size);
public void onOverflowed(MessageTree tree);
}
package com.dianping.cat.message.spi;
import com.dianping.cat.message.Message;
public interface MessageTree extends Cloneable {
public MessageTree copy();
public String getDomain();
public String getHostName();
public String getIpAddress();
public Message getMessage();
public String getMessageId();
public String getParentMessageId();
public String getRootMessageId();
public String getSessionToken();
public String getThreadGroupName();
public String getThreadId();
public String getThreadName();
public boolean isSample();
public void setDomain(String domain);
public void setHostName(String hostName);
public void setIpAddress(String ipAddress);
public void setMessage(Message message);
public void setMessageId(String messageId);
public void setParentMessageId(String parentMessageId);
public void setRootMessageId(String rootMessageId);
public void setSessionToken(String sessionToken);
public void setThreadGroupName(String name);
public void setThreadId(String threadId);
public void setThreadName(String id);
public void setSample(boolean sample);
}
package com.dianping.cat.message.spi.codec;
import io.netty.buffer.ByteBuf;
public interface BufferWriter {
public int writeTo(ByteBuf buf, byte[] data);
}
package com.dianping.cat.message.spi.codec;
import io.netty.buffer.ByteBuf;
public class EscapingBufferWriter implements BufferWriter {
public static final String ID = "escape";
@Override
public int writeTo(ByteBuf buf, byte[] data) {
int len = data.length;
int count = len;
int offset = 0;
for (int i = 0; i < len; i++) {
byte b = data[i];
if (b == '\t' || b == '\r' || b == '\n' || b == '\\') {
buf.writeBytes(data, offset, i - offset);
buf.writeByte('\\');
if (b == '\t') {
buf.writeByte('t');
} else if (b == '\r') {
buf.writeByte('r');
} else if (b == '\n') {
buf.writeByte('n');
} else {
buf.writeByte(b);
}
count++;
offset = i + 1;
}
}
if (len > offset) {
buf.writeBytes(data, offset, len - offset);
}
return count;
}
}
package com.dianping.cat.message.spi.internal;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
public class DefaultMessageStatistics implements MessageStatistics {
private long m_produced;
private long m_overflowed;
private long m_bytes;
@Override
public long getBytes() {
return m_bytes;
}
@Override
public long getOverflowed() {
return m_overflowed;
}
@Override
public long getProduced() {
return m_produced;
}
@Override
public void onBytes(int bytes) {
m_bytes += bytes;
m_produced++;
}
@Override
public void onOverflowed(MessageTree tree) {
m_overflowed++;
}
}
package com.dianping.cat.message.spi.internal;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.nio.charset.Charset;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.PlainTextMessageCodec;
public class DefaultMessageTree implements MessageTree {
private ByteBuf m_buf;
private String m_domain;
private String m_hostName;
private String m_ipAddress;
private Message m_message;
private String m_messageId;
private String m_parentMessageId;
private String m_rootMessageId;
private String m_sessionToken;
private String m_threadGroupName;
private String m_threadId;
private String m_threadName;
private boolean m_sample = true;
@Override
public MessageTree copy() {
MessageTree tree = new DefaultMessageTree();
tree.setDomain(m_domain);
tree.setHostName(m_hostName);
tree.setIpAddress(m_ipAddress);
tree.setMessageId(m_messageId);
tree.setParentMessageId(m_parentMessageId);
tree.setRootMessageId(m_rootMessageId);
tree.setSessionToken(m_sessionToken);
tree.setThreadGroupName(m_threadGroupName);
tree.setThreadId(m_threadId);
tree.setThreadName(m_threadName);
tree.setMessage(m_message);
tree.setSample(m_sample);
return tree;
}
public ByteBuf getBuffer() {
return m_buf;
}
@Override
public String getDomain() {
return m_domain;
}
@Override
public String getHostName() {
return m_hostName;
}
@Override
public String getIpAddress() {
return m_ipAddress;
}
@Override
public Message getMessage() {
return m_message;
}
@Override
public String getMessageId() {
return m_messageId;
}
@Override
public String getParentMessageId() {
return m_parentMessageId;
}
@Override
public String getRootMessageId() {
return m_rootMessageId;
}
@Override
public String getSessionToken() {
return m_sessionToken;
}
@Override
public String getThreadGroupName() {
return m_threadGroupName;
}
@Override
public String getThreadId() {
return m_threadId;
}
@Override
public String getThreadName() {
return m_threadName;
}
@Override
public boolean isSample() {
return m_sample;
}
public void setBuffer(ByteBuf buf) {
m_buf = buf;
}
@Override
public void setDomain(String domain) {
m_domain = domain;
}
@Override
public void setHostName(String hostName) {
m_hostName = hostName;
}
@Override
public void setIpAddress(String ipAddress) {
m_ipAddress = ipAddress;
}
@Override
public void setMessage(Message message) {
m_message = message;
}
@Override
public void setMessageId(String messageId) {
if (messageId != null && messageId.length() > 0) {
m_messageId = messageId;
}
}
@Override
public void setParentMessageId(String parentMessageId) {
if (parentMessageId != null && parentMessageId.length() > 0) {
m_parentMessageId = parentMessageId;
}
}
@Override
public void setRootMessageId(String rootMessageId) {
if (rootMessageId != null && rootMessageId.length() > 0) {
m_rootMessageId = rootMessageId;
}
}
@Override
public void setSample(boolean sample) {
m_sample = sample;
}
@Override
public void setSessionToken(String sessionToken) {
m_sessionToken = sessionToken;
}
@Override
public void setThreadGroupName(String threadGroupName) {
m_threadGroupName = threadGroupName;
}
@Override
public void setThreadId(String threadId) {
m_threadId = threadId;
}
@Override
public void setThreadName(String threadName) {
m_threadName = threadName;
}
@Override
public String toString() {
PlainTextMessageCodec codec = new PlainTextMessageCodec();
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
codec.encode(this, buf);
buf.readInt(); // get rid of length
codec.reset();
return buf.toString(Charset.forName("utf-8"));
}
}
package com.dianping.cat.servlet;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.unidal.helper.Joiners;
import org.unidal.helper.Joiners.IBuilder;
import com.dianping.cat.Cat;
import com.dianping.cat.CatConstants;
import com.dianping.cat.configuration.client.entity.Server;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultMessageManager;
import com.dianping.cat.message.internal.DefaultTransaction;
public class CatFilter implements Filter {
private List<Handler> m_handlers = new ArrayList<Handler>();
@Override
public void destroy() {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException,
ServletException {
Context ctx = new Context((HttpServletRequest) request, (HttpServletResponse) response, chain, m_handlers);
ctx.handle();
}
protected String getOriginalUrl(ServletRequest request) {
return ((HttpServletRequest) request).getRequestURI();
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
m_handlers.add(CatHandler.ENVIRONMENT);
m_handlers.add(CatHandler.LOG_SPAN);
m_handlers.add(CatHandler.LOG_CLIENT_PAYLOAD);
m_handlers.add(CatHandler.ID_SETUP);
}
private static enum CatHandler implements Handler {
ENVIRONMENT {
@Override
public void handle(Context ctx) throws IOException, ServletException {
HttpServletRequest req = ctx.getRequest();
boolean top = !Cat.getManager().hasContext();
ctx.setTop(top);
if (top) {
ctx.setType(CatConstants.TYPE_URL);
setTraceMode(req);
} else {
ctx.setType(CatConstants.TYPE_URL_FORWARD);
}
ctx.handle();
}
protected void setTraceMode(HttpServletRequest req) {
String traceMode = "X-CAT-TRACE-MODE";
String headMode = req.getHeader(traceMode);
if ("true".equals(headMode)) {
Cat.getManager().setTraceMode(true);
}
}
},
ID_SETUP {
private String m_servers;
private String getCatServer() {
if (m_servers == null) {
DefaultMessageManager manager = (DefaultMessageManager) Cat.getManager();
List<Server> servers = manager.getConfigManager().getServers();
m_servers = Joiners.by(',').join(servers, new IBuilder<Server>() {
@Override
public String asString(Server server) {
String ip = server.getIp();
Integer httpPort = server.getHttpPort();
return ip + ":" + httpPort;
}
});
}
return m_servers;
}
@Override
public void handle(Context ctx) throws IOException, ServletException {
boolean isTraceMode = Cat.getManager().isTraceMode();
HttpServletResponse res = ctx.getResponse();
if (isTraceMode) {
String id = Cat.getCurrentMessageId();
res.setHeader("X-CAT-ROOT-ID", id);
res.setHeader("X-CAT-SERVER", getCatServer());
}
ctx.handle();
}
},
LOG_CLIENT_PAYLOAD {
@Override
public void handle(Context ctx) throws IOException, ServletException {
HttpServletRequest req = ctx.getRequest();
String type = ctx.getType();
if (ctx.isTop()) {
logRequestClientInfo(req, type);
logRequestPayload(req, type);
} else {
logRequestPayload(req, type);
}
ctx.handle();
}
protected void logRequestClientInfo(HttpServletRequest req, String type) {
StringBuilder sb = new StringBuilder(1024);
String ip = "";
String ipForwarded = req.getHeader("x-forwarded-for");
if (ipForwarded == null) {
ip = req.getRemoteAddr();
} else {
ip = ipForwarded;
}
sb.append("IPS=").append(ip);
sb.append("&VirtualIP=").append(req.getRemoteAddr());
sb.append("&Server=").append(req.getServerName());
sb.append("&Referer=").append(req.getHeader("referer"));
sb.append("&Agent=").append(req.getHeader("user-agent"));
Cat.logEvent(type, type + ".Server", Message.SUCCESS, sb.toString());
}
protected void logRequestPayload(HttpServletRequest req, String type) {
StringBuilder sb = new StringBuilder(256);
sb.append(req.getScheme().toUpperCase()).append('/');
sb.append(req.getMethod()).append(' ').append(req.getRequestURI());
String qs = req.getQueryString();
if (qs != null) {
sb.append('?').append(qs);
}
Cat.logEvent(type, type + ".Method", Message.SUCCESS, sb.toString());
}
},
LOG_SPAN {
public static final char SPLIT = '/';
private void customizeStatus(Transaction t, HttpServletRequest req) {
Object catStatus = req.getAttribute(CatConstants.CAT_STATE);
if (catStatus != null) {
t.setStatus(catStatus.toString());
} else {
t.setStatus(Message.SUCCESS);
}
}
private void customizeUri(Transaction t, HttpServletRequest req) {
if (t instanceof DefaultTransaction) {
Object catPageType = req.getAttribute(CatConstants.CAT_PAGE_TYPE);
if (catPageType instanceof String) {
((DefaultTransaction) t).setType(catPageType.toString());
}
Object catPageUri = req.getAttribute(CatConstants.CAT_PAGE_URI);
if (catPageUri instanceof String) {
((DefaultTransaction) t).setName(catPageUri.toString());
}
}
}
private String getRequestURI(HttpServletRequest req) {
String url = req.getRequestURI();
int length = url.length();
StringBuilder sb = new StringBuilder(length);
for (int index = 0; index < length;) {
char c = url.charAt(index);
if (c == SPLIT && index < length - 1) {
sb.append(c);
StringBuilder nextSection = new StringBuilder();
boolean isNumber = false;
boolean first = true;
for (int j = index + 1; j < length; j++) {
char next = url.charAt(j);
if ((first || isNumber == true) && next != SPLIT) {
isNumber = isNumber(next);
first = false;
}
if (next == SPLIT) {
if (isNumber) {
sb.append("{num}");
} else {
sb.append(nextSection.toString());
}
index = j;
break;
} else if (j == length - 1) {
if (isNumber) {
sb.append("{num}");
} else {
nextSection.append(next);
sb.append(nextSection.toString());
}
index = j + 1;
break;
} else {
nextSection.append(next);
}
}
} else {
sb.append(c);
index++;
}
}
return sb.toString();
}
@Override
public void handle(Context ctx) throws IOException, ServletException {
HttpServletRequest req = ctx.getRequest();
Transaction t = Cat.newTransaction(ctx.getType(), getRequestURI(req));
try {
ctx.handle();
customizeStatus(t, req);
} catch (ServletException e) {
t.setStatus(e);
Cat.logError(e);
throw e;
} catch (IOException e) {
t.setStatus(e);
Cat.logError(e);
throw e;
} catch (Throwable e) {
t.setStatus(e);
Cat.logError(e);
throw new RuntimeException(e);
} finally {
customizeUri(t, req);
t.complete();
}
}
private boolean isNumber(char c) {
return (c >= '0' && c <= '9') || c == '.' || c == '-' || c == ',';
}
};
}
protected static class Context {
private FilterChain m_chain;
private List<Handler> m_handlers;
private int m_index;
private HttpServletRequest m_request;
private HttpServletResponse m_response;
private boolean m_top;
private String m_type;
public Context(HttpServletRequest request, HttpServletResponse response, FilterChain chain, List<Handler> handlers) {
m_request = request;
m_response = response;
m_chain = chain;
m_handlers = handlers;
}
public HttpServletRequest getRequest() {
return m_request;
}
public HttpServletResponse getResponse() {
return m_response;
}
public String getType() {
return m_type;
}
public void handle() throws IOException, ServletException {
if (m_index < m_handlers.size()) {
Handler handler = m_handlers.get(m_index++);
handler.handle(this);
} else {
m_chain.doFilter(m_request, m_response);
}
}
public boolean isTop() {
return m_top;
}
public void setTop(boolean top) {
m_top = top;
}
public void setType(String type) {
m_type = type;
}
}
protected static interface Handler {
public void handle(Context ctx) throws IOException, ServletException;
}
}
package com.dianping.cat.servlet;
import java.io.File;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import com.dianping.cat.Cat;
public class CatListener implements ServletContextListener {
@Override
public void contextDestroyed(ServletContextEvent sce) {
Cat.destroy();
}
@Override
public void contextInitialized(ServletContextEvent sce) {
ServletContext ctx = sce.getServletContext();
String catClientXml = ctx.getInitParameter("cat-client-xml");
if (catClientXml == null) {
catClientXml = new File(Cat.getCatHome(), "client.xml").getPath();
}
Cat.initialize(new File(catClientXml));
}
}
package com.dianping.cat.servlet;
import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
@Deprecated
public class CdnFilter implements Filter {
@Override
public void destroy() {
}
@Override
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException,
ServletException {
chain.doFilter(request, response);
}
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
}
package com.dianping.cat.status;
import java.util.HashMap;
import java.util.Map;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.Initializable;
import org.codehaus.plexus.personality.plexus.lifecycle.phase.InitializationException;
public class HeartbeatExtenstion implements StatusExtension, Initializable {
@Override
public String getId() {
return "MyTestId";
}
@Override
public String getDescription() {
return "MyDescription";
}
@Override
public Map<String, String> getProperties() {
Map<String, String> maps = new HashMap<String, String>();
maps.put("key1", String.valueOf(1));
maps.put("key2", String.valueOf(2));
maps.put("key3", String.valueOf(3));
return maps;
}
@Override
public void initialize() throws InitializationException {
StatusExtensionRegister.getInstance().register(this);
}
}
package com.dianping.cat.status;
import java.util.Map;
public interface StatusExtension {
public String getId();
public String getDescription();
public Map<String, String> getProperties();
}
package com.dianping.cat.status;
import java.util.ArrayList;
import java.util.List;
public class StatusExtensionRegister {
public static StatusExtensionRegister getInstance() {
return s_register;
}
private List<StatusExtension> m_extensions = new ArrayList<StatusExtension>();
public static StatusExtensionRegister s_register = new StatusExtensionRegister();
private StatusExtensionRegister() {
}
public List<StatusExtension> getStatusExtension() {
synchronized (this) {
return m_extensions;
}
}
public void register(StatusExtension monitor) {
synchronized (this) {
m_extensions.add(monitor);
}
}
public void unregister(StatusExtension monitor) {
synchronized (this) {
m_extensions.remove(monitor);
}
}
}
package com.dianping.cat.status;
import java.io.File;
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.OperatingSystemMXBean;
import java.lang.management.RuntimeMXBean;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Date;
import java.util.List;
import java.util.TreeMap;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.status.model.entity.DiskInfo;
import com.dianping.cat.status.model.entity.DiskVolumeInfo;
import com.dianping.cat.status.model.entity.Extension;
import com.dianping.cat.status.model.entity.GcInfo;
import com.dianping.cat.status.model.entity.MemoryInfo;
import com.dianping.cat.status.model.entity.MessageInfo;
import com.dianping.cat.status.model.entity.OsInfo;
import com.dianping.cat.status.model.entity.RuntimeInfo;
import com.dianping.cat.status.model.entity.StatusInfo;
import com.dianping.cat.status.model.entity.ThreadsInfo;
import com.dianping.cat.status.model.transform.BaseVisitor;
public class StatusInfoCollector extends BaseVisitor {
private MessageStatistics m_statistics;
private boolean m_dumpLocked;
private String m_jars;
private String m_dataPath = "/data";
private StatusInfo m_statusInfo;
public StatusInfoCollector(MessageStatistics statistics, String jars) {
m_statistics = statistics;
m_jars = jars;
}
private int countThreadsByPrefix(ThreadInfo[] threads, String... prefixes) {
int count = 0;
for (ThreadInfo thread : threads) {
for (String prefix : prefixes) {
if (thread.getThreadName().startsWith(prefix)) {
count++;
}
}
}
return count;
}
private int countThreadsBySubstring(ThreadInfo[] threads, String... substrings) {
int count = 0;
for (ThreadInfo thread : threads) {
for (String str : substrings) {
if (thread.getThreadName().contains(str)) {
count++;
}
}
}
return count;
}
private String getThreadDump(ThreadInfo[] threads) {
StringBuilder sb = new StringBuilder(32768);
int index = 1;
TreeMap<String, ThreadInfo> sortedThreads = new TreeMap<String, ThreadInfo>();
for (ThreadInfo thread : threads) {
sortedThreads.put(thread.getThreadName(), thread);
}
for (ThreadInfo thread : sortedThreads.values()) {
sb.append(index++).append(": ").append(thread);
}
return sb.toString();
}
boolean isInstanceOfInterface(Class<?> clazz, String interfaceName) {
if (clazz == Object.class) {
return false;
} else if (clazz.getName().equals(interfaceName)) {
return true;
}
Class<?>[] interfaceclasses = clazz.getInterfaces();
for (Class<?> interfaceClass : interfaceclasses) {
if (isInstanceOfInterface(interfaceClass, interfaceName)) {
return true;
}
}
return isInstanceOfInterface(clazz.getSuperclass(), interfaceName);
}
public StatusInfoCollector setDumpLocked(boolean dumpLocked) {
m_dumpLocked = dumpLocked;
return this;
}
@Override
public void visitDisk(DiskInfo disk) {
File[] roots = File.listRoots();
if (roots != null) {
for (File root : roots) {
disk.addDiskVolume(new DiskVolumeInfo(root.getAbsolutePath()));
}
}
File data = new File(m_dataPath);
if (data.exists()) {
disk.addDiskVolume(new DiskVolumeInfo(data.getAbsolutePath()));
}
super.visitDisk(disk);
}
@Override
public void visitDiskVolume(DiskVolumeInfo diskVolume) {
Extension diskExtension = m_statusInfo.findOrCreateExtension("Disk");
File volume = new File(diskVolume.getId());
diskVolume.setTotal(volume.getTotalSpace());
diskVolume.setFree(volume.getFreeSpace());
diskVolume.setUsable(volume.getUsableSpace());
diskExtension.findOrCreateExtensionDetail(diskVolume.getId() + " Free").setValue(volume.getFreeSpace());
}
@Override
public void visitMemory(MemoryInfo memory) {
MemoryMXBean bean = ManagementFactory.getMemoryMXBean();
Runtime runtime = Runtime.getRuntime();
memory.setMax(runtime.maxMemory());
memory.setTotal(runtime.totalMemory());
memory.setFree(runtime.freeMemory());
memory.setHeapUsage(bean.getHeapMemoryUsage().getUsed());
memory.setNonHeapUsage(bean.getNonHeapMemoryUsage().getUsed());
List<GarbageCollectorMXBean> beans = ManagementFactory.getGarbageCollectorMXBeans();
Extension gcExtension = m_statusInfo.findOrCreateExtension("GC");
for (GarbageCollectorMXBean mxbean : beans) {
if (mxbean.isValid()) {
GcInfo gc = new GcInfo();
String name = mxbean.getName();
long count = mxbean.getCollectionCount();
gc.setName(name);
gc.setCount(count);
gc.setTime(mxbean.getCollectionTime());
memory.addGc(gc);
gcExtension.findOrCreateExtensionDetail(name + "Count").setValue(count);
gcExtension.findOrCreateExtensionDetail(name + "Time").setValue(mxbean.getCollectionTime());
}
}
Extension heapUsage = m_statusInfo.findOrCreateExtension("JVMHeap");
for (MemoryPoolMXBean mpBean : ManagementFactory.getMemoryPoolMXBeans()) {
long count = mpBean.getUsage().getUsed();
String name = mpBean.getName();
heapUsage.findOrCreateExtensionDetail(name).setValue(count);
}
super.visitMemory(memory);
}
@Override
public void visitMessage(MessageInfo message) {
Extension catExtension = m_statusInfo.findOrCreateExtension("CatUsage");
if (m_statistics != null) {
catExtension.findOrCreateExtensionDetail("Produced").setValue(m_statistics.getProduced());
catExtension.findOrCreateExtensionDetail("Overflowed").setValue(m_statistics.getOverflowed());
catExtension.findOrCreateExtensionDetail("Bytes").setValue(m_statistics.getBytes());
}
}
@Override
public void visitOs(OsInfo os) {
Extension systemExtension = m_statusInfo.findOrCreateExtension("System");
OperatingSystemMXBean bean = ManagementFactory.getOperatingSystemMXBean();
os.setArch(bean.getArch());
os.setName(bean.getName());
os.setVersion(bean.getVersion());
os.setAvailableProcessors(bean.getAvailableProcessors());
os.setSystemLoadAverage(bean.getSystemLoadAverage());
systemExtension.findOrCreateExtensionDetail("LoadAverage").setValue(bean.getSystemLoadAverage());
// for Sun JDK
if (isInstanceOfInterface(bean.getClass(), "com.sun.management.OperatingSystemMXBean")) {
com.sun.management.OperatingSystemMXBean b = (com.sun.management.OperatingSystemMXBean) bean;
os.setTotalPhysicalMemory(b.getTotalPhysicalMemorySize());
os.setFreePhysicalMemory(b.getFreePhysicalMemorySize());
os.setTotalSwapSpace(b.getTotalSwapSpaceSize());
os.setFreeSwapSpace(b.getFreeSwapSpaceSize());
os.setProcessTime(b.getProcessCpuTime());
os.setCommittedVirtualMemory(b.getCommittedVirtualMemorySize());
systemExtension.findOrCreateExtensionDetail("FreePhysicalMemory").setValue(b.getFreePhysicalMemorySize());
systemExtension.findOrCreateExtensionDetail("FreeSwapSpaceSize").setValue(b.getFreeSwapSpaceSize());
}
m_statusInfo.addExtension(systemExtension);
}
@Override
public void visitRuntime(RuntimeInfo runtime) {
RuntimeMXBean bean = ManagementFactory.getRuntimeMXBean();
runtime.setStartTime(bean.getStartTime());
runtime.setUpTime(bean.getUptime());
runtime.setJavaClasspath(m_jars);
runtime.setJavaVersion(System.getProperty("java.version"));
runtime.setUserDir(System.getProperty("user.dir"));
runtime.setUserName(System.getProperty("user.name"));
}
@Override
public void visitStatus(StatusInfo status) {
status.setTimestamp(new Date());
status.setOs(new OsInfo());
status.setDisk(new DiskInfo());
status.setRuntime(new RuntimeInfo());
status.setMemory(new MemoryInfo());
status.setThread(new ThreadsInfo());
status.setMessage(new MessageInfo());
m_statusInfo = status;
super.visitStatus(status);
}
@Override
public void visitThread(ThreadsInfo thread) {
Extension frameworkThread = m_statusInfo.findOrCreateExtension("FrameworkThread");
ThreadMXBean bean = ManagementFactory.getThreadMXBean();
bean.setThreadContentionMonitoringEnabled(true);
ThreadInfo[] threads;
if (m_dumpLocked) {
threads = bean.dumpAllThreads(true, true);
} else {
threads = bean.dumpAllThreads(false, false);
}
thread.setCount(bean.getThreadCount());
thread.setDaemonCount(bean.getDaemonThreadCount());
thread.setPeekCount(bean.getPeakThreadCount());
thread.setTotalStartedCount((int) bean.getTotalStartedThreadCount());
int jbossThreadsCount = countThreadsByPrefix(threads, "http-", "catalina-exec-");
int jettyThreadsCount = countThreadsBySubstring(threads, "@qtp");
thread.setDump(getThreadDump(threads));
frameworkThread.findOrCreateExtensionDetail("HttpThread").setValue(jbossThreadsCount + jettyThreadsCount);
frameworkThread.findOrCreateExtensionDetail("CatThread").setValue(countThreadsByPrefix(threads, "Cat-"));
frameworkThread.findOrCreateExtensionDetail("PigeonThread").setValue(
countThreadsByPrefix(threads, "Pigeon-", "DPSF-", "Netty-", "Client-ResponseProcessor"));
frameworkThread.findOrCreateExtensionDetail("ActiveThread").setValue(bean.getThreadCount());
frameworkThread.findOrCreateExtensionDetail("StartedThread").setValue(bean.getTotalStartedThreadCount());
m_statusInfo.addExtension(frameworkThread);
}
}
\ No newline at end of file \ No newline at end of file
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
This diff could not be displayed because it is too large.
This diff could not be displayed because it is too large.
No preview for this file type
No preview for this file type
No preview for this file type
No preview for this file type
Styling with Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!