2012-03-15 23 views
14

Estoy haciendo un proyecto de Python en el que me gustaría usar el Algoritmo de Viterbi. ¿Alguien sabe de una implementación completa de Python del algoritmo de Viterbi? La corrección de la de Wikipedia parece estar en duda en la página de discusión. ¿Alguien tiene un puntero?Implementación de Python del algoritmo de Viterbi

Respuesta

7

Hmm Puedo publicar el mío. Aunque no es bonito, avíseme si necesita alguna aclaración. Escribí esto hace relativamente poco tiempo específicamente para el etiquetado de voz.

class Trellis: 
    trell = [] 
    def __init__(self, hmm, words): 
     self.trell = [] 
     temp = {} 
     for label in hmm.labels: 
      temp[label] = [0,None] 
     for word in words: 
      self.trell.append([word,copy.deepcopy(temp)]) 
     self.fill_in(hmm) 

    def fill_in(self,hmm): 
     for i in range(len(self.trell)): 
      for token in self.trell[i][1]: 
       word = self.trell[i][0] 
       if i == 0: 
        self.trell[i][1][token][0] = hmm.e(token,word) 
       else: 
        max = None 
        guess = None 
        c = None 
        for k in self.trell[i-1][1]: 
         c = self.trell[i-1][1][k][0] + hmm.t(k,token) 
         if max == None or c > max: 
          max = c 
          guess = k 
        max += hmm.e(token,word) 
        self.trell[i][1][token][0] = max 
        self.trell[i][1][token][1] = guess 

    def return_max(self): 
     tokens = [] 
     token = None 
     for i in range(len(self.trell)-1,-1,-1): 
      if token == None: 
       max = None 
       guess = None 
       for k in self.trell[i][1]: 
        if max == None or self.trell[i][1][k][0] > max: 
         max = self.trell[i][1][k][0] 
         token = self.trell[i][1][k][1] 
         guess = k 
       tokens.append(guess) 
      else: 
       tokens.append(token) 
       token = self.trell[i][1][token][1] 
     tokens.reverse() 
     return tokens 
+1

Estoy un poco confundido por qué esto es más alto que la publicación NLTK, ¿es incorrecta su implementación? ¿Encontraste satisfactorio mi código completamente indocumentado? – placeybordeaux

+1

Probablemente la razón por la que es más fácil piratear para adaptarse a las necesidades que el código NLTK. – chiffa

+0

@placeybordeaux ¿Qué función tiene 'hmm.t (k, token)'? Traté de replicar el código pero no pude entender qué 'hmm.t (k, token)' hacer. ¿Puedes dar un ejemplo para eso? – Mohammed

11

I encontraron la siguiente code en el ejemplo repositorio de Artificial Intelligence: A Modern Approach. ¿Es algo como esto lo que estás buscando?

def viterbi_segment(text, P): 
    """Find the best segmentation of the string of characters, given the 
    UnigramTextModel P.""" 
    # best[i] = best probability for text[0:i] 
    # words[i] = best word ending at position i 
    n = len(text) 
    words = [''] + list(text) 
    best = [1.0] + [0.0] * n 
    ## Fill in the vectors best, words via dynamic programming 
    for i in range(n+1): 
     for j in range(0, i): 
      w = text[j:i] 
      if P[w] * best[i - len(w)] >= best[i]: 
       best[i] = P[w] * best[i - len(w)] 
       words[i] = w 
    ## Now recover the sequence of best words 
    sequence = []; i = len(words)-1 
    while i > 0: 
     sequence[0:0] = [words[i]] 
     i = i - len(words[i]) 
    ## Return sequence of best words and overall probability 
    return sequence, best[-1] 
4

Acabo de corregir la pseudo implementación de Viterbi in Wikipedia. De la versión inicial (incorrecta), me tomó un tiempo descubrir dónde me estaba yendo mal, pero finalmente lo logré, gracias en parte a la implementación de Kevin Murphy del viterbi_path.m en el MatLab HMM toolbox.

En el contexto de un objeto HMM con variables como se muestra:

hmm = HMM() 
hmm.priors = np.array([0.5, 0.5]) # pi = prior probs 
hmm.transition = np.array([[0.75, 0.25], # A = transition probs./2 states 
          [0.32, 0.68]]) 
hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs./3 obs modes 
         [0.1, 0.2, 0.7]]) 

El pitón función para ejecutar Viterbi (mejor ruta) algoritmo es el siguiente:

def viterbi (self,observations): 
    """Return the best path, given an HMM model and a sequence of observations""" 
    # A - initialise stuff 
    nSamples = len(observations[0]) 
    nStates = self.transition.shape[0] # number of states 
    c = np.zeros(nSamples) #scale factors (necessary to prevent underflow) 
    viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table 
    psi = np.zeros((nStates,nSamples)) # initialise the best path table 
    best_path = np.zeros(nSamples); # this will be your output 

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) 
    viterbi[:,0] = self.priors.T * self.emission[:,observations(0)] 
    c[0] = 1.0/np.sum(viterbi[:,0]) 
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor 
    psi[0] = 0; 

    # C- Do the iterations for viterbi and psi for time>0 until T 
    for t in range(1,nSamples): # loop through time 
     for s in range (0,nStates): # loop through the states @(t-1) 
      trans_p = viterbi[:,t-1] * self.transition[:,s] 
      psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) 
      viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)] 

     c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor 
     viterbi[:,t] = c[t] * viterbi[:,t] 

    # D - Back-tracking 
    best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state 
    for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step 
     best_path[t-1] = psi[best_path[t],t] 

    return best_path 
+1

Comentario de [jahrulesoverall] (http://stackoverflow.com/users/6925587/jahrulesoverall) publicado incorrectamente en respuesta, * las observaciones (0) son incorrectas ¿no? deberían ser observaciones [0] y observaciones [t]? * –

+0

No entiendo cómo no se obtiene un error cuando se hace 'psi [ruta_original [t], t]' ya que 'best_path' es de tipo float y se puede solo index con ints? –

+1

@MikeVella Agregué: 'bp = np.zeros (nSamples) .astype (int)' – Ant

1

Esta es una vieja pregunta , pero ninguna de las otras respuestas fue bastante lo que necesitaba porque mi aplicación no tiene estados específicos observados.

Tomando después de @Rhubarb, también he reimplementado el Matlab implementation de Kevin Murphey (vea viterbi_path.m), pero lo he mantenido más cerca del original. También incluí un caso de prueba simple.

import numpy as np 


def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False): 
    '''Finds the most-probable (Viterbi) path through the HMM state trellis 
    Notation: 
     Z[t] := Observation at time t 
     Q[t] := Hidden state at time t 
    Inputs: 
     prior: np.array(num_hid) 
      prior[i] := Pr(Q[0] == i) 
     transmat: np.ndarray((num_hid,num_hid)) 
      transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i) 
     obslik: np.ndarray((num_hid,num_obs)) 
      obslik[i,t] := Pr(Z[t] | Q[t] == i) 
     scaled: bool 
      whether or not to normalize the probability trellis along the way 
      doing so prevents underflow by repeated multiplications of probabilities 
     ret_loglik: bool 
      whether or not to return the log-likelihood of the best path 
    Outputs: 
     path: np.array(num_obs) 
      path[t] := Q[t] 
    ''' 
    num_hid = obslik.shape[0] # number of hidden states 
    num_obs = obslik.shape[1] # number of observations (not observation *states*) 

    # trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)]) 
    trellis_prob = np.zeros((num_hid,num_obs)) 
    # trellis_state[i,t] := best predecessor state given that we ended up in state i at t 
    trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies 
    path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies 

    trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult 
    if scaled: 
     scale = np.ones(num_obs) # only instantiated if necessary to save memory 
     scale[0] = 1.0/np.sum(trellis_prob[:,0]) 
     trellis_prob[:,0] *= scale[0] 

    trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor 
    for t in xrange(1, num_obs): 
     for j in xrange(num_hid): 
      trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult 
      trellis_state[j,t] = trans_probs.argmax() 
      trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs 
      trellis_prob[j,t] *= obslik[j,t] 
     if scaled: 
      scale[t] = 1.0/np.sum(trellis_prob[:,t]) 
      trellis_prob[:,t] *= scale[t] 

    path[-1] = trellis_prob[:,-1].argmax() 
    for t in range(num_obs-2, -1, -1): 
     path[t] = trellis_state[(path[t+1]), t+1] 

    if not ret_loglik: 
     return path 
    else: 
     if scaled: 
      loglik = -np.sum(np.log(scale)) 
     else: 
      p = trellis_prob[path[-1],-1] 
      loglik = np.log(p) 
     return path, loglik 


if __name__=='__main__': 
    # Assume there are 3 observation states, 2 hidden states, and 5 observations 
    priors = np.array([0.5, 0.5]) 
    transmat = np.array([ 
     [0.75, 0.25], 
     [0.32, 0.68]]) 
    emmat = np.array([ 
     [0.8, 0.1, 0.1], 
     [0.1, 0.2, 0.7]]) 
    observations = np.array([0, 1, 2, 1, 0], dtype=int) 
    obslik = np.array([emmat[:,z] for z in observations]).T 
    print viterbi_path(priors, transmat, obslik)        #=> [0 1 1 1 0] 
    print viterbi_path(priors, transmat, obslik, scaled=False)     #=> [0 1 1 1 0] 
    print viterbi_path(priors, transmat, obslik, ret_loglik=True)    #=> (array([0, 1, 1, 1, 0]), -7.776472586614755) 
    print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227) 

Tenga en cuenta que esta aplicación no utiliza probabilidades de emisión directamente, sino que utiliza una variable obslik. Generalmente, emissions[i,j] := Pr(observed_state == j | hidden_state == i) para un estado particular observado i, haciendo emissions.shape == (num_hidden_states, num_obs_states).

Sin embargo, dada una secuencia observations[t] := observation at time t, todo el algoritmo de Viterbi requiere la probabilidad de esa observación para cada estado oculto. Por lo tanto, obslik[i,t] := Pr(observations[t] | hidden_state == i). El valor real del estado observado no es necesario.

0

He modificado la respuesta de @Rubarb para la condición donde las probabilidades marginales ya son conocidas (por ejemplo, calculando el algoritmo Forward Backward).

def viterbi (transition_probabilities, conditional_probabilities): 
    # Initialise everything 
    num_samples = conditional_probabilities.shape[1] 
    num_states = transition_probabilities.shape[0] # number of states 

    c = np.zeros(num_samples) #scale factors (necessary to prevent underflow) 
    viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table 
    best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table 
    best_path = np.zeros(num_samples).astype(np.int32) # this will be your output 

    # B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b) 
    viterbi[:,0] = conditional_probabilities[:,0] 
    c[0] = 1.0/np.sum(viterbi[:,0]) 
    viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor 

    # C- Do the iterations for viterbi and psi for time>0 until T 
    for t in range(1, num_samples): # loop through time 
     for s in range (0,num_states): # loop through the states @(t-1) 
      trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning 
      best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1)) 
      viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t] 

     c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor 
     viterbi[:,t] = c[t] * viterbi[:,t] 

    ## D - Back-tracking 
    best_path[num_samples-1] = viterbi[:,num_samples-1].argmax() # last state 
    for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step 
     best_path[t-1] = best_path_table[best_path[t],t] 
    return best_path 
Cuestiones relacionadas