본문 바로가기
공부/정보과학

[예제] 시계열 Data로부터 Transfer Entropy 구하기

by 죠옹 2019. 2. 19.

 정보 엔트로피를 소개하는 글에서 언급되었던 Transfer entropy. 수식과는 다르게 실제로 구하는 방법은 감이 쉽게 오지 않는다. 그래서 관련 예제를 정리해본다.

 

 잠깐 복습을 하자면, Transfer entropy는 특정 시간의 data의 값이 이전 자신이 지녔던 값과 관련 있는지, 아니면 다른 data에 더 관련을 지니고 있는지 확률을 통해 정량화한 값이다.

 더 간단히 예를 들어보자면, 두 시계열 데이터 X, Y가 있을 때, 특정 시간 t의 X의 값이 X의 이전 값보다 Y의 이전 값으로 설명이 더 잘된다면, Y의 값이 다음 time step의 X의 값에 영향을 미치고 있는 것으로 판단해 볼 수 있다.

 

 식으로 나타내자면, 다음과 같다. 

 X_t+1의 값이 X_t(k), 즉, 이전 X값을 k개 모아 놓은 조건에서 지니는 정보 엔트로피(불확실성)가

 X_t+1의 값이 Y_t(l), 즉, 이전 Y값을 l개 모아 놓은 조건에서 지니는 정보 엔트로피(불확실성)에 의해

 얼마나 감소하였는가 나타내는 양이다.

 Y가 X에 영향을 미칠 수록, 후자의 불확실성이 감소하며, T_Y→X는 증가한다. 

 

 

시계열 Data로부터 Transfer entropy 구하는 방법


 오늘은 각 데이터의 window 값 k, l 을 1로 한 상태에서 Transfer entropy를 구하는 예제를 풀어보려 한다. k, l이 1인 것은 특정 시간 t와 t-1의 값이 지니는 인과관계를 살펴본다는 것을 뜻한다.

 또한, 시계열 데이터의 값은 0과 1 만 지니는 상태에서 Transfer entropy를 살펴본다. 값을 담아야 할 카테고리가 개수(n)에 따라 n^3의 확률 공간이 계산에 필요하게 된다 ( x_t, y_t, x_t+1 를 한개씩의 축으로 확률 공간의 밀도를 탐색해야 하기 때문에 n^3, 만약 k와 l이 1보다 크게 된다면 축이 하나씩 늘어나는 효과 ).

 

 상기의 T_Y→X의 식은 k, l window값 1에서 다음과 같이 변형 가능하다.

 

 이제, p(x_t+1, x_t, y_t)를 담을 행렬(2x2x2), p(x_t+1, x_t)를 담을 행렬(2x2), p(x_t, y_t)를 담을 행렬(2x2), p(x_t)를 담을 행렬(2)을 마련하여 시간 t 동안 나타나는 x_t값, y_t값, x_t+1값에 해당하는 행렬의 위치에 개수를 축적한다.

 각 행렬을 전체 데이터 수로 normalizing 하면, 각각의 확률 p를 담은 행렬을 구할 수 있다.

 

 다음, 상기의 T_Y→X 식에 대입하여 총 합을 구하면 계산 끝!


 

 

예제


 0과 1의 값을 random으로 생성한 size 1000의 배열 d1과 그 배열 앞에 '0'값을 삽입하여 time step이 한칸씩 밀린 배열 d2를 마련하고 d1과 d2사이의 Transfer entropy를 계산해본다.

예시) size 10

  d1 = [1, 0, 0, 1, 1, 0, 1, 0, 1, 1]

  d2 = [0, 1, 0, 0, 1, 1, 0, 1, 0, 1, 1]    삽입 삭제

 

 T_d2→d1 = 0.0006839255348313569

 T_d1→d2 = 0.6927528715716236

 

 d1에서 d2로의 Transfer entropy가 훨씬 큰 값을 지니므로, 의도했던 것처럼 d1의 time step t값이 d2의 timestep t+1 값에 미치는 영향의 정량적 평가가 이루어 졌음을 확인할 수 있다.

 

 

코드

import numpy as np


"""Tr_Y->X = SIGMA{ p(x_n+1, x_n, y_n) * log ( p(x_n+1, x_n, y_n) * p(x_n) / p(x_n, y_n) / p(x_n+1, x_n) }  """
def CAL_TE_Y_to_X(X, Y):
    """ Count """
    M_0 = np.zeros((2, 2, 2))     # p(x_n+1, x_n, y_n)
    M_1 = np.zeros((2, 2))      # p(x_n+1, x_n)
    M_2 = np.zeros((2, 2))      # p(x_n, y_n)
    M_3 = np.zeros(2)           # p(x_n)
    for t in range(len(X)):
        if t+1 != len(X):
            M_0[X[t + 1], X[t], Y[t]] += 1
            M_1[X[t + 1], X[t]] += 1
        M_2[X[t], Y[t]] += 1
        M_3[X[t]] += 1

    """ Normalizing (Count -> Probability) """
    M_0 = M_0 / M_0.sum()
    M_1 = M_1 / M_1.sum()
    M_2 = M_2 / M_2.sum()
    M_3 = M_3 / M_3.sum()

    TR_Y_to_X = 0
    for x_n1 in range(2):
        for x_n in range(2):
            for y_n in range(2):
                if ((M_0[x_n1, x_n, y_n] * M_3[x_n]) != 0) and (M_2[x_n, y_n] * M_1[x_n1, x_n] != 0):
                    TR_Y_to_X += M_0[x_n1, x_n, y_n] * np.log(M_0[x_n1, x_n, y_n] * M_3[x_n] / M_2[x_n, y_n] / M_1[x_n1, x_n])

    return TR_Y_to_X

""" d1 생성 : random """
d1 = np.random.randint(2, size=1000)
""" d1이 한칸씩 뒤로 밀린 data d2 생성 """
d2 = np.insert(d1, 0, 0)[:-1]

""" Transfer entropy 계산"""
TE1 = CAL_TE_Y_to_X(X=d1, Y=d2)
TE2 = CAL_TE_Y_to_X(X=d2, Y=d1)
print(TE1, TE2)

 

반응형

댓글