Commit 95587a09 authored by Koen van der Veen's avatar Koen van der Veen
Browse files

update

parent c1e3fa97
Showing with 453 additions and 138 deletions
+453 -138
......@@ -11,6 +11,8 @@ index = {"get_cvu": "authenticator.ipynb",
"get_unique_accounts": "gmailimporter.ipynb",
"get_unique_message_channels": "gmailimporter.ipynb",
"GmailImporter": "gmailimporter.ipynb",
"GMAIL_SERVICE_NAME": "gmailimporter.ipynb",
"get_username_password": "imapclient.ipynb",
"IMAPClient": "imapclient.ipynb",
"DEFAULT_GMAIL_HOST": "imapclient.ipynb",
"DEFAULT_PORT": "imapclient.ipynb"}
......
......@@ -40,6 +40,8 @@ class PasswordAuthenticator:
def authenticate(self, plugin):
self.request_user_credentials()
print("To simulate front-end, run:")
print(f"simulate_enter_credentials --plugin=gmail_importer")
login_success = False
for _ in range(self.MAX_LOGIN_ATTEMPTS):
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/imapclient.ipynb (unless otherwise specified).
__all__ = ['IMAPClient', 'DEFAULT_GMAIL_HOST', 'DEFAULT_PORT']
__all__ = ['get_username_password', 'IMAPClient', 'DEFAULT_GMAIL_HOST', 'DEFAULT_PORT']
# Cell
import re
......@@ -10,9 +10,16 @@ from email.utils import getaddresses
from pymemri.pod.client import PodClient
from nbdev.showdoc import show_doc
from pymemri.data.basic import *
from pymemri.data.schema import EmailMessage, MessageChannel, Person, ImporterRun
from pymemri.importers.importer import ImporterBase
from pymemri.data.schema import EmailMessage, MessageChannel, Person
from pymemri.data.itembase import Item
from pymemri.plugin.pluginbase import PluginRun
# Cell
def get_username_password():
credentials_path = HOME_DIR / ".pymemri" / "plugins" / "gmail" / "credentials.json"
credentials = read_json(credentials_path)
username, password = credentials["username"], credentials["password"]
return username,password
# Cell
DEFAULT_GMAIL_HOST = 'imap.gmail.com'
......@@ -24,6 +31,8 @@ class IMAPClient():
assert username is not None and app_pw is not None
self.client = imaplib.IMAP4_SSL(host, port=port)
self.client.login(username, app_pw)
self.username=username
self.password=app_pw
all_mailboxes = self.get_mailboxes_with_attribute(attribute="\\All")
if not len(all_mailboxes):
......@@ -68,10 +77,24 @@ class IMAPClient():
def get_all_mail_ids(self):
"""retrieves all mail ids from the selected mailbox"""
result, data = self.client.uid('search', None, "ALL") # search and return ids instead
return data[0].split()
return list(reversed(data[0].split()))
def get_mails(self, ids, batched=True):
if batched:
return self._get_mails_batches(ids)
else:
return [self.get_mail(id) for id in ids]
def get_mails(self, ids):
return [self.get_mail(id) for id in ids]
def _get_mails_batches(self, ids):
assert self.client.host == DEFAULT_GMAIL_HOST
status, data = self.client.uid('fetch', b','.join(ids), '(RFC822 X-GM-THRID)')
result = []
for i in range(len(ids)):
mail_data = data[i*2]
thread_id = mail_data[0].decode("utf-8").split(" ")[2]
raw_email = mail_data[1]
result.append((raw_email, thread_id))
return result
def get_mail(self, id):
"""Fetches a mail given a id, returns (raw_mail, thread_id)"""
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/gmailimporter.ipynb (unless otherwise specified).
__all__ = ['BASE_PATH', 'part_to_str', 'get_unique_accounts', 'get_unique_message_channels', 'GmailImporter']
__all__ = ['BASE_PATH', 'part_to_str', 'get_unique_accounts', 'get_unique_message_channels', 'GmailImporter',
'GMAIL_SERVICE_NAME']
# Cell
import json
......@@ -13,6 +14,7 @@ from email.utils import getaddresses
from nbdev.showdoc import show_doc
import time
import pathlib
import uuid
from pymemri.pod.client import PodClient
from pymemri.data.basic import *
......@@ -24,9 +26,10 @@ from pymemri.data.itembase import Item
from pymemri.plugin.states import RUN_COMPLETED, RUN_USER_ACTION_NEEDED
from pymemri.plugin.states import RUN_USER_ACTION_COMPLETED, RUN_STARTED, RUN_FAILED
import gmail_importer
from .imap_client import IMAPClient, DEFAULT_GMAIL_HOST, DEFAULT_PORT
from .authenticator import PasswordAuthenticator
import gmail_importer
from pymemri.plugin.pluginbase import run_plugin
# Cell
BASE_PATH = pathlib.Path(gmail_importer.__file__).parent
......@@ -50,7 +53,7 @@ def _get_all_parts(part):
def get_unique_accounts(all_mails):
all_accounts = {}
for email_item in all_mails:
for account in email_item.sender + email_item.receiver + email_item.replyTo:
for account in email_item.sender + email_item.receiver + email_item.replyTo + email_item.cc + email_item.bcc:
if not account.externalId in all_accounts:
all_accounts[account.externalId] = account
......@@ -77,8 +80,11 @@ def get_unique_message_channels(all_mails):
# Cell
GMAIL_SERVICE_NAME = "gmail"
class GmailImporter(PluginBase):
MAX_IMPORTS = 100
MAX_IMPORTS = 31996
# MAX_IMPORTS = 1000
SLEEP_INTERVAL = 1
MAX_LOGIN_ATTEMPTS = 10
......@@ -118,10 +124,14 @@ class GmailImporter(PluginBase):
@staticmethod
def get_accounts(message, field):
addresses = getaddresses(message.get_all(field, []))
return [Account(externalId=address, identifier=name) for name, address in addresses]
accounts = []
for name, address in addresses:
name = name if name != "" else None
accounts.append(Account(externalId=address, identifier=address, displayName=name, service=GMAIL_SERVICE_NAME))
return accounts
@staticmethod
def get_content(message):
def get_content(message, verbose=False):
"""Extracts content from a python email message"""
maintype = message.get_content_maintype()
if maintype == 'multipart':
......@@ -132,7 +142,9 @@ class GmailImporter(PluginBase):
if len(html_parts) > 0:
if len(html_parts) > 1:
error_msg = "\n AND \n".join(html_parts)
print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}")
print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE")
if verbose:
print(error_msg)
return html_parts[0]
else:
return parts[0].get_payload()
......@@ -164,6 +176,10 @@ class GmailImporter(PluginBase):
email_item.add_edge('receiver', a)
for a in self.get_accounts(message, 'reply-to'):
email_item.add_edge('replyTo', a)
for a in self.get_accounts(message, 'cc'):
email_item.add_edge('cc', a)
for a in self.get_accounts(message, 'bcc'):
email_item.add_edge('bcc', a)
if thread_id != None:
thread_item = MessageChannel(externalId=thread_id)
......@@ -185,11 +201,12 @@ class GmailImporter(PluginBase):
n_batches = math.ceil(len(mail_ids) / batch_size)
for i, batch_ids in enumerate(self.batch(mail_ids, n=batch_size)):
print(f"downloading and converting batch {i}/{n_batches}")
for mail, thread_id in self.imap_client.get_mails(batch_ids):
item = self.create_item_from_mail(mail, thread_id=thread_id)
self.client.create_if_external_id_not_exists(item)
# self.client.create_if_external_id_not_exists(item)
mails.append(item)
progress = (i + 1) / n_batches * 1.0
......@@ -199,39 +216,67 @@ class GmailImporter(PluginBase):
"""This is the main function of the Gmail importer. It runs the importer given information
provided in the plugin run."""
self.pluginRun.status = RUN_STARTED
self.client.update_item(self.pluginRun)
self.authenticator = PasswordAuthenticator(client=self.client, pluginRun=self.pluginRun)
print("To simulate front-end, run:")
print(f"password_simulator --run_id {self.pluginRun.id}")
self.authenticator.authenticate(self)
print("Importing emails...")
# TODO: This is not linked to the other accounts (from mails) yet
me_account = Account(isMe=True, identifier=self.imap_client.username, externalId=self.imap_client.username,
service=GMAIL_SERVICE_NAME)
self.client.create(me_account)
mail_ids = self.imap_client.get_all_mail_ids()
print(f"Found {len(mail_ids)} emails")
if self.stop_early_at:
mail_ids = mail_ids[:int(self.stop_early_at)]
mail_ids = mail_ids[-int(self.stop_early_at):]
print(f"Importing latest {len(mail_ids)} emails...")
all_mails = self.get_mails(mail_ids)
print("Importing accounts...")
all_accounts = get_unique_accounts(all_mails)
for item in all_accounts:
self.client.create_if_external_id_not_exists(item)
print("Importing channels...")
all_channels = get_unique_message_channels(all_mails)
for item in all_channels:
self.client.create_if_external_id_not_exists(item)
for email_item in all_mails:
for e in email_item.get_all_edges():
self.client.create_edge(e)
items = all_mails + all_accounts + all_channels
# for x in items:
# if x.id is None: x.id = uuid.uuid4().hex
# temporary fix for missing edge id's (unknown why)
# all_ids = {x.id: True for x in items if x.id is not None}
edges = [e for email_item in all_mails for e in email_item.get_all_edges()]
# edges = [e for e in edges if e.source.id in all_ids and e.target.id in all_ids]
print(f"writing items ({len(items)}) and edges ({len(edges)}) to pod")
self.client.bulk_action(create_items=items, create_edges=edges)
# batch_size = 50
# n_batches = (len(items) // batch_size) + 1
# # mails can be big, therefore => batch
# for i in range(n_batches):
# s, e = i*batch_size, (i+1) * batch_size
# items_batch = items[s:e]
# print(f"Writing items {s}-{e}/{len(items)} to pod")
# self.client.bulk_action(create_items=items_batch)
# batch_size = 10000
# n_batches = (len(edges) // batch_size) + 1
# for i in range(n_batches):
# s, e = i*batch_size, (i+1) * batch_size
# edges_batch = edges[s:e]
# print(f"Writing edges {s}-{e}/{len(edges)} to pod")
# self.client.bulk_action(create_edges=edges_batch)
# self.client.bulk_action(create_edges=edges)
# for email_item in all_mails:
# for e in email_item.get_all_edges():
# self.client.create_edge(e)
self.pluginRun.status = RUN_COMPLETED
self.client.update_item(self.pluginRun)
print(f"Finished running {self}")
def add_to_schema(self):
self.client.add_to_schema(EmailMessage(externalId="", subject="", dateSent=0, content=""))
......
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp authenticator
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:code id: tags:
``` python
# export
# hide
import abc
from time import sleep
from pathlib import Path
from pymemri.data.basic import read_file
from pymemri.plugin.states import RUN_USER_ACTION_NEEDED, RUN_USER_ACTION_COMPLETED
from pymemri.data.schema import CVUStoredDefinition
import time
import gmail_importer
```
%% Cell type:code id: tags:
``` python
# export
LOGIN_CVU = "passwordAuth.cvu"
CVU_BASE_PATH = Path(gmail_importer.__file__).parent.parent / "cvu"
def get_cvu(name, base_path=CVU_BASE_PATH):
path = Path(base_path) / name
cvu_str = read_file(path)
return CVUStoredDefinition(definition=cvu_str, name=name, externalId=name)
```
%% Cell type:markdown id: tags:
## Password Authenticator
Password Authenticator provides an easy interface to setup standard username-password authentication with 3rd party services.
Simply calling `authenticate()` should load the related account with required credentials.
Inheriting class should implement:
- get_token() that tests username-password combination or gets a new session token to be used for future calls
%% Cell type:code id: tags:
``` python
# export
# hide
class PasswordAuthenticator:
MAX_LOGIN_ATTEMPTS = 10
SLEEP_INTERVAL = 1.0
MAX_POLL_TIME = 1000
def __init__(self, client, pluginRun):
self.client = client
self.pluginRun = pluginRun
self.isTest = False
def authenticate(self, plugin):
self.request_user_credentials()
print("To simulate front-end, run:")
print(f"simulate_enter_credentials --plugin=gmail_importer")
login_success = False
for _ in range(self.MAX_LOGIN_ATTEMPTS):
username, password = self.poll_credentials()
try:
plugin.login(username, password)
login_success = True
break
except Exception as e:
print(e)
print("Login failed, invalid credentials.")
login_success = False
if not login_success:
self.pluginRun.status = "error"
self.client.update_item(self.pluginRun)
raise RuntimeError("Reached max login attempts.")
def request_user_credentials(self):
cvu = get_cvu(LOGIN_CVU)
self.client.create(cvu)
self.pluginRun.add_edge("view", cvu)
self.client.create_edge(self.pluginRun.get_edges("view")[0])
self.pluginRun.status = RUN_USER_ACTION_NEEDED
self.client.update_item(self.pluginRun)
def poll_credentials(self):
# request username and password from the user client
# WAIT HERE = BLOCK
start_time = time.time()
while True:
if time.time() - start_time > self.MAX_POLL_TIME:
raise RuntimeError("Stop polling, max time reached.")
print("polling for credentials...")
sleep(self.SLEEP_INTERVAL)
self.pluginRun = self.client.get(self.pluginRun.id)
if self.pluginRun.status == RUN_USER_ACTION_COMPLETED:
account = self.pluginRun.account[0]
return account.identifier, account.secret
```
%% Cell type:code id: tags:
``` python
from pymemri.plugin.pluginbase import PluginBase
from pymemri.plugin.schema import PluginRun, Account
from pymemri.pod.client import PodClient
import threading
class MyAuthenticatedPlugin(PluginBase):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logged_in = False
self.authenticator = PasswordAuthenticator(kwargs["client"], kwargs["pluginRun"])
def login(self, username, password):
if not (username=="username" and password=="password"):
raise ValueError("Wrong credentials.")
def run(self):
self.authenticator.authenticate(self)
self.logged_in = True
print("done!")
def add_to_schema(self):
pass
pod_client = PodClient()
run = PluginRun("", "", "")
account = Account(service="myAuthenticatedPlugin")
run.add_edge("account", account)
run.status = "start"
pod_client.create(run)
pod_client.create(account)
pod_client.create_edge(run.get_edges("account")[0])
```
%% Output
True
%% Cell type:code id: tags:
``` python
# Create Plugin
plugin = MyAuthenticatedPlugin(client=pod_client, pluginRun=run)
```
%% Cell type:code id: tags:
``` python
# Start plugin in background thread
def run_thread():
plugin.run()
assert plugin.logged_in
thread = threading.Thread(target=run_thread)
thread.start()
```
%% Output
<Response [400]> b'Failure: Property externalId not defined in Schema (attempted to use it for json value "passwordAuth.cvu")'
<Response [404]> b'Endpoint not found'
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
Exception in thread Thread-7:
Traceback (most recent call last):
File "/opt/anaconda3/envs/gmail/lib/python3.7/threading.py", line 926, in _bootstrap_inner
self.run()
File "/opt/anaconda3/envs/gmail/lib/python3.7/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/3781541474.py", line 3, in run_thread
plugin.run()
File "/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/3370137843.py", line 17, in run
self.authenticator.authenticate(self)
File "/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/371228646.py", line 19, in authenticate
username, password = self.poll_credentials()
File "/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/371228646.py", line 51, in poll_credentials
self.pluginRun = self.client.get(self.pluginRun.id)
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py", line 271, in get
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py", line 302, in _get_item_expanded
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py", line 269, in get
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py", line 341, in _get_item_with_properties
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py", line 426, in item_from_json
File "/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/data/schema.py", line 22, in get_constructor
ModuleNotFoundError: No module named 'pymemri.indexers'
%% Cell type:code id: tags:
``` python
# Enter password and check if plugin is authenticated
def simulate_enter_password(pod_client, run_id):
run = pod_client.get(run_id)
account = run.account[0]
username = "username"
password = "password"
account.identifier = username
account.secret = password
run.status = "ready"
pod_client.update_item(account)
pod_client.update_item(run)
simulate_enter_password(pod_client, run.id)
time.sleep(4)
assert plugin.logged_in
```
%% Output
done!
---------------------------------------------------------------------------
ModuleNotFoundError Traceback (most recent call last)
/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/2266559817.py in <module>
14 pod_client.update_item(run)
15
---> 16 simulate_enter_password(pod_client, run.id)
17 time.sleep(4)
18 assert plugin.logged_in
/var/folders/q1/ryq93kwj055dlbpngxv1c7z40000gn/T/ipykernel_5103/2266559817.py in simulate_enter_password(pod_client, run_id)
2
3 def simulate_enter_password(pod_client, run_id):
----> 4 run = pod_client.get(run_id)
5 account = run.account[0]
6
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py in get(self, id, expanded)
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py in _get_item_expanded(self, id)
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py in get(self, id, expanded)
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py in _get_item_with_properties(self, id)
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/pod/client.py in item_from_json(self, json)
/opt/anaconda3/envs/gmail/lib/python3.7/site-packages/pymemri/data/schema.py in get_constructor(_type, plugin_class, plugin_package, extra)
ModuleNotFoundError: No module named 'pymemri.indexers'
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin
```
%% Cell type:code id: tags:
``` python
# export
import json
import re
import imaplib
import email
import math
from email import policy
from email.utils import getaddresses
from nbdev.showdoc import show_doc
import time
import pathlib
import uuid
from pymemri.pod.client import PodClient
from pymemri.data.basic import *
from pymemri.data.schema import EmailMessage, MessageChannel, Person
from pymemri.plugin.schema import PluginRun, Account
from pymemri.plugin.pluginbase import PluginBase
from pymemri.data.itembase import Item
from pymemri.plugin.states import RUN_COMPLETED, RUN_USER_ACTION_NEEDED
from pymemri.plugin.states import RUN_USER_ACTION_COMPLETED, RUN_STARTED, RUN_FAILED
import gmail_importer
from gmail_importer.imap_client import IMAPClient, DEFAULT_GMAIL_HOST, DEFAULT_PORT
from gmail_importer.authenticator import PasswordAuthenticator
import gmail_importer
from pymemri.plugin.pluginbase import run_plugin
```
%% Cell type:code id: tags:
``` python
# export
BASE_PATH = pathlib.Path(gmail_importer.__file__).parent
```
%% Cell type:markdown id: tags:
# Email importer
%% Cell type:markdown id: tags:
This importers fetches your emails and accounts over IMAP, it uses the python built-in imap client and some convenience functions for easier usage, batching and importing to the pod. This importer requires you to login with your email address and an app password. It is tested on gmail, but should work for other IMAP-servers.
%% Cell type:markdown id: tags:
> Note: **The recommended usage for Gmail is to enable two-factor authentication. In this case, make sure you allow [SMTP-connections](https://www.gmass.co/blog/gmail-smtp/) and set an application password (explained in the same link)**
%% Cell type:markdown id: tags:
## ImapClient
The `EmailImporter` communicates with email providers over imap. We created a convenience class around pythons imaplib , called the `ImapClient` that lets you list your mailboxes, retriev your mails and get their content.
%% Cell type:code id: tags:
``` python
show_doc(IMAPClient)
```
%% Output
<h2 id="IMAPClient" class="doc_header"><code>class</code> <code>IMAPClient</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L21" class="source_link" style="float:right">[source]</a></h2>
<h2 id="IMAPClient" class="doc_header"><code>class</code> <code>IMAPClient</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L28" class="source_link" style="float:right">[source]</a></h2>
> <code>IMAPClient</code>(**`username`**, **`app_pw`**, **`host`**=*`'imap.gmail.com'`*, **`port`**=*`993`*)
%% Cell type:code id: tags:
``` python
show_doc(IMAPClient.list_mailboxes)
```
%% Output
<h4 id="IMAPClient.list_mailboxes" class="doc_header"><code>IMAPClient.list_mailboxes</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L37" class="source_link" style="float:right">[source]</a></h4>
<h4 id="IMAPClient.list_mailboxes" class="doc_header"><code>IMAPClient.list_mailboxes</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L46" class="source_link" style="float:right">[source]</a></h4>
> <code>IMAPClient.list_mailboxes</code>()
Lists all available mailboxes
%% Cell type:code id: tags:
``` python
show_doc(IMAPClient.get_all_mail_ids)
```
%% Output
<h4 id="IMAPClient.get_all_mail_ids" class="doc_header"><code>IMAPClient.get_all_mail_ids</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L68" class="source_link" style="float:right">[source]</a></h4>
<h4 id="IMAPClient.get_all_mail_ids" class="doc_header"><code>IMAPClient.get_all_mail_ids</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L77" class="source_link" style="float:right">[source]</a></h4>
> <code>IMAPClient.get_all_mail_ids</code>()
retrieves all mail ids from the selected mailbox
%% Cell type:code id: tags:
``` python
show_doc(IMAPClient.get_mail)
```
%% Output
<h4 id="IMAPClient.get_mail" class="doc_header"><code>IMAPClient.get_mail</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L76" class="source_link" style="float:right">[source]</a></h4>
<h4 id="IMAPClient.get_mail" class="doc_header"><code>IMAPClient.get_mail</code><a href="https://gitlab.memri.io/memri/gmail_importer/tree/prod/gmail_importer/imap_client.py#L99" class="source_link" style="float:right">[source]</a></h4>
> <code>IMAPClient.get_mail</code>(**`id`**)
Fetches a mail given a id, returns (raw_mail, thread_id)
%% Cell type:code id: tags:
``` python
# export
# hide
def part_to_str(part):
bytes_ = part.get_payload(decode=True)
charset = part.get_content_charset('iso-8859-1')
chars = bytes_.decode(charset, 'replace')
return chars
def _get_all_parts(part):
payload = part.get_payload()
if isinstance(payload, list):
return [x for p in payload for x in _get_all_parts(p)]
else:
return [part]
def get_unique_accounts(all_mails):
all_accounts = {}
for email_item in all_mails:
for account in email_item.sender + email_item.receiver + email_item.replyTo:
for account in email_item.sender + email_item.receiver + email_item.replyTo + email_item.cc + email_item.bcc:
if not account.externalId in all_accounts:
all_accounts[account.externalId] = account
for email_item in all_mails:
for edge in email_item.get_all_edges():
if isinstance(edge.target, Account):
edge.target = all_accounts[edge.target.externalId]
return list(all_accounts.values())
def get_unique_message_channels(all_mails):
all_channels = {}
for email_item in all_mails:
for channel in email_item.messageChannel:
if not channel.externalId in all_channels:
all_channels[channel.externalId] = channel
for email_item in all_mails:
for edge in email_item.get_all_edges():
if isinstance(edge.target, MessageChannel):
edge.target = all_channels[edge.target.externalId]
return list(all_channels.values())
```
%% Cell type:markdown id: tags:
## EmailImporter
%% Cell type:code id: tags:
``` python
# export
GMAIL_SERVICE_NAME = "gmail"
class GmailImporter(PluginBase):
MAX_IMPORTS = 100
MAX_IMPORTS = 31996
# MAX_IMPORTS = 1000
SLEEP_INTERVAL = 1
MAX_LOGIN_ATTEMPTS = 10
"""Imports gmail emails over imap."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.imap_client = None
self.stop_early_at = self.MAX_IMPORTS
self.authenticator = None
self.apply_run_settings()
def apply_run_settings(self):
settings = self.pluginRun.settings
if settings:
settings = json.loads(settings)
else:
settings = dict()
self.stop_early_at = settings.get("max_imports", self.MAX_IMPORTS)
self.simulate_frontend = settings.get(bool("simulate_frontend"), False)
def login(self, username=None, password=None, imap_host=DEFAULT_GMAIL_HOST, port=DEFAULT_PORT):
assert imap_host is not None and port is not None
print(f'Using, HOST: {imap_host}, PORT: {port}')
self.imap_client = IMAPClient(username=username, app_pw=password, host=imap_host,port=993)
print("Imap client logged in.")
@staticmethod
def get_timestamp_from_message(message):
date = message["date"]
parsed_time = email.utils.parsedate(date)
dt = email.utils.parsedate_to_datetime(date)
timestamp = int(dt.timestamp() * 1000)
return timestamp
@staticmethod
def get_accounts(message, field):
addresses = getaddresses(message.get_all(field, []))
return [Account(externalId=address, identifier=name) for name, address in addresses]
accounts = []
for name, address in addresses:
name = name if name != "" else None
accounts.append(Account(externalId=address, identifier=address, displayName=name, service=GMAIL_SERVICE_NAME))
return accounts
@staticmethod
def get_content(message):
def get_content(message, verbose=False):
"""Extracts content from a python email message"""
maintype = message.get_content_maintype()
if maintype == 'multipart':
parts = _get_all_parts(message)
res = None
html_parts = [part_to_str(part) for part in parts if part.get_content_type() == "text/html"]
if len(html_parts) > 0:
if len(html_parts) > 1:
error_msg = "\n AND \n".join(html_parts)
print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}")
print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE")
if verbose:
print(error_msg)
return html_parts[0]
else:
return parts[0].get_payload()
elif maintype == 'text':
return message.get_payload()
@staticmethod
def get_attachments(message):
return list(message.iter_attachments())
def create_item_from_mail(self, mail, thread_id=None):
"""Creates a schema-item from an existing mail"""
message = email.message_from_bytes(mail, policy=policy.SMTP)
message_id, subject = message["message-id"], message["subject"]
timestamp = self.get_timestamp_from_message(message)
content = self.get_content(message)
attachments = self.get_attachments(message)
email_item = EmailMessage(
externalId=message_id,
subject=subject,
dateSent=timestamp,
content=content
)
for a in self.get_accounts(message, 'from'):
email_item.add_edge('sender', a)
for a in self.get_accounts(message, 'to'):
email_item.add_edge('receiver', a)
for a in self.get_accounts(message, 'reply-to'):
email_item.add_edge('replyTo', a)
for a in self.get_accounts(message, 'cc'):
email_item.add_edge('cc', a)
for a in self.get_accounts(message, 'bcc'):
email_item.add_edge('bcc', a)
if thread_id != None:
thread_item = MessageChannel(externalId=thread_id)
email_item.add_edge('messageChannel', thread_item)
return email_item
@staticmethod
def batch(iterable, n=1):
l = len(iterable)
for ndx in range(0, l, n):
yield iterable[ndx:min(ndx + n, l)]
def get_mails(self, mail_ids, batch_size=100):
"""Gets mails from a list of mail ids. You can pass an importer run and podclient
to update the progress of the process"""
mails = []
n_batches = math.ceil(len(mail_ids) / batch_size)
for i, batch_ids in enumerate(self.batch(mail_ids, n=batch_size)):
print(f"downloading and converting batch {i}/{n_batches}")
for mail, thread_id in self.imap_client.get_mails(batch_ids):
item = self.create_item_from_mail(mail, thread_id=thread_id)
self.client.create_if_external_id_not_exists(item)
# self.client.create_if_external_id_not_exists(item)
mails.append(item)
progress = (i + 1) / n_batches * 1.0
return mails
def run(self):
"""This is the main function of the Gmail importer. It runs the importer given information
provided in the plugin run."""
self.pluginRun.status = RUN_STARTED
self.client.update_item(self.pluginRun)
self.authenticator = PasswordAuthenticator(client=self.client, pluginRun=self.pluginRun)
print("To simulate front-end, run:")
print(f"password_simulator --run_id {self.pluginRun.id}")
self.authenticator.authenticate(self)
print("Importing emails...")
# TODO: This is not linked to the other accounts (from mails) yet
me_account = Account(isMe=True, identifier=self.imap_client.username, externalId=self.imap_client.username,
service=GMAIL_SERVICE_NAME)
self.client.create(me_account)
mail_ids = self.imap_client.get_all_mail_ids()
print(f"Found {len(mail_ids)} emails")
if self.stop_early_at:
mail_ids = mail_ids[:int(self.stop_early_at)]
mail_ids = mail_ids[-int(self.stop_early_at):]
print(f"Importing latest {len(mail_ids)} emails...")
all_mails = self.get_mails(mail_ids)
print("Importing accounts...")
all_accounts = get_unique_accounts(all_mails)
for item in all_accounts:
self.client.create_if_external_id_not_exists(item)
print("Importing channels...")
all_channels = get_unique_message_channels(all_mails)
for item in all_channels:
self.client.create_if_external_id_not_exists(item)
for email_item in all_mails:
for e in email_item.get_all_edges():
self.client.create_edge(e)
self.pluginRun.status = RUN_COMPLETED
self.client.update_item(self.pluginRun)
print(f"Finished running {self}")
items = all_mails + all_accounts + all_channels
# for x in items:
# if x.id is None: x.id = uuid.uuid4().hex
# temporary fix for missing edge id's (unknown why)
# all_ids = {x.id: True for x in items if x.id is not None}
edges = [e for email_item in all_mails for e in email_item.get_all_edges()]
# edges = [e for e in edges if e.source.id in all_ids and e.target.id in all_ids]
print(f"writing items ({len(items)}) and edges ({len(edges)}) to pod")
self.client.bulk_action(create_items=items, create_edges=edges)
# batch_size = 50
# n_batches = (len(items) // batch_size) + 1
# # mails can be big, therefore => batch
# for i in range(n_batches):
# s, e = i*batch_size, (i+1) * batch_size
# items_batch = items[s:e]
# print(f"Writing items {s}-{e}/{len(items)} to pod")
# self.client.bulk_action(create_items=items_batch)
# batch_size = 10000
# n_batches = (len(edges) // batch_size) + 1
# for i in range(n_batches):
# s, e = i*batch_size, (i+1) * batch_size
# edges_batch = edges[s:e]
# print(f"Writing edges {s}-{e}/{len(edges)} to pod")
# self.client.bulk_action(create_edges=edges_batch)
# self.client.bulk_action(create_edges=edges)
# for email_item in all_mails:
# for e in email_item.get_all_edges():
# self.client.create_edge(e)
def add_to_schema(self):
self.client.add_to_schema(EmailMessage(externalId="", subject="", dateSent=0, content=""))
self.client.add_to_schema(MessageChannel(externalId=""))
self.client.add_to_schema(Person(externalId="", firstName=""))
self.client.add_to_schema(Account(externalId=""))
```
%% Cell type:markdown id: tags:
## Methods
%% Cell type:code id: tags:
``` python
show_doc(GmailImporter.get_content)
```
%% Output
<h4 id="GmailImporter.get_content" class="doc_header"><code>GmailImporter.get_content</code><a href="__main__.py#L46" class="source_link" style="float:right">[source]</a></h4>
<h4 id="GmailImporter.get_content" class="doc_header"><code>GmailImporter.get_content</code><a href="__main__.py#L53" class="source_link" style="float:right">[source]</a></h4>
> <code>GmailImporter.get_content</code>(**`message`**)
> <code>GmailImporter.get_content</code>(**`message`**, **`verbose`**=*`False`*)
Extracts content from a python email message
%% Cell type:code id: tags:
``` python
show_doc(GmailImporter.create_item_from_mail)
```
%% Output
<h4 id="GmailImporter.create_item_from_mail" class="doc_header"><code>GmailImporter.create_item_from_mail</code><a href="__main__.py#L70" class="source_link" style="float:right">[source]</a></h4>
<h4 id="GmailImporter.create_item_from_mail" class="doc_header"><code>GmailImporter.create_item_from_mail</code><a href="__main__.py#L79" class="source_link" style="float:right">[source]</a></h4>
> <code>GmailImporter.create_item_from_mail</code>(**`mail`**, **`thread_id`**=*`None`*)
Creates a schema-item from an existing mail
%% Cell type:code id: tags:
``` python
show_doc(GmailImporter.run)
```
%% Output
<h4 id="GmailImporter.run" class="doc_header"><code>GmailImporter.run</code><a href="__main__.py#L121" class="source_link" style="float:right">[source]</a></h4>
<h4 id="GmailImporter.run" class="doc_header"><code>GmailImporter.run</code><a href="__main__.py#L135" class="source_link" style="float:right">[source]</a></h4>
> <code>GmailImporter.run</code>()
This is the main function of the Gmail importer. It runs the importer given information
provided in the plugin run.
%% Cell type:markdown id: tags:
## Usage
%% Cell type:markdown id: tags:
### Run from CLI
%% Cell type:code id: tags:
``` python
pod_client = PodClient.from_local_keys()
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
%% Cell type:code id: tags:
``` python
# For local testing
def create_pluginrun(pod_client):
account = Account(service="gmail_imap")
run = PluginRun("", "gmail_importer.plugin", "GmailImporter")
run.add_edge("account", account)
run.status = RUN_STARTED
pod_client.create(run)
pod_client.create(account)
pod_client.create_edge(run.get_edges("account")[0])
print(f"Created PluginRun {run.id}")
```
%% Cell type:code id: tags:
``` python
# Test with CLI
# To simulate front-end, run: $ password_simulator --run_id <run_id>
# Run id is printed in the plugin output below.
!run_plugin --config_file="../example_config.json"
run_plugin(config_file="../example_config.json")
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
pod_full_address=http://localhost:3030
owner_key=6204292793089736827662050896316540014048568664582659877327581834
writing run info to /Users/koen/.pymemri/plugins/gmail_importer/current_run.json
To simulate front-end, run:
simulate_enter_credentials --plugin=gmail_importer
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
Using, HOST: imap.gmail.com, PORT: 993
Imap client logged in.
Found 32071 emails
Importing latest 1000 emails...
downloading and converting batch 0/10
downloading and converting batch 1/10
downloading and converting batch 2/10
downloading and converting batch 3/10
downloading and converting batch 4/10
downloading and converting batch 5/10
downloading and converting batch 6/10
downloading and converting batch 7/10
WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE
downloading and converting batch 8/10
WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE
downloading and converting batch 9/10
Importing accounts...
Importing channels...
writing items (2419) and edges (4548) to pod
BULK: Writing 312/6967 items/edges
BULK: Writing 565/6967 items/edges
BULK: Writing 779/6967 items/edges
BULK: Writing 6054/6967 items/edges
BULK: Writing 6967/6967 items/edges
Completed Bulk action, written 6967 items/edges
%% Cell type:code id: tags:
``` python
mails = pod_client.search({"type": "EmailMessage"})
print(len(mails))
```
%% Output
1000
%% Cell type:markdown id: tags:
### Run from Docker
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted authenticator.ipynb.
Converted gmailimporter.ipynb.
Converted imapclient.ipynb.
Converted index.ipynb.
Converted password_simulator.ipynb.
......
%% Cell type:code id:feb72a01 tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp imap_client
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:code id:05a13ca6 tags:
``` python
# export
import re
import imaplib, email, math
from email import policy
from email.utils import getaddresses
from pymemri.pod.client import PodClient
from nbdev.showdoc import show_doc
from pymemri.data.basic import *
from pymemri.data.schema import EmailMessage, MessageChannel, Person, ImporterRun
from pymemri.importers.importer import ImporterBase
from pymemri.data.schema import EmailMessage, MessageChannel, Person
from pymemri.data.itembase import Item
from pymemri.plugin.pluginbase import PluginRun
```
%% Cell type:code id:18305e71 tags:
``` python
# export
def get_username_password():
credentials_path = HOME_DIR / ".pymemri" / "plugins" / "gmail" / "credentials.json"
credentials = read_json(credentials_path)
username, password = credentials["username"], credentials["password"]
return username,password
```
%% Cell type:code id:5ecaa367 tags:
``` python
# export
DEFAULT_GMAIL_HOST = 'imap.gmail.com'
DEFAULT_PORT = 993
class IMAPClient():
def __init__(self, username, app_pw, host=DEFAULT_GMAIL_HOST, port=DEFAULT_PORT):
assert username is not None and app_pw is not None
self.client = imaplib.IMAP4_SSL(host, port=port)
self.client.login(username, app_pw)
self.username=username
self.password=app_pw
all_mailboxes = self.get_mailboxes_with_attribute(attribute="\\All")
if not len(all_mailboxes):
raise ValueError("No inboxes found.")
elif len(all_mailboxes) > 1:
print(f"Multiple inboxes found, selecting {all_mailboxes[0]}")
self.inbox_name = all_mailboxes[0]
self.client.select(self.inbox_name) # connect to inbox.
def list_mailboxes(self):
"""Lists all available mailboxes"""
mailboxes = self.client.list()[1]
mailboxes = [mb.decode("utf-8") for mb in mailboxes]
return mailboxes
@staticmethod
def parse_mailbox_string(mailbox_str):
"""
Return mailbox string as a tuple of name, attributes
TODO This might be specific to gmail.
"""
name = re.findall('"(.*?)"', mailbox_str)[-1]
name = '"' + name + '"'
attributes = re.search("\(([^)]+)\)", mailbox_str)[0][1:-1]
attributes = attributes.split()
return name, attributes
def get_mailboxes_with_attribute(self, attribute):
return_list = []
for mailbox in self.list_mailboxes():
try:
name, attributes = self.parse_mailbox_string(mailbox)
if attribute in attributes:
return_list.append(name)
except Exception:
print(f"{mailbox} is not a valid mailbox.")
return return_list
def get_all_mail_ids(self):
"""retrieves all mail ids from the selected mailbox"""
result, data = self.client.uid('search', None, "ALL") # search and return ids instead
return data[0].split()
return list(reversed(data[0].split()))
def get_mails(self, ids):
return [self.get_mail(id) for id in ids]
def get_mails(self, ids, batched=True):
if batched:
return self._get_mails_batches(ids)
else:
return [self.get_mail(id) for id in ids]
def _get_mails_batches(self, ids):
assert self.client.host == DEFAULT_GMAIL_HOST
status, data = self.client.uid('fetch', b','.join(ids), '(RFC822 X-GM-THRID)')
result = []
for i in range(len(ids)):
mail_data = data[i*2]
thread_id = mail_data[0].decode("utf-8").split(" ")[2]
raw_email = mail_data[1]
result.append((raw_email, thread_id))
return result
def get_mail(self, id):
"""Fetches a mail given a id, returns (raw_mail, thread_id)"""
if self.client.host == DEFAULT_GMAIL_HOST:
# Use Google's threading method, in which every thread has an ID
result, data = self.client.uid('fetch', id, '(RFC822 X-GM-THRID)')
# Some IMAP services return a variable number of args in data, email data is in first.
data = data[0]
thread_id = data[0].decode("utf-8").split(" ")[2]
raw_email = data[1]
return (raw_email, thread_id)
else:
# Threading not yet implemented for IMAP threading
result, data = self.client.uid('fetch', id, '(RFC822)')
# Some IMAP services return a variable number of args in data. Email is in first.
data = data[0]
raw_email = data[1]
return (raw_email, None)
```
%% Cell type:markdown id:d3e747eb tags:
## Download mail -
%% Cell type:code id:6fa2476a tags:
``` python
# skip
import gmail_importer # dont move
username, password = get_username_password()
imap_client = IMAPClient(username, password)
```
%% Cell type:code id:a23cace4 tags:
``` python
# skip
ids = imap_client.get_all_mail_ids()[:5000]
```
%% Cell type:code id:9099b7c4 tags:
``` python
len(ids)
```
%% Output
5000
%% Cell type:code id:237458bf tags:
``` python
# skip
res = imap_client.get_mails(ids)
```
%% Cell type:code id:03869932 tags:
``` python
# for m in res:
# message = email.message_from_bytes(m[0], policy=policy.SMTP)
# if message["bcc"] is not None and not ("github" in message["bcc"] or "@amazon" in message["bcc"]):
# print("CC", message["bcc"], "FROM", message["from"])
```
%% Cell type:markdown id:a7e91cc8 tags:
## Run -
%% Cell type:code id:465242ba tags:
``` python
# ci
from pymemri.test_utils import get_ci_variables
email, password = get_ci_variables("email", "password")
client = IMAPClient(email, password)
```
%% Output
---------------------------------------------------------------------------
error Traceback (most recent call last)
<ipython-input-86-c4709fbeb601> in <module>
3
4 email, password = get_ci_variables("email", "password")
----> 5 client = IMAPClient(email, password)
<ipython-input-74-2e16b1c4be82> in __init__(self, username, app_pw, host, port)
8 assert username is not None and app_pw is not None
9 self.client = imaplib.IMAP4_SSL(host, port=port)
---> 10 self.client.login(username, app_pw)
11
12 all_mailboxes = self.get_mailboxes_with_attribute(attribute="\\All")
~/miniconda3/envs/pymemri/lib/python3.9/imaplib.py in login(self, user, password)
610 typ, dat = self._simple_command('LOGIN', user, self._quote(password))
611 if typ != 'OK':
--> 612 raise self.error(dat[-1])
613 self.state = 'AUTH'
614 return typ, dat
error: b'[AUTHENTICATIONFAILED] Invalid credentials (Failure)'
%% Cell type:code id:849d9155 tags:
``` python
# ci
max_emails = 10
ids = client.get_all_mail_ids()
mails = client.get_mails(ids[:max_emails])
assert len(ids)
assert len(mails)
```
%% Cell type:code id:b5701f0d tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted authenticator.ipynb.
Converted gmailimporter.ipynb.
Converted imapclient.ipynb.
Converted index.ipynb.
Converted password_simulator.ipynb.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment