中词频查询:
select count(*) from test01 where has_token(log_msg, '1978979513');
高词频查询:
select count(*) from test01 where has_token(log_msg, '1664553600');
可以看到,在中低词频的检索中,对比于TokenBloomFilter,TokenBitMap索引的查询性能更好,在需要扫描的数据量和查询消耗的CPU时间方面优势更加明显。不过在实际的日志排障使用场景中,考虑到最近的日志数据在ClickHouse有存储加速,Iceberg日志数据主要满足历史以及跨天日志数据排障,查询频次较低,我们更关注存储成本的代价,所以对于大部分日志数据,只创建TokenBloomFilter索引,只对少部分查询频次较高,性能要求较高的日志数据构建TokenBitMap索引。
8、进一步的探索
日志数据除了如timestamp/app_id等公共字段及log_msg文本字段,通常还会在数据入湖过程中抽取出不同业务各自的私有字段用于日志查询时更方便的检索过滤,这些私有字段各业务皆不相同且可能动态变化,所以通常使用Map或者Json类型字段存储,对于此类字段,如何更好地利用过滤条件进行Data Skipping,是我们进一步探索的方向,我们在这方面的工作如下:
- 支持基于map_keys(col)/map_values(col)表达式创建索引,此索引可以用于常见Map类型过滤条件element_at的Data Skipping,例如对于过滤条件element_at(col, 'key1') = 'v1', 可以首先使用基于map_keys(col)生成的索引判断‘key1‘是否在文件中存在,然后使用基于map_values(col)的索引判断‘v1’是否在文件中存在;
- 如果用户日志查询只会经常使用某一个key值做过滤,则可以直接基于element_at(col, 'key1')表达式创建索引,只从Map中抽取‘key1’对应的value构建索引,从而减少索引大小,提升索引过滤效果;
- 支持基于json_scalar_extract($json_path)表达式创建索引,用户可以使用此方式从json字段中抽取常见内部字段构建索引,在查询时,如果使用对应json路径抽取的字段作为过滤条件,则可以通过索引判断是否可以跳过扫描文件。
9、计算下推
当前log-agent主要以物理机部署为主,即B站几乎所有机器上都部署了log-agent服务。当前log-agent支持多种input/processor/output等。
为了减少后端资源的使用,我们可以在log-agent上执行一部分简单的计算,把后端的计算卸载到相关节点上,把物理机上的闲散资源利用起来。其中比较典型的玩法是支持下推非结构化/半结构化日志解析为结构化日志,我们通过不同的参数配置可以让相关转换是在消费端进行还是采集端进行。现在只有小部分因为相关机器资源使用要求,我们计算还是在消费端专门的消费服务进行解析,大部分日志的结构化转换我们都已经在log-agent完成。
10、消费调度
1)旨在解决的问题
考虑到容灾和可用性要求,我们在3.0中的基本思路是按高优集群 专用集群 通用大集群的方式进行数据分流。
- log-agent可以根据AppID StreamID路由规则进行调度到不同的log-gateway集群。默认情况下,高优日志进入kafka-proxy-high集群,没有特殊要求的日志进入到日志大集群(绝大部分日志都在这个集群), 另外有特殊场景要求的,比如极高优要求完全不想被其他人影响的,值得专门部署一套链路的,我们也支持专用集群,但原则上我们尽量会避免,因为这在资源利用率上并不会有很好的效果。对于出现任意集群出现不稳定时,我们优先会考虑对集群快速弹性的扩容(log-gateway是无状态的), 当扩容不能解决问题时,我们可以快速将该集群的流量一部分或所有切到其他集群中;
- log-gateway可以根据AppID StreamID维度路由规则进行调度到不同的kafka集群。同样我们把kafka分成了高优/专用/通用大集群,绝大部分日志会进通用大集群。由于kafka是一个有状态服务,加之其相关设计实现弹性扩缩容能力并不太理想。在这个层面我们会优先把相关日志流调度到其他集群,同时配合下游log-consumer的扩容;
- kafka topic层面我们同样采用大 小的方式,对于一些特别大,或优先级高的我们会拆分单独的topic(这里提一点在我们的架构下,把一个或多个流拆分到其他topic是很简单的事情);对于一般的日志流我们会根据资源使用相对均匀得拆到到N个topic里面。采用大 小的主要是成本 容灾之间的tradeoff;
- log-consumer同样是一个无状态服务,采用golang编写,容器化部署,整体资源使用率比同样场景的flink至少少50%。可以实现方便的弹性伸缩,同时可以根据路由规则动态消费不同的topic以实现充分的资源均衡利用。
该方案上线之后效果显著,年初频繁因为业务突增流量导致整个日志链路整体不可用的情况得到很好的抑制, 半年来未发生因为这块出现相关故障。
2)打通大数据体系
得益于我们架构上采用了iceberg这种table format,打通B站大数据体系变得容易起来。下面简单提一下批处理场景和流处理场景。
①批处理场景分区提交
这个策略是基于Kafka消费延迟和写入延迟的双重指标来动态提交Hive分区。
- 监控写入程序的消费延迟:这是初始步骤,需要计算日志的上报时间和写入存储的时间差,这样就可以得到日志在实际被写入之前的延迟时间。这是一项关键的度量,因为它可以了解数据从接收到实际写入存储的耗时。
- 监控 Kafka 的消费 lag:观察到数据消费存在延迟时,对比消费延迟时间和消费端的吞吐量,可以预估出一个延迟数据被消费掉的时间。
- 结合写入延迟和Kafka的lag:在这个阶段,我们结合写入延迟和Kafka的lag,以及预定的提交延迟阈值,来决定是否提交Hive分区。可以设定一个规则,如果写入延迟和Kafka的lag都超过了预设的阈值,那么就提交该分区。
②流处理场景分区提交
Flink侧是使用Flink作为观察者发送消息通知,观察者为Iceberg端,被观察者分区是否就绪是引擎端可以直接感知的事情。具体的感知方式会因不同的引擎而异。对于Flink,我们可以利用Watermark这个概念感知分区是否就绪。当分区就绪后,我们可以注册一个事件处理函数和对应的事件类型——在我们的例子中,是实现了Flink自带的PartitionCommitPolicy的CommitPolicy。在CommitPolicy中,我们实现具体的commit逻辑,即调用调度平台API以实现分区就绪的通知机制。
具体实现这一设计思路需要对Flink写入Iceberg的线程模型进行修改。我们可以在IcebergStreamWriter算子的prepareSnapshotPreBarrier阶段增加分区处理逻辑,并把分区信息发送到下游IcebergFilesCommitter算子。这些新的分区信息(我们称之为pendingPartition)被存储在一个Set中,等待提交。当这些pendingPartitions满足提交条件后,我们将其从Iterator中移除。
分区处理逻辑的实现借鉴了Hive connector的做法。在checkpoint完成时,我们将可提交的分区(committablePartition)发送到下游的IcebergFilesCommitter算子。IcebergFilesCommitter收到committablePartition后,会将这些committablePartition加到pendingPartitions里。
当分区就绪时,我们会调用Archer(B站DAG 任务调度平台) API完成消息通知。为了在批量计算过程中支持 Iceberg 表,我们需要设计一套在分区就绪后进行消息通知的策略,分区就绪的标志分为两部分,一部分是观察分区就绪的条件,另一部分是分区就绪后的消息通知设计。消息通知设计的时候,主要考虑在分区就绪的时候,在哪个层面通知 Archer 调度下游任务,其中包含两种设计思路:一种是将 Flink 作为观察者发送消息通知,另一种是将 Iceberg 作为观察者发送消息通知。
在 Flink 观察者模式下,分区就绪的标志是引擎测可以直接感知的,具体的感知方式会因不同的引擎而有所不同,对于 Flink,我们可以使用 watermark 这个概念来感知分区是否就绪。在分区就绪后,我们可以注册一个事件处理函数和对应的事件类型 ArcherCommitPolicy(实现了 Flink 自带的 PartitionCommitPolicy),并且在 ArcherCommitPolicy 里实现具体的 commit 逻辑,即调用 Archer API 来实现分区就绪的通知机制。由于 Iceberg 是基于文件级别进行统计的,所以我们可以在文件级别获取到对应的分区信息。
11、日志聚类
我们加强了日志分析的能力,帮助用户进行更好的日志排障。在服务出现问题时候,通常ERROR的日志量会暴增,不利于问题的定位,使用我们的轻量级日志聚类功能,可以将相似度高的日志聚合,做到秒级返回日志聚类,迅速理解日志全景,提升问题定位效率。
日志聚类在DevOps中可以被应用于问题定位和版本比对,这对于快速发现异常日志和定位问题是非常有帮助的。主要的设计需求包括:
- 聚类过程需要尽可能快,而且结果应非常稳定。换言之,聚类的类别和结果不应有波动。
- 需要能够保证日志模式的一致性,以便在不同的时间段内,通过日志类别查看其波动和变化。
设计思路是结合阿里云和观测云的日志聚类功能。阿里云采用全量日志聚类,将所有日志数据通过聚类模型获取其模式。这需要消耗大量的计算资源,且模式和索引需要落盘,从而增加了约10%的日志存储。观测云则选择对部分日志进行聚类,它查询限定时间范围内的1w条日志数据进行聚类,因此其聚类结果可能不完全稳定,同时也无法进行日志对比。
因此,我们的目标是在需求更少的资源的同时,获得更丰富且更稳定的聚类结果。
我们可以用下面这张图来理解日志聚类所做的工作:
日志模式解析过程可以理解为是一个倒推日志打印代码的过程,也是一个对日志聚类的过程(相同pattern的日志认为是同一类日志)。
算法思路设计:
被同一条代码打印出来的日志肯定是相似的,所以我们可以得到第一种模式解析的思路,给出文本相似度公式或距离公式,通过聚类算法,将相同模式的日志聚到一起,
然后再获取日志模板,业界基于聚类的日志模式解析算法,如Drain3、Lenma、Logmine、SHISO等。但在实际聚类过程中会往往存在很多的问题,聚类速度慢,大量的pattern类别、全量计算消耗大量资源等问题,
我们设计了基于固定深度解析树的思路,多个子pattern进行层次融合的方式,结合代码行号等特征对聚类速度和精度进行加速聚类
整体算法步骤分为以下几个部分:
预处理的获取日志平台表达式查询后的全部日志数据,(对于超过10w条的日志进行采样)在对日志进行解析前,都会先进行分词,因为词是表达完整含义的最小单位,将一些特殊词,如IP地址、时间等给识别出来,
然后替换为特殊字符或去掉,这是由于这些特殊词明显是参数,如此处理可以有效提高相同模式日志的相似度。提取日志消息对应的 日志行号特征数据
聚类的简单过程如下,我们首先构建一个固定深度的解析树,对于日志进行聚类:
- 根据日志的长度分组和日志行号等以及根据日志的前几个单词分组,树深度决定了用前多少个单词进行分组。
- 解析树的上层节点以日志行号特征和日志消息的长度(token的数量)区分日志组,根据预处理后的日志消息前几个单词依次向下搜索,直到叶子节点。叶子节点下存储着该组别中的聚类簇,
搜索到叶子节点后再计算相似度,根据相似度计算结果更新子聚类中心或者创建新的聚类子簇。
相似度计算逻辑如下, 在找到simSeq最大的日志组后,将其与自适应的相似度阈值st进行比较,如果simSeq≥st,那么就会返回该组作为最佳匹配。
- 更新解析树,将每个日志消息解析为字段,并按照固定深度树的结构进行插入。每个字段都对应树中的一个节点,如果节点已存在,则更新节点的统计信息;如果节点不存在,则创建新节点,对于匹配上的子pattern。
描日志消息和日志事件相同位置的token,如果两个token相同,则不修改该token位置上的token。否则,在日志事件中通过通配符*更新该token位置上的token。
- 层次融合,对于相似的pattern进行融合,结合LCS(最大公共子序列)的思路进行融合,将改善聚类效果,比如使同一行号下不同的pattern和不同行号特征下的子pattern聚类得到公共pattern。
- 模型保存与推理,聚类后的模型按appid进行保存,在后续实时日志聚类推理过程中,将直接日志消息与模型的解析流程进行匹配,未匹配上的日志将实时更新聚类的模型。
下面是日志聚类的效果: