雪花算法(JAVA单例不用修改版)
一、简介
雪花算法(Snowflake Algorithm)是一种用于生成分布式系统中唯一ID的算法。起初由Twitter设计,用于解决分布式系统中唯一ID的需求。这一算法的目标是生成全局唯一、有序的64位整数ID,以确保数据不冲突、不重复。
二、雪花算法组成:
固定值:
1bit,最高位是符号位,0 表示正,1 表示负,固定为 0,如果是 1 就是负数了。
时间戳:
41bit,存储毫秒级时间戳(41 位的长度可以使用 69 年)。
标识位(存储机器码):
10bit,上面中的 机器id(5bit)和 服务id(5bit)统一叫作“标识位”,两个标识位组合起来最多可以支持部署 1024 个节点。
序列号:
12bit,用于表示在同一毫秒内生成的多个ID的序号。如果在同一毫秒内生成的ID超过了4096个(2的12次方),则需要等到下一毫秒再生成ID。
三、雪花算法使用中可能存在的问题
1、时钟回拨问题
时钟回拨问题指的是在分布式系统中,服务器的系统时钟发生倒退(即时间被调回到一个较早的时间点),导致系统内时间不连续、时间戳错乱的问题。这在使用时间戳生成唯一ID、排序或者进行协调时,会产生严重的影响。
解决方法:
NTP时钟同步:使用网络时间协议(NTP)服务来保持服务器之间的时钟同步,减少时钟回拨的可能性。
等待时钟追赶:当检测到时钟回拨时(本地存时间戳,每次生产ID时比较,小于当前时间则轮训),算法会暂停生成新的ID,直到系统时钟恢复正常。这种方法简单但可能会导致服务暂停,影响用户体验。
2、分布式生成的ID并非绝对随时间递增
由于在分布式中,各个机器的机器ID不同,导致生成的ID并非绝对随时间递增。
3、服役时间
雪花算法(Snowflake)的服役时间大约为69年。超过该时间可能会出现重复ID。
四、代码实现
1、SnowflakeIdGenerator(ID生成)
import com.alibaba.fastjson.JSONObject;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Random;/*** 雪花ID 生成*/
@Slf4j
public class SnowflakeIdGenerator {private static volatile SnowflakeIdGenerator snowflakeIdGenerator = null;// ==============================字段==============================/*** 机器ID* 唯一性:不支持热更新*/private final long workerId;/*** 数据中心ID* 唯一性:不支持热更新*/private final long datacenterId;/*** 序列号*/private long sequence = 0L;// 配置参数private static final long MAX_WORKER_ID = 31L;private static final long MAX_DATACENTER_ID = 31L;private static final long TIMESTAMP_BITS = 41L;private static final long MAX_TIMESTAMP = ~(-1L << TIMESTAMP_BITS);private static final long WORKER_ID_BITS = 5L;private static final long DATACENTER_ID_BITS = 5L;private static final long SEQUENCE_BITS = 12L;// 位移偏移量private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATACENTER_ID_BITS;private static final long WORKER_ID_SHIFT = SEQUENCE_BITS + DATACENTER_ID_BITS;private static final long DATACENTER_ID_SHIFT = SEQUENCE_BITS;/*** 上次生成ID的时间戳*/private long lastTimestamp = -1L;// ==============================构造函数==============================/*** 构造函数** @param workerId 机器ID (0 - 31)* @param datacenterId 数据中心ID (0 - 31)*/private SnowflakeIdGenerator(long workerId, long datacenterId) {// 检查workerId和datacenterId是否合法validateParameters(workerId, datacenterId);// 设置workerId和datacenterIdthis.workerId = workerId;this.datacenterId = datacenterId;}/*** 无参构造方法* workerId 为根据IP最后一位取值* datacenterId 为 0-31 随即取值*/private SnowflakeIdGenerator() {// 获取ip最后一段String ip = String.valueOf(getIpLastSegment());Random random = new Random();// 确保workId在0-31范围内int workerId = getIpLastSegment() % 32;int datacenterId = random.nextInt(32);log.info("SnowflakeIdGenerator 机器【{}】使用默认配置 workerId={} datacenterId={}", ip, workerId, datacenterId);// 检查workerId和datacenterId是否合法validateParameters(workerId, datacenterId);// 设置workerId和datacenterIdthis.workerId = workerId;this.datacenterId = datacenterId;}// ==============================单例==============================/*** 雪花算法 单例模式*/public static SnowflakeIdGenerator getSnowflakeIdGenerator() {if (snowflakeIdGenerator == null) {synchronized (SnowflakeIdGenerator.class) {if (snowflakeIdGenerator == null) {try {// 初始化snowflakeIdGenerator = configurationInit();} catch (Exception e) {log.error("getSnowflakeIdGenerator 获取 配置文件【snowflakeId】失败", e);// 报错 采用默认算法snowflakeIdGenerator = new SnowflakeIdGenerator();}}}}return snowflakeIdGenerator;}// ==============================方法==============================/*** 获取IP的最后一段*/private static int getIpLastSegment() {int ipLastSegment;String hostAddress = null;try {hostAddress = InetAddress.getLocalHost().getHostAddress();String[] segments = hostAddress.split("\\.");if (segments.length < 4) {throw new IllegalArgumentException("getIpLastSegment 不是IPV4地址 hostAddress=" + hostAddress);}ipLastSegment = Integer.parseInt(segments[3]);} catch (IllegalArgumentException e) {log.error("getIpLastSegment 获取IP 失败", e);Random random = new Random();ipLastSegment = random.nextInt(256);log.error("getIpLastSegment hostAddress={} 获取随即ip={}", hostAddress, ipLastSegment);} catch (UnknownHostException e) {log.error("getIpLastSegment 获取IP 失败", e);Random random = new Random();ipLastSegment = random.nextInt(256);}return ipLastSegment;}/*** 生成唯一ID** @return 唯一ID*/public synchronized long generateUniqueId() {long timestamp = getCurrentTimestamp();// 检查时钟回退情况if (timestamp < lastTimestamp) {throw new RuntimeException("Clock moved backwards. Refusing to generate ID.");}// 同一毫秒内自增序列号if (timestamp == lastTimestamp) {sequence = (sequence + 1) & ((1 << SEQUENCE_BITS) - 1); // 自增并掩码if (sequence == 0) {// 序列号用完,等待下一毫秒timestamp = getNextTimestamp(lastTimestamp);}} else {sequence = 0L; // 不同毫秒重置序列号}lastTimestamp = timestamp;// 组合生成唯一IDreturn ((timestamp << TIMESTAMP_SHIFT) |(datacenterId << DATACENTER_ID_SHIFT) |(workerId << WORKER_ID_SHIFT) |sequence);}/*** 获取当前时间戳** @return 当前时间戳(毫秒)*/private long getCurrentTimestamp() {return System.currentTimeMillis();}/*** 等待下一毫秒,直到获得新的时间戳** @param lastTimestamp 上次生成ID的时间戳* @return 新的时间戳*/private long getNextTimestamp(long lastTimestamp) {long timestamp = getCurrentTimestamp();while (timestamp <= lastTimestamp) {timestamp = getCurrentTimestamp();}return timestamp;}/*** 初始化*/private static SnowflakeIdGenerator configurationInit() {// 获取ip最后一段String ip = String.valueOf(getIpLastSegment());// 配置类似于 { "39": { "workerId": 13, "datacenterId": 13 } }String config = SpringBootBeanUtil.getEnvironment().getProperty("snowflakeId.config");log.info("configurationInit 机器【{}】的配置 snowflakeId.config={}", ip, config);// 没有配置项 用默认的if (StringUtils.isBlank(config)) {return new SnowflakeIdGenerator();}JSONObject json = JSONObject.parseObject(config).getJSONObject(ip);// 没有配置项 用默认的if (json == null) {log.info("configurationInit 机器【{}】的 无配置", ip);return new SnowflakeIdGenerator();}// 校验配置参数 如果没问题就使用配置的参数Parameter parameter = json.toJavaObject(Parameter.class);Integer workerId = parameter.getWorkerId();Integer datacenterId = parameter.getDatacenterId();log.info("configurationInit 机器【{}】的 workerId={} datacenterId={}", ip, workerId, datacenterId);if (workerId == null || datacenterId == null) {throw new IllegalArgumentException("configurationInit 机器【" + ip + "】配置有误");}validateParameters(workerId, datacenterId);return new SnowflakeIdGenerator(workerId, datacenterId);}/*** 校验机器ID是否合法*/private static void validateParameters(long workerId, long datacenterId) {if (workerId > MAX_WORKER_ID || workerId < 0) {throw new IllegalArgumentException("Invalid workerId: " + workerId);}if (datacenterId > MAX_DATACENTER_ID || datacenterId < 0) {throw new IllegalArgumentException("Invalid datacenterId: " + datacenterId);}}/*** 配置参数*/@Dataprivate static class Parameter {private Integer workerId;private Integer datacenterId;}
}
2、SpringBootBeanUtil(用于获取配置文件)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;@Component
@SuppressWarnings("unused")
public class SpringBootBeanUtil implements ApplicationContextAware, EnvironmentAware {private static final Logger log = LoggerFactory.getLogger(SpringBootBeanUtil.class);private static ApplicationContext applicationContext;private static Environment environment;@Overridepublic void setEnvironment(final Environment environment) {if (SpringBootBeanUtil.applicationContext == null) {SpringBootBeanUtil.environment = environment;}}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {if (SpringBootBeanUtil.applicationContext == null) {SpringBootBeanUtil.applicationContext = applicationContext;}log.info("========ApplicationContext配置成功========");log.info("========在普通类可以通过调用SpringUtils.getAppContext()获取applicationContext对象========");log.info("========applicationContext=" + SpringBootBeanUtil.applicationContext + "========");}/*** 获取applicationContext** @return ApplicationContext*/public static ApplicationContext getApplicationContext() {return applicationContext;}/*** 获取applicationContext** @return ApplicationContext*/public static Environment getEnvironment() {return environment;}/*** 通过name获取 Bean.** @param name name* @return Object Object*/public static Object getBean(String name) {return getApplicationContext().getBean(name);}/*** 通过class获取Bean.** @param clazz clazz* @return T*/public static <T> T getBean(Class<T> clazz) {return getApplicationContext().getBean(clazz);}/*** 通过name,以及Clazz返回指定的Bean** @param name name* @param clazz clazz* @return T*/public static <T> T getBean(String name, Class<T> clazz) {return getApplicationContext().getBean(name, clazz);}
}
3、application.properties(配置文件,一般放到配置中心区)
# 机器的 IPV4 最后一段为 39 就是说 机器 39 的 workerId = 13 datacenterId = 13 后面的同上
snowflakeId.config={"39":{"workerId":13,"datacenterId":13},"40":{"workerId":14,"datacenterId":14}}
4、SnowflakeIdTest
package com.example.test;import com.example.test.utils.SnowflakeIdGenerator;
import com.example.test.utils.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;@SpringBootTest
@Slf4j
public class SnowflakeIdTest {@Testpublic void testId() throws InterruptedException {// 根据配置文件 单例构建一个 雪花算法实例 如果没有配置则根据IP和随机数生产 机器ID(10bit)SnowflakeIdGenerator snowflakeIdGenerator = SnowflakeIdGenerator.getSnowflakeIdGenerator();// 线程池 最大线程数 为 200 最大 为 500// tomcat 最大线程数 为 200ThreadPoolExecutor threadPool = ThreadPoolUtil.getThreadPool();CountDownLatch countDownLatch = new CountDownLatch(10000);// 模拟并发// 由于加了线程池有等待,拿到雪花算法单例后生成ID也有锁,所以本方法的性能其实并不高,只是方便各位看到该生成方法ID的唯一性for (int i = 0; i < 10000; i++) {threadPool.execute(() -> {long id = snowflakeIdGenerator.generateUniqueId();log.info("id={}", id);countDownLatch.countDown();});}countDownLatch.await();}
}
5、ThreadPoolUtil(线程池)
package com.example.test.utils;import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** 线程池工具类,单例模式,全局唯一* @author ZHAOPINGAN*/
public class ThreadPoolUtil {private static volatile ThreadPoolExecutor threadPool = null;private ThreadPoolUtil(){}public static ThreadPoolExecutor getThreadPool() {if (threadPool == null) {synchronized (ThreadPoolUtil.class) {if (threadPool == null) {threadPool = new ThreadPoolExecutor(// 核心线程数200,500,// 10s 过期时间10000L, TimeUnit.MILLISECONDS,// 队列 1000new LinkedBlockingQueue<>(10000),// 线程池工厂Executors.defaultThreadFactory(),// 拒绝策略 丢弃任务,并抛出拒绝执行 RejectedExecutionException 异常信息new ThreadPoolExecutor.AbortPolicy());}}}return threadPool;}
}