activator.py 23.6 KB
Newer Older
Dmitry Volodin's avatar
Dmitry Volodin committed
1
2
3
4
5
6
7
8
9
# -*- coding: utf-8 -*-
##----------------------------------------------------------------------
## Service Activator Daemon
##----------------------------------------------------------------------
## Copyright (C) 2007-2009 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
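"""
Service Activator daemon: keeps the RPC channel to SAE, runs scripts,
performs ping checks and collects SNMP traps, syslog and PM data.
"""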
10
from __future__ import with_statement
11
import os,logging,pty,signal,time,re,sys,signal,Queue,cPickle,tempfile
12
from errno import ECONNREFUSED
Dmitry Volodin's avatar
Dmitry Volodin committed
13
from noc.sa.profiles import profile_registry
Dmitry Volodin's avatar
Dmitry Volodin committed
14
from noc.sa.script import script_registry,ScriptSocket
15
from noc.sa.rpc import RPCSocket,file_hash,get_digest
Dmitry Volodin's avatar
Dmitry Volodin committed
16
from noc.sa.protocols.sae_pb2 import *
17
from noc.sa.protocols.pm_pb2 import *
18
from noc.lib.fileutils import safe_rewrite
Dmitry Volodin's avatar
Dmitry Volodin committed
19
from noc.lib.daemon import Daemon
Dmitry Volodin's avatar
Dmitry Volodin committed
20
from noc.lib.fsm import FSM,check_state
21
from noc.lib.nbsocket import ConnectedTCPSocket,ConnectedTCPSSLSocket,SocketFactory,PTYSocket,HAS_SSL,ListenUDPSocket
22
from noc.lib.debug import DEBUG_CTX_CRASH_PREFIX
23
from noc.lib.pmhash import pmhash
Dmitry Volodin's avatar
Dmitry Volodin committed
24
from threading import Lock
Dmitry Volodin's avatar
Dmitry Volodin committed
25

Dmitry Volodin's avatar
Dmitry Volodin committed
26
27
28
29
##
## Activator RPC service (ping, script, ping_check)
##
class Service(SAEService):
    """
    RPC service served by the activator over its SAE channel.

    The owning Activator stores itself in the `activator` attribute right
    after construction. Handlers report their outcome through
    done(controller, response=...) or done(controller, error=...).
    """
    def ping(self, controller, request, done):
        """Reply with an empty PingResponse."""
        done(controller, response=PingResponse())

    def script(self, controller, request, done):
        """
        Validate a script request and hand it to the activator.

        The request is refused with an Error when the profile or the script
        is unknown, the access scheme is not supported by the profile, the
        last ping check marked the host unreachable, or the number of
        running ScriptSocket instances reached [activator]/max_pull_config.
        Otherwise the pickled keyword arguments are decoded and
        Activator.run_script() is started; its outcome is reported from
        script_callback.
        """
        def fail(code, text):
            # Report a refusal or a failure to the RPC caller
            e = Error()
            e.code = code
            e.text = text
            done(controller, error=e)

        def script_callback(script):
            # Invoked with the finished script object
            if script.result:
                c = ScriptResponse()
                c.result = script.result
                done(controller, response=c)
            else:
                fail(ERR_SCRIPT_EXCEPTION, script.error_traceback)

        access_profile = request.access_profile
        # Registry lookups: any lookup failure means "unknown name", but
        # SystemExit/KeyboardInterrupt must not be swallowed
        try:
            profile = profile_registry[access_profile.profile]
        except Exception:
            fail(ERR_INVALID_PROFILE,
                 "Invalid profile '%s'" % access_profile.profile)
            return
        try:
            # Existence check only; the class itself is resolved again
            # by Activator.run_script()
            script_registry[request.script]
        except Exception:
            fail(ERR_INVALID_SCRIPT, "Invalid script '%s'" % request.script)
            return
        if access_profile.scheme not in profile.supported_schemes:
            # NOTE(review): code_to_scheme() is not defined in this class;
            # presumably inherited or provided elsewhere -- verify
            fail(ERR_INVALID_SCHEME,
                 "Access scheme '%s' is not supported for profile '%s'" % (
                     self.code_to_scheme(access_profile.scheme),
                     access_profile.profile))
            return
        # Check host was checked by ping.
        # Reject executing of script on known unreachable hosts.
        # Hosts without a ping check result are treated as reachable.
        if not self.activator.ping_check_results.get(access_profile.address, True):
            fail(ERR_DOWN, "Host is down")
            return
        # Check [activator]/max_pull_config limit
        limit = self.activator.config.getint("activator", "max_pull_config")
        if self.activator.factory.count_subclass_sockets(ScriptSocket) >= limit:
            fail(ERR_OVERLOAD, "script concurrent session limit reached")
            return
        # NOTE(review): values are unpickled exactly as received from the
        # RPC peer. pickle is unsafe on untrusted input, so this relies on
        # the SAE channel being trusted.
        kwargs = {}
        for a in request.kwargs:
            kwargs[str(a.key)] = cPickle.loads(str(a.value))
        self.activator.run_script(request.script, access_profile,
                                  script_callback, **kwargs)

    def ping_check(self, controller, request, done):
        """
        Check reachability of request.addresses via Activator.ping_check().

        When the check completes, every requested address is put either
        into the `unreachable` or the `reachable` list of the
        PingCheckResponse, and activator.ping_check_results is replaced
        with the fresh address -> bool map.
        """
        def ping_check_callback(unreachable):
            down = set(unreachable)
            r = PingCheckResponse()
            # Reset previous ping checks
            results = self.activator.ping_check_results = {}
            for a in request.addresses:
                is_up = a not in down
                results[a] = is_up
                if is_up:
                    r.reachable.append(a)
                else:
                    r.unreachable.append(a)
            done(controller, response=r)

        self.activator.ping_check(list(request.addresses), ping_check_callback)
Dmitry Volodin's avatar
Dmitry Volodin committed
102
##
103
## Activator-SAE channel
104
105
##
class ActivatorSocket(RPCSocket, ConnectedTCPSocket):
    """
    Plain TCP RPC channel between the activator and SAE.

    Connection callbacks are passed to the activator's event() method
    as "connect", "close" and "refused".
    """
    def __init__(self, factory, address, port, local_address=None):
        # Set up the transport first, then the RPC layer bound to
        # the activator's service
        ConnectedTCPSocket.__init__(self, factory, address, port, local_address)
        RPCSocket.__init__(self, factory.activator.service)

    def activator_event(self, event):
        """Pass *event* to the activator owning this socket's factory."""
        activator = self.factory.activator
        activator.event(event)

    def on_connect(self):
        """Connection callback: emit "connect"."""
        self.activator_event("connect")

    def on_close(self):
        """Close callback: emit "close"."""
        self.activator_event("close")

    def on_conn_refused(self):
        """Connection refused callback: emit "refused"."""
        self.activator_event("refused")
Dmitry Volodin's avatar
Dmitry Volodin committed
121
##
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
## SSL-enabled Activator socket
##
class ActivatorSSLSocket(RPCSocket, ConnectedTCPSSLSocket):
    """
    SSL variant of the activator-SAE RPC channel.

    Connection callbacks are passed to the activator's event() method
    as "connect", "close" and "refused".
    """
    def __init__(self, factory, address, port, local_address=None):
        # Set up the SSL transport first, then the RPC layer bound to
        # the activator's service
        ConnectedTCPSSLSocket.__init__(self, factory, address, port, local_address)
        RPCSocket.__init__(self, factory.activator.service)

    def activator_event(self, event):
        """Pass *event* to the activator owning this socket's factory."""
        activator = self.factory.activator
        activator.event(event)

    def on_connect(self):
        """Connection callback: emit "connect"."""
        self.activator_event("connect")

    def on_close(self):
        """Close callback: emit "close"."""
        self.activator_event("close")

    def on_conn_refused(self):
        """Connection refused callback: emit "refused"."""
        self.activator_event("refused")
##
Dmitry Volodin's avatar
Dmitry Volodin committed
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
## External fping process.
## Runs fping, supplies a list of checked hosts
## and reads a list of unreachable hosts
##
class FPingProbeSocket(PTYSocket):
    """
    External fping process.

    The addresses to check are written to a temporary file which is fed to
    fping through ./scripts/stdin-wrapper. Everything the process prints is
    accumulated in `result`; when the process closes, the distinct
    non-empty output lines are passed to `callback` as the list of
    unreachable hosts.
    """
    def __init__(self, factory, fping_path, addresses, callback):
        self.result = ""
        self.callback = callback
        # Write hosts list to temporary file.
        # The handle is closed even when the write fails.
        h, self.tmp_path = tempfile.mkstemp()
        with os.fdopen(h, "w") as f:
            f.write("\n".join(addresses) + "\n")
        # Fping requires root to read hosts from file. Run it through the wrapper
        PTYSocket.__init__(self, factory, ["./scripts/stdin-wrapper",
                                           self.tmp_path, fping_path, "-A", "-u"])

    def on_close(self):
        """Remove the temporary file and report the unreachable hosts."""
        try:
            os.unlink(self.tmp_path)
        except OSError:
            # A cleanup failure must not prevent the callback from being
            # delivered, otherwise the pending ping check never completes
            logging.error("Unable to remove temporary file %s" % self.tmp_path)
        # fping issues duplicated addresses sometimes.
        # Remove duplicates
        lines = [x.strip() for x in self.result.split("\n")]
        self.callback(list(set(x for x in lines if x)))

    def on_read(self, data):
        """Accumulate a chunk of fping output."""
        self.result += data
##
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
## PM Collector socket
##
class PMCollectorSocket(ListenUDPSocket):
    """
    UDP listener receiving PMMessage packets.

    Packets which cannot be parsed or whose checksum does not match are
    dropped; probe results and data points of valid packets are queued
    on the activator.
    """
    def __init__(self, activator, address, port):
        self.activator = activator
        super(PMCollectorSocket, self).__init__(activator.factory, address, port)

    def on_read(self, data, address, port):
        """Parse, verify and queue one received datagram."""
        msg = PMMessage()
        try:
            msg.ParseFromString(data)
        except Exception:
            # Malformed datagram: drop it, but never swallow
            # SystemExit/KeyboardInterrupt
            logging.debug("Malformed PM packet from %s" % address)
            return
        # Check hash.
        # It covers the sender address, the shared secret and all timestamps
        # (results first, then data)
        timestamps = [d.timestamp for d in msg.result]
        timestamps += [d.timestamp for d in msg.data]
        if pmhash(address, self.activator.pm_data_secret, timestamps) != msg.checksum:
            logging.error("Invalid PM hash in packet from %s" % address)
            return
        # Queue data. Entries without a name are skipped
        self.activator.queue_pm_result([
            (d.probe_name, d.probe_type, d.timestamp, d.service, d.result, d.message)
            for d in msg.result if d.probe_name
        ])
        self.activator.queue_pm_data([
            (d.name, d.timestamp, d.is_null, d.value)
            for d in msg.data if d.name
        ])
##
Dmitry Volodin's avatar
Dmitry Volodin committed
190
191
## Activator supervisor and daemon
##
192
class Activator(Daemon,FSM):
Dmitry Volodin's avatar
Dmitry Volodin committed
193
    daemon_name="noc-activator"
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
    FSM_NAME="Activator"
    DEFAULT_STATE="IDLE"
    STATES={
        "IDLE": {
                "timeout" : "CONNECT",
                "close"   : "IDLE",
                },
        "CONNECT" : {
                "timeout" : "IDLE",
                "refused" : "IDLE",
                "close"   : "IDLE",
                "connect" : "CONNECTED",
                },
        "CONNECTED" : {
                "timeout" : "IDLE",
                "close"   : "IDLE",
210
211
212
213
214
215
216
                "register": "REGISTRED",
                "error"   : "IDLE",
        },
        "REGISTRED" : {
                "timeout" : "IDLE",
                "auth"    : "AUTHENTICATED",
                "close"   : "IDLE",
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
                "error"   : "IDLE",
        },
        "AUTHENTICATED" : {
                "establish" : "ESTABLISHED",
                "upgrade"   : "UPGRADE",
                "close"   : "IDLE",
        },
        "UPGRADE" : {
                "establish" : "ESTABLISHED",
                "close"     : "IDLE",
        },
        "ESTABLISHED" : {
                "close"   : "IDLE",
        }
    }
232
233
    def __init__(self):
        """
        Read the configuration, create the RPC service and the socket
        factory, load profile and script classes and initialize the FSM.

        Statement order matters: attributes used by state handlers are
        set before FSM.__init__() is called.
        """
        Daemon.__init__(self)
        cfg = self.config
        self.activator_name = cfg.get("activator", "name")
        logging.info("Running activator '%s'" % self.activator_name)
        # Stand-alone mode: software updates are configured
        # and sa/sae.py is not present in the working directory
        self.stand_alone_mode = (cfg.get("activator", "software_update")
                                 and not os.path.exists(os.path.join("sa", "sae.py")))
        # RPC service and socket factory, both pointing back to the activator
        self.service = Service()
        self.service.activator = self
        self.factory = SocketFactory(tick_callback=self.tick)
        self.factory.activator = self
        self.children = {}
        # address -> last ping check result
        self.ping_check_results = {}
        self.sae_stream = None
        self.event_sources = set()
        # List of SNMP Trap collectors
        self.trap_collectors = []
        # List of SYSLOG collectors
        self.syslog_collectors = []
        # List of PM Data collectors
        self.pm_data_collectors = []
        logging.info("Loading profile classes")
        # Should be performed from ESTABLISHED state
        profile_registry.register_all()
        script_registry.register_all()
        self.nonce = None
        FSM.__init__(self)
        # Deadlines checked by tick(); None means "not scheduled"
        self.next_filter_update = None
        self.next_crashinfo_check = None
        # Running scripts (script -> completion callback) and delayed calls
        self.script_threads = {}
        self.script_lock = Lock()
        self.script_call_queue = Queue.Queue()
        # Collected PM data waiting to be sent to SAE
        self.pm_data_queue = []
        self.pm_result_queue = []
        self.pm_data_secret = cfg.get("activator", "pm_data_secret")
261
262
263
264
265
266
267
    ##
    ## IDLE state 
    ##
    def on_IDLE_enter(self):
        """
        Close the SAE channel, stop all running collectors and arm
        a timeout of 5; per STATES the IDLE "timeout" leads to CONNECT.
        """
        stream = self.sae_stream
        if stream:
            stream.close()
            self.sae_stream = None
        # Stop whatever collectors are still running
        if self.trap_collectors:
            self.stop_trap_collectors()
        if self.syslog_collectors:
            self.stop_syslog_collectors()
        if self.pm_data_collectors:
            self.stop_pm_data_collectors()
        self.set_timeout(5)
    ##
    ## CONNECT state
    ##
    def on_CONNECT_enter(self):
        """
        Arm a timeout of 10 and open the channel to SAE.

        The SSL socket is used when SSL support is available and
        [sae]/ssl_host is set; otherwise the plain TCP socket connects
        to [sae]/host:[sae]/port.
        """
        self.set_timeout(10)
        cfg = self.config
        # ssl_host is consulted only when SSL support is compiled in
        ssl_host = HAS_SSL and cfg.get("sae", "ssl_host")
        if ssl_host:
            socket_class = ActivatorSSLSocket
            host = ssl_host
            port = cfg.getint("sae", "ssl_port")
        else:
            socket_class = ActivatorSocket
            host = cfg.get("sae", "host")
            port = cfg.getint("sae", "port")
        self.sae_stream = socket_class(self.factory, host, port,
                                       cfg.get("sae", "local_address"))
286
287
288
289
290
291
292
    ##
    ## CONNECTED state
    ##
    def on_CONNECTED_enter(self):
        """Arm a timeout of 10 and start registration at SAE."""
        timeout = 10
        self.set_timeout(timeout)
        self.register()
    ##
293
    ## REGISTRED
294
    ##
295
296
297
    def on_REGISTRED_enter(self):
        """Arm a timeout of 10 and start authentication at SAE."""
        timeout = 10
        self.set_timeout(timeout)
        self.auth()
298
299
300
301
    ##
    ## AUTHENTICATED
    ##
    def on_AUTHENTICATED_enter(self):
        """
        Choose the next step after authentication: "upgrade" in
        stand-alone mode, "establish" otherwise.
        """
        if not self.stand_alone_mode:
            logging.info("In-bundle package. Skiping software updates")
            self.event("establish")
            return
        self.event("upgrade")
    ##
    ## UPGRADE
    ##
    def on_UPGRADE_enter(self):
        """Start the software update by requesting the manifest."""
        message = "Requesting software update"
        logging.info(message)
        self.manifest()
    ##
    ## ESTABLISHED
    ##
    def on_ESTABLISHED_enter(self):
        """
        Start the collectors enabled in the [activator] section, request
        the event filter if any collector was started and, in stand-alone
        mode, check for pending crashinfo files.
        """
        self.next_filter_update = None
        # Config option -> collector starter, in start order
        collectors = (
            ("listen_traps", self.start_trap_collectors),
            ("listen_syslog", self.start_syslog_collectors),
            ("listen_pm_data", self.start_pm_data_collectors),
        )
        need_filter = False
        for option, start in collectors:
            if self.config.get("activator", option):
                start()
                need_filter = True
        if need_filter:
            self.get_event_filter()
        if self.stand_alone_mode:
            self.check_crashinfo()
332
    ##
333
    ## Launch SNMP Trap collectors
334
    ##
335
336
    def start_trap_collectors(self):
        """
        Create a TrapCollector for every address resolved from
        [activator]/listen_traps (162 is passed as the port argument
        to resolve_addresses).
        """
        logging.debug("Starting trap collectors")
        from noc.sa.trapcollector import TrapCollector
        listen = self.config.get("activator", "listen_traps")
        collectors = []
        for ip, port in self.resolve_addresses(listen, 162):
            collectors.append(TrapCollector(self, ip, port))
        self.trap_collectors = collectors
    ##
    ## Disable SNMP Trap collectors
345
    ##
346
347
348
349
350
351
    def stop_trap_collectors(self):
        """Close all running SNMP Trap collectors, if any."""
        if not self.trap_collectors:
            return
        logging.debug("Stopping trap collectors")
        for collector in self.trap_collectors:
            collector.close()
        self.trap_collectors = []
Dmitry Volodin's avatar
Dmitry Volodin committed
352
    ##
353
    ## Launch syslog collectors
Dmitry Volodin's avatar
Dmitry Volodin committed
354
    ##
355
356
    def start_syslog_collectors(self):
        """
        Create a SyslogCollector for every address resolved from
        [activator]/listen_syslog (514 is passed as the port argument
        to resolve_addresses).
        """
        logging.debug("Starting syslog collectors")
        from noc.sa.syslogcollector import SyslogCollector
        listen = self.config.get("activator", "listen_syslog")
        collectors = []
        for ip, port in self.resolve_addresses(listen, 514):
            collectors.append(SyslogCollector(self, ip, port))
        self.syslog_collectors = collectors
    ##
    ## Disable syslog collectors
    ##
    def stop_syslog_collectors(self):
        """Close all running syslog collectors, if any."""
        if not self.syslog_collectors:
            return
        logging.debug("Stopping syslog collectors")
        for collector in self.syslog_collectors:
            collector.close()
        self.syslog_collectors = []
Dmitry Volodin's avatar
Dmitry Volodin committed
372
    ##
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
    ## Launch PM data collectors
    ##
    def start_pm_data_collectors(self):
        """
        Create a PMCollectorSocket for every address resolved from
        [activator]/listen_pm_data (19704 is passed as the port argument
        to resolve_addresses).
        """
        logging.debug("Starting PM Data collectors")
        listen = self.config.get("activator", "listen_pm_data")
        collectors = []
        for ip, port in self.resolve_addresses(listen, 19704):
            collectors.append(PMCollectorSocket(self, ip, port))
        self.pm_data_collectors = collectors
    ##
    ## Disable PM data collectors
    ##
    def stop_pm_data_collectors(self):
        """Close all running PM Data collectors, if any."""
        if not self.pm_data_collectors:
            return
        logging.debug("Stopping PM Data collectors")
        for collector in self.pm_data_collectors:
            collector.close()
        self.pm_data_collectors = []
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
392
393
394
    ## Script support
    ##
    def run_script(self, name, access_profile, callback, **kwargs):
        """
        Instantiate and start the script *name* against *access_profile*.

        *name* must consist of at least three dot-separated parts; the
        first two select the profile. *callback* is remembered in
        script_threads and is called with the script object from
        on_script_exit().
        """
        logging.info("Script %s(%s)" % (name, access_profile.address))
        # The third part is not needed here, but the unpacking
        # enforces the three-part shape of the name
        vendor, os_name, _ = name.split(".", 2)
        profile = profile_registry["%s.%s" % (vendor, os_name)]()
        script = script_registry[name](profile, self, access_profile, **kwargs)
        with self.script_lock:
            self.script_threads[script] = callback
            logging.info("%d script threads" % len(self.script_threads))
        script.start()
403

Dmitry Volodin's avatar
Dmitry Volodin committed
404
    def on_script_exit(self, script):
        """
        Forget the finished *script* and call the callback registered
        for it by run_script(). The callback runs outside of script_lock.
        """
        logging.info("Script %s(%s) completed" % (script.name,
                                                  script.access_profile.address))
        with self.script_lock:
            callback = self.script_threads.pop(script)
            logging.info("%d script threads left" % len(self.script_threads))
        callback(script)
        
    def request_call(self, f, *args, **kwargs):
        """
        Queue the call f(*args, **kwargs); queued calls are
        performed by tick().
        """
        logging.debug("Requesting call: %s(*%s,**%s)" % (f, args, kwargs))
        delayed_call = (f, args, kwargs)
        self.script_call_queue.put(delayed_call)
Dmitry Volodin's avatar
Dmitry Volodin committed
414
415
416
417
418
419
420
421
    ##
    ## Check reachability of addresses with fping
    ##
    def ping_check(self, addresses, callback):
        """
        Launch fping ([path]/fping) against *addresses*; *callback*
        receives the list of unreachable hosts when the probe closes.
        """
        fping_path = self.config.get("path", "fping")
        FPingProbeSocket(self.factory, fping_path, addresses, callback)
    ##
    ## Check whether the address is a known event source
    ##
422
423
    def check_event_source(self, address):
        """Return True when *address* is in the current event_sources set."""
        known_sources = self.event_sources
        return address in known_sources
Dmitry Volodin's avatar
Dmitry Volodin committed
424
425
        
    ##
426
427
    ## Main event loop
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
428
    def run(self):
        """Run the socket factory loop with run_forever=True."""
        factory = self.factory
        factory.run(run_forever=True)
430
431
    ##
    def tick(self):
        """
        Periodic housekeeping, passed to the socket factory as tick_callback.

        In order: refresh the event filter when it has expired, check for
        crashinfo files (stand-alone mode), perform the calls queued by
        request_call(), send collected PM data and finally run the
        inherited tick().
        """
        # Request filter updates
        if (self.get_state() == "ESTABLISHED" and self.next_filter_update
                and time.time() > self.next_filter_update):
            self.get_event_filter()
        # Check for pending crashinfos
        if (self.stand_alone_mode and self.next_crashinfo_check
                and time.time() > self.next_crashinfo_check
                and self.get_state() == "ESTABLISHED"):
            self.check_crashinfo()
        # Perform delayed calls.
        # Only an empty queue ends the loop; errors raised by the
        # delayed call itself still propagate
        while True:
            try:
                f, args, kwargs = self.script_call_queue.get_nowait()
            except Queue.Empty:
                break
            logging.debug("Calling delayed %s(*%s,**%s)" % (f, args, kwargs))
            f(*args, **kwargs)
        # Send collected PM data.
        # Probe results are sent even when no data points are pending,
        # otherwise they would wait for the next data point to arrive
        if (self.get_state() == "ESTABLISHED"
                and (self.pm_data_queue or self.pm_result_queue)):
            self.send_pm_data()
        # Perform default daemon/fsm machinery
        super(Activator, self).tick()
Dmitry Volodin's avatar
Dmitry Volodin committed
451
452
453
454
455
456
457
458
459
                
    def register_stream(self, stream):
        """
        Add *stream* to self.streams.

        NOTE(review): self.streams is not created in Activator.__init__;
        presumably provided by a base class -- verify.
        """
        logging.debug("Registering stream %s" % str(stream))
        streams = self.streams
        streams[stream] = None
        
    def release_stream(self, stream):
        """
        Remove *stream* from self.streams; raises KeyError when the
        stream is not registered.
        """
        logging.debug("Releasing stream %s" % str(stream))
        streams = self.streams
        del streams[stream]
                
460
461
462
    def reboot(self):
        """
        Replace the current process with a fresh run of the same
        interpreter and command line.
        """
        logging.info("Rebooting")
        command = [sys.executable] + sys.argv
        os.execv(sys.executable, command)
Dmitry Volodin's avatar
Dmitry Volodin committed
463
        
Dmitry Volodin's avatar
Dmitry Volodin committed
464
465
466
467
    # Handlers
    ##
    ## Register
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
468
    @check_state("CONNECTED")
    def register(self):
        """
        Send a RegisterRequest carrying the activator name to SAE.

        On success the nonce from the response is stored and the
        "register" event is emitted; on failure the "error" event is
        emitted. Replies arriving outside of the CONNECTED state
        are ignored.
        """
        def register_callback(transaction, response=None, error=None):
            if self.get_state() != "CONNECTED":
                # Stale reply
                return
            if error:
                logging.error("Registration error: %s" % error.text)
                self.event("error")
            else:
                logging.info("Registration accepted")
                self.nonce = response.nonce
                self.event("register")

        logging.info("Registering as '%s'" % self.config.get("activator", "name"))
        request = RegisterRequest()
        request.name = self.activator_name
        self.sae_stream.proxy.register(request, register_callback)
    ##
    ## Auth
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
487
    @check_state("REGISTRED")
    def auth(self):
        """
        Send an AuthRequest with the activator name and the digest
        computed from the name, [activator]/secret and the stored nonce.

        On success the "auth" event is emitted, on failure the "error"
        event. Replies arriving outside of the REGISTRED state
        are ignored.
        """
        def auth_callback(transaction, response=None, error=None):
            if self.get_state() != "REGISTRED":
                # Stale reply
                return
            if error:
                logging.error("Authentication failed: %s" % error.text)
                self.event("error")
            else:
                logging.info("Authenticated")
                self.event("auth")

        logging.info("Authenticating")
        request = AuthRequest()
        request.name = self.config.get("activator", "name")
        secret = self.config.get("activator", "secret")
        request.digest = get_digest(request.name, secret, self.nonce)
        self.sae_stream.proxy.auth(request, auth_callback)
Dmitry Volodin's avatar
Dmitry Volodin committed
503
504
505
        
    ##
    ##
506
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
507
    @check_state("UPGRADE")
    def manifest(self):
        """
        Request the software manifest from SAE.

        Files which are missing locally or whose hash differs from the
        manifest are passed to software_upgrade(); when nothing has to be
        updated the "establish" event is emitted.

        NOTE(review): on a manifest error or a transaction id mismatch no
        event is emitted, and per STATES the UPGRADE state has no
        "timeout" transition, so the FSM appears to stay in UPGRADE until
        the connection closes -- confirm this is intended.
        """
        def manifest_callback(transaction, response=None, error=None):
            if self.get_state() != "UPGRADE":
                # Stale reply
                return
            if error:
                logging.error("Manifest error: %s" % error.text)
                self.manifest_transaction = None
                return
            if transaction.id != self.manifest_transaction.id:
                logging.error("Transaction id mismatch")
                self.manifest_transaction = None
                return
            # Collect files which are absent or differ from the manifest
            update_list = [
                cs.name for cs in response.files
                if not os.path.exists(cs.name) or cs.hash != file_hash(cs.name)
            ]
            self.manifest_transaction = None
            if update_list:
                self.software_upgrade(update_list)
            else:
                self.event("establish")

        logging.info("Requesting manifest")
        request = ManifestRequest()
        self.manifest_transaction = self.sae_stream.proxy.manifest(request, manifest_callback)
    ##
    ## 
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
535
    @check_state("UPGRADE")
    def software_upgrade(self, update_list):
        """
        Request the files named in *update_list* from SAE, rewrite them
        locally with safe_rewrite() and reboot the activator.

        NOTE(review): file names and contents are written exactly as
        supplied by the RPC peer; this relies on SAE being trusted.
        """
        def software_upgrade_callback(transaction, response=None, error=None):
            if error:
                logging.error("Upgrade error: %s" % error.text)
                self.software_upgrade_transaction = None
                return
            if transaction.id != self.software_upgrade_transaction.id:
                logging.error("Transaction id mismatch")
                self.software_upgrade_transaction = None
                return
            logging.info("Upgrading software")
            for item in response.codes:
                logging.info("Upgrade: %s" % item.name)
                safe_rewrite(item.name, item.code)
            self.software_upgrade_transaction = None
            self.reboot()

        logging.debug("Requesting software upgrade for %s" % str(update_list))
        request = SoftwareUpgradeRequest()
        for file_name in update_list:
            request.names.append(file_name)
        self.software_upgrade_transaction = self.sae_stream.proxy.software_upgrade(
            request, software_upgrade_callback)
Dmitry Volodin's avatar
Dmitry Volodin committed
557
558
559
    ##
    ## Request event filter from SAE
    ##
Dmitry Volodin's avatar
Dmitry Volodin committed
560
    @check_state("ESTABLISHED")
    def get_event_filter(self):
        """
        Request the event filter from SAE.

        On success event_sources is replaced with the returned sources and
        the next refresh is scheduled response.expire after the current
        time; on error the previous filter and schedule are kept.
        """
        def event_filter_callback(transaction, response=None, error=None):
            if not error:
                self.event_sources = set(response.sources)
                self.next_filter_update = time.time() + response.expire
                return
            logging.error("get_event_filter error: %s" % error.text)

        request = EventFilterRequest()
        self.sae_stream.proxy.event_filter(request, event_filter_callback)
    ##
571
572
    ## Collect crashinfo files and send them as system events to SAE
    ## (Called only in standalone mode)
Dmitry Volodin's avatar
Dmitry Volodin committed
573
    ##
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
    @check_state("ESTABLISHED")
    def check_crashinfo(self):
        """
        Send pending crashinfo files to SAE as events.

        Crashinfo files are looked up in the directory of [main]/logfile
        by the DEBUG_CTX_CRASH_PREFIX file name prefix. Each file is
        unpickled, sent through on_event() with "" as the address and then
        removed. Files which cannot be read or decoded are logged and left
        in place, so that one damaged file does not block the others or
        the scheduling of the next check.
        """
        logfile = self.config.get("main", "logfile")
        if not logfile:
            return
        c_d = os.path.dirname(logfile)
        if not os.path.isdir(c_d):
            return
        for fn in os.listdir(c_d):
            if not fn.startswith(DEBUG_CTX_CRASH_PREFIX):
                continue
            # Load and unpickle crashinfo
            path = os.path.join(c_d, fn)
            try:
                # Pickles are binary data
                with open(path, "rb") as f:
                    data = cPickle.loads(f.read())
                ts = data.pop("ts")
            except Exception:
                logging.error("Unable to load crashinfo %s. Skipping" % path)
                continue
            # Send event. "" is an virtual address of ROOT object
            self.on_event(ts, "", data)
            os.unlink(path)
        # Next check - after 60 seconds timeout
        self.next_crashinfo_check = time.time() + 60
        
    ##
    ## Send FM event to SAE
Dmitry Volodin's avatar
Dmitry Volodin committed
598
    ##
599
600
    def on_event(self, timestamp, ip, body):
        """
        Send an FM event to SAE.

        *body* is a mapping; its keys and values are converted with str()
        and sent as key/value pairs of the EventRequest. A failed delivery
        is only logged.
        """
        def on_event_callback(transaction, response=None, error=None):
            if error:
                logging.error("event_proxy failed: %s" % error)

        request = EventRequest()
        request.timestamp = timestamp
        request.ip = ip
        for key, value in body.items():
            item = request.body.add()
            item.key = str(key)
            item.value = str(value)
        self.sae_stream.proxy.event(request, on_event_callback)
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
    ##
    ## Queue collected PM data points
    ##
    def queue_pm_data(self, pm_data):
        """Append the items of *pm_data* to pm_data_queue."""
        self.pm_data_queue.extend(pm_data)
    ##
    ## Queue collected PM probe results
    ##
    def queue_pm_result(self, pm_result):
        """Append the items of *pm_result* to pm_result_queue."""
        self.pm_result_queue.extend(pm_result)
    ##
    ## Send collected PM data to the SAE
    ##
    def send_pm_data(self):
        """
        Send all queued PM probe results and data points to SAE in one
        PMDataRequest and empty both queues. A failed delivery is only
        logged.
        """
        def pm_data_callback(transaction, response=None, error=None):
            if error:
                logging.error("pm_data failed: %s" % error)

        request = PMDataRequest()
        # Probe results
        for p_name, p_type, ts, service, result, message in self.pm_result_queue:
            item = request.result.add()
            item.probe_name = p_name
            item.probe_type = p_type
            item.timestamp = ts
            item.service = service
            item.result = result
            item.message = message
        self.pm_result_queue = []
        # Data points
        for d_name, ts, is_null, value in self.pm_data_queue:
            item = request.data.add()
            item.name = d_name
            item.timestamp = ts
            item.is_null = is_null
            item.value = value
        self.pm_data_queue = []
        self.sae_stream.proxy.pm_data(request, pm_data_callback)
646
647
648
649
650
651
652
653
654
655
656
657
    # Signal handlers

    # SIGUSR1 returns process info
    def SIGUSR1(self, signo, frame):
        """
        SIGUSR1 handler: log the number of factory sockets followed by
        the statistics of the SAE stream, if there is one.
        """
        stats = [["factory.sockets", len(self.factory)]]
        if self.sae_stream:
            stats += self.sae_stream.stats
        logging.info("STATS:")
        for name, value in stats:
            logging.info("%s: %s" % (name, value))
658
659
660
661
662
663
664
665
666
667
    # SIGCHLD: Zombie hunting
    def SIGCHLD(self, signo, frame):
        """
        SIGCHLD handler: reap all terminated child processes.

        os.waitpid() is polled without blocking until it reports no
        terminated child (pid 0) or fails with OSError.
        """
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except OSError:
                # waitpid failed; nothing more to reap
                break
            if not pid:
                # Children exist, but none has terminated
                break
            logging.debug("Zombie pid=%d is hunted and mercilessly killed" % pid)