Skip to content
helpers.py 1.99 KiB
Newer Older
Bengfort's avatar
Bengfort committed
# (c) 2021 MPIB <https://www.mpib-berlin.mpg.de/>,
#
# This file is part of castellum-scheduler.
#
# castellum-scheduler is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Castellum is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with Castellum. If not, see
# <http://www.gnu.org/licenses/>.

import json
import os
Bengfort's avatar
Bengfort committed
import selectors
Bengfort's avatar
Bengfort committed
import uuid
from contextlib import contextmanager
from pathlib import Path


def get_path(_id):
    """Return the FIFO path under ``/tmp/`` that belongs to *_id*."""
    filename = 'scheduler-{}.fifo'.format(_id)
    return Path('/tmp/').joinpath(filename)


@contextmanager
def create_fifo(path):
    """Create a FIFO at *path*, yield it opened, and remove it afterwards.

    The FIFO is created with mode ``0o600`` and opened unbuffered in
    binary read/write mode. On exit the file object is closed and the
    path is unlinked, whether or not the body raised.
    """
    os.mkfifo(path, mode=0o600)
    try:
        fifo = open(path, 'r+b', buffering=0)
        try:
            yield fifo
        finally:
            fifo.close()
    finally:
        # FIXME: unreliable for some reason
        os.unlink(path)


Bengfort's avatar
Bengfort committed
def raise_on_timeout(fh, timeout):
    """Block until *fh* is readable or raise after *timeout* seconds.

    :param fh: file object (or file descriptor) to wait on.
    :param timeout: maximum number of seconds to wait, passed straight
        to ``selector.select()``.
    :raises TimeoutError: if *fh* did not become readable in time.
    """
    with selectors.DefaultSelector() as sel:
        sel.register(fh, selectors.EVENT_READ)
        # select() returns a list of (key, events) tuples, so checking
        # ``fh in ...`` never matches. Only one object is registered,
        # so an empty list means the wait timed out.
        ready = sel.select(timeout=timeout)
        if not ready:
            raise TimeoutError


Bengfort's avatar
Bengfort committed
def _open_existing(path, flags):
    """Opener for ``open()`` that never creates *path*."""
    return os.open(path, flags & ~os.O_CREAT)


def send(path, data):
    """Write *data* as one JSON line to the existing file at *path*.

    :param path: path to write to; it must already exist.
    :param data: JSON-serializable payload.
    :returns: the number of bytes written.
    :raises FileNotFoundError: if *path* does not exist.
    """
    # FIXME locking
    bdata = json.dumps(data).encode('utf-8') + b'\n'
    # Opening without O_CREAT replaces the former exists()-then-open()
    # sequence: if the path vanishes in between, the open fails instead
    # of silently creating a regular file in its place.
    with open(path, 'wb', opener=_open_existing) as fh:
        return fh.write(bdata)


Bengfort's avatar
Bengfort committed
def request(data, timeout=10):
    """Send *data* to the request FIFO and return the decoded reply.

    A fresh UUID identifies this request; a FIFO named after it is
    created to receive the reply, and ``(_id, data)`` is sent to the
    shared ``request`` FIFO.

    :param data: JSON-serializable payload.
    :param timeout: seconds to wait for the reply to become readable.
    :returns: the JSON-decoded first line read from the reply FIFO.
    :raises TimeoutError: if no reply is readable within *timeout*.
    :raises FileNotFoundError: if the ``request`` FIFO does not exist.
    """
    _id = str(uuid.uuid4())
    # The reply FIFO is created before the request is sent, so it
    # already exists when the receiving side answers.
    with create_fifo(get_path(_id)) as fh:
        send(get_path('request'), (_id, data))
        raise_on_timeout(fh, timeout)
        return json.loads(fh.readline())


def listen():
    """Yield JSON-decoded messages read from the ``request`` FIFO.

    The FIFO is created when iteration starts and removed when the
    generator is closed. Each yielded item is one decoded line.
    """
    request_path = get_path('request')
    with create_fifo(request_path) as fifo:
        while True:
            line = fifo.readline()
            yield json.loads(line)


def response(_id, success):
    """Send *success* to the FIFO that belongs to request *_id*."""
    reply_path = get_path(_id)
    send(reply_path, success)