|  | 
|  | 1 | +# encoding=utf8 | 
|  | 2 | + | 
|  | 3 | +import numpy as np | 
|  | 4 | +import csv | 
|  | 5 | + | 
|  | 6 | +class HMM(object): | 
|  | 7 | +    def __init__(self,N,M): | 
|  | 8 | +        self.A = np.zeros((N,N))        # 状态转移概率矩阵 | 
|  | 9 | +        self.B = np.zeros((N,M))        # 观测概率矩阵 | 
|  | 10 | +        self.Pi = np.array([1.0/N]*N)   # 初始状态概率矩阵 | 
|  | 11 | + | 
|  | 12 | +        self.N = N                      # 可能的状态数 | 
|  | 13 | +        self.M = M                      # 可能的观测数 | 
|  | 14 | + | 
|  | 15 | +    def cal_probality(self, O): | 
|  | 16 | +        self.T = len(O) | 
|  | 17 | +        self.O = O | 
|  | 18 | + | 
|  | 19 | +        self.forward() | 
|  | 20 | +        return sum(self.alpha[self.T-1]) | 
|  | 21 | + | 
|  | 22 | +    def forward(self): | 
|  | 23 | +        """ | 
|  | 24 | +        前向算法 | 
|  | 25 | +        """ | 
|  | 26 | +        self.alpha = np.zeros((self.T,self.N)) | 
|  | 27 | + | 
|  | 28 | +        # 公式 10.15 | 
|  | 29 | +        for i in range(self.N): | 
|  | 30 | +            self.alpha[0][i] = self.Pi[i]*self.B[i][self.O[0]] | 
|  | 31 | + | 
|  | 32 | +        # 公式10.16 | 
|  | 33 | +        for t in range(1,self.T): | 
|  | 34 | +            for i in range(self.N): | 
|  | 35 | +                sum = 0 | 
|  | 36 | +                for j in range(self.N): | 
|  | 37 | +                    sum += self.alpha[t-1][j]*self.A[j][i] | 
|  | 38 | +                self.alpha[t][i] = sum * self.B[i][self.O[t]] | 
|  | 39 | + | 
|  | 40 | +    def backward(self): | 
|  | 41 | +        """ | 
|  | 42 | +        后向算法 | 
|  | 43 | +        """ | 
|  | 44 | +        self.beta = np.zeros((self.T,self.N)) | 
|  | 45 | + | 
|  | 46 | +        # 公式10.19 | 
|  | 47 | +        for i in range(self.N): | 
|  | 48 | +            self.beta[self.T-1][i] = 1 | 
|  | 49 | + | 
|  | 50 | +        # 公式10.20 | 
|  | 51 | +        for t in range(self.T-2,-1,-1): | 
|  | 52 | +            for i in range(self.N): | 
|  | 53 | +                for j in range(self.N): | 
|  | 54 | +                    self.beta[t][i] += self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] | 
|  | 55 | + | 
|  | 56 | +    def cal_gamma(self, i, t): | 
|  | 57 | +        """ | 
|  | 58 | +        公式 10.24 | 
|  | 59 | +        """ | 
|  | 60 | +        numerator = self.alpha[t][i]*self.beta[t][i] | 
|  | 61 | +        denominator = 0 | 
|  | 62 | + | 
|  | 63 | +        for j in range(self.N): | 
|  | 64 | +            denominator += self.alpha[t][j]*self.beta[t][j] | 
|  | 65 | + | 
|  | 66 | +        return numerator/denominator | 
|  | 67 | + | 
|  | 68 | +    def cal_ksi(self, i, j, t): | 
|  | 69 | +        """ | 
|  | 70 | +        公式 10.26 | 
|  | 71 | +        """ | 
|  | 72 | + | 
|  | 73 | +        numerator = self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] | 
|  | 74 | +        denominator = 0 | 
|  | 75 | + | 
|  | 76 | +        for i in range(self.N): | 
|  | 77 | +            for j in range(self.N): | 
|  | 78 | +                denominator += self.alpha[t][i]*self.A[i][j]*self.B[j][self.O[t+1]]*self.beta[t+1][j] | 
|  | 79 | + | 
|  | 80 | +        return numerator/denominator | 
|  | 81 | + | 
|  | 82 | +    def init(self): | 
|  | 83 | +        """ | 
|  | 84 | +        随机生成 A,B,Pi | 
|  | 85 | +        并保证每行相加等于 1 | 
|  | 86 | +        """ | 
|  | 87 | +        import random | 
|  | 88 | +        for i in range(self.N): | 
|  | 89 | +            randomlist = [random.randint(0,100) for t in range(self.N)] | 
|  | 90 | +            Sum = sum(randomlist) | 
|  | 91 | +            for j in range(self.N): | 
|  | 92 | +                self.A[i][j] = randomlist[j]/Sum | 
|  | 93 | + | 
|  | 94 | +        for i in range(self.N): | 
|  | 95 | +            randomlist = [random.randint(0,100) for t in range(self.M)] | 
|  | 96 | +            Sum = sum(randomlist) | 
|  | 97 | +            for j in range(self.M): | 
|  | 98 | +                self.B[i][j] = randomlist[j]/Sum | 
|  | 99 | + | 
|  | 100 | +    def train(self, O, MaxSteps = 100): | 
|  | 101 | +        self.T = len(O) | 
|  | 102 | +        self.O = O | 
|  | 103 | + | 
|  | 104 | +        # 初始化 | 
|  | 105 | +        self.init() | 
|  | 106 | + | 
|  | 107 | +        step = 0 | 
|  | 108 | +        # 递推 | 
|  | 109 | +        while step<MaxSteps: | 
|  | 110 | +            step+=1 | 
|  | 111 | +            print(step) | 
|  | 112 | +            tmp_A = np.zeros((self.N,self.N)) | 
|  | 113 | +            tmp_B = np.zeros((self.N,self.M)) | 
|  | 114 | +            tmp_pi = np.array([0.0]*self.N) | 
|  | 115 | + | 
|  | 116 | +            self.forward() | 
|  | 117 | +            self.backward() | 
|  | 118 | + | 
|  | 119 | +            # a_{ij} | 
|  | 120 | +            for i in range(self.N): | 
|  | 121 | +                for j in range(self.N): | 
|  | 122 | +                    numerator=0.0 | 
|  | 123 | +                    denominator=0.0 | 
|  | 124 | +                    for t in range(self.T-1): | 
|  | 125 | +                        numerator += self.cal_ksi(i,j,t) | 
|  | 126 | +                        denominator += self.cal_gamma(i,t) | 
|  | 127 | +                    tmp_A[i][j] = numerator/denominator | 
|  | 128 | + | 
|  | 129 | +            # b_{jk} | 
|  | 130 | +            for j in range(self.N): | 
|  | 131 | +                for k in range(self.M): | 
|  | 132 | +                    numerator = 0.0 | 
|  | 133 | +                    denominator = 0.0 | 
|  | 134 | +                    for t in range(self.T): | 
|  | 135 | +                        if k == self.O[t]: | 
|  | 136 | +                            numerator += self.cal_gamma(j,t) | 
|  | 137 | +                        denominator += self.cal_gamma(j,t) | 
|  | 138 | +                    tmp_B[j][k] = numerator / denominator | 
|  | 139 | + | 
|  | 140 | +            # pi_i | 
|  | 141 | +            for i in range(self.N): | 
|  | 142 | +                tmp_pi[i] = self.cal_gamma(i,0) | 
|  | 143 | + | 
|  | 144 | +            self.A = tmp_A | 
|  | 145 | +            self.B = tmp_B | 
|  | 146 | +            self.Pi = tmp_pi | 
|  | 147 | + | 
|  | 148 | +    def generate(self, length): | 
|  | 149 | +        import random | 
|  | 150 | +        I = [] | 
|  | 151 | + | 
|  | 152 | +        # start | 
|  | 153 | +        ran = random.randint(0,1000)/1000.0 | 
|  | 154 | +        i = 0 | 
|  | 155 | +        while self.Pi[i]<ran or self.Pi[i]<0.0001: | 
|  | 156 | +            ran -= self.Pi[i] | 
|  | 157 | +            i += 1 | 
|  | 158 | +        I.append(i) | 
|  | 159 | + | 
|  | 160 | +        # 生成状态序列 | 
|  | 161 | +        for i in range(1,length): | 
|  | 162 | +            last = I[-1] | 
|  | 163 | +            ran = random.randint(0, 1000) / 1000.0 | 
|  | 164 | +            i = 0 | 
|  | 165 | +            while self.A[last][i] < ran or self.A[last][i]<0.0001: | 
|  | 166 | +                ran -= self.A[last][i] | 
|  | 167 | +                i += 1 | 
|  | 168 | +            I.append(i) | 
|  | 169 | + | 
|  | 170 | +        # 生成观测序列 | 
|  | 171 | +        Y = [] | 
|  | 172 | +        for i in range(length): | 
|  | 173 | +            k = 0 | 
|  | 174 | +            ran = random.randint(0, 1000) / 1000.0 | 
|  | 175 | +            while self.B[I[i]][k] < ran or self.B[I[i]][k]<0.0001: | 
|  | 176 | +                ran -= self.B[I[i]][k] | 
|  | 177 | +                k += 1 | 
|  | 178 | +            Y.append(k) | 
|  | 179 | + | 
|  | 180 | +        return Y | 
|  | 181 | + | 
|  | 182 | + | 
|  | 183 | + | 
|  | 184 | +def triangle(length): | 
|  | 185 | +    ''' | 
|  | 186 | +    三角波 | 
|  | 187 | +    ''' | 
|  | 188 | +    X = [i for i in range(length)] | 
|  | 189 | +    Y = [] | 
|  | 190 | + | 
|  | 191 | +    for x in X: | 
|  | 192 | +        x = x % 6 | 
|  | 193 | +        if x <= 3: | 
|  | 194 | +            Y.append(x) | 
|  | 195 | +        else: | 
|  | 196 | +            Y.append(6-x) | 
|  | 197 | +    return X,Y | 
|  | 198 | + | 
|  | 199 | +def sin(length): | 
|  | 200 | +    ''' | 
|  | 201 | +    三角波 | 
|  | 202 | +    ''' | 
|  | 203 | +    import math | 
|  | 204 | +    X = [i for i in range(length)] | 
|  | 205 | +    Y = [] | 
|  | 206 | + | 
|  | 207 | +    for x in X: | 
|  | 208 | +        x = x % 20 | 
|  | 209 | +        Y.append(int(math.sin((x*math.pi)/10)*50)+50) | 
|  | 210 | +    return X,Y | 
|  | 211 | + | 
|  | 212 | + | 
|  | 213 | + | 
|  | 214 | +def show_data(x,y): | 
|  | 215 | +    import matplotlib.pyplot as plt | 
|  | 216 | +    plt.plot(x, y, 'g') | 
|  | 217 | +    plt.show() | 
|  | 218 | + | 
|  | 219 | +    return y | 
|  | 220 | + | 
|  | 221 | + | 
|  | 222 | +if __name__ == '__main__': | 
|  | 223 | +    hmm = HMM(10,4) | 
|  | 224 | +    tri_x, tri_y = triangle(20) | 
|  | 225 | +     | 
|  | 226 | +    hmm.train(tri_y) | 
|  | 227 | +    y = hmm.generate(100) | 
|  | 228 | +    x = [i for i in range(100)] | 
|  | 229 | +    show_data(x,y) | 
|  | 230 | + | 
|  | 231 | +    # hmm = HMM(15,101) | 
|  | 232 | +    # sin_x, sin_y = sin(40) | 
|  | 233 | +    # show_data(sin_x, sin_y) | 
|  | 234 | +    # hmm.train(sin_y) | 
|  | 235 | +    # y = hmm.generate(100) | 
|  | 236 | +    # x = [i for i in range(100)] | 
|  | 237 | +    # show_data(x,y) | 
|  | 238 | + | 
0 commit comments