Commit 490fc21b authored by Eelco van der Wel's avatar Eelco van der Wel :speech_balloon:
Browse files

automated commit

parents
Pipeline #8453 failed with stage
in 1 minute and 27 seconds
Showing with 540 additions and 0 deletions
+540 -0
.gitignore 0 → 100644
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
\ No newline at end of file
stages:
- build
test:
image: python:3.9
stage: build
services:
- name: ${POD_IMAGE}
alias: pod
entrypoint:
- "/pod"
- "--owners=ANY"
- "--insecure-non-tls=0.0.0.0"
before_script:
- pip install -e .
script:
- curl http://pod:3030/version
- export POD_ADDRESS='http://pod:3030'
- pytest
create_config:
image: python:3.9
only:
- main
- prod
stage:
build
script:
- pip install -e .
- create_plugin_config
artifacts:
paths:
- config.json
- schema.json
build_image:
only:
- dev
- prod
- main
stage: build
before_script:
- echo "Building docker image..."
image:
# We recommend using the CERN version of the Kaniko image: gitlab-registry.cern.ch/ci-tools/docker-image-builder
name: gcr.io/kaniko-project/executor:debug
entrypoint: [""]
script:
# Prepare Kaniko configuration file
- echo "{\"auths\":{\"$CI_REGISTRY\":{\"username\":\"$CI_REGISTRY_USER\",\"password\":\"$CI_REGISTRY_PASSWORD\"}}}" > /kaniko/.docker/config.json
# Build and push the image from the Dockerfile at the root of the project.
# To push to a specific docker tag, amend the --destination parameter, e.g. --destination $CI_REGISTRY_IMAGE:$CI_BUILD_REF_NAME
# See https://docs.gitlab.com/ee/ci/variables/predefined_variables.html#variables-reference for available variables
- /kaniko/executor --context $CI_PROJECT_DIR --dockerfile $CI_PROJECT_DIR/Dockerfile --destination $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-$CI_COMMIT_SHORT_SHA --destination $CI_REGISTRY_IMAGE:$CI_COMMIT_REF_SLUG-latest
Dockerfile 0 → 100644
FROM gitlab.memri.io:5050/memri/plugin-templates/classifier_plugin:latest
ENV DEBIAN_FRONTEND=noninteractive
ARG USE_CACHE=false
WORKDIR /usr/mngc/
COPY ./setup.py ./setup.py
COPY ./setup.cfg ./setup.cfg
COPY ./mngc ./mngc
COPY ./tools ./tools
# use pip cache (set export DOCKER_BUILDKIT=1 and provide --build-arg USE_CACHE=true)
RUN if [ "$USE_CACHE" = "true" ] ; then --mount=type=cache,mode=0755,target=/root/.cache pip3 install -e . ; else pip3 install -e . ; fi
# preload model assets when building docker container
RUN python ./tools/preload.py
CMD ["run_plugin", "--read_args_from_env", "True"]
This diff is collapsed.
# Mngc
{
"pluginType": "app",
"name": "Mngc",
"containerImage": "gitlab.memri.io:5050/eelcovdw/mngc:main-latest",
"icon": null,
"pluginModule": "mngc.plugin",
"pluginName": "ClassifierPlugin",
"description": "A transformer based sentiment analyis plugin"
}
\ No newline at end of file
from typing import List, Any
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
from pymemri.data_loader import load_huggingface_model_for_project
class Model:
def __init__(self, name: str = None, version: str = None) -> None:
"""
A classifier Model should return a list of dictionaries {"label": class_label, "score": class_score} for each class.
For example, for a binary sentiment classification an example input and output could be:
```
input = ["This is great"]
output = [
[{"label": "Positive", "score": 0.9},
{"label": "Negative", "score": 0.1}]
]
```
"""
self.name = name
self.version = version
model = load_huggingface_model_for_project(project_path="eelcovdw/mngc")
tokenizer = AutoTokenizer.from_pretrained("distilroberta-base", model_max_length=512)
self.pipeline = pipeline("sentiment-analysis", model=model, tokenizer=tokenizer, return_all_scores=True, truncation=True)
def predict(self, x: List[Any]) -> List[Any]:
return self.pipeline(x)
import json
from typing import Any, Dict, List
from pymemri.plugin.pluginbase import PluginBase
from pymemri.data.itembase import Item, Edge
from pymemri.data.schema import Message, EmailMessage
from .utils import item_to_data
from .model import Model
from .schema import CategoricalPrediction
from .schema import Model as ModelItem
class ClassifierPlugin(PluginBase):
schema_classes = [Message, CategoricalPrediction, ModelItem]
def __init__(
self,
item_type: str = "Message",
item_service: str = None,
model_name: str = "mngc_model",
model_version: str = "0.1",
isMock: bool = True,
**kwargs,
):
"""
ClassifierPlugin is a plugin that wraps any classifier and handles all communication with the Pod and conversion from/to `Item`s
Args:
item_type (str, optional): The Item type this plugin should make predictions on. Defaults to "Message".
item_service (str, optional): The service of Items this plugin should make predictions on. Defaults to None.
model_name (str, optional): Name of the model the plugin should use. Defaults to None.
model_version (str, optional): Version of the model the plugin should use. Defaults to "0.1".
"""
super().__init__(**kwargs)
self.batch_size = 512
self.model_name = model_name
self.model_version = model_version
self.query = {"type": item_type}
if item_service is not None:
self.query["service"] = item_service
if isMock is True:
self.query["isMock"] = True
def run(self):
"""Run `self.model` on all data in `self.client.search(self.query)`"""
print("Loading model...")
self.load_model()
print(f"Start predicting...")
for i, item_batch in enumerate(self.client.search_paginate(
self.query, limit=self.batch_size
)):
print(f"Predicting batch {i:<4}")
item_batch = self.filter_items(item_batch)
prepared_batch = self.prepare_batch(item_batch)
predictions = self.model.predict(prepared_batch)
prediction_items = [self.prediction_to_item(p) for p in predictions]
self.sync_to_pod(item_batch, prediction_items)
print("Done")
def load_model(self):
self.model = Model()
# Without model name, do not create a model item
if self.model_name is None:
return
# Search in pod for existing models with same name and version,
# add a new model if it does not exist.
model_items = self.client.search(
{"type": "Model", "name": self.model_name, "version": self.model_version}
)
if model_items:
self.model_item = model_items[0]
else:
self.model_item = ModelItem(
name=self.model_name, version=self.model_version
)
self.client.create(self.model_item)
def filter_items(self, items: List[Item]) -> List[Item]:
result = []
for item in items:
if not (isinstance(item, EmailMessage) or isinstance(item, Message)):
raise NotImplementedError()
if item.content:
result.append(item)
return result
def prepare_batch(self, batch: List[Item]) -> List[Any]:
"""Prepare a list of items for the model. See `utils.item_to_data` for more information.
Args:
batch (List[Item]): List of Items from the Pod.
Returns:
List[Any]: List of prepared data.
"""
return [item_to_data(item, self.client) for item in batch]
def prediction_to_item(
self, prediction: List[Dict[str, Any]]
) -> CategoricalPrediction:
"""Converts a prediction returned by self.model to a CategoricalPrediction that can be added to the Pod
Args:
prediction (List[Dict[str, float]]): List of predictions. For the correct format, see `.model.Model`.
Returns:
CategoricalPrediction: List of formatted `CategoricalPrediction`s
"""
# Get the label with the highest score
max_label = max(prediction, key=lambda p: p["score"])["label"]
return CategoricalPrediction(
probs=json.dumps(prediction),
value=max_label,
source=f"{self.model.name}:{self.model.version}",
)
def sync_to_pod(self, items: List[Item], predictions: List[CategoricalPrediction]):
"""For each item, prediction, add the prediction to the pod and create an edge called 'label' between Item and Prediction.
Args:
items (List[Item]): [description]
predictions (List[Prediction]): [description]
"""
# Create edges between item and predictions
edges = [
Edge(item, prediction, "label")
for item, prediction in zip(items, predictions)
]
# Create edges between predictions and model
if self.model_item is not None:
edges += [
Edge(prediction, self.model_item, "model") for prediction in predictions
]
self.client.bulk_action(create_items=predictions, create_edges=edges)
from typing import Optional
from pymemri.data.itembase import Item
class CategoricalPrediction(Item):
properties = Item.properties + ["source", "value", "probs"]
edges = Item.edges + ["model"]
def __init__(self, source: str = None, value: str = None, probs: str = None, model: list = None,**kwargs):
super().__init__(**kwargs)
# Properties
self.source: Optional[str] = source
self.value: Optional[str] = value
self.probs = probs
# Edges
self.model = model if model is not None else list()
class Model(Item):
properties = Item.properties + ["name", "version"]
def __init__(self, name: str = None, version: str = None, **kwargs):
super().__init__(**kwargs)
# Properties
self.name: str = name
self.version: str = version
\ No newline at end of file
from io import BytesIO
import PIL
from pymemri.data.schema import Message
from pymemri.data.photo import Photo
from pymemri.data.schema import EmailMessage
from pymemri.pod.client import PodClient
def item_to_data(item, client):
if isinstance(item, Photo):
return photo_to_PIL(item, client)
elif isinstance(item, EmailMessage) or isinstance(item, Message):
return item.content
else:
raise NotImplementedError(
f"Converting Item of type {type(item)} is not implemented."
)
def photo_to_PIL(photo: Photo, client: PodClient):
photo = client.get_photo(photo.id)
return PIL.Image.open(BytesIO(photo.data))
setup.cfg 0 → 100644
[metadata]
name = mngc
version = 0.0.1
author = eelcovdw
description = A transformer based sentiment analyis plugin
long_description = file: README.md
long_description_content_type = text/markdown
url = https://gitlab.memri.io/eelcovdw/mngc
license = MPPL
license_files = LICENSE.txt
classifiers =
Programming Language :: Python :: 3
[options]
packages = find:
python_requires = >=3.6
install_requires =
pymemri
pytest
transformers
sentencepiece
protobuf
torch==1.10.0
[options.packages.find]
where = mngc
setup.py 0 → 100644
import setuptools
if __name__ == "__main__":
setuptools.setup()
import importlib
import json
import pytest
from pymemri.data.schema import Message
from pymemri.pod.client import PodClient
from pymemri.plugin.pluginbase import get_plugin_cls
@pytest.fixture
def client():
return PodClient()
@pytest.fixture
def metadata():
with open("./metadata.json") as f:
content = json.load(f)
return content
def create_dummy_data(client: PodClient) -> dict:
"""
Add dummy data to pod and return the search query that retrieves the data.
"""
raise NotImplementedError()
@pytest.mark.filterwarnings("ignore:Plugin needs a pluginRun as kwarg")
def test_plugin(client, metadata):
search_query = create_dummy_data(client)
# Load plugin
plugin_cls = get_plugin_cls(metadata["pluginModule"], metadata["pluginName"])
plugin = plugin_cls(client=client)
plugin.add_to_schema()
# Run plugin
plugin.query = search_query
plugin.run()
# Check if all data has a prediction
results = client.search(plugin.query)
assert len(results)
assert all([len(result.label)==1 for result in results])
assert all([result.label[0].value for result in results])
"""
`preload.py` is run when building the docker container,
and can be used to download any assets like pretrained models.
"""
from mngc.model import Model
def preload():
# this downloads the model, such that it is stored on disk, and does not need to be loaded at runtime
model = Model()
if __name__=="__main__":
preload()
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment