Compare commits

...

16 Commits

Author SHA1 Message Date
holger krekel
6b850007bb add another test case 2024-12-17 17:40:02 +01:00
holger krekel
1ffee4089b simplification and renaming 2024-12-17 17:30:02 +01:00
holger krekel
78da7ce38b remove explciit send() call 2024-12-17 17:01:32 +01:00
holger krekel
b0a08cd1b2 fix all cases with clocks 2024-12-17 16:58:37 +01:00
holger krekel
a54f1390aa rename 2024-12-17 15:19:00 +01:00
holger krekel
30e0cd7328 actually drain mailbox 2024-12-17 14:28:40 +01:00
holger krekel
5238c85dc5 streamline function signatures 2024-12-17 14:27:36 +01:00
holger krekel
156270be3b a first solution with lamport clocks without tombstones 2024-12-16 23:11:42 +01:00
holger krekel
0e9076db00 make members attribute only contain ids, not objects 2024-12-16 21:58:56 +01:00
holger krekel
90f49a04e8 small refactor 2024-12-16 20:01:37 +01:00
holger krekel
4747d7abc2 refactor for clearer algorithm 2024-12-16 19:49:23 +01:00
holger krekel
3d0692e2bf add failing test for removing a member while one peer is offline 2024-12-16 19:19:34 +01:00
holger krekel
43897a66f0 streamline 2024-12-16 18:31:29 +01:00
holger krekel
ca431f61b9 fixes 2024-12-16 18:03:55 +01:00
holger krekel
99dc991bb0 split mailboxes by sender 2024-12-16 17:49:44 +01:00
holger krekel
6b817d6c25 a first little group consistency simulation 2024-12-16 15:23:00 +01:00

View File

@@ -0,0 +1,229 @@
import pytest
@pytest.fixture
def relay():
return Relay()
class Relay:
def __init__(self):
self.peers = {}
def make_peers(self, num):
for i in range(num):
newpeer = Peer(relay=self, num=i)
self.peers[newpeer.id] = newpeer
return self.peers.values()
def dump(self, title):
print()
print(f"# {title}")
for peer_id, peer in self.peers.items():
pending = sum(len(x) for x in peer.from2mailbox.values())
members = ",".join(peer.members)
print(f"{peer_id} clock={peer.current_clock} members={members} pending={pending}")
print()
def receive_all(self, peers=None):
peers = peers if peers is not None else list(self.peers.values())
for peer in peers:
# drain peer mailbox by reading messages from each sender separately
for from_peer in self.peers.values():
pending = peer.from2mailbox.pop(from_peer, [])
if from_peer.id != peer.id:
for msg in pending:
msg.receive(peer)
def assert_group_consistency(self):
peers = list(self.peers.values())
for peer1, peer2 in zip(peers, peers[1:]):
assert peer1.members == peer2.members
assert peer1.current_clock == peer2.current_clock
nums = ",".join(sorted(peer1.members))
print(f"{peer1.id} and {peer2.id} have same members {nums}")
class Message:
def __init__(self, sender, **payload):
self.sender = sender
self.payload = payload
self.recipients = set(sender.members)
# we increment clock on AddMemberMessage and DelMemberMessage
sender.current_clock += self.clock_inc
self.clock = sender.current_clock
self.send()
def __repr__(self):
nums = ",".join(self.recipients)
return f"<{self.__class__.__name__} clock={self.clock} {self.sender.id}->{nums} {self.payload}>"
def send(self):
print(f"sending {self}")
for peer_id in self.sender.members:
peer = self.sender.relay.peers[peer_id]
peer.from2mailbox.setdefault(self.sender, []).append(self)
class AddMemberMessage(Message):
clock_inc = 1
def __init__(self, sender, member):
sender.members.add(member)
super().__init__(sender, member=member)
def receive(self, peer):
if not peer.members: # create group
peer.members = self.recipients.copy()
peer.current_clock = self.clock
return
peer.members.add(self.payload["member"])
if peer.current_clock < self.clock:
peer.members.update(self.recipients)
peer.current_clock = self.clock
elif peer.current_clock == self.clock:
if peer.members != self.recipients:
peer.current_clock += 1
class DelMemberMessage(Message):
clock_inc = 1
def send(self):
super().send()
self.sender.members.remove(self.payload["member"])
def receive(self, peer):
member = self.payload["member"]
if member in peer.members:
if peer.current_clock <= self.clock:
peer.members.remove(member)
peer.current_clock = self.clock
class ChatMessage(Message):
clock_inc = 0
def receive(self, peer):
print(f"receive {peer.id} clock={peer.current_clock} msgclock={self.clock}")
if peer.current_clock < self.clock:
print(f"{peer.id} is outdated, using incoming memberslist")
peer.members = set(self.recipients)
peer.current_clock = self.clock
print(f"-> NEWCLOCK: {peer.current_clock}")
elif peer.current_clock == self.clock:
if peer.members != set(self.recipients):
print(f"{peer.id} has different members than incoming same-clock message")
print(f"{peer.id} resetting to incoming recipients, and increase own clock")
peer.members = set(self.recipients)
peer.current_clock = self.clock + 1
else:
print(f"{peer.id} has newer clock than incoming message")
class Peer:
"""A peer in a group"""
def __init__(self, relay, num):
self.relay = relay
self.id = f"p{num}"
self.members = set()
self.from2mailbox = {}
self.current_clock = 0
def __eq__(self, other):
return self.id == other.id
def __hash__(self):
return int(self.id[1:])
def __repr__(self):
clock = self.current_clock
return f"<Peer {self.id} members={','.join(self.members)} clock={clock}>"
def immediate_create_group(self, peers):
assert not self.members
self.members.add(self.id)
for peer in peers:
AddMemberMessage(self, member=peer.id)
self.relay.receive_all()
### Tests
def test_add_and_remove(relay):
p0, p1, p2, p3 = relay.make_peers(4)
# create group
p0.immediate_create_group([p1])
assert p0.members == p1.members == set([p0.id, p1.id])
# add members
AddMemberMessage(p0, member=p2.id)
AddMemberMessage(p0, member=p3.id)
relay.receive_all()
relay.assert_group_consistency()
DelMemberMessage(p3, member=p0.id)
relay.receive_all()
relay.assert_group_consistency()
def test_concurrent_add(relay):
p0, p1, p2, p3 = relay.make_peers(4)
p0.immediate_create_group([p1])
# concurrent adding and then let base set send a chat message
AddMemberMessage(p1, member=p2.id)
AddMemberMessage(p0, member=p3.id)
relay.receive_all()
relay.dump("after concurrent add")
# only now do p0 and p1 know of each others additions
# so p0 or p1 needs to send another message to get consistent membership
ChatMessage(p0)
relay.receive_all()
relay.dump("after chatmessage")
relay.assert_group_consistency()
def test_add_remove_and_stale_member_sends_chatmessage(relay):
p0, p1, p2, p3 = relay.make_peers(4)
p0.immediate_create_group([p1, p2, p3])
# p3 is offline and p0 deletes p2
DelMemberMessage(p0, member=p2.id)
relay.receive_all([p0, p1, p2])
# p3 sends a message with old memberlist and goes online
ChatMessage(p3)
relay.receive_all()
relay.assert_group_consistency()
ChatMessage(p0)
relay.receive_all()
assert p0.members == set(["p0", "p1", "p3"])
def test_add_remove_and_stale_member_sends_addition(relay):
p0, p1, p2, p3, p4 = relay.make_peers(5)
p0.immediate_create_group([p1, p2, p3])
# p3 is offline and p0 deletes p2
DelMemberMessage(p0, member=p2.id)
relay.receive_all([p0, p1, p2])
# p3 sends a message with member addition and goes online
AddMemberMessage(p3, member=p4.id)
relay.receive_all()
relay.dump("after p3 is online")
# we need a chat message from a higher clock state to heal consistency
ChatMessage(p0)
relay.receive_all()
relay.dump("after p0 sent chatmessage")
relay.assert_group_consistency()
assert p0.members == set(["p0", "p1", "p3", "p4"])