当前位置: 首页 > news >正文

HeavyKeeper 算法

HeavyKeeper 算法介绍与原理

原代码go语言实现出自Topk
HeavyKeeper是一种高效的流式TopK检测算法,专为识别大规模数据流中的频繁项(热点Key)而生,它基于Count-Min Sketch算法改进,主要通过以下组件实现:

  1. 二维数组:算法维护一个二维数组,里面有 d 个数组,每个数组里有 w 个桶,桶里记录哈希指纹和计数值。

  2. 计数衰减机制:核心创新点,当发生哈希冲突时,不是简单的覆盖,而是通过概率衰减原有计数。

  3. 堆结构:维护一个大小为 k 的最小堆,用于记录当前观测到的TopK项。

当一个Key到达时:

  1. 对Key应用d个哈希函数,映射到d个数组中的对应桶

  2. 对每个桶:

  • 如果桶为空或已存储的哈希指纹与当前哈希指纹相同,增加计数器

  • 如果发生冲突,以概率P(decay) = 1/(b^C)衰减已有计数,b为衰减因子,C 为计数值

维护最小堆,保留最大的k个计数项

public class HeavyKeeper implements TopK {// 查找表大小,用于存放衰减概率private static final int LOOKUP_TABLE_SIZE = 256;private final int k;  // Top-K 的数量private final int width;  // 每层桶的宽度private final int depth;  // 总共的哈希层数private final double[] lookupTable;  // 衰减概率查找表private final Bucket[][] buckets;  // 哈希桶二维数组private final PriorityQueue<Node> minHeap;  // 最小堆用于维护前K个高频项private final BlockingQueue<Item> expelledQueue;  // 被移出 Top-K 的队列private final Random random;  // 用于概率衰减private long total;  // 总加入项的个数private final int minCount;  // 进入 Top-K 的最小频率门槛// 构造函数public HeavyKeeper(int k, int width, int depth, double decay, int minCount) {this.k = k;this.width = width;this.depth = depth;this.minCount = minCount;// 初始化查找表,存储每个 count 下的衰减概率this.lookupTable = new double[LOOKUP_TABLE_SIZE];for (int i = 0; i < LOOKUP_TABLE_SIZE; i++) {lookupTable[i] = Math.pow(decay, i);}// 初始化桶this.buckets = new Bucket[depth][width];for (int i = 0; i < depth; i++) {for (int j = 0; j < width; j++) {buckets[i][j] = new Bucket();}}// 初始化最小堆和其他结构this.minHeap = new PriorityQueue<>(Comparator.comparingInt(n -> n.count));this.expelledQueue = new LinkedBlockingQueue<>();this.random = new Random();this.total = 0;}// 返回当前 Top-K 列表@Overridepublic List<Item> list() {synchronized (minHeap) {List<Item> result = new ArrayList<>(minHeap.size());for (Node node : minHeap) {result.add(new Item(node.key, node.count));}// 按频率降序排序result.sort((a, b) -> Integer.compare(b.count(), a.count()));return result;}}// 返回被移出 Top-K 的项@Overridepublic BlockingQueue<Item> expelled() {return expelledQueue;}// 数据衰减操作(定期调用)@Overridepublic void fading() {// 所有桶的计数都右移一位(除以2)for (Bucket[] row : buckets) {for (Bucket bucket : row) {synchronized (bucket) {bucket.count = bucket.count >> 1;}}}// Top-K 小堆的值也同步衰减synchronized (minHeap) {PriorityQueue<Node> newHeap = new PriorityQueue<>(Comparator.comparingInt(n -> n.count));for (Node node : minHeap) {newHeap.add(new Node(node.key, node.count >> 1));}minHeap.clear();minHeap.addAll(newHeap);}total = total >> 1;}// 返回总项数@Overridepublic long total() {return total;}// 桶结构:记录指纹和频率private static class Bucket {long fingerprint;int count;}// 小堆节点private static class Node {final String key;final int count;Node(String key, int count) {this.key = key;this.count = count;}}// MurmurHash32 哈希函数private static int hash(byte[] data) {return HashUtil.murmur32(data);}// 添加元素逻辑@Overridepublic AddResult add(String key, int increment) {byte[] keyBytes = key.getBytes();long itemFingerprint = hash(keyBytes);int maxCount = 0;// 遍历每层哈希表for (int i = 0; i < depth; i++) {int bucketNumber = Math.abs(hash(keyBytes)) % width;Bucket bucket = buckets[i][bucketNumber];synchronized (bucket) {if (bucket.count == 0) {// 桶是空的,直接填入bucket.fingerprint = itemFingerprint;bucket.count = increment;maxCount = Math.max(maxCount, increment);} else if (bucket.fingerprint == itemFingerprint) {// 命中同一个元素,累加计数bucket.count += increment;maxCount = Math.max(maxCount, bucket.count);} else {// 不同元素,进行概率衰减for (int j = 0; j < increment; j++) {double decay = bucket.count < LOOKUP_TABLE_SIZE ?lookupTable[bucket.count] :lookupTable[LOOKUP_TABLE_SIZE - 1];// 随机衰减if (random.nextDouble() < decay) {bucket.count--;if (bucket.count == 0) {// 替换为当前项bucket.fingerprint = itemFingerprint;bucket.count = increment - j;maxCount = Math.max(maxCount, bucket.count);break;}}}}}}// 总计数累加total += increment;// 如果未达到最小门槛,不加入 Top-Kif (maxCount < minCount) {return new AddResult(null, false, null);}// 尝试加入 Top-K 小堆synchronized (minHeap) {boolean isHot = false;String expelled = null;// 如果已存在,更新它Optional<Node> existing = minHeap.stream().filter(n -> n.key.equals(key)).findFirst();if (existing.isPresent()) {minHeap.remove(existing.get());minHeap.add(new Node(key, maxCount));isHot = true;} else {// 不存在,则判断是否可以进入 Top-Kif (minHeap.size() < k || maxCount >= Objects.requireNonNull(minHeap.peek()).count) {Node newNode = new Node(key, maxCount);if (minHeap.size() >= k) {expelled = minHeap.poll().key;expelledQueue.offer(new Item(expelled, maxCount));}minHeap.add(newNode);isHot = true;}}return new AddResult(expelled, isHot, key);}}
}

相关文章:

  • Python类和对象一(十)
  • STM32 HAL 水位传感器驱动程序
  • MySQL -数据类型
  • leetcode0078. 子集-medium
  • (undone) 吴恩达版提示词工程 2. 指南
  • Sentinel源码—6.熔断降级和数据统计的实现一
  • 【kubernetes】pod资源配额
  • 5G网络切片:精准分配资源,提升网络效率的关键技术
  • 基于 LWE 的格密码python实战
  • (done) 吴恩达版提示词工程 1. 引言 (Base LLM 和 Instruction Tuned LLM)
  • visual studio无法跳转到函数定义、变量定义、跳转函数位置不准问题解决
  • Java链表反转方法详解
  • 注意力机制(np计算示例)单头和多头
  • 信息系统项目管理工程师备考计算类真题讲解五
  • 用python脚本怎么实现:把一个文件夹里面.png文件没有固定名称,复制到另外一个文件夹按顺序命名?
  • 基于Django的AI客服租车分析系统
  • Linux 常用命令 -pkill【通过进程名或其他属性来发送信号给一个或多个进程】
  • 2025.4.14-2025.4.20学习周报
  • 宝塔面板部署 Dify-latest 最新版本
  • QML 字符串格式化
  • 同济研究生开发AI二维码拿下大奖,新一代00开发者掀起AI创业潮
  • 2025年度“沪惠保”将于4月22日开售,保费不变
  • 习近平致电祝贺诺沃亚当选连任厄瓜多尔总统
  • 一周观展|上海,一系列特展大展渐次呈现
  • 语言天才、魔方大师,击败王楚钦前他豪言:我能比中国球员强
  • 中央和国家机关工委建立健全整治形式主义为基层减负长效机制