# coding=utf-8 from __future__ import absolute_import import octoprint.plugin from octoprint.users import FilebasedUserManager, User from octoprint.settings import settings import ldap import ldap3 import ldap3.utils.dn import ssl import uuid LDAP_SERVER = ldap3.Server("pool.ldap.ehlab.uk", tls=ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2)) LDAP_BASE = 'dc=edinburghhacklab,dc=com' LDAP_GROUPS = ["octoprint"] class LDAPUserManager(FilebasedUserManager, octoprint.plugin.SettingsPlugin, octoprint.plugin.TemplatePlugin): #Login phase : # - findUser called, if it return a user # - chaeckPassword called, if it return True # - login_user called with User returned by previous findUser def ldapify_groups(self, groups): output = [] for group in groups: output.append('cn={},ou=Groups,ou=People,dc=edinburghhacklab,dc=com'.format(group)) return output def check_auth(self, username, password): if username in [None, ''] or password in [None, '']: return None ldap_conn = ldap3.Connection(LDAP_SERVER, auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND) ldap_conn.search(search_base=LDAP_BASE, search_filter='(&(objectClass=account)(uid={}))'.format(username), search_scope=ldap3.SUBTREE, attributes=['uid', 'memberOf']) if len(ldap_conn.response) > 0: dn = ldap_conn.response[0]['dn'] try: bind_conn = ldap3.Connection(LDAP_SERVER, user=dn, password=password, auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND) if bind_conn: return ldap_conn.response[0]['attributes'] except ldap3.core.exceptions.LDAPBindError: pass return None def checkPassword(self, username, password): try: #connection = self.getLDAPClient() #username = self.escapeLDAP(username) #dn = self.findLDAPUser(username) #if dn is None: # return False #connection.bind_s(dn, password) #connection.unbind_s() data = self.check_auth(username, password) for group in self.ldapify_groups(LDAP_GROUPS): if group in data.get('memberOf', []): user = FilebasedUserManager.findUser(self, username) if not user: self._logger.debug("Add new user") self.addUser(username, str(uuid.uuid4()), True) return True else: self._logger.error("LDAP-CAMERON: user or password incorrect.") return False except ldap.INVALID_CREDENTIALS: self._logger.error("LDAP : Your username or password is incorrect.") return FilebasedUserManager.checkPassword(self, username, password) except Exception as e: if type(e.message) == dict: for (k, v) in e.message.iteritems(): self._logger.error("%s: %sn" % (k, v)) else: self._logger.error(e.message) return False def changeUserPassword(self, username, password): #Changing password of LDAP users is not allowed if FilebasedUserManager.findUser(self, username) is not None: return FilebasedUserManager.changeUserPassword(self, username, password) def findUser(self, userid=None, apikey=None, session=None): local_user = FilebasedUserManager.findUser(self, userid, apikey, session) #If user not exists in local database, search it on LDAP if userid and not local_user: #Return a fake user instance return User(userid, str(uuid.uuid4()), True, ["user"]) else : self._logger.debug("Local user found") return local_user def findLDAPUser(self, userid): ldap_search_base = settings().get(["accessControl", "ldap_search_base"]) #groups = settings().get(["accessControl", "groups"]) groups = "blah" userid = self.escapeLDAP(userid) if ldap_search_base is None: self._logger.error("LDAP conf error") return None try: connection = self.getLDAPClient() #verify user) result = connection.search_s(ldap_search_base, ldap.SCOPE_SUBTREE, "uid=" + userid) if result is None or len(result) == 0: return None self._logger.error("LDAP-AUTH: User found!") #check group(s) if groups is not None: self._logger.error("LDAP-AUTH: Checking Groups...") group_filter = "" if "," in groups: group_list = groups.split(",") group_filter = "(|" for g in group_list: group_filter = group_filter + "(cn=%s)" % g group_filter = group_filter + ")" else: group_filter = "(cn=%s)" % groups group_filter = "(cn=octoprint,ou=Groups,ou=People,dc=edinburghhacklab,dc=com)" #query = "(&(objectClass=groupOfNames)%s(uid=%s))" % (group_filter, userid) query = '(&(objectClass=groupOfNames)(cn=octoprint)(member="uid=%s,ou=Accounts,ou=People,dc=edinburghhacklab,dc=com"))' % userid self._logger.error("LDAP-AUTH QUERY:" + query) group_result = connection.search_s("ou=Groups,ou=People,dc=edinburghhacklab,dc=com", ldap.SCOPE_SUBTREE, query) if group_result is None or len(group_result) == 0: print("LDAP-AUTH: Group not found") return None self._logger.error("LDAP-AUTH: Group matched!") #disconnect connection.unbind_s() #Get the DN of first user found dn, data = result[0] return dn except ldap.NO_SUCH_OBJECT: self._logger.error("LDAP-AUTH: NO_SUCH_OBJECT") return None except ldap.LDAPError, e: if type(e.message) == dict: for (k, v) in e.message.iteritems(): self._logger.error("%s: %sn" % (k, v)) else: self._logger.error(e.message) return None def escapeLDAP(self, str): reservedStrings = ['+','=','\\','\r','\n','#',',','>','<','"',';'] for ch in reservedStrings: if ch in str: str = str.replace(ch, '\\' + ch) return str def getLDAPClient(self): ldap_server = settings().get(["accessControl", "ldap_uri"]) ldap_verifypeer = settings().get(["accessControl", "ldap_tls_reqcert"]) if ldap_server is None: self._logger.error("LDAP conf error") Exception("LDAP conf error, server is missing") connection = ldap.initialize(ldap_server) if (ldap_server.startswith('ldaps://')): verifypeer = ldap.OPT_X_TLS_NEVER if ldap_verifypeer == 'demand': verifypeer = ldap.OPT_X_TLS_DEMAND connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, verifypeer) try: connection.start_tls_s() except: pass return connection # Softwareupdate hook def get_update_information(self): return dict( filamentmanager=dict( displayName="Auth LDAP", displayVersion=self._plugin_version, # version check: github repository type="github_release", user="gillg", repo="OctoPrint-LDAP", current=self._plugin_version, # update method: pip pip="https://github.com/gillg/OctoPrint-LDAP/archive/{target_version}.zip" ) ) # UserManager hook def ldap_user_factory(components, settings, *args, **kwargs): return LDAPUserManager() # SettingsPlugin def get_settings_defaults(self): return dict( accessControl=dict( ldap_uri=None, ldap_tls_reqcert='demand', ldap_search_base=None, groups=None ) ) # TemplatePlugin def get_template_configs(self): return [ dict(type="settings", template="settings.jinja2") ] __plugin_name__ = "Auth LDAP" def __plugin_load__(): global __plugin_implementation__ __plugin_implementation__ = LDAPUserManager() global __plugin_hooks__ __plugin_hooks__ = { "octoprint.users.factory": __plugin_implementation__.ldap_user_factory, "octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information, } #@TODO Command clean LDAP users deleted