Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 9 additions & 171 deletions cmping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
- Accounts wait for IMAP_INBOX_IDLE state indicating readiness

2. GROUP CREATION: Sender creates a group chat and adds all receivers
- An initialization message is sent to promote the group
- All receivers must accept the group invitation before ping begins

3. PING SEND: Sender transmits messages to the group at specified intervals
- Messages contain: unique-id timestamp sequence-number
Expand Down Expand Up @@ -233,6 +231,10 @@ def _add_online(self, account):
if self.verbose >= 3:
addr = account.get_config("addr")
print(f" Starting I/O for account: {addr}")

# Enable bot mode in all accounts before starting I/O
# so we don't have to accept contact requests.
account.set_config("bot", "1")
account.start_io()
self.online.append(account)

Expand Down Expand Up @@ -319,8 +321,8 @@ def setup_accounts(args, sender_maker, receiver_maker):
return sender, receivers


def create_and_promote_group(sender, receivers, verbose=0):
"""Create a group chat and send initial message to promote it.
def create_group(sender, receivers, verbose=0):
"""Create a group chat.

Returns:
group: The created group chat object
Expand All @@ -337,164 +339,9 @@ def create_and_promote_group(sender, receivers, verbose=0):
print(f" Adding {receiver_addr} to group")
group.add_contact(contact)

# Send an initial message to promote the group
# This sends invitations to all members; progress is shown in wait_for_receivers_to_join()
if verbose >= 3:
print(" Sending group initialization message")
group.send_text("cmping group chat initialized")

return group


def wait_for_receivers_to_join(args, sender, receivers, timeout_seconds=30):
"""Wait concurrently for all receivers to join the group with progress display.

Timing: This function's duration is tracked as 'group_join_time'.

Args:
args: Command line arguments (for verbose flag)
sender: Sender account
receivers: List of receiver accounts
timeout_seconds: Maximum time to wait for all receivers

Returns:
int: Number of receivers that successfully joined
"""
print("# Waiting for receivers to come online", end="", flush=True)
sender_addr = sender.get_config("addr")
start_time = time.time()

# Track which receivers have joined
joined_receivers = set()
joined_addrs = [] # Track addresses in order they joined
receiver_threads_queue = queue.Queue()

def wait_for_receiver_join(idx, receiver, deadline):
"""Thread function to wait for a single receiver to join.

Args:
idx: Index of the receiver
receiver: Receiver account object
deadline: Timestamp when timeout should occur

Note:
Communicates results via receiver_threads_queue, does not return values.
Queue messages: ("joined", idx, addr), ("error", idx, msg),
("timeout", idx, None), ("exception", idx, error_str)
"""
try:
while time.time() < deadline:
event = receiver.wait_for_event()
if args.verbose >= 3:
# Log all events during group joining phase
receiver_addr = receiver.get_config("addr")
log_event_verbose(event, receiver_addr)

if event.kind == EventType.INCOMING_MSG:
msg = receiver.get_message_by_id(event.msg_id)
snapshot = msg.get_snapshot()
sender_contact = msg.get_sender_contact()
sender_contact_snapshot = sender_contact.get_snapshot()
if (
sender_contact_snapshot.address == sender_addr
and "cmping group chat initialized" in snapshot.text
):
chat_id = snapshot.chat_id
receiver_group = receiver.get_chat_by_id(chat_id)
receiver_group.accept()
receiver_threads_queue.put(
("joined", idx, receiver.get_config("addr"))
)
return
elif event.kind == EventType.ERROR and args.verbose >= 1:
receiver_threads_queue.put(("error", idx, event.msg))
# Timeout occurred
receiver_threads_queue.put(("timeout", idx, None))
except Exception as e:
receiver_threads_queue.put(("exception", idx, str(e)))

# Start a thread for each receiver
deadline = start_time + timeout_seconds
threads = []
for idx, receiver in enumerate(receivers):
t = threading.Thread(
target=wait_for_receiver_join, args=(idx, receiver, deadline)
)
t.start()
threads.append(t)

# Monitor progress and show spinner
total_receivers = len(receivers)
while len(joined_receivers) < total_receivers and time.time() < deadline:
try:
event_type, idx, data = receiver_threads_queue.get(timeout=0.5)
if event_type == "joined":
joined_receivers.add(idx)
joined_addrs.append(data) # Track the address
print(
f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
end="",
flush=True,
)
elif event_type == "error":
if args.verbose >= 1:
print(f"\n✗ ERROR during group joining for receiver {idx}: {data}")
print(
f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
end="",
flush=True,
)
elif event_type == "timeout":
print(
f"\n# WARNING: receiver {idx} did not join group within {timeout_seconds}s"
)
print(
f"# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
end="",
flush=True,
)
elif event_type == "exception":
print(f"\n# ERROR: receiver {idx} encountered exception: {data}")
print(
f"# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
end="",
flush=True,
)
except queue.Empty:
# Update spinner even when no events
print(
f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers}",
end="",
flush=True,
)

# Wait for threads to complete with a short timeout
for t in threads:
t.join(timeout=1.0)

# Final status
print(
f"\r# Waiting for receivers to come online {len(joined_receivers)}/{total_receivers} - Complete!"
)

# Print receiver info based on verbosity level
# -vv or higher: print full list of addresses
# -v or normal: just print count
if joined_addrs:
if args.verbose >= 2:
print(f"# Receivers online: {', '.join(joined_addrs)}")
elif args.verbose >= 1:
print(f"# Receivers online: {len(joined_addrs)}")

# Check if all receivers joined
if len(joined_receivers) < total_receivers:
print(
f"# WARNING: Only {len(joined_receivers)}/{total_receivers} receivers joined the group"
)

return len(joined_receivers)


def wait_profiles_online(maker):
"""Wait for all profiles to be online with spinner progress.

Expand Down Expand Up @@ -583,8 +430,7 @@ def perform_ping(args):

Timing Phases:
1. account_setup_time: Time to create and configure all accounts
2. group_join_time: Time for all receivers to join the group
3. message_time: Time to send and receive all ping messages
2. message_time: Time to send and receive all ping messages

Returns:
Pinger: The pinger object with results
Expand Down Expand Up @@ -645,16 +491,9 @@ def perform_ping(args):

account_setup_time = time.time() - account_setup_start

# Phase 2: Group Join (timed)
group_join_start = time.time()

# Create group and promote it
group = create_and_promote_group(sender, receivers, verbose=args.verbose)

# Wait for all receivers to join the group
wait_for_receivers_to_join(args, sender, receivers)
# Phase 2: Group creation
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor: This says Phase 2, which, referring to the timed phases (line 433), its no longer timed.


group_join_time = time.time() - group_join_start
group = create_group(sender, receivers, verbose=args.verbose)

# Phase 3: Message Ping/Pong (timed)
message_start = time.time()
Expand Down Expand Up @@ -742,7 +581,6 @@ def perform_ping(args):
# Print timing and rate statistics
print("--- timing statistics ---")
print(f"account setup: {format_duration(account_setup_time)}")
print(f"group join: {format_duration(group_join_time)}")
print(f"message send/recv: {format_duration(message_time)}")

# Calculate message rates
Expand Down