Simple Monitoring and Alerting for Your Services
Background
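Day to day, besides ordinary development work, we are usually also responsible for keeping production systems running. Production runs 24/7, users can show up at any moment, and failures always strike when you least expect them. Nobody can watch the systems around the clock.
That is why a monitoring system matters so much. But not every company has the people to build one. Some do not even have a dedicated ops engineer, let alone a mature monitoring setup.
You can do without an ops team, but you cannot do without monitoring; even a crude version is better than nothing. Many cloud providers offer server-level monitoring and alerting, but at the application level it falls far short of what we need.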
Requirements
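Monitor production systems, middleware, databases and so on. When a system goes down or responses time out, notify the responsible people promptly so the problem can be investigated and fixed quickly.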
Development
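There are many mature monitoring systems on the market. Following the principle of "just enough to meet the requirement", though, we can quickly build a simple one ourselves.
Use xxl-job scheduled tasks to run the checks at a fixed interval (every 30-60 s). When something is abnormal, send alert SMS and e-mail messages to the responsible people. Combined with server-level monitoring, this covers our current needs.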
Configuring xxl-job
Add the xxl-job dependency
<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.0.2</version>
</dependency>
Add the xxl-job configuration
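xxl:
  job:
    admin:
      # address of the scheduling center (xxl-job-admin)
      addresses: http://127.0.0.1:8080/xxl-job-admin
    # must match the accessToken configured in xxl-job-admin; empty if none is set
    accessToken:
    executor:
      appName: monitor
      ip:
      port: 9998
      # log path; the default path is used if empty
      logPath: /data/appLogs/xxl-job/jobHandler
      # days to keep executor logs; -1 turns off automatic cleanup
      logRetentionDays: -1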
Write the xxl-job configuration class
@Configuration
public class XxlJobConfig implements Serializable {
    private static final long serialVersionUID = -614269997749854012L;
    private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    @Value("${xxl.job.admin.addresses:}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appName:}")
    private String appName;
    @Value("${xxl.job.executor.ip:}")
    private String ip;
    @Value("${xxl.job.executor.port:}")
    private int port;
    @Value("${xxl.job.accessToken:}")
    private String accessToken;
    @Value("${xxl.job.executor.logPath:}")
    private String logPath;
    @Value("${xxl.job.executor.logRetentionDays:}")
    private int logRetentionDays;

    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}
Add the scheduled check task
@Component
@JobHandler("healthCheckJobHandler")
public class HealthCheckJobHandler extends IJobHandler implements Serializable {
    private static final long serialVersionUID = 6585851871708093092L;
    private static final Logger logger = LoggerFactory.getLogger(HealthCheckJobHandler.class);
    @Autowired private HealthCheckService healthCheckService;

    @Override
    public ReturnT<String> execute(String s) {
        logger.info("do healthCheckJobHandler start");
        // run the scheduled checks...
        logger.info("do healthCheckJobHandler end");
        return SUCCESS;
    }
}
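The body of the handler above is left open. One way to fill it is to run all checks at once, so that one hanging dependency cannot stall the whole job. The plain-JDK class below is a minimal sketch of that idea. It runs a set of named checks concurrently and reports every check that returns false, throws, or misses a shared deadline. ParallelHealthChecks, runAll and the Supplier<Boolean> check type are hypothetical names for illustration. In the real handler, each check would wrap a HealthIndicator or HealthCheckService call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Hypothetical helper: runs named health checks in parallel under one shared deadline.
final class ParallelHealthChecks {

    private ParallelHealthChecks() {
    }

    /** Returns the names of checks that returned false, threw, or missed the deadline. */
    static List<String> runAll(Map<String, Supplier<Boolean>> checks, long timeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, checks.size()));
        try {
            // Submit everything first so all checks run at the same time
            Map<String, Future<Boolean>> futures = new LinkedHashMap<>();
            for (Map.Entry<String, Supplier<Boolean>> entry : checks.entrySet()) {
                Callable<Boolean> task = entry.getValue()::get;
                futures.put(entry.getKey(), pool.submit(task));
            }
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            List<String> failed = new ArrayList<>();
            for (Map.Entry<String, Future<Boolean>> entry : futures.entrySet()) {
                Future<Boolean> future = entry.getValue();
                try {
                    // Wait only for whatever is left of the shared deadline
                    long remaining = Math.max(0, deadline - System.nanoTime());
                    if (!Boolean.TRUE.equals(future.get(remaining, TimeUnit.NANOSECONDS))) {
                        failed.add(entry.getKey());
                    }
                } catch (TimeoutException | ExecutionException e) {
                    future.cancel(true);
                    failed.add(entry.getKey());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failed.add(entry.getKey());
                }
            }
            return failed;
        } finally {
            // Interrupts checks that are still running (e.g. the ones that timed out)
            pool.shutdownNow();
        }
    }
}
```

execute() could then send a single alert that lists every failed name, instead of one alert per dependency.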
Configure the scheduled task in the xxl-job admin console
Add the services to monitor
Keep the connection details of every monitored service in one configuration:
health:
  check:
    admin: http://192.168.1.1:8080/admin
    blog: http://192.168.1.1:8081/blog
    zookeeper: 192.168.1.74:2181,192.168.1.250:2181
    dataBase: '{"192.168.1.1":3306,"192.168.1.1":6379}'
@RefreshScope
@Configuration
public class HealthCheckConfig implements Serializable {
    private static final long serialVersionUID = 5881482877420605006L;
    @Value("#{'${health.check.admin:}'.split(',')}")
    private List<String> admin;
    @Value("#{'${health.check.blog:}'.split(',')}")
    private List<String> blog;
    @Value("#{'${health.check.zookeeper:}'.split(',')}")
    private List<String> zookeeperServers;
    // parsed as a SpEL inline map: IP -> port
    @Value("#{${health.check.dataBase:}}")
    private Map<String, Integer> dbServers;
    // getters and setters omitted
}
Database + Redis
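We probe the port of every DB and Redis instance. If the port accepts a connection, the instance is considered healthy; otherwise it may be down. Failures are collected in a Map and returned together.
Note that dataBase is bound to a Map keyed by IP, so each IP can appear only once. In the sample configuration the second "192.168.1.1" entry overwrites the first, and only port 6379 is actually checked.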
@Component
public class DataBaseHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = -4346980726558885426L;
    private static final Logger logger = LoggerFactory.getLogger(DataBaseHealthIndicator.class);
    @Autowired private HealthCheckConfig healthCheckConfig;

    @Override
    public Health health() {
        // ip -> port of every endpoint that could not be reached
        Map<String, Integer> map = Maps.newHashMap();
        healthCheckConfig.getDbServers().forEach((ip, port) -> {
            // try-with-resources closes the socket whether or not connect() succeeds
            try (Socket socket = new Socket()) {
                // connect() either succeeds within 1 s or throws
                socket.connect(new InetSocketAddress(ip, port), 1000);
                logger.info("ip {} port {} connection success", ip, port);
            } catch (Exception e) {
                map.put(ip, port);
                logger.error("ip {} port {} connection error", ip, port, e);
            }
        });
        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}
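To let the same host be listed with several ports, the configuration could be a list of "host:port" strings instead of an IP-keyed map. The plain-JDK helper below is a minimal sketch under that assumption. With it, health.check.dataBase would become something like 192.168.1.1:3306,192.168.1.1:6379, split into a List the same way as the zookeeper property. PortProbe, isReachable and unreachable are hypothetical names, not part of the original project.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: TCP port probe over a list of "host:port" entries.
final class PortProbe {

    private PortProbe() {
    }

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isReachable(String host, int port, int timeoutMs) {
        // try-with-resources closes the socket whether or not connect() succeeds
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException | IllegalArgumentException e) {
            // IOException: refused, timed out or unknown host; IllegalArgumentException: bad port
            return false;
        }
    }

    /** Checks every "host:port" entry and returns the ones that are unreachable or malformed. */
    static List<String> unreachable(List<String> addresses, int timeoutMs) {
        List<String> failed = new ArrayList<>();
        for (String address : addresses) {
            String entry = address.trim();
            int colon = entry.lastIndexOf(':');
            if (colon <= 0) {
                failed.add(entry); // no host or no port: count it as a failure
                continue;
            }
            int port;
            try {
                port = Integer.parseInt(entry.substring(colon + 1));
            } catch (NumberFormatException e) {
                failed.add(entry);
                continue;
            }
            if (!isReachable(entry.substring(0, colon), port, timeoutMs)) {
                failed.add(entry);
            }
        }
        return failed;
    }
}
```

A list keeps the configuration order and has no duplicate-key problem. Inside health(), the returned entries could be passed to withDetail() as they are.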
The scheduled alert checks are all much alike, so DB serves as the example here and the others are not listed:
@Async
@Override
public void dataBaseHealthCheck() {
    try {
        Health health = dataBaseHealthIndicator.health();
        if (health.getStatus().equals(Status.DOWN)) {
            // details holds the unreachable ip -> port entries
            Map<String, Object> details = health.getDetails();
            // send alert
        }
    } catch (Exception e) {
        logger.error("dataBase health check failed", e);
        // send alert
    }
}
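The "send alert" step is left open above. The job fires every 30-60 s, so alerting on every DOWN result would repeat the same SMS or e-mail until the fault is fixed. One possible refinement is to alert only when a check changes state: UP to DOWN, and DOWN back to UP as a recovery notice. The plain-Java sketch below does this. AlertOnChange, report and the Consumer<String> alert channel are hypothetical. In the real job, the consumer would call whatever SMS or e-mail sender is in use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical helper: sends an alert only when a check's status changes.
final class AlertOnChange {
    private final Map<String, Boolean> lastUp = new HashMap<>();
    private final Consumer<String> alertSender;

    AlertOnChange(Consumer<String> alertSender) {
        this.alertSender = alertSender;
    }

    /** Records the latest result of a check and returns true if an alert was sent. */
    synchronized boolean report(String check, boolean up, Map<String, ?> details) {
        Boolean previous = lastUp.put(check, up);
        // Unknown previous state counts as UP, so a failure on the very first run alerts
        boolean wasUp = previous == null || previous;
        if (wasUp == up) {
            return false; // no change: stay quiet
        }
        String message = up
                ? "[RECOVERED] " + check
                : "[DOWN] " + check + " " + details;
        alertSender.accept(message);
        return true;
    }
}
```

For example, the results UP, DOWN, DOWN, UP produce exactly two messages: "[DOWN] dataBase {192.168.1.1=3306}" and "[RECOVERED] dataBase". The state lives in memory, so it resets on restart and is not shared between executor instances.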
ZooKeeper monitoring
Add the dependency
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.1</version>
</dependency>
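Write the check. For each server, open a ZooKeeper connection with a timeout and wait up to that timeout for it to reach the connected state. Report every server that could not be connected.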
@Component
public class ZooKeeperHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = -3760146911582231552L;
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHealthIndicator.class);
    private static final int SESSION_TIMEOUT = 2500;
    @Autowired private HealthCheckConfig healthCheckConfig;

    @Override
    public Health health() {
        // server -> server for every node that could not be connected;
        // only touched from this thread, never from the watcher callback
        Map<String, String> map = Maps.newHashMap();
        healthCheckConfig.getZookeeperServers().forEach(server -> {
            // one latch per server, released once this session reaches SyncConnected
            CountDownLatch connectedSignal = new CountDownLatch(1);
            ZooKeeper zooKeeper = null;
            try {
                zooKeeper = new ZooKeeper(server, SESSION_TIMEOUT, event -> {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        connectedSignal.countDown();
                    }
                });
                // await() returns false if the timeout elapsed before the connection came up
                if (!connectedSignal.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    map.put(server, server);
                }
            } catch (IOException e) {
                map.put(server, server);
                logger.error("ZooKeeperHealthIndicator error:", e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                map.put(server, server);
            } finally {
                if (Objects.nonNull(zooKeeper)) {
                    try {
                        zooKeeper.close();
                    } catch (Exception e) {
                        logger.error("ZooKeeperHealthIndicator close error:", e);
                    }
                }
            }
        });
        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}
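The check above opens a full client session for every probe. A lighter alternative, swapped in here rather than taken from the original, is ZooKeeper's "srvr" four-letter-word command. Send the four bytes over a plain TCP connection, and the server replies with its version, some statistics and a "Mode:" line (leader, follower or standalone), then closes the connection. Since ZooKeeper 3.5.3, four-letter words must be listed in 4lw.commands.whitelist, and srvr is the one enabled by default. The class name ZkSrvrProbe and the rule "healthy if the reply contains Mode:" are assumptions of this sketch.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: probes a ZooKeeper node with the "srvr" four-letter word.
final class ZkSrvrProbe {

    private ZkSrvrProbe() {
    }

    /** Sends "srvr" to host:port and returns the raw reply, or null on any failure. */
    static String srvr(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // bound every read as well
            OutputStream out = socket.getOutputStream();
            out.write("srvr".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            // The server writes its reply and closes the connection, so read until EOF
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.write(buf, 0, n);
            }
            return reply.toString(StandardCharsets.UTF_8.name());
        } catch (IOException | IllegalArgumentException e) {
            return null;
        }
    }

    /** Healthy if the reply carries a "Mode:" line; a non-whitelisted command gets an error text instead. */
    static boolean isHealthy(String host, int port, int timeoutMs) {
        String reply = srvr(host, port, timeoutMs);
        return reply != null && reply.contains("Mode:");
    }
}
```

Each entry of zookeeperServers ("192.168.1.74:2181" and so on) would be split into host and port before calling isHealthy.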
Service monitoring
Spring Boot service monitoring
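For a Spring Boot service, we can call the service's own health-check endpoint directly through Feign. The endpoint returns the status of everything the service depends on, including but not limited to databases, Redis, disk space and the service registry. For example: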
{
  "status": "UP",
  "components": {
    "db": {
      "status": "UP",
      "components": {
        "primary": {
          "status": "UP",
          "details": {
            "database": "MySQL",
            "result": 1,
            "validationQuery": "/* ping */ SELECT 1"
          }
        },
        "secondary": {
          "status": "UP",
          "details": {
            "database": "MySQL",
            "result": 1,
            "validationQuery": "/* ping */ SELECT 1"
          }
        }
      }
    },
    "discoveryComposite": {
      "status": "UP",
      "components": {
        "discoveryClient": {
          "status": "UP",
          "details": {
            "services": [
              "blog"
            ]
          }
        },
        "eureka": {
          "description": "Remote status from Eureka server",
          "status": "UP",
          "details": {
            "applications": {
              "BLOG": 1
            }
          }
        }
      }
    },
    "diskSpace": {
      "status": "UP",
      "details": {
        "total": 23069986816,
        "free": 5403578368,
        "threshold": 10485760
      }
    },
    "hystrix": {
      "status": "UP"
    },
    "ping": {
      "status": "UP"
    },
    "redis": {
      "status": "UP",
      "details": {
        "version": "5.0.3"
      }
    },
    "refreshScope": {
      "status": "UP"
    }
  }
}
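The article calls this endpoint through Feign. The class below is a dependency-free sketch of the same check that uses the JDK's java.net.http.HttpClient (Java 11+) instead. It relies only on the HTTP status code. By default, Spring Boot Actuator answers 200 when the overall status is UP and 503 when it is DOWN or OUT_OF_SERVICE, so the JSON body does not need to be parsed. The URL is an assumption: in Spring Boot 2.x the endpoint is usually /actuator/health. The per-component details shown above typically appear only when management.endpoint.health.show-details is enabled. ActuatorHealthProbe is a hypothetical name.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper: treats a Spring Boot health endpoint as UP only on HTTP 200.
final class ActuatorHealthProbe {
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3))
            .build();

    private ActuatorHealthProbe() {
    }

    /** Returns true only if healthUrl answers HTTP 200 within the timeouts. */
    static boolean isUp(String healthUrl) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(healthUrl))
                .timeout(Duration.ofSeconds(5)) // whole-request timeout
                .GET()
                .build();
        try {
            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            // Actuator maps DOWN / OUT_OF_SERVICE to 503 by default
            return response.statusCode() == 200;
        } catch (IOException e) {
            return false; // connection refused, timeout, etc.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With the configuration above, the admin service would be probed as isUp("http://192.168.1.1:8080/admin/actuator/health"), assuming the context path and the default actuator base path.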
Spring MVC service monitoring
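A plain Spring MVC application has no built-in health endpoint the way Spring Boot does. We can imitate Spring Boot, though, and provide one for internal and external callers: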
@Component
public class HealthCheckService implements Serializable {
    private static final long serialVersionUID = 3138618810158258447L;
    private static final Logger logger = LoggerFactory.getLogger(HealthCheckService.class);
    @Autowired protected HealthCheckMapper healthCheckMapper;
    @Autowired protected RedisTemplate redisTemplate;

    // Health here is a simple custom class (status + details); Spring Boot's Health
    // has no public constructor or setters
    public Health healthCheck() {
        Health health = new Health();
        health.setStatus("UP");
        Map<String, Object> map = Maps.newHashMap();
        try {
            healthCheckMapper.healthCheck();
        } catch (Exception e) {
            map.put(Type.MYSQL.name(), Type.MYSQL.name());
            logger.error("MYSQL exception ", e);
        }
        try {
            redisTemplate.hasKey("");
        } catch (Exception e) {
            map.put(Type.REDIS.name(), Type.REDIS.name());
            logger.error("REDIS exception ", e);
        }
        File[] rootFiles = File.listRoots();
        if (Objects.nonNull(rootFiles) && rootFiles.length != 0) {
            long total = 0, free = 0;
            for (File file : rootFiles) {
                total += file.getTotalSpace();  // total space
                free += file.getUsableSpace();  // space still available
            }
            long used = total - free;           // used space
            double usedRate = total == 0 ? 0 : ((double) used / total); // usage ratio
            logger.info("healthIndicator... total {}, free {} usedRate {}", total, free, usedRate);
            // flag the disk when more than 95% is used
            if (usedRate > 0.95) {
                map.put(Type.STORAGE.name(), usedRate);
            }
        }
        if (!map.isEmpty()) {
            health.setStatus("DOWN");
            health.setDetails(map);
        }
        return health;
    }

    public enum Type {
        MYSQL,
        REDIS,
        STORAGE
    }
}
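DB and Redis
For both checks, any lightweight call will do: if it returns normally, the connection is considered healthy. For the database, a trivial MyBatis query:
<select id="healthCheck" resultType="java.util.Date">select NOW()</select>
For Redis:
redisTemplate.hasKey("");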
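File.listRoots() returns only "/" on Linux. A disk mounted elsewhere, such as a separate data volume, is therefore never examined by the disk check in the service above. The plain-JDK sketch below uses FileSystems.getDefault().getFileStores() instead. That call lists every mounted file store, so the same 95% threshold can be applied to each one. DiskUsageCheck and overThreshold are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystems;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: disk-usage check across all mounted file stores.
final class DiskUsageCheck {

    private DiskUsageCheck() {
    }

    /** Maps every file store whose used fraction exceeds threshold (e.g. 0.95) to that fraction. */
    static Map<String, Double> overThreshold(double threshold) {
        Map<String, Double> result = new LinkedHashMap<>();
        for (FileStore store : FileSystems.getDefault().getFileStores()) {
            try {
                long total = store.getTotalSpace();
                if (total <= 0) {
                    continue; // pseudo file systems such as /proc report 0
                }
                long used = total - store.getUsableSpace();
                double usedRate = (double) used / total;
                if (usedRate > threshold) {
                    result.put(store.toString(), usedRate);
                }
            } catch (IOException e) {
                // stores that cannot be queried (e.g. stale network mounts) are skipped
            }
        }
        return result;
    }
}
```

In healthCheck(), a non-empty result could be put under Type.STORAGE in place of the single listRoots() ratio.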
RocketMQ monitoring
Add the RocketMQ dependency
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.9.3</version>
</dependency>
Add the RocketMQ configuration
# RocketMQ configuration
rocketmq:
  # NameServer address (separate multiple addresses with semicolons)
  name-server: 192.168.1.1:9876
  producer:
    group: test
    topic: test-topic
    tag: test-tags
Add the configuration class
@Component
public class RocketConfig implements Serializable {
    private static final long serialVersionUID = 9156674410666469124L;
    @Value("${rocketmq.name-server}")
    private String nameServ;

    // shutdown() releases the admin client's connections when the context closes
    @Bean(destroyMethod = "shutdown")
    public DefaultMQAdminExt init() throws MQClientException {
        DefaultMQAdminExt rocketDefaultMQAdminExt = new DefaultMQAdminExt();
        // unique instance name so it does not clash with other RocketMQ clients in the JVM
        rocketDefaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        rocketDefaultMQAdminExt.setNamesrvAddr(nameServ);
        rocketDefaultMQAdminExt.start();
        return rocketDefaultMQAdminExt;
    }
}
Check RocketMQ's status and the consumer group's backlog (messages not yet consumed) to detect message build-up:
@Component
public class RocketHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = 330718359948866493L;
    private static final Logger logger = LoggerFactory.getLogger(RocketHealthIndicator.class);
    @Autowired private DefaultMQAdminExt defaultMQAdminExt;
    // examineConsumeStats() expects a consumer group name: point this property
    // at the consumer group whose backlog should be watched
    @Value("${rocketmq.producer.group}")
    private String rocketGroup;

    @Override
    public Health health() {
        logger.info("RocketHealthIndicator... ");
        try {
            ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(rocketGroup);
            // total backlog = sum over all queues of (broker offset - consumer offset);
            // getOffsetTable().size() would only be the number of queues
            long backlog = consumeStats.computeTotalDiff();
            if (backlog > 10000) {
                return Health.down().withDetail("backlog", backlog).build();
            }
        } catch (Exception e) {
            logger.error("RocketHealthIndicator error:", e);
            return Health.down(e).build();
        }
        return Health.up().build();
    }
}
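The backlog that matters is the number of messages the consumer group has not yet consumed. For each queue, that is the broker's max offset minus the group's consumer offset, and the total is the sum over all queues. ConsumeStats.computeTotalDiff() computes this sum, whereas getOffsetTable().size() is only the number of queues. The plain-Java sketch below shows the arithmetic. BacklogCheck and QueueOffsets are hypothetical stand-ins for ConsumeStats and its per-queue OffsetWrapper entries, for illustration only.

```java
import java.util.Map;

// Hypothetical stand-ins for RocketMQ's ConsumeStats / OffsetWrapper, for illustration only.
final class BacklogCheck {

    private BacklogCheck() {
    }

    /** Offsets of one message queue: the broker's max offset and the group's committed offset. */
    static final class QueueOffsets {
        final long brokerOffset;
        final long consumerOffset;

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }
    }

    /** Total unconsumed messages: sum of (brokerOffset - consumerOffset) over every queue. */
    static long totalBacklog(Map<String, QueueOffsets> offsetTable) {
        long total = 0;
        for (QueueOffsets q : offsetTable.values()) {
            // Clamp at 0 defensively, in case the two offsets were sampled at different moments
            total += Math.max(0, q.brokerOffset - q.consumerOffset);
        }
        return total;
    }
}
```

For example, two queues with offsets (100, 40) and (50, 50) give a backlog of 60, even though the offset table holds only 2 entries.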
Kafka monitoring
Add the Kafka dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>
Kafka configuration
# Kafka broker addresses (separate multiple addresses with commas)
kafka:
  servers: 192.168.1.1:9092
Check whether the Kafka cluster is healthy
@Component
public class KafkaHealthIndicator implements HealthIndicator, Serializable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaHealthIndicator.class);
    private static final long serialVersionUID = -5671848530433251605L;
    @Value("#{'${kafka.servers:}'.split(',')}")
    private List<String> servers;

    // only AdminClient settings are needed; producer settings such as batch.size,
    // buffer.memory or serializers do not apply to an AdminClient
    private Properties initProperties(String server) {
        Properties properties = new Properties();
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.put(AdminClientConfig.RETRIES_CONFIG, 0);               // no retries
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); // request timeout (ms)
        return properties;
    }

    @Override
    public Health health() {
        Map<String, String> map = Maps.newHashMap();
        servers.forEach(server -> {
            AdminClient client = null;
            try {
                client = AdminClient.create(this.initProperties(server));
                // query cluster metadata; fails or times out if the broker is unreachable
                DescribeClusterResult result = client.describeCluster();
                result.clusterId().get(5, TimeUnit.SECONDS);
                logger.info("{} Kafka is healthy.", server);
            } catch (Exception e) {
                logger.error("{} Kafka is not healthy.", server, e);
                map.put(server, server);
            } finally {
                if (Objects.nonNull(client)) {
                    client.close();
                }
            }
        });
        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}
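- Tags: java
- Permalink: https://www.58cto.cn/article/28
- Copyright: This article is an original work published by 程序言. It may be freely reposted and quoted for non-commercial purposes, provided the author is credited and the source is cited: 程序言 》 对服务做一个简单的监控告警 - https://www.58cto.cn/article/28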