# -*- coding: utf-8 -*-

import logging
import re
from contextlib import closing
from typing import Dict, Any

from mysql.connector import connection

from customer_local_ops import OpType
from customer_local_ops.operating_system.linux import Linux, CentOS, CentOS6, CentOS7, Debian, Debian8, Ubuntu1604
from customer_local_ops.control_panel.ispconfig import OSISPConfig

LOG = logging.getLogger(__name__)


class LinuxISPConfig(Linux, OSISPConfig):
    """
    ISPConfig Customer Local Ops for the Linux OS.
    All function names should contain 'ispconfig' so as not to override the OS ops
    """
    op_type = OpType.CONTROL_PANEL_OPERATING_SYSTEM
    os_op_instance = None

    def change_ispconfig_hostname(self, payload: Dict[str, Any]) -> None:
        """Changes the server hostname for ispconfig

        :param payload: A dict containing input data; the new hostname is read from its 'hostname' key
        :raises KeyError: If payload has no 'hostname' key
        """
        new_hostname = payload['hostname']
        self._change_hostname_in_db(new_hostname)

    def _change_hostname_in_db(self, new_hostname: str) -> None:
        """Changes the server hostname for ispconfig in the database

        Reads the config column of the server row with server_id = 1, rewrites the
        hostname inside it and stores the result back in the same row.

        :param new_hostname: The new hostname string
        :raises ValueError: If the server row or its hostname entry cannot be found
        """
        # closing() guarantees the cursor and then the connection are closed, even when
        # creating the cursor itself fails.
        with closing(self._get_database_connection()) as cnx, closing(cnx.cursor()) as cursor:
            cursor.execute('select config from server where server_id = 1')
            row = cursor.fetchone()
            if row is None:
                raise ValueError('No row with server_id = 1 found in the ispconfig server table')
            config = self._replace_hostname_in_config(row[0], new_hostname)
            cursor.execute('UPDATE server SET config = %s WHERE server_id = 1', (config, ))
            cnx.commit()

    def _get_database_connection(self) -> connection.MySQLConnection:
        """Returns an ispconfig MySQL database connection

        The credentials are read from the ispconfig configuration file; the connection
        targets the 'dbispconfig' database on 127.0.0.1.
        """
        sql_username = self._get_sql_username()
        sql_password = self._get_sql_password()
        return connection.MySQLConnection(user=sql_username, password=sql_password,
                                          host='127.0.0.1',
                                          database='dbispconfig')

    def _replace_hostname_in_config(self, config: str, new_hostname: str) -> str:
        """Replaces hostname in ispconfig configuration and returns the new configuration string

        The current hostname is taken from the first 'hostname=<value>' entry in the
        configuration; every occurrence of that value in the configuration is then replaced.

        :param config: Configuration string in which to replace the hostname
        :param new_hostname: The new hostname string
        :raises ValueError: If the configuration contains no 'hostname=' entry
        """
        pattern = r'hostname=(?P<hostname>[\w\-.]*)'
        match = re.search(pattern, config)
        if match is None:
            raise ValueError("No 'hostname=' entry found in the ispconfig server configuration")

        old_hostname = match.group('hostname')
        if not old_hostname:
            # The entry exists but is empty. str.replace('', x) would insert x between
            # every character, so fill in the value at the matched position only.
            insert_at = match.end('hostname')
            return config[:insert_at] + new_hostname + config[insert_at:]

        return config.replace(old_hostname, new_hostname)

    def change_ispconfig_password(self, payload: Dict[str, Any], op_name: str) -> None:  # pylint: disable=unused-argument
        """Changes the user password for ispconfig

        :param payload: A dict containing input data; must hold an 'encrypted_password' entry
        :param op_name: The name of the op (not used by this method)
        """
        new_password = self._get_new_password(payload)
        self._change_password_in_db(new_password)

    def _change_password_in_db(self, new_password: str) -> None:
        """Changes the server password for ispconfig in the database

        Updates the 'passwort' column of the sys_user row whose username is 'admin'.

        :param new_password: The new password string
        """
        with closing(self._get_database_connection()) as cnx, closing(cnx.cursor()) as cursor:
            # NOTE(review): the hash is computed by MySQL's md5(); confirm this is the
            # format the installed ISPConfig version expects.
            cursor.execute('UPDATE sys_user SET passwort = md5(%s) WHERE username = %s', (new_password, 'admin'))
            cnx.commit()

    def _get_new_password(self, payload: Dict[str, Any]) -> str:
        """Decrypts the new password from an encrypted string

        :param payload: A dict containing input data; the encrypted value is read from 'encrypted_password'
        :raises KeyError: If payload has no 'encrypted_password' key
        """
        encrypted_password = payload['encrypted_password']
        return self.decrypt(encrypted_password)

    def _get_sql_password(self) -> str:
        """Returns the MySQL database password from configuration"""
        return self._get_config_value('db_password')

    def _get_sql_username(self) -> str:
        """Returns the MySQL database user from configuration"""
        return self._get_config_value('db_user')

    def _get_config_value(self, key: str) -> str:
        """Retrieves a value from configuration from its key

        Looks in /usr/local/ispconfig/interface/lib/config.inc.php for the first
        assignment of the form $conf['<key>'] = '<value>' (single or double quotes)
        and returns the quoted value.

        :param key: The configuration key for the value
        :raises ValueError: If no assignment for the key is found in the file
        """
        with open('/usr/local/ispconfig/interface/lib/config.inc.php', encoding='utf-8') as f:
            isp_config = f.read()

        # The key part is escaped because '$', '[' and ']' are regex metacharacters.
        pattern = re.escape(r"$conf['{0}']".format(key)) + \
            r'\s*=\s*[\'"](?P<value>[^"\']*)["\']'
        match = re.search(pattern, isp_config)
        if match is None:
            raise ValueError("No value for key '{0}' found in the ispconfig configuration file".format(key))
        return match.group('value')


class CentOSISPConfig(CentOS, LinuxISPConfig):
    """ISPConfig ops for CentOS; instantiation raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class CentOS6ISPConfig(CentOS6, CentOSISPConfig):  # pylint: disable=too-many-ancestors
    pass


class CentOS7ISPConfig(CentOS7, CentOSISPConfig):  # pylint: disable=too-many-ancestors
    pass


class DebianISPConfig(Debian, LinuxISPConfig):
    pass


class Debian8ISPConfig(Debian8, DebianISPConfig):  # pylint: disable=too-many-ancestors
    """ISPConfig ops for Debian 8; instantiation raises NotImplementedError."""

    def __init__(self, *args, **kwargs):  # pylint: disable=super-init-not-called
        raise NotImplementedError


class Ubuntu1604ISPConfig(Ubuntu1604, DebianISPConfig):  # pylint: disable=too-many-ancestors
    pass