Commit 78466f1f authored by Alp Deniz Ogut
Browse files

Refactor

parent 3c15286b
Showing with 63 additions and 76 deletions
+63 -76
......@@ -19,7 +19,7 @@ def get_external_ids(client, itemType: str):
for batch in client.search_paginate(search_filter, 100):
external_ids += [m.externalId for m in batch]
finally:
return external_ids
return set(external_ids)
def get_file_hashes(client):
hashes = []
......
......@@ -10,42 +10,59 @@ from pymemri.data.schema import Edge, Message, MessageChannel, OauthFlow, Photo
from pymemri.plugin.pluginbase import PluginBase
from pymemri.plugin.states import RUN_USER_ACTION_COMPLETED, RUN_USER_ACTION_NEEDED
from . import helpers
from .helpers import get_external_ids, get_item_by_external_id
from .constants import *
from .schema import Account, Tweet
from .schema import Account, Tweet, Hashtag
THROTTLE = 5*60
class TwitterPlugin(PluginBase):
me = None
item_queue = []
update_queue = []
edge_queue = []
file_queue = []
writing = False
auth_item = None
running = True
items = {'Account': {}, 'Tweet': {}, 'Message': {}}
existing_item_ids = {'Account': set(), 'Tweet': set(), 'Message': set()}
def __init__(self, *args, **kwargs):
    """
    Initialise plugin state and fetch the external ids that already exist.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser. After the queues and flags are reset, the external
    ids of the stored Tweet, Account and Message items are fetched through
    ``get_external_ids`` using ``self.client``.
    """
    super().__init__(*args, **kwargs)
    self.me = None
    self.accounts = {}
    self.chats = {}
    self.item_queue = []
    self.update_queue = []
    self.edge_queue = []
    self.file_queue = []
    self.writing = False
    self.auth_item = None
    self.running = True
    # Bind per-instance containers. Without this, the lookups below would
    # write into the class-level dicts, which every instance shares.
    self.items = {'Account': {}, 'Tweet': {}, 'Message': {}}
    self.existing_item_ids = {'Account': set(), 'Tweet': set(), 'Message': set()}
    # Get existing item ids
    logger.info("Fetching existing item ids")
    for item_type in ("Tweet", "Account", "Message"):
        self.existing_item_ids[item_type] = get_external_ids(self.client, itemType=item_type)
    self.cv = Condition()
def item_exists(self, itemType, externalId):
    """
    Tell whether an item of the given type is already known.

    An item counts as known when its external id is contained in
    ``self.existing_item_ids[itemType]`` or is a key of
    ``self.items[itemType]``.
    """
    already_stored = externalId in self.existing_item_ids[itemType]
    # Short-circuit: self.items is only consulted when the first check fails.
    return already_stored or externalId in self.items[itemType]
def get_item(self, itemType, externalId):
    """
    Return the item of ``itemType`` with the given external id, or None.

    The id is converted to ``str`` before any lookup. ``self.items`` is
    checked first; on a miss the item is requested through
    ``get_item_by_external_id`` with ``self.client``. If that lookup raises,
    None is returned instead.
    """
    key = str(externalId)
    cache = self.items[itemType]
    if key in cache:
        return cache[key]
    try:
        return get_item_by_external_id(self.client, itemType, key)
    except Exception:
        # Best-effort lookup: a failure is reported as "not found".
        # Catching Exception rather than using a bare except lets
        # KeyboardInterrupt and SystemExit propagate.
        return None
def set_item(self, itemType, item):
    """
    Store ``item`` in ``self.items[itemType]`` keyed by ``item.externalId``.

    Returns the same ``item`` that was passed in.
    """
    per_type_cache = self.items[itemType]
    per_type_cache[item.externalId] = item
    return item
def setup(self):
"""
Setup plugin
"""
# Get existing item ids
self.existing_photo_hashes = helpers.get_file_hashes(self.client)
self.existing_account_ids = helpers.get_external_ids(self.client, "Account")
self.existing_tweet_ids = helpers.get_external_ids(self.client, "Tweet")
# self.existing_message_ids = helpers.get_external_ids(self.client, 'Message')
self.existing_channel_ids = helpers.get_external_ids(
self.client, "MessageChannel"
)
if not APP_KEY or not APP_SECRET:
raise ValueError("Environment variables TWITTER_APP_KEY and TWITTER_APP_SECRET are required")
......@@ -63,9 +80,7 @@ class TwitterPlugin(PluginBase):
self.api = tweepy.API(oauth, wait_on_rate_limit=True)
# Update me
self.me = self.process_user(
user=self.api.verify_credentials(), isMe=True, target_account=self.me
)
self.me = self.process_user(user=self.api.verify_credentials(), target_account=self.me)
self.write()
def run(self):
......@@ -100,16 +115,14 @@ class TwitterPlugin(PluginBase):
ctr = 0
logger.info("getting friends")
for f in tweepy.Cursor(self.api.get_friends, count=200).items():
if f.id_str not in self.existing_account_ids:
if not self.item_exists("Account", f.id_str):
self.process_user(f)
self.existing_account_ids.append(f.id_str)
else:
account = helpers.get_user(self.client, externalId=f.id_str)
self.accounts[account.externalId] = account
logger.debug(f"Account exists {account.displayName}")
logger.debug(f"Friend exists {t.id_str}")
break
ctr += 1
self.set_progress(min(29, 10 + ctr)/100)
logger.info(f"imported {ctr} friends")
self.write()
self.set_progress(30/100)
......@@ -117,13 +130,10 @@ class TwitterPlugin(PluginBase):
logger.info("getting followers")
ctr = 0
for f in tweepy.Cursor(self.api.get_followers, count=200).items():
if f.id_str not in self.existing_account_ids:
if not self.item_exists("Account", f.id_str):
self.process_user(f)
self.existing_account_ids.append(f.id_str)
else:
account = helpers.get_user(self.client, externalId=f.id_str)
self.accounts[account.externalId] = account
logger.debug(f"Account exists {account.displayName}")
logger.debug(f"Follower exists {t.id_str}")
break
ctr += 1
self.set_progress(min(49, 30 + ctr)/100)
......@@ -145,9 +155,8 @@ class TwitterPlugin(PluginBase):
trim_user=True,
tweet_mode="extended",
).items():
if t.id_str not in self.existing_tweet_ids:
if not self.item_exists("Tweet", f.id_str):
self.process_tweet(t)
self.existing_tweet_ids.append(t.id_str)
else:
logger.debug(f"Tweet exists {t.id_str}")
break
......@@ -158,32 +167,23 @@ class TwitterPlugin(PluginBase):
self.write()
def process_user(self, user, isMe=None, target_account=None):
def process_user(self, user, target_account=None):
"""
Cast twitter user into Memri account
"""
if not target_account:
try:
account = helpers.get_user(
self.client,
isMe=isMe,
externalId=None if user is None else str(user.id),
)
except:
account = Account(service=SERVICE_NAME, isMe=isMe)
self.client.create(account)
else:
account = target_account
items = []
updates = []
edges = []
files = []
account.externalId = user.id_str
account.handle = user.screen_name
account.displayName = user.name
account.description = user.description
if target_account:
account = target_account
updates.append(account)
else:
account = Account(service=SERVICE_NAME, externalId=user.id_str, handle=user.screen_name, displayName=user.name, description=user.description)
items.append(account)
if user.profile_image_url:
try:
picture = self.create_photo(account.externalId, user.profile_image_url)
......@@ -196,12 +196,9 @@ class TwitterPlugin(PluginBase):
except Exception as e:
logger.error(f"Exception while creating user image: {e}")
updates.append(account)
self.add_to_queue(items=items, updates=updates, edges=edges, files=files)
self.accounts[account.externalId] = account
self.set_item("Account", account)
logger.debug(f"Added account: {account.displayName}")
return account
def create_photo(self, id, url):
response = requests.get(url)
......@@ -218,7 +215,7 @@ class TwitterPlugin(PluginBase):
Cast tweet into its Memri form
"""
account = self.get_account_by_user_id(tweet.user.id)
account = self.get_item("Account", tweet.user.id)
# get the full text of the tweet
tweet_text = tweet.full_text
# if Retweet get the full text from the retweet
......@@ -240,9 +237,10 @@ class TwitterPlugin(PluginBase):
logger.debug(f"Adding tweet: {post.externalId}")
self.process_tweet_images(tweet, post)
self.set_item("Tweet", post)
def process_tweet_images(self, tweet, post):
for media in tweet.entities.get("media", []):
def process_tweet_images(self, raw_tweet, tweet):
for media in raw_tweet.entities.get("media", []):
if media["type"] == "photo":
try:
picture = self.create_photo(media["id_str"], media["media_url"])
......@@ -251,7 +249,7 @@ class TwitterPlugin(PluginBase):
items=[picture, picture.file[0]],
edges=[
*(file for file in picture.get_edges("file")),
Edge(post, picture, "photo")],
Edge(tweet, picture, "photo")],
files=[picture.data],
)
except Exception as e:
......@@ -306,20 +304,9 @@ class TwitterPlugin(PluginBase):
items.append(message)
edges.append(Edge(message, chat, "messageChannel"))
self.add_to_queue(items=items, edges=edges)
self.set_item("Message", message)
logger.debug(f"Adding direct message: {message.externalId}")
def get_account_by_user_id(self, id):
    """
    Gets a twitter user in form of Memri Account

    ``self.accounts`` is looked up by ``str(id)``. When no entry exists the
    user is fetched with ``self.api.get_user(user_id=id)`` and converted by
    ``self.process_user``, whose result is returned.
    """
    try:
        return self.accounts[str(id)]
    except KeyError:
        # Only a missing cache entry triggers the API fallback; any other
        # error is left to propagate rather than being silently swallowed.
        user = self.api.get_user(user_id=id)
        return self.process_user(user)
def add_to_queue(self, items=None, updates=None, edges=None, files=None):
"""
Load queues for bulk write
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment