Newer
Older
"""
:Author: Franziska Koehn
:Created: 2015/01/13
This module includes functions around sending and defining queries.
to query the xnat server define:
one string including one Root-Type like:
.. sourcecode:: python
'xnat:mrScanData'
one list including the search-constraints like:
.. code-block:: python
[
('xnat:mrScanData/TYPE', 'LIKE', '%t2%'),
('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
'AND',
[('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
'OR'
]
]
a list of search-fields like:
.. code-block:: python
[ 'xnat:mrScanData/TYPE','xnat:mrScanData/ID']
Franziska Koehn
committed
def get_query_methods():
"""Returns all applicable methods for creating a query."""
Franziska Koehn
committed
return ["AND", "OR"]
def get_operators():
"""Returns all applicable Operators for creating a query."""
Franziska Koehn
committed
return ["LIKE", ">", "<", "<=", ">=", "="]
def download_async(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
Downloads file by using threads (So the GUI will not be blocked while downloading).
:result: dict, resultset from which will be downloaded
:host: str, address of host
:creds: tuple (username, userpassword), credentials (including user-name and -password)
:rest: str, REST-API definition
:dest_folder: str, folder where the downloaded files will be saved
:cb: function, that will be called, when download is finished with the arguments from cb_args
download_thread = Thread(target=download, args=(result, host, creds, rest, dest_folder, cb, cb_args))
def download(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
:result: dict, resultset from which will be downloaded
:host: str, address of host
:creds: tuple (username, userpassword), credentials (including user-name and -password)
:rest: str, REST-API definition
:dest_folder: str, folder where the downloaded files will be saved
:cb: function, that will be called, when download is finished with the arguments from cb_args
import re
import os
names=[]
def subfunc(f):
key = f.group(0).strip("{}")
r = result[key]
names.append(r)
return r
ret = re.sub('\{\w+\}', subfunc, rest)
path = os.path.join(dest_folder, '-'.join(names))
url = "%s%s" % (host, ret)
download_file(url, creds, path)
Franziska Koehn
committed
def download_file(url, creds, path):
Downloads a file from given 'url' and saves it to 'path'.
Returns True if download was finished without problems
Returns False if download raised an exception.
:url: str, host/REST-API with filled values
:creds: tuple(user-name, user-password), credentials
:path: str, folder where the downloaded files will be saved
import requests
from base64 import b64encode
from requests.auth import HTTPBasicAuth
if not path.endswith(".zip"):
path += ".zip"
user, passw = creds
with open(path, 'wb') as handle:
response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
if not response.ok:
raise ValueError(response.status_code)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
raise xsa.errors.WritingError("Error writing file %s, %s" % (path, e))
raise xsa.errors.DownloadError("Error downloading file %s, Status Code %s" % (url, e))
Franziska Koehn
committed
def retry_qry(fun):
def retry(*args, **kwargs):
last_exception = None
retries = 1
while retries >= 0:
try:
return fun(*args, **kwargs)
except Exception as e:
print(("wrapped", e))
if type(last_exception) is type(e):
raise e
last_exception = e
finally:
retries -= 1
return retry
@retry_qry
def query_for_additional_data(rest, result, host, creds):
import requests
from requests.auth import HTTPBasicAuth
from xml.etree import ElementTree
def subfunc(f):
key = f.group(0).strip("{}")
ret = re.sub('\{\w+\}', subfunc, rest)
user, passw = creds
url = "%s%s" % (host, ret)
r = requests.get(url, auth=HTTPBasicAuth(user, passw))
exp = ElementTree.fromstring(r.text)
new_fields = exp.findall(".//xnat:field", {"xnat": "http://nrg.wustl.edu/xnat"})
new_result = result.copy()
for f in new_fields:
fieldname = f.get("name")
value = exp.find(".//xnat:field[@name='"+fieldname+"']", {"xnat": "http://nrg.wustl.edu/xnat"})
new_result[fieldname] = ''.join(value.text).replace("\n","")
Franziska Koehn
committed
@retry_qry
def search_for(host, root_element, constraints, search_fields, user, passw):
:host: str, host-address
:root_element: str, Root-Type of search
:constraints: list, constraints of query
:search_fields: list of str, fields which will be returned from server
:user: str, user-name
:passw: str, user-password
**Raises**
:xsa.errors.ServerNotFoundError:
:xsa.errors.UnauthorizedError:
:xsa.errors.ResponseNotReady:
:xsa.errors.QueryError:
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
import requests
from requests.auth import HTTPBasicAuth
from tempfile import NamedTemporaryFile
from xsa.jsonutil import JsonTable
import json
url= "%s%s" % (host, "/data/search?format=json")
with NamedTemporaryFile(suffix=".xml") as tmp:
tree = create_xml_tree(root_element, constraints, search_fields)
tree.write(tmp, xml_declaration=True)
tmp.flush()
r = requests.post(url, files={'file': open(tmp.name, 'rb')}, auth=HTTPBasicAuth(user, passw))
if r.status_code == requests.codes.ok:
json_file=(json.loads(r.text))['ResultSet']['Result']
return JsonTable(json_file)
def create_xml_tree(root_type, query, fields):
import xml.etree.cElementTree as ET
search_attributes = {"ID":"", "allow-diff-columns":"0", "secure":"false", "brief-description":"MR Sessions", "xmlns:xdat":"http://nrg.wustl.edu/security", "xmlns:xsi":"http://www.w3.org/2001/XMLSchema-instance"}
root = ET.Element("xdat:search", attrib=search_attributes)
ET.SubElement(root, "xdat:root_element_name").text = root_type
for field in fields:
element, field_id = field.split("/")
field = ET.SubElement(root, "xdat:search_field")
ET.SubElement(field, "xdat:element_name").text = element
ET.SubElement(field, "xdat:field_ID").text = field_id
def create_criterias(parent, element, iterator):
try:
if isinstance(element, str):
element = parent.set("method", element)
create_criterias(parent, next(iterator), iterator)
elif isinstance(element, tuple):
if len(element) != 3:
raise errors.QueryError("The query-definition is incorrect.")
else:
citeria = ET.SubElement(parent, "xdat:criteria", attrib={"override_value_formatting":"0"})
ET.SubElement(citeria, "xdat:schema_field").text=element[0]
ET.SubElement(citeria, "xdat:comparison_type").text=element[1]
ET.SubElement(citeria, "value").text=element[2]
create_criterias(parent, next(iterator), iterator)
elif isinstance(element, list):
child = ET.SubElement(parent, "xdat:child_set", method="AND")
iter_child = iter(element)
create_criterias(child, next(iter_child), iter_child)
except StopIteration:
pass
iter_query = iter(query)
where = ET.SubElement(root, "xdat:search_where", method="AND")
create_criterias(where, next(iter_query), iter_query)
return ET.ElementTree(root)