RNN - 序列模型
序列模型
序列数据
- 实际中很多数据是有时序结构的
- 电影的评价随时间变化而变化
- 拿奖后评分上升,直到奖项被忘记
- 看了很多好电影后,人们的期望变高
- 季节性:贺岁片、暑期档
- 导演、演员的负面报道导致评分变低
- 音乐、语言、文本、和视频都是连续的
- 标题“狗咬人”远没有“人咬狗”那么令人惊讶
- 大地震发生后,很可能会有几次较小的余震
- 人的互动是连续的,从网上吵架可以看出
- 预测明天的股价要比填补昨天遗失的股价的更困难
统计工具
- 在时间 t t t 观察到 x t x_t xt,那么得到 T T T 个不独立的随机变量 ( x 1 , … , x T ) ∼ p ( x ) (x_1, \ldots, x_T) \sim p(\mathbf{x}) (x1,…,xT)∼p(x)
- 使用条件概率展开 p ( a , b ) = p ( a ) p ( b ∣ a ) = p ( b ) p ( a ∣ b ) p(a, b) = p(a)p(b \mid a) = p(b)p(a \mid b) p(a,b)=p(a)p(b∣a)=p(b)p(a∣b)
序列模型
- 对条件概率建模 p ( x t ∣ x 1 , … , x t − 1 ) = p ( x t ∣ f ( x 1 , … , x t − 1 ) ) p(x_t | x_1, \ldots, x_{t-1}) = p(x_t | f(x_1, \ldots, x_{t-1})) p(xt∣x1,…,xt−1)=p(xt∣f(x1,…,xt−1))
对见过的数据建模,也称自回归模型
那么如何对上述过程进行建模呢,有如下方案 :
方案 A - 马尔科夫假设
p ( x ) = p ( x 1 ) ⋅ p ( x 2 ∣ x 1 ) ⋅ p ( x 3 ∣ x 1 , x 2 ) ⋯ p ( x T ∣ x 1 , … , x T − 1 ) p(\mathbf{x}) = p(x_1) \cdot p(x_2 | x_1) \cdot p(x_3 | x_1, x_2) \cdots p(x_T | x_1, \ldots, x_{T-1}) p(x)=p(x1)⋅p(x2∣x1)⋅p(x3∣x1,x2)⋯p(xT∣x1,…,xT−1)
- 假设当前数据只跟 τ \tau τ 个过去数据点相关
- 也就是预测一个新的数据,只需要看过去的 τ \tau τ 个数据, τ \tau τ 小一点的话,模型就简单一点, τ \tau τ 大一点的话,模型就复杂一点。( e . g . e.g. e.g. 股票预测,预测文本)
p ( x t ∣ x 1 , … , x t − 1 ) = p ( x t ∣ x t − τ , … , x t − 1 ) = p ( x t ∣ f ( x t − τ , … , x t − 1 ) ) p(x_t | x_1, \ldots, x_{t-1}) = p(x_t | x_{t-\tau}, \ldots, x_{t-1}) = p(x_t | f(x_{t-\tau}, \ldots, x_{t-1})) p(xt∣x1,…,xt−1)=p(xt∣xt−τ,…,xt−1)=p(xt∣f(xt−τ,…,xt−1))
例如在过去数据上训练一个 MLP 模型
方案 B - 潜变量模型
p ( x ) = p ( x 1 ) ⋅ p ( x 2 ∣ x 1 ) ⋅ p ( x 3 ∣ x 1 , x 2 ) ⋯ p ( x T ∣ x 1 , … , x T − 1 ) p(\mathbf{x}) = p(x_1) \cdot p(x_2 | x_1) \cdot p(x_3 | x_1, x_2) \cdots p(x_T | x_1, \ldots, x_{T-1}) p(x)=p(x1)⋅p(x2∣x1)⋅p(x3∣x1,x2)⋯p(xT∣x1,…,xT−1)
- 引入潜变量 h t h_t ht 来表示过去信息 h t = f ( x 1 , … , x t − 1 ) h_t = f(x_1, \ldots, x_{t-1}) ht=f(x1,…,xt−1)
- 这样 x t = p ( x t ∣ h t ) x_t = p(x_t | h_t) xt=p(xt∣ht)
- 这样 x t = p ( x t ∣ h t ) x_t = p(x_t | h_t) xt=p(xt∣ht)
上述一旦引入了潜变量的话,就可以不断的更新 h h h , h ′ h' h′ 可以认为是根据前一个时间的潜变量和前一个时间的 x x x 相关,等价于上述就是建立两个模型:
- 一个模型是根据前面时间的 h h h 和前面时间的 x x x 来计算新的潜变量 h ′ h' h′ ;
- 另一个模型是给定新的潜变量的状态 h ′ h' h′ 和前一个时间的 x x x 来计算新的 x ′ x' x′
总结
- 时序模型中,当前数据跟之前观察到的数据相关
- 自回归模型使用自身过去数据来预测未来
- 马尔科夫模型假设当前只跟最近少数数据相关,从而简化模型
- 潜变量模型使用潜变量来概括历史信息
代码实现
马尔可夫模型
在自回归模型的近似法中,使用 x t − 1 , … , x t − τ x_{t-1}, \ldots, x_{t-\tau} xt−1,…,xt−τ而不是 x t − 1 , … , x 1 x_{t-1}, \ldots, x_1 xt−1,…,x1来估计 x t x_t xt。只要这种是近似精确的,我们就说序列满足马尔可夫条件(Markov condition)。特别是,如果 τ = 1 \tau = 1 τ=1,得到一个一阶马尔可夫模型(first-order Markov model), P ( x ) P(x) P(x)由下式给出:
P ( x 1 , … , x T ) = ∏ t = 1 T P ( x t ∣ x t − 1 ) 当 P ( x 1 ∣ x 0 ) = P ( x 1 ) . P(x_1, \ldots, x_T) = \prod_{t=1}^T P(x_t \mid x_{t-1}) \text{ 当 } P(x_1 \mid x_0) = P(x_1). P(x1,…,xT)=t=1∏TP(xt∣xt−1) 当 P(x1∣x0)=P(x1).当假设 x t x_t xt仅是离散值时,这样的模型特别棒,因为在这种情况下,使用动态规划可以沿着马尔可夫链精确地计算结果。例如,我们可以高效地计算 P ( x t + 1 ∣ x t − 1 ) P(x_{t+1} \mid x_{t-1}) P(xt+1∣xt−1):
P ( x t + 1 ∣ x t − 1 ) = ∑ x t P ( x t + 1 , x t , x t − 1 ) P ( x t − 1 ) = ∑ x t P ( x t + 1 ∣ x t , x t − 1 ) P ( x t , x t − 1 ) P ( x t − 1 ) = ∑ x t P ( x t + 1 ∣ x t ) P ( x t ∣ x t − 1 ) \begin{aligned} P(x_{t+1} \mid x_{t-1}) &= \frac{\sum_{x_t} P(x_{t+1}, x_t, x_{t-1})}{P(x_{t-1})}\\ &= \frac{\sum_{x_t} P(x_{t+1} \mid x_t, x_{t-1}) P(x_t, x_{t-1})}{P(x_{t-1})}\\ &= \sum_{x_t} P(x_{t+1} \mid x_t) P(x_t \mid x_{t-1}) \end{aligned} P(xt+1∣xt−1)=P(xt−1)∑xtP(xt+1,xt,xt−1)=P(xt−1)∑xtP(xt+1∣xt,xt−1)P(xt,xt−1)=xt∑P(xt+1∣xt)P(xt∣xt−1)
利用这一事实,我们只需要考虑过去观察中的一个非常短的历史: P ( x t + 1 ∣ x t , x t − 1 ) = P ( x t + 1 ∣ x t ) P(x_{t+1} \mid x_t, x_{t-1}) = P(x_{t+1} \mid x_t) P(xt+1∣xt,xt−1)=P(xt+1∣xt)。
因果关系
原则上,将 P ( x 1 , … , x T ) P(x_1, \ldots, x_T) P(x1,…,xT)倒序展开也没什么问题。毕竟,基于条件概率公式总是可以写出: P ( x 1 , … , x T ) = ∏ t = T 1 P ( x t ∣ x t + 1 , … , x T ) . P(x_1, \ldots, x_T) = \prod_{t=T}^1 P(x_t \mid x_{t+1}, \ldots, x_T). P(x1,…,xT)=t=T∏1P(xt∣xt+1,…,xT).事实上,如果基于一个马尔可夫模型,我们还可以得到一个反向的条件概率分布。然而,在许多情况下,数据存在一个自然的方向,即在时间上是前进的。很明显,未来的事件不能影响过去。因此,如果我们改变 x t x_t xt,可能会影响未来发生的事情 x t + 1 x_{t+1} xt+1,但不能反过来。也就是说,如果我们改变 x t x_t xt,基于过去事件得到的分布不会改变。因此,解释 P ( x t + 1 ∣ x t ) P(x_{t+1} \mid x_t) P(xt+1∣xt)应该比解释 P ( x t ∣ x t + 1 ) P(x_t \mid x_{t+1}) P(xt∣xt+1)更容易。例如,在某些情况下,对于某些可加性噪声 ϵ \epsilon ϵ,显然我们可以找到 x t + 1 = f ( x t ) + ϵ x_{t+1} = f(x_t) + \epsilon xt+1=f(xt)+ϵ,而反之则不行 :cite:Hoyer.Janzing.Mooij.ea.2009
。
而这个向前推进的方向恰好也是我们通常感兴趣的方向。彼得斯等人 cite:Peters.Janzing.Scholkopf.2017
对该主题的更多内容做了详尽的解释。
训练
首先,生成一些数据:使用正弦函数和一些可加性噪声来生成序列数据,时间步为 1 , 2 , … , 1000 1, 2, \ldots, 1000 1,2,…,1000。
%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2lT = 1000 # 总共产生1000个点
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,)) # 一个正弦波加上噪声
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
接下来,将这个序列转换为模型的特征-标签(feature-label)对。基于嵌入维度 τ \tau τ,将数据映射为数据对 y t = x t y_t = x_t yt=xt和 x t = [ x t − τ , … , x t − 1 ] \mathbf{x}_t = [x_{t-\tau}, \ldots, x_{t-1}] xt=[xt−τ,…,xt−1]。这比我们提供的数据样本少了 τ \tau τ个,因为我们没有足够的历史记录来描述前 τ \tau τ个数据样本。一个简单的解决办法是:如果拥有足够长的序列就丢弃这几项;另一个方法是用零填充序列。在这里,我们仅使用前600个“特征-标签”对进行训练。
# 马尔科夫假设,生成数据对,将 tau 设成 4
"""
特征矩阵 features,形状为 (T - tau, tau)。
标签矩阵 labels,形状为 (T - tau, 1)。
"""
tau = 4
features = torch.zeros((T - tau, tau)) # 特征矩阵,行数(样本数)为 T - tau,列数(特征数)为 tau
# 第 i 列对应的是时间序列 x 中从第 i 个时间步开始的子序列,长度为 T - tau。
for i in range(tau):features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))batch_size, n_train = 16, 600
# 只有前n_train个样本用于训练
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),batch_size, is_train=True)
下面在这里,使用一个相当简单的架构训练模型:一个拥有两个全连接层的多层感知机,ReLU激活函数和平方损失。
# 初始化网络权重的函数
def init_weights(m):if type(m) == nn.Linear:nn.init.xavier_uniform_(m.weight)# 一个简单的多层感知机
def get_net():net = nn.Sequential(nn.Linear(4, 10),nn.ReLU(),nn.Linear(10, 1))net.apply(init_weights)return net# 平方损失。注意:MSELoss计算平方误差时不带系数1/2
loss = nn.MSELoss(reduction='none')
下面,编写训练代码并进行模型的训练:
def train(net, train_iter, loss, epochs, lr):trainer = torch.optim.Adam(net.parameters(), lr)for epoch in range(epochs):for X, y in train_iter:trainer.zero_grad()l = loss(net(X), y)l.sum().backward()trainer.step()print(f'epoch {epoch + 1}, 'f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')net = get_net()
train(net, train_iter, loss, 5, 0.01)
训练结果展示如下:
预测
由于训练损失很小,因此期望模型能有很好的工作效果。首先是检查[模型预测下一个时间步]的能力,也就是单步预测(one-step-ahead prediction)。
onestep_preds = net(features)
# 给定前面的 tau 个点,预测下一个点,这还是比较简单的点
d2l.plot([time, time[tau:]],[x.detach().numpy(), onestep_preds.detach().numpy()], 'time','x', legend=['data', '1-step preds'], xlim=[1, 1000],figsize=(6, 3))
上述可以发现,单步预测效果不错。即使这些预测的时间步超过了 600 + 4 600+4 600+4(n_train + tau
),其结果看起来仍然是可信的。但是有一个小问题:如果数据观察序列的时间步只到 604 604 604,需要一步一步地向前迈进:
x ^ 605 = f ( x 601 , x 602 , x 603 , x 604 ) , x ^ 606 = f ( x 602 , x 603 , x 604 , x ^ 605 ) , x ^ 607 = f ( x 603 , x 604 , x ^ 605 , x ^ 606 ) , x ^ 608 = f ( x 604 , x ^ 605 , x ^ 606 , x ^ 607 ) , x ^ 609 = f ( x ^ 605 , x ^ 606 , x ^ 607 , x ^ 608 ) , … \hat{x}_{605} = f(x_{601}, x_{602}, x_{603}, x_{604}), \\ \hat{x}_{606} = f(x_{602}, x_{603}, x_{604}, \hat{x}_{605}), \\ \hat{x}_{607} = f(x_{603}, x_{604}, \hat{x}_{605}, \hat{x}_{606}),\\ \hat{x}_{608} = f(x_{604}, \hat{x}_{605}, \hat{x}_{606}, \hat{x}_{607}),\\ \hat{x}_{609} = f(\hat{x}_{605}, \hat{x}_{606}, \hat{x}_{607}, \hat{x}_{608}),\\ \ldots x^605=f(x601,x602,x603,x604),x^606=f(x602,x603,x604,x^605),x^607=f(x603,x604,x^605,x^606),x^608=f(x604,x^605,x^606,x^607),x^609=f(x^605,x^606,x^607,x^608),…通常,对于直到 x t x_t xt的观测序列,其在时间步 t + k t+k t+k处的预测输出 x ^ t + k \hat{x}_{t+k} x^t+k称为 k k k步预测( k k k-step-ahead-prediction)。由于我们的观察已经到了 x 604 x_{604} x604,它的 k k k步预测是 x ^ 604 + k \hat{x}_{604+k} x^604+k。换句话说,我们必须使用我们自己的预测(而不是原始数据)来[进行多步预测]。
multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T):multistep_preds[i] = net(multistep_preds[i - tau:i].reshape((1, -1)))d2l.plot([time, time[tau:], time[n_train + tau:]],[x.detach().numpy(), onestep_preds.detach().numpy(),multistep_preds[n_train + tau:].detach().numpy()], 'time','x', legend=['data', '1-step preds', 'multistep preds'],xlim=[1, 1000], figsize=(6, 3))
如上面的例子所示,绿线的预测显然并不理想。经过几个预测步骤之后,预测的结果很快就会衰减到一个常数。
为什么这个算法效果这么差呢?事实是由于错误的累积:假设在步骤 1 1 1之后,我们积累了一些错误 ϵ 1 = ϵ ˉ \epsilon_1 = \bar\epsilon ϵ1=ϵˉ。于是,步骤 2 2 2的输入被扰动了 ϵ 1 \epsilon_1 ϵ1,结果积累的误差是依照次序的 ϵ 2 = ϵ ˉ + c ϵ 1 \epsilon_2 = \bar\epsilon + c \epsilon_1 ϵ2=ϵˉ+cϵ1,其中 c c c为某个常数,后面的预测误差依此类推。因此误差可能会相当快地偏离真实的观测结果。例如,未来 24 24 24小时的天气预报往往相当准确,但超过这一点,精度就会迅速下降。
下面基于 k = 1 , 4 , 16 , 64 k = 1, 4, 16, 64 k=1,4,16,64,通过对整个序列预测的计算,[更仔细地看一下 k k k步预测]的困难。
# 表示最大预测步长,即最多可以预测未来 64 步。
max_steps = 64features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))
# 列i(i<tau)是来自x的观测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau):features[:, i] = x[i: i + T - tau - max_steps + 1]# 列i(i>=tau)是来自(i-tau+1)步的预测,其时间步从(i)到(i+T-tau-max_steps+1)
for i in range(tau, tau + max_steps):features[:, i] = net(features[:, i - tau:i]).reshape(-1)steps = (1, 4, 16, 64)
d2l.plot([time[tau + i - 1: T - max_steps + i] for i in steps],[features[:, (tau + i - 1)].detach().numpy() for i in steps], 'time', 'x',legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000],figsize=(6, 3))
以上例子清楚地说明了当我们试图预测更远的未来时,预测的质量是如何变化的。虽然“ 4 4 4步预测”看起来仍然不错,但超过这个跨度的任何预测几乎都是无用的。
小结
- 内插法(在现有观测值之间进行估计)和外推法(对超出已知观测范围进行预测)在实践的难度上差别很大。因此,对于所拥有的序列数据,在训练时始终要尊重其时间顺序,即最好不要基于未来的数据进行训练。
- 序列模型的估计需要专门的统计工具,两种较流行的选择是自回归模型和隐变量自回归模型。
- 对于时间是向前推进的因果模型,正向估计通常比反向估计更容易。
- 对于直到时间步 t t t的观测序列,其在时间步 t + k t+k t+k的预测输出是“ k k k步预测”。随着我们对预测时间 k k k值的增加,会造成误差的快速累积和预测质量的极速下降。
QA 思考
Q1:在常规范围内tau是不是越大越好。刚才例子tau=5是不是比4好
A1:当然 ! 但是需要考虑一定的局限性 ,要是 tau 设置的非常大,这样我的训练样本会大大的减少,甚至没有了, tau 变得很大的时候,计算量也会变大。数据变少了,模型又变复杂了。
Q2:潜变量模型和隐马尔科夫模型有什么区别?
A2:没有太多联系,但是可以相互借鉴,潜变量模型可以使用隐马尔科夫假设。
后记
import torch
from matplotlib import pyplot as plt
from torch import nn
from torch.utils import data# 积累 n 个指标
class Accumulator:# 若需要同时累积两个指标 ,则可以初始化为 Accumulator(2) 会创建一个列表 [0.0,0.0]def __init__(self, n):self.data = [0.0] * n# 假设当前 self.data = [1.0, 2.0],调用 add(3.0, 4.0) 后,self.data 会变为 [4.0, 6.0]def add(self, *args):self.data = [a + float(b) for a, b in zip(self.data, args)]def reset(self):self.data = [0.0] * len(self.data)# 返回 self.data 中对应位置的元素def __getitem__(self, idx):return self.data[idx]# 计算损失
def evaluate_loss(net, data_iter, loss):metric = Accumulator(2) # [0.0, 0.0]for X, y in data_iter:out = net(X)y = y.reshape(out.shape)l = loss(out, y)# 累加总损失 , 累加样本总数metric.add(l.sum(), l.numel())return metric[0] / metric[1]def set_figsize(figsize=(3.5, 2.5)):"""设置 matplotlib 图形的大小"""plt.rcParams['figure.figsize'] = figsizedef set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend):"""设置 matplotlib 的轴"""axes.set_xlabel(xlabel)axes.set_ylabel(ylabel)axes.set_xscale(xscale)axes.set_yscale(yscale)axes.set_xlim(xlim)axes.set_ylim(ylim)if legend:axes.legend(legend)axes.grid()def plot(X, Y=None, xlabel=None, ylabel=None, legend=None, xlim=None,ylim=None, xscale='linear', yscale='linear',fmts=('-', 'm--', 'g-.', 'r:'), figsize=(3.5, 2.5), axes=None):"""绘制数据点"""if legend is None:legend = []set_figsize(figsize)axes = axes if axes else plt.gca()def has_one_axis(X):"""判断 X 是否只有一个轴"""return (hasattr(X, "ndim") and X.ndim == 1 or isinstance(X, list)and not hasattr(X[0], "__len__"))if has_one_axis(X):X = [X]if Y is None:X, Y = [[]] * len(X), Xelif has_one_axis(Y):Y = [Y]if len(X) != len(Y):X = X * len(Y)axes.cla()for x, y, fmt in zip(X, Y, fmts):if len(x):axes.plot(x, y, fmt)else:axes.plot(y, fmt)set_axes(axes, xlabel, ylabel, xlim, ylim, xscale, yscale, legend)def load_array(data_arrays, batch_size, is_train=True):"""将数据数组加载到 DataLoader 中"""dataset = data.TensorDataset(*data_arrays)return data.DataLoader(dataset, batch_size, shuffle=is_train)def init_weights(m):"""初始化网络权重"""if type(m) == nn.Linear:nn.init.xavier_uniform_(m.weight)def get_net():"""创建一个简单的多层感知机网络"""net = nn.Sequential(nn.Linear(4, 10),nn.ReLU(),nn.Linear(10, 1))net.apply(init_weights)return netdef train(net, train_iter, loss, epochs, lr):"""训练模型"""trainer = torch.optim.Adam(net.parameters(), lr)for epoch in range(epochs):for X, y in train_iter:trainer.zero_grad()l = loss(net(X), y)l.sum().backward()trainer.step()print(f'epoch {epoch + 1}, 'f'loss: {evaluate_loss(net, train_iter, loss):f}')def main():# 生成时间序列数据T = 1000time = torch.arange(1, T + 1, dtype=torch.float32)x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))plt.show()# 马尔科夫假设,生成数据对tau = 4# 实际上滑动窗口应该是 (T - (tau + 1) + 1) ,包含 tau 个历史信息,和一个预测信息features = torch.zeros((T - tau, tau))for i in range(tau):features[:, i] = x[i:T - tau + i]labels = x[tau:].reshape((-1, 1))# 划分训练集batch_size, n_train = 16, 600train_iter = load_array((features[:n_train], labels[:n_train]),batch_size, is_train=True)# 定义损失函数loss = nn.MSELoss(reduction='none')# 创建并训练模型net = get_net()train(net, train_iter, loss, 5, 0.01)# 一步预测onestep_preds = net(features)plot([time, time[tau:]],[x.detach().numpy(), onestep_preds.detach().numpy()], 'time','x', legend=['data', '1-step preds'], xlim=[1, 1000],figsize=(6, 3))plt.show()# 多步预测multistep_preds = torch.zeros(T)multistep_preds[: n_train + tau] = x[: n_train + tau]for i in range(n_train + tau, T):multistep_preds[i] = net(multistep_preds[i - tau:i].reshape((1, -1)))plot([time, time[tau:], time[n_train + tau:]],[x.detach().numpy(), onestep_preds.detach().numpy(),multistep_preds[n_train + tau:].detach().numpy()], 'time','x', legend=['data', '1-step preds', 'multistep preds'],xlim=[1, 1000], figsize=(6, 3))plt.show()# 不同步长的预测max_steps = 64 # 表示最大预测步长,即最多可以预测未来 64 步。features = torch.zeros((T - tau - max_steps + 1, tau + max_steps))for i in range(tau):features[:, i] = x[i: i + T - tau - max_steps + 1]for i in range(tau, tau + max_steps):features[:, i] = net(features[:, i - tau:i]).reshape(-1)steps = (1, 4, 16, 64)plot([time[tau + i - 1: T - max_steps + i] for i in steps],[features[:, (tau + i - 1)].detach().numpy() for i in steps], 'time', 'x',legend=[f'{i}-step preds' for i in steps], xlim=[5, 1000],figsize=(6, 3))plt.show()if __name__ == "__main__":main()