运筹学之粒子群算法
目录
- 一、历史
- 二、精髓思想
- 三、案例与代码实现
一、历史
- 问:谁在什么时候提出粒子群算法?
- 答:粒子群算法(Particle Swarm Optimization,PSO)是由 James Kennedy(詹姆斯·肯尼迪) 和 Russell Eberhart(拉塞尔·埃伯哈特) 于 1995年 提出来的。他们最初是受到鸟群觅食行为的启发,想模拟群体智能(Swarm Intelligence),从而发展出这套优化算法。
- 论文:Kennedy, J., & Eberhart, R. (1995). Particle Swarm Optimization. Proceedings of the IEEE International Conference on Neural Networks (ICNN), 1995.
- 动机:解决函数优化问题,尤其是 非线性、连续、多维的复杂优化问题。
二、精髓思想
- 一言以蔽之:三人行,必有我师焉!
- 解释:“个体经验 + 群体智慧 = 高效全局搜索”。
- 通俗:班级里面各位都要向成绩好的同学学习,但是各位同学结合自身情况学习,也不能否定自己的努力,还要有冲劲,相信最终咱们班一定会有考上清华的!
总结下来其实就4个step:
- 初始化一堆粒子(有个班级)
- 计算每个粒子历史最优(和自己比)
- 找到这堆粒子全局最优(和大家比)
- 反思、有动力,再考试再排名(重新洗牌)
三、案例与代码实现
-
题干:
6艘船靠3个港口,已知达到时间arrival和卸货时长handling,求最优停靠方案,即等待时长最短。 -
数据:
ships = {'S1': {'arrival': 0.1, 'handling': 4},'S2': {'arrival': 2.1, 'handling': 3},'S3': {'arrival': 4.2, 'handling': 2},'S4': {'arrival': 6.3, 'handling': 3},'S5': {'arrival': 5.5, 'handling': 2},'S6': {'arrival': 3.9, 'handling': 4},'S7': {'arrival': 3.7, 'handling': 4},'S8': {'arrival': 3.4, 'handling': 4},
}
- 实现:
- step1:初始化一堆粒子(一个所有船舶靠港顺序和对应港口的方案)
num_particles = 30 # 粒子数(生成30套方案)
ship_ids = list(ships.keys()) # 船名# 初始化粒子
def init_particle():order = ship_ids.copy()random.shuffle(order)ports = [random.randint(0, num_ports - 1) for _ in range(num_ships)]return order, portsparticles = [init_particle() for _ in range(num_particles)] # 随机初始化位置(船调度顺序 + 泊位分配)# 输出:[(['S1', 'S3', 'S6', 'S7', 'S5', 'S2', 'S4', 'S8'], [2, 0, 1, 2, 2, 1, 1, 1]),...]
- step2:计算每个粒子历史最优(和自己比)
首先,定义一下评估函数或者叫做自适应度函数,如下:
# 适应度函数:最小化总等待时间
def evaluate(order, ports):berth_end_time = [0.0] * num_ports # 每个港口当前结束时间total_wait = 0.0for i, ship_id in enumerate(order):port = ports[i]arrival = ships[ship_id]['arrival']handling = ships[ship_id]['handling']# start_time = 开始卸货时间 = max(到达时间, 泊位可用时间)。# 解释:要卸货的话,船先到达了没有泊位可用也得等,反之,有空泊位了船还没到也得等。start_time = max(arrival, berth_end_time[port])wait_time = start_time - arrivaltotal_wait += wait_timeberth_end_time[port] = start_time + handlingreturn total_wait
然后,找到自身的历史最优,那不就是重新洗牌后与上一次的对比嘛~
重新洗牌放在步骤4来讲。
# 假设particles是重新洗牌后的结果,已和上次的对比完了的结果。
p_best = copy.deepcopy(particles) # 每个个体历史最优
p_best_scores = [evaluate(*p) for p in particles] # 每个个体历史最优的得分
- step3:找到这堆粒子全局最优(和大家比)
g_best = p_best[np.argmin(p_best_scores)] # 全局最优粒子
g_best_score = min(p_best_scores) # 全局最优粒子得分
- step4:反思、有动力,再考试再排名(重新洗牌)
上面有了自身最优和全局最优,那就有了参考,小明铆足一股冲劲,开始暗暗发力。
首先,他开始反思,不能妄自菲薄全盘否定自己,自身还是有可取之处的(个体学习因子),当然,别人成绩优秀是要向他学习的(群体学习因子),下了多大的决心或者铆了多少劲要有个数(惯性权重)
w = 0.6 # 惯性权重
c1 = 1.4 # 个体学习因子
c2 = 1.4 # 群体学习因子
然后,直接贴下公式吧,如下:
-
连续空间情况
v i t + 1 = w ⋅ v i t + c 1 ⋅ r 1 ⋅ ( p i b e s t − x i t ) + c 2 ⋅ r 2 ⋅ ( g b e s t − x i t ) v_{i}^{t+1} = w \cdot v_i^t + c_1 \cdot r_1 \cdot (p_{i}^{best} - x_i^t) + c_2 \cdot r_2 \cdot (g^{best} - x_i^t) vit+1=w⋅vit+c1⋅r1⋅(pibest−xit)+c2⋅r2⋅(gbest−xit)
x i t + 1 = x i t + v i t + 1 x_i^{t+1} = x_i^t + v_i^{t+1} xit+1=xit+vit+1 -
v i v_i vi: 第 i i i 个粒子的速度(搜索方向)
-
x i x_i xi: 第 i i i 个粒子的位置(一个解)
-
p i b e s t p_i^{best} pibest: 第 i i i 个粒子的历史最好解
-
g b e s t g^{best} gbest: 全局最优解
-
w w w: 惯性权重(控制粒子的“惯性”)
-
c 1 , c 2 c_1, c_2 c1,c2: 学习因子(控制粒子对个体/群体经验的依赖程度)
-
r 1 , r 2 ∈ [ 0 , 1 ] r_1, r_2 \in [0,1] r1,r2∈[0,1]: 两个随机数,增加探索性
连续空间直接套用该公式即可。
- 离散空间
由于组合问题(如排序或泊位分配)无法直接进行“加减”,所以我们引入“交换操作序列” + “泊位扰动”模拟速度和更新。
通俗解释就是:
- 当前方案变成自身历史最优方案,该怎么做?
- 当前方案变成全局最优方案,该怎么做?
# 获取交换序列,p1通过改变自身顺序从而变成p2模样
# 例子:p1 = ['S1', 'S2', 'S3', 'S4'], p2 = ['S3', 'S1', 'S4', 'S2']
# 想从 p1 变成 p2,要怎么换?我们一步步找出交换动作:
# 第0位期望是 S3,当前是 S1 → 找到 S3 在第2位 → 交换 (0, 2)
# 现在顺序变为:['S3', 'S2', 'S1', 'S4']
# 第1位期望是 S1,当前是 S2 → 找到 S1 在第2位 → 交换 (1, 2)
# 现在变为:['S3', 'S1', 'S2', 'S4']
# 第2位期望是 S4,当前是 S2 → 找到 S4 在第3位 → 交换 (2, 3)
# 现在就是目标顺序了!
# 所以返回的交换序列是:[(0, 2), (1, 2), (2, 3)]
def get_order_swaps(p1, p2):swaps = []temp = p1.copy()for i in range(num_ships):if temp[i] != p2[i]:j = temp.index(p2[i])swaps.append((i, j))temp[i], temp[j] = temp[j], temp[i]return swaps# 停船顺序、对应港口
order, ports = particles[i]
# 计算新速度
order_swap = get_order_swaps(order, p_best[i][0])
order_swap_g = get_order_swaps(order, g_best[0])
有了这些经验,下一步就是根据这些自身、全局经验确定到底该怎么做,如下:
# 从个体最优和全局最优中随机截取一部分交互序列
r1, r2 = random.random(), random.random()
new_order_swap = order_swap[:int(c1 * r1 * len(order_swap))] + \
order_swap_g[:int(c2 * r2 * len(order_swap_g))]
相当于拼合出一个新的交换顺序,和上述连续空间的公式概念上很像,但是少了惯性w的表达。也对,毕竟是离散的,如果是连续的才有惯性一说吧~
还需要加点随机扰动,有助于跳出局部最优解,如下:
# 随机扰动一下港口
new_ports = update_ports(ports, [])
最后,进行粒子位置的更新,如下:
# 更新粒子位置
new_order = apply_swaps(order, new_order_swap)
particles[i] = (new_order, new_ports)# 评价新位置
score = evaluate(new_order, new_ports)
附上完整代码:
import random
import numpy as np
import copy# 船舶数据
ships = {'S1': {'arrival': 0.1, 'handling': 4},'S2': {'arrival': 2.1, 'handling': 3},'S3': {'arrival': 4.2, 'handling': 2},'S4': {'arrival': 6.3, 'handling': 3},'S5': {'arrival': 5.5, 'handling': 2},'S6': {'arrival': 3.9, 'handling': 4},'S7': {'arrival': 3.7, 'handling': 4},'S8': {'arrival': 3.4, 'handling': 4},
}
ship_ids = list(ships.keys()) # 船名
num_ships = len(ship_ids) # 船数
num_ports = 3 # 港口数# PSO 参数
num_particles = 30 # 粒子数
num_iterations = 100 # 迭代次数
w = 0.6 # 惯性权重
c1 = 1.4 # 个体学习因子
c2 = 1.4 # 群体学习因子# 解的初始化
def init_particle():order = ship_ids.copy()random.shuffle(order)ports = [random.randint(0, num_ports - 1) for _ in range(num_ships)]return order, ports# 适应度函数:最小化总等待时间
def evaluate(order, ports):berth_end_time = [0.0] * num_ports # 每个港口当前结束时间total_wait = 0.0for i, ship_id in enumerate(order):port = ports[i]arrival = ships[ship_id]['arrival']handling = ships[ship_id]['handling']# start_time = 开始卸货时间 = max(到达时间, 泊位可用时间)。# 解释:要卸货的话,船先到达了没有泊位可用也得等,反之,有空泊位了船还没到也得等。start_time = max(arrival, berth_end_time[port])wait_time = start_time - arrivaltotal_wait += wait_timeberth_end_time[port] = start_time + handlingreturn total_wait# 获取交换序列,p1通过改变自身顺序从而变成p2模样
# 例子:p1 = ['S1', 'S2', 'S3', 'S4'], p2 = ['S3', 'S1', 'S4', 'S2']
# 想从 p1 变成 p2,要怎么换?我们一步步找出交换动作:
# 第0位期望是 S3,当前是 S1 → 找到 S3 在第2位 → 交换 (0, 2)
# 现在顺序变为:['S3', 'S2', 'S1', 'S4']
# 第1位期望是 S1,当前是 S2 → 找到 S1 在第2位 → 交换 (1, 2)
# 现在变为:['S3', 'S1', 'S2', 'S4']
# 第2位期望是 S4,当前是 S2 → 找到 S4 在第3位 → 交换 (2, 3)
# 现在就是目标顺序了!
# 所以返回的交换序列是:[(0, 2), (1, 2), (2, 3)]
def get_order_swaps(p1, p2):swaps = []temp = p1.copy()for i in range(num_ships):if temp[i] != p2[i]:j = temp.index(p2[i])swaps.append((i, j))temp[i], temp[j] = temp[j], temp[i]return swaps# 应用交换序列,执行交换的动作
def apply_swaps(order, swaps):new_order = order.copy()for i, j in swaps:new_order[i], new_order[j] = new_order[j], new_order[i]return new_order# 港口更新(port vector 加法)
def update_ports(ports, delta):new_ports = ports.copy()for i in range(len(ports)):if random.random() < 0.5:new_ports[i] = (new_ports[i] + random.choice([-1, 0, 1])) % num_portsreturn new_portsif __name__ == "__main__":# 1.粒子参数# 粒子表示为 [(order, ports), velocity]particles = [init_particle() for _ in range(num_particles)] # 随机初始化位置(船调度顺序 + 泊位分配)print(particles)velocities = [([], []) for _ in range(num_particles)] # 开辟速度存储空间 ([], [])print(velocities)p_best = copy.deepcopy(particles) # 每个个体历史最优p_best_scores = [evaluate(*p) for p in particles] # 每个个体历史最优的得分print(p_best)print(p_best_scores)g_best = p_best[np.argmin(p_best_scores)] # 全局最优粒子g_best_score = min(p_best_scores) # 全局最优粒子得分print(g_best)print(g_best_score)# 2.PSO 主循环for iter in range(num_iterations):for i in range(num_particles):# 遍历每个粒子(船的调度顺序和停靠泊位)order, ports = particles[i]# 计算新速度order_swap = get_order_swaps(order, p_best[i][0])order_swap_g = get_order_swaps(order, g_best[0])# 从个体最优和全局最优中随机截取一部分交互序列r1, r2 = random.random(), random.random()new_order_swap = order_swap[:int(c1 * r1 * len(order_swap))] + \order_swap_g[:int(c2 * r2 * len(order_swap_g))]# 随机扰动一下港口new_ports = update_ports(ports, [])# 更新粒子位置new_order = apply_swaps(order, new_order_swap)particles[i] = (new_order, new_ports)# 评价新位置score = evaluate(new_order, new_ports)# 个体最优更新if score < p_best_scores[i]:p_best[i] = (new_order, new_ports)p_best_scores[i] = score# 全局最优更新,每次算个体最优时候,同步对比下是否是全局最优if score < g_best_score:g_best = (new_order, new_ports)g_best_score = scoreif (iter + 1) % 10 == 0:print(f"迭代 {iter + 1}: 当前最优总等待时间 = {g_best_score:.2f}")# 输出结果print("\n✅ 最优停靠顺序和泊位分配:")for ship, port in zip(*g_best):print(f"船 {ship} 停靠港口 {port}")print(f"最小总等待时间: {g_best_score:.2f}")