"""
:Author: Franziska Koehn
:Created: 2015/01/13

This module includes functions for defining and sending queries.

To query the XNAT server, define:

    one string containing exactly one Root-Type, like:

    .. sourcecode:: python

        'xnat:mrScanData'

    one list containing the search-constraints, like:

    .. code-block:: python

        [
            ('xnat:mrScanData/TYPE', 'LIKE', '%t2%'),
            ('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
            'AND',
            [('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
             ('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
             'OR'
            ]
        ]

    a list of search-fields, like:

    .. code-block:: python

        [ 'xnat:mrScanData/TYPE','xnat:mrScanData/ID']
"""
def get_query_methods():
    """Returns all applicable methods for creating a query."""
    return ["AND", "OR"]
def get_operators():
    """Returns all applicable Operators for creating a query."""
    return ["LIKE", ">", "<", "<=", ">=", "="]
def download_async(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
    """
    Downloads file by using threads (So the GUI will not be blocked while downloading).

    The work itself is done by ``download``; this function only hands all
    arguments over to a new thread and starts it.

    **Parameters**
        :result: dict, resultset from which will be downloaded
        :host: str, address of host
        :creds: tuple (username, userpassword), credentials (including user-name and -password)
        :rest: str, REST-API definition
        :dest_folder: str, folder where the downloaded files will be saved
        :cb: function, that will be called, when download is finished with the arguments from cb_args
        :cb_args: tuple, arguments that are handed over to cb
    """
    from threading import Thread

    download_thread = Thread(target=download, args=(result, host, creds, rest, dest_folder, cb, cb_args))
    # Constructing a Thread does not run it; without start() nothing would be downloaded.
    download_thread.start()
def download(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
    """
    Downloads the file that belongs to one result and calls 'cb' afterwards.

    Every ``{key}`` placeholder in 'rest' is replaced by ``result[key]``. The
    substituted values, joined with '-', form the file name inside
    'dest_folder'. The download itself is delegated to ``download_file``.

    **Parameters**
        :result: dict, resultset from which will be downloaded
        :host: str, address of host
        :creds: tuple (username, userpassword), credentials (including user-name and -password)
        :rest: str, REST-API definition
        :dest_folder: str, folder where the downloaded files will be saved
        :cb: function, that will be called, when download is finished with the arguments from cb_args
        :cb_args: tuple, arguments that are handed over to cb
    """
    import os
    import re

    names = []

    def subfunc(match):
        # Replace "{key}" by the matching value and remember it for the file name.
        key = match.group(0).strip("{}")
        value = result[key]
        names.append(value)
        return value

    # Raw string: same pattern as before, without the invalid-escape warning.
    resolved_rest = re.sub(r'\{\w+\}', subfunc, rest)
    path = os.path.join(dest_folder, '-'.join(names))
    url = "%s%s" % (host.strip("/"), resolved_rest)
    download_file(url, creds, path)
    # Only reached when download_file did not raise, i.e. the download is finished.
    cb(*cb_args)
def download_file(url, creds, path):
    """
    Downloads a file from given 'url' and saves it to 'path'.

    Returns True if download was finished without problems.
    Problems are reported by raising one of the exceptions listed below.

    **Parameters**
        :url: str, host/REST-API with filled values
        :creds: tuple(user-name, user-password), credentials
        :path: str, file where the download will be saved (".zip" is appended if it is missing)

    **Raises**
        :xsa.errors.DownloadError: if the server answers with a non-OK status
        :xsa.errors.WritingError: if an IOError occurs while fetching or writing the file
    """
    import requests
    from requests.auth import HTTPBasicAuth

    if not path.endswith(".zip"):
        path += ".zip"
    user, passw = creds

    try:
        response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
        # Check the status before touching the disk, so a rejected request
        # does not leave an empty file behind.
        if not response.ok:
            raise ValueError(response.status_code)
        with open(path, 'wb') as handle:
            for block in response.iter_content(1024):
                if not block:
                    break
                handle.write(block)
    except IOError as e:
        # NOTE(review): the exception type of this handler is reconstructed;
        # confirm against the project history.
        raise xsa.errors.WritingError("Error writing file %s, %s" % (path, e))
    except ValueError as e:
        raise xsa.errors.DownloadError("Error downloading file %s, Status Code %s" % (url, e))
    return True
def download_file_iter(url, creds):
    """
    Downloads a file from given 'url'.

    Returns a tuple (content-length header or None, iterator over the
    response content in blocks of 1024 bytes).

    Raises xsa.errors.WritingError and xsa.errors.DownloadError

    **Parameters**
        :url: str, host/REST-API with filled values
        :creds: tuple(user-name, user-password), credentials
    """
    import requests
    from requests.auth import HTTPBasicAuth

    user, passw = creds
    try:
        response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
        if not response.ok:
            raise ValueError(response.status_code)
        return response.headers.get("content-length", None), response.iter_content(1024)
    except IOError as e:
        # NOTE(review): the exception type of this handler is reconstructed;
        # confirm against the project history.
        raise xsa.errors.WritingError("Error writing file %s" % e)
    except ValueError as e:
        raise xsa.errors.DownloadError("Error downloading file %s, Status Code %s" % (url, e))
def prepare_rest(result, rest, host):
    """
    Fills the placeholders of a REST definition and derives url and file name.

    Every ``{key}`` placeholder in 'rest' is replaced by ``result[key]``.

    **Parameters**
        :result: dict, resultset whose values fill the placeholders
        :rest: str, REST-API definition
        :host: str, address of host

    **Returns**
        tuple (url, filename): url is host (without surrounding "/") followed
        by the filled REST path; filename is that path without its last
        segment, with "/" replaced by "_".
        NOTE(review): the return order is an assumption; verify against callers.
    """
    import re

    def subfunc(match):
        key = match.group(0).strip("{}")
        return result[key]

    # Raw string: same pattern as before, without the invalid-escape warning.
    filled = re.sub(r'\{\w+\}', subfunc, rest)
    filename = filled.rpartition("/")[0].strip("/").replace("/", "_")
    url = "%s%s" % (host.strip("/"), filled)
    return url, filename
def retry_qry(fun):
    """
    Decorator that calls 'fun' again once, if the first call raised an exception.

    The result of the first successful call is returned. If the second call
    fails as well, its exception is raised to the caller.

    **Parameters**
        :fun: function, the callable to wrap
    """
    import functools

    @functools.wraps(fun)
    def retry(*args, **kwargs):
        attempts = 2  # the first call plus one retry
        last_exception = None
        for attempt in range(attempts):
            try:
                return fun(*args, **kwargs)
            except Exception as e:
                # Give up when the same kind of error repeats or no attempt is
                # left; never fall out of the loop and return None silently.
                if attempt == attempts - 1 or type(last_exception) is type(e):
                    raise
                last_exception = e
    return retry
@retry_qry
def query_for_additional_data(rest, result, host, creds):
    """
    Requests additional fields for one result and returns the extended result.

    Every ``{key}`` placeholder in 'rest' is replaced by ``result[key]``. The
    resulting resource is fetched from 'host' and parsed as XML. For each
    ``xnat:field`` element found, its name and text (without newlines) are
    added to a copy of 'result'.

    **Parameters**
        :rest: str, REST-API definition
        :result: dict, resultset the additional data belongs to (is not modified)
        :host: str, address of host
        :creds: tuple(user-name, user-password), credentials

    **Returns**
        dict, copy of 'result' extended by the fields found in the response
    """
    import re
    from xml.etree import ElementTree

    import requests
    from requests.auth import HTTPBasicAuth

    namespaces = {"xnat": "http://nrg.wustl.edu/xnat"}

    def subfunc(match):
        key = match.group(0).strip("{}")
        # The replacement has to be returned; otherwise the placeholder is
        # replaced by nothing and the url is wrong.
        return result[key]

    filled = re.sub(r'\{\w+\}', subfunc, rest)
    user, passw = creds
    url = "%s%s" % (host.strip("/"), filled)
    r = requests.get(url, auth=HTTPBasicAuth(user, passw))
    exp = ElementTree.fromstring(r.text)

    new_result = result.copy()
    for f in exp.findall(".//xnat:field", namespaces):
        fieldname = f.get("name")
        # Looked up by name again, so the first field with this name wins.
        value = exp.find(".//xnat:field[@name='" + fieldname + "']", namespaces)
        new_result[fieldname] = ''.join(value.text).replace("\n", "")
    return new_result
@retry_qry
def search_for(host, root_element, constraints, search_fields, user, passw):
    """
    Sends a search query to the server and returns the result table.

    The query is turned into an XML document by ``create_xml_tree``, written
    to a temporary file and posted to ``<host>/data/search?format=json``.

    **Parameters**
        :host: str, host-address
        :root_element: str, Root-Type of search
        :constraints: list, constraints of query
        :search_fields: list of str, fields which will be returned from server
        :user: str, user-name
        :passw: str, user-password

    **Returns**
        JsonTable built from ``ResultSet/Result`` of the JSON response

    **Raises**
        :xsa.errors.ServerNotFoundError: invalid url/schema or HTTP 404
        :xsa.errors.UnauthorizedError: HTTP 401
        :xsa.errors.Error: other request failures, HTTP 403, HTTP 500 and any other non-OK status
        :xsa.errors.ResponseNotReady: listed in the earlier documentation; not raised directly here (TODO confirm)
        :xsa.errors.QueryError: listed in the earlier documentation; not raised directly here (TODO confirm)
    """
    import json
    from tempfile import NamedTemporaryFile

    import requests
    from requests.auth import HTTPBasicAuth
    from xsa.jsonutil import JsonTable

    url = "%s%s" % (host.strip("/"), "/data/search?format=json")
    with NamedTemporaryFile(suffix=".xml") as tmp:
        tree = create_xml_tree(root_element, constraints, search_fields)
        tree.write(tmp, xml_declaration=True)
        tmp.flush()
        try:
            # Close the second handle on the temporary file after the upload.
            with open(tmp.name, 'rb') as xml_file:
                r = requests.post(url, files={'file': xml_file}, auth=HTTPBasicAuth(user, passw))
        except (requests.exceptions.InvalidSchema, requests.exceptions.InvalidURL) as e:
            raise xsa.errors.ServerNotFoundError(e)
        except requests.RequestException as e:
            raise xsa.errors.Error(e)

    print("response=",r.status_code)
    if r.status_code == requests.codes.ok:
        json_file = (json.loads(r.text))['ResultSet']['Result']
        return JsonTable(json_file)
    elif r.status_code == requests.codes.unauthorized:
        raise xsa.errors.UnauthorizedError("HTTP 401: unauthorized")
    elif r.status_code == requests.codes.forbidden:
        raise xsa.errors.Error("HTTP 403: forbidden (check query definition, e.g. roottype)")
    elif r.status_code == requests.codes.not_found:
        raise xsa.errors.ServerNotFoundError("HTTP 404: not found (check host-address)")
    elif r.status_code == requests.codes.internal_server_error:
        raise xsa.errors.Error("HTTP 500: Internal Server Error (check query definition, e.g. constraints)")
    else:
        # Do not hand None to the caller for a status that is not handled above.
        raise xsa.errors.Error("HTTP %s: unexpected response" % r.status_code)
def create_xml_tree(root_type, query, fields):
    """
    Creates the XML search document that is sent to the server.

    **Parameters**
        :root_type: str, Root-Type of search, becomes ``xdat:root_element_name``
        :query: list, constraints of query. A str sets the method ("AND"/"OR")
                of the enclosing set, a tuple (field, operator, value) becomes
                one ``xdat:criteria``, a list becomes a nested ``xdat:child_set``.
                Sets without an explicit method use "AND".
        :fields: list of str, search-fields in the form 'element/field_id'

    **Returns**
        ElementTree with ``xdat:search`` as root

    **Raises**
        :errors.QueryError: if a tuple does not have exactly three entries or
                            an entry is neither str, tuple nor list
    """
    # cElementTree was removed in Python 3.9; ElementTree is the same API.
    import xml.etree.ElementTree as ET

    search_attributes = {"ID":"", "allow-diff-columns":"0", "secure":"false", "brief-description":"MR Sessions", "xmlns:xdat":"http://nrg.wustl.edu/security", "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance"}
    root = ET.Element("xdat:search", attrib=search_attributes)
    ET.SubElement(root, "xdat:root_element_name").text = root_type

    for field in fields:
        element_name, field_id = field.split("/")
        search_field = ET.SubElement(root, "xdat:search_field")
        ET.SubElement(search_field, "xdat:element_name").text = element_name
        ET.SubElement(search_field, "xdat:field_ID").text = field_id

    def create_criterias(parent, elements):
        # Walk the whole list, so entries that follow a nested list are kept
        # and an empty list simply adds nothing.
        for element in elements:
            if isinstance(element, str):
                parent.set("method", element)
            elif isinstance(element, tuple):
                if len(element) != 3:
                    raise errors.QueryError("The query-definition is incorrect.")
                criteria = ET.SubElement(parent, "xdat:criteria", attrib={"override_value_formatting":"0"})
                ET.SubElement(criteria, "xdat:schema_field").text = element[0]
                ET.SubElement(criteria, "xdat:comparison_type").text = element[1]
                ET.SubElement(criteria, "value").text = element[2]
            elif isinstance(element, list):
                child = ET.SubElement(parent, "xdat:child_set", method="AND")
                create_criterias(child, element)
            else:
                # An unknown entry must not cut the rest of the query off silently.
                raise errors.QueryError("The query-definition is incorrect.")

    where = ET.SubElement(root, "xdat:search_where", method="AND")
    create_criterias(where, query)
    return ET.ElementTree(root)