0%

Numpy实现卷积和最大池化

算法岗位面试中,常问到手写卷积和maxpooling,本文用Numpy库分别实现这两部分功能,并且,在此基础上进行改进,实现线性的时间复杂度。

1、 手写卷积

已知输入,核,padding和stride,得到的输出大小为:

$$
h_{out} = \frac{h_{input} + 2 * padding - kernel}{stride} + 1
$$

$$
w_{out} = \frac{w_{input} + 2 * padding - kernel}{stride} + 1
$$

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def conv_one(x, ksize, padding=0, stride=1):

h,w = x.shape

kernel = np.random.random(ksize)



if padding != 0:

​ pad_x = np.zeros((h+2*padding,w+2*padding))

​ pad_x[padding:-padding,padding:-padding] = x

else:

​ pad_x = x



out_h = (h + 2 * padding - ksize[1]) // stride + 1

out_w = (w + 2 * padding - ksize[0]) // stride + 1



out = np.zeros((out_h,out_w))



for i in range(out_h):

for j in range(out_w):

​ roi_x = pad_x[i*stride:i*stride+ksize[1], j*stride:j*stride+ksize[0]]

​ out[i,j] = np.sum(roi_x * kernel)

return out

2、手写maxpooling

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def max_pooling(x,kernel,padding=0,stride=1):

h,w = x.shape

a,b = kernel.shape



if padding:

​ padding_x = np.zeros((h+2*padding,w+2*padding))

​ padding_x[padding:-padding,padding:-padding] = x

else:

​ padding_x = x



out_h = (h + 2 * padding - a) // stride + 1

out_w = (w + 2 * padding - b) // stride + 1

out_x = np.zeros((out_h,out_w))



for i in range(out_h):

for j in range(out_w):

​ tmp = np.max(x[i*stride:i*stride+a,j*stride:j*stride+b])

​ out_x[i,j] = tmp

return out_x

3 例子

1
2
3
4
img = np.arange(64).reshape(8,8)
kernel = np.random.random((3,4))
out = conv_one(img,ksize=kernel_size,padding=1,stride=1)
out = max_pooling(out,kernel,padding=1,stride=1)

4 进阶

以上,只是对我们理解的CNN卷积的简单模拟,但是如果只是做到这里,实际上在面试的时候是不够的,以上算法的复杂度是$O(mnkernelsize)$,即四个数的乘积。

这是非常耗时的,实际上,参考leecode239题(滑动窗口最大值),这题本质就是一维的maxpooling操作,如果暴力解,必然时间复杂度是$O(l*k)$,其中l是列表长度,k是滑窗大小。这题之所以被标注为困难,当然不是让你用暴力解法做出来,而是希望在线性时间复杂度内实现。

改进的方法很多,有优先队列和单调队列等,本次采用单调(递减)队列实现。单调队列是在队列的基础上,自己实现的一种数据结构,作为一种数据结构,它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class DecreaseDq():
def __init__(self):
self.dq = deque()

def pop(self,value):
if self.dq[0] == value:
return self.dq.popleft()
else:
return

def push(self,value):
while self.dq and self.dq[-1] < value:
self.dq.pop()
self.dq.append(value)

def front(self):
tmp = self.dq.popleft()
self.dq.appendleft(tmp)
return tmp

它实现了三种操作,即pop(弹出元素),push(压入元素),和查找元素最前面的值。这里的pop和push操作,和传统的弹出和压入不太一样。

  1. pop(value):如果窗口移除的元素value等于单调队列的出口元素,那么队列弹出元素,否则不用任何操作
  2. push(value):如果push的元素value大于入口元素的数值,那么就将队列入口的元素弹出,直到push元素的数值小于等于队列入口元素的数值为止

保持这个顺序,就可以保证窗口内的数据一定是单调队列了。

才用单调队列,就可以在$O(l)$的时间复杂度内ac这题了。

回到二维maxpooling的情况,无非就是在两个维度上分别实现滑动窗口法而已,因此,进阶的方法时间复杂度为$O(m*n)$。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def max_pooling(img,kernel,strid=1):
h,w = len(img),len(img[0])
img_ans = [[0 for _ in range(w-1)] for _ in range(h)]
for i in range(h):
decdq = DecreaseQue()
for m in range(kernel):
decdq.push(img[i][m])
img_ans[i][0] = decdq.front()
for m in range(kernel,w):
decdq.pop(img[i][m-2])
decdq.push(img[i][m])
img_ans[i][m-1] = decdq.front()
h,w = len(img_ans),len(img_ans[0])
result = [[0 for _ in range(w)] for _ in range(h-1)]
for j in range(w):
decdq = DecreaseQue()
for m in range(kernel):
decdq.push(img_ans[m][j])
result[0][j] = decdq.front()
for m in range(kernel,h):
decdq.pop(img_ans[m-2][j])
decdq.push(img_ans[m][j])
result[m-1][j] = decdq.front()
print(result)

例子如下:

1
2
img = [[i*j for i in range(1,5)] for j in range(6,1,-1)]
max_pooling(img,kernel=2)