CMU 10-414 HW4

This section only covers the parts of homework 4 that I consider most important; see the GitHub code for the rest.

Convolutional neural network

Implementing the convolution op

  • flip reverses an ndarray along the given axes; it is used when computing gradients in the conv op's backward pass
  • dilate inserts zeros between elements, and is likewise used in the backward pass (both operations are illustrated in the sketch below)
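needle's actual flip/dilate live in the NDArray backend; as a rough illustration of their semantics, here is a NumPy sketch (the array values and the dilation factor are made up for the example):

```python
import numpy as np

x = np.array([[1, 2],
              [3, 4]])

# flip: reverse the array along the chosen axes
print(np.flip(x, axis=0))
# [[3 4]
#  [1 2]]

# dilate: insert `dilation` zeros after each element along both axes
dilation = 1
d = np.zeros((x.shape[0] * (dilation + 1), x.shape[1] * (dilation + 1)), dtype=x.dtype)
d[::dilation + 1, ::dilation + 1] = x
print(d)
# [[1 0 2 0]
#  [0 0 0 0]
#  [3 0 4 0]
#  [0 0 0 0]]
```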

The implementation of the Conv op in detail:

After padding, the input A has shape (N, H, W, C_in), and the kernel has shape (K, K, C_in, C_out).

With stride 1, the output has shape (N, H-K+1, W-K+1, C_out).

Ignoring N for a moment: every element of the output is the inner product of the kernel with the corresponding window of the input. Equivalently, the kernel is multiplied against a blocked view of the input of size N×(H-K+1)×(W-K+1)×K×K×C_in. How do we turn the input into that N×(H-K+1)×(W-K+1)×K×K×C_in view? as_strided() can do it:

```python
A = A.as_strided(shape=(N, H_new, W_new, K, K, C_in), strides=(Ns, Hs, Ws, Hs, Ws, Cs)).compact()
```

shape = (N, H_new, W_new, K, K, C_in) is the shape of the blocked view and strides are its strides. Where do those numbers come from? Assume the kernel's stride is 1. To move from the first block of one row to the first block of the next row, we shift down by one element (since the kernel stride is 1), so strides[1] is the original A's stride along H, i.e. Hs; Ws follows by the same logic. Within a block, moving one step down (or right) is also a one-element shift, so the intra-block strides are again Hs and Ws. The strides along N and C are unchanged.

The computation between the kernel and each block is really an inner product of vectors; this is the idea behind im2col. reshape flattens each block into a vector.
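To convince yourself the stride arithmetic is right, here is a hedged NumPy check (note NumPy strides are in bytes, whereas needle's NDArray strides count elements; the shapes below are arbitrary):

```python
import numpy as np
from numpy.lib.stride_tricks import as_strided

N, H, W, C_in, K, C_out = 2, 6, 6, 3, 3, 4
A = np.random.randn(N, H, W, C_in)
B = np.random.randn(K, K, C_in, C_out)

# blocked view: one K x K x C_in patch per output position
Ns, Hs, Ws, Cs = A.strides
H_new, W_new = H - K + 1, W - K + 1
blocks = as_strided(A, shape=(N, H_new, W_new, K, K, C_in),
                    strides=(Ns, Hs, Ws, Hs, Ws, Cs))
assert np.array_equal(blocks[1, 2, 3], A[1, 2:2+K, 3:3+K, :])

# im2col: flatten each patch into a row, convolve with one big matmul
cols = blocks.reshape(N * H_new * W_new, K * K * C_in)
out = (cols @ B.reshape(K * K * C_in, C_out)).reshape(N, H_new, W_new, C_out)

# cross-check one output element against the direct inner product
expected = (A[0, 1:1+K, 2:2+K, :] * B[..., 0]).sum()
assert np.isclose(out[0, 1, 2, 0], expected)
```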


```python
def compute(self, A, B):
    """
    Computes the convolution of A and B.
    Parameters:
    A - input NDArray in NHWC format
    B - kernel
    """
    # zero-pad the spatial dimensions only
    pad_axes = [(0, 0)] + [(self.padding, self.padding)] * (A.ndim - 2) + [(0, 0)]
    A = A.pad(pad_axes)

    N, H, W, C_in = A.shape
    K, _, _, C_out = B.shape
    Ns, Hs, Ws, Cs = A.strides

    inner_dim = K * K * C_in

    if self.stride > 1:
        H_new = (H - K) // self.stride + 1
        W_new = (W - K) // self.stride + 1
        A = A.as_strided(shape=(N, H_new, W_new, K, K, C_in),
                         strides=(Ns, Hs * self.stride, Ws * self.stride, Hs, Ws, Cs)).compact()
    else:
        H_new = H - K + 1
        W_new = W - K + 1
        A = A.as_strided(shape=(N, H_new, W_new, K, K, C_in),
                         strides=(Ns, Hs, Ws, Hs, Ws, Cs)).compact()

    # im2col: flatten each K x K x C_in block into a row, then one matmul
    A = A.reshape((math.prod(A.shape[:-3]), inner_dim))
    B = B.compact()
    out = A @ B.reshape((inner_dim, C_out))

    out = out.reshape((N, H_new, W_new, C_out))
    return out
```

The derivation behind convolution's backward pass is involved; here I only use the results (for the derivation, see https://zhuanlan.zhihu.com/p/61898234).

```python
def gradient(self, out_grad, node):
    A, B = node.inputs  # B is the kernel (weight)
    N, H, W, C_in = A.shape
    K, _, _, C_out = B.shape
    Ns, Hs, Ws, Cs = A.realize_cached_data().strides

    if self.stride > 1:
        # undo the stride: insert stride-1 zeros between out_grad elements
        out_grad = dilate(out_grad, (1, 2), self.stride - 1)
    """
    dL/dA: convolve the (zero-dilated) output gradient with the kernel
    rotated 180 degrees, i.e. flipped over both spatial axes and with
    C_in/C_out swapped.
    """
    B_transpose = flip(B, (0, 1)).transpose((2, 3))
    A_grad = conv(out_grad, B_transpose, padding=K - 1 - self.padding)

    """
    dL/dB (see https://zhuanlan.zhihu.com/p/61898234):
    By the same analysis, a kernel entry clearly affects every output element;
    its contribution equals the input patch it slides over scaled by that entry,
    so when the delta error flows back, its gradient is out_grad multiplied
    pointwise with that patch and summed. Hence the kernel's gradient is a
    convolution of the input with out_grad used as the kernel; the only
    difficulty is permuting dimensions.
    A kernel has shape (K, K, C_in, C_out), window size first. When out_grad
    plays the kernel, its window size is (H_out, W_out), so transpose moves
    H_out and W_out to the front. Meanwhile A's batch dimension N moves to the
    end to line up with the kernel's third dimension. After the convolution,
    the result is permuted back to (K, K, C_in, C_out).
    """
    A_t = A.transpose((0, 3))  # (C_in, H, W, N)
    out_grad_t = out_grad.transpose((0, 2)).transpose((0, 1))  # (H_out, W_out, N, C_out)
    B_grad_t = conv(A_t, out_grad_t, padding=self.padding)
    B_grad = B_grad_t.transpose((0, 2)).transpose((0, 1))

    return A_grad, B_grad
```
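As a sanity check on the dL/dB rule (out_grad used as the kernel, with A's batch dimension moved to where the channels usually sit), here is a hedged NumPy finite-difference comparison for stride=1, padding=0; conv_naive is a throwaway helper for this check, not part of the homework code:

```python
import numpy as np

def conv_naive(A, B):
    # direct NHWC convolution, stride 1, no padding
    N, H, W, C_in = A.shape
    K, _, _, C_out = B.shape
    out = np.zeros((N, H - K + 1, W - K + 1, C_out))
    for i in range(H - K + 1):
        for j in range(W - K + 1):
            patch = A[:, i:i+K, j:j+K, :].reshape(N, -1)
            out[:, i, j, :] = patch @ B.reshape(-1, C_out)
    return out

A = np.random.randn(2, 5, 5, 3)
B = np.random.randn(3, 3, 3, 2)
out_grad = np.random.randn(2, 3, 3, 2)

# analytic rule: permute, convolve A with out_grad, permute back
A_t = A.transpose(3, 1, 2, 0)                         # (C_in, H, W, N)
og_t = out_grad.transpose(1, 2, 0, 3)                 # (H_out, W_out, N, C_out)
B_grad = conv_naive(A_t, og_t).transpose(1, 2, 0, 3)  # (K, K, C_in, C_out)

# numerical gradient of sum(conv(A, B) * out_grad) w.r.t. each kernel entry
eps = 1e-6
B_grad_num = np.zeros_like(B)
for idx in np.ndindex(*B.shape):
    Bp, Bm = B.copy(), B.copy()
    Bp[idx] += eps
    Bm[idx] -= eps
    B_grad_num[idx] = ((conv_naive(A, Bp) - conv_naive(A, Bm)) * out_grad).sum() / (2 * eps)

assert np.allclose(B_grad, B_grad_num, atol=1e-4)
```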

RNN

In a traditional feed-forward network, inputs at different time steps do not affect one another. But in tasks such as part-of-speech tagging or translation, the inputs are related, so the network needs to store information from earlier steps and use it at the next one. That is what an RNN does.

$h^{(t)} = f(W_{xh}\, x^{(t)} + W_{hh}\, h^{(t-1)} + b_h)$

$h^{(t)}$ is the hidden state (the information passed on to the next time step); it is computed from $h^{(t-1)}$ and $x^{(t)}$, with $b_h$ as the bias. The outer $f$ is an activation function, which in this homework is specifically tanh or relu.
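A minimal NumPy sketch of that single step, assuming tanh as $f$ (the weight names mirror the RNNCell below, but this is illustration, not the needle code):

```python
import numpy as np

def rnn_step(x_t, h_prev, W_ih, W_hh, b_ih, b_hh):
    # one recurrent step: mix the current input with the previous hidden state
    return np.tanh(x_t @ W_ih + b_ih + h_prev @ W_hh + b_hh)

input_size, hidden_size, bs = 5, 8, 2
rng = np.random.default_rng(0)
x_t = rng.normal(size=(bs, input_size))
h = np.zeros((bs, hidden_size))                 # h0 = 0
W_ih = rng.normal(size=(input_size, hidden_size))
W_hh = rng.normal(size=(hidden_size, hidden_size))
b_ih = b_hh = np.zeros(hidden_size)
h = rnn_step(x_t, h, W_ih, W_hh, b_ih, b_hh)    # h1, fed into the next step
```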

RNNCell implements a single RNN cell

```python
class RNNCell(Module):
    def __init__(self, input_size, hidden_size, bias=True, nonlinearity='tanh', device=None, dtype="float32"):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.bias = bias
        self.nonlinearity = nonlinearity
        self.device = device
        self.dtype = dtype

        bound = math.sqrt(1 / hidden_size)
        self.W_ih = Parameter(init.rand(input_size, hidden_size, low=-bound, high=bound, device=device, dtype=dtype, requires_grad=True))
        self.W_hh = Parameter(init.rand(hidden_size, hidden_size, low=-bound, high=bound, device=device, dtype=dtype, requires_grad=True))
        if bias:
            self.bias_ih = Parameter(init.rand(1, hidden_size, low=-bound, high=bound, device=device, dtype=dtype, requires_grad=True))
            self.bias_hh = Parameter(init.rand(1, hidden_size, low=-bound, high=bound, device=device, dtype=dtype, requires_grad=True))
        else:
            self.bias_ih = None
            self.bias_hh = None

    def forward(self, X, h=None):
        batch_size = X.shape[0]
        if h is None:
            h = init.zeros(batch_size, self.hidden_size, device=self.device, dtype=self.dtype)  # h0 = 0
        if self.bias:
            bias_ih = self.bias_ih.broadcast_to((batch_size, self.hidden_size))
            bias_hh = self.bias_hh.broadcast_to((batch_size, self.hidden_size))
            out = X @ self.W_ih + bias_ih + h @ self.W_hh + bias_hh
        else:
            out = X @ self.W_ih + h @ self.W_hh

        # apply the nonlinearity
        if self.nonlinearity == 'tanh':
            out = ops.tanh(out)
        elif self.nonlinearity == 'relu':
            out = ops.relu(out)
        else:
            raise ValueError("Invalid nonlinearity: {}".format(self.nonlinearity))

        return out
```

RNN implements a multi-layer RNN

In RNN's forward, the outer loop is over layers and the inner loop is over time steps t. For layer i, the inputs from t0 through tn are all run through that layer; only after they are all computed does processing move on to the next layer.

```python
class RNN(Module):
    def __init__(self, input_size, hidden_size, num_layers=1, bias=True, nonlinearity='tanh', device=None, dtype="float32"):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.device = device
        self.dtype = dtype
        # first layer maps (input_size -> hidden_size); later layers map (hidden_size -> hidden_size)
        self.rnn_cells = [RNNCell(input_size, hidden_size, bias=bias, nonlinearity=nonlinearity, device=device, dtype=dtype)] + \
                         [RNNCell(hidden_size, hidden_size, bias=bias, nonlinearity=nonlinearity, device=device, dtype=dtype) for _ in range(num_layers - 1)]

    def forward(self, X, h0=None):
        _, batch_size, _ = X.shape
        if h0 is None:
            h0 = [init.zeros(batch_size, self.hidden_size, device=self.device, dtype=self.dtype) for _ in range(self.num_layers)]
        else:
            h0 = tuple(ops.split(h0, axis=0))

        h_n = []  # final hidden state of each layer
        inputs = list(ops.split(X, axis=0))
        for layer, h in zip(self.rnn_cells, h0):
            for t, input in enumerate(inputs):
                h = layer(input, h)
                inputs[t] = h  # this layer's outputs become the next layer's inputs
            h_n.append(h)

        return ops.stack(inputs, axis=0), ops.stack(h_n, axis=0)
```
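A hypothetical usage sketch (shapes only; assumes the needle API from this homework and default device handling):

```python
model = RNN(input_size=20, hidden_size=32, num_layers=2)
X = init.rand(10, 4, 20)   # (seq_len=10, bs=4, input_size=20)
output, h_n = model(X)
# output: (10, 4, 32) -- the last layer's hidden state at every time step
# h_n:    (2, 4, 32)  -- every layer's hidden state at the last time step
```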

LSTM

Long short-term memory (LSTM) is a special kind of RNN, designed mainly to combat vanishing and exploding gradients when training on long sequences.


Structurally, an LSTM contains three gates (forget, input, output) and a candidate memory cell, computed by the formulas below. Note that all three gates and the candidate cell are computed from the previous hidden state $H_{t-1}$, and the $W_x$ and $W_h$ in the different formulas are distinct parameters.

$$I_t = \sigma(X_t W_{xi} + H_{t-1} W_{hi} + b_i)$$
$$F_t = \sigma(X_t W_{xf} + H_{t-1} W_{hf} + b_f)$$
$$O_t = \sigma(X_t W_{xo} + H_{t-1} W_{ho} + b_o)$$
$$\tilde{C}_t = \tanh(X_t W_{xc} + H_{t-1} W_{hc} + b_c)$$

Then $C_t$ and $H_t$ are computed.

$C_t$ combines a forgetting stage and a remembering stage: in the forgetting stage $C_{t-1}$ passes through the forget gate; in the remembering stage the candidate cell passes through the input gate; the two results are added: $C_t = F_t \odot C_{t-1} + I_t \odot \tilde{C}_t$

$H_t$ passes $C_t$ through a tanh and gates it elementwise (a product, not a sum) with the output gate: $H_t = O_t \odot \tanh(C_t)$

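Putting the five formulas together, here is a minimal NumPy sketch of one LSTM step (the per-gate parameter names Wxi, Whi, ... are illustrative; needle's LSTMCell instead packs all four gates into W_ih/W_hh of width 4*hidden_size, as the docstring below notes):

```python
import numpy as np

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def lstm_step(x, h, c, Wxi, Whi, bi, Wxf, Whf, bf, Wxo, Who, bo, Wxc, Whc, bc):
    I = sigmoid(x @ Wxi + h @ Whi + bi)        # input gate
    F = sigmoid(x @ Wxf + h @ Whf + bf)        # forget gate
    O = sigmoid(x @ Wxo + h @ Who + bo)        # output gate
    C_tilde = np.tanh(x @ Wxc + h @ Whc + bc)  # candidate memory cell
    c_next = F * c + I * C_tilde               # forget old memory, write new
    h_next = O * np.tanh(c_next)               # expose a gated view of the memory
    return h_next, c_next
```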

```python
class LSTM(Module):
    def __init__(self, input_size, hidden_size, num_layers=1, bias=True, device=None, dtype="float32"):
        super().__init__()
        """
        Applies a multi-layer long short-term memory (LSTM) RNN to an input sequence.

        Parameters:
        input_size - The number of expected features in the input x
        hidden_size - The number of features in the hidden state h
        num_layers - Number of recurrent layers.
        bias - If False, then the layer does not use bias weights.

        Variables:
        lstm_cells[k].W_ih: The learnable input-hidden weights of the k-th layer,
            of shape (input_size, 4*hidden_size) for k=0. Otherwise the shape is
            (hidden_size, 4*hidden_size).
        lstm_cells[k].W_hh: The learnable hidden-hidden weights of the k-th layer,
            of shape (hidden_size, 4*hidden_size).
        lstm_cells[k].bias_ih: The learnable input-hidden bias of the k-th layer,
            of shape (4*hidden_size,).
        lstm_cells[k].bias_hh: The learnable hidden-hidden bias of the k-th layer,
            of shape (4*hidden_size,).
        """
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.device = device
        self.dtype = dtype
        self.lstm_cells = [LSTMCell(input_size, hidden_size, bias=bias, device=device, dtype=dtype)] + \
                          [LSTMCell(hidden_size, hidden_size, bias=bias, device=device, dtype=dtype) for _ in range(num_layers - 1)]

    def forward(self, X, h=None):
        """
        Inputs: X, h
        X of shape (seq_len, bs, input_size) containing the features of the input sequence.
        h, tuple of (h0, c0) with
            h0 of shape (num_layers, bs, hidden_size) containing the initial
            hidden state for each element in the batch. Defaults to zeros if not provided.
            c0 of shape (num_layers, bs, hidden_size) containing the initial
            hidden cell state for each element in the batch. Defaults to zeros if not provided.

        Outputs: (output, (h_n, c_n))
        output of shape (seq_len, bs, hidden_size) containing the output features
            (h_t) from the last layer of the LSTM, for each t.
        tuple of (h_n, c_n) with
            h_n of shape (num_layers, bs, hidden_size) containing the final hidden state for each element in the batch.
            c_n of shape (num_layers, bs, hidden_size) containing the final hidden cell state for each element in the batch.
        """
        batch_size = X.shape[1]
        if h is None:
            h0 = [init.zeros(batch_size, self.hidden_size, device=self.device, dtype=self.dtype) for _ in range(self.num_layers)]
            c0 = [init.zeros(batch_size, self.hidden_size, device=self.device, dtype=self.dtype) for _ in range(self.num_layers)]
        else:
            h0, c0 = h
            h0 = tuple(ops.split(h0, axis=0))
            c0 = tuple(ops.split(c0, axis=0))

        h_n = []
        c_n = []
        inputs = list(ops.split(X, axis=0))

        for layer, h, c in zip(self.lstm_cells, h0, c0):
            for t, input in enumerate(inputs):
                h, c = layer(input, (h, c))
                inputs[t] = h
            h_n.append(h)
            c_n.append(c)

        return ops.stack(inputs, axis=0), (ops.stack(h_n, axis=0), ops.stack(c_n, axis=0))
```

Embedding

In traditional machine learning, one-hot encoding is commonly used to turn high-dimensional data such as text into binary vectors. This has two main problems. First, the curse of dimensionality: as the amount of data grows, so does the dimension, making computation and storage expensive. Second, one-hot vectors capture no feature or semantic information, because every dimension is independent of the others and cannot reflect relationships between them. To address this, researchers proposed embedding models, which map high-dimensional data into a low-dimensional embedding space and learn to place similar data points close together, capturing features and semantics while improving efficiency and accuracy.

The core idea of an embedding model is to map each data point to a low-dimensional embedding vector so that similar points end up close together in the embedding space. Concretely, an embedding vector is a real-valued vector, typically with tens to hundreds of elements, each encoding some feature or semantic information, rather than a one-hot encoding's single 1 among 0s.

```python
class Embedding(Module):
    def __init__(self, num_embeddings, embedding_dim, device=None, dtype="float32"):
        super().__init__()
        """
        Maps one-hot word vectors from a dictionary of fixed size to embeddings.

        Parameters:
        num_embeddings (int) - Size of the dictionary
        embedding_dim (int) - The size of each embedding vector

        Variables:
        weight - The learnable weights of shape (num_embeddings, embedding_dim)
            initialized from N(0, 1).
        """
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.device = device
        self.dtype = dtype

        # drawn from N(0, 1), as the docstring specifies
        self.weight = Parameter(init.randn(num_embeddings, embedding_dim, mean=0, std=1, device=device, dtype=dtype, requires_grad=True))

    def forward(self, x: Tensor) -> Tensor:
        """
        Maps word indices to one-hot vectors, and projects to embedding vectors

        Input:
        x of shape (seq_len, bs)

        Output:
        output of shape (seq_len, bs, embedding_dim)
        """
        seq_len, bs = x.shape
        x_one_hot = init.one_hot(self.num_embeddings, x, device=self.device, dtype=self.dtype)  # shape (seq_len, bs, num_embeddings)
        out = x_one_hot.reshape((seq_len * bs, self.num_embeddings)) @ self.weight
        return out.reshape((seq_len, bs, self.embedding_dim))
```

forward's input x has shape (seq_len, bs), where each element is a word's index in the dictionary. The init.one_hot call converts x to one-hot form, so x_one_hot has shape (seq_len, bs, num_embeddings).

self.weight learns correlations between words and performs the dimensionality reduction. Because the left factor of the matmul is one-hot, the product is just a row lookup into the weight matrix, as the sketch below shows.
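A quick NumPy check of that equivalence (sizes are arbitrary):

```python
import numpy as np

num_embeddings, embedding_dim = 10, 4
weight = np.random.randn(num_embeddings, embedding_dim)
idx = np.array([3, 7, 3])                 # word indices
one_hot = np.eye(num_embeddings)[idx]     # (3, 10)
# one_hot @ weight selects exactly one row of weight per index
assert np.allclose(one_hot @ weight, weight[idx])
```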

LanguageModel

The model consists of an embedding layer, a sequence model (RNN or LSTM), and finally a linear layer.

```python
class LanguageModel(nn.Module):
    def __init__(self, embedding_size, output_size, hidden_size, num_layers=1,
                 seq_model='rnn', device=None, dtype="float32"):
        """
        Consists of an embedding layer, a sequence model (either RNN or LSTM), and a
        linear layer.
        Parameters:
        output_size: Size of dictionary
        embedding_size: Size of embeddings
        hidden_size: The number of features in the hidden state of LSTM or RNN
        seq_model: 'rnn' or 'lstm', whether to use RNN or LSTM
        num_layers: Number of layers in RNN or LSTM
        """
        super(LanguageModel, self).__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(output_size, embedding_size, device=device, dtype=dtype)
        if seq_model == 'rnn':
            self.seq_model = nn.RNN(embedding_size, hidden_size, num_layers=num_layers, device=device, dtype=dtype)
        elif seq_model == 'lstm':
            self.seq_model = nn.LSTM(embedding_size, hidden_size, num_layers=num_layers, device=device, dtype=dtype)
        self.linear = nn.Linear(hidden_size, output_size, device=device, dtype=dtype)

    def forward(self, x, h=None):
        """
        Given sequence (and the previous hidden state if given), returns probabilities of next word
        (along with the last hidden state from the sequence model).
        Inputs:
        x of shape (seq_len, bs)
        h of shape (num_layers, bs, hidden_size) if using RNN,
            else h is tuple of (h0, c0), each of shape (num_layers, bs, hidden_size)
        Returns (out, h)
        out of shape (seq_len*bs, output_size)
        h of shape (num_layers, bs, hidden_size) if using RNN,
            else h is tuple of (h0, c0), each of shape (num_layers, bs, hidden_size)
        """
        seq_len, bs = x.shape
        x_embedding = self.embedding(x)
        out, h = self.seq_model(x_embedding, h)
        out = out.reshape((seq_len * bs, self.hidden_size))  # flatten
        out = self.linear(out)
        return out, h
```

forward's input x has shape (seq_len, bs): by this point the text has already been tokenized against the dictionary, so each word is represented by its dictionary index.
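A hypothetical shape walkthrough, assuming the needle modules above (the numbers are made up; in the real training code x comes from the corpus batching utilities):

```python
vocab_size = 1000
model = LanguageModel(embedding_size=32, output_size=vocab_size,
                      hidden_size=64, num_layers=2, seq_model='lstm')
x = init.zeros(10, 4)   # (seq_len=10, bs=4) of word indices
out, h = model(x)
# embedding:  (10, 4) -> (10, 4, 32)
# seq_model:  (10, 4, 32) -> (10, 4, 64), h = (h_n, c_n), each (2, 4, 64)
# linear:     flattened to (40, 64) -> (40, 1000) vocabulary logits
```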
