Estoy haciendo un proyecto de Python en el que me gustaría usar el Algoritmo de Viterbi. ¿Alguien sabe de una implementación completa de Python del algoritmo de Viterbi? La corrección de la de Wikipedia parece estar en duda en la página de discusión. ¿Alguien tiene un puntero?Implementación de Python del algoritmo de Viterbi
Respuesta
Hmm Puedo publicar el mío. Aunque no es bonito, avíseme si necesita alguna aclaración. Escribí esto hace relativamente poco tiempo específicamente para el etiquetado de voz.
class Trellis:
trell = []
def __init__(self, hmm, words):
self.trell = []
temp = {}
for label in hmm.labels:
temp[label] = [0,None]
for word in words:
self.trell.append([word,copy.deepcopy(temp)])
self.fill_in(hmm)
def fill_in(self,hmm):
for i in range(len(self.trell)):
for token in self.trell[i][1]:
word = self.trell[i][0]
if i == 0:
self.trell[i][1][token][0] = hmm.e(token,word)
else:
max = None
guess = None
c = None
for k in self.trell[i-1][1]:
c = self.trell[i-1][1][k][0] + hmm.t(k,token)
if max == None or c > max:
max = c
guess = k
max += hmm.e(token,word)
self.trell[i][1][token][0] = max
self.trell[i][1][token][1] = guess
def return_max(self):
tokens = []
token = None
for i in range(len(self.trell)-1,-1,-1):
if token == None:
max = None
guess = None
for k in self.trell[i][1]:
if max == None or self.trell[i][1][k][0] > max:
max = self.trell[i][1][k][0]
token = self.trell[i][1][k][1]
guess = k
tokens.append(guess)
else:
tokens.append(token)
token = self.trell[i][1][token][1]
tokens.reverse()
return tokens
I encontraron la siguiente code en el ejemplo repositorio de Artificial Intelligence: A Modern Approach. ¿Es algo como esto lo que estás buscando?
def viterbi_segment(text, P):
"""Find the best segmentation of the string of characters, given the
UnigramTextModel P."""
# best[i] = best probability for text[0:i]
# words[i] = best word ending at position i
n = len(text)
words = [''] + list(text)
best = [1.0] + [0.0] * n
## Fill in the vectors best, words via dynamic programming
for i in range(n+1):
for j in range(0, i):
w = text[j:i]
if P[w] * best[i - len(w)] >= best[i]:
best[i] = P[w] * best[i - len(w)]
words[i] = w
## Now recover the sequence of best words
sequence = []; i = len(words)-1
while i > 0:
sequence[0:0] = [words[i]]
i = i - len(words[i])
## Return sequence of best words and overall probability
return sequence, best[-1]
Acabo de corregir la pseudo implementación de Viterbi in Wikipedia. De la versión inicial (incorrecta), me tomó un tiempo descubrir dónde me estaba yendo mal, pero finalmente lo logré, gracias en parte a la implementación de Kevin Murphy del viterbi_path.m
en el MatLab HMM toolbox.
En el contexto de un objeto HMM con variables como se muestra:
hmm = HMM()
hmm.priors = np.array([0.5, 0.5]) # pi = prior probs
hmm.transition = np.array([[0.75, 0.25], # A = transition probs./2 states
[0.32, 0.68]])
hmm.emission = np.array([[0.8, 0.1, 0.1], # B = emission (observation) probs./3 obs modes
[0.1, 0.2, 0.7]])
El pitón función para ejecutar Viterbi (mejor ruta) algoritmo es el siguiente:
def viterbi (self,observations):
"""Return the best path, given an HMM model and a sequence of observations"""
# A - initialise stuff
nSamples = len(observations[0])
nStates = self.transition.shape[0] # number of states
c = np.zeros(nSamples) #scale factors (necessary to prevent underflow)
viterbi = np.zeros((nStates,nSamples)) # initialise viterbi table
psi = np.zeros((nStates,nSamples)) # initialise the best path table
best_path = np.zeros(nSamples); # this will be your output
# B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
viterbi[:,0] = self.priors.T * self.emission[:,observations(0)]
c[0] = 1.0/np.sum(viterbi[:,0])
viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor
psi[0] = 0;
# C- Do the iterations for viterbi and psi for time>0 until T
for t in range(1,nSamples): # loop through time
for s in range (0,nStates): # loop through the states @(t-1)
trans_p = viterbi[:,t-1] * self.transition[:,s]
psi[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
viterbi[s,t] = viterbi[s,t]*self.emission[s,observations(t)]
c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
viterbi[:,t] = c[t] * viterbi[:,t]
# D - Back-tracking
best_path[nSamples-1] = viterbi[:,nSamples-1].argmax() # last state
for t in range(nSamples-1,0,-1): # states of (last-1)th to 0th time step
best_path[t-1] = psi[best_path[t],t]
return best_path
Comentario de [jahrulesoverall] (http://stackoverflow.com/users/6925587/jahrulesoverall) publicado incorrectamente en respuesta, * las observaciones (0) son incorrectas ¿no? deberían ser observaciones [0] y observaciones [t]? * –
No entiendo cómo no se obtiene un error cuando se hace 'psi [ruta_original [t], t]' ya que 'best_path' es de tipo float y se puede solo index con ints? –
@MikeVella Agregué: 'bp = np.zeros (nSamples) .astype (int)' – Ant
Esta es una vieja pregunta , pero ninguna de las otras respuestas fue bastante lo que necesitaba porque mi aplicación no tiene estados específicos observados.
Tomando después de @Rhubarb, también he reimplementado el Matlab implementation de Kevin Murphey (vea viterbi_path.m
), pero lo he mantenido más cerca del original. También incluí un caso de prueba simple.
import numpy as np
def viterbi_path(prior, transmat, obslik, scaled=True, ret_loglik=False):
'''Finds the most-probable (Viterbi) path through the HMM state trellis
Notation:
Z[t] := Observation at time t
Q[t] := Hidden state at time t
Inputs:
prior: np.array(num_hid)
prior[i] := Pr(Q[0] == i)
transmat: np.ndarray((num_hid,num_hid))
transmat[i,j] := Pr(Q[t+1] == j | Q[t] == i)
obslik: np.ndarray((num_hid,num_obs))
obslik[i,t] := Pr(Z[t] | Q[t] == i)
scaled: bool
whether or not to normalize the probability trellis along the way
doing so prevents underflow by repeated multiplications of probabilities
ret_loglik: bool
whether or not to return the log-likelihood of the best path
Outputs:
path: np.array(num_obs)
path[t] := Q[t]
'''
num_hid = obslik.shape[0] # number of hidden states
num_obs = obslik.shape[1] # number of observations (not observation *states*)
# trellis_prob[i,t] := Pr((best sequence of length t-1 goes to state i), Z[1:(t+1)])
trellis_prob = np.zeros((num_hid,num_obs))
# trellis_state[i,t] := best predecessor state given that we ended up in state i at t
trellis_state = np.zeros((num_hid,num_obs), dtype=int) # int because its elements will be used as indicies
path = np.zeros(num_obs, dtype=int) # int because its elements will be used as indicies
trellis_prob[:,0] = prior * obslik[:,0] # element-wise mult
if scaled:
scale = np.ones(num_obs) # only instantiated if necessary to save memory
scale[0] = 1.0/np.sum(trellis_prob[:,0])
trellis_prob[:,0] *= scale[0]
trellis_state[:,0] = 0 # arbitrary value since t == 0 has no predecessor
for t in xrange(1, num_obs):
for j in xrange(num_hid):
trans_probs = trellis_prob[:,t-1] * transmat[:,j] # element-wise mult
trellis_state[j,t] = trans_probs.argmax()
trellis_prob[j,t] = trans_probs[trellis_state[j,t]] # max of trans_probs
trellis_prob[j,t] *= obslik[j,t]
if scaled:
scale[t] = 1.0/np.sum(trellis_prob[:,t])
trellis_prob[:,t] *= scale[t]
path[-1] = trellis_prob[:,-1].argmax()
for t in range(num_obs-2, -1, -1):
path[t] = trellis_state[(path[t+1]), t+1]
if not ret_loglik:
return path
else:
if scaled:
loglik = -np.sum(np.log(scale))
else:
p = trellis_prob[path[-1],-1]
loglik = np.log(p)
return path, loglik
if __name__=='__main__':
# Assume there are 3 observation states, 2 hidden states, and 5 observations
priors = np.array([0.5, 0.5])
transmat = np.array([
[0.75, 0.25],
[0.32, 0.68]])
emmat = np.array([
[0.8, 0.1, 0.1],
[0.1, 0.2, 0.7]])
observations = np.array([0, 1, 2, 1, 0], dtype=int)
obslik = np.array([emmat[:,z] for z in observations]).T
print viterbi_path(priors, transmat, obslik) #=> [0 1 1 1 0]
print viterbi_path(priors, transmat, obslik, scaled=False) #=> [0 1 1 1 0]
print viterbi_path(priors, transmat, obslik, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -7.776472586614755)
print viterbi_path(priors, transmat, obslik, scaled=False, ret_loglik=True) #=> (array([0, 1, 1, 1, 0]), -8.0120386579275227)
Tenga en cuenta que esta aplicación no utiliza probabilidades de emisión directamente, sino que utiliza una variable obslik
. Generalmente, emissions[i,j] := Pr(observed_state == j | hidden_state == i)
para un estado particular observado i
, haciendo emissions.shape == (num_hidden_states, num_obs_states)
.
Sin embargo, dada una secuencia observations[t] := observation at time t
, todo el algoritmo de Viterbi requiere la probabilidad de esa observación para cada estado oculto. Por lo tanto, obslik[i,t] := Pr(observations[t] | hidden_state == i)
. El valor real del estado observado no es necesario.
He modificado la respuesta de @Rubarb para la condición donde las probabilidades marginales ya son conocidas (por ejemplo, calculando el algoritmo Forward Backward).
def viterbi (transition_probabilities, conditional_probabilities):
# Initialise everything
num_samples = conditional_probabilities.shape[1]
num_states = transition_probabilities.shape[0] # number of states
c = np.zeros(num_samples) #scale factors (necessary to prevent underflow)
viterbi = np.zeros((num_states,num_samples)) # initialise viterbi table
best_path_table = np.zeros((num_states,num_samples)) # initialise the best path table
best_path = np.zeros(num_samples).astype(np.int32) # this will be your output
# B- appoint initial values for viterbi and best path (bp) tables - Eq (32a-32b)
viterbi[:,0] = conditional_probabilities[:,0]
c[0] = 1.0/np.sum(viterbi[:,0])
viterbi[:,0] = c[0] * viterbi[:,0] # apply the scaling factor
# C- Do the iterations for viterbi and psi for time>0 until T
for t in range(1, num_samples): # loop through time
for s in range (0,num_states): # loop through the states @(t-1)
trans_p = viterbi[:, t-1] * transition_probabilities[:,s] # transition probs of each state transitioning
best_path_table[s,t], viterbi[s,t] = max(enumerate(trans_p), key=operator.itemgetter(1))
viterbi[s,t] = viterbi[s,t] * conditional_probabilities[s][t]
c[t] = 1.0/np.sum(viterbi[:,t]) # scaling factor
viterbi[:,t] = c[t] * viterbi[:,t]
## D - Back-tracking
best_path[num_samples-1] = viterbi[:,num_samples-1].argmax() # last state
for t in range(num_samples-1,0,-1): # states of (last-1)th to 0th time step
best_path[t-1] = best_path_table[best_path[t],t]
return best_path
- 1. Implementación de Python del algoritmo OPTICS (Clustering)
- 2. Implementación del algoritmo de Dijkstra
- 3. Implementación de Python del algoritmo de alineación BLAST?
- 4. ¿Cuál es la diferencia entre el algoritmo de avance hacia atrás y el algoritmo de Viterbi?
- 5. Implementación del algoritmo C5?
- 6. AdaBoost ML algoritmo python implementación
- 7. Implementación máxima del algoritmo de rectángulo
- 8. Implementación del algoritmo Bentley-Ottmann
- 9. Implementación del algoritmo científico python en Amazon ec2
- 10. Implementación del algoritmo de ordenación más segura
- 11. Implementación del algoritmo de Luhn en Ruby
- 12. Implementación del algoritmo de triangulación Chazelle
- 13. Implementación de C# del algoritmo Bron-Kerbosch
- 14. decodificador Viterbi
- 15. Encontrando la implementación del algoritmo del árbol de intervalos C++
- 16. Implementación MySQL del algoritmo Ray-Casting?
- 17. Implementación del algoritmo Bentley-Ottmann existente
- 18. Implementación del algoritmo DPLL en Prolog
- 19. Implementación C# del 'Algoritmo de polilínea codificada' de Google
- 20. Implementación del algoritmo de inferencia de tipo Damas-Hindley-Milner
- 21. Implementación de Python de Jenkins Hash?
- 22. Implementación del Hacker News algoritmo de clasificación en SQL
- 23. ¿Alguien conoce una implementación del algoritmo de yarowsky?
- 24. Implementación del patrón de repositorio en Python?
- 25. implementación del árbol de sufijos en python
- 26. Algoritmo Python k-means
- 27. problema del vendedor ambulante, algoritmo 2-opt implementación C#
- 28. ¿Qué pasa con mi implementación del algoritmo KMP?
- 29. Implementación de Python del intervalo de puntuación de Wilson?
- 30. Python UPnP/IGD ¿Implementación del cliente?
Estoy un poco confundido por qué esto es más alto que la publicación NLTK, ¿es incorrecta su implementación? ¿Encontraste satisfactorio mi código completamente indocumentado? – placeybordeaux
Probablemente la razón por la que es más fácil piratear para adaptarse a las necesidades que el código NLTK. – chiffa
@placeybordeaux ¿Qué función tiene 'hmm.t (k, token)'? Traté de replicar el código pero no pude entender qué 'hmm.t (k, token)' hacer. ¿Puedes dar un ejemplo para eso? – Mohammed