---
title: Title
keywords: fastai
sidebar: home_sidebar
nb_path: "nbs/authenticator.ipynb"
---
{% raw %}
{% endraw %} {% raw %}

get_cvu[source]

get_cvu(name, base_path=Path('/home/eelco/projects/plugins/gmail/cvu'))

{% endraw %} {% raw %}
{% endraw %}

Password Authenticator

Password Authenticator provides an easy interface to set up standard username-password authentication with third-party services.

Simply calling authenticate() should load the related account with the required credentials.

An inheriting class should implement:

  • get_token(), which tests the username-password combination or obtains a new session token to be used for future calls
{% raw %}
from pymemri.plugin.pluginbase import PluginBase
from pymemri.plugin.schema import PluginRun, Account
from pymemri.pod.client import PodClient
import threading

class MyAuthenticatedPlugin(PluginBase):
    """Example plugin that authenticates via a PasswordAuthenticator before finishing its run."""

    def __init__(self, *args, **kwargs):
        """Initialise the base plugin and attach a password authenticator.

        ``client`` and ``pluginRun`` must be passed as keyword arguments: they
        are forwarded to ``PluginBase`` with everything else and are also
        handed to the ``PasswordAuthenticator``.
        """
        super().__init__(*args, **kwargs)
        self.logged_in = False
        client, plugin_run = kwargs["client"], kwargs["pluginRun"]
        self.authenticator = PasswordAuthenticator(client, plugin_run)

    def login(self, username, password):
        """Check a username/password pair against the demo credentials.

        Raises:
            ValueError: if the pair is not exactly "username" / "password".
        """
        credentials_ok = username == "username" and password == "password"
        if credentials_ok:
            return
        raise ValueError("Wrong credentials.")

    def run(self):
        """Authenticate, flag the plugin as logged in, and print a completion message."""
        self.authenticator.authenticate(self)
        self.logged_in = True
        print("done!")

    def add_to_schema(self):
        """Intentionally empty: this example adds nothing to the schema."""

# Pod client shared by the rest of this example.
pod_client = PodClient()

# Build a plugin run (all three positional arguments left as empty strings)
# and link it to an account for the example service through an "account" edge.
run = PluginRun("", "", "")
account = Account(service="myAuthenticatedPlugin")
run.add_edge("account", account)
run.status = "start"

# Create both items first, then the edge that connects them.
pod_client.create(run)
pod_client.create(account)
pod_client.create_edge(run.get_edges("account")[0])
True
{% endraw %} {% raw %}
# Instantiate the example plugin with the pod client and the plugin run.
plugin = MyAuthenticatedPlugin(client=pod_client, pluginRun=run)
{% endraw %} {% raw %}
def run_thread():
    """Run the plugin and check that it ended up logged in."""
    plugin.run()
    assert plugin.logged_in

# Run the plugin on a separate thread so the main flow can continue meanwhile.
thread = threading.Thread(target=run_thread)
thread.start()
<Response [400]> b'Failure: Property externalId not defined in Schema (attempted to use it for json value "passwordAuth.cvu")'
<Response [404]> b'Endpoint not found'
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
polling for credentials...
{% endraw %} {% raw %}
def simulate_enter_password(pod_client, run_id, username="username", password="password"):
    """Simulate a user submitting credentials for a plugin run.

    Fetches the run, writes the credentials onto its first linked account,
    sets the run status to "ready", and pushes both items back through the
    client.

    Args:
        pod_client: Client exposing ``get(id)`` and ``update_item(item)``.
        run_id: Id of the plugin run whose account receives the credentials.
        username: Value stored in ``account.identifier``; defaults to the
            demo username so existing two-argument calls behave as before.
        password: Value stored in ``account.secret``; defaults to the demo
            password.
    """
    run = pod_client.get(run_id)
    account = run.account[0]

    account.identifier = username
    account.secret = password
    run.status = "ready"

    # Update the account before the run.
    pod_client.update_item(account)
    pod_client.update_item(run)

simulate_enter_password(pod_client, run.id)
# Wait for the background plugin thread to finish (at most 4 seconds) instead
# of sleeping for a fixed interval: this returns as soon as the thread is done.
thread.join(timeout=4)
assert plugin.logged_in
done!
{% endraw %}