#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu May 24 16:41:31 2018
@author: Paolo Cozzi <cozzi@ibba.cnr.it>
"""
import copy
import requests
import logging
import datetime
import collections
from url_normalize import url_normalize
from . import settings
from .client import Client, Document
from .exceptions import USIConnectionError, NotReadyError, USIDataError
logger = logging.getLogger(__name__)
[docs]class Root(Document):
"""Models the USI API Root_ endpoint
Attributes:
api_root (str): The base URL for API endpoints
.. _Root: https://submission-test.ebi.ac.uk/api/docs/ref_root_endpoint.html
"""
# define the default url
api_root = None
[docs] def __init__(self, auth):
# calling the base class method client
Client.__init__(self, auth)
Document.__init__(self)
# setting api_root
self.api_root = settings.ROOT_URL + "/api/"
# setting things. All stuff is inherithed
self.get(self.api_root)
def __str__(self):
return "Biosample API root at %s" % (self.api_root)
[docs] def get_user_teams(self):
"""Follow userTeams url and returns all teams belonging to user
Yield:
Team: a team object
"""
# follow url
document = self.follow_tag('userTeams')
# check if I have submission
if 'teams' not in document._embedded:
logger.warning("You haven't any team yet!")
return
# now iterate over teams and create new objects
for document in document.paginate():
for team_data in document._embedded['teams']:
team = Team(self.auth, team_data)
logger.debug("Found %s team" % (team.name))
# returning teams as generator
yield team
[docs] def get_team_by_name(self, team_name):
"""Get a :py:class:`Team` object by name
Args:
team_name (str): the name of the team
Returns:
Team: a team object
"""
logger.debug("Searching for %s" % (team_name))
for team in self.get_user_teams():
if team.name == team_name:
return team
# if I arrive here, no team is found
raise NameError("team: {team} not found".format(team=team_name))
[docs] def get_user_submissions(self, status=None, team=None):
"""Follow the userSubmission url and returns all submission owned by
the user
Args:
status (str): filter user submissions using this status
team (str): filter user submissions belonging to this team
Returns:
list: A list of :py:class:`Submission` objects
"""
# follow url
document = self.follow_tag('userSubmissions')
# check if I have submission
if 'submissions' not in document._embedded:
logger.warning("You haven't any submission yet!")
return
# now iterate over submissions and create new objects
for document in document.paginate():
for submission_data in document._embedded['submissions']:
submission = Submission(self.auth, submission_data)
if status and submission.status != status:
logger.debug("Filtering %s submission" % (submission.name))
continue
if team and submission.team != team:
logger.debug("Filtering %s submission" % (submission.name))
continue
logger.debug("Found %s submission" % (submission.name))
yield submission
[docs] def get_submission_by_name(self, submission_name):
"""Got a specific submission object by providing its name
Args:
submission_name (str): input submission name
Returns:
Submission: The desidered submission as instance
"""
# define submission url
url = "/".join([self.api_root, 'submissions', submission_name])
# fixing url (normalizing)
url = url_normalize(url)
# create a new submission object
submission = Submission(self.auth)
# doing a request
try:
submission.get(url)
except USIDataError as exc:
if submission.last_status_code == 404:
# if I arrive here, no submission is found
raise NameError(
"submission: '{name}' not found".format(
name=submission_name))
else:
raise exc
return submission
[docs]class User(Document):
"""Deal with EBI AAP endpoint to get user information
Attributes:
name (str): Output of ``Auth.claims['nickname']``
data (dict): data (dict): data from AAP read with
:py:meth:`response.json() <requests.Response.json>`
userName (str): AAP username
email (str): AAP email
userReference (str): AAP userReference
"""
user_url = None
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# calling the base class method client
Client.__init__(self, auth)
Document.__init__(self)
# define the base user url
self.user_url = settings.AUTH_URL + "/users"
# my class attributes
self.name = self.auth.claims["nickname"]
self.data = None
# other attributes
self.userName = None
self.email = None
self.userReference = None
self.links = None
# dealing with this type of documents.
if data:
logger.debug("Reading data for user")
self.read_data(data)
[docs] def get_my_id(self):
"""Get user id using own credentials, and set userReference attribute
Returns:
str: the user AAP reference as a string
"""
# defining URL
url = "%s/%s" % (self.user_url, self.name)
logger.debug("Getting info from %s" % (url))
# read url and get my attributes
self.get(url)
# returning user id
return self.userReference
[docs] def get_user_by_id(self, user_id):
"""Get a :py:class:`User` object by user_id
Args:
user_id (str): the required user_id
Returns:
User: a user object
"""
# defining URL
url = "%s/%s" % (self.user_url, user_id)
logger.debug("Getting info from %s" % (url))
# create a new user obj
user = User(self.auth)
# read url and get data
user.get(url)
# returning user
return user
[docs] @classmethod
def create_user(cls, user, password, confirmPwd, email, full_name,
organisation):
"""Create another user into biosample AAP and return its ID
Args:
user (str): the new username
password (str): the user password
confirmPwd (str): the user confirm password
email (str): the user email
full_name (str): Full name of the user
organisation (str): organisation name
Returns:
str: the new user_id as a string
"""
# check that passwords are the same
if password != confirmPwd:
raise ValueError("passwords don't match!!!")
# the AAP url
url = settings.AUTH_URL + "/auth"
# define a new header
headers = {}
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
# TODO: use more informative parameters
data = {
"username": user,
"password": password,
"confirmPwd": confirmPwd,
"email": email,
"name": full_name,
"organisation": organisation
}
# call a post method a deal with response. I don't need a client
# object to create a new user
session = requests.Session()
response = session.post(url, json=data, headers=headers)
if response.status_code != 200:
raise USIConnectionError(response.text)
# returning user id
return response.text
[docs] def create_team(self, description, centreName):
"""Create a new team
Args:
description (str): team description
centreName (str): team center name
Returns:
Team: the new team as a :py:class:`Team` instance
"""
url = settings.ROOT_URL + "/api/user/teams"
# define a new header. Copy the dictionary, don't use the same object
headers = copy.copy(self.headers)
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
data = {
"description": description,
"centreName": centreName
}
# call a post method a deal with response
response = self.post(url, payload=data, headers=headers)
# If I create a new team, the Auth object need to be updated
logger.warning(
"You need to generate a new token in order to see the new "
"generated team")
# create a new team object
team = Team(self.auth, response.json())
return team
[docs] def get_teams(self):
"""Get teams belonging to this instance
Returns:
list: a list of :py:class:`Team` objects
"""
url = settings.ROOT_URL + "/api/user/teams"
# create a new document
document = Document(auth=self.auth)
document.get(url)
# now iterate over teams and create new objects
for document in document.paginate():
for team_data in document._embedded['teams']:
team = Team(self.auth, team_data)
logger.debug("Found %s team" % (team.name))
# returning teams as generator
yield team
[docs] def get_team_by_name(self, team_name):
"""Get a team by name
Args:
team_name (str): the required team
Returns:
Team: the desidered :py:class:`Team` instance
"""
logger.debug("Searching for %s" % (team_name))
for team in self.get_teams():
if team.name == team_name:
return team
# if I arrive here, no team is found
raise NameError("team: {team} not found".format(team=team_name))
[docs] def get_domains(self):
"""Get domains belonging to this instance
Returns:
list: a list of :py:class:`Domain` objects
"""
url = settings.AUTH_URL + "/my/domains"
# make a request with a client object
response = Client.get(self, url)
# iterate over domains (they are a list of objects)
for domain_data in response.json():
domain = Domain(self.auth, domain_data)
logger.debug("Found %s domain" % (domain.name))
# returning domain as a generator
yield domain
[docs] def get_domain_by_name(self, domain_name):
"""Get a domain by name
Args:
domain_name (str): the required team
Returns:
Domain: the desidered :py:class:`Domain` instance
"""
logger.debug("Searching for %s" % (domain_name))
# get all domains
for domain in self.get_domains():
if domain.domainName == domain_name:
return domain
# if I arrive here, no team is found
raise NameError("domain: {domain} not found".format(
domain=domain_name))
[docs] def add_user_to_team(self, user_id, domain_id):
"""Add a user to a team
Args:
user_id (str): the required user_id
domain_id (str) the required domain_id
Returns:
Domain: the updated :py:class:`Domain` object"""
url = (
"{auth_url}/domains/{domain_id}/"
"{user_id}/user".format(
domain_id=domain_id,
user_id=user_id,
auth_url=settings.AUTH_URL)
)
response = self.put(url)
domain_data = response.json()
return Domain(self.auth, domain_data)
[docs]class Domain(Document):
"""
A class to deal with AAP domain objects
Attributes:
name (str): domain name
data (dict): data (dict): data from AAP read with
:py:meth:`response.json() <requests.Response.json>`
domainName (str): AAP domainName
domainDesc (str): AAP domainDesc
domainReference (str): AAP domainReference
link (dict): ``links`` data read from AAP response
"""
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# calling the base class method client
Client.__init__(self, auth)
Document.__init__(self)
# my class attributes
self.data = None
# other attributes
self.domainName = None
self.domainDesc = None
self.domainReference = None
self.links = None
self._users = None
# dealing with this type of documents.
if data:
logger.debug("Reading data for team")
self.read_data(data)
# this class lacks of a name attribute, so
self.name = self.domainName
def __str__(self):
if not self.domainReference:
return "domain not yet initialized"
reference = self.domainReference.split("-")[1]
return "%s %s %s" % (reference, self.name, self.domainDesc)
@property
def users(self):
"""Get users belonging to this domain"""
if not self._users and isinstance(self.links, list):
for url in self.links:
if 'user' in url['href']:
# using the base get method
response = Client.get(self, url['href'])
break
tmp_data = response.json()
# parse users as User objects
self._users = []
for user_data in tmp_data:
self._users.append(User(self.auth, data=user_data))
return self._users
@users.setter
def users(self, value):
self._users = value
[docs] def create_profile(self, attributes={}):
"""Create a profile for this domain
Args:
attributes (dict): a dictionary of attributes
"""
# see this url for more information
# https://explore.api.aai.ebi.ac.uk/docs/profile/index.html#resource-create_domain_profile
url = settings.AUTH_URL + "/profiles"
# define a new header. Copy the dictionary, don't use the same object
headers = copy.copy(self.headers)
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
# define data
data = {
"domain": {
"domainReference": self.domainReference
},
"attributes": attributes
}
# call a post method a deal with response
response = self.post(url, payload=data, headers=headers)
# create a new domain object
domain = Domain(self.auth, response.json())
return domain
[docs]class Team(Document):
"""A class to deal with USI Team_ objects
Attributes:
name (str): team name
data (dict): data (dict): data from USI read with
:py:meth:`response.json() <requests.Response.json>`
.. _Team: https://submission-test.ebi.ac.uk/api/docs/ref_teams.html
"""
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# calling the base class method client
Client.__init__(self, auth)
Document.__init__(self)
# my class attributes
self.name = None
self.data = None
self.description = None
self.profile = None
# dealing with this type of documents.
if data:
logger.debug("Reading data for team")
self.read_data(data)
def __str__(self):
return "{0} ({1})".format(self.name, self.description)
[docs] def get_submissions(self, status=None):
"""Follows submission url and get submissions from this team
Args:
status (str): filter submission using status
Returns:
list: A list of :py:class:`Submission` objects"""
# follow url
document = self.follow_tag('submissions')
# check if I have submission
if 'submissions' not in document._embedded:
logger.warning("You haven't any submission yet!")
return
# now iterate over submissions and create new objects
for document in document.paginate():
for submission_data in document._embedded['submissions']:
submission = Submission(self.auth, submission_data)
if status and submission.status != status:
logger.debug("Filtering %s submission" % (submission.name))
continue
logger.debug("Found %s submission" % (submission.name))
yield submission
[docs] def create_submission(self):
"""Create a new submission
Returns:
Submission: the new submission as an instance"""
# get the url for submission:create. I don't want a document using
# get method, I need instead a POST request
url = self._links['submissions:create']['href']
# define a new header. Copy the dictionary, don't use the same object
headers = copy.copy(self.headers)
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
# call a post method a deal with response
response = self.post(url, payload={}, headers=headers)
# create a new Submission object
submission = Submission(auth=self.auth, data=response.json())
# there are some difference between a new submission and
# an already defined submission
logger.debug("reload submission to fix issues")
# calling this method will reload submission and its status
submission.reload()
return submission
# helper functions
[docs]def check_relationship(sample_data, team):
"""Check relationship and add additional fields if missing"""
# create a copy of sample_data
sample_data = copy.copy(sample_data)
# check relationship if exists
if 'sampleRelationships' not in sample_data:
return sample_data
for relationship in sample_data['sampleRelationships']:
if 'team' not in relationship:
logger.debug("Adding %s to relationship" % (team))
# setting the referenced object
relationship['team'] = team
# this is the copied sample_data, not the original one!!!
return sample_data
[docs]def check_releasedate(sample_data):
"""Add release date to sample data if missing"""
# create a copy of sample_data
sample_data = copy.copy(sample_data)
# add a default release date if missing
if 'releaseDate' not in sample_data:
today = datetime.date.today()
logger.warning("Adding %s as releasedate")
sample_data['releaseDate'] = str(today)
# this is the copied sample_data, not the original one!!!
return sample_data
[docs]class TeamMixin(object):
[docs] def __init__(self):
"""Instantiate the class"""
# my class attributes
self._team = None
@property
def team(self):
"""Get/Set team name"""
# get team name
if isinstance(self._team, str):
team_name = self._team
elif isinstance(self._team, dict):
team_name = self._team['name']
elif self._team is None:
team_name = ""
else:
raise NotImplementedError(
"Unknown type: %s" % type(self._team)
)
return team_name
@team.setter
def team(self, value):
self._team = value
[docs]class Submission(TeamMixin, Document):
"""A class to deal with USI Submissions_
Attributes:
id (str): submission id (:py:meth:`~name` for compatibility)
createdDate (str): created date
lastModifiedDate (str): last modified date
lastModifiedBy (str): last user_id who modified this submission
submissionStatus (str): submission status
submitter (dict): submitter data
createdBy (str): user_id who create this submission
submissionDate (str): date when this submission is submitted to
biosample
.. _Submissions: https://submission-test.ebi.ac.uk/api/docs/ref_submissions.html
""" # noqa
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# this will track submission name
self.id = None
# calling the base class method client
super().__init__()
# now setting up Client and document class attributes
Client.__init__(self, auth)
Document.__init__(self)
# my class attributes
self.createdDate = None
self.lastModifiedDate = None
self.lastModifiedBy = None
self.submissionStatus = None
self.submitter = None
self.createdBy = None
# when this attribute appears? maybe when submission take place
self.submissionDate = None
# each document need to parse data as dictionary, since there could be
# more submission read from the same page. I cant read data from
# self.last_response itself, cause I can't have a last response
if data:
self.read_data(data)
def __str__(self):
if not self.name:
return "Submission not yet initialized"
return "%s %s %s %s" % (
self.name,
self.team,
self.lastModifiedDate.date(),
self.status,)
# for compatibility
@property
def name(self):
"""Get/Set Submission :py:attr:`~id`"""
return self.id
@name.setter
def name(self, submission_id):
if submission_id != self.id:
logger.debug(
"Overriding id (%s > %s)" % (self.id, submission_id))
self.id = submission_id
[docs] def read_data(self, data, force_keys=False):
"""Read data from a dictionary object and set class attributes
Args:
data (dict): a data dictionary object read with
:py:meth:`response.json() <requests.Response.json>`
force_keys (bool): If True, define a new class attribute from data
keys
"""
logger.debug("Reading data for submission")
super().read_data(data, force_keys)
# check for name
if 'self' in self._links:
name = self._links['self']['href'].split("/")[-1]
# remove {?projection} name
if '{?projection}' in name:
logger.debug("removing {?projection} from name")
name = name.replace("{?projection}", "")
logger.debug("Using %s as submission name" % (name))
self.name = name
[docs] def check_ready(self):
"""Test if a submission can be submitted or not (Must have completed
validation processes)
Returns:
bool: True if ready for submission
"""
# Try to determine url manually
url = (
"{api_root}/api/submissions/"
"{submission_name}/availableSubmissionStatuses".format(
submission_name=self.name,
api_root=settings.ROOT_URL)
)
# read a url in a new docume nt
document = Document.read_url(self.auth, url)
if hasattr(document, "_embedded"):
if 'statusDescriptions' in document._embedded:
return True
# default response
return False
@property
def status(self):
"""Return :py:attr:`~submissionStatus` attribute. Follow
``submissionStatus`` link and update attribute is such attribute is
None
Returns:
str: submission status as a string"""
if self.submissionStatus is None:
self.update_status()
return self.submissionStatus
[docs] def update_status(self):
"""Update :py:attr:`~submissionStatus` attribute by following
``submissionStatus`` link"""
document = self.follow_tag('submissionStatus')
self.submissionStatus = document.status
[docs] def create_sample(self, sample_data):
"""Create a sample from a dictionary
Args:
sample_data (dict): a dictionary of data
Returns:
Sample: a :py:class:`Sample` object"""
# check sample_data for required attributes
fixed_data = check_relationship(sample_data, self.team)
fixed_data = check_releasedate(fixed_data)
# debug
logger.debug(fixed_data)
# check if submission has the contents key
if 'contents' not in self._links:
# reload submission object in order to add items to it
self.reload()
# get the url for sample create
document = self.follow_tag("contents")
# get the url for submission:create. I don't want a document using
# get method, I need instead a POST request
url = document._links['samples:create']['href']
# define a new header. Copy the dictionary, don't use the same object
headers = copy.copy(self.headers)
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
# call a post method a deal with response
response = self.post(url, payload=fixed_data, headers=headers)
# create a new sample
sample = Sample(auth=self.auth, data=response.json())
# returning sample as and object
return sample
[docs] def get_samples(self, status=None, has_errors=None,
ignorelist=[]):
"""Returning all samples as a list. Can filter by errors and error
types::
# returning samples with errors in other checks than Ena
submission.get_samples(has_errors=True, ignorelist=['Ena'])
# returning samples which validation is still in progress
submission.get_samples(status='Pending')
Get all sample with errors in other fields than *Ena* databank
Args:
status (str): filter samples by validation status
(Pending, Complete)
has_errors (bool): filter samples with errors or none
ingnore_list (list): a list of errors to ignore
Yield:
Sample: a :py:class:`Sample` object
"""
# get sample url in one step
self_url = self._links['self']['href']
samples_url = "/".join([self_url, "contents/samples"])
# read a new document
document = Document.read_url(self.auth, samples_url)
# empty submission hasn't '_embedded' key
if '_embedded' not in document.data:
logger.warning("You haven't any samples yet!")
return
# now iterate over samples and create new objects
for document in document.paginate():
for sample_data in document._embedded['samples']:
sample = Sample(self.auth, sample_data)
if (status and
sample.get_validation_result().validationStatus
!= status):
logger.debug("Filtering %s sample" % (sample))
continue
if has_errors and has_errors != sample.has_errors(ignorelist):
logger.debug("Filtering %s sample" % (sample))
continue
logger.debug("Found %s sample" % (sample))
yield sample
[docs] def get_validation_results(self):
"""Return validation results for submission
Yield:
ValidationResult: a :py:class:`ValidationResult` object"""
# deal with different subission instances
if 'validationResults' not in self._links:
logger.warning("reloading submission")
self.reload()
document = self.follow_tag('validationResults')
# now iterate over validationresults and create new objects
for document in document.paginate():
for validation_data in document._embedded['validationResults']:
validation_result = ValidationResult(
self.auth, validation_data)
logger.debug("Found %s sample" % (validation_result))
yield validation_result
[docs] def get_status(self):
"""Count validation statues for submission
Returns:
collections.Counter: A counter object for different validation
status"""
# get validation results
validations = self.get_validation_results()
# get statuses
statuses = [validation.validationStatus for validation in validations]
return collections.Counter(statuses)
# there are errors that could be ignored
[docs] def has_errors(self, ignorelist=[]):
"""Count sample errors for a submission
Args:
ignorelist (list): ignore samples with errors in these databanks
Returns:
collections.Counter: A counter object for samples with errors and
with no errors"""
# check errors only if validation is completed
if 'Pending' in self.get_status():
raise NotReadyError(
"You can check errors after validation is completed")
# get validation results
validations = self.get_validation_results()
# get errors
errors = [
validation.has_errors(ignorelist) for validation in validations]
return collections.Counter(errors)
[docs] def delete(self):
"""Delete this submission instance from USI"""
url = self._links['self:delete']['href']
logger.info("Removing submission %s" % self.name)
# don't return anything
Client.delete(self, url)
[docs] def reload(self):
"""call :py:meth:`Document.follow_self_url` and reload class
attributes"""
logger.info("Refreshing data data for submission")
self.follow_self_url()
# reload submission status
self.update_status()
[docs] def finalize(self, ignorelist=[]):
"""Finalize a submission to insert data into biosample
Args:
ignorelist (list): ignore samples with errors in these databanks
Returns:
Document: output of finalize submission as a :py:class:`Document`
object
"""
if not self.check_ready():
raise NotReadyError("Submission not ready for finalization")
# raise exception if submission has errors
if True in self.has_errors(ignorelist):
raise USIDataError("Submission has errors, fix them")
# refresh my data
self.reload()
document = self.follow_tag('submissionStatus')
# get the url to change
url = document._links['submissionStatus']['href']
# define a new header. Copy the dictionary, don't use the same object
headers = copy.copy(self.headers)
# add new element to headers
headers['Content-Type'] = 'application/json;charset=UTF-8'
response = self.put(
url,
payload={'status': 'Submitted'},
headers=headers)
# create a new document
document = Document(auth=self.auth, data=response.json())
# copying last responsponse in order to improve data assignment
logger.debug("Assigning %s to document" % (response))
document.last_response = response
document.last_status_code = response.status_code
# update submission status
self.update_status()
return document
[docs]class Sample(TeamMixin, Document):
"""A class to deal with USI Samples_
Attributes:
alias (str): The sample alias (used to reference the same object)
team (dict): team data
title (str): sample title
description (str): sample description
attributes (dict): sample attributes
sampleRelationships (list): relationship between samples
taxonId (int): taxon id
taxon (str): taxon name
releaseDate (str): when this sample will be relased to public
createdDate (str): created date
lastModifiedDate (str): last modified date
createdBy (str): user_id who create this sample
lastModifiedBy (str): last user_id who modified this sample
accession (str): the biosample_id after submission to USI
.. _Samples: https://submission-test.ebi.ac.uk/api/docs/ref_samples.html
"""
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# calling the base class method client
super().__init__()
# now setting up Client and document class attributes
Client.__init__(self, auth)
Document.__init__(self)
# my class attributes
self.alias = None
self.team = None
self.title = None
self.description = None
self.attributes = None
self.sampleRelationships = None
self.taxonId = None
self.taxon = None
self.releaseDate = None
self.createdDate = None
self.lastModifiedDate = None
self.createdBy = None
self.lastModifiedBy = None
# when this attribute appears? maybe when submission take place
self.accession = None
if data:
self.read_data(data)
def __str__(self):
# get accession or alias
if self.accession:
return "%s (%s)" % (self.accession, self.title)
else:
return "%s (%s)" % (self.alias, self.title)
[docs] def read_data(self, data, force_keys=False):
"""Read data from a dictionary object and set class attributes
Args:
data (dict): a data dictionary object read with
:py:meth:`response.json() <requests.Response.json>`
force_keys (bool): If True, define a new class attribute from data
keys
"""
logger.debug("Reading data for Sample")
super().read_data(data, force_keys)
# check for name
if 'self' in self._links:
self.name = self._links['self']['href'].split("/")[-1]
logger.debug("Using %s as sample name" % (self.name))
[docs] def delete(self):
"""Delete this instance from a submission"""
url = self._links['self:delete']['href']
logger.info("Removing sample %s from submission" % self.name)
# don't return anything
Client.delete(self, url)
[docs] def reload(self):
"""call :py:meth:`Document.follow_self_url` and reload class
attributes"""
logger.info("Refreshing data data for sample")
self.follow_self_url()
[docs] def patch(self, sample_data):
"""Update sample by patching data with :py:meth:`Client.patch`
Args:
sample_data (dict): sample data to update"""
# check sample_data for required attributes
fixed_data = check_relationship(sample_data, self.team)
fixed_data = check_releasedate(fixed_data)
url = self._links['self']['href']
logger.info("patching sample %s with %s" % (self.name, fixed_data))
super().patch(url, payload=fixed_data)
# reloading data
self.reload()
[docs] def get_validation_result(self):
"""Return validation results for submission
Returns:
ValidationResult: the :py:class:`ValidationResult` of this sample
"""
document = self.follow_tag('validationResult', force_keys=True)
return ValidationResult(self.auth, document.data)
# there are errors that could be ignored
[docs] def has_errors(self, ignorelist=[]):
"""Return True if validation results throw an error
Args:
ignorelist (list): ignore errors in these databanks
Returns:
bool: True if sample has an errors in one or more databank"""
validation = self.get_validation_result().has_errors(ignorelist)
if validation:
logger.error("Got error(s) for %s" % (self))
return validation
[docs]class ValidationResult(Document):
[docs] def __init__(self, auth, data=None):
"""Instantiate the class
Args:
auth (Auth): a valid :py:class:`Auth <pyUSIrest.auth.Auth>` object
data (dict): instantiate the class from a dictionary of user data
"""
# calling the base class method client
Client.__init__(self, auth)
Document.__init__(self)
# my class attributes
self.version = None
self.expectedResults = None
self.errorMessages = None
self.overallValidationOutcomeByAuthor = None
self.validationStatus = None
if data:
self.read_data(data)
def __str__(self):
message = self.validationStatus
if self.overallValidationOutcomeByAuthor:
message += " %s" % (str(self.overallValidationOutcomeByAuthor))
return message
# there are errors that could be ignored
[docs] def has_errors(self, ignorelist=[]):
"""Return True if validation has errors
Args:
ignorelist (list): ignore errors in these databanks
Returns:
bool: True if sample has errors for at least one databank"""
has_errors = False
for key, value in self.overallValidationOutcomeByAuthor.items():
if value == 'Error' and key not in ignorelist:
message = ", ".join(self.errorMessages[key])
logger.error(message)
has_errors = True
return has_errors