Commit b3109a84 authored by Dmitry Volodin's avatar Dmitry Volodin
Browse files

noc-scheduler daemon

parent 16419f88
......@@ -12,7 +12,7 @@ import datetime
from django.contrib import admin
## NOC modules
from noc.cm.repoapp import RepoApplication, HasPerm, ModelApplication, view
from noc.sa.models import TaskSchedule
from noc.main.models import Schedule
from noc.cm.models import Config
from noc.lib.app.site import site
......@@ -47,7 +47,7 @@ class ConfigAdmin(admin.ModelAdmin):
o.save()
count+=1
# Rechedule cm.config_pull
TaskSchedule.reschedule("cm.config_pull")
Schedule.reschedule("cm.config_pull")
# Notify user
if count==1:
self.message_user(request,"1 config scheduled to immediate fetch")
......
......@@ -12,12 +12,11 @@ import datetime
## Django modules
from django.db.models import Q
## NOC modules
import noc.sa.periodic
from noc.lib.periodic import Task as PeriodicTask
##
## Reduce function for cm.config_pull
##
def reduce_config_pull(task):
"""Reduce function for cm.config_pull"""
import datetime
import random
import logging
......@@ -26,55 +25,61 @@ def reduce_config_pull(task):
from noc.sa.protocols.sae_pb2 import ERR_OVERLOAD, ERR_DOWN
## Process task results
for mt in task.maptask_set.all():
c=mt.managed_object.config # Config object
r=mt.script_result
if mt.status=="C":
c = mt.managed_object.config # Config object
r = mt.script_result
if mt.status == "C":
# Completed tasks
c.write(r)
timeout=c.pull_every
status="OK"
reason="OK"
elif mt.status=="F":
timeout = c.pull_every
status = "OK"
reason = "OK"
elif mt.status == "F":
# Failed tasks
if r["code"]==ERR_OVERLOAD:
timeout=config.getint("cm","timeout_overload")
status="ERR_OVERLOAD"
elif r["code"]==ERR_DOWN:
timeout=config.getint("cm","timeout_down")
status="ERR_DOWN"
if r["code"] == ERR_OVERLOAD:
timeout = config.getint("cm", "timeout_overload")
status = "ERR_OVERLOAD"
elif r["code"] == ERR_DOWN:
timeout = config.getint("cm", "timeout_down")
status = "ERR_DOWN"
else:
timeout=config.getint("cm","timeout_error")
status="ERR_TIMEOUT"
reason=r["text"]
timeout = config.getint("cm", "timeout_error")
status = "ERR_TIMEOUT"
reason = r["text"]
else:
# Invalid state
timeout=config.getint("cm","timeout_error")
status="UNKNOWN"
reason="Timed out"
timeout = config.getint("cm", "timeout_error")
status = "UNKNOWN"
reason = "Timed out"
# Reschedule next pull
variation=config.getint("cm","timeout_variation")
timeout+=random.randint(-timeout/variation,timeout/variation) # Add jitter to avoid blocking by dead task
c.next_pull=datetime.datetime.now()+datetime.timedelta(seconds=timeout)
variation = config.getint("cm", "timeout_variation")
# Add jitter to avoid blocking by dead task
timeout += random.randint(-timeout / variation, timeout / variation)
c.next_pull = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
c.save()
logging.info("cm.config_pull: %s, status=%s, reason=%s"%(mt.managed_object.name,status,reason))
logging.info("cm.config_pull: %s, status=%s, reason=%s" % (
mt.managed_object.name, status, reason))
return True
##
## cm.config_pull periodic task
##
class Task(noc.sa.periodic.Task):
name="cm.config_pull"
description=""
class Task(PeriodicTask):
"""
cm.config_pull periodic task
"""
name = "cm.config_pull"
description = ""
default_timeout = 300
def execute(self):
# Import here to avoid circular import error
from noc.cm.models import Config
from noc.sa.models import ReduceTask
# Run Map/Reduce task
q= Q(managed_object__is_configuration_managed=True, pull_every__isnull=False)\
&(Q(next_pull__lt=datetime.datetime.now())|Q(next_pull__isnull=True))
objects=[o.managed_object for o in Config.objects.filter(q).order_by("next_pull")]
task=ReduceTask.create_task(objects, reduce_config_pull, {}, "get_config", {}, self.timeout-3) #@todo: smarter timeout calculation
q = (Q(managed_object__is_configuration_managed=True, pull_every__isnull=False) &
(Q(next_pull__lt=datetime.datetime.now()) | Q(next_pull__isnull=True)))
objects = [o.managed_object for o in Config.objects.filter(q).order_by("next_pull")]
# @todo: smarter timeout calculation
task = ReduceTask.create_task(objects, reduce_config_pull, {},
"get_config", {}, self.timeout - 3)
return task.get_result(block=True)
......@@ -5,9 +5,9 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="cm.dns_pull"
description=""
wait_for=["cm.dns_push"]
......
......@@ -5,9 +5,9 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="cm.dns_push"
description=""
wait_for=["cm.dns_pull"]
......
......@@ -5,9 +5,9 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="cm.prefix_list_pull"
description=""
wait_for=["peer.update_whois_cache","peer.prefix_list_provisioning"]
......
......@@ -5,9 +5,9 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="cm.rpsl_pull"
description=""
......
......@@ -15,7 +15,7 @@ from django.utils.safestring import SafeString
## NOC modules
from noc.lib.app import ModelApplication, HasPerm, view
from noc.dns.models import DNSZone, DNSZoneRecord, DNSServer, DNSZoneProfile
from noc.sa.models import TaskSchedule
from noc.main.models import Schedule
from noc.lib.validators import is_fqdn
from noc.lib.app.site import site
......@@ -141,7 +141,7 @@ class DNSZoneApplication(ModelApplication):
self.message_user(request, "No new zones found")
else:
# Trigger dns.update_domain_expiration to update paid_till
TaskSchedule.reschedule("dns.update_domain_expiration",
Schedule.reschedule("dns.update_domain_expiration",
minutes=10)
self.message_user(request, "%d new zones are imported" % n)
return self.response_redirect(self.base_url)
......
......@@ -7,12 +7,12 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
from noc.settings import config
from django.utils.dateformat import DateFormat
import datetime
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="dns.check_domain_expiration"
description=""
......
......@@ -7,10 +7,10 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
import datetime,re,logging
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="dns.update_domain_expiration"
description=""
......
......@@ -18,6 +18,12 @@ user = noc
group =
config = etc/noc-fcgi.conf
[noc-scheduler]
enabled = true
user = noc
group =
config = etc/noc-scheduler.conf
[noc-sae]
enabled = true
user = noc
......
##------------------------------------------------
## noc-scheduler default configuration.
##------------------------------------------------
## WARNING: Do not edit this file directly!!!
## Override settings using etc/noc-scheduler.conf
##
[main]
logfile = /var/log/noc/noc-scheduler.log
loglevel = info
logsize = 0
logfiles = 0
pidfile = /var/log/noc/noc-scheduler.pid
heartbeat= true
......@@ -7,11 +7,11 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
import datetime,logging
T_LIMIT=100
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="fm.archive"
description=""
......
......@@ -2,24 +2,39 @@
##----------------------------------------------------------------------
## Runs PING probe of all hosts
##----------------------------------------------------------------------
## Copyright (C) 2007-2009 The NOC Project
## Copyright (C) 2007-2011 The NOC Project
## See LICENSE for details
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import datetime,logging
import noc.lib.periodic
class Task(noc.sa.periodic.Task):
def reduce_ping(task):
"""Reduce script for ping_check"""
for mt in task.maptask_set.all():
if mt.status == "C":
return True
return False
class Task(noc.lib.periodic.Task):
name="fm.ping_check"
description=""
default_timeout = 30
def execute(self):
from noc.sa.models import Activator
from noc.sa.models import Activator, ReduceTask
# Look for addresses
params=[]
for a in Activator.objects.filter(is_active=True):
objects=[o.trap_source_ip for o in a.managedobject_set.filter(trap_source_ip__isnull=False,is_managed=True)]
objects=[o.trap_source_ip for o in a.managedobject_set.filter(trap_source_ip__isnull=False, is_managed=True)]
if objects:
self.sae.ping_check(a,objects)
params += [{"activator_name": a.name, "addresses": objects}]
# Run task
if params:
task = ReduceTask.create_task("SAE", reduce_ping, {},
["ping_check"] * len(params), params, self.timeout - 1)
return task.get_result(block=True)
return True
......@@ -7,7 +7,7 @@
##----------------------------------------------------------------------
"""
"""
import noc.sa.periodic
import noc.lib.periodic
import bisect
def sync_macs_reduce(task,addresses):
......@@ -54,7 +54,7 @@ def sync_macs_reduce(task,addresses):
SystemNotification.notify("ip.sync_macs",subject="MAC Syncronization Report",body="\n".join(s))
class Task(noc.sa.periodic.Task):
class Task(noc.lib.periodic.Task):
name="ip.sync_macs"
description=""
TIMEOUT=60
......
......@@ -13,35 +13,41 @@ import datetime
## NOC modules
from noc.lib.registry import Registry
##
## Registry for all periodic tasks
##
class PeriodicRegistry(Registry):
name="PeriodicRegistry"
subdir="periodics"
classname="Task"
periodic_registry=PeriodicRegistry()
"""Registry for all periodic tasks"""
name = "PeriodicRegistry"
subdir = "periodics"
classname = "Task"
periodic_registry = PeriodicRegistry()
##
## Metaclass for Task
##
class TaskBase(type):
def __new__(cls,name,bases,attrs):
m=type.__new__(cls,name,bases,attrs)
periodic_registry.register(m.name,m)
"""Metaclass for Task"""
def __new__(cls, name, bases, attrs):
m = type.__new__(cls, name, bases, attrs)
periodic_registry.register(m.name, m)
return m
##
## Task handler
##
class Task(object):
"""Task handler"""
__metaclass__ = TaskBase
name=None
description=""
wait_for=[] # A list of periodic task names which cannot be started concurrenctly
def __init__(self, sae, timeout):
self.sae=sae
self.timeout=timeout
name = None
description = ""
# A list of periodic task names which cannot be started concurrenctly
wait_for = []
# Default task timeout.
# If set to None, task has no configurable timeout,
# Otherwise it can be configured
default_timeout = None
def __init__(self, timeout=None):
if self.default_timeout:
self.timeout = timeout if timeout else self.default_timeout
else:
self.timeout = None
def execute(self):
return True
......
......@@ -13,8 +13,9 @@ import ConfigParser,os,re,pwd
##
class ConfigApplication(Application):
title="Configs"
CONFIGS=["noc.conf","noc-launcher.conf","noc-fcgi.conf","noc-sae.conf","noc-activator.conf",
"noc-classifier.conf","noc-correlator.conf","noc-notifier.conf"]
CONFIGS=["noc.conf", "noc-launcher.conf", "noc-scheduler.py",
"noc-fcgi.conf", "noc-sae.conf", "noc-activator.conf",
"noc-classifier.conf", "noc-correlator.conf", "noc-notifier.conf"]
##
## Display config index
##
......
......@@ -8,5 +8,5 @@
from noc.lib.test import ModelApplicationTestCase
from django.utils import simplejson as json
class taskscheduleTestCase(ModelApplicationTestCase):
class ScheduleTestCase(ModelApplicationTestCase):
pass
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment