Newer
Older
from threading import Lock
requests_lock = Lock()
Franziska Koehn
committed
def get_query_methods():
return ["AND", "OR"]
def get_operators():
return ["LIKE", ">", "<", "<=", ">=", "="]
def get_credentials(force=False,username="", password="" ):
if get_credentials.cache and not force:
return get_credentials.cache
from getpass import getpass
while not len(username):
username = raw_input("Username: ").strip()
while not len(password):
password = getpass().strip()
get_credentials.cache = username, password
return get_credentials()
get_credentials.cache = None
Franziska Koehn
committed
for r in results:
def download_async(result, host, dest_folder='', cb=(lambda *_: None), cb_args=()):
download_thread = Thread(target=download, args=(result, host, dest_folder, cb, cb_args))
def download(result, host, dest_folder='', cb=(lambda *_: None), cb_args=()):
subject = result['xnat_mrsessiondata_subject_id'] #TODO download andere typen
experiment = result['xnat_mrsessiondata_session_id']
project = result['xnat_mrsessiondata_project']
scan = result['id']
file_name = "%s/%s-%s-%s-%s" % (dest_folder, project, subject, experiment, scan)
download_file(host, project, subject, experiment, scan, file_name)
Franziska Koehn
committed
def download_file(host, project, subject, experiment, scan, file_name):
import requests
from base64 import b64encode
from requests.auth import HTTPBasicAuth
if not file_name.endswith(".zip"):
file_name += ".zip"
url= "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan)
user, passw = get_credentials()
try:
with open(file_name, 'wb') as handle:
response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
if not response.ok:
raise ValueError(response.status_code)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
except IOError as e:
print "Error writing file %s, %s" % (file_name, e)
except ValueError as e:
print "Error downloading file %s, Status Code %s" % (url, e)
def search_for(host, root_element, constraints, search_fields, user, passw):
from pyxnat.core import errors
from httplib2 import ServerNotFoundError
from httplib import ResponseNotReady
Franziska Koehn
committed
Franziska Koehn
committed
get_credentials(username=user, password=passw, force=True)
user, passw = get_credentials() # TODO
from tempfile import mkdtemp
Franziska Koehn
committed
tmp_dir=mkdtemp()
central = Interface(server=host,
user=user,
password=passw,
cachedir=tmp_dir)
except IndexError as e:
raise xsa_errors.ServerNotFoundError("Server not found, check your host-address.") #TODO Ok?
Franziska Koehn
committed
result = []
try:
result = central.select(root_element,search_fields).where(constraints)
except errors.DatabaseError as e:
if '401' in str(e):
raise xsa_errors.UnauthorizedError("Unauthorizied attempt. Check your User and Password")
raise xsa_errors.ServerNotFoundError("Server not found, check your host-address.")
except ResponseNotReady:
raise xsa_errors.ResponseNotReady("Please check your Host-Address")
if result == []:
raise xsa_errors.QueryError("Please check your query.")
Franziska Koehn
committed
try:
central.disonnect()
except AttributeError:
print "can\'t close connection (wrong pyxnat version?)"
try:
from shutil import rmtree
rmtree(tmp_dir)
except:
pass