Commit 990292ab authored by angella's avatar angella
Browse files

Migrated to plugin_template

parent 0c8f6cc8
Showing with 585 additions and 177 deletions
+585 -177
import pymemri
from pymemri.data.schema import *
DEFAULT_POD_ADDRESS = "http://localhost:3030"
POD_VERSION = "v3"
class Account(Item):
# e.g. username, password or access key, secret key etc.
def __init__(
self,
externalId,
displayName,
handle,
service=None,
identifier=None,
secret=None,
code=None,
refreshToken=None,
errorMessage=None,
externalId=None,
displayName=None,
handle=None,
description=None,
protected=None,
dateCreated=None,
verified=None,
contributorsEnabled=None,
notifications=None,
deleted=None,
id=None,
deleted=None,
):
super().__init__(
id=id, dateCreated=dateCreated, externalId=externalId, deleted=deleted
)
self.service = service
self.identifier = identifier
self.secret = secret
self.refreshToken = refreshToken
self.code = code
self.errorMessage = errorMessage
self.displayName = displayName
self.handle = handle
self.description = description
......@@ -36,6 +44,13 @@ class Account(Item):
@classmethod
def from_json(cls, json):
id = json.get("id", None)
deleted = json.get("deleted", None)
service = json.get("service", None)
identifier = json.get("identifier", None)
secret = json.get("secret", None)
code = json.get("code", None)
refreshToken = json.get("refreshToken", None)
errorMessage = json.get("errorMessage", None)
externalId = json.get("externalId", None)
displayName = json.get("displayName", None)
handle = json.get("handle", None)
......@@ -46,8 +61,13 @@ class Account(Item):
contributorsEnabled = json.get("contributorsEnabled", None)
notifications = json.get("notifications", None)
return cls(
id=id,
res = cls(
service=service,
identifier=identifier,
secret=secret,
code=code,
refreshToken=refreshToken,
errorMessage=errorMessage,
externalId=externalId,
displayName=displayName,
handle=handle,
......@@ -57,7 +77,94 @@ class Account(Item):
verified=verified,
contributorsEnabled=contributorsEnabled,
notifications=notifications,
id=id,
deleted=deleted,
)
return res
class Plugin(Item):
def __init__(self, name, container, account=None, id=None, deleted=None):
super().__init__(id=id, deleted=deleted)
self.name = name
self.container = container
self.account = account if account else []
@classmethod
def from_json(cls, json):
id = json.get("id", None)
deleted = json.get("deleted", None)
name = json.get("name", None)
container = json.get("container", None)
all_edges = json.get("allEdges", None)
account = []
if all_edges is not None:
for edge_json in all_edges:
edge = Edge.from_json(edge_json)
if edge._type == "account" or edge._type == "~account":
account.append(edge)
res = cls(
name=name, container=container, account=account, id=id, deleted=deleted
)
for e in res.get_all_edges():
e.source = res
return res
class StartPlugin(Item):
def __init__(
self,
targetItemId,
container,
state=None,
oAuthUrl=None,
interval=None,
view=None,
id=None,
deleted=None,
):
super().__init__(id=id, deleted=deleted)
self.targetItemId = targetItemId
self.container = container
self.state = state
self.oAuthUrl = oAuthUrl
self.interval = interval
# CVUStoredDefinitions
self.view = view if view else None
@classmethod
def from_json(cls, json):
id = json.get("id", None)
deleted = json.get("deleted", None)
targetItemId = json.get("targetItemId", None)
container = json.get("container", None)
state = json.get("state", None)
oAuthUrl = json.get("oAuthUrl", None)
interval = json.get("interval", None)
all_edges = json.get("allEdges", None)
view = []
if all_edges is not None:
for edge_json in all_edges:
edge = Edge.from_json(edge_json)
if edge._type == "view" or edge._type == "~view":
view.append(edge)
res = cls(
targetItemId=targetItemId,
container=container,
state=state,
view=view,
oAuthUrl=oAuthUrl,
interval=interval,
id=id,
deleted=deleted,
)
return res
# This will include withheld countries
......@@ -230,4 +337,3 @@ class Color(Item):
name = json.get("name", None)
return cls(id=id, name=name)
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/TwitterImporter.ipynb (unless otherwise specified).
__all__ = ["TwitterImporter"]
# Cell
# External imports
import pymemri
from pymemri.data.schema import *
from pymemri.data.photo import *
from pymemri.pod.client import *
import tweepy
import numpy as np
from nbdev.showdoc import show_doc
from PIL import Image
import requests
import cv2
# Cell
# export
from twitter_importer.data.schema import *
from config.settings import tokens
from twitter_importer.pod_controller import PodController
......@@ -32,74 +17,15 @@ from twitter_importer.data.schema import (
Symbol,
Resource,
Message,
Color,
)
# Cell
from PIL import Image
class TwitterImporter:
def __init__(self):
self.client = PodClient()
self.importer_handler = ImporterHandler(self.client)
self.pod_controller = PodController(self.client)
def add_to_schema(self):
"""
Adds tables to the schema in the pod.
Returns:
test_connection(boolean): A True or false if the pod is connected.
"""
account_schema = Account(
"id_str",
"name",
"screen_name",
"description",
"protected",
self.importer_handler.datetime_to_int("Fri Jun 11 08:21:33 +0000 2021"),
"verified",
"contributors_enabled",
"notifications",
)
self.client.add_to_schema(account_schema)
country_schema = Country("country_code")
self.client.add_to_schema(country_schema)
color_schema = Color("color")
self.client.add_to_schema(color_schema)
language_schema = Language("lang")
self.client.add_to_schema(language_schema)
# Schema for images such as profile photos
x = np.random.randint(0, 255 + 1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
self.client.add_to_schema(IPhoto.from_np(x))
person_schema = Person("id_str", "name")
self.client.add_to_schema(person_schema)
tweet_schema = Message(
self.importer_handler.datetime_to_int("Fri Jun 11 08:21:33 +0000 2021"),
"id_str",
"full_text",
10,
5,
"favorited",
"retweeted",
)
self.client.add_to_schema(tweet_schema)
resource_schema = Resource("source")
self.client.add_to_schema(resource_schema)
hashtag_schema = Hashtag("hashtag")
self.client.add_to_schema(hashtag_schema)
symbol_schema = Symbol("symbol")
self.client.add_to_schema(symbol_schema)
def __init__(self, client):
self.importer_handler = ImporterHandler(client)
self.pod_controller = PodController(client)
self.client = client
def followers_importer(self, twitter_client, api, current_user):
"""
......@@ -219,27 +145,3 @@ class TwitterImporter:
except tweepy.TweepError as error:
print(error)
def run(self, username, max_amount=None):
"""
Imports user, friends, followers and timeline into the pod client.
Parameters:
pod_client (object): An instance of the PodClient client.
tokens (object): An object with twitter tokens to access the API.
username (str): A string for the user from whom to fetch data.
Returns:
followers (dict): An object of followers.
"""
print("Running twitter importer")
twitter_client = TwitterClient(username, max_amount)
api = twitter_client.authenticate_user(
tokens["ACCESS_TOKEN"],
tokens["ACCESS_SECRET"],
tokens["CONSUMER_KEY"],
tokens["CONSUMER_SECRET"],
)
print("Preparing to fetch user data...")
self.import_data(api, twitter_client, username, max_amount)
......@@ -7,7 +7,6 @@ from datetime import datetime
# internal imports
from twitter_importer.data.schema import *
from twitter_importer.data.schema import *
from twitter_importer.data.schema import (
Country,
Language,
......@@ -108,28 +107,6 @@ class ImporterHandler:
if (index + 1) == len(followings):
print("Finished importing user followings and their tweets")
def handle_timeline(self, user_timeline, associated_account):
"""
Takes in a user timeline and associated account which is the message sender.
Generates Message, adds tweets.
Parameters:
timeline (list): A list of tweets returned from twitter.
associated_account (object): User Account that will have the sender edge.
Returns:
"""
# Add timeline
if user_timeline is not None:
for timeline in user_timeline:
story = timeline._json
story_to_add = self.format_message(story)
story_from_db = self.pod_controller.add_tweet(
story, story_to_add, associated_account
)
print(story_from_db)
else:
print("No tweets found")
def format_message(self, story):
"""
Takes in a dictionary of the story and creates an Message object
......@@ -177,7 +154,8 @@ class ImporterHandler:
def datetime_to_int(self, date_time_string):
# Convert string to date
new_datetime = datetime.strptime(date_time_string, "%a %b %d %H:%M:%S +0000 %Y")
date_time_value = date_time_string
new_datetime = datetime.strptime(date_time_value, "%a %b %d %H:%M:%S +0000 %Y")
# Return datetime as int
datetime_now = int(new_datetime.strftime("%Y%m%d%H%M%S"))
return datetime_now
File moved
import logging
from time import time, sleep
import numpy as np
from pymemri.data.photo import *
from twitter_importer.importer import ImporterHandler
from twitter_importer.data.schema import (
Plugin,
StartPlugin,
Country,
Language,
Account,
Person,
Hashtag,
Symbol,
Resource,
Message,
Color,
)
RUN_IDLE = "idle" # 1
RUN_INITIALIZED = "initilized" # 2
RUN_USER_ACTION_NEEDED = "userActionNeeded" # 2-3
RUN_USER_ACTION_COMPLETED = "ready" # 2-3
RUN_STARTED = "start" # 3
RUN_FAILED = "error" # 3-4
RUN_COMPLETED = "done" # 4
RUN_STATE_POLLING_INTERVAL = 0.6
RUN_USER_ACTION_TIMEOUT = 120
logging.basicConfig(format="%(asctime)s [%(levelname)s] - %(message)s")
# A draft class to be inherited by the actual plugin class, to handle plugin flows like auth, state, progress...
# Requires the plugin class to have a "run" method.
class PluginFlow:
def __init__(self, client, run_id=None, plugin_id=None):
self.client = client
self.run_id = run_id
self.plugin_id = plugin_id
self._setup_schema()
self.initialized()
def start(self):
""" Plugin run wrapper - sets status and allows daemon mode intervals """
logging.warning("Running")
self.started()
while True:
# Plugin class provides this method - plugin's main run logic
self.run()
# daemon run interval
run = self.get_run(expanded=False)
if (
not run.interval
): # interval is falsy. To terminate, set run.interval = 0 or None
break
sleep(run.interval)
self.completed()
# =======================================
# ---------- PLUGIN METHODS -------------
def get_account_from_plugin(self, service=None):
# Update plugin
plugin = self.client.get(self.plugin_id)
# get its connected accounts
account_edges = plugin.get_edges("account")
# if multiple accounts are used
if len(account_edges) > 1 and service:
for account_edge in account_edges:
account = account_edge.traverse(plugin)
if account.service == service:
return account
# assumes there is only one account
elif len(account_edges) == 1:
return account_edges[0].traverse(plugin)
def ask_user_for_accounts(self, service, view, oauth_url=None):
# start userActionNeeded flow
vars = {"state": RUN_USER_ACTION_NEEDED, "oAuthUrl": oauth_url}
self._set_run_vars(vars)
self._set_run_view(view)
# poll here
start_time = time()
# handle timeouts
while RUN_USER_ACTION_TIMEOUT > time() - start_time:
sleep(RUN_STATE_POLLING_INTERVAL)
run_state = self._get_run_state()
if run_state == RUN_USER_ACTION_COMPLETED:
# Now the client has set up the account as an edge to the plugin
return self.get_account_from_plugin(service=service)
raise Exception("PluginFlow: User input timeout")
def set_account_vars(self, vars_dictionary, service=None):
account = self.get_account_from_plugin(service=service)
if account:
for k, v in vars_dictionary.items():
setattr(account, k, v)
self.client.update_item(account)
logging.warning(f"ACCOUNT updated: {account.__dict__}")
else:
# Create account item
account = Account(**vars_dictionary)
# Save accounts as an edge to the plugin
self.client.create(account)
logging.warning(f"ACCOUNT created: {account.__dict__}")
# add the account to the plugin item
plugin = self.client.get(self.plugin_id)
plugin.add_edge("account", account)
plugin.update(self.client)
# =======================================
# ---------- USER CLIENT METHODS -------------
def install_plugin(self, name, container):
plugin = Plugin(name=name, container=container)
self.client.create(plugin)
# set in instance
self.plugin_id = plugin.id
# TODO: Add an edge from the plugin to the desired CVUStoredDefinitions
# start_plugin.view = createLoginCVUs()
return plugin
def trigger_plugin(self, interval=None):
plugin = self.client.get(self.plugin_id)
starter = StartPlugin(
targetItemId=self.plugin_id,
container=plugin.container,
state=RUN_IDLE,
interval=interval,
)
# add cvus here
self.client.create(starter)
self.run_id = starter.id
print(
f"Started plugin {plugin.name} - {self.plugin_id} and run id {self.run_id}"
)
def terminate_run(self):
self._set_run_vars({"interval": None})
# =======================================
# ---------- COMMON METHODS -------------
def get_CVU(self, run):
try:
run = self.get_run(expanded=True)
return run.get_edges("view")[0]
except:
return None
def initialized(self):
logging.warning("PLUGIN run is initialized")
self.state = RUN_INITIALIZED
if self.run_id:
self._set_run_vars({"state": RUN_INITIALIZED})
def started(self):
logging.warning("PLUGIN run is started")
self.state = RUN_STARTED
if self.run_id:
self._set_run_vars({"state": RUN_STARTED})
def failed(self, error):
logging.error(f"PLUGIN run is failed: {error}")
print("Exception while running plugin:", error)
self.state = RUN_FAILED
if self.run_id:
self._set_run_vars({"state": RUN_FAILED, "message": str(error)})
def completed(self):
logging.warning("PLUGIN run is completed")
self.state = RUN_COMPLETED
if self.run_id:
self._set_run_vars({"state": RUN_COMPLETED})
def complete_user_action(self):
self._set_run_vars({"state": RUN_USER_ACTION_COMPLETED})
def is_user_action_needed(self):
return self._get_run_state() == RUN_USER_ACTION_NEEDED
def is_completed(self):
return self._get_run_state() == RUN_COMPLETED
def is_daemon(self):
run = self.get_run(expanded=False)
return run.interval and run.interval > 0
def get_run(self, expanded=False):
return self.client.get(self.run_id, expanded=expanded)
# =======================================
# --------- INTERNAL METHODS ------------
def _get_run_state(self):
start_plugin = self.get_run()
return start_plugin.state
def _set_run_vars(self, vars):
start_plugin = self.client.get(self.run_id, expanded=False)
for k, v in vars.items():
setattr(start_plugin, k, v)
self.client.update_item(start_plugin)
def _set_run_view(self, view_name):
found_cvu = None
views = self.client.search({"type": "CVUStoredDefinition"}) # 'name': view_name
for v in views:
if v.name == view_name:
found_cvu = v
if not found_cvu:
logging.error("CVU is NOT FOUND")
return
run = self.get_run()
bound_CVU_edge = self.get_CVU(
run
) # index error here if there is no already bound CVU
if bound_CVU_edge:
logging.warning(f"Plugin Run already has a view. Updating with {view_name}")
bound_CVU_edge.target = found_cvu # update CVU
bound_CVU_edge.update(
self.client
) # having doubts if this really updates the existing edge
else:
logging.warning(f"Plugin Run does not have a view. Creating {view_name}")
run.add_edge("view", found_cvu)
run.update()
def _setup_schema(self):
importer_handler = ImporterHandler(self.client)
self.client.add_to_schema(
Account(
identifier="",
secret="",
service="",
code="",
refreshToken="",
errorMessage="",
externalId="id_str",
displayName="name",
handle="screen_name",
description="description",
protected="protected",
dateCreated=importer_handler.datetime_to_int(
"Fri Jun 11 08:21:33 +0000 2021"
),
verified="verified",
contributorsEnabled="contributors_enabled",
notifications="notifications",
)
)
self.client.add_to_schema(Plugin(name="", container=""))
self.client.add_to_schema(
StartPlugin(
targetItemId="", container="", state="", oAuthUrl="", interval=1
)
)
self.client.add_to_schema(Country("country_code"))
self.client.add_to_schema(Color("color"))
self.client.add_to_schema(Language("lang"))
# Schema for images such as profile photos
x = np.random.randint(0, 255 + 1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
self.client.add_to_schema(photo)
self.client.add_to_schema(Person("id_str", "name"))
self.client.add_to_schema(
Message(
"created_at", "id_str", "full_text", 10, 5, "favorited", "retweeted",
)
)
self.client.add_to_schema(Resource("source"))
self.client.add_to_schema(Hashtag("hashtag"))
self.client.add_to_schema(Symbol("symbol"))
# export
# External imports
import pymemri
from pymemri.data.schema import *
from pymemri.data.photo import *
from pymemri.pod.client import *
......@@ -8,10 +7,8 @@ import numpy as np
from nbdev.showdoc import show_doc
from PIL import Image
import requests
import cv2
# Local imports
# export
from twitter_importer.data.schema import *
from twitter_importer.data.schema import (
Country,
......
import logging
from time import sleep
from twitter_importer.plugin_flow.plugin_flow import PluginFlow
from twitter_importer.service_api.service_api import ServiceAPI
from config.settings import tokens
from twitter_importer.twitter_client import TwitterClient
from twitter_importer.importer import TwitterImporter
SERVICE_NAME = "demo-service"
NUM_LOGIN_TRIES = 3
# in case this is an app that uses OAuth
APP_ID = ""
APP_SECRET = ""
class PluginRunner(PluginFlow):
def __init__(self, client, plugin_id=None, run_id=None):
super().__init__(client=client, plugin_id=plugin_id, run_id=run_id)
self.dummy_service = ServiceAPI()
# activate plugin flow
if plugin_id and run_id:
# init login process
# self.start_auth_oauth()
self.start_auth_userpass_and_two_factor()
def run(self, username, max_amount=None):
"""
Imports user, friends, followers and timeline into the pod client.
Parameters:
pod_client (object): An instance of the PodClient client.
tokens (object): An object with twitter tokens to access the API.
username (str): A string for the user from whom to fetch data.
Returns:
followers (dict): An object of followers.
"""
twitter_client = TwitterClient(username, max_amount)
importer = TwitterImporter(self.client)
api = twitter_client.authenticate_user(
tokens["ACCESS_TOKEN"],
tokens["ACCESS_SECRET"],
tokens["CONSUMER_KEY"],
tokens["CONSUMER_SECRET"],
)
importer.import_data(api, twitter_client, username, max_amount)
# ====================================
# Example authentication methods
# Existing account login check
def check_existing_auth(self):
account = self.get_account_from_plugin(SERVICE_NAME)
if account:
result = self.dummy_service.login(account.identifier, account.secret)
if result["success"]:
logging.warning("Logged in with existing credentials")
return True
return False
# Example standard username password login flow
def start_auth_userpass_and_two_factor(self):
if self.check_existing_auth():
return True
for _ in range(NUM_LOGIN_TRIES):
# we request username and password here
account = self.ask_user_for_accounts(SERVICE_NAME, "username-password-view")
# check if they work
result = self.dummy_service.login(account.identifier, account.secret)
if result and result["success"] == True:
if result["2fa_required"]:
logging.warning("Authentication requires 2FA")
for _ in range(NUM_LOGIN_TRIES):
account = self.ask_user_for_accounts(SERVICE_NAME, "2fa-view")
result = self.dummy_service.complete_2fa(account.code)
if result["success"]:
logging.warning("Authentication 2FA success")
return True
# set error message for UI
self.set_account_vars(
{"errorMessage": "Incorrect 2fa code"}, service=SERVICE_NAME
)
else:
logging.warning("Authentication success")
return True
# set error message for UI
self.set_account_vars(
{"errorMessage": "Incorrect credentials"}, service=SERVICE_NAME
)
# give up
return False
# Example OAuth flow
def start_auth_oauth(self):
if self.check_existing_auth():
return True
oauth_url = self.dummy_service.get_oauth_url(APP_ID, APP_SECRET)
for i in range(NUM_LOGIN_TRIES):
account = self.ask_user_for_accounts(SERVICE_NAME, "oauth-view", oauth_url)
result = self.dummy_service.get_oauth_token(account.code)
if result["success"]:
# set new token
self.set_account_vars(
{"refreshToken": result["refreshToken"], "secret": result["token"]}
)
return True
# set error message for UI
self.set_account_vars(
{"errorMessage": "Incorrect credentials"}, SERVICE_NAME
)
return False
This diff is collapsed.
# nbdev quickstart
With nbdev we create the code in Notebooks, where we specify the use off cells using special tags. We list the most widely used tags here to get you started quickly
## Tags
When you create a new notebook, add `#default_exp <packagename>.<modulename>` to the top of your notebook to define the Python module to export to. For example, if you have a notebook file `nbs/data.email.ipynb`, with as first line:
```
#default_exp data.email
```
The notebook will write to a file called `integrators/data/email.py` when nbdev is commanded to sync using `nbdev_build_lib`.
All cells `#export` will be converted to code in the outputfile, e.g.
```
# export
def times_two(i): return i*2
```
Will be written to the file specified in #default_exp. All cells without the `#export` tag, will be converted to tests by default.
By default, all cells are included in the documentation, unless you add the keyword `#hide`, e.g.
```
# hide
for i in range(1000000):
print(i)
```
Will not appear in the documentation. Lastly, Notebooks with a name that start with an underscore, are ignored by nbdev completely. Nbdev has many other functionalities, see the [nbdev docs](https://nbdev.fast.ai/) for more information.
## CLI
After developing your code in Notebooks, you can use the nbdev CLI:
- `nbdev_build_lib` to convert the Notebooks `/nbs` to the library in `/integrators`
- `nbdev_test_nbs` to run the tests (all cells without #export tags)
- `nbdev_build_docs` to generate the docs in `/docs`
- `nbdev_clean_nbs` to clean the Notebooks' metadata to prevent Git conflicts, if you followed the normal installation, this is not necessary.
- `nbdev_fix_merge` to fix notebook files (make them readable) when they contain a merge conflict.
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment