Commit be9f247f authored by Andrey Vertiprahov's avatar Andrey Vertiprahov
Browse files

Merge branch 'noc-patch-wf' into 'microservices'

Workflow

See merge request !235
parents a380ebdb 63c74d3a
Pipeline #2839 passed with stage
in 2 minutes and 9 seconds
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Workflow maintenance
# ---------------------------------------------------------------------
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ---------------------------------------------------------------------
# Python modules
from __future__ import print_function
import argparse
# NOC modules
from noc.core.management.base import BaseCommand
from noc.models import get_model
from noc.wf.models.wfmigration import WFMigration
class Command(BaseCommand):
help = "Workflow maintenance"
PROFILE_MAP = {
"crm.SubscriberProfile": "crm.Subscriber",
"crm.SupplierProfile": "crm.Supplier"
}
def add_arguments(self, parser):
subparsers = parser.add_subparsers(dest="cmd")
# extract command
migrate_parser = subparsers.add_parser("migrate")
migrate_parser.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
help="Dump statistics. Do not perform updates"
)
migrate_parser.add_argument(
"--migration",
required=True,
help="Migration name"
)
migrate_parser.add_argument(
"--profile",
required=True,
help="Profile model"
)
migrate_parser.add_argument(
"profiles",
help="Profile ids",
nargs=argparse.REMAINDER
)
def handle(self, cmd, *args, **options):
return getattr(self, "handle_%s" % cmd)(*args, **options)
def handle_migrate(self, dry_run=False, migration=None, profile=None,
profiles=None, *args, **kwargs):
if profile not in self.PROFILE_MAP:
self.die("Invalid profile %s. Possible profiles:\n%s" % (
profile, "\n".join(self.PROFILE_MAP)))
wfm = WFMigration.objects.filter(name=migration).first()
if not wfm:
self.die("Invalid migration %s" % wfm.name)
pmodel = get_model(profile)
imodel = get_model(self.PROFILE_MAP[profile])
for pid in profiles:
p = pmodel.get_by_id(pid)
if not p:
self.die("Profile %s is not found" % pid)
self.print("Migrating profile %s" % p)
tr = wfm.get_translation_map(p.workflow)
if not tr:
self.print("No translations")
continue
for ostate in tr:
c = imodel.objects.filter(state=ostate.id).count()
self.print(" %s -> %s: %d records" % (ostate, tr[ostate], c))
if c and not dry_run:
for o in imodel.objects.filter(state=ostate.id):
o.set_state(tr[ostate])
if __name__ == "__main__":
Command().run()
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# @workflow decorator
# ----------------------------------------------------------------------
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import logging
# NOC modules
from noc.models import is_document
logger = logging.getLogger(__name__)
def fire_event(self, event):
"""
Perform transition using event name
:param event: event name
:return:
"""
self.state.fire_event(event, self)
def fire_transition(self, transition):
"""
Perform transition
:param transition: Transition instance
:return:
"""
self.state.fire_transition(transition, self)
def document_set_state(self, state):
"""
Set state
* Set field
* Perform database update
* Invalidate caches
* Call State on_enter_handlers
:param self:
:param object:
:return:
"""
# Set field
self.state = state
# Update database directly
# to avoid full save
self._get_collection().update_one({
"_id": self.id
}, {
"$set": {
"state": state.id
}
})
# Invalidate caches
ic_handler = getattr(self, "invalidate_caches", None)
if ic_handler:
ic_handler()
# Call state on_enter_handlers
self.state.on_enter_state(self)
def model_set_state(self, state):
"""
Set state
* Set field
* Perform database update
* Invalidate caches
* Call State on_enter_handlers
:param self:
:param object:
:return:
"""
# Set field
self.state = state
# Update database directly
# to avoid full save
self.objects.filter(id=self.id).update(state=str(state.id))
# Invalidate caches
ic_handler = getattr(self, "invalidate_caches", None)
if ic_handler:
ic_handler()
# Call state on_enter_handlers
self.state.on_enter_state(self)
def _on_document_post_save(sender, document, *args, **kwargs):
if document.state is None:
# No state, set default one
# Get workflow
profile = getattr(document, getattr(document, "PROFILE_LINK", "profile"))
if not profile:
logger.debug("[%s] Cannot set default state: No profile", document)
return
new_state = profile.workflow.get_default_state()
if not new_state:
logger.debug(
"[%s] Cannot set default state: No default state for workflow %s",
document, profile.workflow.name)
return
logger.debug("[%s] Set initial state to '%s'",
document, new_state.name)
document.set_state(new_state)
def _on_model_post_save(sender, instance, *args, **kwargs):
if instance.state is None:
# No state, set default one
# Get workflow
profile = getattr(instance, getattr(instance, "PROFILE_LINK", "profile"))
if not profile:
logger.debug("[%s] Cannot set default state: No profile", instance)
return
new_state = profile.workflow.get_default_state()
if not new_state:
logger.debug(
"[%s] Cannot set default state: No default state for workflow %s",
instance, profile.workflow.name)
return
logger.debug("[%s] Set initial state to '%s'",
instance, new_state.name)
instance.set_state(new_state)
def workflow(cls):
"""
@workflow decorator denotes models which have .state
field referring to WF State.
Methods contributed to class:
* set_state - change .state field with calling State.on_state_enter
* fire_event - Perform transition using event name
* fire_transition - Perform transition
:return:
"""
cls.fire_event = fire_event
cls.fire_transition = fire_transition
if is_document(cls):
# MongoEngine model
from mongoengine import signals as mongo_signals
cls.set_state = document_set_state
mongo_signals.post_save.connect(
_on_document_post_save,
sender=cls
)
else:
# Django model
from django.db.models import signals as django_signals
cls.set_state = model_set_state
django_signals.post_save.connect(
_on_model_post_save,
sender=cls
)
cls.fire_transition = fire_transition
cls.fire_event = fire_event
return cls
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# Transition Job Wrapper Class
# ----------------------------------------------------------------------
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import logging
# NOC modules
from noc.core.handler import get_handler
from noc.models import get_object
logger = logging.getLogger(__name__)
def state_job(handler, model, object):
"""
State.job_handler wrapper
:param handler:
:param model:
:param object:
:return:
"""
# Resolve handler
h = get_handler(handler)
if not h:
logger.error("Invalid handler %s", handler)
return
# Resolve object
obj = get_object(model, object)
if not obj:
logger.error("Cannot dereference %s:%s", model, object)
# Call handler
h(obj)
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# Set workflow
# ----------------------------------------------------------------------
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
import bson
# NOC modules
from noc.lib.nosql import get_db
class Migration(object):
depends_on = [
("wf", "0001_default_wf")
]
def forwards(self):
db = get_db()
wf = bson.ObjectId("5a1d078e1bb627000151a17d")
state = bson.ObjectId("5a1d07b41bb627000151a18b")
db["noc.supplierprofiles"].update_many({}, {
"$set": {
"workflow": wf
}
})
db["noc.subscriberprofiles"].update_many({}, {
"$set": {
"workflow": wf
}
})
db["noc.subscribers"].update_many({}, {
"$set": {
"state": state
}
})
db["noc.suppliers"].update_many({}, {
"$set": {
"state": state
}
})
def backwards(self):
pass
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# Initialize bi_id fields
# ----------------------------------------------------------------------
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Third-party modules
from pymongo import UpdateOne
import bson
# NOC modules
from noc.core.bi.decorator import bi_hash
from noc.lib.nosql import get_db
MONGO_CHUNK = 500
class Migration(object):
def forwards(self):
# Update mongodb collections
mdb = get_db()
for coll_name in ("noc.subscribers", "noc.suppliers",
"noc.subscriberprofiles", "noc.supplierprofiles"):
coll = mdb[coll_name]
updates = []
for d in coll.find({
"bi_id": {
"$exists": False
}
}, {
"_id": 1
}):
updates += [
UpdateOne({
"_id": d["_id"]
}, {
"$set": {
"bi_id": bson.Int64(bi_hash(d["_id"]))
}
})
]
if len(updates) >= MONGO_CHUNK:
coll.bulk_write(updates)
updates = []
if updates:
coll.bulk_write(updates)
......@@ -2,29 +2,45 @@
# ----------------------------------------------------------------------
# Subscriber
# ----------------------------------------------------------------------
# Copyright (C) 2007-2016 The NOC Project
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from __future__ import absolute_import
from threading import Lock
import operator
# Third-party modules
from mongoengine.document import Document
from mongoengine.fields import StringField, ReferenceField, ListField
from mongoengine.fields import StringField, ListField, LongField
import cachetools
# NOC modules
from subscriberprofile import SubscriberProfile
from .subscriberprofile import SubscriberProfile
from noc.main.models.remotesystem import RemoteSystem
from noc.lib.nosql import PlainReferenceField
from noc.wf.models.state import State
from noc.core.wf.decorator import workflow
from noc.core.bi.decorator import bi_sync
id_lock = Lock()
@bi_sync
@workflow
class Subscriber(Document):
meta = {
"collection": "noc.subscribers",
"indexes": [
"name"
]
],
"strict": False,
"auto_create_index": False
}
name = StringField()
description = StringField()
profile = ReferenceField(SubscriberProfile)
profile = PlainReferenceField(SubscriberProfile)
state = PlainReferenceField(State)
# Main address
address = StringField()
# Technical contacts
......@@ -33,9 +49,18 @@ class Subscriber(Document):
tags = ListField(StringField())
# Integration with external NRI and TT systems
# Reference to remote system object has been imported from
remote_system = ReferenceField(RemoteSystem)
remote_system = PlainReferenceField(RemoteSystem)
# Object id in remote system
remote_id = StringField()
# Object id in BI
bi_id = LongField(unique=True)
_id_cache = cachetools.TTLCache(maxsize=100, ttl=60)
def __unicode__(self):
return self.name
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_id(cls, id):
return Subscriber.objects.filter(id=id).first()
......@@ -2,21 +2,28 @@
# ----------------------------------------------------------------------
# Subscriber Profile
# ----------------------------------------------------------------------
# Copyright (C) 2007-2016 The NOC Project
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import operator
from threading import Lock
# Third-party modules
from mongoengine.document import Document
from mongoengine.fields import StringField, ListField, IntField
from mongoengine.fields import StringField, ListField, IntField, LongField
import cachetools
# NOC models
from noc.lib.nosql import ForeignKeyField
from noc.lib.nosql import ForeignKeyField, PlainReferenceField
from noc.main.models.remotesystem import RemoteSystem
from noc.main.models.style import Style
from noc.wf.models.workflow import Workflow
from noc.core.bi.decorator import bi_sync
id_lock = Lock()
@bi_sync
class SubscriberProfile(Document):
meta = {
"collection": "noc.subscriberprofiles",
......@@ -27,11 +34,19 @@ class SubscriberProfile(Document):
name = StringField(unique=True)
description = StringField()
style = ForeignKeyField(Style, required=False)
workflow = PlainReferenceField(Workflow)
# FontAwesome glyph
glyph = StringField()
tags = ListField(StringField())
# Alarm weight
weight = IntField(default=0)
# Integration with external NRI and TT systems
# Reference to remote system object has been imported from
remote_system = PlainReferenceField(RemoteSystem)
# Object id in remote system
remote_id = StringField()
# Object id in BI
bi_id = LongField(unique=True)
_id_cache = cachetools.TTLCache(maxsize=100, ttl=60)
......@@ -39,6 +54,6 @@ class SubscriberProfile(Document):
return self.name
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"))
def get_by_id(self, id):
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_id(cls, id):
return SubscriberProfile.objects.filter(id=id).first()
......@@ -2,28 +2,62 @@
# ----------------------------------------------------------------------
# Supplier
# ----------------------------------------------------------------------
# Copyright (C) 2007-2016 The NOC Project
# Copyright (C) 2007-2017 The NOC Project
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
from __future__ import absolute_import
from threading import Lock
import operator
# Third-party modules
from mongoengine.document import Document
from mongoengine.fields import (StringField, ReferenceField, ListField,
BooleanField)
from mongoengine.fields import StringField, ListField, BooleanField, LongField
import cachetools
# NOC modules
from supplierprofile import SupplierProfile
from .supplierprofile import SupplierProfile
from noc.main.models.remotesystem import RemoteSystem
from noc.lib.nosql import PlainReferenceField
from noc.wf.models.state import State
from noc.core.wf.decorator import workflow
from noc.core.bi.decorator import bi_sync
id_lock = Lock()
@bi_sync
@workflow
class Supplier(Document):
meta = {
"collection": "noc.suppliers"
"collection": "noc.suppliers",
"indexes": [
"name"
],
"strict": False,
"auto_create_index": False
}
name = StringField()
description = StringField()
is_affilated = BooleanField(default=False)
profile = ReferenceField(SupplierProfile)
profile = PlainReferenceField(SupplierProfile)
state = PlainReferenceField(State)
# Integration with external NRI and TT systems
# Reference to remote system object has been imported from
remote_system = PlainReferenceField(RemoteSystem)
# Object id in remote system
remote_id = StringField()
# Object id in BI
bi_id = LongField(unique=True)
tags = ListField(StringField())
_id_cache = cachetools.TTLCache(maxsize=100, ttl=60)
def __unicode__(self):
return self.name
@classmethod
@cachetools.cachedmethod(operator.attrgetter("_id_cache"), lock=lambda _: id_lock)
def get_by_id(cls, id):
return Supplier.objects.filter(id=id).first()
......@@ -6,14 +6,24 @@
# See LICENSE for details
# ----------------------------------------------------------------------
# Python modules
import operator
from threading import Lock
# Third-party modules
from mongoengine.document import Document
from mongoengine.fields import StringField, ListField
from mongoengine.fields import StringField, ListField, LongField
import cachetools
# NOC modules
from noc.lib.nosql import ForeignKeyField
from noc.lib.nosql import ForeignKeyField, PlainReferenceField
from noc.main.models.remotesystem import RemoteSystem
from noc.main.models.style import Style
from noc.wf.models.workflow import Workflow
from noc.core.bi.decorator import bi_sync
id_lock = Lock()
@bi_sync
class SupplierProfile(Document):
meta = {
"collection": "noc.supplierprofiles",
......@@ -23,8 +33,23 @@ class SupplierProfile(Document):
name = StringField(unique=True)
description = StringField()
workflow = PlainReferenceField(Workflow)
style = ForeignKeyField(Style, required=False)
tags = ListField(StringField())
# Integration with external NRI and TT systems
# Reference to remote system object has been imported from
remote_system = PlainReferenceField(RemoteSystem)