Commit 7cab3430 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

Interruptable CLI provider and script timeouts

parent 9875d67d
......@@ -38,8 +38,12 @@ class Service(SAEService):
done(controller,response=c)
else:
e=Error()
e.code=ERR_SCRIPT_EXCEPTION
e.text=script.error_traceback
if script.to_cancel: # Timeout
e.code=ERR_TIMEOUT
e.text="Timed out"
else:
e.code=ERR_SCRIPT_EXCEPTION
e.text=script.error_traceback
done(controller,error=e)
try:
profile=profile_registry[request.access_profile.profile]
......@@ -446,6 +450,9 @@ class Activator(Daemon,FSM):
# Send collected PM data
if self.get_state()=="ESTABLISHED" and self.pm_data_queue:
self.send_pm_data()
# Cancel stale scripts
if self.get_state()=="ESTABLISHED":
self.cancel_stale_scripts()
# Perform default daemon/fsm machinery
super(Activator,self).tick()
......@@ -643,6 +650,15 @@ class Activator(Daemon,FSM):
d.value=value
self.pm_data_queue=[]
self.sae_stream.proxy.pm_data(r,pm_data_callback)
##
## Cancel stale scripts
##
def cancel_stale_scripts(self):
to_cancel=[st for st in self.script_threads if st.is_stale()]
for script in to_cancel:
logging.info("Canceling stale script %s(%s)"%(script.name,script.access_profile.address))
st.cancel_script()
# Signal handlers
# SIGUSR1 returns process info
......
......@@ -44,6 +44,7 @@ enum ErrorCode {
ERR_SCRIPT_EXCEPTION =14; // Script terminated with exception
ERR_ACTIVATOR_NOT_AVAILABLE =15; // Activator not available now
ERR_DOWN =16; // Host is down
ERR_TIMEOUT =17; // Script timed out
}
//
......
......@@ -79,6 +79,10 @@ _ERRORCODE = descriptor.EnumDescriptor(
name='ERR_DOWN', index=16, number=16,
options=None,
type=None),
descriptor.EnumValueDescriptor(
name='ERR_TIMEOUT', index=17, number=17,
options=None,
type=None),
],
options=None,
)
......@@ -123,6 +127,7 @@ ERR_INVALID_SCRIPT = 13
ERR_SCRIPT_EXCEPTION = 14
ERR_ACTIVATOR_NOT_AVAILABLE = 15
ERR_DOWN = 16
ERR_TIMEOUT = 17
TELNET = 0
SSH = 1
HTTP = 2
......
......@@ -32,7 +32,8 @@ scheme_id={
"ssh" : SSH,
"http" : HTTP,
}
##
class TimeOutError(Exception): pass
##
##
##
......@@ -111,8 +112,11 @@ class Script(threading.Thread):
TELNET=scheme_id["telnet"]
SSH=scheme_id["ssh"]
HTTP=scheme_id["http"]
TIMEOUT=120 # 2min by default
def __init__(self,profile,activator,access_profile,parent=None,**kwargs):
self.start_time=time.time()
self.to_cancel=False
self.parent=parent
self.access_profile=access_profile
if self.access_profile.address:
......@@ -137,6 +141,11 @@ class Script(threading.Thread):
self.strip_echo=True
self.kwargs=kwargs
self.scripts=ScriptProxy(self)
##
## Checks script is stale and must be terminated
##
def is_stale(self):
return time.time()-self.start_time > self.TIMEOUT
@classmethod
def implements_interface(cls,interface):
......@@ -147,6 +156,9 @@ class Script(threading.Thread):
def debug(self,msg):
logging.debug("[%s] %s"%(self.debug_name,msg))
def error(self,msg):
logging.error("[%s] %s"%(self.debug_name,msg))
def guarded_run(self):
self.debug("Guarded run")
......@@ -168,6 +180,8 @@ class Script(threading.Thread):
self.debug("Running")
try:
self.result=self.serialize_result(self.guarded_run())
except TimeOutError:
self.error("Timed out")
except:
t,v,tb=sys.exc_info()
r=[str(t),str(v)]
......@@ -181,6 +195,20 @@ class Script(threading.Thread):
def execute(self,**kwargs):
return None
##
## Request CLI provider's queue
## Handle cancel condition
##
def cli_queue_get(self):
while True:
try:
return self.cli_provider.queue.get(block=True,timeout=1)
except Queue.Empty:
if self.to_cancel:
self.error("Canceled")
raise TimeOutError()
else:
continue
def request_cli_provider(self):
if self.parent:
......@@ -194,7 +222,7 @@ class Script(threading.Thread):
else:
raise Exception("Invalid access scheme '%d' for CLI"%self.access_profile.scheme)
self.cli_provider=s_class(self.activator.factory,self.profile,self.access_profile)
self.cli_provider.queue.get(block=True) # Wait until provider in PROMPT
self.cli_queue_get()
self.debug("CLI Provider is ready")
return self.cli_provider
......@@ -202,7 +230,7 @@ class Script(threading.Thread):
self.debug("cli(%s)"%cmd)
self.request_cli_provider()
self.cli_provider.submit(cmd)
data=self.cli_provider.queue.get(block=True)
data=self.cli_queue_get()
if self.strip_echo and data.lstrip().startswith(cmd):
data=self.strip_first_lines(data.lstrip())
self.debug("cli() returns:\n---------\n%s\n---------"%repr(data))
......@@ -241,6 +269,11 @@ class Script(threading.Thread):
else:
result[int(x)]=None
return sorted(result.keys())
##
## Cancel script
##
def cancel_script(self):
self.to_cancel=True
##
##
##
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment