본문 바로가기

트렌드 한눈에 보기/학계 트렌드

PyTorch를 활용한 DQN tutorial - 2탄

지난 번에는 DQN tutorial을 적당히 수정해서

완벽히 실행시켜보는 것에 의의를 뒀다면

이번에는 실제 논문을 읽어보고 코드를 분석해봤다.

 

 

놀라웠던 점은 8 페이지 남짓한 짧은 논문에

강화학습의 기본 원리부터 이번에 만들어낸 알고리즘의 특성까지

세세하게 적혀 있었다는 것이다.

DQN의 핵심 내용 역시 위의 알고리즘에

거의 모두 담겨있다고 해도 과언이 아니다.

 

때문에, 튜토리얼의 설명이 아무리 잘 되어 있더라도,

또는 여기서 내가 아무리 설명을 기똥차게 하더라도,

논문을 한 번 읽어보는 것만 못하다.

코드가 어떤식으로 논문 내용을 반영할 수 있었는지를 분석하면서

내용을 나름대로 정리하는 수준이라도 되었으면 좋겠다.

 

colab.research.google.com/drive/1SVVKAAeuI-LATWybuZauc2e9QzOQbzEn?usp=sharing

 

Google Colaboratory

 

colab.research.google.com

코드는 크게 세 부분으로 이뤄져있으며,

세 부분을 유기적으로 연결해주는 블록이 존재한다.

1. Replay Memory

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))
# Transition: Replay Memory에 삽입되는 데이터의 최소형태

class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0
# capacity: Replay Memory의 사이즈 
# 기본적으로는 python list의 형태

    def push(self, *args):
      
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(*args)
        self.position = (self.position + 1) % self.capacity
  
# push: Replay Memory에 Transition을 밀어넣음
# Replay Memory 크기가 capacity보다 작을 때는 추가하는 것에서 그치지만,
# capacity보다 커졌을 때는 position의 위치에 신규 transition을 넣는다.
# 새로운 transition이 삽입되면 position = {(position + 1)을 capacity로 나눠준 결과의 나머지}
# append 함수는 리스트의 끝에 원소를 삽입하므로,
# 리스트가 capacity만큼 커진 뒤 처음으로 삭제되는 원소는 제일 처음에 삽입했던 원소가 된다. (fisrt in first out)
# 다음으로 들어오는 transition은 1의 위치에 삽입, 
# position이 가리키는 정수가 capacity만큼 커지면, 자동으로 0으로 돌아감(capcity로 나눠준 결과의 '나머지'를 활용하므로)


    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)
# sample 함수는 'memory'에서 'batch_size'만큼의 원소를 추출해줌


    def __len__(self):
        return len(self.memory)

 

Replay Memory를 사용하게 된 까닭은, 

실시간으로 입력되는 경험을 위주로 학습을 진행하면 생기는

부작용이 너무 많았기 때문이다.

 

자율주행을 예로 들면, 오르막길과 내리막길이 반복되면서 길이 나타날 텐데,

오르막길에서는 엑셀을 더 밟는 것을 배우고,

내리막길에서는 브레이크를 밟는 것을 배웠다가는

평지를 만날 때 혼란스럽기 그지없을 테다.

 

계속해서 오르막길만 나오는 환경이라면야 Replay Memory를 쓸 이유도 적어지겠지만

환경이 다양하게 존재하는 학습과정에서는 

그런 overfitting을 피해줘야할 필요가 있다.

 

2.

모델 설정

class DQN(nn.Module):
  def __init__(self, h, w, outputs):
    super(DQN, self).__init__()
    self.conv1 = nn.Conv2d(3, 16, kernel_size = 5, stride = 2)
    self.bn1 = nn.BatchNorm2d(16)
    self.conv2 = nn.Conv2d(16, 32, kernel_size = 5, stride = 2)
    self.bn2 = nn.BatchNorm2d(32)
    self.conv3 = nn.Conv2d(32, 32, 5, 2)
    self.bn3 = nn.BatchNorm2d(32)

# 구조: 
# Conv1:  3개의 인풋채널 -> 16개의 5x5 커널, stride 2 -> 16개의 채널 
# Conv2: 16개의 인풋채널 -> 32개의 5x5 커널, stride 2 -> 32개의 채널
# Conv3: 32개의 인풋채널 -> 32개의 5x5 커널, stride 2 -> 32개의 채널

# Conv Layer 최종 결과:
# 32개 채널의 convw * convh 크기 output

    def conv2d_size_out(size, kernel_size = 5, stride = 2):
      return (size - (kernel_size-1) -1)//stride + 1
    convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))
    convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))
    linear_input_size = convw*convh*32
    self.head = nn.Linear(linear_input_size, outputs)

  def forward(self, x):
    x = F.relu(self.bn1(self.conv1(x)))
    x = F.relu(self.bn2(self.conv2(x)))
    x = F.relu(self.bn3(self.conv3(x)))
    return self.head(x.view(x.size(0), -1))

    

 

해당 부분의 구조는 pytorch CNN tutorial에서 살펴본 내용과 동일하다.

중요한 것은 입력과 출력이 어떤 것인지 파악하는 일인데

현재 이미지를 입력으로 해서,

카트의 이동방향(왼쪽 vs 오른쪽)을 출력으로 주는 분류 문제라고 할 수 있다.

 

3. 이미지 전처리

resize = T.Compose([T.ToPILImage(),
                    T.Resize(40, interpolation=Image.CUBIC),
                    T.ToTensor()])

# T: torchvision transform은 이미지 변형에 사용됨
# PIL image를 입력받은 뒤 적절하게 변형하여 tensor로 바꿈

def get_cart_location(screen_width):
    world_width = env.x_threshold * 2
    scale = screen_width / world_width
    return int(env.state[0] * scale + screen_width / 2.0)  # MIDDLE OF CART

# cart의 위치 구하기
# cart 가로 사이즈: world width
# scale: 카트 대비 스크린의 크기
# env.state: 카트 이동 거리(카트 크기 기준)
# 카트 위치: 카트 이동 거리*scale + 스크린 중간

def get_screen():
    screen = env.render(mode='rgb_array').transpose((2, 0, 1))
    _, screen_height, screen_width = screen.shape
    screen = screen[:, int(screen_height*0.4):int(screen_height * 0.8)]
# 카트가 화면 아래쪽에 위치하므로 화면 크기를 줄일 수 있음
    view_width = int(screen_width * 0.6)
    cart_location = get_cart_location(screen_width)
    if cart_location < view_width // 2:
        slice_range = slice(view_width)
    elif cart_location > (screen_width - view_width // 2):
        slice_range = slice(-view_width, None)
    else:
        slice_range = slice(cart_location - view_width // 2,
                            cart_location + view_width // 2)
    screen = screen[:, :, slice_range]
    screen = np.ascontiguousarray(screen, dtype=np.float32) / 255
    screen = torch.from_numpy(screen)
    return resize(screen).unsqueeze(0).to(device)


env.reset()
plt.figure()
plt.imshow(get_screen().cpu().squeeze(0).permute(1, 2, 0).numpy(),
           interpolation='none')
ipythondisplay.clear_output(wait=True)
plt.title('Example extracted screen')
plt.show()

 

꽤나 복잡한 코드지만, 실상 별 영양가는 없는 내용이다.

OpenAI에서 제공해주는 Gym의 스크린을 복사하고,

카트가 위치한 부분만을 중점적으로 파악하기 위해

이미지를 잘라주는 역할을 한다.

 

4. 통합

def optimize_model():
  if len(memory) < BATCH_SIZE:
    return

# memory 가 batch_size 이상만큼 쌓였을 때 시작
# 참고로 memory capcity는 10000, batch_size 는 128

  transitions = memory.sample(BATCH_SIZE)

  batch = Transition(*zip(*transitions))
# memory에서 추출된 샘플들을 또 한데 모음
# 한데 모은 샘플들은 하나의 tensor로 병합될 예정 (마지막 state 빼고)
# 참고로, 샘플의 형식은 (State, Action, Next State, Reward)

  non_final_mask = torch.tensor(tuple(map(lambda s: s != None, batch.next_state)), device = device, dtype = torch.bool)
# 현재 state가 final state가 아님을 보장해주는 필터
  non_final_next_states = torch.cat([s for s in batch.next_state if s != None])
# Next state가 final state가 아님을 보장해주는 필터

  state_batch = torch.cat(batch.state)
  action_batch = torch.cat(batch.action)
  reward_batch = torch.cat(batch.reward)

# Loss 계산: (Q - expected Q) 최소화
  state_action_values = policy_net(state_batch).gather(1, action_batch)
# gather의 역할:

  next_state_values = torch.zeros(BATCH_SIZE, device = device)
  next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()
# mask 사용법:
# detach의 역할
  expected_state_action_values = (next_state_values * GAMMA) + reward_batch
# Expected Q: r + gamma * max_a(Q')

  loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))

  optimizer.zero_grad()
  loss.backward()
  for param in policy_net.parameters():
    param.grad.data.clamp_(-1, 1)
  optimizer.step()

위 세 부분들을 연결시켜주는 역할을 하는 코드이다.

현재까지의 경험들을 모아놓은 Replay Memory에서

"transitions"라는 이름으로 128개를 랜덤 추출한 뒤에 

앞서 설정한 CNN 모델을 통해 학습을 진행시킨다.


결과

코드를 찬찬히 뜯어봐도, 결과가 거지같은 것은 어쩔 수가 없다.

Gym Monitor를 사용해서 10번의 에피소드마다 동영상으로 저장하게끔 만들었는데

파일 크기에서 알 수 있듯이 카트가 균형을 유지하는 시간은

아주 형편없는 수준이다.

 

그 중에서 가장 학습이 잘 된 예시가 5초 가량 유지되는 것인데

OpenAI에서 설정해 놓은 '성공적인 학습'의 요건이

100개의 에피소드의 평균이 timestep 195 이상이라고 하니,

택도 없는 수치인 셈이다.

이해가 가지 않는 것은 'timestep'이라는 말의 정의인데,

DQN으로 구현한 결과 역시 평균적으로 50의 timestep을 유지하고 있는 것을 보면

0.1초 단위겠거니- 싶다.

 

어쨌든, 아직 불완전한 요소가 많은 코드라는 것은

변하지 않는 사실임이 틀림없기에

더 공부를 해봐야겠다.