2012-03-15 4 views
14

Я выполняю проект Python, в котором я хотел бы использовать алгоритм Витерби. Кто-нибудь знает о полной реализации Python алгоритма Витерби? Правильность одного в Википедии, похоже, под вопросом на странице обсуждения. У кого-нибудь есть указатель?Python Реализация алгоритма Витерби

ответ

7

Хм, я могу отправить свое. Его не очень, хотя, пожалуйста, дайте мне знать, если вам нужно разъяснение. Я написал это сравнительно недавно специально для выделения тегов.

class Trellis: 
    trell = [] 
    def __init__(self, hmm, words): 
     self.trell = [] 
     temp = {} 
     for label in hmm.labels: 
      temp[label] = [0,None] 
     for word in words: 
      self.trell.append([word,copy.deepcopy(temp)]) 
     self.fill_in(hmm) 

    def fill_in(self,hmm): 
     for i in range(len(self.trell)): 
      for token in self.trell[i][1]: 
       word = self.trell[i][0] 
       if i == 0: 
        self.trell[i][1][token][0] = hmm.e(token,word) 
       else: 
        max = None 
        guess = None 
        c = None 
        for k in self.trell[i-1][1]: 
         c = self.trell[i-1][1][k][0] + hmm.t(k,token) 
         if max == None or c > max: 
          max = c 
          guess = k 
        max += hmm.e(token,word) 
        self.trell[i][1][token][0] = max 
        self.trell[i][1][token][1] = guess 

    def return_max(self): 
     tokens = [] 
     token = None 
     for i in range(len(self.trell)-1,-1,-1): 
      if token == None: 
       max = None 
       guess = None 
       for k in self.trell[i][1]: 
        if max == None or self.trell[i][1][k][0] > max: 
         max = self.trell[i][1][k][0] 
         token = self.trell[i][1][k][1] 
         guess = k 
       tokens.append(guess) 
      else: 
       tokens.append(token) 
       token = self.trell[i][1][token][1] 
     tokens.reverse() 
     return tokens 
+1

Я немного смущен, почему это выше, чем сообщение NLTK, является ли их реализация неправильной? OP вы нашли мой полностью недокументированный код удовлетворительным? – placeybordeaux

+1

Вероятно, причина заключается в том, что легче адаптироваться к потребностям, чем код NLTK. – chiffa

+0

@placeybordeaux Что делает эта функция 'hmm.t (k, token)' do? Я пытался копировать код, но я не мог понять, что делает «hmm.t (k, токен)». Можете ли вы привести пример? – Mohammed

11

я нашел следующую code в примере хранилище Artificial Intelligence: A Modern Approach. Что-то вроде этого, что вы ищете?

def viterbi_segment(text, P): 
    """Find the best segmentation of the string of characters, given the 
    UnigramTextModel P.""" 
    # best[i] = best probability for text[0:i] 
    # words[i] = best word ending at position i 
    n = len(text) 
    words = [''] + list(text) 
    best = [1.0] + [0.0] * n 
    ## Fill in the vectors best, words via dynamic programming 
    for i in range(n+1): 
     for j in range(0, i): 
      w = text[j:i] 
      if P[w] * best[i - len(w)] >= best[i]: 
       best[i] = P[w] * best[i - len(w)] 
       words[i] = w 
    ## Now recover the sequence of best words 
    sequence = []; i = len(words)-1 
    while i > 0: 
     sequence[0:0] = [words[i]] 
     i = i - len(words[i]) 
    ## Return sequence of best words and overall probability 
    return sequence, best[-1] 
4

Я только что скорректировал псевдо-реализацию Viterbi in Wikipedia. Из первоначальной (неправильной) версии мне потребовалось некоторое время, чтобы выяснить, где я ошибся, но я, наконец, справился с этим, частично благодаря реализации Kevin Murphy viterbi_path.m в MatLab HMM toolbox.

В контексте объекта НММ с переменными, как показано:

hmm = HMM() 
hmm.priors = np.array([0.5, 0.5]) # pi = prior probs 
hmm.transition = np.array([[0.75, 0.25], # A = transition probs./2 states 
          [0.32, 0.68]]) 
hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs./3 obs modes 
         [0.1, 0.2, 0.7]]) 

Питон функция для запуска Витерби (лучший путь) алгоритм ниже:

def viterbi (self,observations): 
    """Return the best path, given an HMM model and a sequence of observations""" 
    # A - initialise stuff 
    nSamples = len(observations[0]) 
    nStates = self.transition.shape[0] # number of states 
    c = np.zeros(nSamples) #scale factors (necessary to prevent underflow) 
    viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table 
    psi = np.zeros((nStates,nSamples)) # initialise the best path table 
    best_path = np.zeros(nSamples); # this will be your output 

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) 
    viterbi[:,0] = self.priors.T * self.emission[:,observations(0)] 
    c[0] = 1.0/np.sum(viterbi[:,0]) 
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor 
    psi[0] = 0; 

    # C- Do the iterations for viterbi and psi for time>0 until T 
    for t in range(1,nSamples): # loop through time 
     for s in range (0,nStates): # loop through the states @(t-1) 
      trans_p = viterbi[:,t-1] * self.transition[:,s] 
      psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) 
      viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)] 

     c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor 
     viterbi[:,t] = c[t] * viterbi[:,t] 

    # D - Back-tracking 
    best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state 
    for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step 
     best_path[t-1] = psi[best_path[t],t] 

    return best_path 
+1

Комментарий [jahrulesoverall] (http://stackoverflow.com/users/6925587/jahrulesoverall) неверно отправлен в ответ, * замечания (0) неверны правильно? должны быть наблюдения [0] и наблюдения [t]? * –

+0

Я не понимаю, как вы не получаете ошибку при выполнении 'psi [best_path [t], t]', так как 'best_path' является типом float, и вы можете только индекс с ints? –

+1

@MikeVella Я добавил: 'bp = np.zeros (nSamples) .astype (int)' – Ant

1

Это старый вопрос , но ни один из других ответов был довольно, что мне нужно, потому что мое приложение не имеет конкретных наблюдаемых состояний.

Принимая после @Rhubarb, я также внедрил Kevin Murphey's Matlab implementation (см. viterbi_path.m), но я сохранил его ближе к оригиналу. Я включил простой тестовый пример.

import numpy as np 


def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False): 
    '''Finds the most-probable (Viterbi) path through the HMM state trellis 
    Notation: 
     Z[t] := Observation at time t 
     Q[t] := Hidden state at time t 
    Inputs: 
     prior: np.array(num_hid) 
      prior[i] := Pr(Q[0] == i) 
     transmat: np.ndarray((num_hid,num_hid)) 
      transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i) 
     obslik: np.ndarray((num_hid,num_obs)) 
      obslik[i,t] := Pr(Z[t] | Q[t] == i) 
     scaled: bool 
      whether or not to normalize the probability trellis along the way 
      doing so prevents underflow by repeated multiplications of probabilities 
     ret_loglik: bool 
      whether or not to return the log-likelihood of the best path 
    Outputs: 
     path: np.array(num_obs) 
      path[t] := Q[t] 
    ''' 
    num_hid = obslik.shape[0] # number of hidden states 
    num_obs = obslik.shape[1] # number of observations (not observation *states*) 

    # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)]) 
    trellis_prob = np.zeros((num_hid,num_obs)) 
    # trellis_state[i,t] := best predecessor state given that we ended up in state i at t 
    trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies 
    path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies 

    trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult 
    if scaled: 
     scale = np.ones(num_obs) # only instantiated if necessary to save memory 
     scale[0] = 1.0/np.sum(trellis_prob[:,0]) 
     trellis_prob[:,0] *= scale[0] 

    trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor 
    for t in xrange(1, num_obs): 
     for j in xrange(num_hid): 
      trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult 
      trellis_state[j,t] = trans_probs.argmax() 
      trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs 
      trellis_prob[j,t] *= obslik[j,t] 
     if scaled: 
      scale[t] = 1.0/np.sum(trellis_prob[:,t]) 
      trellis_prob[:,t] *= scale[t] 

    path[-1] = trellis_prob[:,-1].argmax() 
    for t in range(num_obs-2, -1, -1): 
     path[t] = trellis_state[(path[t+1]), t+1] 

    if not ret_loglik: 
     return path 
    else: 
     if scaled: 
      loglik = -np.sum(np.log(scale)) 
     else: 
      p = trellis_prob[path[-1],-1] 
      loglik = np.log(p) 
     return path, loglik 


if __name__=='__main__': 
    # Assume there are 3 observation states, 2 hidden states, and 5 observations 
    priors = np.array([0.5, 0.5]) 
    transmat = np.array([ 
     [0.75, 0.25], 
     [0.32, 0.68]]) 
    emmat = np.array([ 
     [0.8, 0.1, 0.1], 
     [0.1, 0.2, 0.7]]) 
    observations = np.array([0, 1, 2, 1, 0], dtype=int) 
    obslik = np.array([emmat[:,z] for z in observations]).T 
    print viterbi_path(priors, transmat, obslik)        #=> [0 1 1 1 0] 
    print viterbi_path(priors, transmat, obslik, scaled=False)     #=> [0 1 1 1 0] 
    print viterbi_path(priors, transmat, obslik, ret_loglik=True)    #=> (array([0, 1, 1, 1, 0]), -7.776472586614755) 
    print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227) 

Обратите внимание, что эта реализация не использует вероятность выбросов непосредственно, но использует переменную obslik. Обычно emissions[i,j] := Pr(observed_state == j | hidden_state == i) для конкретного наблюдаемого состояния i, что делает emissions.shape == (num_hidden_states, num_obs_states).

Однако, учитывая последовательность observations[t] := observation at time t, все алгоритмы Витерби требуют вероятности этого наблюдения для каждого скрытого состояния. Следовательно, obslik[i,t] := Pr(observations[t] | hidden_state == i). Фактическое значение наблюдаемого состояния не требуется.

0

Я изменил ответ @ Ребарба на условие, когда предельные вероятности уже известны (например, путем вычисления алгоритма Forward Backward).

def viterbi (transition_probabilities, conditional_probabilities): 
    # Initialise everything 
    num_samples = conditional_probabilities.shape[1] 
    num_states = transition_probabilities.shape[0] # number of states 

    c = np.zeros(num_samples) #scale factors (necessary to prevent underflow) 
    viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table 
    best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table 
    best_path = np.zeros(num_samples).astype(np.int32) # this will be your output 

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) 
    viterbi[:,0] = conditional_probabilities[:,0] 
    c[0] = 1.0/np.sum(viterbi[:,0]) 
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor 

    # C- Do the iterations for viterbi and psi for time>0 until T 
    for t in range(1, num_samples): # loop through time 
     for s in range (0,num_states): # loop through the states @(t-1) 
      trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning 
      best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) 
      viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t] 

     c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor 
     viterbi[:,t] = c[t] * viterbi[:,t] 

    ## D - Back-tracking 
    best_path[num_samples-1] = viterbi[:,num_samples-1].argmax() # last state 
    for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step 
     best_path[t-1] = best_path_table[best_path[t],t] 
    return best_path