오늘의 인기 글
최근 글
최근 댓글
Today
Total
11-23 19:17
관리 메뉴

우노

[추천시스템] Latent Factor Model with Pytorch 본문

Data/Recommender System

[추천시스템] Latent Factor Model with Pytorch

운호(Noah) 2020. 10. 29. 17:36

MovieLens 데이터

데이터 준비

  • ml-100k.zip 파일을 받아서 적절한 위치에 압축풀기

Pytorch 사용하기

  • 필요한 모듈 import 하기

      import torch
      import pandas as pd
      import torch.nn.functional as F
      import matplotlib.pyplot as plt

파일 불러오기

  • Pandas를 이용하여 파일 불러오기

      train = pd.read_csv("../ds_data/ml-100k/ua.base",
       sep="\t", names=['user', 'movie', 'rating', 'timestamp'])
      test = pd.read_csv("../ds_data/ml-100k/ua.test",
       sep="\t", names=['user', 'movie', 'rating', 'timestamp'])
  • Pytorch tensor 데이터로 변환

      items = torch.LongTensor(train['movie'])
      users = torch.LongTensor(train['user'])
      ratings = torch.FloatTensor(train['rating'])
      items_test = torch.LongTensor(test['movie'])
      users_test = torch.LongTensor(test['user'])
      ratings_test = torch.FloatTensor(test['rating'])

Latent Factor Model

  • 기본 Matrix Factorization 구현 (정규화 및 bias 없는 버전)

      # rank → 사용자, 아이템 vector의 차원
      # numUsers → 사용자 수
      # numItems → 아이템 수
      # P → 아이템 매트릭스
      # Q → 사용자매트릭스
    
      rank = 10
    
      numItems = items.max() + 1
      numUsers = users.max() + 1
    
      P = torch.randn(numItems, rank, requires_grad=True)
      Q = torch.randn(numUsers, rank, requires_grad=True)
    
      optimizer = torch.optim.Adam([P, Q], lr= 0.1)
    
      for epoch in range(1000):
    
          # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
          hypothesis = torch.sum(P[items] * Q[users], dim= 1)
          # 비용 = MSE
          cost = F.mse_loss(hypothesis ,ratings)
    
          # 기울기 계산
          optimizer.zero_grad()
          cost.backward()
          optimizer.step()
    
          if epoch % 100 == 0:
              print("epoch: {}, cost: {:.6f}" .format(epoch, cost.item()))
  • Matplotlib으로 epoch마다 Training MSE 그려보기

      optimizer = torch.optim.Adam([P, Q], lr= 0.1)
    
      X = []
      Y = []
    
      for epoch in range(1000):
    
          # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
          hypothesis = torch.sum(P[items] * Q[users], dim= 1)
    
              # 비용 = MSE
          cost = F.mse_loss(hypothesis ,ratings)
    
          # 기울기 계산
          optimizer.zero_grad()
          cost.backward()
          optimizer.step()
    
          X.append(epoch)
          Y.append(cost)
    
          if epoch % 100 == 0:
              print("epoch: {}, cost: {:.6f}" .format(epoch, cost.item()))
    
      plt.ylabel("MSE")
      plt.xlabel("Epoch")
      plt.plot(X,Y, c="blue", label="Training MSE")
      plt.legend()
      plt.show()
  • Matplotlib으로 epoch마다 Test MSE 그려보기

      rank = 10
      numItems = items.max() + 1
      numUsers = users.max() + 1
      P = torch.randn(numItems, rank, requires_grad=True)
      Q = torch.randn(numUsers, rank, requires_grad=True)
    
      optimizer = torch.optim.Adam([P, Q], lr= 0.1)
    
      X = []
      Y = []
      Y_test = []
    
      for epoch in range(1000):
    
          # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
          hypothesis = torch.sum(P[items] * Q[users], dim= 1)
    
              # 비용 = MSE
          cost = F.mse_loss(hypothesis ,ratings)
    
          # 기울기 계산
          optimizer.zero_grad()
          cost.backward()
          optimizer.step()
    
          # 기울기 계산 필요 없다.
          with torch.no_grad():
                      # 만들어진 P, Q matrix로 예측값을 뽑아내고 검증데이터와 비교한다.
    
                      # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
              hypo_test = torch.sum(P[items_test] * Q[users_test], dim=1)
                  # 비용 = MSE
              cost_test = F.mse_loss(hypo_test, ratings_test)
    
              X.append(epoch)
              Y.append(cost)
              Y_test.append(cost_test)
    
          if epoch % 100 == 0:
              print("epoch: {}, cost: {:.6f}" .format(epoch, cost.item()))
    
      plt.ylabel("MSE")
      plt.xlabel("Epoch")
      plt.plot(X,Y, c="blue", label="Training MSE")
      plt.plot(X,Y_test, c="red", label="Test MSE")
      plt.legend()
      plt.show()

기본 Matrix Factorization에 Regularization 추가하기

rank = 10
numItems = items.max() + 1
numUsers = users.max() + 1
P = torch.randn(numItems, rank, requires_grad=True)
Q = torch.randn(numUsers, rank, requires_grad=True)

# 정규화에 사용할 파라미터
lambda1 = 0.0001
lambda2 = 0.0001

optimizer = torch.optim.Adam([P, Q], lr= 0.1)

X = []
Y = []
Y_test = []

for epoch in range(1000):

        # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
    hypothesis = torch.sum(P[items] * Q[users], dim=1)

        # 비용 = MSE + 정규화
        cost = F.mse_loss(hypothesis ,ratings)
        loss = cost + lambda1 * torch.sum(P ** 2) + lambda2 * torch.sum(Q ** 2)

    # 기울기 계산
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # 기울기 계산 필요 없다.
    with torch.no_grad():
                # 만들어진 P, Q matrix로 예측값을 뽑아내고 검증데이터와 비교한다.

                # 가설 = 예상별점(아이템벡터와 유저벡터 내적)
        hypo_test = torch.sum(P[items_test] * Q[users_test], dim=1)
            # 비용 = MSE
        cost_test = F.mse_loss(hypo_test, ratings_test)

        # cost 결과 작성
        X.append(epoch)
        Y.append(cost)
        Y_test.append(cost_test)

    if epoch % 100 == 0:
        print("epoch: {}, cost: {:.6f}" .format(epoch, cost.item()))

plt.ylabel("MSE")
plt.xlabel("Epoch")
plt.plot(X,Y, c="blue", label="Training MSE")
plt.plot(X,Y_test, c="red", label="Test MSE")
plt.legend()
plt.show()

기본 Matrix Factorization에 Bias 추가하기

# Bias 추가하기

rank = 10
numItems = items.max() + 1
numUsers = users.max() + 1
P = torch.randn(numItems, rank, requires_grad=True)
Q = torch.randn(numUsers, rank, requires_grad=True)

# Global Baseline Estimate 적용시 bias_item, bias_user, Pi, Qx를 정규화하기 위해 사용
lambda1 = 0.0001
lambda2 = 0.0001
lambda3 = 0.001
lambda4 = 0.001

# 각 item, user 벡터에 적용할 bias 값
# 학습해야하는 파라미터이기에 랜덤한 값으로 채워넣는다.
bias_item = torch.randn(numItems, requires_grad=True)
bias_user = torch.randn(numUsers, requires_grad=True)

# 전체 평점 평균 (Global Baseline Estimate를 적용한 Latent Factor model의 가설함수에 필요)
mean = (ratings.sum() / len(ratings)).item()

optimizer = torch.optim.Adam([P, Q, bias_item, bias_user], lr= 0.1)

X = []
Y = []
Y_test = []

for epoch in range(1000):

        # 가설 = 예상별점(아이템벡터와 유저벡터 내적) + 전체평점평균 + 아이템벡터의 bias + 유저벡터의 bias
    hypothesis = torch.sum(P[items] * Q[users], dim=1) + mean + bias_item[items] + bias_user[users]

        # 비용 = MSE + 정규화
    cost = F.mse_loss(hypothesis ,ratings)
        loss = cost + lambda1 * torch.sum(P ** 2) + lambda2 * torch.sum(Q ** 2) + lambda3 * torch.sum(bias_item ** 2) + lambda4 * torch.sum(bias_user ** 2) 

    # 기울기 계산
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # 기울기 계산 필요 없다.
    with torch.no_grad():
                # 만들어진 P, Q matrix로 예측값을 뽑아내고 검증데이터와 비교한다.

                # 가설 = 예상별점(아이템벡터와 유저벡터 내적) + 전체평점평균 + 아이템벡터의 bias + 유저벡터의 bias
        hypo_test = torch.sum(P[items_test] * Q[users_test], dim=1)+ mean + bias_item[items_test] + bias_user[users_test]

                # 비용 = MSE
        cost_test = F.mse_loss(hypo_test, ratings_test)

                # cost 결과 작성
        X.append(epoch)
        Y.append(cost)
        Y_test.append(cost_test)

    if epoch % 100 == 0:
        print("epoch: {}, cost: {:.6f}" .format(epoch, cost.item()))

plt.ylabel("MSE")
plt.xlabel("Epoch")
plt.plot(X,Y, c="blue", label="Training MSE")
plt.plot(X,Y_test, c="red", label="Test MSE")
plt.legend()
plt.show()
Comments