Commit b392e0f9 authored by Franziska Koehn's avatar Franziska Koehn
Browse files

xnat_search.py splitted in:

queries.py - contains methods for downloading and searching
datatypereader.py - contains methods for readen datatype and their fields from json files
xsa_skript.py -  skript for starting xsa without a gui
parent b81b5d13
......@@ -33,11 +33,12 @@ class XnatSearchApp(gtk.Window):
vbox_root.pack_start(hpaned, True, True, 5)
def on_search_query(_, host, credentials, root_type, query, labels):
import xnat_search
import xsalogic.datatypereader as type_reader
import xsalogic.queries as queries
user, _, passw = credentials.partition(":")
fields = xnat_search.get_fields_from_labels(labels,root_type)
query_results = xnat_search.search_for(host, 'xnat:mrScanData', query, fields, user, passw)
xnat_search.HOST = host
fields = type_reader.get_fields_from_labels(labels,root_type)
query_results = queries.search_for(host, 'xnat:mrScanData', query, fields, user, passw)
queries.HOST = host
#TODO catch errors
......
import xsalogic.queries as queries
def main(host, user, passw):
import distutils.core
root_element = 'xnat:mrScanData'
constraints = [
('xnat:mrScanData/TYPE', 'LIKE', '%t1%'),
('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
'AND',
[('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
'OR'
]
]
search_fields=[ 'xnat:mrScanData/TYPE',
'xnat:mrSessionData/PROJECT',
'xnat:mrSessionData/SUBJECT_ID',
'xnat:mrSessionData/SESSION_ID',
'xnat:mrScanData/ID'
]
results = queries.search_for(host, root_element, constraints,search_fields, user, passw)
print "Search results (%s):" % len(results)
print results
while True:
try:
is_downl = distutils.util.strtobool(raw_input("download? "))
break
except ValueError:
print "invalid character. Please use y or n."
if is_downl:
print "downloading files..."
queries.download_all(results, host)
print "finished"
else:
print "download canceled"
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='search and download')
parser.add_argument('--host', type=str, help='hostname or ip-address (including port).', default="localhost:8080")
parser.add_argument('--user', type=str, help='user:passw', default=":")
args = parser.parse_args()
u,p = args.user.split(':')
main(args.host, u, p)
......@@ -108,10 +108,10 @@ class ComboBoxRootType(gtk.ComboBox):
return None
def show_data(self, root_type, data):
import xnat_search
import xsalogic.datatypereader as type_reader
self.store.clear()
for key in data.headers():
label = xnat_search.get_field_label(root_type, key)
label = type_reader.get_field_label(root_type, key)
self.store.append([label, key])
......
......@@ -26,9 +26,9 @@ class QueryView(gtk.VBox):
def __init__(self, *args, **kwargs):
super(QueryView, self).__init__(*args, **kwargs)
import xnat_search
import xsalogic.datatypereader as type_reader
import gobject
gobject.signal_new("send-query", QueryView, gobject.SIGNAL_RUN_FIRST,
gobject.TYPE_NONE, (gobject.TYPE_STRING, gobject.TYPE_STRING, gobject.TYPE_STRING, gobject.TYPE_PYOBJECT, gobject.TYPE_PYOBJECT))
......@@ -43,7 +43,7 @@ class QueryView(gtk.VBox):
def changed_cb(combobox):
selected = combobox.get_selected_item()
fields = xnat_search.get_field_labels_required_of_type(selected)
fields = type_reader.get_field_labels_required_of_type(selected)
self.treeView_search.set_root_type(selected)
self.treeView_fields.show_fields(fields)
......@@ -52,7 +52,7 @@ class QueryView(gtk.VBox):
cell = gtk.CellRendererText()
self.combobox.pack_start(cell)
self.combobox.add_attribute(cell, 'text', 0)
for t in xnat_search.get_data_types():
for t in type_reader.get_data_types():
self.combobox.append_text(t)
hBox_root_type.pack_start(self.combobox, True, True,0)
......@@ -339,21 +339,21 @@ class TreeViewQuery(gtk.TreeView):
self.store.append(None, (self.inital_value, '', '', False, self.tooltip))
def create_type_list(self):
import xnat_search
self.types = xnat_search.get_field_labels_of_type(self.root_type)
import xsalogic.datatypereader as type_reader
self.types = type_reader.get_field_labels_of_type(self.root_type)
def create_method_list(self):
import xnat_search
self.methods = xnat_search.get_query_methods()
import xsalogic.queries as queries
self.methods = queries.get_query_methods()
def create_operator_list(self):
import xnat_search
self.operators = xnat_search.get_operators()
import xsalogic.queries as queries
self.operators = queries.get_operators()
def get_query(self, type):
import xnat_search
import xsalogic.datatypereader as type_reader
search_fields = xnat_search.get_fields_of_type(type)
search_fields = type_reader.get_fields_of_type(type)
query = []
def add_next_to_querry(iter):
......
......@@ -21,7 +21,7 @@ class ResultsDownloadView(gtk.VBox):
def event_clicked_download(button, *_):
import gobject
import xnat_search
import xsalogic.queries as queries
enabled = (d for d in self.treeViewResults.store if d[0])
for d in enabled:
def stop_spinner(_row):
......@@ -29,7 +29,7 @@ class ResultsDownloadView(gtk.VBox):
_row[0] = False # Checkbox to false
d[3] = 0
d[2] = True # Show Spinner
xnat_search.download_async(d[1], button.destination, cb=stop_spinner, cb_args=(d,))
queries.download_async(d[1], button.destination, cb=stop_spinner, cb_args=(d,))
def event_clicked_toggle_selection(*_):
self.treeViewResults.toggle_selection()
......@@ -149,7 +149,7 @@ class TreeViewResults(gtk.TreeView):
self.store.append(result)
import xnat_search
keys_ = list(xnat_search.get_field_label(root_type, key) for key in keys)
self.create_columns(keys_) # TODO labels
import xsalogic.datatypereader as type_reader
keys_ = list(type_reader.get_field_label(root_type, key) for key in keys)
self.create_columns(keys_)
self.set_model(self.store)
# TODO nur einmal einlesen
def get_data_types():
from os import listdir
from os.path import isfile, join
import json
DIR = 'datatypes/'
files = [ f for f in listdir(DIR) if isfile(join(DIR,f)) ]
result = []
for file in files:
with open(join(DIR,file)) as f:
try:
data=json.loads(f.read())['root-type']
result.append(data)
except ValueError:
pass
return result
def get_field_labels_of_type(type):
result = []
fields = get_fields_of_type(type)
if fields is not None:
for f in fields:
result.append(f['label'])
return result
def get_field_labels_required_of_type(type): # TODO rename
result = []
fields = get_fields_of_type(type)
if fields is not None:
for f in fields:
result.append((f['label'],f['required']))
return result
def get_fields_of_type(type):
from os import listdir
from os.path import isfile, join
import json
DIR = 'datatypes/'
files = [ f for f in listdir(DIR) if isfile(join(DIR,f)) ]
for file in files:
with open(join(DIR,file)) as f:
try:
t = json.loads(f.read())
if t['root-type'] == type:
return t['fields']
except ValueError:
pass
def get_field_label(type, key):
fields = get_fields_of_type(type)
for f in fields:
if "key" in f and f["key"] == key:
return f["label"]
return key
def get_fields_from_labels(labels, type):
result=[]
fields = get_fields_of_type(type)
if fields is not None:
for l in labels:
for f in fields:
if f['label'] == l:
result.append(f['field'])
break
return result
HOST = None
from threading import Lock
requests_lock = Lock()
def main(host):
import distutils.core
root_element = 'xnat:mrScanData'
constraints = [
('xnat:mrScanData/TYPE', 'LIKE', '%t1%'),
('xnat:mrScanData/PARAMETERS_FLIP', '>=', '10'),
'AND',
[('xnat:mrScanData/PARAMETERS_TE', '>', '2.0'),
('xnat:mrScanData/PARAMETERS_TE', '<', '2.0'),
'OR'
]
]
results = search_for(host, root_element, constraints)
print "Search results (%s):" % len(results)
print results
while True:
try:
is_downl = distutils.util.strtobool(raw_input("download? "))
break
except ValueError:
print "invalid character. Please use y or n."
if is_downl:
print "downloading files..."
download_all(results, host)
print "finished"
else:
print "download canceled"
def read_json_from_file():
pass
def get_data_types():
from os import listdir
from os.path import isfile, join
import json
DIR = 'datatypes/'
files = [ f for f in listdir(DIR) if isfile(join(DIR,f)) ]
result = []
for file in files:
with open(join(DIR,file)) as f:
try:
data=json.loads(f.read())['root-type']
result.append(data)
except ValueError:
pass
return result
def get_field_labels_of_type(type):
result = []
fields = get_fields_of_type(type)
if fields is not None:
for f in fields:
result.append(f['label'])
return result
def get_field_labels_required_of_type(type):
result = []
fields = get_fields_of_type(type)
if fields is not None:
for f in fields:
result.append((f['label'],f['required']))
return result
def get_fields_of_type(type):
from os import listdir
from os.path import isfile, join
import json
DIR = 'datatypes/'
files = [ f for f in listdir(DIR) if isfile(join(DIR,f)) ]
for file in files:
with open(join(DIR,file)) as f:
try:
t = json.loads(f.read())
if t['root-type'] == type:
return t['fields']
except ValueError:
pass
def get_field_label(type, key):
fields = get_fields_of_type(type)
for f in fields:
if "key" in f and f["key"] == key:
return f["label"]
return key
def get_fields_from_labels(labels, type):
result=[]
fields = get_fields_of_type(type)
if fields is not None:
for l in labels:
for f in fields:
if f['label'] == l:
result.append(f['field'])
break
return result
def get_query_methods():
return ["AND", "OR"]
def get_operators():
return ["LIKE", ">", "<", "<=", ">=", "="]
def get_credentials(force=False,username="", password="" ):
if get_credentials.cache and not force:
return get_credentials.cache
from getpass import getpass
while not len(username):
username = raw_input("Username: ").strip()
while not len(password):
password = getpass().strip()
get_credentials.cache = username, password
return get_credentials()
get_credentials.cache = None
def download_all(results, host=None):
for r in results:
download(r, host)
......@@ -129,7 +37,7 @@ def download(result, dest_folder='', host=None, cb=(lambda *_: None), cb_args=()
host = HOST
requests_lock.acquire()
print(result)
subject = result['xnat_mrsessiondata_subject_id']
subject = result['xnat_mrsessiondata_subject_id'] #TODO download andere typen
experiment = result['xnat_mrsessiondata_session_id']
project = result['xnat_mrsessiondata_project']
scan = result['id']
......@@ -138,6 +46,33 @@ def download(result, dest_folder='', host=None, cb=(lambda *_: None), cb_args=()
requests_lock.release()
cb(*cb_args)
def download_file(host, project, subject, experiment, scan, file_name):
import requests
from base64 import b64encode
from requests.auth import HTTPBasicAuth
if not file_name.endswith(".zip"):
file_name += ".zip"
url= "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan)
user, passw = get_credentials()
try:
with open(file_name, 'wb') as handle:
response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
if not response.ok:
raise ValueError(response.status_code)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
except IOError as e:
print "Error writing file %s, %s" % (file_name, e)
except ValueError as e:
print "Error downloading file %s, Status Code %s" % (url, e)
def search_for(host, root_element, constraints, search_fields, user, passw):
from pyxnat import Interface
from pyxnat.core import errors
......@@ -145,7 +80,6 @@ def search_for(host, root_element, constraints, search_fields, user, passw):
from httplib2 import ServerNotFoundError
get_credentials(username=user, password=passw, force=True)
user, passw = get_credentials()
tmp_dir=mkdtemp()
......@@ -177,59 +111,3 @@ def search_for(host, root_element, constraints, search_fields, user, passw):
except:
pass
return result
def get_credentials(force=False,username="", password="" ):
if get_credentials.cache and not force:
return get_credentials.cache
from getpass import getpass
while not len(username):
username = raw_input("Username: ").strip()
while not len(password):
password = getpass().strip()
get_credentials.cache = username, password
return get_credentials()
get_credentials.cache = None
def download_file(host, project, subject, experiment, scan, file_name):
import requests
from base64 import b64encode
from requests.auth import HTTPBasicAuth
if not file_name.endswith(".zip"):
file_name += ".zip"
url= "%s/data/archive/projects/%s/subjects/%s/experiments/%s/scans/%s/resources/DICOM/files?format=zip" % (host, project, subject, experiment, scan)
user, passw = get_credentials()
try:
with open(file_name, 'wb') as handle:
response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
if not response.ok:
raise ValueError(response.status_code)
for block in response.iter_content(1024):
if not block:
break
handle.write(block)
except IOError as e:
print "Error writing file %s, %s" % (file_name, e)
except ValueError as e:
print "Error downloading file %s, Status Code %s" % (url, e)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='search and download')
parser.add_argument('--host', type=str, help='hostname or ip-address (including port).', default="localhost:8080")
parser.add_argument('--user', type=str, help='user:passw', default=":")
#TODO force download (boolean)
#TODO read constraints und root_element from file
#TODO GUI
args = parser.parse_args()
u,p = args.user.split(':')
get_credentials(username=u, password=p)
main(args.host)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment