Skip to content

Commit 103aea9

Browse files
committed
Merge branch 'hsm' of https://github.com/fkromer/python-patterns into fkromer-hsm
2 parents ca277dd + b91a526 commit 103aea9

File tree

2 files changed

+272
-0
lines changed

2 files changed

+272
-0
lines changed

hsm.py

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
"""
2+
Implementation of the HSM (hierachrical state machine) or
3+
NFSM (nested finite state machine) C++ example from
4+
http://www.eventhelix.com/RealtimeMantra/HierarchicalStateMachine.htm#.VwqLVEL950w
5+
in Python
6+
7+
- single source 'message type' for state transition changes
8+
- message type considered, messages (comment) not considered to avoid complexity
9+
"""
10+
11+
12+
class UnsupportedMessageType(BaseException):
13+
pass
14+
15+
16+
class UnsupportedState(BaseException):
17+
pass
18+
19+
20+
class UnsupportedTransition(BaseException):
21+
pass
22+
23+
24+
class HierachicalStateMachine(object):
25+
26+
def __init__(self):
27+
self._active_state = Active(self) # Unit.Inservice.Active()
28+
self._standby_state = Standby(self) # Unit.Inservice.Standby()
29+
self._suspect_state = Suspect(self) # Unit.OutOfService.Suspect()
30+
self._failed_state = Failed(self) # Unit.OutOfService.Failed()
31+
self._current_state = self._standby_state
32+
self.states = {'active': self._active_state,
33+
'standby': self._standby_state,
34+
'suspect': self._suspect_state,
35+
'failed': self._failed_state}
36+
self.message_types = {'fault trigger': self._current_state.on_fault_trigger,
37+
'switchover': self._current_state.on_switchover,
38+
'diagnostics passed': self._current_state.on_diagnostics_passed,
39+
'diagnostics failed': self._current_state.on_diagnostics_failed,
40+
'operator inservice': self._current_state.on_operator_inservice}
41+
42+
def _next_state(self, state):
43+
try:
44+
self._current_state = self.states[state]
45+
except KeyError:
46+
raise UnsupportedState
47+
48+
def _send_diagnostics_request(self):
49+
return 'send diagnostic request'
50+
51+
def _raise_alarm(self):
52+
return 'raise alarm'
53+
54+
def _clear_alarm(self):
55+
return 'clear alarm'
56+
57+
def _perform_switchover(self):
58+
return 'perform switchover'
59+
60+
def _send_switchover_response(self):
61+
return 'send switchover response'
62+
63+
def _send_operator_inservice_response(self):
64+
return 'send operator inservice response'
65+
66+
def _send_diagnostics_failure_report(self):
67+
return 'send diagnostics failure report'
68+
69+
def _send_diagnostics_pass_report(self):
70+
return 'send diagnostics pass report'
71+
72+
def _abort_diagnostics(self):
73+
return 'abort diagnostics'
74+
75+
def _check_mate_status(self):
76+
return 'check mate status'
77+
78+
def on_message(self, message_type): # message ignored
79+
if message_type in self.message_types.keys():
80+
self.message_types[message_type]()
81+
else:
82+
raise UnsupportedMessageType
83+
84+
85+
class Unit(object):
86+
87+
def __init__(self, HierachicalStateMachine):
88+
self.hsm = HierachicalStateMachine
89+
90+
def on_switchover(self):
91+
raise UnsupportedTransition
92+
93+
def on_fault_trigger(self):
94+
raise UnsupportedTransition
95+
96+
def on_diagnostics_failed(self):
97+
raise UnsupportedTransition
98+
99+
def on_diagnostics_passed(self):
100+
raise UnsupportedTransition
101+
102+
def on_operator_inservice(self):
103+
raise UnsupportedTransition
104+
105+
106+
class Inservice(Unit):
107+
108+
def __init__(self, HierachicalStateMachine):
109+
self._hsm = HierachicalStateMachine
110+
111+
def on_fault_trigger(self):
112+
self._hsm._next_state('suspect')
113+
self._hsm._send_diagnostics_request()
114+
self._hsm._raise_alarm()
115+
116+
def on_switchover(self):
117+
self._hsm._perform_switchover()
118+
self._hsm._check_mate_status()
119+
self._hsm._send_switchover_response()
120+
121+
122+
class Active(Inservice):
123+
124+
def __init__(self, HierachicalStateMachine):
125+
self._hsm = HierachicalStateMachine
126+
127+
def on_fault_trigger(self):
128+
super(Active, self).perform_switchover()
129+
super(Active, self).on_fault_trigger()
130+
131+
def on_switchover(self):
132+
self._hsm.on_switchover() # message ignored
133+
self._hsm.next_state('standby')
134+
135+
136+
class Standby(Inservice):
137+
138+
def __init__(self, HierachicalStateMachine):
139+
self._hsm = HierachicalStateMachine
140+
141+
def on_switchover(self):
142+
super(Standby, self).on_switchover() #message ignored
143+
self._hsm._next_state('active')
144+
145+
146+
class OutOfService(Unit):
147+
148+
def __init__(self, HierachicalStateMachine):
149+
self._hsm = HierachicalStateMachine
150+
151+
def on_operator_inservice(self):
152+
self._hsm.on_switchover() # message ignored
153+
self._hsm.send_operator_inservice_response()
154+
self._hsm.next_state('suspect')
155+
156+
157+
class Suspect(OutOfService):
158+
159+
def __init__(self, HierachicalStateMachine):
160+
self._hsm = HierachicalStateMachine
161+
162+
def on_diagnostics_failed(self):
163+
super(Suspect, self).send_diagnostics_failure_report()
164+
super(Suspect, self).next_state('failed')
165+
166+
def on_diagnostics_passed(self):
167+
super(Suspect, self).send_diagnostics_pass_report()
168+
super(Suspect, self).clear_alarm() # loss of redundancy alarm
169+
super(Suspect, self).next_state('standby')
170+
171+
def on_operator_inservice(self):
172+
super(Suspect, self).abort_diagnostics()
173+
super(Suspect, self).on_operator_inservice() # message ignored
174+
175+
176+
class Failed(OutOfService):
177+
'''No need to override any method.'''
178+
179+
def __init__(self, HierachicalStateMachine):
180+
self._hsm = HierachicalStateMachine
181+

test_hsm.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
from hsm import HierachicalStateMachine, UnsupportedMessageType,\
2+
UnsupportedState, UnsupportedTransition, Active, Standby, Suspect, Failed
3+
from sys import version_info
4+
5+
if version_info < (2, 7):
6+
import unittest2 as unittest
7+
else:
8+
import unittest
9+
10+
from unittest.mock import patch
11+
12+
13+
class HsmMethodTest(unittest.TestCase):
14+
15+
@classmethod
16+
def setUpClass(cls):
17+
cls.hsm = HierachicalStateMachine()
18+
19+
def test_initial_state_shall_be_standby(cls):
20+
cls.assertEqual(isinstance(cls.hsm._current_state, Standby), True)
21+
22+
def test_unsupported_state_shall_raise_exception(cls):
23+
with cls.assertRaises(UnsupportedState) as context:
24+
cls.hsm._next_state('missing')
25+
26+
def test_unsupported_message_type_shall_raise_exception(cls):
27+
with cls.assertRaises(UnsupportedMessageType) as context:
28+
cls.hsm.on_message('trigger')
29+
30+
def test_calling_next_state_shall_change_current_state(cls):
31+
cls.hsm._current_state = Standby # initial state
32+
cls.hsm._next_state('active')
33+
cls.assertEqual(isinstance(cls.hsm._current_state, Active), True)
34+
cls.hsm._current_state = Standby(cls.hsm) # initial state
35+
36+
def test_method_perform_switchover_shall_return_specifically(cls):
37+
""" Exemplary HierachicalStateMachine method test.
38+
(here: _perform_switchover()). Add additional test cases... """
39+
return_value = cls.hsm._perform_switchover()
40+
expected_return_value = 'perform switchover'
41+
cls.assertEqual(return_value, expected_return_value)
42+
43+
44+
class StandbyStateTest(unittest.TestCase):
45+
""" Exemplary 2nd level state test class (here: Standby state). Add missing
46+
state test classes... """
47+
48+
@classmethod
49+
def setUpClass(cls):
50+
cls.hsm = HierachicalStateMachine()
51+
52+
def setUp(cls):
53+
cls.hsm._current_state = Standby(cls.hsm)
54+
55+
def test_given_standby_on_message_switchover_shall_set_active(cls):
56+
cls.hsm.on_message('switchover')
57+
cls.assertEqual(isinstance(cls.hsm._current_state, Active), True)
58+
59+
def test_given_standby_on_message_switchover_shall_call_hsm_methods(cls):
60+
with patch.object(cls.hsm, '_perform_switchover') as mock_perform_switchover,\
61+
patch.object(cls.hsm, '_check_mate_status') as mock_check_mate_status,\
62+
patch.object(cls.hsm, '_send_switchover_response') as mock_send_switchover_response,\
63+
patch.object(cls.hsm, '_next_state') as mock_next_state:
64+
cls.hsm.on_message('switchover')
65+
cls.assertEqual(mock_perform_switchover.call_count, 1)
66+
cls.assertEqual(mock_check_mate_status.call_count, 1)
67+
cls.assertEqual(mock_send_switchover_response.call_count, 1)
68+
cls.assertEqual(mock_next_state.call_count, 1)
69+
70+
def test_given_standby_on_message_fault_trigger_shall_set_suspect(cls):
71+
cls.hsm.on_message('fault trigger')
72+
cls.assertEqual(isinstance(cls.hsm._current_state, Suspect), True)
73+
74+
def test_given_standby_on_message_diagnostics_failed_shall_raise_exception_and_keep_in_state(cls):
75+
with cls.assertRaises(UnsupportedTransition) as context:
76+
cls.hsm.on_message('diagnostics failed')
77+
cls.assertEqual(isinstance(cls.hsm._current_state, Standby), True)
78+
79+
def test_given_standby_on_message_diagnostics_passed_shall_raise_exception_and_keep_in_state(cls):
80+
with cls.assertRaises(UnsupportedTransition) as context:
81+
cls.hsm.on_message('diagnostics passed')
82+
cls.assertEqual(isinstance(cls.hsm._current_state, Standby), True)
83+
84+
def test_given_standby_on_message_operator_inservice_shall_raise_exception_and_keep_in_state(cls):
85+
with cls.assertRaises(UnsupportedTransition) as context:
86+
cls.hsm.on_message('operator inservice')
87+
cls.assertEqual(isinstance(cls.hsm._current_state, Standby), True)
88+
89+
90+
if __name__ == "__main__":
91+
unittest.main()

0 commit comments

Comments
 (0)