HOST = None def main(host): import distutils.core root_element = 'xnat:mrScanData' constraints = [ ('xnat:mrScanData/TYPE', 'LIKE', '%t1%'), ('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'), 'AND', [('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'), ('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'), 'OR' ] ] results = search_for(host, root_element, constraints) print "Search results (%s):" % len(results) print results while True: try: is_downl = distutils.util.strtobool(raw_input("download? ")) break except ValueError: print "invalid character. Please use y or n." if is_downl: print "downloading files..." download_all(results, host) print "finished" else: print "download canceled" def get_search_fields(): return [{'label':'type', 'field':'xnat:mrScanData/TYPE'}, {'label':'fov_x', 'field':'xnat:mrScanData/PARAMETERS_FOV_X'}, {'label':'fov_y', 'field':'xnat:mrScanData/PARAMETERS_FOV_Y'}, {'label':'tr', 'field': 'xnat:mrScanData/PARAMETERS_TR'}, {'label':'te', 'field': 'xnat:mrScanData/PARAMETERS_TE'}, {'label':'ti', 'field': 'xnat:mrScanData/PARAMETERS_TI'}, {'label':'flip', 'field': 'xnat:mrScanData/PARAMETERS_FLIP'}, {'label':'voxel_res_x', 'field':'xnat:mrScanData/PARAMETERS_VOXELRES_X'}, {'label':'voxel_res_y', 'field':'xnat:mrScanData/PARAMETERS_VOXELRES_Y'}, {'label':'voxel_res_z', 'field':'xnat:mrScanData/PARAMETERS_VOXELRES_Z'} ] def get_query_methods(): return ["AND", "OR"] def get_operators(): return ["LIKE", ">", "<", "<=", ">=", "="] def download_all(results, host=None): for r in results: download(r, host) def download_f(*args, **kwargs): download(*args, **kwargs) # if this does not return False, the function will be repeatedly called by timeout_add return False def download(result, host=None): if not host: host = HOST print(result) subject = result['xnat_mrsessiondata_subject_id'] experiment = result['xnat_mrsessiondata_session_id'] project = result['xnat_mrsessiondata_project'] scan = result['id'] file_name = "%s_%s_%s" % (subject, result['type'], scan) download_file(host, project, subject, experiment, scan, file_name) def search_for_mrScanData(host, constraints, user, passw): get_credentials(username=user, password=passw, force=True) return search_for(host, 'xnat:mrScanData', constraints) def search_for(host, root_element, constraints): from pyxnat import Interface from pyxnat.core import errors from tempfile import mkdtemp from httplib2 import ServerNotFoundError user, passw = get_credentials() tmp_dir=mkdtemp() try: central = Interface(server="%s" % host, user=user, password=passw, cachedir=tmp_dir) except IndexError: return "ServerNotFoundError" #TODO Eigene Fehlerklasse search_fields = [ 'xnat:mrSessionData/PROJECT', #project id 'xnat:mrSessionData/SUBJECT_ID', #subject id 'xnat:mrSessionData/SESSION_ID', #experiment id 'xnat:mrScanData/ID', #scan id 'xnat:mrScanData/TYPE', #scan type ] result = [] try: result = central.select(root_element,search_fields).where(constraints) except errors.DatabaseError: return "DatabaseError" #TODO Eigene Fehlerklasse except ServerNotFoundError: return "ServerNotFoundError" #TODO Eigene Fehlerklasse try: central.disonnect() except AttributeError: print "can\'t close connection (wrong pyxnat version?)" try: from shutil import rmtree rmtree(tmp_dir) except: pass return result def get_credentials(force=False,username="", password="" ): if get_credentials.cache and not force: return get_credentials.cache from getpass import getpass while not len(username): username = raw_input("Username: ").strip() while not len(password): password = getpass().strip() get_credentials.cache = username, password return get_credentials() get_credentials.cache = None def download_file(host, project, subject, experiment, scan, file_name): import requests from base64 import b64encode from requests.auth import HTTPBasicAuth if not file_name.endswith(".zip"): file_name += ".zip" url= "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan) user, passw = get_credentials() try: with open(file_name, 'wb') as handle: response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw)) if not response.ok: raise ValueError(response.status_code) for block in response.iter_content(1024): if not block: break handle.write(block) except IOError as e: print "Error writing file %s, %s" % (file_name, e) except ValueError as e: print "Error downloading file %s, Status Code %s" % (url, e) if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description='search and download') parser.add_argument('--host', type=str, help='hostname or ip-address (including port).', default="localhost:8080") parser.add_argument('--user', type=str, help='user:passw', default=":") #TODO force download (boolean) #TODO read constraints und root_element from file #TODO GUI args = parser.parse_args() u,p = args.user.split(':') get_credentials(username=u, password=p) main(args.host)