# xnat_search.py
from threading import Lock
# Module-wide lock: download() holds it while it fetches a scan, so downloads
# started from several threads (see download_async) run one at a time.
requests_lock = Lock()

def main(host):
    """Run a built-in sample search against *host* and offer to download the hits.

    The query looks for ``xnat:mrScanData`` entries, prints what
    ``search_for`` returned and then asks on the console whether all
    results should be downloaded via ``download_all``.

    host -- address of the server, passed unchanged to ``search_for`` and
            ``download_all``.
    """
    # Import the submodule that is actually used below; ``import
    # distutils.core`` only exposed ``distutils.util`` as a side effect.
    import distutils.util

    root_element = 'xnat:mrScanData'
    constraints = [
        ('xnat:mrScanData/TYPE', 'LIKE', '%t1%'),
        ('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
        'AND',
        [('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
         ('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
         'OR'
         ]
        ]

    results = search_for(host, root_element, constraints)

    # search_for() signals a failure by returning the error name as a string;
    # do not treat that string as a list of results.
    if isinstance(results, str):
        print("Search failed: %s" % results)
        return

    print("Search results (%s):" % len(results))
    print(results)

    # Keep asking until strtobool() accepts the answer.
    while True:
        try:
            is_downl = distutils.util.strtobool(raw_input("download? "))
            break
        except ValueError:
            print("invalid character. Please use y or n.")

    if is_downl:
        print("downloading files...")
        download_all(results, host)
        print("finished")
    else:
        print("download canceled")

def read_json_from_file():
    """Placeholder without any implementation yet; always returns None."""
    return None

def get_data_types():
    """Yield the 'root-type' entry of every JSON file in 'datatypes/'.

    The directory is resolved relative to the current working directory.
    For a file whose content is not valid JSON, None is yielded instead.
    """
    import json
    from os import listdir
    from os.path import isfile, join

    DIR = 'datatypes/'
    # Collect the regular files first, then read them one after another.
    candidates = [join(DIR, entry) for entry in listdir(DIR)]
    paths = [path for path in candidates if isfile(path)]
    for path in paths:
        with open(path) as handle:
            try:
                yield json.load(handle)['root-type']
            except ValueError:
                # Not parseable as JSON: report the file as "no type".
                yield None

def get_fields_of_type(type):
    """Return the 'fields' entry of the data-type file whose 'root-type' is *type*.

    Every regular file in 'datatypes/' (relative to the current working
    directory) is parsed as JSON; files that are not valid JSON are skipped.
    If no file matches, a built-in list of xnat:mrScanData fields is
    returned instead.
    """
    import json
    from os import listdir
    from os.path import isfile, join

    DIR = 'datatypes/'
    candidates = [join(DIR, entry) for entry in listdir(DIR)]
    paths = [path for path in candidates if isfile(path)]
    for path in paths:
        with open(path) as handle:
            try:
                spec = json.load(handle)
            except ValueError:
                # Unparseable file: ignore it and try the next one.
                continue
        if spec['root-type'] == type:
            return spec['fields']

    # No data-type file matched: fall back to the hard-coded field list.
    return json.loads("""[{"label":"type", "field":"xnat:mrScanData/TYPE"},
            {"label":"fov_x", "field":"xnat:mrScanData/PARAMETERS_FOV_X"},
            {"label":"fov_y", "field":"xnat:mrScanData/PARAMETERS_FOV_Y"},
            {"label":"tr", "field": "xnat:mrScanData/PARAMETERS_TR"},
            {"label":"te", "field": "xnat:mrScanData/PARAMETERS_TE"},
            {"label":"ti", "field": "xnat:mrScanData/PARAMETERS_TI"},
            {"label":"flip", "field": "xnat:mrScanData/PARAMETERS_FLIP"},
            {"label":"voxel_res_x", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_X"},
            {"label":"voxel_res_y", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_Y"},
            {"label":"voxel_res_z", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_Z"}
            ]""")

def get_query_methods():
    """Return a new list with the names used to combine constraints.

    These are the same 'AND' / 'OR' markers that appear inside the
    constraint lists built in main().
    """
    return list(("AND", "OR"))

def get_operators():
    """Return a new list with the comparison operators for a single constraint."""
    return list(("LIKE", ">", "<", "<=", ">=", "="))

def download_all(results, host=None):
    """Download every entry of *results*, one after another.

    results -- iterable of search results, each in the form download() expects.
    host    -- server address handed on to download(); None lets download()
               pick its own default.
    """
    for r in results:
        # Pass host by keyword: the second positional parameter of
        # download() is dest_folder, not host.
        download(r, host=host)

def download_async(result, dest_folder='', host=None, cb=(lambda *_: None), cb_args=()):
    """Start download() for *result* in a new thread and return that thread.

    All arguments are forwarded unchanged to download(); see there for
    their meaning. The thread is already started when it is returned, so
    callers can join() it to wait for completion.
    """
    from threading import Thread

    download_thread = Thread(target=download, args=(result, dest_folder, host, cb, cb_args))
    download_thread.start()
    return download_thread
def download(result, dest_folder='', host=None, cb=(lambda *_: None), cb_args=()):
    """Download the scan described by *result* and then invoke the callback.

    result      -- mapping with the keys 'xnat_mrsessiondata_subject_id',
                   'xnat_mrsessiondata_session_id',
                   'xnat_mrsessiondata_project', 'id' and 'type'.
    dest_folder -- directory the file is written to; '' means the current
                   working directory.
    host        -- server address; a falsy value selects the module default.
    cb, cb_args -- callback and its positional arguments, called once the
                   download attempt has finished.
    """
    from os.path import join

    if not host:
        # NOTE(review): HOST is not defined in the visible part of this
        # file -- confirm it exists at module level.
        host = HOST

    # Hold the lock via ``with`` so it is released even when a key is
    # missing or the transfer raises; otherwise later downloads would block
    # forever.
    with requests_lock:
        print(result)
        subject = result['xnat_mrsessiondata_subject_id']
        experiment = result['xnat_mrsessiondata_session_id']
        project = result['xnat_mrsessiondata_project']
        scan = result['id']
        # join() keeps the path relative when dest_folder is empty instead
        # of producing "/<name>" in the filesystem root.
        file_name = join(dest_folder, "%s_%s_%s" % (subject, result['type'], scan))
        download_file(host, project, subject, experiment, scan, file_name)
    cb(*cb_args)

def search_for_mrScanData(host, constraints, user, passw):
    """Store the given credentials and search *host* for xnat:mrScanData entries.

    host        -- server address, passed on to search_for().
    constraints -- constraint list, passed on to search_for().
    user, passw -- credentials; they replace whatever get_credentials() had
                   cached before (force=True).

    Returns whatever search_for() returns.
    """
    get_credentials(username=user, password=passw, force=True)
    return search_for(host, 'xnat:mrScanData', constraints)

def search_for(host, root_element, constraints):
    """Query *host* through pyxnat and return the matching rows.

    host         -- server address handed to pyxnat's Interface.
    root_element -- data type to select from, e.g. 'xnat:mrScanData'.
    constraints  -- constraint list handed to ``where()``.

    Returns the search result, or one of the strings "ServerNotFoundError" /
    "DatabaseError" when the corresponding failure occurred.
    """
    from shutil import rmtree
    from tempfile import mkdtemp

    from httplib2 import ServerNotFoundError
    from pyxnat import Interface
    from pyxnat.core import errors

    # These three names were used below without ever being assigned.
    user, passw = get_credentials()
    tmp_dir = mkdtemp()  # private cache directory, removed again on exit

    search_fields = [
        'xnat:mrSessionData/PROJECT',       #project id
        'xnat:mrSessionData/SUBJECT_ID',    #subject id
        'xnat:mrSessionData/SESSION_ID',    #experiment id
        'xnat:mrScanData/ID',               #scan id
        'xnat:mrScanData/TYPE',             #scan type
        ]

    try:
        try:
            central = Interface(server="%s" % host,
                                user=user,
                                password=passw,
                                cachedir=tmp_dir)
        except IndexError:
            return "ServerNotFoundError"    # TODO: dedicated error class

        try:
            result = central.select(root_element, search_fields).where(constraints)
        except errors.DatabaseError:
            return "DatabaseError"          # TODO: dedicated error class
        except ServerNotFoundError:
            return "ServerNotFoundError"    # TODO: dedicated error class

        try:
            # Was misspelled "disonnect", so the connection was never closed.
            central.disconnect()
        except AttributeError:
            print("can't close connection (wrong pyxnat version?)")

        return result
    finally:
        # Best-effort cleanup on every exit path, including the error returns.
        rmtree(tmp_dir, ignore_errors=True)


def get_credentials(force=False, username="", password=""):
    """Return the (username, password) pair, asking on the console if needed.

    The pair is remembered on the function object itself. A remembered pair
    is returned as-is unless *force* is true. Otherwise every value that was
    passed in empty is prompted for until it is non-empty, and the resulting
    pair replaces the remembered one.
    """
    remembered = get_credentials.cache
    if remembered and not force:
        return remembered

    from getpass import getpass

    while len(username) == 0:
        username = raw_input("Username: ").strip()
    while len(password) == 0:
        password = getpass().strip()

    get_credentials.cache = (username, password)
    return get_credentials.cache
get_credentials.cache = None

def download_file(host, project, subject, experiment, scan, file_name):
    """Fetch the DICOM files of one scan as a zip archive and save it.

    host, project, subject, experiment, scan -- used to build the request URL.
    file_name -- target path; ".zip" is appended when it is missing.

    Credentials come from get_credentials(). Failures are reported on the
    console and not raised.
    """
    import requests
    from requests.auth import HTTPBasicAuth

    if not file_name.endswith(".zip"):
        file_name += ".zip"

    url = "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan)

    user, passw = get_credentials()

    try:
        response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
        try:
            if not response.ok:
                raise ValueError(response.status_code)
            # Open the target only after the server answered successfully,
            # so a failed request does not leave an empty archive behind.
            with open(file_name, 'wb') as handle:
                for chunk in response.iter_content(1024):
                    if not chunk:
                        break
                    handle.write(chunk)
        finally:
            response.close()
    except requests.RequestException as e:
        # requests' errors derive from IOError (some also from ValueError);
        # catch them first so they are not reported as file or status errors.
        print("Error downloading file %s, %s" % (url, e))
    except IOError as e:
        print("Error writing file %s, %s" % (file_name, e))
    except ValueError as e:
        print("Error downloading file %s, Status Code %s" % (url, e))

if __name__ == "__main__":
    # Command-line entry point: read host and credentials, then run main().
    import argparse

    parser = argparse.ArgumentParser(description='search and download')
    parser.add_argument('--host', type=str, help='hostname or ip-address (including port).', default="localhost:8080")
    parser.add_argument('--user', type=str, help='user:passw', default=":")

    # TODO: force download (boolean)
    # TODO: read constraints and root_element from file
    # TODO: GUI

    args = parser.parse_args()

    # Split at the first colon only: a value without a colon, or a password
    # that itself contains ':', no longer raises on unpacking. Empty parts
    # are prompted for by get_credentials().
    u, _, p = args.user.partition(':')
    get_credentials(username=u, password=p)
    main(args.host)