Package nltk_lite :: Package tag :: Module hmm
[hide private]
[frames] | no frames]

Source Code for Module nltk_lite.tag.hmm

   1  # Natural Language Toolkit: Hidden Markov Model 
   2  # 
   3  # Copyright (C) 2001-2007 University of Pennsylvania 
   4  # Author: Trevor Cohn <tacohn@csse.unimelb.edu.au> 
   5  #         Philip Blunsom <pcbl@csse.unimelb.edu.au> 
   6  #         Tiago Tresoldi <tiago@tresoldi.pro.br> (fixes) 
   7  #         Steven Bird <sb@csse.unimelb.edu.au> (fixes) 
   8  # URL: <http://nltk.sf.net> 
   9  # For license information, see LICENSE.TXT 
  10  # 
  11  # $Id: hmm.py 4107 2007-02-01 00:07:42Z stevenbird $ 
  12   
  13  """ 
  14  Hidden Markov Models (HMMs) largely used to assign the correct label sequence 
  15  to sequential data or assess the probability of a given label and data 
  16  sequence. These models are finite state machines characterised by a number of 
  17  states, transitions between these states, and output symbols emitted while in 
  18  each state. The HMM is an extension to the Markov chain, where each state 
  19  corresponds deterministically to a given event. In the HMM the observation is 
  20  a probabilistic function of the state. HMMs share the Markov chain's 
  21  assumption, being that the probability of transition from one state to another 
  22  only depends on the current state - i.e. the series of states that led to the 
  23  current state are not used. They are also time invariant. 
  24   
  25  The HMM is a directed graph, with probability weighted edges (representing the 
  26  probability of a transition between the source and sink states) where each 
  27  vertex emits an output symbol when entered. The symbol (or observation) is 
  28  non-deterministically generated. For this reason, knowing that a sequence of 
  29  output observations was generated by a given HMM does not mean that the 
  30  corresponding sequence of states (and what the current state is) is known. 
  31  This is the 'hidden' in the hidden markov model. 
  32   
  33  Formally, a HMM can be characterised by: 
  34      - the output observation alphabet. This is the set of symbols which may be 
  35        observed as output of the system.  
  36      - the set of states.  
  37      - the transition probabilities M{a_{ij} = P(s_t = j | s_{t-1} = i)}. These 
  38        represent the probability of transition to each state from a given 
  39        state.  
  40      - the output probability matrix M{b_i(k) = P(X_t = o_k | s_t = i)}. These 
  41        represent the probability of observing each symbol in a given state. 
  42      - the initial state distribution. This gives the probability of starting 
  43        in each state. 
  44   
  45  To ground this discussion, take a common NLP application, part-of-speech (POS) 
  46  tagging. An HMM is desirable for this task as the highest probability tag 
  47  sequence can be calculated for a given sequence of word forms. This differs 
  48  from other tagging techniques which often tag each word individually, seeking 
  49  to optimise each individual tagging greedily without regard to the optimal 
  50  combination of tags for a larger unit, such as a sentence. The HMM does this 
  51  with the Viterbi algorithm, which efficiently computes the optimal path 
  52  through the graph given the sequence of words forms. 
  53   
  54  In POS tagging the states usually have a 1:1 correspondence with the tag 
  55  alphabet - i.e. each state represents a single tag. The output observation 
  56  alphabet is the set of word forms (the lexicon), and the remaining three 
  57  parameters are derived by a training regime. With this information the 
  58  probability of a given sentence can be easily derived, by simply summing the 
  59  probability of each distinct path through the model. Similarly, the highest 
  60  probability tagging sequence can be derived with the Viterbi algorithm, 
  61  yielding a state sequence which can be mapped into a tag sequence. 
  62   
  63  This discussion assumes that the HMM has been trained. This is probably the 
  64  most difficult task with the model, and requires either MLE estimates of the 
  65  parameters or unsupervised learning using the Baum-Welch algorithm, a variant 
  66  of EM. 
  67  """ 
  68   
  69  from nltk_lite.probability import * 
  70  from numpy import * 
  71  import re 
  72   
  73  # _NINF = float('-inf')  # won't work on Windows 
  74  _NINF = float('-1e300') 
  75   
  76  _TEXT = 0  # index of text in a tuple 
  77  _TAG = 1   # index of tag in a tuple 
  78   
79 -class HiddenMarkovModel(object):
80 """ 81 Hidden Markov model class, a generative model for labelling sequence data. 82 These models define the joint probability of a sequence of symbols and 83 their labels (state transitions) as the product of the starting state 84 probability, the probability of each state transition, and the probability 85 of each observation being generated from each state. This is described in 86 more detail in the module documentation. 87 88 This implementation is based on the HMM description in Chapter 8, Huang, 89 Acero and Hon, Spoken Language Processing. 90 """
91 - def __init__(self, symbols, states, transitions, outputs, priors):
92 """ 93 Creates a hidden markov model parametised by the the states, 94 transition probabilities, output probabilities and priors. 95 96 @param symbols: the set of output symbols (alphabet) 97 @type symbols: (seq) of any 98 @param states: a set of states representing state space 99 @type states: seq of any 100 @param transitions: transition probabilities; Pr(s_i | s_j) 101 is the probability of transition from state i 102 given the model is in state_j 103 @type transitions: C{ConditionalProbDistI} 104 @param outputs: output probabilities; Pr(o_k | s_i) is the 105 probability of emitting symbol k when entering 106 state i 107 @type outputs: C{ConditionalProbDistI} 108 @param priors: initial state distribution; Pr(s_i) is the 109 probability of starting in state i 110 @type priors: C{ProbDistI} 111 """ 112 113 self._states = states 114 self._transitions = transitions 115 self._symbols = symbols 116 self._outputs = outputs 117 self._priors = priors 118 self._cache = None
119
120 - def probability(self, sequence):
121 """ 122 Returns the probability of the given symbol sequence. If the sequence 123 is labelled, then returns the joint probability of the symbol, state 124 sequence. Otherwise, uses the forward algorithm to find the 125 probability over all label sequences. 126 127 @return: the probability of the sequence 128 @rtype: float 129 @param sequence: the sequence of symbols which must contain the TEXT 130 property, and optionally the TAG property 131 @type sequence: Token 132 """ 133 return exp(self.log_probability(sequence))
134
135 - def log_probability(self, sequence):
136 """ 137 Returns the log-probability of the given symbol sequence. If the 138 sequence is labelled, then returns the joint log-probability of the 139 symbol, state sequence. Otherwise, uses the forward algorithm to find 140 the log-probability over all label sequences. 141 142 @return: the log-probability of the sequence 143 @rtype: float 144 @param sequence: the sequence of symbols which must contain the TEXT 145 property, and optionally the TAG property 146 @type sequence: Token 147 """ 148 149 T = len(sequence) 150 N = len(self._states) 151 152 if T > 0 and sequence[0][_TAG]: 153 last_state = sequence[0][_TAG] 154 p = self._priors.logprob(last_state) + \ 155 self._outputs[last_state].logprob(sequence[0][_TEXT]) 156 for t in range(1, T): 157 state = sequence[t][_TAG] 158 p += self._transitions[last_state].logprob(state) + \ 159 self._outputs[state].logprob(sequence[t][_TEXT]) 160 last_state = state 161 return p 162 else: 163 alpha = self._forward_probability(sequence) 164 p = _log_add(*alpha[T-1, :]) 165 return p
166
167 - def tag(self, unlabeled_sequence):
168 """ 169 Tags the sequence with the highest probability state sequence. This 170 uses the best_path method to find the Viterbi path. 171 172 @return: a labelled sequence of symbols 173 @rtype: list 174 @param unlabeled_sequence: the sequence of unlabeled symbols 175 @type unlabeled_sequence: list 176 """ 177 178 path = self.best_path(unlabeled_sequence) 179 for i in range(len(path)): 180 unlabeled_sequence[i] = (unlabeled_sequence[i][_TEXT], path[i]) 181 return unlabeled_sequence
182
183 - def _output_logprob(self, state, symbol):
184 """ 185 @return: the log probability of the symbol being observed in the given 186 state 187 @rtype: float 188 """ 189 return self._outputs[state].logprob(symbol)
190
191 - def _create_cache(self):
192 if not self._cache: 193 N = len(self._states) 194 M = len(self._symbols) 195 P = zeros(N, float32) 196 X = zeros((N, N), float32) 197 O = zeros((N, M), float32) 198 for i in range(N): 199 si = self._states[i] 200 P[i] = self._priors.logprob(si) 201 for j in range(N): 202 X[i, j] = self._transitions[si].logprob(self._states[j]) 203 for k in range(M): 204 O[i, k] = self._outputs[si].logprob(self._symbols[k]) 205 S = {} 206 for k in range(M): 207 S[self._symbols[k]] = k 208 self._cache = (P, O, X, S)
209
210 - def best_path(self, unlabeled_sequence):
211 """ 212 Returns the state sequence of the optimal (most probable) path through 213 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 214 programming. 215 216 @return: the state sequence 217 @rtype: sequence of any 218 @param unlabeled_sequence: the sequence of unlabeled symbols 219 @type unlabeled_sequence: list 220 """ 221 222 symbols = [token[_TEXT] for token in unlabeled_sequence] 223 T = len(symbols) 224 N = len(self._states) 225 self._create_cache() 226 P, O, X, S = self._cache 227 228 V = zeros((T, N), float32) 229 B = ones((T, N), int) * -1 230 231 V[0] = P + O[:, S[symbols[0]]] 232 for t in range(1, T): 233 for j in range(N): 234 vs = V[t-1, :] + X[:, j] 235 best = argmax(vs) 236 V[t, j] = vs[best] + O[j, S[symbols[t]]] 237 B[t, j] = best 238 239 current = argmax(V[T-1,:]) 240 sequence = [current] 241 for t in range(T-1, 0, -1): 242 last = B[t, current] 243 sequence.append(last) 244 current = last 245 246 sequence.reverse() 247 return map(self._states.__getitem__, sequence)
248
249 - def best_path_simple(self, unlabeled_sequence):
250 """ 251 Returns the state sequence of the optimal (most probable) path through 252 the HMM. Uses the Viterbi algorithm to calculate this part by dynamic 253 programming. This uses a simple, direct method, and is included for 254 teaching purposes. 255 256 @return: the state sequence 257 @rtype: sequence of any 258 @param unlabeled_sequence: the sequence of unlabeled symbols 259 @type unlabeled_sequence: list 260 """ 261 262 T = len(unlabeled_sequence) 263 N = len(self._states) 264 V = zeros((T, N), float64) 265 B = {} 266 267 # find the starting log probabilities for each state 268 symbol = unlabeled_sequence[0][_TEXT] 269 for i, state in enumerate(self._states): 270 V[0, i] = self._priors.logprob(state) + \ 271 self._output_logprob(state, symbol) 272 B[0, state] = None 273 274 # find the maximum log probabilities for reaching each state at time t 275 for t in range(1, T): 276 symbol = unlabeled_sequence[t][_TEXT] 277 for j in range(N): 278 sj = self._states[j] 279 best = None 280 for i in range(N): 281 si = self._states[i] 282 va = V[t-1, i] + self._transitions[si].logprob(sj) 283 if not best or va > best[0]: 284 best = (va, si) 285 V[t, j] = best[0] + self._output_logprob(sj, symbol) 286 B[t, sj] = best[1] 287 288 # find the highest probability final state 289 best = None 290 for i in range(N): 291 val = V[T-1, i] 292 if not best or val > best[0]: 293 best = (val, self._states[i]) 294 295 # traverse the back-pointers B to find the state sequence 296 current = best[1] 297 sequence = [current] 298 for t in range(T-1, 0, -1): 299 last = B[t, current] 300 sequence.append(last) 301 current = last 302 303 sequence.reverse() 304 return sequence
305
306 - def random_sample(self, rng, length):
307 """ 308 Randomly sample the HMM to generate a sentence of a given length. This 309 samples the prior distribution then the observation distribution and 310 transition distribution for each subsequent observation and state. 311 This will mostly generate unintelligible garbage, but can provide some 312 amusement. 313 314 @return: the randomly created state/observation sequence, 315 generated according to the HMM's probability 316 distributions. The SUBTOKENS have TEXT and TAG 317 properties containing the observation and state 318 respectively. 319 @rtype: list 320 @param rng: random number generator 321 @type rng: Random (or any object with a random() method) 322 @param length: desired output length 323 @type length: int 324 """ 325 326 # sample the starting state and symbol prob dists 327 tokens = [] 328 state = self._sample_probdist(self._priors, rng.random(), self._states) 329 symbol = self._sample_probdist(self._outputs[state], 330 rng.random(), self._symbols) 331 tokens.append((symbol, state)) 332 333 for i in range(1, length): 334 # sample the state transition and symbol prob dists 335 state = self._sample_probdist(self._transitions[state], 336 rng.random(), self._states) 337 symbol = self._sample_probdist(self._outputs[state], 338 rng.random(), self._symbols) 339 tokens.append((symbol, state)) 340 341 return tokens
342
343 - def _sample_probdist(self, probdist, p, samples):
344 cum_p = 0 345 for sample in samples: 346 add_p = probdist.prob(sample) 347 if cum_p <= p <= cum_p + add_p: 348 return sample 349 cum_p += add_p 350 raise Exception('Invalid probability distribution - does not sum to one')
351
352 - def entropy(self, unlabeled_sequence):
353 """ 354 Returns the entropy over labellings of the given sequence. This is 355 given by: 356 357 H(O) = - sum_S Pr(S | O) log Pr(S | O) 358 359 where the summation ranges over all state sequences, S. Let M{Z = 360 Pr(O) = sum_S Pr(S, O)} where the summation ranges over all state 361 sequences and O is the observation sequence. As such the entropy can 362 be re-expressed as: 363 364 H = - sum_S Pr(S | O) log [ Pr(S, O) / Z ] 365 = log Z - sum_S Pr(S | O) log Pr(S, 0) 366 = log Z - sum_S Pr(S | O) [ log Pr(S_0) + sum_t Pr(S_t | S_{t-1}) 367 + sum_t Pr(O_t | S_t) ] 368 369 The order of summation for the log terms can be flipped, allowing 370 dynamic programming to be used to calculate the entropy. Specifically, 371 we use the forward and backward probabilities (alpha, beta) giving: 372 373 H = log Z - sum_s0 alpha_0(s0) beta_0(s0) / Z * log Pr(s0) 374 + sum_t,si,sj alpha_t(si) Pr(sj | si) Pr(O_t+1 | sj) beta_t(sj) 375 / Z * log Pr(sj | si) 376 + sum_t,st alpha_t(st) beta_t(st) / Z * log Pr(O_t | st) 377 378 This simply uses alpha and beta to find the probabilities of partial 379 sequences, constrained to include the given state(s) at some point in 380 time. 381 """ 382 383 T = len(unlabeled_sequence) 384 N = len(self._states) 385 386 alpha = self._forward_probability(unlabeled_sequence) 387 beta = self._backward_probability(unlabeled_sequence) 388 normalisation = _log_add(*alpha[T-1, :]) 389 390 entropy = normalisation 391 392 # starting state, t = 0 393 for i, state in enumerate(self._states): 394 p = exp(alpha[0, i] + beta[0, i] - normalisation) 395 entropy -= p * self._priors.logprob(state) 396 #print 'p(s_0 = %s) =' % state, p 397 398 # state transitions 399 for t0 in range(T - 1): 400 t1 = t0 + 1 401 for i0, s0 in enumerate(self._states): 402 for i1, s1 in enumerate(self._states): 403 p = exp(alpha[t0, i0] + self._transitions[s0].logprob(s1) + 404 self._outputs[s1].logprob(unlabeled_sequence[t1][_TEXT]) + 405 beta[t1, i1] - normalisation) 406 entropy -= p * self._transitions[s0].logprob(s1) 407 #print 'p(s_%d = %s, s_%d = %s) =' % (t0, s0, t1, s1), p 408 409 # symbol emissions 410 for t in range(T): 411 for i, state in enumerate(self._states): 412 p = exp(alpha[t, i] + beta[t, i] - normalisation) 413 entropy -= p * self._outputs[state].logprob(unlabeled_sequence[t][_TEXT]) 414 #print 'p(s_%d = %s) =' % (t, state), p 415 416 return entropy
417
418 - def point_entropy(self, unlabeled_sequence):
419 """ 420 Returns the pointwise entropy over the possible states at each 421 position in the chain, given the observation sequence. 422 """ 423 424 T = len(unlabeled_sequence) 425 N = len(self._states) 426 427 alpha = self._forward_probability(unlabeled_sequence) 428 beta = self._backward_probability(unlabeled_sequence) 429 normalisation = _log_add(*alpha[T-1, :]) 430 431 entropies = zeros(T, float64) 432 probs = zeros(N, float64) 433 for t in range(T): 434 for s in range(N): 435 probs[s] = alpha[t, s] + beta[t, s] - normalisation 436 437 for s in range(N): 438 entropies[t] -= exp(probs[s]) * probs[s] 439 440 return entropies
441
442 - def _exhaustive_entropy(self, unlabeled_sequence):
443 T = len(unlabeled_sequence) 444 N = len(self._states) 445 446 labellings = [[state] for state in self._states] 447 for t in range(T - 1): 448 current = labellings 449 labellings = [] 450 for labelling in current: 451 for state in self._states: 452 labellings.append(labelling + [state]) 453 454 log_probs = [] 455 for labelling in labellings: 456 labelled_sequence = unlabeled_sequence[:] 457 for t, label in enumerate(labelling): 458 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 459 lp = self.log_probability(labelled_sequence) 460 log_probs.append(lp) 461 normalisation = _log_add(*log_probs) 462 463 #ps = zeros((T, N), float64) 464 #for labelling, lp in zip(labellings, log_probs): 465 #for t in range(T): 466 #ps[t, self._states.index(labelling[t])] += exp(lp - normalisation) 467 468 #for t in range(T): 469 #print 'prob[%d] =' % t, ps[t] 470 471 entropy = 0 472 for lp in log_probs: 473 lp -= normalisation 474 entropy -= exp(lp) * lp 475 476 return entropy
477
478 - def _exhaustive_point_entropy(self, unlabeled_sequence):
479 T = len(unlabeled_sequence) 480 N = len(self._states) 481 482 labellings = [[state] for state in self._states] 483 for t in range(T - 1): 484 current = labellings 485 labellings = [] 486 for labelling in current: 487 for state in self._states: 488 labellings.append(labelling + [state]) 489 490 log_probs = [] 491 for labelling in labellings: 492 labelled_sequence = unlabeled_sequence[:] 493 for t, label in enumerate(labelling): 494 labelled_sequence[t] = (labelled_sequence[t][_TEXT], label) 495 lp = self.log_probability(labelled_sequence) 496 log_probs.append(lp) 497 498 normalisation = _log_add(*log_probs) 499 500 probabilities = zeros((T, N), float64) 501 probabilities[:] = _NINF 502 for labelling, lp in zip(labellings, log_probs): 503 lp -= normalisation 504 for t, label in enumerate(labelling): 505 index = self._states.index(label) 506 probabilities[t, index] = _log_add(probabilities[t, index], lp) 507 508 entropies = zeros(T, float64) 509 for t in range(T): 510 for s in range(N): 511 entropies[t] -= exp(probabilities[t, s]) * probabilities[t, s] 512 513 return entropies
514
515 - def _forward_probability(self, unlabeled_sequence):
516 """ 517 Return the forward probability matrix, a T by N array of 518 log-probabilities, where T is the length of the sequence and N is the 519 number of states. Each entry (t, s) gives the probability of being in 520 state s at time t after observing the partial symbol sequence up to 521 and including t. 522 523 @param unlabeled_sequence: the sequence of unlabeled symbols 524 @type unlabeled_sequence: list 525 @return: the forward log probability matrix 526 @rtype: array 527 """ 528 T = len(unlabeled_sequence) 529 N = len(self._states) 530 alpha = zeros((T, N), float64) 531 532 symbol = unlabeled_sequence[0][_TEXT] 533 for i, state in enumerate(self._states): 534 alpha[0, i] = self._priors.logprob(state) + \ 535 self._outputs[state].logprob(symbol) 536 for t in range(1, T): 537 symbol = unlabeled_sequence[t][_TEXT] 538 for i, si in enumerate(self._states): 539 alpha[t, i] = _NINF 540 for j, sj in enumerate(self._states): 541 alpha[t, i] = _log_add(alpha[t, i], alpha[t-1, j] + 542 self._transitions[sj].logprob(si)) 543 alpha[t, i] += self._outputs[si].logprob(symbol) 544 545 return alpha
546
547 - def _backward_probability(self, unlabeled_sequence):
548 """ 549 Return the backward probability matrix, a T by N array of 550 log-probabilities, where T is the length of the sequence and N is the 551 number of states. Each entry (t, s) gives the probability of being in 552 state s at time t after observing the partial symbol sequence from t 553 .. T. 554 555 @return: the backward log probability matrix 556 @rtype: array 557 @param unlabeled_sequence: the sequence of unlabeled symbols 558 @type unlabeled_sequence: list 559 """ 560 T = len(unlabeled_sequence) 561 N = len(self._states) 562 beta = zeros((T, N), float64) 563 564 # initialise the backward values 565 beta[T-1, :] = log(1) 566 567 # inductively calculate remaining backward values 568 for t in range(T-2, -1, -1): 569 symbol = unlabeled_sequence[t+1][_TEXT] 570 for i, si in enumerate(self._states): 571 beta[t, i] = _NINF 572 for j, sj in enumerate(self._states): 573 beta[t, i] = _log_add(beta[t, i], 574 self._transitions[si].logprob(sj) + 575 self._outputs[sj].logprob(symbol) + 576 beta[t + 1, j]) 577 578 return beta
579
580 - def __repr__(self):
581 return '<HiddenMarkovModel %d states and %d output symbols>' \ 582 % (len(self._states), len(self._symbols))
583
584 -class HiddenMarkovModelTrainer(object):
585 """ 586 Algorithms for learning HMM parameters from training data. These include 587 both supervised learning (MLE) and unsupervised learning (Baum-Welch). 588 """
589 - def __init__(self, states=None, symbols=None):
590 """ 591 Creates an HMM trainer to induce an HMM with the given states and 592 output symbol alphabet. A supervised and unsupervised training 593 method may be used. If either of the states or symbols are not given, 594 these may be derived from supervised training. 595 596 @param states: the set of state labels 597 @type states: sequence of any 598 @param symbols: the set of observation symbols 599 @type symbols: sequence of any 600 """ 601 if states: 602 self._states = states 603 else: 604 self._states = [] 605 if symbols: 606 self._symbols = symbols 607 else: 608 self._symbols = []
609
610 - def train(self, labelled_sequences=None, unlabeled_sequences=None, 611 **kwargs):
612 """ 613 Trains the HMM using both (or either of) supervised and unsupervised 614 techniques. 615 616 @return: the trained model 617 @rtype: HiddenMarkovModel 618 @param labelled_sequences: the supervised training data, a set of 619 labelled sequences of observations 620 @type labelled_sequences: list 621 @param unlabeled_sequences: the unsupervised training data, a set of 622 sequences of observations 623 @type unlabeled_sequences: list 624 @param kwargs: additional arguments to pass to the training methods 625 """ 626 assert labelled_sequences or unlabeled_sequences 627 model = None 628 if labelled_sequences: 629 model = self.train_supervised(labelled_sequences, **kwargs) 630 if unlabeled_sequences: 631 if model: kwargs['model'] = model 632 model = self.train_unsupervised(unlabeled_sequences, **kwargs) 633 return model
634
635 - def train_unsupervised(self, unlabeled_sequences, **kwargs):
636 """ 637 Trains the HMM using the Baum-Welch algorithm to maximise the 638 probability of the data sequence. This is a variant of the EM 639 algorithm, and is unsupervised in that it doesn't need the state 640 sequences for the symbols. The code is based on 'A Tutorial on Hidden 641 Markov Models and Selected Applications in Speech Recognition', 642 Lawrence Rabiner, IEEE, 1989. 643 644 @return: the trained model 645 @rtype: HiddenMarkovModel 646 @param unlabeled_sequences: the training data, a set of 647 sequences of observations 648 @type unlabeled_sequences: list 649 @param kwargs: may include the following parameters:: 650 model - a HiddenMarkovModel instance used to begin the Baum-Welch 651 algorithm 652 max_iterations - the maximum number of EM iterations 653 convergence_logprob - the maximum change in log probability to 654 allow convergence 655 """ 656 657 N = len(self._states) 658 M = len(self._symbols) 659 symbol_dict = dict((self._symbols[i], i) for i in range(M)) 660 661 # create a uniform HMM, which will be iteratively refined, unless 662 # given an existing model 663 model = kwargs.get('model') 664 if not model: 665 priors = UniformProbDist(self._states) 666 transitions = DictionaryConditionalProbDist( 667 dict((state, UniformProbDist(self._states)) 668 for state in self._states)) 669 output = DictionaryConditionalProbDist( 670 dict((state, UniformProbDist(self._symbols)) 671 for state in self._states)) 672 model = HiddenMarkovModel(self._symbols, self._states, 673 transitions, output, priors) 674 675 # update model prob dists so that they can be modified 676 model._priors = MutableProbDist(model._priors, self._states) 677 model._transitions = DictionaryConditionalProbDist( 678 dict((s, MutableProbDist(model._transitions[s], self._states)) 679 for s in self._states)) 680 model._outputs = DictionaryConditionalProbDist( 681 dict((s, MutableProbDist(model._outputs[s], self._symbols)) 682 for s in self._states)) 683 684 # iterate until convergence 685 converged = False 686 last_logprob = None 687 iteration = 0 688 max_iterations = kwargs.get('max_iterations', 1000) 689 epsilon = kwargs.get('convergence_logprob', 1e-6) 690 while not converged and iteration < max_iterations: 691 A_numer = ones((N, N), float64) * _NINF 692 B_numer = ones((N, M), float64) * _NINF 693 A_denom = ones(N, float64) * _NINF 694 B_denom = ones(N, float64) * _NINF 695 696 logprob = 0 697 for sequence in unlabeled_sequences: 698 sequence = list(sequence) 699 if not sequence: 700 continue 701 702 # compute forward and backward probabilities 703 alpha = model._forward_probability(sequence) 704 beta = model._backward_probability(sequence) 705 706 # find the log probability of the sequence 707 T = len(sequence) 708 lpk = _log_add(*alpha[T-1, :]) 709 logprob += lpk 710 711 # now update A and B (transition and output probabilities) 712 # using the alpha and beta values. Please refer to Rabiner's 713 # paper for details, it's too hard to explain in comments 714 local_A_numer = ones((N, N), float64) * _NINF 715 local_B_numer = ones((N, M), float64) * _NINF 716 local_A_denom = ones(N, float64) * _NINF 717 local_B_denom = ones(N, float64) * _NINF 718 719 # for each position, accumulate sums for A and B 720 for t in range(T): 721 x = sequence[t][_TEXT] #not found? FIXME 722 if t < T - 1: 723 xnext = sequence[t+1][_TEXT] #not found? FIXME 724 xi = symbol_dict[x] 725 for i in range(N): 726 si = self._states[i] 727 if t < T - 1: 728 for j in range(N): 729 sj = self._states[j] 730 local_A_numer[i, j] = \ 731 _log_add(local_A_numer[i, j], 732 alpha[t, i] + 733 model._transitions[si].logprob(sj) + 734 model._outputs[sj].logprob(xnext) + 735 beta[t+1, j]) 736 local_A_denom[i] = _log_add(local_A_denom[i], 737 alpha[t, i] + beta[t, i]) 738 else: 739 local_B_denom[i] = _log_add(local_A_denom[i], 740 alpha[t, i] + beta[t, i]) 741 742 local_B_numer[i, xi] = _log_add(local_B_numer[i, xi], 743 alpha[t, i] + beta[t, i]) 744 745 # add these sums to the global A and B values 746 for i in range(N): 747 for j in range(N): 748 A_numer[i, j] = _log_add(A_numer[i, j], 749 local_A_numer[i, j] - lpk) 750 for k in range(M): 751 B_numer[i, k] = _log_add(B_numer[i, k], 752 local_B_numer[i, k] - lpk) 753 754 A_denom[i] = _log_add(A_denom[i], local_A_denom[i] - lpk) 755 B_denom[i] = _log_add(B_denom[i], local_B_denom[i] - lpk) 756 757 # use the calculated values to update the transition and output 758 # probability values 759 for i in range(N): 760 si = self._states[i] 761 for j in range(N): 762 sj = self._states[j] 763 model._transitions[si].update(sj, A_numer[i,j] - A_denom[i]) 764 for k in range(M): 765 ok = self._symbols[k] 766 model._outputs[si].update(ok, B_numer[i,k] - B_denom[i]) 767 # Rabiner says the priors don't need to be updated. I don't 768 # believe him. FIXME 769 770 # test for convergence 771 if iteration > 0 and abs(logprob - last_logprob) < epsilon: 772 converged = True 773 774 print 'iteration', iteration, 'logprob', logprob 775 iteration += 1 776 last_logprob = logprob 777 778 return model
779
780 - def train_supervised(self, labelled_sequences, **kwargs):
781 """ 782 Supervised training maximising the joint probability of the symbol and 783 state sequences. This is done via collecting frequencies of 784 transitions between states, symbol observations while within each 785 state and which states start a sentence. These frequency distributions 786 are then normalised into probability estimates, which can be 787 smoothed if desired. 788 789 @return: the trained model 790 @rtype: HiddenMarkovModel 791 @param labelled_sequences: the training data, a set of 792 labelled sequences of observations 793 @type labelled_sequences: list 794 @param kwargs: may include an 'estimator' parameter, a function taking 795 a C{FreqDist} and a number of bins and returning a C{ProbDistI}; 796 otherwise a MLE estimate is used 797 """ 798 799 # default to the MLE estimate 800 estimator = kwargs.get('estimator') 801 if estimator == None: 802 estimator = lambda fdist, bins: MLEProbDist(fdist) 803 804 # count occurences of starting states, transitions out of each state 805 # and output symbols observed in each state 806 starting = FreqDist() 807 transitions = ConditionalFreqDist() 808 outputs = ConditionalFreqDist() 809 for sequence in labelled_sequences: 810 lasts = None 811 for token in sequence: 812 state = token[_TAG] 813 symbol = token[_TEXT] 814 if lasts == None: 815 starting.inc(state) 816 else: 817 transitions[lasts].inc(state) 818 outputs[state].inc(symbol) 819 lasts = state 820 821 # update the state and symbol lists 822 if state not in self._states: 823 self._states.append(state) 824 if symbol not in self._symbols: 825 self._symbols.append(symbol) 826 827 # create probability distributions (with smoothing) 828 N = len(self._states) 829 pi = estimator(starting, N) 830 A = ConditionalProbDist(transitions, estimator, False, N) 831 B = ConditionalProbDist(outputs, estimator, False, len(self._symbols)) 832 833 return HiddenMarkovModel(self._symbols, self._states, A, B, pi)
834
835 -def _log_add(*values):
836 """ 837 Adds the logged values, returning the logarithm of the addition. 838 """ 839 x = max(values) 840 if x > _NINF: 841 sum_diffs = 0 842 for value in values: 843 sum_diffs += exp(value - x) 844 return x + log(sum_diffs) 845 else: 846 return x
847
848 -def demo():
849 # demonstrates HMM probability calculation 850 851 print 852 print "HMM probability calculation demo" 853 print 854 855 # example taken from page 381, Huang et al 856 symbols = ['up', 'down', 'unchanged'] 857 states = ['bull', 'bear', 'static'] 858 859 def pd(values, samples): 860 d = {} 861 for value, item in zip(values, samples): 862 d[item] = value 863 return DictionaryProbDist(d)
864 865 def cpd(array, conditions, samples): 866 d = {} 867 for values, condition in zip(array, conditions): 868 d[condition] = pd(values, samples) 869 return DictionaryConditionalProbDist(d) 870 871 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 872 A = cpd(A, states, states) 873 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 874 B = cpd(B, states, symbols) 875 pi = array([0.5, 0.2, 0.3], float64) 876 pi = pd(pi, states) 877 878 model = HiddenMarkovModel(symbols=symbols, states=states, 879 transitions=A, outputs=B, priors=pi) 880 881 print 'Testing', model 882 883 for test in [['up', 'up'], ['up', 'down', 'up'], 884 ['down'] * 5, ['unchanged'] * 5 + ['up']]: 885 886 sequence = [(t, None) for t in test] 887 888 print 'Testing with state sequence', test 889 print 'probability =', model.probability(sequence) 890 print 'tagging = ', model.tag(sequence) 891 print 'p(tagged) = ', model.probability(sequence) 892 print 'H = ', model.entropy(sequence) 893 print 'H_exh = ', model._exhaustive_entropy(sequence) 894 print 'H(point) = ', model.point_entropy(sequence) 895 print 'H_exh(point)=', model._exhaustive_point_entropy(sequence) 896 print 897 898
899 -def load_pos(num_sents):
900 from nltk_lite.corpora import brown 901 from itertools import islice 902 903 sentences = list(islice(brown.tagged(), num_sents)) 904 905 tag_set = ["'", "''", '(', ')', '*', ',', '.', ':', '--', '``', 'abl', 906 'abn', 'abx', 'ap', 'ap$', 'at', 'be', 'bed', 'bedz', 'beg', 'bem', 907 'ben', 'ber', 'bez', 'cc', 'cd', 'cd$', 'cs', 'do', 'dod', 'doz', 908 'dt', 'dt$', 'dti', 'dts', 'dtx', 'ex', 'fw', 'hv', 'hvd', 'hvg', 909 'hvn', 'hvz', 'in', 'jj', 'jjr', 'jjs', 'jjt', 'md', 'nn', 'nn$', 910 'nns', 'nns$', 'np', 'np$', 'nps', 'nps$', 'nr', 'nr$', 'od', 'pn', 911 'pn$', 'pp$', 'ppl', 'ppls', 'ppo', 'pps', 'ppss', 'ql', 'qlp', 'rb', 912 'rb$', 'rbr', 'rbt', 'rp', 'to', 'uh', 'vb', 'vbd', 'vbg', 'vbn', 913 'vbz', 'wdt', 'wp$', 'wpo', 'wps', 'wql', 'wrb'] 914 915 sequences = [] 916 sequence = [] 917 symbols = set() 918 start_re = re.compile(r'[^-*+]*') 919 for sentence in sentences: 920 for i in range(len(sentence)): 921 word, tag = sentence[i] 922 word = word.lower() # normalize 923 symbols.add(word) # log this word 924 m = start_re.match(tag) 925 # cleanup the tag 926 tag = m.group(0) 927 if tag not in tag_set: 928 tag = '*' 929 sentence[i] = (word, tag) # store cleaned-up tagged token 930 931 return sentences, tag_set, list(symbols)
932
933 -def test_pos(model, sentences, display=False):
934 from sys import stdout 935 936 count = correct = 0 937 for sentence in sentences: 938 orig_tags = [token[_TAG] for token in sentence] 939 sentence = [(token[_TEXT], None) for token in sentence] 940 new_tags = model.best_path(sentence) 941 if display: 942 print 'Untagged:' 943 print sentence 944 print 'HMM-tagged:' 945 print new_tags 946 print 'Entropy:' 947 print model.entropy(sentence) 948 print '-' * 60 949 else: 950 print '\b.', 951 stdout.flush() 952 for orig_tag, new_tag in zip(orig_tags, new_tags): 953 count += 1 954 if orig_tag == new_tag: 955 correct += 1 956 957 print 'accuracy over', count, 'tokens %.1f' % (100.0 * correct / count)
958
959 -def demo_pos():
960 # demonstrates POS tagging using supervised training 961 962 print 963 print "HMM POS tagging demo" 964 print 965 966 print 'Training HMM...' 967 labelled_sequences, tag_set, symbols = load_pos(200) 968 trainer = HiddenMarkovModelTrainer(tag_set, symbols) 969 hmm = trainer.train_supervised(labelled_sequences[10:], 970 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 971 972 print 'Testing...' 973 test_pos(hmm, labelled_sequences[:10], True)
974
975 -def _untag(sentences):
976 unlabeled = [] 977 for sentence in sentences: 978 unlabeled.append((token[_TEXT], None) for token in sentence) 979 return unlabeled
980
981 -def demo_pos_bw():
982 # demonstrates the Baum-Welch algorithm in POS tagging 983 984 print 985 print "Baum-Welch demo for POS tagging" 986 print 987 988 print 'Training HMM (supervised)...' 989 sentences, tag_set, symbols = load_pos(210) 990 symbols = set() 991 for sentence in sentences: 992 for token in sentence: 993 symbols.add(token[_TEXT]) 994 995 trainer = HiddenMarkovModelTrainer(tag_set, list(symbols)) 996 hmm = trainer.train_supervised(sentences[10:200], 997 estimator=lambda fd, bins: LidstoneProbDist(fd, 0.1, bins)) 998 print 'Training (unsupervised)...' 999 # it's rather slow - so only use 10 samples 1000 unlabeled = _untag(sentences[200:210]) 1001 hmm = trainer.train_unsupervised(unlabeled, model=hmm, max_iterations=5) 1002 test_pos(hmm, sentences[:10], True)
1003
1004 -def demo_bw():
1005 # demo Baum Welch by generating some sequences and then performing 1006 # unsupervised training on them 1007 1008 print 1009 print "Baum-Welch demo for market example" 1010 print 1011 1012 # example taken from page 381, Huang et al 1013 symbols = ['up', 'down', 'unchanged'] 1014 states = ['bull', 'bear', 'static'] 1015 1016 def pd(values, samples): 1017 d = {} 1018 for value, item in zip(values, samples): 1019 d[item] = value 1020 return DictionaryProbDist(d)
1021 1022 def cpd(array, conditions, samples): 1023 d = {} 1024 for values, condition in zip(array, conditions): 1025 d[condition] = pd(values, samples) 1026 return DictionaryConditionalProbDist(d) 1027 1028 A = array([[0.6, 0.2, 0.2], [0.5, 0.3, 0.2], [0.4, 0.1, 0.5]], float64) 1029 A = cpd(A, states, states) 1030 B = array([[0.7, 0.1, 0.2], [0.1, 0.6, 0.3], [0.3, 0.3, 0.4]], float64) 1031 B = cpd(B, states, symbols) 1032 pi = array([0.5, 0.2, 0.3], float64) 1033 pi = pd(pi, states) 1034 1035 model = HiddenMarkovModel(symbols=symbols, states=states, 1036 transitions=A, outputs=B, priors=pi) 1037 1038 # generate some random sequences 1039 training = [] 1040 import random 1041 rng = random.Random() 1042 for i in range(10): 1043 item = model.random_sample(rng, 5) 1044 training.append((i[0], None) for i in item) 1045 1046 # train on those examples, starting with the model that generated them 1047 trainer = HiddenMarkovModelTrainer(states, symbols) 1048 hmm = trainer.train_unsupervised(training, model=model, max_iterations=1000) 1049 1050 if __name__ == '__main__': 1051 demo() 1052 demo_pos() 1053 demo_pos_bw() 1054 # demo_bw() 1055