def on_create():
election_history.append("create record")
leadership_history.append("get leadership")
def on_update():
election_history.append("update record")
def on_change():
election_history.append("change record")
mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, None)
def on_started_leading():
leadership_history.append("start leading")
def on_stopped_leading():
leadership_history.append("stop leading")
Create config 4.5 4 3
config = electionconfig.Config(lock=mock_lock, lease_duration=2.5,
renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading,
onstopped_leading=on_stopped_leading)
Enter leader election
leaderelection.LeaderElection(config).run()
self.assert_history(election_history, ["create record", "update record", "update record", "update record"])
self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"])
def test_leader_election(self):
election_history = []
leadership_history = []
def on_create_A():
election_history.append("A creates record")
leadership_history.append("A gets leadership")
def on_update_A():
election_history.append("A updates record")
def on_change_A():
election_history.append("A gets leadership")
mock_lock_A = MockResourceLock("mock", "mock_namespace", "MockA", thread_lock, on_create_A, on_update_A, on_change_A, None)
mock_lock_A.renew_count_max = 3
def on_started_leading_A():
leadership_history.append("A starts leading")
def on_stopped_leading_A():
leadership_history.append("A stops leading")
config_A = electionconfig.Config(lock=mock_lock_A, lease_duration=2.5,
renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_A,
onstopped_leading=on_stopped_leading_A)
def on_create_B():
election_history.append("B creates record")
leadership_history.append("B gets leadership")
def on_update_B():
election_history.append("B updates record")
def on_change_B():
leadership_history.append("B gets leadership")
mock_lock_B = MockResourceLock("mock", "mock_namespace", "MockB", thread_lock, on_create_B, on_update_B, on_change_B, None)
mock_lock_B.renew_count_max = 4
def on_started_leading_B():
leadership_history.append("B starts leading")
def on_stopped_leading_B():
leadership_history.append("B stops leading")
config_B = electionconfig.Config(lock=mock_lock_B, lease_duration=2.5,
renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading_B,
onstopped_leading=on_stopped_leading_B)
mock_lock_B.leader_record = mock_lock_A.leader_record
threading.daemon = True
Enter leader election for A
threading.Thread(target=leaderelection.LeaderElection(config_A).run()).start()
Enter leader election for B
threading.Thread(target=leaderelection.LeaderElection(config_B).run()).start()
time.sleep(5)
self.assert_history(election_history,
["A creates record",
"A updates record",
"A updates record",
"B updates record",
"B updates record",
"B updates record",
"B updates record"])
self.assert_history(leadership_history,
["A gets leadership",
"A starts leading",
"A stops leading",
"B gets leadership",
"B starts leading",
"B stops leading"])
"""Expected behavior: to check if the leader stops leading if it fails to update the lock within the renew_deadline
and stops leading after finally timing out. The difference between each try comes out to be approximately the sleep
time.
Example:
create record: 0s
on try update: 1.5s
on update: zzz s
on try update: 3s
on update: zzz s
on try update: 4.5s
on try update: 6s
Timeout - Leader Exits"""
def test_Leader_election_with_renew_deadline(self):
election_history = []
leadership_history = []
def on_create():
election_history.append("create record")
leadership_history.append("get leadership")
def on_update():
election_history.append("update record")
def on_change():
election_history.append("change record")
def on_try_update():
election_history.append("try update record")
mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock, on_create, on_update, on_change, on_try_update)
mock_lock.renew_count_max = 3
def on_started_leading():
leadership_history.append("start leading")
def on_stopped_leading():
leadership_history.append("stop leading")
Create config
config = electionconfig.Config(lock=mock_lock, lease_duration=2.5,
renew_deadline=2, retry_period=1.5, onstarted_leading=on_started_leading,
onstopped_leading=on_stopped_leading)
Enter leader election
leaderelection.LeaderElection(config).run()
self.assert_history(election_history,
["create record",
"try update record",
"update record",
"try update record",
"update record",
"try update record",
"try update record"])
self.assert_history(leadership_history, ["get leadership", "start leading", "stop leading"])
def assert_history(self, history, expected):
self.assertIsNotNone(expected)
self.assertIsNotNone(history)
self.assertEqual(len(expected), len(history))