Skip to content
GitLab
Explore
Projects
Groups
Snippets
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
Dima Gerasimov
pyIntegrators
Commits
f0efbbc8
Commit
f0efbbc8
authored
4 years ago
by
Youp
Browse files
Options
Download
Email Patches
Plain Diff
- Refactored gmail-importer, code is now clearer/easier to test
- Started implementing files/attachment downloading
parent
bc30b264
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
integrators/_nbdev.py
+4
-3
integrators/_nbdev.py
integrators/importers/gmail.py
+107
-64
integrators/importers/gmail.py
nbs/gmail.ipynb
+81
-84
nbs/gmail.ipynb
with
192 additions
and
151 deletions
+192
-151
integrators/_nbdev.py
+
4
-
3
View file @
f0efbbc8
...
...
@@ -9,11 +9,12 @@ index = {"read_file": "basic.ipynb",
"PYI_HOME"
:
"basic.ipynb"
,
"PYI_TESTDATA"
:
"basic.ipynb"
,
"IMAPClient"
:
"gmail.ipynb"
,
"part_to_str"
:
"gmail.ipynb"
,
"get_html"
:
"gmail.ipynb"
,
"decode_header"
:
"gmail.ipynb"
,
"get_message_content"
:
"gmail.ipynb"
,
"get_addresses_from_message"
:
"gmail.ipynb"
,
"get_timestamp_from_message"
:
"gmail.ipynb"
,
"create_item_from_mail"
:
"gmail.ipynb"
,
"download_mails"
:
"gmail.ipynb"
,
"merge_duplicate_items"
:
"gmail.ipynb"
,
"GmailImporter"
:
"gmail.ipynb"
,
"GeoIndexer"
:
"indexers.GeoIndexer.ipynb"
,
"LOCATION_EDGE"
:
"indexers.GeoIndexer.ipynb"
,
...
...
This diff is collapsed.
Click to expand it.
integrators/importers/gmail.py
+
107
-
64
View file @
f0efbbc8
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/gmail.ipynb (unless otherwise specified).
__all__
=
[
'IMAPClient'
,
'
part_to_str'
,
'get_html'
,
'decode_header
'
,
'get_timestamp_from_message'
,
'create_item_from_mail'
,
'GmailImporter'
]
__all__
=
[
'IMAPClient'
,
'
get_message_content'
,
'get_addresses_from_message
'
,
'get_timestamp_from_message'
,
'create_item_from_mail'
,
'download_mails'
,
'merge_duplicate_items'
,
'GmailImporter'
]
# Cell
import
imaplib
,
email
from
..data.schema
import
Account
,
EmailMessage
,
MessageChannel
from
..pod.client
import
PodClient
from
email
import
policy
# Cell
...
...
@@ -16,7 +17,6 @@ class IMAPClient():
def
__init__
(
self
,
username
,
app_pw
,
host
=
'imap.gmail.com'
,
inbox
=
'"[Gmail]/All Mail"'
):
self
.
client
=
imaplib
.
IMAP4_SSL
(
host
)
self
.
client
.
login
(
username
,
app_pw
)
# TODO: let user select mailbox
self
.
client
.
select
(
inbox
)
# connect to inbox.
def
list_mailboxes
(
self
):
...
...
@@ -43,45 +43,59 @@ class IMAPClient():
result
,
data
=
self
.
client
.
uid
(
'fetch'
,
uid
,
'(X-GM-THRID X-GM-MSGID)'
)
return
data
[
0
].
decode
(
"utf-8"
).
split
(
" "
)[
2
]
# @staticmethod
def
part_to_str
(
part
):
bytes_
=
part
.
get_payload
(
decode
=
True
)
charset
=
part
.
get_content_charset
(
'iso-8859-1'
)
chars
=
bytes_
.
decode
(
charset
,
'replace'
)
return
chars
# @staticmethod
def
get_html
(
email_message_instance
):
maintype
=
email_message_instance
.
get_content_maintype
()
if
maintype
==
'multipart'
:
parts
=
_get_all_parts
(
email_message_instance
)
res
=
None
html_parts
=
[
part_to_str
(
part
)
for
part
in
parts
if
part
.
get_content_type
()
==
"text/html"
]
if
len
(
html_parts
)
>
0
:
if
len
(
html_parts
)
>
1
:
error_msg
=
"
\n
AND
\n
"
.
join
(
html_parts
)
print
(
f
"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE
{
error_msg
}
"
)
return
html_parts
[
0
]
else
:
return
parts
[
0
].
get_payload
()
elif
maintype
==
'text'
:
return
email_message_instance
.
get_payload
()
# @staticmethod
def
_get_all_parts
(
part
):
payload
=
part
.
get_payload
()
if
isinstance
(
payload
,
list
):
return
[
x
for
p
in
payload
for
x
in
_get_all_parts
(
p
)]
# # @staticmethod
# def part_to_str(part):
# bytes_ = part.get_payload(decode=True)
# charset = part.get_content_charset('iso-8859-1')
# chars = bytes_.decode(charset, 'replace')
# return chars
# # @staticmethod
# def get_html(email_message_instance):
# maintype = email_message_instance.get_content_maintype()
# if maintype == 'multipart':
# parts = _get_all_parts(email_message_instance)
# res = None
# html_parts = [part_to_str(part) for part in parts if part.get_content_type() == "text/html"]
# if len(html_parts) > 0:
# if len(html_parts) > 1:
# error_msg = "\n AND \n".join(html_parts)
# print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}")
# return html_parts[0]
# else:
# return parts[0].get_payload()
# elif maintype == 'text':
# return email_message_instance.get_payload()
# # @staticmethod
# def _get_all_parts(part):
# payload = part.get_payload()
# if isinstance(payload, list):
# return [x for p in payload for x in _get_all_parts(p)]
# else:
# return [part]
def
get_message_content
(
message
):
# content = get_html(message)
# # TODO: proper escaping here
# content = content.replace("=3D", "=")
# USED FOR DOWNLOADING ATTACHMENTS
# for i, x in enumerate(message.iter_attachments()):
# f = open(f"tmp/gmail/{i}.png", 'wb')
# f.write(x.get_content())
# f.close()
content
=
message
.
get_body
().
get_content
()
attachments
=
[]
return
(
content
,
attachments
)
def
get_addresses_from_message
(
message
,
field
):
if
message
[
field
]
is
None
:
return
[]
else
:
return
[
part
]
# @staticmethod
def
decode_header
(
header
):
return
str
(
email
.
header
.
make_header
(
email
.
header
.
decode_header
(
header
)))
# Cell
return
email
.
utils
.
getaddresses
([
message
[
field
]])
def
get_timestamp_from_message
(
message
):
date
=
message
[
"date"
]
...
...
@@ -91,40 +105,38 @@ def get_timestamp_from_message(message):
return
timestamp
def
create_item_from_mail
(
mail_utf8
,
thread_id
=
None
):
message
=
email
.
message_from_string
(
mail_utf8
)
# Cell
def
create_item_from_mail
(
mail
,
thread_id
=
None
):
# message = email.message_from_string(mail_utf8)
message
=
email
.
message_from_bytes
(
mail
,
policy
=
policy
.
SMTP
)
message_id
=
message
[
"message-id"
]
.
replace
(
"/"
,
""
)
message_id
=
message
[
"message-id"
]
subject
=
message
[
"subject"
]
timestamp
=
get_timestamp_from_message
(
message
)
from_name
,
from_mail
=
email
.
utils
.
parseaddr
(
message
[
"from"
])
from_name
=
decode_header
(
from_name
)
to_name
,
to_mail
=
email
.
utils
.
parseaddr
(
message
[
"to"
])
to_name
=
decode_header
(
to_name
)
from_tuples
=
parse_addresses
(
message
,
'from'
)
to_tuples
=
parse_addresses
(
message
,
'to'
)
reply_to_tuples
=
parse_addresses
(
message
,
'reply-to'
)
reply_to_name
,
reply_to_mail
=
email
.
utils
.
parseaddr
(
message
[
"reply-to"
])
reply_to_name
=
decode_header
(
reply_to_name
)
content
=
get_html
(
message
)
content
=
content
.
replace
(
"=3D"
,
"="
)
importJson
=
mail_utf8
(
content
,
attachments
)
=
get_message_content
(
message
)
# importJson = mail_utf8
# TODO: is dateSent the right way to go? Might differ for whether you're sender or receiver
# TODO: importJson
# TODO: MAIL namespace
email_item
=
EmailMessage
(
externalId
=
message_id
,
subject
=
subject
,
dateSent
=
timestamp
,
content
=
content
)
# Create Edges to accounts
for
address
in
[
from_
mail
]
if
from_mail
!=
""
else
[]
:
for
name
,
address
in
from_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'sender'
,
address_item
)
for
address
in
[
to_
mail
]
if
to_mail
!=
""
else
[]
:
for
name
,
address
in
to_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'receiver'
,
address_item
)
for
address
in
[
reply_to_
mail
]
if
reply_to_mail
!=
""
else
[]
:
for
name
,
address
in
reply_to_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'replyTo'
,
address_item
)
...
...
@@ -133,9 +145,40 @@ def create_item_from_mail(mail_utf8, thread_id=None):
message_channel
=
MessageChannel
(
externalId
=
thread_id
)
email_item
.
add_edge
(
'messageChannel'
,
message_channel
)
print
(
f
'
{
email_item
}
'
)
return
email_item
def
download_mails
(
imap_client
,
gmail_ids
,
stop_at
):
all_mails
=
[]
# Download files
for
i
,
gmail_id
in
enumerate
(
gmail_ids
):
if
stop_at
is
not
None
and
i
>=
stop_at
:
print
(
f
"stopped early at
{
stop_at
}
"
)
break
mail
=
imap_client
.
get_mail
(
gmail_id
)
thread_id
=
imap_client
.
get_x_gm_thrid
(
gmail_id
)
item
=
create_item_from_mail
(
mail
,
thread_id
=
thread_id
)
all_mails
.
append
(
item
)
return
all_mails
# TODO: should probably become a general utility function
def
merge_duplicate_items
(
all_mails
):
all_accounts
=
{}
for
email_item
in
all_mails
:
for
edge
in
email_item
.
get_all_edges
():
account
=
edge
.
traverse
(
email_item
)
if
not
account
.
externalId
in
all_accounts
:
all_accounts
[
account
.
externalId
]
=
account
for
email_item
in
all_mails
:
for
edge
in
email_item
.
get_all_edges
():
edge
.
target
=
all_accounts
[
edge
.
target
.
externalId
]
return
all_accounts
# Cell
from
..data.schema
import
*
...
...
@@ -150,11 +193,11 @@ class GmailImporter(IndexerBase):
# TODO: Get imap_host from importer_run
imap_host
=
'imap.gmail.com'
imap_client
=
IMAPClient
(
username
=
importer_run
.
username
,
app_pw
=
importer_run
.
password
,
host
=
imap_host
)
imap_client
=
IMAPClient
(
username
=
importer_run
.
username
,
app_pw
=
importer_run
.
password
,
host
=
imap_host
)
gmail_ids
=
imap_client
.
get_all_mail_uids
()
all_mails
=
download_mails
(
imap_client
,
gmail_ids
,
10
)
print
(
all_mails
)
all_mails
=
download_mails
(
imap_client
,
gmail_ids
,
None
)
# Merge email accounts/messageChannels here
# TODO: create better way to do this
...
...
@@ -168,6 +211,6 @@ class GmailImporter(IndexerBase):
for
(
external_id
,
item
)
in
all_accounts
.
items
():
pod_client
.
create
(
item
)
# Create all edges from emails to accounts
# Create all edges from emails to accounts
/messageThreads
for
email_item
in
all_mails
:
pod_client
.
create_edges
(
email_item
.
get_all_edges
())
\ No newline at end of file
This diff is collapsed.
Click to expand it.
nbs/gmail.ipynb
+
81
-
84
View file @
f0efbbc8
...
...
@@ -21,7 +21,8 @@
"\n",
"import imaplib, email\n",
"from integrators.data.schema import Account, EmailMessage, MessageChannel\n",
"from integrators.pod.client import PodClient"
"from integrators.pod.client import PodClient\n",
"from email import policy"
]
},
{
...
...
@@ -37,7 +38,6 @@
" def __init__(self, username, app_pw, host='imap.gmail.com', inbox='\"[Gmail]/All Mail\"'):\n",
" self.client = imaplib.IMAP4_SSL(host)\n",
" self.client.login(username, app_pw)\n",
" # TODO: let user select mailbox\n",
" self.client.select(inbox) # connect to inbox.\n",
" \n",
" def list_mailboxes(self):\n",
...
...
@@ -64,43 +64,67 @@
" result, data = self.client.uid('fetch', uid, '(X-GM-THRID X-GM-MSGID)')\n",
" return data[0].decode(\"utf-8\").split(\" \")[2]\n",
" \n",
"# @staticmethod\n",
"def part_to_str(part):\n",
" bytes_ = part.get_payload(decode=True)\n",
" charset = part.get_content_charset('iso-8859-1')\n",
" chars = bytes_.decode(charset, 'replace')\n",
" return chars\n",
"#
#
@staticmethod\n",
"
#
def part_to_str(part):\n",
"
#
bytes_ = part.get_payload(decode=True)\n",
"
#
charset = part.get_content_charset('iso-8859-1')\n",
"
#
chars = bytes_.decode(charset, 'replace')\n",
"
#
return chars\n",
"\n",
"# @staticmethod\n",
"def get_html(email_message_instance):\n",
" maintype = email_message_instance.get_content_maintype()\n",
" if maintype == 'multipart':\n",
"#
#
@staticmethod\n",
"
#
def get_html(email_message_instance):\n",
"
#
maintype = email_message_instance.get_content_maintype()\n",
"
#
if maintype == 'multipart':\n",
"\n",
" parts = _get_all_parts(email_message_instance) \n",
" res = None\n",
" html_parts = [part_to_str(part) for part in parts if part.get_content_type() == \"text/html\"]\n",
" if len(html_parts) > 0:\n",
" if len(html_parts) > 1:\n",
" error_msg = \"\\n AND \\n\".join(html_parts)\n",
" print(f\"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}\")\n",
" return html_parts[0]\n",
" else: \n",
" return parts[0].get_payload()\n",
"
#
parts = _get_all_parts(email_message_instance) \n",
"
#
res = None\n",
"
#
html_parts = [part_to_str(part) for part in parts if part.get_content_type() == \"text/html\"]\n",
"
#
if len(html_parts) > 0:\n",
"
#
if len(html_parts) > 1:\n",
"
#
error_msg = \"\\n AND \\n\".join(html_parts)\n",
"
#
print(f\"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}\")\n",
"
#
return html_parts[0]\n",
"
#
else: \n",
"
#
return parts[0].get_payload()\n",
"\n",
" elif maintype == 'text':\n",
" return email_message_instance.get_payload()\n",
"
#
elif maintype == 'text':\n",
"
#
return email_message_instance.get_payload()\n",
"\n",
"# @staticmethod\n",
"def _get_all_parts(part):\n",
" payload = part.get_payload()\n",
" if isinstance(payload, list):\n",
" return [x for p in payload for x in _get_all_parts(p)]\n",
" else:\n",
" return [part] \n",
"# # @staticmethod\n",
"# def _get_all_parts(part):\n",
"# payload = part.get_payload()\n",
"# if isinstance(payload, list):\n",
"# return [x for p in payload for x in _get_all_parts(p)]\n",
"# else:\n",
"# return [part] \n",
"\n",
"def get_message_content(message):\n",
"# content = get_html(message)\n",
"# # TODO: proper escaping here\n",
"# content = content.replace(\"=3D\", \"=\")\n",
"\n",
"# @staticmethod\n",
"def decode_header(header):\n",
" return str(email.header.make_header(email.header.decode_header(header)))"
"# USED FOR DOWNLOADING ATTACHMENTS\n",
"# for i, x in enumerate(message.iter_attachments()):\n",
"# f = open(f\"tmp/gmail/{i}.png\", 'wb')\n",
"# f.write(x.get_content())\n",
"# f.close()\n",
" content = message.get_body().get_content()\n",
" attachments = []\n",
" return (content, attachments)\n",
"\n",
"def get_addresses_from_message(message, field):\n",
" if message[field] is None:\n",
" return []\n",
" else:\n",
" return email.utils.getaddresses([message[field]])\n",
" \n",
"def get_timestamp_from_message(message):\n",
" date = message[\"date\"]\n",
" parsed_time = email.utils.parsedate(date)\n",
" dt = email.utils.parsedate_to_datetime(date)\n",
" timestamp = int(dt.timestamp() * 1000)\n",
" \n",
" return timestamp"
]
},
{
...
...
@@ -111,48 +135,36 @@
"source": [
"# export\n",
"\n",
"def get_timestamp_from_message(message):\n",
" date = message[\"date\"]\n",
" parsed_time = email.utils.parsedate(date)\n",
" dt = email.utils.parsedate_to_datetime(date)\n",
" timestamp = int(dt.timestamp() * 1000)\n",
" \n",
" return timestamp\n",
"\n",
"def create_item_from_mail(mail_utf8, thread_id=None):\n",
" message = email.message_from_string(mail_utf8)\n",
"def create_item_from_mail(mail, thread_id=None):\n",
" # message = email.message_from_string(mail_utf8)\n",
" message = email.message_from_bytes(mail, policy=policy.SMTP)\n",
" \n",
" message_id = message[\"message-id\"]
.replace(\"/\", \"\")
\n",
" message_id = message[\"message-id\"]\n",
" subject = message[\"subject\"]\n",
" timestamp = get_timestamp_from_message(message)\n",
" \n",
" from_name, from_mail = email.utils.parseaddr(message[\"from\"])\n",
" from_name = decode_header(from_name)\n",
"\n",
" to_name, to_mail = email.utils.parseaddr(message[\"to\"])\n",
" to_name = decode_header(to_name)\n",
" from_tuples = parse_addresses(message,'from')\n",
" to_tuples = parse_addresses(message,'to')\n",
" reply_to_tuples = parse_addresses(message,'reply-to')\n",
"\n",
" reply_to_name, reply_to_mail = email.utils.parseaddr(message[\"reply-to\"])\n",
" reply_to_name = decode_header(reply_to_name)\n",
"\n",
" content = get_html(message)\n",
" content = content.replace(\"=3D\", \"=\")\n",
" importJson = mail_utf8\n",
" (content, attachments) = get_message_content(message)\n",
" # importJson = mail_utf8\n",
" \n",
" # TODO: is dateSent the right way to go? Might differ for whether you're sender or receiver\n",
" # TODO: importJson\n",
" # TODO: MAIL namespace\n",
" email_item = EmailMessage(externalId=message_id, subject=subject, dateSent=timestamp, content=content)\n",
" \n",
" # Create Edges to accounts\n",
" for address in
[
from_
mail] if from_mail != \"\" else []
:\n",
" for
name,
address in from_
tuples
:\n",
" address_item = Account(externalId=address)\n",
" email_item.add_edge('sender', address_item)\n",
" \n",
" for address in
[
to_
mail] if to_mail != \"\" else []
:\n",
" for
name,
address in to_
tuples
:\n",
" address_item = Account(externalId=address)\n",
" email_item.add_edge('receiver', address_item)\n",
" \n",
" for address in
[
reply_to_
mail] if reply_to_mail != \"\" else []
:\n",
" for
name,
address in reply_to_
tuples
:\n",
" address_item = Account(externalId=address)\n",
" email_item.add_edge('replyTo', address_item)\n",
" \n",
...
...
@@ -161,16 +173,8 @@
" message_channel = MessageChannel(externalId=thread_id)\n",
" email_item.add_edge('messageChannel', message_channel)\n",
" \n",
" print(f'{email_item}')\n",
" return email_item"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
" return email_item\n",
"\n",
"def download_mails(imap_client, gmail_ids, stop_at):\n",
" all_mails = []\n",
" \n",
...
...
@@ -180,16 +184,10 @@
" print(f\"stopped early at {stop_at}\")\n",
" break\n",
"\n",
" #gmail_id = gmail_id.decode(\"utf-8\")\n",
" mail = imap_client.get_mail(gmail_id)\n",
" try:\n",
" mail_utf8 = mail.decode(\"utf-8\") \n",
" except (UnicodeDecodeError, AttributeError):\n",
" print(f\"Skipping email {gmail_id}, not utf-8 encoded\")\n",
" return\n",
"\n",
" thread_id = imap_client.get_x_gm_thrid(gmail_id)\n",
" item = create_item_from_mail(mail
_utf8
, thread_id=thread_id)\n",
" item = create_item_from_mail(mail, thread_id=thread_id)\n",
" all_mails.append(item)\n",
" \n",
" return all_mails\n",
...
...
@@ -230,11 +228,11 @@
" # TODO: Get imap_host from importer_run\n",
" imap_host = 'imap.gmail.com'\n",
" \n",
" imap_client = IMAPClient(username=importer_run.username, app_pw=importer_run.password, host=imap_host)\n",
" imap_client = IMAPClient(username=importer_run.username, \n",
" app_pw=importer_run.password, \n",
" host=imap_host)\n",
" gmail_ids = imap_client.get_all_mail_uids()\n",
" all_mails = download_mails(imap_client, gmail_ids, 10)\n",
"\n",
" print(all_mails)\n",
" all_mails = download_mails(imap_client, gmail_ids, None)\n",
"\n",
" # Merge email accounts/messageChannels here\n",
" # TODO: create better way to do this\n",
...
...
@@ -248,7 +246,7 @@
" for (external_id, item) in all_accounts.items():\n",
" pod_client.create(item)\n",
"\n",
" # Create all edges from emails to accounts\n",
" # Create all edges from emails to accounts
/messageThreads
\n",
" for email_item in all_mails:\n",
" pod_client.create_edges(email_item.get_all_edges())"
]
...
...
@@ -271,7 +269,6 @@
"pod_client.delete_all()\n",
"\n",
"importer_run = ImporterRun.from_data(progress=0, username=imap_user, password=imap_pw)\n",
"print(importer_run.__dict__)\n",
"\n",
"importer = GmailImporter.from_data()\n",
"importer.run(importer_run=importer_run, pod_client=pod_client)"
...
...
@@ -283,7 +280,7 @@
"metadata": {},
"outputs": [],
"source": [
"test = \"\"\"\\\n",
"test =
b
\"\"\"\\\n",
"Message-id: 1234\\r\n",
"From: user1 <a@gmail.com>\\r\n",
"To: user1 <b@gmail.com>\\r\n",
...
...
@@ -293,7 +290,7 @@
"\n",
"This is content\"\"\"\n",
"\n",
"mail_message = email.message_from_string(test)\n",
"
#
mail_message = email.message_from_string(test)\n",
"mail_item = create_item_from_mail(test, 'message_channel_id')\n",
"\n",
"assert mail_item.externalId == '1234'\n",
...
...
%% Cell type:code id: tags:
```
python
%
load_ext
autoreload
%
autoreload
2
# default_exp importers.gmail
```
%% Cell type:code id: tags:
```
python
# export
import
imaplib
,
email
from
integrators.data.schema
import
Account
,
EmailMessage
,
MessageChannel
from
integrators.pod.client
import
PodClient
from
email
import
policy
```
%% Cell type:code id: tags:
```
python
# export
class
IMAPClient
():
def
__init__
(
self
,
username
,
app_pw
,
host
=
'imap.gmail.com'
,
inbox
=
'"[Gmail]/All Mail"'
):
self
.
client
=
imaplib
.
IMAP4_SSL
(
host
)
self
.
client
.
login
(
username
,
app_pw
)
# TODO: let user select mailbox
self
.
client
.
select
(
inbox
)
# connect to inbox.
def
list_mailboxes
(
self
):
return
self
.
client
.
list
()
def
get_all_mail_uids
(
self
):
result
,
data
=
self
.
client
.
uid
(
'search'
,
None
,
"ALL"
)
# search and return uids instead
return
data
[
0
].
split
()
def
get_mail
(
self
,
uid
):
result
,
data
=
self
.
client
.
uid
(
'fetch'
,
uid
,
'(RFC822)'
)
raw_email
=
data
[
0
][
1
]
return
raw_email
def
get_all_mails
(
self
,
uids
):
res
=
[]
for
uid
in
tqdm
(
uids
):
result
,
data
=
self
.
client
.
uid
(
'fetch'
,
uid
,
'(RFC822)'
)
raw_email
=
data
[
0
][
1
]
res
.
append
(
raw_email
)
return
res
def
get_x_gm_thrid
(
self
,
uid
):
result
,
data
=
self
.
client
.
uid
(
'fetch'
,
uid
,
'(X-GM-THRID X-GM-MSGID)'
)
return
data
[
0
].
decode
(
"utf-8"
).
split
(
" "
)[
2
]
# @staticmethod
def
part_to_str
(
part
):
bytes_
=
part
.
get_payload
(
decode
=
True
)
charset
=
part
.
get_content_charset
(
'iso-8859-1'
)
chars
=
bytes_
.
decode
(
charset
,
'replace'
)
return
chars
# @staticmethod
def
get_html
(
email_message_instance
):
maintype
=
email_message_instance
.
get_content_maintype
()
if
maintype
==
'multipart'
:
parts
=
_get_all_parts
(
email_message_instance
)
res
=
None
html_parts
=
[
part_to_str
(
part
)
for
part
in
parts
if
part
.
get_content_type
()
==
"text/html"
]
if
len
(
html_parts
)
>
0
:
if
len
(
html_parts
)
>
1
:
error_msg
=
"
\n
AND
\n
"
.
join
(
html_parts
)
print
(
f
"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE
{
error_msg
}
"
)
return
html_parts
[
0
]
else
:
return
parts
[
0
].
get_payload
()
elif
maintype
==
'text'
:
return
email_message_instance
.
get_payload
()
# @staticmethod
def
_get_all_parts
(
part
):
payload
=
part
.
get_payload
()
if
isinstance
(
payload
,
list
):
return
[
x
for
p
in
payload
for
x
in
_get_all_parts
(
p
)]
# # @staticmethod
# def part_to_str(part):
# bytes_ = part.get_payload(decode=True)
# charset = part.get_content_charset('iso-8859-1')
# chars = bytes_.decode(charset, 'replace')
# return chars
# # @staticmethod
# def get_html(email_message_instance):
# maintype = email_message_instance.get_content_maintype()
# if maintype == 'multipart':
# parts = _get_all_parts(email_message_instance)
# res = None
# html_parts = [part_to_str(part) for part in parts if part.get_content_type() == "text/html"]
# if len(html_parts) > 0:
# if len(html_parts) > 1:
# error_msg = "\n AND \n".join(html_parts)
# print(f"WARNING: FOUND MULTIPLE HTML PARTS IN ONE MESSAGE {error_msg}")
# return html_parts[0]
# else:
# return parts[0].get_payload()
# elif maintype == 'text':
# return email_message_instance.get_payload()
# # @staticmethod
# def _get_all_parts(part):
# payload = part.get_payload()
# if isinstance(payload, list):
# return [x for p in payload for x in _get_all_parts(p)]
# else:
# return [part]
def
get_message_content
(
message
):
# content = get_html(message)
# # TODO: proper escaping here
# content = content.replace("=3D", "=")
# USED FOR DOWNLOADING ATTACHMENTS
# for i, x in enumerate(message.iter_attachments()):
# f = open(f"tmp/gmail/{i}.png", 'wb')
# f.write(x.get_content())
# f.close()
content
=
message
.
get_body
().
get_content
()
attachments
=
[]
return
(
content
,
attachments
)
def
get_addresses_from_message
(
message
,
field
):
if
message
[
field
]
is
None
:
return
[]
else
:
return
[
part
]
# @staticmethod
def
decode_header
(
header
):
return
str
(
email
.
header
.
make_header
(
email
.
header
.
decode_header
(
header
)))
```
%% Cell type:code id: tags:
```
python
# export
return
email
.
utils
.
getaddresses
([
message
[
field
]])
def
get_timestamp_from_message
(
message
):
date
=
message
[
"date"
]
parsed_time
=
email
.
utils
.
parsedate
(
date
)
dt
=
email
.
utils
.
parsedate_to_datetime
(
date
)
timestamp
=
int
(
dt
.
timestamp
()
*
1000
)
return
timestamp
```
def
create_item_from_mail
(
mail_utf8
,
thread_id
=
None
):
message
=
email
.
message_from_string
(
mail_utf8
)
%% Cell type:code id: tags:
message_id
=
message
[
"message-id"
].
replace
(
"/"
,
""
)
subject
=
message
[
"subject"
]
timestamp
=
get_timestamp_from_message
(
message
)
```
python
# export
from_name
,
from_mail
=
email
.
utils
.
parseaddr
(
message
[
"from"
])
from_name
=
decode_header
(
from_name
)
def
create_item_from_mail
(
mail
,
thread_id
=
None
):
# message = email.message_from_string(mail_utf8)
message
=
email
.
message_from_bytes
(
mail
,
policy
=
policy
.
SMTP
)
to_name
,
to_mail
=
email
.
utils
.
parseaddr
(
message
[
"to"
])
to_name
=
decode_header
(
to_name
)
message_id
=
message
[
"message-id"
]
subject
=
message
[
"subject"
]
timestamp
=
get_timestamp_from_message
(
message
)
reply_to_name
,
reply_to_mail
=
email
.
utils
.
parseaddr
(
message
[
"reply-to"
])
reply_to_name
=
decode_header
(
reply_to_name
)
from_tuples
=
parse_addresses
(
message
,
'from'
)
to_tuples
=
parse_addresses
(
message
,
'to'
)
reply_to_tuples
=
parse_addresses
(
message
,
'reply-to'
)
content
=
get_html
(
message
)
content
=
content
.
replace
(
"=3D"
,
"="
)
importJson
=
mail_utf8
(
content
,
attachments
)
=
get_message_content
(
message
)
# importJson = mail_utf8
# TODO: is dateSent the right way to go? Might differ for whether you're sender or receiver
# TODO: importJson
# TODO: MAIL namespace
email_item
=
EmailMessage
(
externalId
=
message_id
,
subject
=
subject
,
dateSent
=
timestamp
,
content
=
content
)
# Create Edges to accounts
for
address
in
[
from_
mail
]
if
from_mail
!=
""
else
[]
:
for
name
,
address
in
from_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'sender'
,
address_item
)
for
address
in
[
to_
mail
]
if
to_mail
!=
""
else
[]
:
for
name
,
address
in
to_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'receiver'
,
address_item
)
for
address
in
[
reply_to_
mail
]
if
reply_to_mail
!=
""
else
[]
:
for
name
,
address
in
reply_to_
tuples
:
address_item
=
Account
(
externalId
=
address
)
email_item
.
add_edge
(
'replyTo'
,
address_item
)
# Create edge to MessageChannel
if
thread_id
!=
None
:
message_channel
=
MessageChannel
(
externalId
=
thread_id
)
email_item
.
add_edge
(
'messageChannel'
,
message_channel
)
print
(
f
'
{
email_item
}
'
)
return
email_item
```
%% Cell type:code id: tags:
```
python
def
download_mails
(
imap_client
,
gmail_ids
,
stop_at
):
all_mails
=
[]
# Download files
for
i
,
gmail_id
in
enumerate
(
gmail_ids
):
if
stop_at
is
not
None
and
i
>=
stop_at
:
print
(
f
"stopped early at
{
stop_at
}
"
)
break
#gmail_id = gmail_id.decode("utf-8")
mail
=
imap_client
.
get_mail
(
gmail_id
)
try
:
mail_utf8
=
mail
.
decode
(
"utf-8"
)
except
(
UnicodeDecodeError
,
AttributeError
):
print
(
f
"Skipping email
{
gmail_id
}
, not utf-8 encoded"
)
return
thread_id
=
imap_client
.
get_x_gm_thrid
(
gmail_id
)
item
=
create_item_from_mail
(
mail
_utf8
,
thread_id
=
thread_id
)
item
=
create_item_from_mail
(
mail
,
thread_id
=
thread_id
)
all_mails
.
append
(
item
)
return
all_mails
# TODO: should probably become a general utility function
def
merge_duplicate_items
(
all_mails
):
all_accounts
=
{}
for
email_item
in
all_mails
:
for
edge
in
email_item
.
get_all_edges
():
account
=
edge
.
traverse
(
email_item
)
if
not
account
.
externalId
in
all_accounts
:
all_accounts
[
account
.
externalId
]
=
account
for
email_item
in
all_mails
:
for
edge
in
email_item
.
get_all_edges
():
edge
.
target
=
all_accounts
[
edge
.
target
.
externalId
]
return
all_accounts
```
%% Cell type:code id: tags:
```
python
# export
from
integrators.data.schema
import
*
from
integrators.imports
import
*
from
integrators.indexers.indexer
import
IndexerBase
,
test_registration
class
GmailImporter
(
IndexerBase
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
().
__init__
(
*
args
,
**
kwargs
)
def
run
(
self
,
importer_run
,
pod_client
=
None
):
# TODO: Get imap_host from importer_run
imap_host
=
'imap.gmail.com'
imap_client
=
IMAPClient
(
username
=
importer_run
.
username
,
app_pw
=
importer_run
.
password
,
host
=
imap_host
)
imap_client
=
IMAPClient
(
username
=
importer_run
.
username
,
app_pw
=
importer_run
.
password
,
host
=
imap_host
)
gmail_ids
=
imap_client
.
get_all_mail_uids
()
all_mails
=
download_mails
(
imap_client
,
gmail_ids
,
10
)
print
(
all_mails
)
all_mails
=
download_mails
(
imap_client
,
gmail_ids
,
None
)
# Merge email accounts/messageChannels here
# TODO: create better way to do this
all_accounts
=
merge_duplicate_items
(
all_mails
)
# Create all email and account items
all_thread_ids
=
set
()
for
email_item
in
all_mails
:
pod_client
.
create
(
email_item
)
all_thread_ids
.
add
(
email_item
.
messageChannel
[
0
].
externalId
)
for
(
external_id
,
item
)
in
all_accounts
.
items
():
pod_client
.
create
(
item
)
# Create all edges from emails to accounts
# Create all edges from emails to accounts
/messageThreads
for
email_item
in
all_mails
:
pod_client
.
create_edges
(
email_item
.
get_all_edges
())
```
%% Cell type:code id: tags:
```
python
%
nbdev_slow_test
# Store your credentials in this file:
file
=
open
(
'tmp/credentials_gmail.txt'
,
'r'
)
imap_host
=
'imap.gmail.com'
imap_user
=
file
.
readline
()
imap_pw
=
file
.
readline
()
pod_client
=
PodClient
()
pod_client
.
delete_all
()
importer_run
=
ImporterRun
.
from_data
(
progress
=
0
,
username
=
imap_user
,
password
=
imap_pw
)
print
(
importer_run
.
__dict__
)
importer
=
GmailImporter
.
from_data
()
importer
.
run
(
importer_run
=
importer_run
,
pod_client
=
pod_client
)
```
%% Cell type:code id: tags:
```
python
test
=
"""
\
test
=
b
"""
\
Message-id: 1234
\r
From: user1 <a@gmail.com>
\r
To: user1 <b@gmail.com>
\r
Reply-to: user1 <c@gmail.com>
\r
Subject: the subject
\r
Date: Mon, 04 May 2020 00:37:44 -0700
\r
This is content"""
mail_message
=
email
.
message_from_string
(
test
)
#
mail_message = email.message_from_string(test)
mail_item
=
create_item_from_mail
(
test
,
'message_channel_id'
)
assert
mail_item
.
externalId
==
'1234'
assert
mail_item
.
sender
[
0
].
externalId
==
'a@gmail.com'
assert
mail_item
.
receiver
[
0
].
externalId
==
'b@gmail.com'
assert
mail_item
.
replyTo
[
0
].
externalId
==
'c@gmail.com'
assert
mail_item
.
subject
==
'the subject'
assert
mail_item
.
content
==
'This is content'
assert
mail_item
.
dateSent
==
get_timestamp_from_message
(
mail_message
)
assert
mail_item
.
messageChannel
[
0
].
externalId
==
'message_channel_id'
```
...
...
This diff is collapsed.
Click to expand it.
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment
Menu
Explore
Projects
Groups
Snippets