0%

Java开发工程师、全栈开发工程师

亲爱的招聘团队:

如果软件工程师是一道菜,那我就是那种经过12年慢火熬制的老汤底——看起来平淡无奇,但一尝就知道功夫在里头。

我的技术栈就像一个资深玩家的技能树:主技能点满了Java和React,副技能解锁了Vue、Docker、Python和Go等。在我的职业旅程中,我善于将复杂问题分解为简单模块,轻松应对各种技术挑战。但我最厉害的外挂其实是曾经当过产品助理——这让我不仅能听懂产品经理说的”简单调整”背后隐藏的36个子需求,还能在技术与业务之间自如翻译,堪称”产品语言通”。

在我的12年职业生涯中,我从”这bug在本地没问题啊”进化到了”这需求有啥实际意义”再到”好的,我来搞定!”。带团队的经历让我明白,比起Debug代码,Debug人际关系才是真正的高难度挑战。所幸,我在这两方面都交出了不错的成绩单。

相信我的技术能力、产品思维和团队协作经验能为您的团队带来实质性的贡献。代码之外,我能够搭建开发者与产品、业务之间的桥梁,确保我们不只是在开发功能,而是在创造价值。我们一定能擦出技术的火花——毕竟,一个能理解产品、带过团队、写了12年代码还没秃顶的工程师,不是每天都能遇到的。

代码问候,
[软件开发特种兵]

联系方式:
电话:[18515068121]
邮箱:[gamehu@yeah.net]
Wechat:[GamehuDB]

P.S. 我的GitHub贡献图可能不够绿,但我的生产环境代码从不让服务器变红。

Java Development Engineer/Full Stack Development Engineer

Dear Hiring Team:

If software engineers were dishes, I’d be that slow-simmered stock that’s been cooking for 12 years — looking unassuming, but one taste reveals the expertise within.

My tech stack resembles a veteran player’s skill tree: maxed-out primary skills in Java and React, with unlocked secondary abilities in Vue, Docker, Python, Go, and more. Throughout my professional journey, I’ve honed the ability to break complex problems into simple modules, easily tackling various technical challenges. But my most powerful perk comes from my experience as a product assistant — I can decode the 36 hidden sub-requirements behind a product manager’s “simple adjustment” and fluently translate between technical and business languages, making me a true “product whisperer.”

During my 12-year career, I’ve evolved from “but the bug doesn’t appear on my local machine” to “what’s the actual purpose of this requirement” to “I’ll handle it!” My team leadership experience taught me that debugging human relationships is far more challenging than debugging code. Fortunately, I’ve managed to achieve good results in both areas.

I believe my technical abilities, product mindset, and team collaboration experience will bring substantial value to your team. Beyond coding, I can build bridges between developers, product teams, and business units, ensuring we’re not just developing features but creating value. We’ll definitely create technical sparks together — after all, an engineer who understands products, has led teams, and has written code for 12 years without going bald isn’t someone you meet every day.

Code regards,
[Software Development Special Forces]

Contact Information:
twitter:[Gamehu520]
email:[gamehu@yeah.net]
Wechat:[GamehuDB]

P.S. My GitHub contribution graph might not be very green, but my production code never turns servers red.

AI 第三篇

背景

知道MCP还是源于因为之前验证大模型集成时了解到的。因为当时后续计划做AI应用,增加saas平台的噱头的同时成为一个亮点功能,提升用户体验。

基础概念与架构设计

MCP(Model Context Protocol)是一个开放协议,它标准化了应用程序如何向大语言模型(LLMs)提供上下文。可以将MCP比作AI应用的USB-C接口。正如USB-C提供了一种标准化的方式,将你的设备连接到各种外围设备和配件,MCP 也提供了一种标准化的方式,将AI模型连接到不同的数据源和工具。

一、协议本质解构

MCP是一种客户端-服务器架构的数据访问协议,专为AI应用(如大语言模型)设计,其核心是为AI应用提供一种标准化方式来安全访问多种数据源。

关键组件

  1. MCP 主机:运行AI应用并发起数据请求,如聊天应用或IDE
  2. MCP 客户端:处理与 MCP 服务器的通信
  3. MCP 服务器:连接到各种数据源的轻量级程序
  4. 大型语言模型(LLM):分析问题并选择回答的 AI 模型
  5. 数据源:包括数据库、外部 API 等

MCP采用简洁的JSON格式进行通信,主要支持两类基本操作:

  • 发现操作:客户端识别服务器提供的能力
  • 执行操作:请求服务器执行特定工具来访问数据

二、核心作用剖析

  1. 访问标准化

    • 为不同类型的数据源提供统一的访问接口
    • 客户端无需了解每个数据源的具体访问细节
  2. 安全控制

    • 服务器明确声明其访问能力和权限范围
    • 支持基本的认证和授权机制
    • 客户端可以限制服务器的访问范围
  3. 工具扩展性

    • 服务器可以动态注册和提供各种工具
    • 客户端可以发现并使用这些工具
    • 支持从简单的文件读取到复杂的API调用等多种操作

三、基本工作流程

MCP的典型工作流程如下:内部实现可能会有多次循环

alt text
大体流程如下:

  1. 用户向 MCP 主机(如聊天应用或 IDE)提出问题
  2. 主机将问题发送给大型语言模型(LLM)进行分析
  3. LLM 确定需要使用哪些工具来回答问题
  4. 主机通过 MCP 客户端请求执行相应工具
  5. MCP 客户端向不同的 MCP 服务器发送工具执行请求
  6. MCP 服务器访问相应的数据源(数据库或外部 API)
  7. 数据源返回结果给 MCP 服务器,再传回客户端
  8. MCP 客户端汇总工具执行结果并返回给主机
  9. 主机将工具结果发送给 LLM 生成最终回答
  10. 最终回答显示给用户

四、现实应用场景

MCP适用于以下典型场景:

  1. 增强型AI聊天应用

    • 让聊天机器人能够访问用户本地文件和数据库
    • 使AI可以获取并引用真实、最新的信息
  2. 智能开发工具

    • IDE中的代码助手可以访问项目代码文件
    • 辅助工具可以查询API文档和相关资源
  3. 企业AI集成

    • 让AI应用安全地访问企业内部数据
    • 在保护敏感信息的同时提供个性化服务

五、协议现状与局限

当前MCP协议的特点与局限:

  1. 简洁性优先

    • 协议设计相对简单,专注于解决基本的数据访问问题
    • 尚未包含复杂的加密、动态路由等高级功能
  2. 开发阶段

    • 协议仍在发展中,标准可能会随时间演进
    • 生态系统正在逐步构建
  3. 基础功能聚焦

    • 当前主要聚焦于基础的数据访问能力
    • 缺乏高级的事务处理、分布式一致性等特性

结论

MCP代表了AI工具与数据源之间交互的一个重要标准化尝试。它为构建能够访问和利用各种数据的AI应用提供了基础架构,虽然相对简单,但解决了AI应用难以安全访问多样化数据的关键问题。随着协议的发展,MCP有潜力成为AI应用与数据源之间交互的重要标准,类似于HTTP对于web应用的意义。

但是目前MCP仍处于相对早期阶段,其真正的潜力和影响力将随着更多实现和应用的出现而逐步显现。

参考

https://www.anthropic.com/news/model-context-protocol
https://github.com/modelcontextprotocol
https://modelcontextprotocol.io/introduction
https://www.youtube.com/watch?v=sahuZMMXNpI
https://www.youtube.com/watch?v=eur8dUO9mvE
https://www.youtube.com/watch?v=kQmXtrmQ5Zg&t=2s

离职系列 第N篇
离职前一天,想想简历咋写,弄个排版出来,后续好造着整理下简历。纯属个人意见。我先自己试试,不好用再改。

我的观点

我觉得简历的本质是为了筛选而不是为了深入了解你。所以我认为简历:

  1. 首先得清爽。
  2. 然后得简明扼要。

不用写太多同时又能体现关键信息,就跟咱们做程序一样,设计时重点之一就是数据得便于各场景使用,便于使用很大的一个方面就是数据能各种过滤和组合,通常是现有简明的入口,如果要了解细节就得下钻,可能是一层或多层才能看透数据。那简历就像入口,如果对方有兴趣才会下钻,所以不应该想着一个简历就把自己交代的底裤都没有,一方面是内容太多不容易抓到重点,另一方面是太细了搞得人都没欲望深入探讨,咋约你面试呢?

所以简历得像咱们对待产品需求一样,你得解决需求场景同时兼顾一些扩展性。抽象出来一个模板适配通用场景,然后可根据具体特殊场景,再保证真实的前提下做一些微调,对其JD中的要求。


抽象了一个通用模板如下:

基本信息

  • 求职意向:技术负责人/技术专家
  • 工作年限:8年+
  • 学历:xx
  • 电话:xx
  • 期望薪资:xx

专业技能

技术栈

  • 后端:Java、Spring Boot、Spring Cloud、MySQL、Redis、消息队列
  • 前端:React、TypeScript、Ant Design、Redux、Webpack
  • DevOps:Docker、Jenkins、Git、Jira
  • 架构:微服务架构、前后端分离、分布式系统设计

管理能力

  • 团队管理:带领3-6人团队,完成项目全周期开发
  • 技术规划:制定技术方案,把控技术方向,推动技术创新
  • 敏捷实践:推行敏捷开发,提升团队效能
  • 人才培养:建立技术培训体系,提升团队技术能力

工作经历

XX公司(2021-至今)

职位:技术负责人

负责工作:

  1. 带领5-6人团队完成大型LLM应用平台开发,实现从0到1

    • 设计并实现基于微服务架构的系统框架
    • 优化系统性能,提升用户体验
    • 建立代码规范和技术文档体系
    • 系统月活用户达到10w+,支持高并发访问
  2. 技术架构升级与优化

    • 推动系统微服务化改造,提升系统可扩展性
    • 实现核心模块性能优化,接口响应时间提升50%
    • 建立监控告警体系,提高系统稳定性

XX公司(2019-2021)

职位:Web前端负责人

负责工作:

  1. 带领3-4人前端团队完成企业级SaaS平台开发

    • 基于React技术栈搭建前端框架
    • 实现组件库设计与开发
    • 推动前端工程化建设
    • 平台服务企业客户100+
  2. 技术改进与创新

    • 建立前端性能监控体系
    • 推动前端自动化测试实践
    • 优化构建流程,部署时间缩短60%

项目经验

xx平台(2022-2023)

  • 项目规模:5-6人团队,服务10w+用户
  • 技术架构:Spring Cloud + React + MySQL + Redis
  • 主要职责:
    • 负责整体技术方案设计
    • 核心功能开发与性能优化
    • 带领团队完成开发任务
  • 项目成就:
    • 系统支持高并发访问,峰值QPS 5000+
    • 用户响应时间<500ms
    • 系统可用性达99.9%

xx企业级SaaS平台(2019-2021)

  • 项目规模:前端3-4人团队
  • 技术架构:React + TypeScript + Ant Design
  • 主要职责:
    • 前端架构设计与实现
    • 团队管理与技术指导
    • 核心功能开发
  • 项目成就:
    • 平台月活用户5w+
    • 前端性能提升40%
    • 客户满意度95%+

教育背景

  • XX大学 计算机科学与技术 本科

个人评价

  • 扎实的技术功底,丰富的项目经验
  • 优秀的团队管理能力和沟通协调能力
  • 具备较强的技术视野和架构设计能力
  • 持续学习,保持对新技术的敏感度

离职系列 第十二篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于AI功能方案验证。

背景

落地事业部上一年关于AI的创新奖的方案,更合适的叫法应该是交接,之前的团队几乎停留在理论上和一个demo上,因为久久没有效果,可能感受到了上级的压力,需要看到实际效果,最终找到了我们团队,希望能借助我们团队让其产生价值(官方说法),我理解实际就接这个摊子。好在年前我自己捣鼓过AI知识库AI应用-知识库,所以不怯场。

方案介绍

因为在这之前就是一个demo,具体前面的团队也说不出来个所以然,所以我们直接说说两边对话后的方案走向,希望做一个功能:容量预测,意思是根据服务器的多个指标历史数据,预估服务器未来的负载情况,从而给予客户参考或预案。

我试着画下大体的方案:

架构图:

alt text

时序图:

alt text

大体流程

  1. 调度器触发flink批任务从ClickHouse获取原始数据
  2. Flink进行基本的数据清洗和标准化
  3. 处理后的数据存入CK
  4. python预测模型从CK获取数据
  5. 预测模型生成预测结果
  6. 阈值分析器识别潜在瓶颈
  7. python大模型提供深度解释和建议
  8. 生成预测报告和告警

学习链接

ollama

ollama is an open-source tool that simplifies running large language models locally on your personal computer
https://www.youtube.com/watch?v=GWB9ApTPTv4&t=171s

离职系列 第十一篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于维护的一个旧功能“全部告警跟踪”。

背景

每个租户有自己的告警数据,少则几千多则几十万条数据,云平台提供了一个功能叫“全部告警跟踪”,该功能顾名思义,会展示所有租户的所有告警信息(刷新那一刻是实时的),还能支持过滤、搜索等操作,这功能据说上线没多久就有问题,比如点分页时不时会出现超时。但是因为这功能用的人非常少,且只有管理员才有权限,也就一直放着。
但是新版需求要求解决这个问题,因为现在是我维护这个功能,所以需要我先出个技术方案。

解法设计输出模板

  1. 解法设计的模板很多,但是我感觉稍微有点重,当前产品的节奏,没有那么多的时间和人力给我做那么详细的解法设计,所以简单梳理了一个简化版的解法设计,并与干系人达成了一致。

  2. 模板如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29

    1. 引言
    - 背景说明
    - 问题陈述(现状、目标)
    - 关键术语
    - 参考资料

    2. 需求分析
    - 核心诉求/期望交付的价值
    - 非功要求
    3. 约束条件
    - 依赖项
    - 假设项

    4. 方案设计
    - 可选方案对比(2-3个)
    * 方案描述
    * 优缺点分析
    * 非功表现
    - 推荐方案详细说明
    * 架构设计
    * 核心流程
    * 关键设计点、算法伪代码(如果有必要)

    5. 实施评估(因为团队自己做实施,所以加上这一章)
    - 影响范围
    - 实施成本
    - 后续影响

  3. 要分清楚解法设计和详细设计的核心区别:

    1. 解法设计:回答”用什么方案解决问题”
      • 关注整体思路
      • 多个方案对比选择
      • 架构层面决策
    2. 详细设计:回答”如何具体实现这个方案”
      • 已选定方案的具体技术实现细节
      • 编码层面设计

开始

在这儿我就不原方不动的把整个解法贴出来了,只捡几个重点说。

需求分析

一定要记住,虽然咱们是干技术的,但是做解法的时候,一定先不要直接从技术的角度思考,先从业务的角度,还原业务场景,以及可能的演进需求,做到扩展性。

  1. 年少不懂事的时候,干过一段时间的产品助理,当时就学会做需求分析的几把斧:

    tips:

    1. 搞清楚买单的人和使用的人谁?分别想解决什么问题,特别是买单的人容易被忽视。(使用方再满意,买单的人不满意也是白搭)
    2. 维护好与需求调研对象的关系(人情世故)
    3. 5W1H方法做需求分析和挖掘(找出底层需求,避免浮于表面文字)
    4. KANO方法对需求分级(找出痛点先解决,其它的都是锦上添花)

这儿的原始需求是管理员能对所有租户的告警跟踪查看,关注其下团队成员所负责的租户的处理情况,对工作进度有了解,同时可以随时查看核心客户的数据。
这样几句简单的话,应用5W1H+KANO拆解下:

  1. 5W1H分析:

    1. WHO(谁)

      • 主体:管理员
      • 关注对象:团队成员、租户
    2. WHAT(什么)

      • 查看所有租户的告警跟踪情况
      • 了解团队成员的工作进度
      • 查看核心客户数据
    3. WHEN(什么时候)

      • 随时(需要实时或准实时的数据)
      • 告警发生后的跟踪过程中
    4. WHERE(在哪里)

      • 系统内
    5. WHY(为什么),更深入可以加入5Why方法,探寻源需求。

      • 监督团队工作情况
      • 及时了解核心客户状况
      • 确保告警得到及时处理
  2. HOW(怎么做)

    • 提供告警跟踪查看、筛选功能
    • 展示团队成员负责的租户处理进度
    • 支持核心客户数据快速查看
  3. KANO模型分析:

    1. 基本型需求(Must-be):

      • 查看所有租户的告警记录
      • 查看告警处理状态
    2. 期望型需求(Performance):

      • 团队成员工作进度追踪
      • 核心客户数据查看
    3. 兴奋型需求(Delighter):

      • 数据分析和统计

这里能得到几个关键信息:

  1. 依然需要在活的实时的数据(需求已经明确)
  2. 需要搜索、分页、筛选(大数据量的场景)
  3. 后续很有可能需要统计数据(要考虑数据聚合)
  4. 非功
    1. 1000+租户,每个租户50w的告警,10s内刷出数据。
    2. 经费有限,且重新申请流程慢,额度小。

方案

  1. 方案1:ShardingSphere 自身实现。
    广播表是ShardingSphere中的一个概念,指的是在所有分片中存在的表,每个分片都有完整的副本。当更新广播表时,所有分片都会同步更新。通常用于数据量不大且需要频繁关联查询的表,比如字典表。
    1. 优点:简单,不用引入任何其他组件。
    2. 缺点:
      1. 数据量太大,无法在每个分片都复制全量数据。
  2. 方案2:ClickHouse(开源版)+Flink CDC
    1. 优点:
      1. CK在已在多个产品运用,学习成本较低。
      2. 可以支持复杂的查询、聚合需求。
      3. 适合离线分析。
      4. 单表查询性能极强。
    2. 缺点:
      1. 不支持事务。
      2. 集群部署成本高(官方没有提供Helm Chart。且ClickHouse集群扩展不方便,很多手动处理,不适合弹性扩展,集成k8s较难)。
      3. 删除/更新性能差,更适合批量追加。告警数据会经常变更,可能存在性能问题。
      4. 手动管理分片、分区、MergeTree等,维护成本较高。
  3. 方案3:Doris+Flink CDC
    1. 优点:

      1. 实时性高、支持高并发。
      2. 可以支持复杂的查询需求、聚合需求。
      3. 集群部署成本低(Doris,官方提供了Helm Chart,且适合弹性扩展,运维压力小)。
      4. 自动话程度高(分片、负载均衡、存储管理等)
      5. SQL友好
      6. 存算分离
    2. 缺点:

      1. 引入Doris新组件,可能会增加采购成本。
      2. 复杂的模糊搜索可能无法实现。
  4. 方案4:ES+Flink CDC
    1. 优点:

      1. 近实时,可能有秒级延迟。
      2. 可以支持复杂的查询需求(特别是全文检索)。
      3. 集群部署成本低(官方有Helm Chart和Operator,且适合弹性扩展,可无缝集成k8s,运维压力小)
    2. 缺点:

      1. 不支持事务
      2. 引入ES新组件,可能会增加较大采购成本(ES需要较多内存和SSD磁盘)。
      3. 很多时候需要手动处理,比如分片分步、设计索引、索引优化、GC 调优等,维护成本较高。
      4. 使用DSL,不是标准 SQL,学习成本较高。

推荐方案2

原因:

  1. 在活告警数据量可控,暂不考虑扩展。
  2. 系统已接入了CK,最低成本(学习、部署、购买)。
时序图

alt text

关键验证点

1、2验证点,由于前期已经做过验证,着重验证3、4就行,特别是更新和删除数据。

验证结果

按500个租户,每个租户5000在活告警,没问题,因为主要是验证可行性,没有那么严格的压测,图啥的当时就没留了。这块详设的时候会更具体严格一些。

离职系列 第十篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是关于线上的bug。

背景

其实说来这个问题,跟之前的遇见连接超时有个遗留项也有一些关系,因为报错的源头,也是是数据库连接关闭,与上一次仅仅是我那块出问题不同的是,这次是大批量的租户多种任务都失败,飞书告警消息都把我弹麻了。

问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

2024-10-23 17:00:10,177 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:631 - 客户:2xx319,告警数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
2024-10-23 17:00:10,176 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:511 - 客户:2xx319,集成平台巡检数据处理异常
024-10-23 17:00:10,175 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:547 - 客户:2xx319,服务状态数据处理异常
org.springframework.jdbc.UncategorizedSQLException:
... 35 common frames omitted
2024-10-23 17:00:10,174 [XxxGenerator8] ERROR [c.r.r.m.s.s.notice.impl.Xxx] Xxx.java:582 - 客户:2xx319,心跳数据处理异常
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed


这次定位很快,具体定位的就不再赘述,出了问题后,我想了想有两个明确的因素:

  1. 上次类似的错误就发现了,连接池设置存在问题。
    1. 再次检查,当前没有慢sql,所以初步判断是连接池问题。
  2. 新上线了策略功能,策略把之前定时默认执行的任务,可更改为每个租户下每种类型单独的执行时间和周期。
    1. 怀疑存在了N个客户N个任务都在同一时间点执行的问题,导致连接池耗尽。

处理

  1. 根据预留的后门,手动把核心任务给生成了,让线上能正常处理。
  2. 因为之前已知了引入ShardingSphere后同时引入了HikariCP连接池,现在只留HikariCP连接池,并对参数进行调优。
    1. 以下是同事调优后的参数:超时时间以及连接池大小都对应阿里云购买的高性能PG做了对应的调整。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      master0:
      dataSourceClassName: com.zaxxer.hikari.HikariDataSource
      driverClassName: org.postgresql.Driver
      jdbcUrl: jdbc:postgresql://xx.aliyuncs.com:xx/xx
      username: xxx
      password: xxx
      connectionTimeout: 60000
      idleTimeout: 600000
      maxLifetime: 3600000
      maximumPoolSize: 200
      minimumIdle: 1
      poolName: business-data-master0

  3. 临时的先让cron表达式有一定的偏移量比如
    1. {% codeblock %}
      
                     return timeList.stream()
                         .map(time -> {
                             String[] timeParts = parseTime(time);
                             // TODO 临时解法:为每个cron添加随机偏移( 0~3分钟)
                             int minuteOffset = ThreadLocalRandom.current().nextInt(4); // 生成 0~3 的随机数
                             int minute = (Integer.parseInt(timeParts[1]) + minuteOffset) % 60; // 防止超出 59 分钟
                             return "0 " + minute + " " + timeParts[0] + " * * ? ";
                         })
                         .collect(Collectors.toList());
                      private static String[] parseTime(String time) {
                          return time.split(":"); // 格式为 "HH:mm"
                      }
                  }
      
         {% endcodeblock %}
      

2、3做完之后,腾出缓冲时间着手长期解了,需要重新做下解法设计,以适配高并发的场景。

解法设计1.0

具体的解法设计咋做,可看下之前的遇见多表查询,这儿就直接给出一些结论:

  1. 任务错峰(随机延迟)
  2. 任务限流(线程池 + 队列)
  3. 任务优先级机制(先执行核心任务)

UML:

alt text

流程图:

alt text

时序图:

alt text

关键伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363

// 任务优先级定义
private enum TaskPriority {
HIGH(0),
MEDIUM(5),
LOW(10);

private final int value;

TaskPriority(int value) {
this.value = value;
}

public int getValue() {
return value;
}
}

/**
* manage-biz Powerjob 调度类
* 优化版本 - 任务削峰与队列管理
*
* @author hht
* @since 2024-09-10
*/
@Component(value = "manageBizPowerjobDispatcher")
@Slf4j
@RequiredArgsConstructor
public class ManageBizPowerjobDispatcher {
private final IXxxScheduleService XxxScheduleService;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

/** 平安通告powerjob任务id */
public static final String TASK_SAFETY_NOTICE_ID = "generateXxx";

public static final String SUCCESS = "success";

// 配置参数,可从配置文件注入
@Value("${powerjob.task.max-concurrent:10}")
private int maxConcurrentTasks;

@Value("${powerjob.task.queue-capacity:500}")
private int queueCapacity;

@Value("${powerjob.task.max-delay-minutes:5}")
private int maxDelayMinutes;

@Value("${powerjob.task.worker-threads:20}")
private int workerThreads;



// 延迟任务定义
@Data
private static class DelayedTask implements Delayed {
private final Runnable task;
private final long executeTime;
private final String taskId;
private final String jobParams;

@Override
public long getDelay(TimeUnit unit) {
return unit.convert(executeTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

}

// 优先级任务定义
@Data
private static class PriorityTask implements Comparable<PriorityTask> {
private final Runnable task;
private final TaskPriority priority;
private final String taskId;
private final String jobParams;
private final long createTime;

@Override
public int compareTo(PriorityTask other) {
// 先按优先级排序,再按创建时间排序
int priorityCompare = Integer.compare(priority.getValue(), other.priority.getValue());
if (priorityCompare != 0) {
return priorityCompare;
}
return Long.compare(createTime, other.createTime);
}
}

/**
* 1.单独的线程,负责从队列中获取任务并分发
* 2.协调延迟队列和优先级队列
* 3.控制任务的并发执行数量
*/
private class TaskDispatcher implements Runnable {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
// 先检查延迟队列
DelayedTask delayedTask = delayedTaskQueue.poll();
if (delayedTask != null) {
// 将任务添加到优先级队列
submitToPriorityQueue(delayedTask.getTask(), TaskPriority.HIGH, delayedTask.getTaskId(), delayedTask.getJobParams());
continue;
}

// 从优先级队列取任务执行
PriorityTask priorityTask = priorityTaskQueue.take();
if (priorityTask != null) {
try {
// 获取信号量,控制并发
taskSemaphore.acquire();

// 记录任务开始执行
activeTaskCount.incrementAndGet();
taskExecutionCount.computeIfAbsent(priorityTask.getTaskId(), k -> new AtomicInteger(0)).incrementAndGet();

// 提交到线程池执行
executorService.submit(() -> {
try {
log.info("执行任务: {}, 参数: {}", priorityTask.getTaskId(), priorityTask.getJobParams());
priorityTask.getTask().run();
} catch (Exception e) {
log.error("任务执行异常: {}", priorityTask.getTaskId(), e);
} finally {
// 释放信号量
taskSemaphore.release();
// 更新计数器
activeTaskCount.decrementAndGet();
AtomicInteger counter = taskExecutionCount.get(priorityTask.getTaskId());
if (counter != null) {
counter.decrementAndGet();
}
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("任务分发器异常", e);
}
}
}
}

// 任务队列和执行器
private DelayQueue<DelayedTask> delayedTaskQueue;
private PriorityBlockingQueue<PriorityTask> priorityTaskQueue;
private ExecutorService executorService;
private ExecutorService dispatcherService;
private Semaphore taskSemaphore;
private Random random;

// 任务执行状态监控
private AtomicLong totalTasksReceived = new AtomicLong(0);
private AtomicLong totalTasksExecuted = new AtomicLong(0);
private AtomicInteger activeTaskCount = new AtomicInteger(0);
private Map<String, AtomicInteger> taskExecutionCount = new ConcurrentHashMap<>();

@PostConstruct
public void init() {
// 初始化任务队列
delayedTaskQueue = new DelayQueue<>();
priorityTaskQueue = new PriorityBlockingQueue<>(queueCapacity);

// 初始化线程池
executorService = Executors.newFixedThreadPool(workerThreads, new ThreadFactory() {
private final AtomicInteger counter = new AtomicInteger(1);

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "task-worker-" + counter.getAndIncrement());
thread.setDaemon(true);
return thread;
}
});

// 初始化分发器线程,
dispatcherService = Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r, "task-dispatcher");
thread.setDaemon(true);
return thread;
});

// 初始化信号量
taskSemaphore = new Semaphore(maxConcurrentTasks);

// 初始化随机数生成器
random = new Random();

// 启动任务分发线程
dispatcherService.submit(new TaskDispatcher());

log.info("任务调度器初始化完成,最大并发任务数: {}, 队列容量: {}, 最大延迟分钟数: {}, 工作线程数: {}",
maxConcurrentTasks, queueCapacity, maxDelayMinutes, workerThreads);
}

@PreDestroy
public void shutdown() {
// 关闭调度器
if (dispatcherService != null) {
dispatcherService.shutdownNow();
}

// 关闭执行器
if (executorService != null) {
executorService.shutdown();
try {
if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
}

log.info("任务调度器已关闭,总接收任务数: {}, 总执行任务数: {}",
totalTasksReceived.get(), totalTasksExecuted.get());
}

/**
* 提交任务到延迟队列
*/
private void submitToDelayQueue(Runnable task, String taskId, String jobParams) {
// 随机延迟时间,在0到maxDelayMinutes分钟之间
long delayMs = random.nextInt((int) TimeUnit.MINUTES.toMillis(maxDelayMinutes));
DelayedTask delayedTask = new DelayedTask(task, delayMs, taskId, jobParams);
delayedTaskQueue.offer(delayedTask);
totalTasksReceived.incrementAndGet();

log.info("任务已提交到延迟队列: {}, 延迟: {}ms", taskId, delayMs);
}

/**
* 提交任务到优先级队列
*/
private void submitToPriorityQueue(Runnable task, TaskPriority priority, String taskId, String jobParams) {
PriorityTask priorityTask = new PriorityTask(task, priority, taskId, jobParams);
priorityTaskQueue.offer(priorityTask);

log.info("任务已提交到优先级队列: {}, 优先级: {}", taskId, priority);
}

/**
* 获取任务类型对应的优先级
*/
private TaskPriority getTaskPriority(String taskId) {
switch (taskId) {
case TASK_SAFETY_NOTICE_ID:
case TASK_PUSH_SERVICE_STATUS_ID:
return TaskPriority.HIGH;
case TASK_TENANT_SERVICE_PHASE_ID:
case TASK_WEEKLY_SUMMARY:
return TaskPriority.MEDIUM;
default:
return TaskPriority.LOW;
}
}

/**
* 创建可执行的任务
*/
private Runnable createExecutableTask(String taskId, String jobParams, TaskContext taskContext) {
switch (taskId) {
case TASK_SAFETY_NOTICE_ID:
return () -> generateXxxTask(taskContext);
case TASK_OTHER:
return () -> generateOtherTask(taskContext);
...
default:
throw new IllegalArgumentException("未知的任务类型: " + taskId);
}
}

/**
* 通用任务提交方法
*/
private ProcessResult submitTask(String taskId, TaskContext taskContext) {
try {
totalTasksReceived.incrementAndGet();

// 检查任务执行情况,如果已有大量相同类型任务,加入延迟队列
int activeCount = taskExecutionCount.computeIfAbsent(taskId, k -> new AtomicInteger(0)).get();
if (activeCount > maxConcurrentTasks / 2) {
log.warn("当前任务类型 {} 正在执行的数量较多: {}, 将使用延迟队列分散负载", taskId, activeCount);
submitToDelayQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), taskId, taskContext.getJobParams());
} else {
// 根据任务类型分配优先级
TaskPriority priority = getTaskPriority(taskId);
submitToPriorityQueue(createExecutableTask(taskId, taskContext.getJobParams(), taskContext), priority, taskId, taskContext.getJobParams());
}

return new ProcessResult(true, formatResponse(SUCCESS, taskId));
} catch (Exception e) {
log.error("提交任务异常: {}", taskId, e);
return new ProcessResult(false, formatResponse(e.getMessage(), taskId));
}
}

// ====== 以下是原始的PowerJob任务处理方法,改为使用队列系统 ======

/**
* 告警任务
*/
@PowerJobHandler(name = TASK_OTHER)
public ProcessResult generateOtherTask(TaskContext taskContext) {
log.info("==================== 调度触发(其它任务) ======================");
return submitTask(TASK_OTHER, taskContext);
}

private ProcessResult generateOtherTask(TaskContext taskContext) {
try {
StrategyJobParams jobParams = new StrategyJobParams();
if (StringUtils.hasLength(taskContext.getJobParams())) {
jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
}
strategyScheduleService.generateTask(jobParams);
totalTasksExecuted.incrementAndGet();
return new ProcessResult(true, formatResponse(SUCCESS, TASK_OTHER));
} catch (Exception e) {
log.error("调度执行【其它任务】异常", e);
return new ProcessResult(false, formatResponse(e.getMessage(), TASK_OTHER));
}
}


/**
* 生成平安通告
*/
@PowerJobHandler(name = TASK_SAFETY_NOTICE_ID)
public ProcessResult generateXxx(TaskContext taskContext) {
log.info("==================== 调度触发(平安通告) ======================");
return submitTask(TASK_SAFETY_NOTICE_ID, taskContext);
}

private ProcessResult generateXxxTask(TaskContext taskContext) {
try {
// 获取调度任务的参数
StrategyJobParams jobParams = null;
if (StringUtils.hasLength(taskContext.getJobParams())) {
jobParams = OBJECT_MAPPER.readValue(taskContext.getJobParams(), StrategyJobParams.class);
}
// 生成平安通告
XxxScheduleService.autoGenerateBatch(jobParams);
totalTasksExecuted.incrementAndGet();
return new ProcessResult(true, formatResponse("success", TASK_SAFETY_NOTICE_ID));
} catch (Exception e) {
log.error("调度执行【平安通告】异常", e);
return new ProcessResult(false, formatResponse(e.getMessage(), TASK_SAFETY_NOTICE_ID));
}
}

private String formatResponse(String info, String id) {
return String.format("{\"taskId\": \"%s\", \"info\": \"%s\"}", id, info);
}
}


解法设计2.0

主要是解决一些异常场景,比如:

  1. 服务异常重启,任务丢了?
  2. 信号量获取阻塞,所有任务堆积?
  3. 发生异常,及时感知等

这一步还停留在设计阶段,也可能是我设计并落地,也先做个记录。

解法:

  1. Redis代替内存队列,开启持久话,便于启动后恢复。
  2. 核心业务单独维护信号量
  3. 设置拒绝策略,当队列超过阈值直接异常返回给powerjob
    1. 同时发送告警
  4. 适当的动态调整信号量

离职系列 第九篇
离职系列,想想这几年在公司的成长,在这做个记录。这篇是遇到的一个比较典型的线上问题。

问题现象

写了一个每天执行两次的定时任务,该任务会分批对线上所有几百个租户生成《平安通告》,上线1个多月后突然手机收到告警,某几个用户生成失败。

  
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
2024-10-17 17:00:10,125 [safetyNoticeGenerator8] ERROR [com.alibaba.druid.pool.DruidDataSource] DruidDataSource.java:1988 - {conn-110021} discard
org.postgresql.util.PSQLException: An I/O error occurred while sending to the backend.
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:395)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:177)
at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:483)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement$2.executeSQL(ShardingSpherePreparedStatement.java:439)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement$2.executeSQL(ShardingSpherePreparedStatement.java:435)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback.execute(JDBCExecutorCallback.java:95)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutorCallback.execute(JDBCExecutorCallback.java:75)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.syncExecute(ExecutorEngine.java:135)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.serialExecute(ExecutorEngine.java:121)
at org.apache.shardingsphere.infra.executor.kernel.ExecutorEngine.execute(ExecutorEngine.java:115)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor.execute(JDBCExecutor.java:65)
at org.apache.shardingsphere.infra.executor.sql.execute.engine.driver.jdbc.JDBCExecutor.execute(JDBCExecutor.java:49)
at org.apache.shardingsphere.driver.executor.DriverJDBCExecutor.doExecute(DriverJDBCExecutor.java:156)
at org.apache.shardingsphere.driver.executor.DriverJDBCExecutor.execute(DriverJDBCExecutor.java:145)
at org.apache.shardingsphere.driver.jdbc.core.statement.ShardingSpherePreparedStatement.execute(ShardingSpherePreparedStatement.java:402)
at com.zaxxer.hikari.pool.ProxyPreparedStatement.execute(ProxyPreparedStatement.java:44)
at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.execute(HikariProxyPreparedStatement.java)
at org.apache.ibatis.executor.statement.PreparedStatementHandler.query(PreparedStatementHandler.java:65)
at org.apache.ibatis.executor.statement.RoutingStatementHandler.query(RoutingStatementHandler.java:80)
at jdk.internal.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:61)
at jdk.proxy2/jdk.proxy2.$Proxy207.query(Unknown Source)
at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:65)
at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:333)
at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:158)
at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:110)
at com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor.intercept(MybatisPlusInterceptor.java:81)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:59)
at jdk.proxy2/jdk.proxy2.$Proxy206.query(Unknown Source)
at jdk.internal.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.ibatis.plugin.Invocation.proceed(Invocation.java:49)
at com.github.yulichang.interceptor.MPJInterceptor.intercept(MPJInterceptor.java:76)
at org.apache.ibatis.plugin.Plugin.invoke(Plugin.java:59)
at jdk.proxy2/jdk.proxy2.$Proxy206.query(Unknown Source)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:154)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:147)
at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:142)
at jdk.internal.reflect.GeneratedMethodAccessor109.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:425)
at jdk.proxy2/jdk.proxy2.$Proxy189.selectList(Unknown Source)
at org.mybatis.spring.SqlSessionTemplate.selectList(SqlSessionTemplate.java:224)
at com.baomidou.mybatisplus.core.override.MybatisMapperMethod.executeForMany(MybatisMapperMethod.java:166)
at com.baomidou.mybatisplus.core.override.MybatisMapperMethod.execute(MybatisMapperMethod.java:77)
at com.baomidou.mybatisplus.core.override.MybatisMapperProxy$PlainMethodInvoker.invoke(MybatisMapperProxy.java:152)
at com.baomidou.mybatisplus.core.override.MybatisMapperProxy.invoke(MybatisMapperProxy.java:89)
at jdk.proxy2/jdk.proxy2.$Proxy197.findDistinctCiIdsByIdentityId(Unknown Source)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.addAlertData(xxServiceImpl.java:612)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.lambda$setJsonField$5(xxServiceImpl.java:368)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.setJsonField(xxServiceImpl.java:346)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl.batchCreate(xxServiceImpl.java:201)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:343)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:117)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:391)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:751)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:703)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.xxServiceImpl$$SpringCGLIB$$0.batchCreate(<generated>)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.SafetyNoticeScheduleServiceImpl.processBatch(SafetyNoticeScheduleServiceImpl.java:194)
at com.xxx.xxxcloud.manage.service.safety.notice.impl.SafetyNoticeScheduleServiceImpl.lambda$processBatchesInThreadPool$0(SafetyNoticeScheduleServiceImpl.java:112)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.net.SocketTimeoutException: Read timed out
at java.base/sun.nio.ch.NioSocketImpl.timedRead(NioSocketImpl.java:288)
at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:314)
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:355)
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:808)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:966)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:484)
at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:478)
at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1465)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:1069)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:161)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:128)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:113)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:73)
at org.postgresql.core.PGStream.receiveChar(PGStream.java:465)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2155)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
... 82 common frames omitted
org.springframework.jdbc.UncategorizedSQLException:
### Error querying database. Cause: java.sql.SQLException: Connection is closed
### The error may exist in class path resource [mapper/TbxxHealthCheckResultMapper.xml]
### The error may involve com.xxx.xxxcloud.manage.mapper.xx.TbxxHealthCheckResultMapper.selectExecuteLatest
### The error occurred while executing a query
### Cause: java.sql.SQLException: Connection is closed
; uncategorized SQLException; SQL state [null]; error code [0]; Connection is closed
at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:93)
at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:439)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.sql.SQLException: Connection is closed
at com.zaxxer.hikari.pool.ProxyConnection$ClosedConnection.lambda$getClosedConnection$0(ProxyConnection.java:502)
at jdk.proxy3/jdk.proxy3.$Proxy173.prepareStatement(Unknown Source)
at com.zaxxer.hikari.pool.ProxyConnection.prepareStatement(ProxyConnection.java:327)
at com.zaxxer.hikari.pool.HikariProxyConnection.prepareStatement(HikariProxyConnection.java)

从上面有几个关键信息梳理下异常调用链:

得出几个重要信息:

  1. 异常发生的业务代码是在批量处理”xx报告”时,查询告警处
  2. ShardingSphere下的连接池hikari抛出了异常Connection is closed
  3. Druid连接池也抛出了异常{conn-xxx} discard
  4. 底层SocketTimeoutException,表明是客户端等待数据库服务器的响应超时(初步判断为慢sql)

所以我开始从以下几个方面排查:

  1. 查业务代码的变更,为什么之前好好的跑了一个多月,突然出问题。
  2. 检查数据库连接参数设置
  3. 评估查询的数据量是否过大
  4. 看下HikariCP和Druid是否有啥联系以及为啥会有两个连接池?
  5. 查看数据库负载情况

处理过程

第一步:摸索

怀疑一切,切忌先入为主。

  1. 想办法重现问题
    1. 提前发通知,下午7点以后会操作线上系统。通常6点半以后几乎就没人用系统了。
    2. 因为设计该功能时,留了补偿手动,可手动重新触发报告生成。
    3. 反复重试了几次,问题未复现。
  2. 查看业务代码变更。
    1. 报错处业务代码owner是我,没做任何更改,所以变成重点关注addAlertData方法,也就是与告警相关(重点在数据量)
    2. 业务没变更但是加入了ShardingSphere(异常中也有这块的信息,先存疑)
  3. 检查系统连接池参数
    1. druid,几乎都没有做定制,都是使用的默认值。
      1. 使用默认值其实存在风险,应该根据业务调整一些参数,因为买的阿里的pg所以咨询了他们拿到了一份他们暴露的参数调优参考,后续可针对性修改)
    2. 新增的ShardingSphere的HikariCP也是使用的默认值。(异常中也有这块的信息,先存疑)
  4. 观察显示的数据库负载情况(阿里云的监控看板、pg的pg_stat_activity等视图、数据库日志等)
    1. 从视图发现确实存在执行时间较长的几条sqlsql,虽然有些慢但是不足以触发异常。慢的原因初步判断为数据量过大(报错的租户行数都在50w左右)
    2. 记录下这个sql,拿去控制台执行,EXPLAIN ANALYZE该sql,发现Seq Scan除了时间较长以外还存在索引问题,几乎每次查询都要用到的“告警状态”字段之前没加索引。(可能是原因之一)
    3. 查看数据库日志如下,这表明数据库和客户端的连接中断确实是问题的根源,可能是因为网络问题、数据库负载过高、或者连接超时等因素导致的。

第二步:验证

验证怀疑的所有点,通常控制变量法进行验证。

因为暂时没有复现,不着急先按兵不动。

  1. 前提是要有补偿方案,不能阻断线上使用,特别是这种核心业务。因为当时留了后门可以手动触发某个租户所以没问题。
  2. 虽然是线上故障同时也算严重bug,但是也不要着急,胡乱改一通,可能按下葫芦又起瓢,尽可能的找到根因,哪怕不能一次性修复。

等待了两天发现同样的问题又发生了。

  1. 但是这次发现了上一次遗漏的一个信息,报错的租户从日志看时间,异常都是在10s以后抛出的。(初步判断是SocketTimeout的超时时间,可能是10s)
  2. 拿到同样报错的sql去执行发现虽然跟之前没啥差别,问题还是那些问题,也没超过10s。(怀疑是因为报错时候执行了sql,触发了pg的缓存,所以再去查缓存生效,导致执行时间变短)

验证第一个问题:

因为当前看存在有两个连接池,查HikariCP与Druid的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// HikariCP只有connectionTimeout为30s
static {
CONNECTION_TIMEOUT = TimeUnit.SECONDS.toMillis(30L);
VALIDATION_TIMEOUT = TimeUnit.SECONDS.toMillis(5L);
SOFT_TIMEOUT_FLOOR = Long.getLong("com.zaxxer.hikari.timeoutMs.floor", 250L);
IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(10L);
MAX_LIFETIME = TimeUnit.MINUTES.toMillis(30L);
unitTest = false;
}

// DruidDataSource初始化,socketTimeout是10s
public void init() throws SQLException {
if (!this.inited) {
DruidDriver.getInstance();
ReentrantLock lock = this.lock;

try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new SQLException("interrupt", e);
}

boolean init = false;

if (this.connectTimeout == 0) {
this.connectTimeout = 10000;
}

if (this.socketTimeout == 0) {
this.socketTimeout = 10000;
}

先不管HikariCP,至少能确定Druid的10s超时确实存在,因为阿里云上我开起了pg的慢sql监控,根据时间刚好查到了确实存在一条告警sql查询执行时间为10s+。

  1. 所以初步判断SocketTimeoutException是因为sql执行时间超过了连接池的默认超时。
  2. 去查了下该租户下告警的数据总数已经超过了50W,在活5000左右。

第三步:修复方案

方案如下:

  1. 加索引,对常用的字段“告警状态”添加索引。
  2. 对在活告警查询的地方分批
  3. 对在活主告警限制查询的条数而不是查所有。

第四步:方案执行

  1. 对所有配置分表的表,检查索引,添加必要的索引。

    1.   -- ========================================
        -- 描述: 创建告警表的“status”字段索引
        -- 文件名: 001_create_alert_status_index.sql
        -- 作者: hht
        -- 创建日期: 2024-10-31
        -- ========================================
      
        DO $$
        DECLARE
            i INTEGER;
        BEGIN
            -- 遍历 tb_xx_alert_0 ~ tb_xx_alert_15
            FOR i IN 0..15 LOOP
                -- 动态生成 ALTER TABLE 语句,添加字段
                EXECUTE FORMAT('
                    ALTER TABLE public.tb_xx_alert_%s 
                    CREATE INDEX tb_xx_alert_%s_status_index ON tb_xx_alert_%s  (status);
                ', i);
            END LOOP;
        END $$;
      
  2. 告警分批且限制条数

    1. 1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      private List<AlertVO> findSubscribedAlerts(AlertSubscribedParam param) {
      try {
      List<List<String>> parts = PartsListUtil.getParts(param.getSubscribedCiIds(), BATCH_SIZE);
      List<AlertVO> all = new ArrayList<>();
      for (List<String> part : parts) {
      // 分批查询
      param.setSubscribedCiIds(part);
      // 主告警不超过100条
      Page<AlertVO> page = new Page<>(1, 100 - all.size());
      IPage<AlertVO> result = tbXxAlertMapper.findSubscribedAlerts(page, param);
      if (result.getTotal() == 0) {
      continue;
      }
      // 聚合告警的子告警数据补充
      getAlertAggChildren(param.getIdentityId(), result.getRecords());
      all.addAll(result.getRecords());
      // 如果已经达到100,退出循环
      if (all.size() == 100) {
      break;
      }
      }
      // 最后统一按createTime降序排序
      all.sort((a1, a2) -> Long.compare(a2.getCreateTime(), a1.getCreateTime()));
      return all;
      } catch (Exception e) {
      String errorMessage =
      String.format("查询关注告警失败: identityId=%s, deal=%s", param.getIdentityId(), param.isDeal());
      log.error(errorMessage, param.getIdentityId(), e);
      throw new SafetyNoticeException(errorMessage, e);
      }
      }

第五步:监控效果

  1. 上线后,连续一周到点蹲守
    1. SELECT pg_stat_reset(); – 重置所有统计信息
    2. 数据库负载看板以及pg的pg_stat_activity。
      1. 之前的sql执行时间没有再超过2s
      2. 看板上慢sql也没有在发现
      3. 数据库日志也没有再出现异常
    3. pg_stat_user_indexes+EXPLAIN ANALYZE 对应sql,查看索引使用情况。
  2. 业务功能正常。

看上去目前超时的问题暂时解决,但是要想更彻底的解,还需要后续对遗留项逐个解决。

遗留项/改进项

  1. 连接池的参数调优,虽然默认的看上去没啥问题,但是迟早肯定会出问题,记一个DFX。
  2. 多了一个HikariCP连接池,而且通过jconsole看了下,两个连接池都会初始化,这块是否有必要有两个连接池,这儿存在隐患,对ShardingSphere需要深入了解下,记一个DFX。
  3. 历史数据的清理,跟PO提出,需要加一个需求不仅仅是告警,可能还有其他数据。
  4. 对于核心且经常更新的表是否需要定时REINDEX

离职系列 第八篇
离职系列,想想这几年在公司的成长,在这做个记录。

此篇是因为遇到了太多环境类问题,从LMT建立的数据统计,这类问题占比已经超过了我处理问题的60%左右,占所有现场问题的20%左右以上,所以抽了个时间把可以在前置检查中避免的问题都梳理出来,试着写了个脚本,并一次次到现场验证,最终有了以下版本。该脚本在产品没有出根解之前,直接交给了TAC,让其与一线一起前置处理,避免过多的流转到LMT。

检查脚本,

   
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
#!/bin/bash

# 环境前置检查脚本(用于提前感知问题,减少安装失败、升级失败等问题出现)
# by hht
# 上传后,chmod +x 授予执行权限,然后./pre_check.sh 直接执行

# 输出错误到文件
exec 2>/tmp/pre_test_error.log
# ANSI颜色代码
GREEN='\033[0;32m'
RED='\033[0;31m'
NC='\033[0m' # 恢复默认颜色
YELLOW='\033[0;33m'
YELLOW_BG='\033[43m'
BLACK='\033[30m' # 黑色字体
none='\e[0m'
BLUE="\e[0;94m"
_red_bg() { echo -e "\e[41m$@${none}"; }
is_err=$(_red_bg 异常!)

warn() {
echo -e "\n${YELLOW_BG}${BLACK}警告!${NC} $@\n"
}
err() {
echo -e "\n$@ $is_err\n"
}

# 函数来检验IP地址的有效性
is_valid_ip() {
local ip="$1"
local ip_regex="^([0-9]{1,3}\.){3}[0-9]{1,3}$"

if [[ $ip =~ $ip_regex ]]; then
return 0
else
return 1
fi
}

# 声明账号
server1_user="root"
server2_user="root"

echo -e "\n${GREEN}------------------------------------前置检查------------------------------------${NC}"
echo
# 输入服务器IP地址,进行校验
while true; do
echo "请输入master服务器的IP地址:"
read server1_ip

if is_valid_ip "$server1_ip"; then
break
else
warn "无效的IP地址,请重新输入。"
fi
done

while true; do
echo "请输入worker服务器的IP地址:"
read server2_ip
if is_valid_ip "$server2_ip"; then
break
else
warn "无效的IP地址,请重新输入。"
fi
done
# 测试Ping
echo -e "\n${BLUE}ping测试:${NC}\n"
ping_check(){
local server1_ip=$1
local server2_ip=$2
ping -c 3 $server1_ip > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo -e "$server1_ip 可以Ping通。${GREEN}正常${NC}"
else
err "$server1_ip 无法Ping通。"
fi

ping -c 3 $server2_ip > /dev/null 2>&1
if [ $? -eq 0 ]; then
echo -e "$server2_ip 可以Ping通。${GREEN}正常${NC}"
else
err "$server2_ip 无法Ping通。" && exit 1
fi

}
ping_check $server1_ip $server2_ip
echo -e "\n------------------------------------------------------------------------------"
# SSH测试
echo -e "\n${BLUE}SSH测试:${NC}\n"
ssh_check(){
local server1_ip=$1
local server2_ip=$2
if sshpass timeout 10s ssh -o StrictHostKeyChecking=no root@$server1_ip echo "SSH test" 2>/dev/null; then
echo -e "Success: SSH from $server1_ip to $server2_ip connected.${GREEN}正常${NC}"
else
err "SSH from $server1_ip to $server2_ip failed"
fi
}

ssh_check $server1_ip $server2_ip
echo -e "\n------------------------------------------------------------------------------"
# 时间一致性检查
echo -e "\n${BLUE}时间一致性检查:${NC}\n"
date_check(){
local server1_ip=$1
local server2_ip=$2
# 时间一致性检查
server1_time=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@$server1_ip date +%s)
server2_time=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@$server2_ip date +%s)

if [ $server1_time -eq $server2_time ]; then
echo -e "两台服务器的时间完全一致。${GREEN}正常${NC}"
else
# 使用 date 命令将时间戳转换为日期和时间
formatted1_time=$(date -d "@$server1_time")
formatted2_time=$(date -d "@$server2_time")
err "两台服务器的时间不一致。"
echo -e "${server1_ip}时间为:${formatted1_time}"
echo -e "${server2_ip}时间为:${formatted2_time}"
fi
}

date_check $server1_ip $server2_ip
echo -e "\n------------------------------------------------------------------------------"

# 添加检查防火墙是否开启
echo -e "\n${BLUE}检查防火墙:${NC}\n"
check_firewall_status() {
local server_ip=$1

# 使用 ssh 连接到服务器并查看 firewalld 服务的状态
firewall_status=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null "root@$server_ip" "systemctl is-active firewalld")

if [ "$firewall_status" = "active" ]; then
warn "$server_ip 防火墙已开启。请根据http://172.17.160.32:18090/x/cYA0CQ检查端口"
else
echo "$server_ip 防火墙未开启。"
fi
}

# 调用这个函数并传入服务器 IP 和用户名
check_firewall_status $server1_ip
check_firewall_status $server2_ip
echo -e "\n------------------------------------------------------------------------------"

# DNS配置检查
echo -e "\n${BLUE}DNS检查:${NC}\n"
dns_check(){
local server1_user=$1
local server2_user=$1
local server1_ip=$2
local server2_ip=$3
# DNS配置检查 - 验证是否一致
server1_dns=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server1_user@$server1_ip cat /etc/resolv.conf)
server2_dns=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server2_user@$server2_ip cat /etc/resolv.conf)

if [ "$server1_dns" == "$server2_dns" ]; then
echo -e "两台服务器的DNS配置一致。${GREEN}正常${NC}"
else
warn "两台服务器的DNS配置不一致。请判断是否影响集群。"
fi


# DNS配置检查 - 验证是否存在多行nameserver记录
server1_nameserver_count=$(echo "$server1_dns" | grep -c '^nameserver')
server2_nameserver_count=$(echo "$server2_dns" | grep -c '^nameserver')

if [ $server1_nameserver_count -eq 1 ] && [ $server2_nameserver_count -eq 1 ]; then
echo -e "两台服务器的DNS配置中只存在一行nameserver记录。${GREEN}正常${NC}"
else
if [ $server1_nameserver_count -ne 1 ]; then
warn "第一台服务器($server1_ip)的DNS配置存在多行nameserver记录。请判断是否影响集群。"
fi

if [ $server2_nameserver_count -ne 1 ]; then
warn "第二台服务器($server2_ip)的DNS配置存在多行nameserver记录。请判断是否影响集群。"
fi
fi
}

dns_check $server1_user $server1_ip $server2_ip
echo -e "\n------------------------------------------------------------------------------"
# 挂载检查
echo -e "\n${BLUE}数据盘挂载检查:${NC}\n"
mount_check(){
local server1_user=$1
local server2_user=$1
local server1_ip=$2
local server2_ip=$3
server1_mount=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server1_user@$server1_ip mount | grep /opt/local-path-provisioner)
server2_mount=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server2_user@$server2_ip mount | grep /opt/local-path-provisioner)

if [ -n "$server1_mount" ] && [ -n "$server2_mount" ]; then
echo -e "两台服务器都正确挂载了/opt/local-path-provisioner目录。${GREEN}正常${NC}"
else
# echo -e "两台服务器中有一台或两台未正确挂载/opt/local-path-provisioner目录。${RED}异常${NC}"
if [ -z "$server1_mount" ]; then
err "第一台服务器($server1_ip)未正确挂载/opt/local-path-provisioner目录。"
fi

if [ -z "$server2_mount" ]; then
err "第二台服务器($server2_ip)未正确挂载/opt/local-path-provisioner目录。"
fi
fi
}

mount_check $server1_user $server1_ip $server2_ip
echo -e "\n------------------------------------------------------------------------------"
# 使用Telnet检查SFTP服务是否联通
# check_sftp() {
# local server_ip=$1
# echo -e "\n-----------使用Telnet检查SFTP服务是否联通-------------"
# # # 使用Telnet连接到SSH端口(默认是22)
# # # 尝试连接SFTP服务器
# # sftp -oPort=22 $server_ip <<EOF 2>/dev/null
# # quit
# # EOF

# # # Check the exit status of the SFTP command
# # if [ $? -eq 0 ]; then
# # echo "SFTP on $IP is working normally"
# # else
# # echo "SFTP on $IP is not working!"
# # fi

# # 使用SSH命令测试SFTP服务
# ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null sftpuser@$server_ip sftp exit > /dev/null 2>&1

# # 检查SSH命令的退出状态码
# if [ $? -eq 0 ]; then
# echo "SFTP服务正常,可以连接到主机 $server_ip 的SFTP服务。"
# else
# err "SFTP服务异常,无法连接到主机 $server_ip 的SFTP服务。"
# fi
# }
# # 调用函数来进行Telnet检查
# check_sftp $server_ip

# 检查master SSH服务状态和配置
echo -e "\n${BLUE}master SSH服务状态和配置检查:${NC}\n"
check_sshd_config() {

local server_ip=$1
local server_user=$2

# 使用 ssh 连接到服务器并执行命令检查SSH服务状态
ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "systemctl is-active sshd" > /dev/null 2>&1

if [ $? -eq 0 ]; then
# 获取SSH配置文件内容
sshd_config_content=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "cat /etc/ssh/sshd_config")

# 检查配置文件内容是否包含指定的三行数据
if echo "$sshd_config_content" | grep -q -E "Subsystem\s+sftp\s+internal-sftp" && echo "$sshd_config_content" | grep -q "Match User sftpuser" && echo "$sshd_config_content" | grep -q "ChrootDirectory /opt/ftpfile/sftp/sftpuser/"; then
echo -e "SSH配置正常。${GREEN}正常${NC}"
else
warn "请检查sshd_config文件,是否正确配置sftp。"
fi
else
warn "无法连接服务器或检查SSH服务状态。"
fi
}

# 调用函数执行检查
check_sshd_config $server1_ip $server1_user
echo -e "\n------------------------------------------------------------------------------"
# sftp_with_password() {
# echo -e "\n-----------检查master的sftp连通性-------------"
# local server_ip=$1
# local server_password=$2

# # 使用 expect 来自动输入密码
# expect -c "
# spawn sftp -oStrictHostKeyChecking=no -oUserKnownHostsFile=/dev/null sftpuser@$server_ip
# expect {
# \"password:\" {
# send \"$server_password\n\"
# exp_continue
# }
# eof
# }
# "

# if [ $? -eq 0 ]; then
# echo -e "$server_ip SFTP连接成功。${GREEN}正常${NC}"
# else
# warn "无法连接$server_ip 的SFTP服务。"
# fi
# }

# # 调用函数来进行SFTP连接检查
# sftp_with_password $server1_ip "FYktvR1w2upoOb"

# 检查目录权限是否为777
# check_directory_permission() {
# local server_ip=$1
# local directory_path=$2
# echo -e "\n-----------检查master的目录${directory_path}权限-------------"

# # 使用 ssh 连接到服务器并执行 stat 命令获取目录权限信息
# ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null sftpuser@$server_ip "stat -c %a $directory_path" > /dev/null 2>&1

# if [ $? -eq 0 ]; then
# # 获取目录权限
# directory_permission=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null sftpuser@$server_ip "stat -c %a $directory_path")

# if [ "$directory_permission" -eq 777 ]; then
# echo -e "$directory_path 目录权限为777。${GREEN}正常${NC}"
# else
# warn "$directory_path 目录权限不是777。"
# fi
# else
# warn "无法连接$server_ip 服务器或获取目录权限。"
# fi
# }
echo -e "\n${BLUE}检查master的sftp目录权限:${NC}\n"
check_user() {
ssh $1 "id $2 >/dev/null 2>&1"
if [ $? -eq 0 ]; then
echo -e "$2用户在$1上存在。${GREEN}正常${NC}"
else
err "$2用户在$1上不存在"
fi
}

check_dir_perm() {
ssh $1 "if [ \`stat -c %a $2\` -eq $3 ]; then echo -e '$2目录为$3权限。${GREEN}正常${NC}'; else err '$2目录不为$3权限'; fi"
}

# 调用函数来检查服务器的目录权限
check_user $server1_ip "sftpuser"

check_dir_perm $server1_ip "/opt" 755

check_dir_perm $server1_ip "/opt/ftpfile/sftp/sftpuser" 755

echo -e "\n------------------------------------------------------------------------------"
# check_directory_permission $server1_ip "/opt"
# check_directory_permission $server1_ip "/opt/ftpfile/sftp/sftpuser"

# 检查主机名与/etc/hosts文件的一致性
echo -e "\n${BLUE}检查主机名的一致性:${NC}\n"
check_hostname_and_hosts() {

local server_ip=$1
local server_user=$2
# 使用 ssh 连接到服务器并获取主机名
server_hostname=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "hostname")

if [ $? -eq 0 ]; then
# 获取 /etc/hosts 文件内容
hosts_file_content=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "cat /etc/hosts")

# 检查主机名与 /etc/hosts 内是否一致
if echo "$hosts_file_content" | grep -q -E "^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+\s+$server_hostname\s*$"; then
echo -e "$server_ip 主机名与/etc/hosts文件一致。${GREEN}正常${NC}"
else
warn "$server_ip 主机名与/etc/hosts文件不一致。"
fi
else
warn "无法连接$server_ip 服务器或获取主机名。"
fi
}

# 检查主机名与Kubernetes节点名是否一致
check_hostname_and_k8s_node() {

local server_ip=$1
local server_user=$2
local num=$3
# 使用 ssh 连接到服务器并获取Kubernetes节点名
k8s_node_name=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "kubectl get node -o jsonpath='{.items[$num].metadata.name}'")

if [ $? -eq 0 ]; then
# 获取主机名
server_hostname=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "hostname")

# 检查主机名与Kubernetes节点名是否一致
if [ "$server_hostname" = "$k8s_node_name" ]; then
echo -e "$server_ip 主机名与Kubernetes节点名一致。${GREEN}正常${NC}"
else
warn "$server_ip 主机名与Kubernetes节点名不一致。"
fi
else
warn "无法连接$server_ip 服务器或获取Kubernetes节点名。"
fi
}

# 调用函数来检查服务器的主机名与/etc/hosts文件一致性
check_hostname_and_hosts $server1_ip $server1_user
check_hostname_and_hosts $server2_ip $server2_user

# 调用函数来检查服务器的主机名与Kubernetes节点名一致性
check_hostname_and_k8s_node $server1_ip $server1_user 0
check_hostname_and_k8s_node $server2_ip $server2_user 1
echo -e "\n------------------------------------------------------------------------------"

# 磁盘写入速度测试(带缓存)
# disk_speed_test() {
# # local server_ip=$1
# # local server_user=$2
# # local test_file="/tmp/disk_speed_test_file"
# # # 进行写入测试
# # write_speed=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "dd if=/dev/zero of=${test_file} bs=8k count=128 conv=fsync oflag=direct" 2>&1 | tail -n 1)
# # echo -e "${server_ip} 写入速度: $write_speed"
# # # 进行读取测试
# # read_speed=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "sudo dd if=${test_file} of=/dev/null bs=8k count=128 conv=fsync iflag=direct" 2>&1 | tail -n 1)
# # echo -e "${server_ip} 读取速度: $read_speed"

# # # 删除测试文件
# # ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "rm $test_file"
# }

# 测试磁盘读速度的函数
disk_speed_test() {
local server_ip=$1
local test_duration=30 # 测试持续时间(秒)

echo "开始测试 $server_ip 过程会持续30s..."

local time0=$(date "+%s")
cat /dev/null > disk_res

while ((($(date "+%s") - time0) <= test_duration)); do
disk_info=$(ssh "$server_ip" 'dd if=/dev/zero of=output.file bs=8k count=128 conv=fsync 2>&1 1>/dev/null')
io_res=$(echo "$disk_info" | grep --only-matching -E '[0-9.]+ ([MGk]?B|bytes)/[s(ec)?|秒]')
echo "$io_res" >> disk_res
done

local count=$(cat disk_res | wc -l)
local sum=$(cat disk_res | xargs -n2 | awk '{ if ($2 == "kB/秒" || $2 == "kB/s") a+=($1/1024); else a+=$1 } END{printf("%.2f", a)}')
local average_speed=$(awk 'BEGIN{printf "%.2f\n", '$sum'/'$count'}')

echo -e "平均速度 $server_ip: ${RED}$average_speed MB/s${NC}。推荐值:${GREEN}>=200m/s${NC}"
}



# 磁盘写入速度测试(不带缓存)

# disk_speed_test_no_cache() {
# local server_ip=$1
# local server_user=$2
# local test_file="/tmp/disk_speed_test_file"
# # 禁用磁盘缓存
# ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "hdparm -W0 /dev/mapper/centos-root" 2>/dev/null
# # 进行写入测试
# write_speed=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "dd if=/dev/zero of=${test_file} bs=8k count=128 conv=fsync oflag=direct" 2>&1 | tail -n 1)
# echo -e "${server_ip} 写入速度: $write_speed"
# # 进行读取测试
# read_speed=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "sudo dd if=${test_file} of=/dev/null bs=8k count=128 conv=fsync iflag=direct" 2>&1 | tail -n 1)
# echo -e "${server_ip} 读取速度: $read_speed"

# # 删除测试文件
# ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "rm $test_file"
# # 启用磁盘缓存
# ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null $server_user@$server_ip "hdparm -W1 /dev/mapper/centos-root" 2>/dev/null
# }

# 测试磁盘写入速度的函数
disk_speed_test_no_cache() {
local server_ip="$1"
local test_duration=30 # 测试持续时间(秒)

echo "开始测试 $server_ip 过程会持续30s..."

local time0=$(date "+%s")
cat /dev/null > disk_res

while ((($(date "+%s") - time0) <= test_duration)); do
disk_info=$(ssh "$server_ip" 'dd if=/dev/zero of=output.file bs=8k count=128 oflag=direct,nonblock conv=fsync 2>&1 1>/dev/null')
io_res=$(echo "$disk_info" | grep --only-matching -E '[0-9.]+ ([MGk]?B|bytes)/[s(ec)?|秒]')
echo "$io_res" >> disk_res
done

local count=$(cat disk_res | wc -l)
local sum=$(cat disk_res | xargs -n2 | awk '{ if ($2 == "kB/秒" || $2 == "kB/s") a+=($1/1024); else a+=$1 } END{printf("%.2f", a)}')
local average_speed=$(awk 'BEGIN{printf "%.2f\n", '$sum'/'$count'}')

echo -e "平均速度 $server_ip: ${RED} $average_speed MB/s ${NC}。推荐值:${GREEN}>=50m/s${NC}"
}


# 检查 CPU 是否支持 AVX
echo -e "\n${BLUE}检查 CPU 是否支持 AVX:${NC}\n"
check_avx_support() {
local server_ip=$1
local avx_check=$(ssh -q -o BatchMode=yes -o ConnectTimeout=5 -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null root@$server_ip "grep -o 'avx' /proc/cpuinfo")

if [ -n "$avx_check" ]; then
echo -e "$server_ip CPU 支持 AVX (Advanced Vector Extensions)。${GREEN}正常${NC}"
else
warn "$server_ip CPU 不支持 AVX (Advanced Vector Extensions)。"
fi
}

# 调用函数来检查 AVX 支持
check_avx_support $server1_ip
check_avx_support $server2_ip

# 定义要加载镜像的目录列表
load_images() {
image_directories=("/opt/xx/images/product/insight" "/opt/xx/images/product/insight/patch")

# 遍历目录并加载镜像
for directory in "${image_directories[@]}"; do
if [ -d "$directory" ]; then
cd "$directory"
echo "进入目录: $directory"
for file in $(ls . | grep .tgz); do
echo "加载镜像: $file"
docker load < "$file"
done
else
echo "目录 $directory 不存在。"
fi
done
}


# 检查磁盘性能
echo -e "\n${BLUE}磁盘测试:${NC}\n"
# read -p "是否要检查磁盘? (y/n): " confirm
echo "是否要检查磁盘? (y/n):"
read confirm
if [ "$confirm" = "y" ]; then
echo -e "\n${BLUE}磁盘写入速度测试(不带缓存):${NC}\n"
disk_speed_test_no_cache $server1_ip
disk_speed_test_no_cache $server2_ip
echo -e "\n${BLUE}磁盘写入速度测试(带缓存):${NC}\n"
disk_speed_test $server1_ip
disk_speed_test $server2_ip
else
echo "取消磁盘检查。"
fi

# 镜像丢的时候使用
echo -e "\n${BLUE}镜像加载:${NC}\n"
echo "是否要加载 Docker 镜像? (y/n):"
read confirm
# read -p "是否要加载 Docker 镜像? (y/n): " confirm
if [ "$confirm" = "y" ]; then
load_images
else
echo "取消加载 Docker 镜像。"
fi



离职系列 第七篇
离职系列,想想这几年在公司的成长,在这做个记录。此篇主要谈谈LMT时,处理的两次OOM问题。

为啥是两次?因为都很有代表性,一次可以算是三方库使用不当,一次是程序自身的问题。

第一次

JPA使用不当,….待续

第二次

应用频繁重启,….待续

离职系列 第七篇
离职系列,想想这几年在公司的成长,在这做个记录。上一篇服务可用性定位问题常用命令,针对服务可用性常用命令进行了说明,这一篇主要是讲实践案例。

因为我在LMT除了日常管理工作外,额外向领导请求了承担部分现场问题处理,挑的比较陌生且有挑战性的“服务可用性的问题”,想着在实践中学习,并且这部分问题是最麻烦的没有什么标准,往往又是最紧急的,我担起来一是减少了组员上报成本,另一个我更方便直接找到对应的负责人进行沟通,提高效率。所以这块积累的经验更多。

案例分享

简单分享几个案例,一个主要对外(TAC),一个对内(组内研发)。两个文档因为对象不同,使用场景不同,稍有差异。

整理案例,一是为了组内共同学习,二是提供给TAC团队,让其能更多的拦截一些一线提出的较简单的现场问题。

案例1(TAC)

案例2(TAC)

案例3(研发)