Newer
Older
Franziska Koehn
committed
from threading import Lock
# Module-level lock object. It is not acquired anywhere in the visible part
# of this file -- NOTE(review): presumably intended to serialise HTTP
# requests issued from download threads; confirm against the callers.
requests_lock = Lock()
Franziska Koehn
committed
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def main(host):
    """Run a fixed example search against *host* and offer to download hits.

    The search looks for MR scans whose type contains "t1", whose flip angle
    is at least 10 and whose TE is either above or below 2.0.  The results
    are printed and the user is asked on stdin whether they should be
    downloaded.

    Args:
        host: address of the XNAT server, forwarded to search_for() and
            download_all().
    """
    # Import the submodule explicitly; ``import distutils.core`` only made
    # ``distutils.util`` reachable as an undocumented import side effect.
    import distutils.util

    root_element = 'xnat:mrScanData'
    constraints = [
        ('xnat:mrScanData/TYPE', 'LIKE', '%t1%'),
        ('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
        'AND',
        [('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
         ('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
         'OR'
         ]
    ]

    results = search_for(host, root_element, constraints)

    # search_for() reports failures by returning the error name as a string;
    # without this check the string would be "downloaded" character by
    # character further down.
    if isinstance(results, str):
        print("Search failed: %s" % results)
        return

    print("Search results (%s):" % len(results))
    print(results)

    # Keep asking until strtobool() accepts the answer (y/n, yes/no, ...).
    while True:
        try:
            is_downl = distutils.util.strtobool(raw_input("download? "))
            break
        except ValueError:
            print("invalid character. Please use y or n.")

    if is_downl:
        print("downloading files...")
        download_all(results, host)
        print("finished")
    else:
        print("download canceled")
Franziska Koehn
committed
def read_json_from_file():
    """Placeholder without an implementation; does nothing, returns None."""
    return None
def get_data_types(directory='datatypes/'):
    """Yield the ``root-type`` entry of every file in *directory*.

    Each regular file in the directory is parsed as JSON.  For a file that
    cannot be parsed, or whose content has no ``root-type`` entry, ``None``
    is yielded instead, so one malformed file no longer aborts the whole
    iteration.

    Args:
        directory: folder containing the datatype description files.
            Defaults to the previously hard-coded ``'datatypes/'``.

    Yields:
        The value stored under ``'root-type'``, or ``None`` for a file that
        is not a usable description.
    """
    from os import listdir
    from os.path import isfile, join
    import json

    for name in listdir(directory):
        path = join(directory, name)
        if not isfile(path):
            continue
        with open(path) as handle:
            try:
                root_type = json.loads(handle.read())['root-type']
            except (ValueError, KeyError, TypeError):
                # ValueError: invalid JSON; KeyError: entry missing;
                # TypeError: top-level JSON value is not an object.
                root_type = None
        # Yield only after the file has been closed again.
        yield root_type
def get_fields_of_type(type, directory='datatypes/'):
    """Return the ``fields`` entry of the description whose root type is *type*.

    Every regular file in *directory* is parsed as JSON; the first one whose
    ``'root-type'`` equals *type* wins.  Files that are not valid JSON or
    lack the expected entries are skipped instead of raising.

    Args:
        type: root type to look for (the name shadows the builtin but is
            kept so existing keyword callers continue to work).
        directory: folder containing the datatype description files.
            Defaults to the previously hard-coded ``'datatypes/'``.

    Returns:
        The value stored under ``'fields'`` of the matching description, or
        ``None`` when no file matches.
    """
    from os import listdir
    from os.path import isfile, join
    import json

    for name in listdir(directory):
        path = join(directory, name)
        if not isfile(path):
            continue
        with open(path) as handle:
            try:
                description = json.loads(handle.read())
                if description['root-type'] == type:
                    return description['fields']
            except (ValueError, KeyError, TypeError):
                # Not a usable datatype description; try the next file.
                continue
    return None
Franziska Koehn
committed
def get_search_fields():
    """Return the searchable MR scan fields as a list of label/field dicts.

    The list is decoded from an embedded JSON document on every call, so
    each caller receives its own fresh objects.
    """
    import json

    raw_fields = """[{"label":"type", "field":"xnat:mrScanData/TYPE"},
{"label":"fov_x", "field":"xnat:mrScanData/PARAMETERS_FOV_X"},
{"label":"fov_y", "field":"xnat:mrScanData/PARAMETERS_FOV_Y"},
{"label":"tr", "field": "xnat:mrScanData/PARAMETERS_TR"},
{"label":"te", "field": "xnat:mrScanData/PARAMETERS_TE"},
{"label":"ti", "field": "xnat:mrScanData/PARAMETERS_TI"},
{"label":"flip", "field": "xnat:mrScanData/PARAMETERS_FLIP"},
{"label":"voxel_res_x", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_X"},
{"label":"voxel_res_y", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_Y"},
{"label":"voxel_res_z", "field":"xnat:mrScanData/PARAMETERS_VOXELRES_Z"}
]"""
    return json.loads(raw_fields)
Franziska Koehn
committed
def get_query_methods():
    """Return a new list with the supported ways of combining constraints."""
    methods = ("AND", "OR")
    return list(methods)
def get_operators():
    """Return a new list with the comparison operators usable in constraints."""
    operators = ("LIKE", ">", "<", "<=", ">=", "=")
    return list(operators)
# Iterates over `results`; `host` defaults to None.
# NOTE(review): only the `for r in results:` header is visible in this view
# of the file, the loop body is missing -- presumably each result is handed
# to download() or download_async(). Confirm before relying on this.
def download_all(results, host=None):
Franziska Koehn
committed
for r in results:
def download_async(result, host=None, cb=(lambda *_: None), cb_args=()):
    """Start download() for *result* on a new thread.

    All arguments are forwarded unchanged to download().

    Returns:
        The already started ``threading.Thread``.
    """
    import threading

    worker = threading.Thread(
        target=download,
        args=(result, host, cb, cb_args),
    )
    worker.start()
    return worker
def download(result, host=None, cb=(lambda *_: None), cb_args=()):
    """Download the scan described by one search *result*.

    The archive is stored as ``<subject>_<type>_<scan>`` (download_file()
    appends the ``.zip`` suffix).

    Args:
        result: mapping with the keys ``xnat_mrsessiondata_subject_id``,
            ``xnat_mrsessiondata_session_id``, ``xnat_mrsessiondata_project``,
            ``id`` and ``type``.
        host: server address, forwarded to download_file().
        cb: callable invoked as ``cb(*cb_args)`` once download_file() has
            returned.
        cb_args: positional arguments for *cb*.
    """
    print(result)
    subject = result['xnat_mrsessiondata_subject_id']
    experiment = result['xnat_mrsessiondata_session_id']
    project = result['xnat_mrsessiondata_project']
    scan = result['id']
    file_name = "%s_%s_%s" % (subject, result['type'], scan)

    download_file(host, project, subject, experiment, scan, file_name)

    # The callback used to be accepted but never invoked, so callers of
    # download_async() were never told that the download had finished.
    cb(*cb_args)
Franziska Koehn
committed
def search_for_mrScanData(host, constraints, user, passw):
    """Search *host* for ``xnat:mrScanData`` entries matching *constraints*.

    The given *user* / *passw* are stored via get_credentials() first;
    force=True makes them replace any credentials cached earlier.

    Returns:
        Whatever search_for() returns for the ``xnat:mrScanData`` root type.
    """
    get_credentials(force=True, username=user, password=passw)
    root_element = 'xnat:mrScanData'
    return search_for(host, root_element, constraints)
def search_for(host, root_element, constraints):
    """Query the XNAT server at *host* and return the matching rows.

    Credentials come from get_credentials().  pyxnat is given a temporary
    cache directory which is removed again on every exit path.

    Args:
        host: server address, passed to pyxnat's ``Interface`` as ``server``.
        root_element: data type to select from, e.g. ``'xnat:mrScanData'``.
        constraints: constraint structure handed unchanged to ``where()``.

    Returns:
        The object produced by ``select(...).where(...)`` on success.  On
        failure the string ``"ServerNotFoundError"`` or ``"DatabaseError"``
        is returned instead (no exception is raised for these cases).
    """
    from shutil import rmtree
    from tempfile import mkdtemp

    from httplib2 import ServerNotFoundError
    from pyxnat import Interface
    # Imported locally: ``errors`` is referenced below but is not imported
    # anywhere in the visible part of this file.
    from pyxnat.core import errors

    user, passw = get_credentials()
    tmp_dir = mkdtemp()
    try:
        try:
            central = Interface(server="%s" % host,
                                user=user,
                                password=passw,
                                cachedir=tmp_dir)
        except IndexError:
            return "ServerNotFoundError"  # TODO: use a dedicated error class

        search_fields = [
            'xnat:mrSessionData/PROJECT',     # project id
            'xnat:mrSessionData/SUBJECT_ID',  # subject id
            'xnat:mrSessionData/SESSION_ID',  # experiment id
            'xnat:mrScanData/ID',             # scan id
            'xnat:mrScanData/TYPE',           # scan type
        ]

        try:
            result = central.select(root_element, search_fields).where(constraints)
        except errors.DatabaseError:
            return "DatabaseError"  # TODO: use a dedicated error class
        except ServerNotFoundError:
            return "ServerNotFoundError"  # TODO: use a dedicated error class

        try:
            # Was misspelled ``disonnect``, which always raised
            # AttributeError and therefore never closed the session.
            central.disconnect()
        except AttributeError:
            print("can\'t close connection (wrong pyxnat version?)")

        return result
    finally:
        # Best-effort removal of the cache directory, also on error returns.
        rmtree(tmp_dir, ignore_errors=True)
def get_credentials(force=False,username="", password="" ):
    """Return the ``(username, password)`` pair, prompting when necessary.

    The pair is remembered on the function attribute ``cache``.  A cached
    pair is returned as-is unless *force* is true.  Otherwise any value that
    was passed in empty is asked for interactively (the password without
    echo), and the resulting pair replaces the cache.

    Args:
        force: ignore the cached credentials and store new ones.
        username: user name to store; prompted for while empty.
        password: password to store; prompted for while empty.

    Returns:
        The cached ``(username, password)`` tuple.
    """
    cached = get_credentials.cache
    if cached and not force:
        return cached

    from getpass import getpass

    while len(username) == 0:
        username = raw_input("Username: ").strip()
    while len(password) == 0:
        password = getpass().strip()

    get_credentials.cache = (username, password)
    return get_credentials.cache


# No credentials are known until get_credentials() has stored some.
get_credentials.cache = None
def download_file(host, project, subject, experiment, scan, file_name):
    """Download the DICOM resources of one scan as a zip archive.

    The archive is requested from *host* with HTTP basic authentication
    using the credentials from get_credentials() and streamed to
    *file_name* (``.zip`` is appended when missing).  Failures are reported
    on stdout; nothing is raised for them.

    Args:
        host: base URL of the server.
        project: project id.
        subject: subject id.
        experiment: experiment (session) id.
        scan: scan id.
        file_name: target path of the archive.
    """
    import requests
    from requests.auth import HTTPBasicAuth

    if not file_name.endswith(".zip"):
        file_name += ".zip"

    url= "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan)
    user, passw = get_credentials()

    # Issue the request before touching the file system so that a failed
    # request no longer leaves an empty archive behind.
    try:
        response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
    except requests.exceptions.RequestException as e:
        print("Error downloading file %s, %s" % (url, e))
        return

    try:
        if not response.ok:
            print("Error downloading file %s, Status Code %s" % (url, response.status_code))
            return
        with open(file_name, 'wb') as handle:
            for block in response.iter_content(1024):
                if not block:
                    break
                handle.write(block)
    except IOError as e:
        print("Error writing file %s, %s" % (file_name, e))
    finally:
        # A streamed response holds its connection until it is closed.
        response.close()
# Command line entry point: parse host and credentials, then run main().
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='search and download')
    parser.add_argument('--host', type=str, help='hostname or ip-address (including port).', default="localhost:8080")
    parser.add_argument('--user', type=str, help='user:passw', default=":")
    #TODO force download (boolean)
    #TODO read constraints and root_element from file
    #TODO GUI
    args = parser.parse_args()

    # Split on the first ':' only: passwords may contain ':' themselves and
    # a bare user name without ':' must not crash.  Parts left empty are
    # prompted for by get_credentials().
    u, _, p = args.user.partition(':')
    get_credentials(username=u, password=p)
    main(args.host)