Newer
Older
"""
:Author: Franziska Koehn
:Created: 2015/01/13
This module includes functions around sending and defining queries.
"""
from threading import Lock
# Module-level lock object. Nothing in the visible part of this file acquires
# it; presumably callers use it to serialize requests to the server -- TODO
# confirm against the modules that import it.
requests_lock = Lock()
Franziska Koehn
committed
def get_query_methods():
    """
    Returns all applicable methods for creating a query.

    **Returns**
        a new list with the method names ``"AND"`` and ``"OR"``; a fresh list
        is built on every call, so callers may modify it freely.
    """
    return ["AND", "OR"]
def get_operators():
    """
    Returns all applicable operators for creating a query.

    **Returns**
        a new list with the comparison operators, in the order
        ``LIKE``, ``>``, ``<``, ``<=``, ``>=``, ``=``; a fresh list is built
        on every call, so callers may modify it freely.
    """
    return ["LIKE", ">", "<", "<=", ">=", "="]
def download_async(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
    """
    Downloads a file in a separate thread (so the GUI will not be blocked
    while downloading).

    All arguments are handed unchanged to :func:`download`, which runs in the
    new thread. This function returns immediately after starting the thread.

    **Parameters**
        :result: resultset, from which will be downloaded
        :host: address of host
        :creds: credentials (including user-name and -password)
        :rest: REST-API definition
        :dest_folder: folder where the downloaded files will be saved
        :cb: function for moving spinners
        :cb_args: args for function cb
    """
    # Imported locally: the module header only brings in ``Lock``.
    from threading import Thread

    download_thread = Thread(
        target=download,
        args=(result, host, creds, rest, dest_folder, cb, cb_args),
    )
    # Without start() the thread object is created but never runs.
    download_thread.start()
def download(result, host, creds, rest, dest_folder='', cb=(lambda *_: None), cb_args=()):
    """
    Downloads a file.

    Every ``{name}`` placeholder in ``rest`` is replaced by ``result[name]``.
    The filled-in REST path is appended to ``host`` to form the URL, and the
    substituted values, joined with ``-``, form the file name inside
    ``dest_folder``. The transfer itself is done by :func:`download_file`.

    **Parameters**
        :result: resultset, from which will be downloaded
        :host: address of host
        :creds: credentials (including user-name and -password)
        :rest: REST-API definition
        :dest_folder: folder where the downloaded files will be saved
        :cb: function for moving spinners; called once as ``cb(*cb_args)``
             after the download attempt has finished
        :cb_args: args for function cb

    **Raises**
        :KeyError: if ``rest`` names a placeholder that is not in ``result``
    """
    import os
    import re

    names = []

    def subfunc(match):
        # Look up the placeholder's value and remember it for the file name.
        key = match.group(0).strip("{}")
        value = result[key]
        names.append(value)
        return value

    # Raw string: '\{' and '\w' are not valid escapes in a normal literal.
    ret = re.sub(r'\{\w+\}', subfunc, rest)
    path = os.path.join(dest_folder, '-'.join(names))
    url = "%s%s" % (host, ret)
    try:
        download_file(url, creds, path)
    finally:
        # NOTE(review): cb was accepted but never invoked. Calling it once the
        # attempt is over (success or failure) is assumed to be the intended
        # "stop the spinner" notification -- confirm against the GUI code.
        cb(*cb_args)
Franziska Koehn
committed
def download_file(url, creds, path):
    """
    Downloads a file via HTTP basic auth and stores it as a zip file.

    ``.zip`` is appended to ``path`` if it does not already end with it.
    The response is streamed to disk in blocks of 1024 bytes. Errors are
    reported on stdout and are not raised to the caller.

    **Parameters**
        :url: host/REST-API with filled values
        :creds: credentials (including user-name and -password)
        :path: path of the file the download will be saved to
    """
    from contextlib import closing

    import requests
    from requests.auth import HTTPBasicAuth

    if not path.endswith(".zip"):
        path += ".zip"
    user, passw = creds
    try:
        response = requests.get(url, stream=True, auth=HTTPBasicAuth(user, passw))
        # A streamed response keeps its connection until it is closed.
        with closing(response):
            # Check the status before touching the disk, so a failed request
            # does not leave an empty .zip file behind.
            if not response.ok:
                print("Error downloading file %s, Status Code %s" % (url, response.status_code))
                return
            with open(path, 'wb') as handle:
                for block in response.iter_content(1024):
                    if not block:
                        break
                    handle.write(block)
    except requests.exceptions.RequestException as e:
        # Must precede IOError: requests' exceptions derive from IOError and
        # would otherwise be reported as a file-writing problem.
        print("Error downloading file %s, %s" % (url, e))
    except IOError as e:
        print("Error writing file %s, %s" % (path, e))
def search_for(host, root_element, constraints, search_fields, user, passw):
    """
    Does a search for given values. Raises xsa-exceptions.

    A temporary cache directory is created for the connection. The connection
    is closed and the directory removed again before this function is left,
    whether the search succeeded or failed.

    NOTE(review): no ``return`` statement is visible in this section of the
    file, so none was added; confirm how the result reaches the caller.
    ``Interface`` and ``xsa_errors`` are expected to be available at module
    level -- their imports are not visible here.

    **Parameters**
        :host: host-address
        :root_element: root-element of search
        :constraints: constraints of query
        :search_fields: fields which will be returned from server
        :user: user-name
        :passw: user-password

    **Raises**
        :xsa_errors.ServerNotFoundError: creating the connection raised
            ``IndexError``, or the query raised a database error that does
            not mention ``401``
        :xsa_errors.UnauthorizedError: the database error mentions ``401``
        :xsa_errors.ResponseNotReady: the query raised ``ResponseNotReady``
        :xsa_errors.QueryError: the result compares equal to an empty list
    """
    from shutil import rmtree
    from tempfile import mkdtemp

    from httplib import ResponseNotReady
    # NOTE(review): imported but never caught below -- confirm whether
    # httplib2's error should be translated to xsa_errors.ServerNotFoundError.
    from httplib2 import ServerNotFoundError
    from pyxnat.core import errors

    tmp_dir = mkdtemp()
    central = None
    try:
        try:
            central = Interface(server=host,
                                user=user,
                                password=passw,
                                cachedir=tmp_dir)
        except IndexError:
            raise xsa_errors.ServerNotFoundError("Server not found, check your host-address.")

        result = []
        try:
            result = central.select(root_element, search_fields).where(constraints)
        except errors.DatabaseError as e:
            if '401' in str(e):
                raise xsa_errors.UnauthorizedError("Unauthorizied attempt. Check your User and Password")
            raise xsa_errors.ServerNotFoundError("Server not found, check your host-address.")
        except ResponseNotReady:
            raise xsa_errors.ResponseNotReady("Please check your Host-Address")

        if result == []:
            raise xsa_errors.QueryError("Please check your query.")
    finally:
        # Cleanup also has to run when one of the errors above is raised,
        # otherwise the temporary cache directory is leaked.
        try:
            if central is not None:
                # Was misspelled "disonnect", so the AttributeError branch
                # fired every time and the connection was never closed.
                central.disconnect()
        except AttributeError:
            print("can't close connection (wrong pyxnat version?)")
        finally:
            # Best effort: a failure to remove the cache must not hide the
            # outcome of the search.
            rmtree(tmp_dir, ignore_errors=True)