From bb7bab5f57826693cdfdf06940f8e1810fa3b4b0 Mon Sep 17 00:00:00 2001 From: smile Date: Thu, 25 Jul 2019 17:55:34 +0300 Subject: [PATCH 1/2] Add MySQL Extractor --- core/etl/extractor/mysql.py | 72 +++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 core/etl/extractor/mysql.py diff --git a/core/etl/extractor/mysql.py b/core/etl/extractor/mysql.py new file mode 100644 index 0000000000..f884b7cf03 --- /dev/null +++ b/core/etl/extractor/mysql.py @@ -0,0 +1,72 @@ +# -*- coding: utf-8 -*- +# ---------------------------------------------------------------------- +# Mysql Data Extractor +# ---------------------------------------------------------------------- +# Copyright (C) 2007-2019 The NOC Project +# See LICENSE for details +# ---------------------------------------------------------------------- + +# Python modules +from __future__ import absolute_import +import os + +# Third-party modules +from concurrent.futures import as_completed + +# NOC modules +from noc.core.threadpool import ThreadPoolExecutor +from noc.core.etl.extractor.sql import SQLExtractor + + +class MySQLExtractor(SQLExtractor): + """ + Mysql SQL extractor. + Requres pymysql + + Configuration variables + *MYSQL_HOST* - Mysql host + *MYSQL_PORT* - Mysql port + *MYSQL_USER* - Mysql database user + *MYSQL_PASSWORD* - Mysql database password + *MYSQL_DB* - Mysql database + *MYSQL_CHARSET* - Mysql charset + """ + + def __init__(self, *args, **kwargs): + super(MySQLExtractor, self).__init__(*args, **kwargs) + self.connect = None + + def get_cursor(self): + import pymysql + + if not self.connect: + # Alter environment + old_env = os.environ.copy() # Save environment + os.environ.update(self.system.config) + # Connect to database + self.logger.info("Connecting to database") + self.connect = pymysql.connect( + host=str(self.config.get("MYSQL_HOST")), + port=int(self.config.get("MYSQL_PORT")), + user=str(self.config.get("MYSQL_USER")), + password=str(self.config.get("MYSQL_PASSWORD")), + db=str(self.config.get("MYSQL_DB")), + charset=str( + self.config.get("MYSQL_CHARSET") + if self.config.get("MYSQL_CHARSET") + else "utf8mb4" + ), + ) + + os.environ = old_env # Restore environment + cursor = self.connect.cursor() + return cursor + + def iter_data(self): + cursor = self.get_cursor() + # Fetch data + self.logger.info("Fetching data") + for query, params in self.get_sql(): + cursor.execute(query, params) + for row in cursor: + yield row -- GitLab From 5e66c993374bad896433f79ac3fa08b077735711 Mon Sep 17 00:00:00 2001 From: smile Date: Thu, 25 Jul 2019 18:02:08 +0300 Subject: [PATCH 2/2] Fix Flake8 --- core/etl/extractor/mysql.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/etl/extractor/mysql.py b/core/etl/extractor/mysql.py index f884b7cf03..e94add8aa2 100644 --- a/core/etl/extractor/mysql.py +++ b/core/etl/extractor/mysql.py @@ -10,11 +10,7 @@ from __future__ import absolute_import import os -# Third-party modules -from concurrent.futures import as_completed - # NOC modules -from noc.core.threadpool import ThreadPoolExecutor from noc.core.etl.extractor.sql import SQLExtractor -- GitLab