算法岗位面试中,常问到手写卷积和maxpooling,本文用Numpy库分别实现这两部分功能,并且,在此基础上进行改进,实现线性的时间复杂度。
1、 手写卷积 已知输入,核,padding和stride,得到的输出大小为:
$$ h_{out} = \frac{h_{input} + 2 * padding - kernel}{stride} + 1 $$
$$ w_{out} = \frac{w_{input} + 2 * padding - kernel}{stride} + 1 $$
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 def conv_one (x, ksize, padding=0 , stride=1 ): h,w = x.shape kernel = np.random.random(ksize) if padding != 0 : pad_x = np.zeros((h+2 *padding,w+2 *padding)) pad_x[padding:-padding,padding:-padding] = x else : pad_x = x out_h = (h + 2 * padding - ksize[1 ]) // stride + 1 out_w = (w + 2 * padding - ksize[0 ]) // stride + 1 out = np.zeros((out_h,out_w)) for i in range (out_h): for j in range (out_w): roi_x = pad_x[i*stride:i*stride+ksize[1 ], j*stride:j*stride+ksize[0 ]] out[i,j] = np.sum (roi_x * kernel) return out
2、手写maxpooling 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 def max_pooling (x,kernel,padding=0 ,stride=1 ): h,w = x.shape a,b = kernel.shape if padding: padding_x = np.zeros((h+2 *padding,w+2 *padding)) padding_x[padding:-padding,padding:-padding] = x else : padding_x = x out_h = (h + 2 * padding - a) // stride + 1 out_w = (w + 2 * padding - b) // stride + 1 out_x = np.zeros((out_h,out_w)) for i in range (out_h): for j in range (out_w): tmp = np.max (x[i*stride:i*stride+a,j*stride:j*stride+b]) out_x[i,j] = tmp return out_x
3 例子 1 2 3 4 img = np.arange(64 ).reshape(8 ,8 ) kernel = np.random.random((3 ,4 )) out = conv_one(img,ksize=kernel_size,padding=1 ,stride=1 ) out = max_pooling(out,kernel,padding=1 ,stride=1 )
4 进阶 以上,只是对我们理解的CNN卷积的简单模拟,但是如果只是做到这里,实际上在面试的时候是不够的,以上算法的复杂度是$O(mn kernelsize)$,即四个数的乘积。
这是非常耗时的,实际上,参考leecode239题(滑动窗口最大值) ,这题本质就是一维的maxpooling操作,如果暴力解,必然时间复杂度是$O(l*k)$,其中l是列表长度,k是滑窗大小。这题之所以被标注为困难,当然不是让你用暴力解法做出来,而是希望在线性时间复杂度内实现。
改进的方法很多,有优先队列和单调队列等,本次采用单调(递减)队列实现。单调队列是在队列的基础上,自己实现的一种数据结构,作为一种数据结构,它的定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 class DecreaseDq (): def __init__ (self ): self.dq = deque() def pop (self,value ): if self.dq[0 ] == value: return self.dq.popleft() else : return def push (self,value ): while self.dq and self.dq[-1 ] < value: self.dq.pop() self.dq.append(value) def front (self ): tmp = self.dq.popleft() self.dq.appendleft(tmp) return tmp
它实现了三种操作,即pop(弹出元素),push(压入元素),和查找元素最前面的值。这里的pop和push操作,和传统的弹出和压入不太一样。
pop(value):如果窗口移除的元素value等于单调队列的出口元素,那么队列弹出元素,否则不用任何操作
push(value):如果push的元素value大于入口元素的数值,那么就将队列入口的元素弹出,直到push元素的数值小于等于队列入口元素的数值为止
保持这个顺序,就可以保证窗口内的数据一定是单调队列了。
才用单调队列,就可以在$O(l)$的时间复杂度内ac这题了。
回到二维maxpooling的情况,无非就是在两个维度上分别实现滑动窗口法而已,因此,进阶的方法时间复杂度为$O(m*n)$。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def max_pooling (img,kernel,strid=1 ): h,w = len (img),len (img[0 ]) img_ans = [[0 for _ in range (w-1 )] for _ in range (h)] for i in range (h): decdq = DecreaseQue() for m in range (kernel): decdq.push(img[i][m]) img_ans[i][0 ] = decdq.front() for m in range (kernel,w): decdq.pop(img[i][m-2 ]) decdq.push(img[i][m]) img_ans[i][m-1 ] = decdq.front() h,w = len (img_ans),len (img_ans[0 ]) result = [[0 for _ in range (w)] for _ in range (h-1 )] for j in range (w): decdq = DecreaseQue() for m in range (kernel): decdq.push(img_ans[m][j]) result[0 ][j] = decdq.front() for m in range (kernel,h): decdq.pop(img_ans[m-2 ][j]) decdq.push(img_ans[m][j]) result[m-1 ][j] = decdq.front() print (result)
例子如下:
1 2 img = [[i*j for i in range (1 ,5 )] for j in range (6 ,1 ,-1 )] max_pooling(img,kernel=2 )