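Original

A Simple Monitoring and Alerting Setup for Your Services

Note:
This article was last updated on November 23, 2023. If any images in the article fail to load, please leave a comment or contact me directly.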

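Background

Besides everyday development work, we are often also responsible for keeping production systems running. Those systems run 24/7, users may be using them at any moment, and failures tend to happen when nobody expects them, yet nobody can watch the systems around the clock.

That is why a monitoring system matters. But not every company has the people to build one; some do not even have a dedicated ops engineer, let alone a mature monitoring setup.

You can live without an ops team, but not without monitoring: even a crude version is better than nothing. Many cloud providers offer server-level monitoring and alerting, but that falls far short of what we need at the application level.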

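Requirements

Monitor production services, middleware, databases and so on, and alert the responsible people as soon as a system goes down or stops responding in time, so the problem can be investigated and fixed quickly.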

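Implementation

There are plenty of mature monitoring systems on the market, but following the principle of building only what the requirements call for, we can also put together a simple monitoring system of our own: use XXL-JOB scheduled tasks to run the checks periodically (every 30 to 60 seconds), and send alert SMS messages and e-mails to the responsible people when a check fails. Combined with the cloud provider's server monitoring, this covers our current needs.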
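Stripped of XXL-JOB, the approach boils down to "run a set of checks on a fixed interval and alert on every failure". The sketch below shows that loop with the JDK's ScheduledExecutorService; the SimpleMonitor class, the name-to-Callable map of checks and the alert callback are illustrative assumptions, and in the setup described here XXL-JOB's scheduler takes the place of the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical minimal monitor: each check is a Callable that returns true when healthy.
class SimpleMonitor {
    private final Map<String, Callable<Boolean>> checks; // check name -> check
    private final Consumer<String> alerter;              // e.g. sends an SMS or e-mail

    SimpleMonitor(Map<String, Callable<Boolean>> checks, Consumer<String> alerter) {
        this.checks = checks;
        this.alerter = alerter;
    }

    // Runs every check once; a false result or an exception counts as a failure.
    // Returns the names of the failed checks.
    List<String> runOnce() {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Callable<Boolean>> entry : checks.entrySet()) {
            boolean healthy;
            try {
                healthy = Boolean.TRUE.equals(entry.getValue().call());
            } catch (Exception e) {
                healthy = false;
            }
            if (!healthy) {
                failed.add(entry.getKey());
                alerter.accept("[ALERT] " + entry.getKey() + " is unhealthy");
            }
        }
        return failed;
    }

    // Runs runOnce() every periodSeconds; with XXL-JOB the admin console does this scheduling.
    ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::runOnce, 0, periodSeconds, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Catching check exceptions inside runOnce matters: scheduleAtFixedRate silently stops rescheduling a task once it throws, which would turn one broken check into no monitoring at all.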

Configuring XXL-JOB

Add the XXL-JOB dependency

<dependency>
    <groupId>com.xuxueli</groupId>
    <artifactId>xxl-job-core</artifactId>
    <version>2.0.2</version>
</dependency>

Add the XXL-JOB configuration

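xxl:
  job:
    admin:
      # Address of the XXL-JOB admin (scheduling center)
      addresses: http://127.0.0.1:8080/xxl-job-admin
    # Read by XxlJobConfig as xxl.job.accessToken; must match the admin's token (empty = none)
    accessToken:
    executor:
      appName: monitor
      ip:
      port: 9998
      # Log path; the default is used when empty
      logPath: /data/appLogs/xxl-job/jobHandler
      # Days to keep logs; values below 3 (such as -1) disable automatic cleanup
      logRetentionDays: -1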

Write the XXL-JOB configuration class

@Configuration
public class XxlJobConfig implements Serializable {
    private static final long serialVersionUID = -614269997749854012L;
    private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    @Value("${xxl.job.admin.addresses:}")
    private String adminAddresses;
    @Value("${xxl.job.executor.appName:}")
    private String appName;
    @Value("${xxl.job.executor.ip:}")
    private String ip;
    @Value("${xxl.job.executor.port:}")
    private int port;
    @Value("${xxl.job.accessToken:}")
    private String accessToken;
    @Value("${xxl.job.executor.logPath:}")
    private String logPath;
    @Value("${xxl.job.executor.logRetentionDays:}")
    private int logRetentionDays;

    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobSpringExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppName(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }
}

Add the scheduled health-check job

@Component
@JobHandler("healthCheckJobHandler")
public class HealthCheckJobHandler extends IJobHandler implements Serializable {
    private static final long serialVersionUID = 6585851871708093092L;
    private static final Logger logger = LoggerFactory.getLogger(HealthCheckJobHandler.class);
    @Autowired private HealthCheckService healthCheckService;

    @Override
    public ReturnT<String> execute(String s) {
        logger.info("do healthCheckJobHandler start");

        // Run the scheduled health checks here...

        logger.info("do healthCheckJobHandler end");
        return SUCCESS;
    }
}

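Configure the job in the admin console: create a task bound to the healthCheckJobHandler handler, with a cron expression such as 0/30 * * * * ? for a 30-second interval.

(Screenshot: configuring the scheduled job)

Register the services to monitor

Keep the addresses of every monitored service in one central configuration: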

health:
  check:
    admin: http://192.168.1.1:8080/admin
    blog: http://192.168.1.1:8081/blog
    zookeeper: 192.168.1.74:2181,192.168.1.250:2181
    # host -> port; keys must be unique, since a second entry for the same host overwrites the first
    dataBase: '{"192.168.1.1":3306,"192.168.1.2":6379}'

@RefreshScope
@Configuration
public class HealthCheckConfig implements Serializable {
    private static final long serialVersionUID = 5881482877420605006L;
    @Value("#{'${health.check.admin:}'.split(',')}")
    private List<String> admin;
    @Value("#{'${health.check.blog:}'.split(',')}")
    private List<String> blog;

    @Value("#{'${health.check.zookeeper:}'.split(',')}")
    private List<String> zookeeperServers;

    @Value("#{${health.check.dataBase:}}")
    private Map<String, Integer> dbServers;

    // Getters and setters omitted
}
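Because dbServers is a Map keyed by host, each host can appear only once: listing the same IP with two ports keeps just the last one. If one host runs both MySQL and Redis, one alternative (an assumption, not part of the original setup) is to list endpoints as host:port strings, the way the zookeeper property already does, and parse them before probing. A sketch of such a parser; EndpointParser and parse are hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns "host:port,host:port" into (host, port) pairs.
class EndpointParser {
    // Unlike a host-keyed Map, the returned list keeps several ports for the same host.
    static List<Map.Entry<String, Integer>> parse(String csv) {
        List<Map.Entry<String, Integer>> endpoints = new ArrayList<>();
        if (csv == null || csv.isBlank()) {
            return endpoints;
        }
        for (String item : csv.split(",")) {
            String trimmed = item.trim();
            // Split on the last ':' so the port is always the final segment
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            endpoints.add(Map.entry(host, port));
        }
        return endpoints;
    }
}
```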

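Databases + Redis

We probe the port of every database and Redis instance: if the port accepts a connection, the instance is considered healthy; otherwise it may be down. The failed endpoints are collected in a Map and returned together.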
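At its core the probe is "can a TCP connection be opened within a timeout". A self-contained version of that check, using a hypothetical PortProbe class; keep in mind that an open port only proves that something is listening, not that MySQL or Redis can actually serve queries:

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: a bare TCP reachability check.
class PortProbe {
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    static boolean isOpen(String host, int port, int timeoutMs) {
        // try-with-resources closes the socket on every path
        try (Socket socket = new Socket()) {
            // connect() throws on refusal, unreachable host, or timeout
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```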

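import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;

@Component
public class DataBaseHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = -4346980726558885426L;
    private static final Logger logger = LoggerFactory.getLogger(DataBaseHealthIndicator.class);
    // Connect timeout for each port probe, in milliseconds
    private static final int CONNECT_TIMEOUT_MS = 1000;
    @Autowired private HealthCheckConfig healthCheckConfig;

    @Override
    public Health health() {
        // host -> port of every endpoint that could not be reached
        Map<String, Integer> map = Maps.newHashMap();
        healthCheckConfig.getDbServers().forEach((ip, port) -> {
            // try-with-resources closes the socket whether or not connect() succeeds
            try (Socket socket = new Socket()) {
                // connect() throws if the port is not reachable within the timeout,
                // so reaching the next line means the connection was established
                socket.connect(new InetSocketAddress(ip, port), CONNECT_TIMEOUT_MS);
                logger.info("ip {} port {} connection success", ip, port);
            } catch (Exception e) {
                map.put(ip, port);
                logger.error("ip {} port {} connection error", ip, port, e);
            }
        });

        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}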

The scheduled alert checks all follow the same pattern, so only the database one is shown here; the others are omitted:

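// @Async only takes effect with @EnableAsync on a configuration class
@Async
@Override
public void dataBaseHealthCheck() {
    try {
        Health health = dataBaseHealthIndicator.health();
        if (Status.DOWN.equals(health.getStatus())) {
            // Unreachable host -> port pairs collected by DataBaseHealthIndicator
            Map<String, Object> details = health.getDetails();
            // Send alert
        }
    } catch (Exception e) {
        logger.error("dataBase health check failed", e);
        // Send alert
    }
}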
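Since the job runs every 30 to 60 seconds, a service that stays down would trigger the same SMS or e-mail on every run. One way to avoid that, an addition not in the original setup, is to alert only when a check's status changes. A minimal sketch; AlertDeduplicator and its boolean status model are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: remembers the last status per check and alerts only on transitions.
class AlertDeduplicator {
    // Last known status per check name; true = healthy. Thread-safe for @Async callers.
    private final Map<String, Boolean> lastStatus = new ConcurrentHashMap<>();

    // Returns the alert text to send, or null if the status did not change.
    String onResult(String checkName, boolean healthy) {
        // put() returns the previous value; null means this check has not been seen yet
        Boolean previous = lastStatus.put(checkName, healthy);
        // Treat an unseen check as previously healthy, so a first failure still alerts
        boolean wasHealthy = previous == null || previous;
        if (wasHealthy && !healthy) {
            return "[ALERT] " + checkName + " is DOWN";
        }
        if (!wasHealthy && healthy) {
            return "[RECOVERED] " + checkName + " is back UP";
        }
        return null;
    }
}
```

The state lives in memory, so a restart of the monitor while a service is still down will send one more alert for it.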

ZooKeeper monitoring

Add the dependency

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.8.1</version>
</dependency>

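Write the check: for each server, create a ZooKeeper connection with a timeout, wait up to that timeout for the connection to be established, check whether it was, and return the servers that could not be reached.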
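The check relies on a common pattern: start an asynchronous connection, let the connection callback count down a CountDownLatch, and treat an await() that times out as a failure. The sketch isolates that pattern in plain JDK code; the connect parameter stands in for something like new ZooKeeper(server, timeout, watcher) and is an assumption of this example. One latch per attempt matters here: a single latch sized to the number of servers is only released after every server has connected.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: waits for an asynchronous "connected" callback with a timeout.
class ConnectWaiter {
    // connect receives an onConnected callback and must start the connection attempt.
    // Returns true if the callback fired within timeoutMs.
    static boolean awaitConnected(Consumer<Runnable> connect, long timeoutMs) {
        // One latch per attempt, counted down by the connection callback
        CountDownLatch connected = new CountDownLatch(1);
        connect.accept(connected::countDown);
        try {
            // await() returns false when the timeout elapses before countDown()
            return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report failure
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```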

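import com.google.common.collect.Maps;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Component
public class ZooKeeperHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = -3760146911582231552L;
    private static final Logger logger = LoggerFactory.getLogger(ZooKeeperHealthIndicator.class);
    @Autowired private HealthCheckConfig healthCheckConfig;
    // Used as both the session timeout and the connect wait, in milliseconds
    private static final int SESSION_TIMEOUT = 2500;

    @Override
    public Health health() {
        // Servers that did not reach SyncConnected within the timeout
        Map<String, String> map = Maps.newHashMap();
        healthCheckConfig.getZookeeperServers().forEach(server -> {
            // One latch per server: a shared latch sized to all servers would only be
            // released once every server had connected
            CountDownLatch connectedSignal = new CountDownLatch(1);
            ZooKeeper zooKeeper = null;
            try {
                zooKeeper = new ZooKeeper(server, SESSION_TIMEOUT, event -> {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        connectedSignal.countDown();
                    }
                });
                // await() returns false if the connection was not established in time
                if (!connectedSignal.await(SESSION_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    map.put(server, server);
                }
            } catch (IOException | InterruptedException e) {
                map.put(server, server);
                logger.error("ZooKeeperHealthIndicator error:", e);
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
            } finally {
                if (Objects.nonNull(zooKeeper)) {
                    try {
                        zooKeeper.close();
                    } catch (Exception e) {
                        logger.error("ZooKeeperHealthIndicator close error:", e);
                    }
                }
            }
        });

        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}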

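Service monitoring

Spring Boot services

For Spring Boot services, we can simply call the service's own health endpoint (provided by Spring Boot Actuator) via Feign. It reports the status of everything the service depends on, including but not limited to databases, Redis, disk space and the service registry, for example: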

{
    "status": "UP",
    "components": {
        "db": {
            "status": "UP",
            "components": {
                "primary": {
                    "status": "UP",
                    "details": {
                        "database": "MySQL",
                        "result": 1,
                        "validationQuery": "/* ping */ SELECT 1"
                    }
                },
                "secondary": {
                    "status": "UP",
                    "details": {
                        "database": "MySQL",
                        "result": 1,
                        "validationQuery": "/* ping */ SELECT 1"
                    }
                }
            }
        },
        "discoveryComposite": {
            "status": "UP",
            "components": {
                "discoveryClient": {
                    "status": "UP",
                    "details": {
                        "services": [
                            "blog"
                        ]
                    }
                },
                "eureka": {
                    "description": "Remote status from Eureka server",
                    "status": "UP",
                    "details": {
                        "applications": {
                            "BLOG": 1
                        }
                    }
                }
            }
        },
        "diskSpace": {
            "status": "UP",
            "details": {
                "total": 23069986816,
                "free": 5403578368,
                "threshold": 10485760
            }
        },
        "hystrix": {
            "status": "UP"
        },
        "ping": {
            "status": "UP"
        },
        "redis": {
            "status": "UP",
            "details": {
                "version": "5.0.3"
            }
        },
        "refreshScope": {
            "status": "UP"
        }
    }
}
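Feign is just one way to call this endpoint; the check itself is a plain HTTP GET. By default Actuator answers 200 when the overall status is UP and 503 when it is DOWN or OUT_OF_SERVICE, so the status code alone is enough to decide. A sketch with the JDK 11 HttpClient; the ActuatorProbe name, the timeouts, and the URL you pass in (typically http://host:port/actuator/health on Spring Boot 2.x) are assumptions. Note also that Actuator returns the per-component details shown above only when management.endpoint.health.show-details is set to always; with the default, never, you get just the top-level status.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical helper: treats HTTP 200 from an Actuator health URL as "UP".
class ActuatorProbe {
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(3))
            .build();

    // Returns true if the health endpoint answers HTTP 200 within the timeouts.
    static boolean isUp(String healthUrl) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(healthUrl))
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        try {
            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            // Actuator maps DOWN and OUT_OF_SERVICE to 503 by default
            return response.statusCode() == 200;
        } catch (IOException e) {
            // Connection refused, timeout, etc.
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```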

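Spring MVC services

Spring MVC does not ship a health endpoint the way Spring Boot does, but we can imitate Spring Boot and expose one ourselves for internal and external callers, as follows: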

@Component
public class HealthCheckService implements Serializable {
    private static final long serialVersionUID = 3138618810158258447L;
    private static final Logger logger = LoggerFactory.getLogger(HealthCheckService.class);
    @Autowired protected HealthCheckMapper healthCheckMapper;
    @Autowired protected RedisTemplate redisTemplate;

    public Health healthCheck() {
        Health health = new Health();
        health.setStatus("UP");
        Map<String, Object> map = Maps.newHashMap();
        try {
            healthCheckMapper.healthCheck();
        }catch (Exception e){
            map.put(Type.MYSQL.name(), Type.MYSQL.name());
            logger.error("MYSQL exception ", e);
        }
        try {
            redisTemplate.hasKey("");
        }catch (Exception e){
            map.put(Type.REDIS.name(), Type.REDIS.name());
            logger.error("REDIS exception ", e);
        }
        // Disk usage across all file-system roots
        File[] rootFiles = File.listRoots();
        if (Objects.nonNull(rootFiles) && rootFiles.length != 0) {
            long total = 0, free = 0;
            for (File file : rootFiles) {
                total += file.getTotalSpace();  // total capacity in bytes
                free += file.getUsableSpace();  // bytes still available to this JVM
            }
            long used = total - free;  // bytes in use
            double usedRate = total == 0 ? 0 : ((double) used / total);  // fraction in use, 0..1
            logger.info("healthIndicator... total {}, free {} usedRate {}", total, free, usedRate);
            // Flag the disk once more than 95% of it is in use
            if (usedRate > 0.95) {
                map.put(Type.STORAGE.name(), usedRate);
            }
        }
        if (!map.isEmpty()){
            health.setStatus("DOWN");
            health.setDetails(map);
        }
        return health;
    }

    public enum Type{
        MYSQL,
        REDIS,
        STORAGE
    }
}
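The Health returned here cannot be Spring Boot Actuator's Health, which is built through a builder and has no setters; it is a small class of the project's own that is not shown. A minimal version consistent with the setStatus and setDetails calls above; the field names and the UNKNOWN default are assumptions:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical POJO mirroring the JSON shape of Actuator's health response.
class Health {
    private String status = "UNKNOWN";
    private Map<String, Object> details = new LinkedHashMap<>();

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    // Read-only view, so callers cannot modify the health result after the fact
    public Map<String, Object> getDetails() {
        return Collections.unmodifiableMap(details);
    }

    public void setDetails(Map<String, Object> details) {
        // Defensive copy so later changes to the caller's map do not leak in
        this.details = new LinkedHashMap<>(details);
    }
}
```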

For MySQL and Redis, any trivial query will do: if it returns normally, the connection is healthy.

<select id="healthCheck" resultType="java.util.Date">select NOW()</select>

redisTemplate.hasKey("");

RocketMQ monitoring

Add the RocketMQ dependency

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-tools</artifactId>
    <version>4.9.3</version>
</dependency>

Add the RocketMQ configuration

# RocketMQ configuration
rocketmq:
  # NameServer address; separate multiple addresses with semicolons
  name-server: 192.168.1.1:9876
  producer:
    group: test
    topic: test-topic
    tag: test-tags

Add the configuration class

@Component
public class RocketConfig implements Serializable {
    private static final long serialVersionUID = 9156674410666469124L;
    @Value("${rocketmq.name-server}")
    private String nameServ;

    @Bean
    public DefaultMQAdminExt init() throws MQClientException {
        DefaultMQAdminExt rocketDefaultMQAdminExt = new DefaultMQAdminExt();
        rocketDefaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
        rocketDefaultMQAdminExt.setNamesrvAddr(nameServ);
        rocketDefaultMQAdminExt.start();
        return rocketDefaultMQAdminExt;
    }
}

Check that RocketMQ is reachable and measure the consumer backlog to detect message pile-up
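The backlog of a consumer group is the sum, over every queue it consumes, of the queue's max offset on the broker minus the group's committed offset; in RocketMQ 4.x that is what ConsumeStats.computeTotalDiff() adds up. It is not the same as the number of queues. A plain-Java illustration of the arithmetic, using a hypothetical QueueOffsets type rather than RocketMQ classes:

```java
import java.util.List;

// Illustration only: QueueOffsets is a hypothetical stand-in, not a RocketMQ type.
class BacklogCalculator {
    static final class QueueOffsets {
        final long brokerOffset;   // max offset of the queue on the broker
        final long consumerOffset; // offset the consumer group has committed

        QueueOffsets(long brokerOffset, long consumerOffset) {
            this.brokerOffset = brokerOffset;
            this.consumerOffset = consumerOffset;
        }
    }

    // Messages written but not yet consumed, summed over all queues.
    static long totalBacklog(List<QueueOffsets> queues) {
        long total = 0;
        for (QueueOffsets q : queues) {
            total += q.brokerOffset - q.consumerOffset;
        }
        return total;
    }

    // True when the backlog exceeds the alert threshold.
    static boolean isPilingUp(List<QueueOffsets> queues, long threshold) {
        return totalBacklog(queues) > threshold;
    }
}
```

For example, two queues at (1500, 1000) and (9000, 200) give a backlog of 500 + 8800 = 9300 messages.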

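import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.io.Serializable;

@Component
public class RocketHealthIndicator implements HealthIndicator, Serializable {
    private static final long serialVersionUID = 330718359948866493L;
    private static final Logger logger = LoggerFactory.getLogger(RocketHealthIndicator.class);
    // Backlog above which the check reports DOWN
    private static final long BACKLOG_THRESHOLD = 10000;
    @Autowired private DefaultMQAdminExt defaultMQAdminExt;
    // examineConsumeStats() expects a consumer group, so this property must name
    // the consumer group whose backlog should be watched
    @Value("${rocketmq.producer.group}")
    private String rocketGroup;

    @Override
    public Health health() {
        logger.info("RocketHealthIndicator... ");
        try {
            ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(rocketGroup);
            // computeTotalDiff() sums (broker offset - consumer offset) over all queues,
            // i.e. the messages not yet consumed; getOffsetTable().size() only counts queues
            long backlog = consumeStats.computeTotalDiff();
            if (backlog > BACKLOG_THRESHOLD) {
                return Health.down().withDetail("backlog", backlog).build();
            }
        } catch (Exception e) {
            logger.error("RocketHealthIndicator error:", e);
            return Health.down().withException(e).build();
        }
        return Health.up().build();
    }
}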

Kafka monitoring

Add the Kafka dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.2.RELEASE</version>
</dependency>

Kafka configuration

## Kafka broker addresses; separate multiple addresses with commas
kafka:
  servers: 192.168.1.1:9092

Check whether the Kafka cluster is healthy

import com.google.common.collect.Maps;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

@Component
public class KafkaHealthIndicator implements HealthIndicator, Serializable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaHealthIndicator.class);
    private static final long serialVersionUID = -5671848530433251605L;
    @Value("#{'${kafka.servers:}'.split(',')}")
    private List<String> servers;

    // AdminClient only needs admin settings; producer options such as batch.size,
    // buffer.memory or serializers are unknown to it and only trigger warnings
    private Properties initProperties(String server) {
        Properties properties = new Properties();
        // Bootstrap from a single address so each configured server is probed separately
        properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000); // request timeout (ms)
        properties.put(AdminClientConfig.RETRIES_CONFIG, 0); // no retries, fail fast
        return properties;
    }

    @Override
    public Health health() {
        Map<String, String> map = Maps.newHashMap();
        servers.forEach(server -> {
            AdminClient client = null;
            try {
                client = AdminClient.create(this.initProperties(server));
                // Fetch cluster metadata; get() throws if no answer arrives within 5 s
                DescribeClusterResult result = client.describeCluster();
                result.clusterId().get(5, TimeUnit.SECONDS);
                logger.info("{} Kafka is healthy.", server);
            } catch (Exception e) {
                logger.error("{} Kafka is not healthy.", server, e);
                map.put(server, server);
            } finally {
                if (Objects.nonNull(client)) {
                    client.close();
                }
            }
        });
        if (map.isEmpty()) {
            return Health.up().build();
        }
        return Health.down().withDetails(map).build();
    }
}