Commit ab252c80 authored by Jameson Rollins's avatar Jameson Rollins

guardview: ncurses multi-node status view

This is an urwid/ncurses based EPICS client program to show a status
overview of many clients at once.  Connections for many nodes should happen
fast, with status updates happening in callbacks.

Needs some work, but mostly works as needed.
parent 250a4997
Pipeline #14745 failed with stage
in 7 minutes and 5 seconds
import sys
from . import ui
def main():
nodes = sys.argv[1:]
ui.UI(nodes=nodes)
if __name__ == '__main__':
main()
import urwid
import logging
############################################################
class BufferHelp(urwid.WidgetWrap):
def __init__(self, ui, target=None):
self.ui = ui
self.target = target
# if self.target:
# #self.ui.set_header([urwid.Text("help: " + tname)])
# else:
# #self.ui.set_header([urwid.Text("help")])
pile = []
if self.target and hasattr(self.target, 'keys'):
tname = self.target.name
pile.append(urwid.Text('%s commands:' % (tname)))
pile.append(urwid.Text(''))
for key, cmd in self.target.keys.iteritems():
pile.append(self.row('target', cmd, key))
pile.append(urwid.Text(''))
pile.append(urwid.Text(''))
pile.append(urwid.Text('Global commands:'))
pile.append(urwid.Text(''))
for key, cmd in self.ui.keys.iteritems():
pile.append(self.row('ui', cmd, key))
pile.append(urwid.Text(''))
pile.append(urwid.Text(''))
pile.append(urwid.Text('Use the SHIFT key to select text with the mouse.'))
w = urwid.Filler(urwid.Pile(pile))
self.__super.__init__(w)
def row(self, c, cmd, key):
hstring = eval('str(self.%s.%s.__doc__)' % (c, cmd))
return urwid.Columns([('fixed', 10, urwid.Text(key)),
urwid.Text(hstring),
])
def keypress(self, size, key):
return key #self.ui.keypress(key)
This diff is collapsed.
ui = [
('footer', 'white', 'dark blue'),
('footer_error', 'white', 'dark red'),
('prompt', 'black', 'light green'),
('header', '', '', '', 'g62, bold', '#008'),
]
node = [
('node name', 'white, bold', '', '', '', ''),
#('node name focus', 'white', 'dark gray', '', 'white', 'g19'),
('node attr', '', '', '', '', ''),
#('node attr focus', '', 'dark gray', '', '', 'g19'),
('ok', '', '', '', '#0f0, bold', ''),
('error', '', '', '', '#f00, bold', ''),
('connect', '', '', '', '#f80, bold', ''),
('indicator green', '', '', '', '', '#080'),
('indicator orange', '', '', '', '', '#f80'),
('indicator red', '', '', '', '', '#c00'),
('state', '', '', '', '#ff0', ''),
('state request', '', '', '', '#ff0', ''),
('state ok', '', '', '', '#0f0', ''),
# ('node name', '', ''),
# ('node name focus', 'white, bold', ''),
# ('node attr', '', ''),
# ('node attr focus', 'white, bold', ''),
]
focus_map = {}
node_focus = []
for a in node:
b = list(a)
b[0] += ' focus'
if b[5] == '':
b[5] = 'g19'
focus_map[a[0]] = b[0]
node_focus.append(tuple(b))
PALETTE = ui + node + node_focus
import os
import urwid
import fnmatch
import collections
import logging
if os.getenv('GUARDCTRL_LOG_FILE', None):
logging.basicConfig(filename=os.getenv('GUARDCTRL_LOG_FILE'),
level=logging.INFO)
from . import palette
from .nodes import NodeItem, BufferNodes
from .help import BufferHelp
############################################################
class UI():
keys = collections.OrderedDict([
('l', "prompt_limit"),
('=', "refresh"),
('q', "kill_buffer"),
('Q', "quit"),
('?', "help"),
])
def __init__(self, nodes):
#self.nodes = nodes
#self.devnull = open(os.devnull, 'wb') #open('/dev/null', 'rw')
#self.log = StringIO.StringIO()
self.header_string = "guardctrl"
self.status_string = "ENTER: node control, g: node graph, l: limit, q: kill buffer, Q: quit, ?: help"
# nodes display objects
node_width = max([len(node) for node in nodes])
self.nodes = []
for node in nodes:
self.nodes.append(NodeItem(self, node, node_width))
self.resolved_pvs = []
# make a list to hold the buffer objects
self.buffers = []
self.view = urwid.Frame(urwid.SolidFill())
self.set_status()
self.new_buffer(BufferNodes(self))
#self.new_buffer(BufferHelp(self))
self.mainloop = urwid.MainLoop(
self.view,
palette.PALETTE,
unhandled_input=self.keypress,
#handle_mouse=False,
)
self.mainloop.screen.set_terminal_properties(colors=88)
self.mainloop.set_alarm_in(2, self.update_ctrl_vars)
self.mainloop.run()
def keypress(self, key):
if key in self.keys:
logging.debug("KEY: %s %s" % (self.__class__.__name__, key))
cmd = "self.%s()" % (self.keys[key])
eval(cmd)
return True
else:
return key
##########
def set_status(self, text=None):
if not text:
text = self.status_string
self.view.set_footer(urwid.AttrMap(urwid.Text(text), 'footer'))
def error(self, text):
self.view.set_footer(urwid.AttrMap(urwid.Text(text), 'footer_error'))
def new_buffer(self, buffer):
logging.info("new buffer: %s" % buffer)
self.buffers.append(buffer)
self.view.body = urwid.AttrMap(buffer, 'body')
#UI(self.nodes, cmd=cmd, devs=self.devs)
self.set_status()
def kill_buffer(self):
"""kill current buffer"""
if len(self.buffers) <= 1:
return
#self.quit()
self.buffers.pop()
self.view.body = urwid.AttrMap(self.buffers[-1], 'body')
self.set_status()
def prompt(self, string):
prompt = PromptEdit(string)
self.view.set_footer(urwid.AttrMap(prompt, 'prompt'))
self.view.set_focus('footer')
return prompt
##########
def update_ctrl_vars(self, *args, **kwargs):
updates = False
for node in self.nodes:
for pv in node.dev._pvs.values():
pvname = pv.pvname
if pvname in self.resolved_pvs:
continue
if not pv.connected:
continue
logging.info("CTRL UPDATE: %s" % (pvname))
#pv.get_ctrlvars()
v = pv.get(as_string=True)
logging.info("CTRL UPDATE: %s = %s" % (pvname, v))
pv.run_callbacks()
self.resolved_pvs.append(pvname)
updates = True
if updates:
self.mainloop.draw_screen()
self.mainloop.set_alarm_in(2, self.update_ctrl_vars)
def nodes_from_glob(self, glob):
if not glob:
return self.nodes
if type(glob) is str:
glob = glob.strip().split()
nodes = []
for node in self.nodes:
for s in glob:
if fnmatch.fnmatch(node.name, s):
nodes.append(node)
return nodes
def prompt_limit(self):
"""limit node list (with globbing)"""
prompt = 'limit node list (globbing ok): '
urwid.connect_signal(self.prompt(prompt), 'done', self._prompt_limit_done)
def _prompt_limit_done(self, string):
self.view.set_focus('body')
urwid.disconnect_signal(self, self.prompt, 'done', self._prompt_limit_done)
if not string:
self.set_status()
return
buffer = BufferNodes(self, string)
self.new_buffer(buffer)
def help(self):
"""help"""
buffer = BufferHelp(self, self.buffers[-1])
self.new_buffer(buffer)
def quit(self):
"""quit"""
for node in self.nodes:
for pv in node.dev._pvs.values():
pv.clear_callbacks()
self.dev = None
raise urwid.ExitMainLoop()
############################################################
class PromptEdit(urwid.Edit):
__metaclass__ = urwid.signals.MetaSignals
signals = ['done']
def keypress(self, size, key):
if key == 'enter':
urwid.emit_signal(self, 'done', self.get_edit_text())
return
elif key == 'esc':
urwid.emit_signal(self, 'done', None)
return
urwid.Edit.keypress(self, size, key)
import os
import subprocess
def xclip(text, isfile=False):
"""Copy text or file contents into X clipboard."""
f = None
if isfile:
f = open(text, 'r')
sin = f
else:
sin = subprocess.PIPE
p = subprocess.Popen(["xclip", "-i"],
stdin=sin)
p.communicate(text)
if f:
f.close()
def background_cmd(ui, cmd):
#subprocess.call(cmd)
subprocess.Popen(cmd)
return
#FD = open(os.devnull, 'wb')
stdin = open('/dev/null', 'rb')
stdout = open('/dev/null', 'wb')
stderr = open('FOO', 'wb')
#FD = open('/dev/null', 'rwb')
subprocess.call(cmd,
stdin=stdin,
stdout=stdout,
stderr=foo, #stdout, #subprocess.STDOUT, #ui.devnull)
close_fds=True,
)
# p = subprocess.Popen(cmd,
# stdin=stdin,
# stdout=ui.log, #subprocess.PIPE,
# stderr=subprocess.STDOUT,
# )
# p.communicate()
def view_file(self, doc):
"""open document file"""
path = doc.get_fullpaths()
if not path:
self.ui.set_status('No file for document id:%d.' % entry.docid)
return
path = path[0]
if not os.path.exists(path):
self.ui.set_status('ERROR: id:%d: file not found.' % entry.docid)
return
#self.ui.set_status('opening file: %s...' % path)
subprocess.Popen(['xdg-open', path],
stdin=self.ui.devnull,
stdout=self.ui.devnull,
stderr=self.ui.devnull)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment