Newer
Older
# (c) 2021 MPIB <https://www.mpib-berlin.mpg.de/>,
#
# This file is part of castellum-scheduler.
#
# castellum-scheduler is free software; you can redistribute it and/or
# modify it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# Castellum is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with Castellum. If not, see
# <http://www.gnu.org/licenses/>.
import json
import os
import uuid
from contextlib import contextmanager
from pathlib import Path
def get_path(_id):
return Path('/tmp/') / 'scheduler-{}.fifo'.format(_id)
@contextmanager
def create_fifo(path):
os.mkfifo(path, mode=0o600)
try:
with open(path, 'r+b', 0) as fh:
yield fh
finally:
# FIXME: unreliable for some reason
os.unlink(path)
def raise_on_timeout(fh, timeout):
with selectors.DefaultSelector() as sel:
sel.register(fh, selectors.EVENT_READ)
if fh not in sel.select(timeout=timeout):
raise TimeoutError
def send(path, data):
if not os.path.exists(path):
raise FileNotFoundError(path)
# FIXME locking
bdata = json.dumps(data).encode('utf-8') + b'\n'
with open(path, 'wb') as fh:
return fh.write(bdata)
_id = str(uuid.uuid4())
with create_fifo(get_path(_id)) as fh:
send(get_path('request'), (_id, data))