Commit 33aa01c7 authored by Koen van der Veen's avatar Koen van der Veen
Browse files

Merge branch 'async-retrieval' into 'dev'

get tweets and followers async

See merge request memri/plugins/twitter!31
1 merge request!31get tweets and followers async
Pipeline #12198 passed with stage
in 1 minute and 39 seconds
Showing with 87 additions and 73 deletions
+87 -73
......@@ -14,6 +14,8 @@ from pymemri.plugin.states import RUN_USER_ACTION_COMPLETED, RUN_USER_ACTION_NEE
from .helpers import get_external_ids, get_item_by_external_id
from .constants import *
from .schema import Account, Tweet, Hashtag
from concurrent.futures import ThreadPoolExecutor
THROTTLE = 5*60
HASHTAG_REGEX = re.compile(r'(#[a-zA-Z0-9]+)')
......@@ -47,6 +49,7 @@ class TwitterPlugin(PluginBase):
self.photo_handler_thread = Thread(target=self.photo_downloader).start()
self.force_trigger_write_slow = False
self.has_slow_initial_import_data=True
self.n_initial_import=200
def slow_write_poller(self):
while True and not self.stop:
......@@ -158,86 +161,97 @@ class TwitterPlugin(PluginBase):
self.write()
logger.info("[+] Twitter plugin run is completed")
def sync(self):
ctr = 0
logger.info("getting friends")
start= time()
for f in tweepy.Cursor(self.api.get_friends, count=200).items():
if not self.item_exists("Account", f.id_str):
self.process_user(f)
else:
logger.debug(f"Friend exists {f.id_str}")
break
ctr += 1
# without this if this is slowing things down a lot with a dev pod
if ctr % 100 == 0:
start_progress = time()
self.set_progress(min(29, 10 + ctr)/100)
logger.debug(f"settings progress took {time() - start_progress}")
logger.info(f"imported {ctr} friends")
self.write()
start_progress = time()
self.set_progress(30/100)
logger.debug(f"settings progress took {time() - start_progress}")
logger.debug(f"getting friends took {time() - start}")
start= time()
logger.info("getting followers")
ctr = 0
for f in tweepy.Cursor(self.api.get_followers, count=200).items():
if not self.item_exists("Account", f.id_str):
self.process_user(f)
else:
logger.debug(f"Follower exists {f.id_str}")
break
ctr += 1
# without this if this is slowing things down a lot with a dev pod
if ctr % 100 == 0:
self.set_progress(min(49, 30 + ctr)/100)
self.write()
logger.info(f"imported {ctr} followers")
self.set_progress(0.5)
logger.debug(f"getting followers took {time() - start}")
# for dm in tweepy.Cursor(self.api.get_direct_messages, count=50).items():
# if dm.id not in self.existing_message_ids:
# self.process_message(dm)
def get_all_followers(self):
return [f for f in tweepy.Cursor(self.api.get_followers, count=200).items()]
start= time()
logger.info("getting tweets")
ctr = 0
n_initial_import = 200
for t in tweepy.Cursor(
def get_home_timeline_tweets(self, n):
return [t for t in tweepy.Cursor(
self.api.home_timeline,
count=n_initial_import,
count=n,
exclude_replies=True,
trim_user=True,
tweet_mode="extended",
).items():
if not self.item_exists("Tweet", t.id_str):
self.process_tweet(t)
else:
logger.debug(f"Tweet exists {t.id_str}")
break
ctr += 1
if ctr > n_initial_import:
self.set_progress(1.0)
break
elif ctr % 100 == 0:
# without this if, progress is slowing things down a lot with a dev pod
self.set_progress(min(99, (0.5 + (ctr / n_initial_import) * 0.5)))
logger.info(f"imported {ctr} tweets")
self.set_progress(1.0)
self.write()
logger.debug(f"getting tweets took {time() - start}")
logger.info(f"{self.n_download_profile_pictures} images imported")
).items()]
def sync(self):
with ThreadPoolExecutor(max_workers=2) as executor:
followers_future = executor.submit(self.get_all_followers)
tweets_future = executor.submit(self.get_home_timeline_tweets, self.n_initial_import)
ctr = 0
logger.info("getting friends")
start= time()
for f in tweepy.Cursor(self.api.get_friends, count=200).items():
if not self.item_exists("Account", f.id_str):
self.process_user(f)
else:
logger.debug(f"Friend exists {f.id_str}")
break
ctr += 1
# without this if this is slowing things down a lot with a dev pod
if ctr % 100 == 0:
start_progress = time()
self.set_progress(min(29, 10 + ctr)/100)
logger.debug(f"settings progress took {time() - start_progress}")
logger.info(f"imported {ctr} friends")
self.write()
start_progress = time()
self.set_progress(30/100)
logger.debug(f"settings progress took {time() - start_progress}")
logger.debug(f"getting friends took {time() - start}")
start= time()
logger.info("getting followers")
ctr = 0
for f in followers_future.result():
if not self.item_exists("Account", f.id_str):
self.process_user(f)
else:
logger.debug(f"Follower exists {f.id_str}")
break
ctr += 1
# without this if this is slowing things down a lot with a dev pod
if ctr % 100 == 0:
self.set_progress(min(49, 30 + ctr)/100)
self.write()
logger.info(f"imported {ctr} followers")
self.set_progress(0.5)
logger.debug(f"getting followers took {time() - start}")
# for dm in tweepy.Cursor(self.api.get_direct_messages, count=50).items():
# if dm.id not in self.existing_message_ids:
# self.process_message(dm)
start= time()
logger.info("getting tweets")
ctr = 0
for t in tweets_future.result():
if not self.item_exists("Tweet", t.id_str):
self.process_tweet(t)
else:
logger.debug(f"Tweet exists {t.id_str}")
break
ctr += 1
if ctr > self.n_initial_import:
self.set_progress(1.0)
break
elif ctr % 100 == 0:
# without this if, progress is slowing things down a lot with a dev pod
self.set_progress(min(99, (0.5 + (ctr / self.n_initial_import) * 0.5)))
logger.info(f"imported {ctr} tweets")
self.set_progress(1.0)
self.write()
logger.debug(f"getting tweets took {time() - start}")
logger.info(f"{self.n_download_profile_pictures} images imported")
self.start_slow_writing=True
self.start_slow_writing=True
def process_user(self, user, target_account=None):
"""
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment