Commit 0ddb3d4a authored by Aziz Berkay Yesilyurt's avatar Aziz Berkay Yesilyurt
Browse files

merge changes

parent 33aa01c7
Showing with 797 additions and 680 deletions
+797 -680
......@@ -10,4 +10,5 @@ dist/
.vscode/
.DS_Store
**/.DS_Store
*.cfg
\ No newline at end of file
*.cfg
*.db**
......@@ -20,6 +20,7 @@ run tests:
script:
- curl http://pod:3030/version
- export POD_ADDRESS='http://pod:3030'
- pre-commit run --all-files
- pytest -s
build_image:
......
repos:
- repo: https://github.com/psf/black
rev: '22.8.0'
hooks:
- id: black
args: ["--line-length", "100"]
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/pycqa/flake8
rev: '5.0.4'
hooks:
- id: flake8
args: ["--max-line-length", "100"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.971' # Use the sha / tag you want to point at
hooks:
- id: mypy
additional_dependencies: [types-all]
\ No newline at end of file
repos:
- repo: https://github.com/psf/black
rev: '22.8.0'
hooks:
- id: black
args: ["--line-length", "100"]
- repo: https://github.com/pycqa/isort
rev: 5.10.1
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black", "--filter-files"]
- repo: https://github.com/pycqa/flake8
rev: '5.0.4'
hooks:
- id: flake8
args: ["--max-line-length", "100"]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.971' # Use the sha / tag you want to point at
hooks:
- id: mypy
additional_dependencies: [types-all]
\ No newline at end of file
FROM python:3.9 as twitter
FROM python:3.9 as pymemri
#optional TWiTTER arguments and environment variables to inject during build or runtime.
ARG TWITTER_APP_KEY
ARG TWITTER_APP_SECRET
ENV TWITTER_APP_KEY=$TWITTER_APP_KEY
ENV TWITTER_APP_SECRET=$TWITTER_APP_SECRET
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y libgl1-mesa-glx
WORKDIR /usr/src/twitter
ENV PYTHONUNBUFFERED=1
ENV PYTHONIOENCODING=UTF-8
# In order to leverage docker caching, copy only the minimal
# information needed to install dependencies
COPY ./setup.py ./setup.py
RUN touch ./README.md
# Install dependencies
RUN python3 setup.py egg_info
RUN pip3 install -r twitter.egg-info/requires.txt
# Copy the real project's sources (docker caching is broken from here onwards)
COPY ./README.md ./README.md
COPY ./twitter ./twitter
# Build the final image
WORKDIR /app
COPY . /app
RUN pip3 install --editable .
CMD ["run_plugin", "--read_args_from_env", "True"]
{
"containerImage": "twitter_importer",
"pluginModule": "twitter.plugin",
"plugin": "twitter.plugin.TwitterPlugin",
"pluginName": "TwitterPlugin",
"plugin": "twitter.Plugin",
"pluginName": "Plugin",
"status": "not started",
"config": "{\"callback_url\": \"http://localhost:4000\"}"
}
}
\ No newline at end of file
from setuptools import find_packages, setup
from setuptools import setup
packages = find_packages()
install_requires = [
"pymemri==0.0.34",
# "pymemri @ git+https://gitlab.memri.io/memri/pymemri.git@v0.0.31",
"pytest ~= 7.1",
"tweepy @ git+https://github.com/alpdeniz/tweepy",
"loguru ~= 0.6.0",
]
setup_kwargs = {
"name": "twitter",
"version": "0.2.1",
"description": "Twitter Importer Plugin for Memri",
"long_description": None,
"author": "Alp Deniz Ogut",
"author_email": "alpdeniz@protonmail.com",
"url": "https://memri.io",
"packages": packages,
"install_requires": install_requires,
"python_requires": ">=3.6,<4.0",
}
setup(**setup_kwargs)
if __name__ == "__main__":
setup()
import os
from pathlib import Path
ASSETS_PATH=Path(os.path.abspath(__file__)).parent / "assets"
\ No newline at end of file
ASSETS_PATH = Path(os.path.abspath(__file__)).parent / "assets"
import shelve
import requests
class CachedSession(requests.Session):
    """requests.Session that memoizes responses on disk via `shelve`.

    Used by the benchmark tests so repeated runs replay recorded HTTP
    responses instead of hitting the live Twitter API.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Persistent cache in ./cache* dbm files. Never closed explicitly;
        # with the default writeback=False, shelve writes each assignment
        # through to disk, so entries survive the process.
        self.cache = shelve.open("cache")
        # Large connection pool so many concurrent requests reuse sockets.
        adapter = requests.adapters.HTTPAdapter(pool_connections=100, pool_maxsize=100)
        self.mount("http://", adapter)

    def request(self, method, url, *args, **kwargs):
        # NOTE(review): the cache key ignores request params/body/headers, so
        # two calls to the same URL with different kwargs collide — confirm
        # every cached caller encodes all variability in the URL itself.
        key = method + url
        if key not in self.cache:
            self.cache[key] = super().request(method, url, *args, **kwargs)
        return self.cache[key]
from dataclasses import dataclass
from datetime import datetime
import json
import os
import threading
from dataclasses import dataclass
from datetime import datetime
from time import sleep, time
from typing import Union
from unittest import mock
from unittest.mock import MagicMock
from loguru import logger
from tweepy.api import pagination, payload
import json
import pytest
from assets import ASSETS_PATH
from loguru import logger
from pymemri.data.basic import read_json
from pymemri.data.schema import Account, OauthFlow, PluginRun
from pymemri.pod.client import PodClient
from pymemri.plugin.pluginbase import PluginBase
import pytest
from twitter.constants import SERVICE_NAME
from twitter.plugin import TwitterPlugin
from tweepy.models import User, Status
from pymemri.pod.client import PodClient
from tweepy.api import pagination
from tweepy.models import Status, User
from twitter import Plugin
from twitter.schema import SERVICE_NAME
@dataclass
class Context:
run: PluginRun
pod: PodClient
plugin_thread: threading.Thread
plugin: Union[PluginBase, None] = None
plugin: PluginBase
def create_plugin_thread(plugin: PluginBase):
def create_plugin_thread(plugin: Plugin):
def start_plugin():
plugin._run()
......@@ -38,19 +37,29 @@ def create_plugin_thread(plugin: PluginBase):
return plugin_thread
@pytest.fixture
def context() -> Context:
def context():
run = PluginRun(
containerImage="",
)
credentials_str = os.environ.get("TWITTER_CI_CREDENTIALS")
credentials = json.loads(credentials_str)
oauth = OauthFlow(accessToken=credentials["access_token"], accessTokenSecret=credentials["access_token_secret"], service=SERVICE_NAME)
me = Account(service=SERVICE_NAME, isMe=True, identifier=credentials['access_token'], secret=credentials['access_token_secret'])
oauth = OauthFlow(
accessToken=credentials["access_token"],
accessTokenSecret=credentials["access_token_secret"],
service=SERVICE_NAME,
)
me = Account(
service=SERVICE_NAME,
isMe=True,
identifier=credentials["access_token"],
secret=credentials["access_token_secret"],
)
pod = PodClient()
twitter_plugin = TwitterPlugin(client=pod, pluginRun=run)
twitter_plugin = Plugin(client=pod, pluginRun=run)
twitter_plugin.add_to_schema()
pod.create(run)
......@@ -74,7 +83,7 @@ def context() -> Context:
def test_plugin_with_real_account(context: Context):
context.plugin_thread.start()
timeout = time() + 60*5 # 5 minutes from now
timeout = time() + 60 * 5 # 5 minutes from now
# Plugin should start importing the data
logger.info("Waiting to import all the data...")
......@@ -90,27 +99,28 @@ def test_plugin_with_real_account(context: Context):
if time() > timeout:
pytest.fail(
f"Plugin did not import data in reasonable amount of time, stuck at progress {context.run.progress}")
"Plugin did not import data in reasonable amount of time, ",
"stuck at progress {context.run.progress}",
)
sleep(0.2)
# test data is in the pod
me = context.pod.search({'service': SERVICE_NAME, 'isMe': True})[0]
accounts = context.pod.search({'service': SERVICE_NAME, 'type':'Account'})
tweets = context.pod.search({'service': SERVICE_NAME, 'type':'Tweet'})
photos = context.pod.search({'type':'Photo'})
me = context.pod.search({"service": SERVICE_NAME, "isMe": True})[0]
accounts = context.pod.search({"service": SERVICE_NAME, "type": "Account"})
tweets = context.pod.search({"service": SERVICE_NAME, "type": "Tweet"})
photos = context.pod.search({"type": "Photo"})
logger.info("#Accounts", len(accounts))
logger.info("#Tweets", len(tweets))
logger.info("#Images", len(photos))
assert not me.handle is False
assert not me.displayName is False
assert not me.handle
assert not me.displayName
assert len(accounts) > 1
assert len(tweets) > 1
assert len(photos) > 1
def test_plugin(context: Context):
# client = PodClient()
# plugin = TwitterPlugin(client=client)
......@@ -132,38 +142,37 @@ def test_plugin(context: Context):
hashtags = client.search({"type": "Hashtag"})
assert len(hashtags) == 2
def test_first_processing(context: Context):
class mockAPI:
def __init__(self):
pass
@pagination(mode='next')
@pagination(mode="next")
def get_followers(self, *args, **kwargs):
sleep(1.4)
return User.parse_list(None, read_json(str(ASSETS_PATH / "followers.json")))
@pagination(mode='next')
@pagination(mode="next")
def get_friends(self, *args, **kwargs):
# depends on internet speed obviously (request1+request2)
sleep(0.73 + 0.5)
return User.parse_list(None, read_json(str(ASSETS_PATH / "friends.json")))
@pagination(mode='next')
def home_timeline(self,*args, **kwargs):
sleep(0.63+ 0.71 + 0.75 + 0.65 + 0.24)
@pagination(mode="next")
def home_timeline(self, *args, **kwargs):
sleep(0.63 + 0.71 + 0.75 + 0.65 + 0.24)
return [Status.parse(None, x) for x in read_json(str(ASSETS_PATH / "timeline.json"))]
start = time()
# client = PodClient()
# client = context.pod
# context.plugin.start()
plugin = context.plugin
plugin: Plugin = context.plugin
# plugin = TwitterPlugin(client=client)
plugin.add_to_schema()
plugin.api = mockAPI()
plugin.api.client = mockAPI()
plugin.sync()
delta = time()-start
delta = time() - start
plugin.teardown()
print(f"{delta:.2f} seconds spent on importing")
import re
from cached_session import CachedSession
from twitter import Plugin, PodClient
from twitter.schema import Account, Photo, Tweet
def get_plugin(client):
    # Register the plugin's item types with the pod, then build a Plugin whose
    # underlying HTTP session is a disk-backed CachedSession, so benchmark runs
    # replay recorded Twitter API responses instead of hitting the network.
    client.add_to_schema(Account, Tweet, Photo)
    plugin = Plugin(client=client)
    plugin.api.client.session = CachedSession()
    return plugin
def test_benchmark():
    """Time a full plugin run against a local pod; CachedSession replays
    recorded HTTP responses so the benchmark is network-independent."""
    client = PodClient()
    client.add_to_schema(Account, Tweet, Photo)
    plugin = get_plugin(client)
    plugin.run()
    # Reconnect with the same credentials so the (commented) searches below
    # would hit the pod itself, not any client-side state.
    client = PodClient(
        url=client.api._url,
        database_key=client.database_key,
        owner_key=client.owner_key,
    )
    # tweets = client.search({"type": "Tweet"})
    # assert len(tweets) == 119
    # assert len([tweet.author[0] for tweet in tweets if tweet.author]) == len(tweets)
    # accounts = client.search({"type": "Account"})
    # assert len([tweet.photo[0] for tweet in tweets if tweet.photo]) == 22
    # assert len(accounts) == 38
    # assert len(
    #     [author.profilePicture[0] for author in accounts if author.profilePicture]
    # ) == len(accounts)
    # print(client.owner_key, client.database_key)
def test_duplicates():
    """Running the importer twice against the same pod must not create
    duplicate Tweet or Account items."""
    pod = PodClient()
    pod.add_to_schema(Account, Tweet, Photo)
    importer = get_plugin(pod)
    importer.run()

    tweets_before = pod.search({"type": "Tweet"})
    accounts_before = pod.search({"type": "Account"})
    assert len(tweets_before)
    assert len(accounts_before)

    # Fresh client against the same pod, then a second full run.
    pod = PodClient(
        url=importer.client.api._url,
        database_key=importer.client.database_key,
        owner_key=importer.client.owner_key,
    )
    importer = get_plugin(pod)
    importer.run()

    tweets_after = pod.search({"type": "Tweet"})
    accounts_after = pod.search({"type": "Account"})
    assert len(tweets_before) == len(tweets_after)
    assert len(accounts_before) == len(accounts_after)
def test_threads():
    """Import tweets, then print the reconstructed reply threads (smoke test:
    passes as long as nothing raises)."""
    client = PodClient()
    client.add_to_schema(Account, Tweet, Photo)
    plugin = get_plugin(client)
    plugin.run()
    # Fresh client with the same credentials, reading back what was persisted.
    client = PodClient(
        url=client.api._url,
        database_key=client.database_key,
        owner_key=client.owner_key,
    )
    tweets = client.search({"type": "Tweet"})
    print_threads(tweets)
def print_threads(tweets, indent=0):
    """Recursively pretty-print tweet threads, oldest first.

    At the top level only original tweets (tweetType == "tweet") start a
    thread; replies/retweets/quotes are printed when reached through their
    parent's `replies` edge, indented one level deeper.

    Args:
        tweets: iterable of Tweet items (each with postDate, tweetType,
            author, replies attributes).
        indent: current nesting depth; 4 spaces of indentation per level.
    """
    # Use sorted() instead of list.sort(): the old in-place sort mutated the
    # caller's list as a hidden side effect of a print helper.
    for tweet in sorted(tweets, key=lambda tweet: tweet.postDate):
        if indent == 0 and tweet.tweetType != "tweet":
            continue
        print(
            " " * indent * 4,
            author_to_str(tweet.author[0], tweet),
            get_tweet_content(tweet),
        )
        print_threads(tweet.replies, indent=indent + 1)
def author_to_str(author, tweet):
    """Format a tweet's author as '@handle : (replies, retweets, likes): '."""
    return "@{} : ({}, {}, {}): ".format(
        author.handle,
        tweet.replyCount,
        tweet.retweetCount,
        tweet.likeCount,
    )
def get_tweet_content(tweet):
    """Return the first 50 characters of the tweet text, with the leading run
    of '@handle ' mentions stripped and newlines flattened to spaces."""
    text = tweet.message
    # drop handles at the beginning of the tweet
    text = re.sub(r"^(@\w+ )+", "", text)
    snippet = text[:50]
    return snippet.replace("\n", " ")
from pymemri.pod.client import PodClient
from .plugin import Plugin
__all__ = ("Plugin", "PodClient")
from .callback_handler import get_request_handler
__all__ = ("get_request_handler",)
import http.server
import os
from typing import Callable, Type
from urllib.parse import parse_qs, urlsplit
os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"
def get_request_handler(
    on_callback: Callable,
    callback_path: str = "/oauth",
) -> Type[http.server.BaseHTTPRequestHandler]:
    """
    Factory returning a request-handler class that captures an OAuth2 code.

    The returned class closes over `on_callback` and `callback_path`. This is
    needed because TCPServer instantiates the handler class itself, so we
    cannot pass constructor arguments to it.

    Args:
        on_callback: invoked with the value of the `code` query parameter of
            the OAuth redirect.
        callback_path: URL path the OAuth provider redirects the browser to.
    """

    class MyHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
        def do_GET(self):
            parts = urlsplit(self.path)
            if parts.path == callback_path:
                # Bug fix: parse only the query string. The old code ran
                # parse_qs over the whole path, so when `code` was the first
                # query parameter the resulting key was "/oauth?code" and the
                # lookup of "code" raised KeyError.
                params = parse_qs(parts.query)
                on_callback(params["code"][0])
                self.send_response(200)
                self.send_header("Content-type", "text/html")
                self.end_headers()
                self.wfile.write(bytes("Authenticated, succesfully created oauth item", "utf-8"))
            # Non-callback paths are deliberately left unanswered, matching
            # the original behavior.

    return MyHttpRequestHandler
import socketserver
import webbrowser
from queue import Queue
from typing import List
from pymemri.pod.client import PodClient
from pymemri.pod.utils import DEFAULT_POD_KEY_PATH, read_pod_key
from twitter.cli import get_request_handler
class TokenCallbackHandler:
    """Nothing more than a server that listens for a callback from Twitter.

    Binds a local TCP server at construction time; `get_token()` serves exactly
    one request (the OAuth redirect) and exchanges the received code for an
    access token via the pod.
    """

    def __init__(
        self,
        client: PodClient,
        scheme: str = "http",
        host: str = "localhost",
        port: int = 3667,
        callback_path="/oauth",
        scopes: List[str] = ["tweet.read", "users.read", "offline.access"],
    ):
        # NOTE(review): mutable default for `scopes` — harmless while no caller
        # mutates the list, but a None default would be safer.
        self.callback_url = f"{scheme}://{host}:{port}{callback_path}"
        self.client = client
        # The pod (not Twitter directly) produces the authorization URL.
        self.url = client.get_oauth2_authorization_url(
            "twitter",
            scopes=scopes,
            redirect_uri=self.callback_url,
        )
        # Allow quick restarts without "address already in use" errors.
        socketserver.TCPServer.allow_reuse_address = True
        self.server = socketserver.TCPServer(
            (host, port),
            get_request_handler(
                callback_path=callback_path,
                on_callback=self.on_callback,
            ),
        )
        # Holds the single authorization code delivered by the callback.
        self.token_queue: Queue[str] = Queue(maxsize=1)

    def on_callback(self, authorization_response: str):
        # Called from the request handler thread with the `code` parameter.
        self.token_queue.put(authorization_response)

    def get_token(self) -> str:
        # Blocks until the browser hits the local callback URL once, then
        # exchanges the code for an access token through the pod.
        self.server.handle_request()
        token = self.token_queue.get()
        access_token = self.client.oauth2_authorize(
            platform="twitter", code=token, redirect_uri=self.callback_url
        )
        return access_token

    def get_url(self) -> str:
        # Authorization URL the user must open in a browser.
        return self.url
def simulate_oauth2_flow(
    database_key: str = None,
    owner_key: str = None,
    pod_full_address: str = None,
):
    """Run the full twitter OAuth2 flow against a pod, for manual testing.

    Missing keys are read from the pod key files on disk; a missing address
    falls back to the local development pod.

    Bug fix: the old signature defaulted `pod_full_address` to
    DEFAULT_POD_KEY_PATH (a key-file path, not a URL) and then unconditionally
    overwrote it with "http://localhost:3030", so an explicitly passed address
    was silently ignored. It is now honored.

    Raises:
        ValueError: if any credential is still missing after the fallbacks.
    """
    if pod_full_address is None:
        pod_full_address = "http://localhost:3030"
    if database_key is None:
        database_key = read_pod_key("database_key")
    if owner_key is None:
        owner_key = read_pod_key("owner_key")
    if None in [pod_full_address, database_key, owner_key]:
        raise ValueError("Missing Pod credentials")
    print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
    client = PodClient(
        url=pod_full_address,
        database_key=database_key,
        owner_key=owner_key,
    )
    token = run_oauth2_flow(client=client)
    print(token)
def run_oauth2_flow(client: PodClient):
    """Open the twitter authorization URL in a browser and block until the
    OAuth callback delivers a code; return the resulting access token."""
    handler = TokenCallbackHandler(
        client=client,
        scheme="http",
        host="localhost",
        port=3667,
    )
    url = handler.get_url()
    webbrowser.open(url)
    # Blocks until the local server receives the OAuth redirect.
    token = handler.get_token()
    return token
if __name__ == "__main__":
    # Previously this call ran unconditionally at import time, kicking off a
    # browser-based OAuth flow whenever the module was imported.
    simulate_oauth2_flow()
import os
# Global service name
SERVICE_NAME = "twitter"
# App credentials, injected via the environment (see Dockerfile build args)
APP_KEY = os.environ.get("TWITTER_APP_KEY")
APP_SECRET = os.environ.get("TWITTER_APP_SECRET")
# "oob" = twitter's out-of-band PIN flow, used when no callback URL is set
APP_CALLBACK = os.environ.get("TWITTER_APP_CALLBACK") or "oob"  # or https://localhost:3667/oauth
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from queue import PriorityQueue
from threading import Thread
from typing import Any, Dict, List
from loguru import logger
from pydantic import BaseModel, Field
from pymemri.data.schema import Edge, Item
from pymemri.pod.client import PodClient
from .schema import Account, Tweet
@dataclass(order=True)
class PrioritizedItem:
    """PriorityQueue entry ordered by `priority` only; `item` is excluded from
    comparisons so unorderable payloads never break the queue."""

    priority: int
    item: Any = field(compare=False)
class PodUpdate(BaseModel):
    """A batch of pod mutations (item/edge creates and item updates).

    Instances are ordered via `__lt__`/`priority` so a PriorityQueue drains
    Tweet-creating updates first, then Account-creating ones, then everything
    else in FIFO (creation-time) order.
    """

    class Config:
        # Pydantic: don't copy items during validation — the consumer must see
        # the very same Item objects the producer created.
        copy_on_model_validation = "none"

    create_items: List[Item] = Field(default_factory=list)
    create_edges: List[Edge] = Field(default_factory=list)
    update_items: List[Item] = Field(default_factory=list)
    created_at: float = Field(default_factory=time.time)

    def __add__(self, other):
        # NOTE(review): the merged update gets a *fresh* created_at, so merging
        # two low-priority updates pushes them to the back of the queue —
        # confirm this is intended.
        return PodUpdate(
            create_items=self.create_items + other.create_items,
            create_edges=self.create_edges + other.create_edges,
            update_items=self.update_items + other.update_items,
        )

    def __lt__(self, other):
        # PriorityQueue only needs `<`; a lower priority value drains earlier.
        return self.priority < other.priority

    @property
    def priority(self):
        """Updates that create:
        - Tweets have priority 0
        - Account have priority 1
        - Other updates have priority based on the put time to the queue.
        """
        tweet_types = [item for item in self.create_items if isinstance(item, Tweet)]
        account_types = [item for item in self.create_items if isinstance(item, Account)]
        if tweet_types:
            return 0
        elif account_types:
            return 1
        else:
            return self.created_at

    def is_empty(self):
        # True when the update carries no mutations at all.
        return (
            len(self.create_items) == 0
            and len(self.create_edges) == 0
            and len(self.update_items) == 0
        )
class ItemConsumer:
    """Drains PodUpdate batches from a priority queue on a background thread,
    writes them to the pod via bulk actions, and uploads photo payloads on a
    separate thread pool."""

    def __init__(self, *, client: PodClient) -> None:
        # Queue actually holds PodUpdate objects (ordered by their priority);
        # the old annotation said Item, which was wrong.
        self.queue: "PriorityQueue[PodUpdate]" = PriorityQueue()
        self.client = client
        self.will_stop = False
        self.executor = ThreadPoolExecutor(max_workers=10)
        self.thread = Thread(target=self.consume_thread)
        # Running count of created items per type name, for progress logging.
        self.types: Dict[str, int] = {}

    def start(self):
        """Start consuming in the background; records the start time for the
        benchmark printout in add_count()."""
        self.start_time = time.time()
        self.thread.start()

    def add_count(self, create_items):
        """Count the created items per type name and log the running totals."""
        for item in create_items:
            name = item.__class__.__name__
            self.types[name] = self.types.get(name, 0) + 1
        logger.info(self.types)
        # NOTE(review): 236 looks like a hard-coded benchmark dataset size.
        # Use .get() so runs that create no tweets don't crash with KeyError
        # (the old code indexed self.types["Tweet"] unconditionally).
        if self.types.get("Tweet") == 236 and getattr(self, "end_time", None) is None:
            self.end_time = time.time()
            print("Time taken: ", self.end_time - self.start_time)

    def consume_thread(self):
        """Loop until join() is called and the queue is drained."""
        while not (self.will_stop and self.queue.empty()):
            pod_update = self.get_batched_updates(50)
            if pod_update.is_empty():
                # Back off briefly instead of busy-spinning at 100% CPU while
                # waiting for producers.
                time.sleep(0.05)
                continue
            is_ok = self.client.bulk_action(
                create_items=pod_update.create_items,
                create_edges=pod_update.create_edges,
                update_items=pod_update.update_items,
            )
            self.add_count(pod_update.create_items)
            # Photo bytes are uploaded off-thread so bulk actions aren't blocked.
            self.executor.submit(self.upload_photo_data, pod_update.create_items)
            if not is_ok:
                logger.error("Error in bulk action")

    def upload_photo_data(self, items: List[Item]):
        """Upload the raw image bytes for every Photo item in `items`."""
        photo_items = [item for item in items if item.__class__.__name__ == "Photo"]
        for photo in photo_items:
            is_ok = self.client._upload_image(photo.data, asyncFlag=False)
            if not is_ok:
                logger.error("Error in uploading image")

    def join(self):
        """Signal the consumer to finish, wait for it, and release resources."""
        self.will_stop = True
        self.thread.join()
        # Wait for in-flight photo uploads and free the worker threads; the
        # old code leaked the executor.
        self.executor.shutdown(wait=True)

    def get_batched_updates(self, n: int, timeout: float = 0.1) -> PodUpdate:
        """Merge up to `n` queued updates into a single PodUpdate (may be
        empty when the queue is)."""
        # timeout and n should be tuned
        all_pod_updates = PodUpdate()
        for _ in range(n):
            if self.queue.empty():
                break
            pod_update = self.queue.get(timeout=timeout)
            if pod_update is None:
                continue
            all_pod_updates += pod_update
        return all_pod_updates

    def put(
        self,
        *,
        create_items=None,
        update_items=None,
        create_edges=None,
    ):
        """Enqueue a PodUpdate built from the given item/edge lists."""
        pod_update = PodUpdate(
            create_items=create_items or [],
            create_edges=create_edges or [],
            update_items=update_items or [],
        )
        self.queue.put(pod_update)
from .constants import SERVICE_NAME
def get_user(client, externalId: str = None, isMe=None):
    """Fetch a single twitter Account item from the pod.

    Args:
        client: pod client used for the search.
        externalId: optional twitter user id to filter on.
        isMe: optional flag to select the pod owner's own account.

    Raises:
        Exception: when the search fails or no matching account exists.
    """
    search_filter = {'type': 'Account', 'service': SERVICE_NAME}
    if externalId:
        search_filter['externalId'] = externalId
    if isMe is not None:
        search_filter['isMe'] = isMe
    try:
        result = client.search(search_filter)
        return result[0]
    except Exception as e:
        # Chain the original exception so the root cause (pod error vs. empty
        # result -> IndexError) is preserved in the traceback.
        raise Exception(f"Exception while getting user with params {search_filter}: {e}") from e
def get_external_ids(client, itemType: str):
    """Return the set of externalIds of all `itemType` items in the pod.

    Best-effort: if pagination fails part-way, the ids collected so far are
    returned instead of raising.
    """
    external_ids = []
    search_filter = {'type': itemType, 'service': SERVICE_NAME}
    try:
        for batch in client.search_paginate(search_filter, 100):
            external_ids += [m.externalId for m in batch]
    except Exception:
        # The old `finally: return` swallowed *every* exception, including
        # KeyboardInterrupt/SystemExit; catch only ordinary exceptions and
        # keep the partial result.
        pass
    return set(external_ids)
def get_file_hashes(client):
    """Return the sha256 hashes of all File items in the pod.

    Best-effort: if pagination fails part-way, the hashes collected so far
    are returned instead of raising.
    """
    hashes = []
    search_filter = {'type': 'File'}
    try:
        for batch in client.search_paginate(search_filter, 100):
            hashes += [m.sha256 for m in batch]
    except Exception:
        # The old `finally: return` swallowed *every* exception, including
        # KeyboardInterrupt/SystemExit; catch only ordinary exceptions and
        # keep the partial result.
        pass
    return hashes
def get_item_by_external_id(client, itemType: str, externalId: str):
    """Return the first pod item of `itemType` with the given externalId."""
    query = {
        'type': itemType,
        'externalId': externalId,
        'service': SERVICE_NAME,
    }
    matches = client.search(query)
    return matches[0]
\ No newline at end of file
This diff is collapsed.
from datetime import datetime
from typing import Optional, List
from pymemri.data.schema import Item, Account, Photo, Website
""" Items to be stored in the pod. """
import enum
from typing import Optional
class Account(Account):
description: Optional[str] = None
import tweepy
from pymemri.data import schema
from pymemri.data.schema import Item
class Hashtag(Item):
SERVICE_NAME = "twitter"
class HashableItem(Item):
    """Item usable in sets/dicts, keyed by (type name, externalId).

    NOTE(review): assumes the inherited equality is consistent with
    externalId — confirm against pymemri's Item.
    """

    def __hash__(self):
        # Hash a tuple instead of string concatenation: the old
        # `name + self.externalId` raised TypeError whenever externalId was an
        # int (tweepy ids are ints) or None instead of a str.
        return hash((self.__class__.__name__, self.externalId))
class Account(HashableItem, schema.Account):
    """Twitter account item, extending the base pymemri Account with the
    twitter `public_metrics` counters."""

    description: Optional[str]
    followersCount: Optional[int]
    followingCount: Optional[int]
    tweetCount: Optional[int]
    listedCount: Optional[int]
    verified: Optional[bool]

    @classmethod
    def from_tweepy(cls, user: tweepy.User) -> "Account":
        """Build an Account from a tweepy User; requires the `public_metrics`
        field to be present in the API response."""
        return cls(
            externalId=user.id,
            service=SERVICE_NAME,
            displayName=user.name,
            handle=user.username,
            description=user.description,
            followersCount=user.public_metrics["followers_count"],
            followingCount=user.public_metrics["following_count"],
            tweetCount=user.public_metrics["tweet_count"],
            listedCount=user.public_metrics["listed_count"],
            verified=user.verified,
        )
class TweetType(str, enum.Enum):
    """Classification of a tweet, derived from tweepy's `referenced_tweets`."""

    TWEET = "tweet"
    RETWEET = "retweet"
    QUOTE = "quote"
    REPLY = "reply"

    @classmethod
    def from_tweepy(cls, tweet):
        """Return the TweetType for a tweepy tweet object.

        A tweet with no referenced tweets is a plain TWEET; otherwise the
        first reference's type decides, falling back to TWEET for unknown
        reference kinds.
        """
        references = tweet.referenced_tweets
        if references is None:
            return cls.TWEET
        by_reference_type = {
            "retweeted": cls.RETWEET,
            "quoted": cls.QUOTE,
            "replied_to": cls.REPLY,
        }
        return by_reference_type.get(references[0].type, cls.TWEET)
REFERENCABLE_TWEETS = frozenset((TweetType.RETWEET, TweetType.QUOTE, TweetType.REPLY))
class Photo(HashableItem, schema.Photo):
    # Plain pymemri Photo, made hashable so duplicates can be collected in sets.
    pass
class Tweet(Item):
message: Optional[str] = None
service: Optional[str] = None
postDate: Optional[datetime] = None
reply: List["Tweet"] = []
links: List["Website"] = []
mention: List["Account"] = []
author: List["Account"] = []
photo: List["Photo"] = []
hashtag: List["Hashtag"] = []
class Tweet(HashableItem, schema.Tweet):
    """Tweet item extending the base pymemri Tweet schema."""

    @classmethod
    def from_tweepy(cls, tweet) -> "Tweet":
        """Build a Tweet from a tweepy tweet; requires `public_metrics`,
        `conversation_id` and `created_at` to be present in the response."""
        tweet_type = TweetType.from_tweepy(tweet)
        return cls(
            externalId=tweet.id,
            message=tweet.text,
            conversationId=tweet.conversation_id,
            service=SERVICE_NAME,
            tweetType=tweet_type.value,
            postDate=tweet.created_at,
            retweetCount=tweet.public_metrics["retweet_count"],
            replyCount=tweet.public_metrics["reply_count"],
            likeCount=tweet.public_metrics["like_count"],
        )

    def is_referencable(self) -> bool:
        # True for retweets/quotes/replies — tweets that point at another
        # tweet which may need to be fetched and linked.
        return self.tweetType in REFERENCABLE_TWEETS
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment