Commit ec6c7a44 authored by Alp Deniz Ogut's avatar Alp Deniz Ogut
Browse files

Handle waiting for async responses & completion

parent 043d844e
Showing with 82 additions and 69 deletions
+82 -69
......@@ -11,21 +11,27 @@ from .constants import *
class WhatsappPlugin(PluginBase):
service = SERVICE_NAME
socket = None
me_id = None
contacts = {}
chats = {}
messages = {}
pictures = {}
counter = {
GOT_CONTACTS: (0, 1), # expected 1 response after auth
GOT_CHATS: (0, 1), # expected 1 response after auth
GOT_MESSAGES: (0, 0), # expected n response to n requests
GOT_PICS: (0, 0) # expected n response to n requests
}
def __init__(self, skip_create=False, *args, **kwargs):
super().__init__(*args, **kwargs)
self.socket = None
self.done = []
self.skip_create=skip_create
self.me_id = None
self.contacts = {}
self.chats = {}
self.messages = {}
self.skip_create = skip_create
def run(self):
try:
self.connect()
self.authenticate()
......@@ -34,20 +40,8 @@ class WhatsappPlugin(PluginBase):
return
try:
while GOT_CHATS not in self.done:
sleep(0.2)
print("[+] Collecting chat messages...")
self.collect_chat_messages()
while GOT_CONTACTS not in self.done:
sleep(0.2)
print("[+] Collecting profile pictures...")
self.collect_profile_pictures()
while GOT_MESSAGES not in self.done:
sleep(0.5)
except KeyboardInterrupt:
print("Keyboard interrupt")
except Exception as e:
......@@ -57,7 +51,7 @@ class WhatsappPlugin(PluginBase):
print("[+] Whatsapp plugin run complete")
def connect(self):
onMessageCallback = lambda obj: self.handle_response_data(obj)
onMessageCallback = lambda obj, pend=None: self.handle_response_data(obj, pend)
self.socket = WhatsAppWebClient(None, onMessageCallback, None)
assert self.socket.activeWs, "Could not connect"
......@@ -90,23 +84,30 @@ class WhatsappPlugin(PluginBase):
self.store_credentials()
print("[+] Saved session")
def wait_for(self, ctr):
while self.counter[ctr][0] < self.counter[ctr][1]:
sleep(0.2)
print("GOT ALL", ctr.upper())
def collect_profile_pictures(self):
picture_requested = 0
self.wait_for(GOT_CONTACTS)
print("[+] Collecting profile pictures...")
for contact in self.contacts.values():
print(f"\tRequesting profile picture of {contact.externalId}", flush=True)
self.socket.get_profile_picture(contact.externalId, self.process_profile_picture_response)
picture_requested += 1
sleep(0.3)
self.done.append(GOT_PICS)
self.counter[GOT_PICS][1] += 1
sleep(0.5)
self.wait_for(GOT_PICS)
def collect_chat_messages(self):
chat_requested = 0
self.wait_for(GOT_CHATS)
print("[+] Collecting chat messages...")
for chat in self.chats.values():
print(f"\tRequesting chats with {chat.externalId}", flush=True)
self.socket.request_chat_history(chat.externalId)
chat_requested += 1
sleep(0.3)
self.done.append(GOT_MESSAGES)
self.counter[GOT_MESSAGES][1] += 1
sleep(0.5)
self.wait_for(GOT_MESSAGES)
def load_existing_login(self):
me = helpers.get_me(self.client)
......@@ -132,7 +133,7 @@ class WhatsappPlugin(PluginBase):
## ---- Response callbacks ---- ##
def handle_response_data(self, response_data):
def handle_response_data(self, response_data, pend=None):
action, action_meta, chat_or_contact_data = response_data
if action_meta is not None:
add_location = action_meta.get("add", None)
......@@ -150,7 +151,7 @@ class WhatsappPlugin(PluginBase):
elif response_type == "chat":
self.handle_chats(response_data)
elif response_type == "message":
self.handle_chat_messages(chat_or_contact_data)
self.handle_chat_messages(chat_or_contact_data, pend)
else:
print("\tSkipping type", response_type, ", add location:", add_location)
......@@ -159,12 +160,14 @@ class WhatsappPlugin(PluginBase):
_, _, contacts = response_data
for _, contact, _ in contacts:
account = helpers.contact_to_account(contact)
if account.externalId in ['status@broadcast']:
continue
if account.externalId not in self.contacts:
accounts.append(account)
self.contacts[account.externalId] = account
self.client.bulk_action(create_items=accounts)
self.done.append(GOT_CONTACTS)
self.counter[GOT_CONTACTS] += 1
print(f"created {len(self.contacts)} accounts")
def handle_chats(self, response_data):
......@@ -177,44 +180,52 @@ class WhatsappPlugin(PluginBase):
self.chats[chat.externalId] = chat
self.client.bulk_action(create_items=chats)
self.done.append(GOT_CHATS)
self.counter[GOT_CHATS] += 1
print(f"created {len(self.chats)} chats")
def handle_chat_messages(self, chat_data):
if len(chat_data) == 0:
return
# extract chat info
jid, first_message_id, first_message_owner = helpers.extract_chat_info_from_message(chat_data[0])
print(f"\timporting batch of chat messages - {len(chat_data)} - {jid}")
if len(chat_data) >= MESSAGES_BATCH_SIZE:
print("\tRequesting more messages", jid)
self.socket.request_chat_history(jid, count=MESSAGES_BATCH_SIZE, before=first_message_id, is_owner=first_message_owner)
edges = []
message_items = []
for message_json in chat_data:
message_item = helpers.process_message(message_json)
if message_item:
if message_item.externalId not in message_items:
print("Skipping existing message", message_item.content)
continue
if jid in self.chats:
edges.append(Edge(message_item, self.chats[jid], 'messageChannel'))
if getattr(message_item, '_sender_id', None):
if message_item._sender_id in self.contacts:
edges.append(Edge(message_item, self.contacts[message_item._sender_id], 'sender'))
edges.append(Edge(message_item, self.contacts[self.me_id], 'receiver'))
else:
if message_item._receiver_id in self.contacts:
edges.append(Edge(message_item, self.contacts[message_item._receiver_id], 'receiver'))
edges.append(Edge(message_item, self.contacts[self.me_id], 'sender'))
message_items.append(message_item)
self.messages[message_item.externalId] = message_item
if not self.skip_create:
self.client.bulk_action(create_items=message_items, create_edges=edges)
print(f"created {len(message_items)} messages")
def handle_chat_messages(self, chat_data, pend=None):
try:
if len(chat_data) > 0:
# extract paging info
jid, first_message_id, first_message_owner = helpers.extract_chat_info_from_message(chat_data[0])
print(f"\timporting batch of chat messages - {len(chat_data)} - {jid}")
if pend and pend['desc'] == '_chat_history':
# request the next pagechat info
if len(chat_data) >= MESSAGES_BATCH_SIZE:
print("\tRequesting more messages", jid)
self.socket.request_chat_history(jid, count=MESSAGES_BATCH_SIZE, before=first_message_id, is_owner=first_message_owner)
edges = []
message_items = []
for message_json in chat_data:
message_item = helpers.process_message(message_json)
if message_item:
if message_item.externalId in self.messages:
print("Skipping existing message", message_item.content, message_item.externalId)
continue
if jid in self.chats:
edges.append(Edge(message_item, self.chats[jid], 'messageChannel'))
if getattr(message_item, '_sender_id', None):
if message_item._sender_id in self.contacts:
edges.append(Edge(message_item, self.contacts[message_item._sender_id], 'sender'))
edges.append(Edge(message_item, self.contacts[self.me_id], 'receiver'))
else:
if message_item._receiver_id in self.contacts:
edges.append(Edge(message_item, self.contacts[message_item._receiver_id], 'receiver'))
edges.append(Edge(message_item, self.contacts[self.me_id], 'sender'))
message_items.append(message_item)
self.messages[message_item.externalId] = message_item
if not self.skip_create:
self.client.bulk_action(create_items=message_items, create_edges=edges)
print(f"created {len(message_items)} messages")
except Exception as e:
print("Exception while handling chat messages", e)
finally:
# If it is a response to our chat history request
if pend and pend['desc'] == '_chat_history':
self.counter[GOT_MESSAGES] += 1
def process_profile_picture_response(self, jid, data):
try:
......@@ -226,6 +237,8 @@ class WhatsappPlugin(PluginBase):
related_account.update(self.client)
except Exception as e:
print("Error while importing profile picture", e)
finally:
self.counter[GOT_PICS] += 1
def present_qr_code_to_user(self, qr_code, socket):
print("presenting qr_code to user")
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment