# Source code for almir.lib.bconsole
import re
import fcntl
import select
import os
import shlex
import tempfile
from contextlib import contextmanager
from subprocess import Popen, PIPE
from almir.lib.utils import nl2br
# Absolute path of the directory that contains this module; relative paths
# to the bconsole configuration files are resolved against it.
CURRENT_DIRECTORY = os.path.dirname(os.path.abspath(__file__))

# Matches one row of seven whitespace-separated columns -- level, type,
# priority (digits), date, time (digits:digits), name and volume -- e.g.:
#   Full Backup 10 04-Mar-12 23:05 BackupClient1 *unknown*
# re.X (verbose mode) makes the literal newlines inside the pattern
# insignificant; note that \s+ itself also matches newlines in the input.
UPCOMING_JOB_REGEX = re.compile(r"""
\s*(?P<level>\S+)\s+
(?P<type>\S+)\s+
(?P<priority>\d+)\s+
(?P<date>\S+)\s+
(?P<time>\d+:\d+)\s+
(?P<name>\S+)\s+
(?P<volume>\S+)\s*
""", re.X)
def _to_bytes(text):
    """Return ``text`` as a byte string, UTF-8 encoding it if it is not one already."""
    if isinstance(text, bytes):
        return text
    return text.encode('utf-8')


def _to_text(data):
    """Return ``data`` as a native ``str``, decoding UTF-8 bytes when necessary."""
    if isinstance(data, str):
        return data
    return data.decode('utf-8', 'replace')


class BConsole(object):
    """Interface to bconsole binary.

    Every operation spawns (or reuses) a ``bconsole`` subprocess built from
    ``bconsole_command`` and talks to it over its stdin/stdout pipes.
    """

    def __init__(self, bconsole_command='bconsole -n -c %s', config_file=None):
        """Prepare the command line used to launch bconsole.

        :param bconsole_command: command template; ``%s`` is replaced with
            the configuration file path.
        :param config_file: path to the bconsole configuration file. When
            empty or ``None``, ``bconsole.conf`` two directories above this
            module is used.
        """
        if not config_file:
            # Only resolve the packaged default when the caller gave no path.
            config_file = os.path.realpath(
                os.path.join(CURRENT_DIRECTORY, '..', '..', 'bconsole.conf'))
        self.config_file = config_file
        self.bconsole_command = bconsole_command % self.config_file

    @classmethod
    @contextmanager
    def from_temp_config(cls, name, address, port, password):
        """Constructs :class:`BConsole` object with help of passing temporary file for the session.

        The ``bconsole.conf.in`` template is filled in with the given
        director settings and written to a temporary file, which exists only
        while the ``with`` block using this context manager is active.

        :param name: director name
        :param address: director address
        :param port: director port (string or integer)
        :param password: director password
        """
        template = os.path.join(CURRENT_DIRECTORY, '..', '..', 'buildout.d', 'templates', 'bconsole.conf.in')
        with open(template) as template_file:
            config = template_file.read()

        # str() lets callers pass the port as an integer as well as a string.
        config = config\
            .replace('${almir:director_name}', name)\
            .replace('${almir:director_port}', str(port))\
            .replace('${almir:director_password}', password)\
            .replace('${almir:director_address}', address)

        # Text mode so that writing the (text) config works on Python 2 and 3.
        with tempfile.NamedTemporaryFile(mode='w') as f:
            f.write(config)
            # bconsole opens the file by name, so the content must be on disk.
            f.flush()
            yield cls(config_file=f.name)

    def start_process(self):
        """Spawn a bconsole subprocess with stdin, stdout and stderr piped."""
        return Popen(shlex.split(self.bconsole_command), stdout=PIPE, stdin=PIPE, stderr=PIPE)

    def is_running(self):
        """Return ``True`` if bconsole's reply to ``version`` mentions "version"."""
        stdout, stderr = self.start_process().communicate(b'version')
        # TODO: output stdout, stderr on debug mode?
        return 'version' in _to_text(stdout).lower()

    def get_upcoming_jobs(self):
        """Return scheduled jobs as a list of dicts.

        Each dict holds the named groups of ``UPCOMING_JOB_REGEX`` for one
        row of the ``.status dir scheduled`` output.
        """
        # TODO: we can do: .status dir scheduled days=10
        p = self.start_process()
        stdout, stderr = p.communicate(b'.status dir scheduled\n')
        #if stderr.strip():
        #    pass  # TODO: display flash?
        return [jobmatch.groupdict()
                for jobmatch in UPCOMING_JOB_REGEX.finditer(_to_text(stdout))]

    def send_command_by_polling(self, command, process=None):
        """Send ``command`` to an interactive bconsole session and collect its output.

        :param command: command text; an empty value only polls for output.
            ``quit`` is refused so the session cannot be closed this way.
        :param process: a process previously returned by this method, or
            ``None`` to start a new session.
        :returns: ``(process, response)`` where ``response`` is either
            ``{'commands': [output]}`` or ``{'error': message}``. ``process``
            is ``None`` in the error case, i.e. when the subprocess has
            already exited.
        """
        if command == 'quit':
            return process, {'commands': ['Try harder.']}

        # start bconsole session if it's not initialized
        if process is None:
            process = self.start_process()

        poll = process.poll()
        if poll is not None:
            return None, {'error': 'Connection to director terminated with status %d. Refresh to reconnect.' % poll}

        # send bconsole command
        if command:
            process.stdin.write(_to_bytes(command.strip() + '\n'))
            # the pipe may be buffered; make sure bconsole actually sees the command
            process.stdin.flush()

        # make stdout fileobject nonblockable
        fp = process.stdout.fileno()
        flags = fcntl.fcntl(fp, fcntl.F_GETFL)
        fcntl.fcntl(fp, fcntl.F_SETFL, flags | os.O_NONBLOCK)

        chunks = []
        while True:
            # wait for data or timeout (one second of silence ends the reply)
            readable, _, _ = select.select([fp], [], [], 1)
            if not readable:
                break
            chunk = os.read(fp, 1000)
            if not chunk:
                # EOF: bconsole closed its stdout. Without this check select()
                # keeps reporting the fd readable and the loop never ends.
                break
            chunks.append(chunk)

        output = nl2br(_to_text(b''.join(chunks)))
        return process, {"commands": [output]}