mirror of
https://github.com/chatmail/core.git
synced 2026-05-08 09:26:29 +03:00
fix all cases with clocks
This commit is contained in:
@@ -16,14 +16,23 @@ class Relay:
|
|||||||
self.peers[newpeer.id] = newpeer
|
self.peers[newpeer.id] = newpeer
|
||||||
return self.peers.values()
|
return self.peers.values()
|
||||||
|
|
||||||
|
def dump(self, title):
|
||||||
|
print(f"# {title}")
|
||||||
|
for peer_id, peer in self.peers.items():
|
||||||
|
pending = sum(len(x) for x in peer.from2mailbox.values())
|
||||||
|
members = ",".join(peer.members)
|
||||||
|
print(f"{peer_id} clock={peer.current_clock} members={members} pending={pending}")
|
||||||
|
print()
|
||||||
|
|
||||||
def receive_all(self, peers=None):
|
def receive_all(self, peers=None):
|
||||||
peers = peers if peers else self.peers.values()
|
peers = peers if peers is not None else list(self.peers.values())
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
# drain peer mailbox by reading messages from each sender separately
|
# drain peer mailbox by reading messages from each sender separately
|
||||||
for from_peer in self.peers.values():
|
for from_peer in self.peers.values():
|
||||||
for msg in peer.from2mailbox.pop(from_peer, []):
|
pending = peer.from2mailbox.pop(from_peer, [])
|
||||||
msg.receive_imf(peer)
|
if from_peer.id != peer.id:
|
||||||
peer.current_clock = max(peer.current_clock, msg.clock) + 1
|
for msg in pending:
|
||||||
|
msg.receive(peer)
|
||||||
|
|
||||||
def assert_same_members(self):
|
def assert_same_members(self):
|
||||||
peers = list(self.peers.values())
|
peers = list(self.peers.values())
|
||||||
@@ -36,9 +45,9 @@ class Relay:
|
|||||||
class Message:
|
class Message:
|
||||||
def __init__(self, sender, **payload):
|
def __init__(self, sender, **payload):
|
||||||
self.sender = sender
|
self.sender = sender
|
||||||
self.recipients = list(sender.members)
|
|
||||||
self.relay = sender.relay
|
|
||||||
self.payload = payload
|
self.payload = payload
|
||||||
|
self.recipients = set(sender.members)
|
||||||
|
sender.current_clock += self.inc
|
||||||
self.clock = sender.current_clock
|
self.clock = sender.current_clock
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
@@ -47,28 +56,59 @@ class Message:
|
|||||||
|
|
||||||
def send(self):
|
def send(self):
|
||||||
for peer_id in self.sender.members:
|
for peer_id in self.sender.members:
|
||||||
peer = self.relay.peers[peer_id]
|
peer = self.sender.relay.peers[peer_id]
|
||||||
peer.from2mailbox.setdefault(self.sender, []).append(self)
|
peer.from2mailbox.setdefault(self.sender, []).append(self)
|
||||||
|
|
||||||
|
|
||||||
class AddMemberMessage(Message):
|
class AddMemberMessage(Message):
|
||||||
def receive_imf(self, peer):
|
inc = 1
|
||||||
peer.members.add(self.payload["newmember"])
|
|
||||||
|
def __init__(self, sender, member):
|
||||||
|
sender.members.add(member)
|
||||||
|
super().__init__(sender, member=member)
|
||||||
|
|
||||||
|
def receive(self, peer):
|
||||||
|
if not peer.members:
|
||||||
|
peer.members = self.recipients.copy()
|
||||||
|
peer.current_clock = self.clock
|
||||||
|
return
|
||||||
|
|
||||||
|
peer.members.add(self.payload["member"])
|
||||||
|
if peer.current_clock < self.clock:
|
||||||
peer.members.update(self.recipients)
|
peer.members.update(self.recipients)
|
||||||
|
peer.current_clock = self.clock
|
||||||
|
|
||||||
|
|
||||||
class DelMemberMessage(Message):
|
class DelMemberMessage(Message):
|
||||||
def receive_imf(self, peer):
|
inc = 1
|
||||||
|
|
||||||
|
def send(self):
|
||||||
|
super().send()
|
||||||
|
self.sender.members.remove(self.payload["member"])
|
||||||
|
|
||||||
|
def receive(self, peer):
|
||||||
member = self.payload["member"]
|
member = self.payload["member"]
|
||||||
if member in peer.members:
|
if member in peer.members:
|
||||||
|
if peer.current_clock <= self.clock:
|
||||||
peer.members.remove(member)
|
peer.members.remove(member)
|
||||||
|
peer.current_clock = self.clock
|
||||||
|
|
||||||
|
|
||||||
class ChatMessage(Message):
|
class ChatMessage(Message):
|
||||||
def receive_imf(self, peer):
|
inc = 0
|
||||||
|
|
||||||
|
def receive(self, peer):
|
||||||
|
print(f"receive {peer.id} clock={peer.current_clock} msgclock={self.clock}")
|
||||||
if peer.current_clock < self.clock:
|
if peer.current_clock < self.clock:
|
||||||
|
print(f"{peer.id} is outdated, using incoming memberslist")
|
||||||
peer.members = set(self.recipients)
|
peer.members = set(self.recipients)
|
||||||
peer.current_clock = self.clock
|
peer.current_clock = self.clock
|
||||||
|
print(f"-> NEWCLOCK: {peer.current_clock}")
|
||||||
|
elif peer.current_clock == self.clock:
|
||||||
|
if peer.members != set(self.recipients):
|
||||||
|
print(f"{peer.id} has different members than incoming same-clock message")
|
||||||
|
peer.members = set(self.recipients)
|
||||||
|
peer.current_clock = self.clock + 1
|
||||||
|
|
||||||
|
|
||||||
class Peer:
|
class Peer:
|
||||||
@@ -95,24 +135,9 @@ class Peer:
|
|||||||
assert not self.members
|
assert not self.members
|
||||||
self.members.add(self.id)
|
self.members.add(self.id)
|
||||||
for peer in peers:
|
for peer in peers:
|
||||||
self.send_add_member(peer.id)
|
AddMemberMessage(self, member=peer.id).send()
|
||||||
self.relay.receive_all()
|
self.relay.receive_all()
|
||||||
|
|
||||||
def send_add_member(self, newmember):
|
|
||||||
assert isinstance(newmember, str)
|
|
||||||
self.members.add(newmember)
|
|
||||||
AddMemberMessage(self, newmember=newmember).send()
|
|
||||||
|
|
||||||
def send_del_member(self, member):
|
|
||||||
assert isinstance(member, str)
|
|
||||||
assert member in self.members
|
|
||||||
msg = DelMemberMessage(self, member=member)
|
|
||||||
msg.send()
|
|
||||||
self.members.remove(member)
|
|
||||||
|
|
||||||
def send_chatmessage(self):
|
|
||||||
ChatMessage(self).send()
|
|
||||||
|
|
||||||
|
|
||||||
### Tests
|
### Tests
|
||||||
|
|
||||||
@@ -125,12 +150,12 @@ def test_add_and_remove(relay):
|
|||||||
assert p0.members == p1.members == set([p0.id, p1.id])
|
assert p0.members == p1.members == set([p0.id, p1.id])
|
||||||
|
|
||||||
# add members
|
# add members
|
||||||
p0.send_add_member(p2.id)
|
AddMemberMessage(p0, member=p2.id).send()
|
||||||
p0.send_add_member(p3.id)
|
AddMemberMessage(p0, member=p3.id).send()
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
relay.assert_same_members()
|
relay.assert_same_members()
|
||||||
|
|
||||||
p3.send_del_member(p0.id)
|
DelMemberMessage(p3, member=p0.id).send()
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
relay.assert_same_members()
|
relay.assert_same_members()
|
||||||
|
|
||||||
@@ -140,16 +165,14 @@ def test_concurrent_add(relay):
|
|||||||
|
|
||||||
p0.immediate_create_group([p1])
|
p0.immediate_create_group([p1])
|
||||||
# concurrent adding and then let base set send a chat message
|
# concurrent adding and then let base set send a chat message
|
||||||
p1.send_add_member(p2.id)
|
AddMemberMessage(p1, member=p2.id).send()
|
||||||
p0.send_add_member(p3.id)
|
AddMemberMessage(p0, member=p3.id).send()
|
||||||
|
|
||||||
# now p0 and p1 send a regular message
|
|
||||||
p0.send_chatmessage()
|
|
||||||
p1.send_chatmessage()
|
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
|
|
||||||
|
relay.dump("after concurrent add")
|
||||||
# only now do p0 and p1 know of each others additions
|
# only now do p0 and p1 know of each others additions
|
||||||
# so p0 or p1 needs to send another message to get consistent membership
|
# so p0 or p1 needs to send another message to get consistent membership
|
||||||
p0.send_chatmessage()
|
ChatMessage(p0).send()
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
relay.assert_same_members()
|
relay.assert_same_members()
|
||||||
|
|
||||||
@@ -159,17 +182,20 @@ def test_add_remove_and_stale_old_suddenly_sends(relay):
|
|||||||
|
|
||||||
p0.immediate_create_group([p1, p2, p3])
|
p0.immediate_create_group([p1, p2, p3])
|
||||||
|
|
||||||
# p3 is offline and a member get deleted
|
# p3 is offline and p0 deletes p2
|
||||||
p0.send_del_member(p2.id)
|
DelMemberMessage(p0, member=p2.id).send()
|
||||||
relay.receive_all([p0, p1, p2])
|
relay.receive_all([p0, p1, p2])
|
||||||
|
relay.dump("p0 has deleted p3")
|
||||||
|
|
||||||
# p3 sends a message with old memberlist and goes online
|
# p3 sends a message with old memberlist and goes online
|
||||||
p3.send_chatmessage()
|
ChatMessage(p3).send()
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
|
|
||||||
|
relay.dump("after p3 sent an old memberlist")
|
||||||
# p0 sends a message which should update all peers' members
|
# p0 sends a message which should update all peers' members
|
||||||
p0.send_chatmessage()
|
ChatMessage(p0).send()
|
||||||
relay.receive_all()
|
relay.receive_all()
|
||||||
|
relay.dump("final")
|
||||||
|
|
||||||
relay.assert_same_members()
|
relay.assert_same_members()
|
||||||
assert p0.members == set([p0.id, p1.id, p3.id])
|
assert p0.members == set([p0.id, p1.id, p3.id])
|
||||||
|
|||||||
Reference in New Issue
Block a user