256 lines
8.9 KiB
Python
256 lines
8.9 KiB
Python
# coding=utf-8
|
|
from __future__ import absolute_import
|
|
|
|
import octoprint.plugin
|
|
from octoprint.users import FilebasedUserManager, User
|
|
from octoprint.settings import settings
|
|
import ldap
|
|
import ldap3
|
|
import ldap3.utils.dn
|
|
import ssl
|
|
import uuid
|
|
|
|
LDAP_SERVER = ldap3.Server("pool.ldap.ehlab.uk", tls=ldap3.Tls(validate=ssl.CERT_REQUIRED, version=ssl.PROTOCOL_TLSv1_2))
|
|
LDAP_BASE = 'dc=edinburghhacklab,dc=com'
|
|
LDAP_GROUPS = ["octoprint"]
|
|
|
|
|
|
|
|
|
|
class LDAPUserManager(FilebasedUserManager,
|
|
octoprint.plugin.SettingsPlugin,
|
|
octoprint.plugin.TemplatePlugin):
|
|
|
|
#Login phase :
|
|
# - findUser called, if it return a user
|
|
# - chaeckPassword called, if it return True
|
|
# - login_user called with User returned by previous findUser
|
|
|
|
def ldapify_groups(self, groups):
|
|
output = []
|
|
for group in groups:
|
|
output.append('cn={},ou=Groups,ou=People,dc=edinburghhacklab,dc=com'.format(group))
|
|
return output
|
|
|
|
|
|
def check_auth(self, username, password):
|
|
if username in [None, ''] or password in [None, '']:
|
|
return None
|
|
ldap_conn = ldap3.Connection(LDAP_SERVER, auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND)
|
|
ldap_conn.search(search_base=LDAP_BASE,
|
|
search_filter='(&(objectClass=account)(uid={}))'.format(username),
|
|
search_scope=ldap3.SUBTREE,
|
|
attributes=['uid', 'memberOf'])
|
|
if len(ldap_conn.response) > 0:
|
|
dn = ldap_conn.response[0]['dn']
|
|
try:
|
|
bind_conn = ldap3.Connection(LDAP_SERVER, user=dn, password=password, auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND)
|
|
if bind_conn:
|
|
return ldap_conn.response[0]['attributes']
|
|
except ldap3.core.exceptions.LDAPBindError:
|
|
pass
|
|
return None
|
|
|
|
def checkPassword(self, username, password):
|
|
try:
|
|
#connection = self.getLDAPClient()
|
|
|
|
#username = self.escapeLDAP(username)
|
|
#dn = self.findLDAPUser(username)
|
|
#if dn is None:
|
|
# return False
|
|
#connection.bind_s(dn, password)
|
|
#connection.unbind_s()
|
|
|
|
data = self.check_auth(username, password)
|
|
|
|
for group in self.ldapify_groups(LDAP_GROUPS):
|
|
if group in data.get('memberOf', []):
|
|
user = FilebasedUserManager.findUser(self, username)
|
|
if not user:
|
|
self._logger.debug("Add new user")
|
|
self.addUser(username, str(uuid.uuid4()), True)
|
|
return True
|
|
else:
|
|
self._logger.error("LDAP-CAMERON: user or password incorrect.")
|
|
return False
|
|
|
|
except ldap.INVALID_CREDENTIALS:
|
|
self._logger.error("LDAP : Your username or password is incorrect.")
|
|
return FilebasedUserManager.checkPassword(self, username, password)
|
|
except Exception as e:
|
|
if type(e.message) == dict:
|
|
for (k, v) in e.message.iteritems():
|
|
self._logger.error("%s: %sn" % (k, v))
|
|
else:
|
|
self._logger.error(e.message)
|
|
return False
|
|
|
|
def changeUserPassword(self, username, password):
|
|
#Changing password of LDAP users is not allowed
|
|
if FilebasedUserManager.findUser(self, username) is not None:
|
|
return FilebasedUserManager.changeUserPassword(self, username, password)
|
|
|
|
def findUser(self, userid=None, apikey=None, session=None):
|
|
local_user = FilebasedUserManager.findUser(self, userid, apikey, session)
|
|
#If user not exists in local database, search it on LDAP
|
|
if userid and not local_user:
|
|
#Return a fake user instance
|
|
return User(userid, str(uuid.uuid4()), True, ["user"])
|
|
|
|
else :
|
|
self._logger.debug("Local user found")
|
|
return local_user
|
|
|
|
def findLDAPUser(self, userid):
|
|
ldap_search_base = settings().get(["accessControl", "ldap_search_base"])
|
|
#groups = settings().get(["accessControl", "groups"])
|
|
groups = "blah"
|
|
userid = self.escapeLDAP(userid)
|
|
|
|
if ldap_search_base is None:
|
|
self._logger.error("LDAP conf error")
|
|
return None
|
|
|
|
try:
|
|
connection = self.getLDAPClient()
|
|
|
|
#verify user)
|
|
result = connection.search_s(ldap_search_base, ldap.SCOPE_SUBTREE, "uid=" + userid)
|
|
if result is None or len(result) == 0:
|
|
return None
|
|
self._logger.error("LDAP-AUTH: User found!")
|
|
|
|
#check group(s)
|
|
if groups is not None:
|
|
self._logger.error("LDAP-AUTH: Checking Groups...")
|
|
group_filter = ""
|
|
if "," in groups:
|
|
group_list = groups.split(",")
|
|
group_filter = "(|"
|
|
for g in group_list:
|
|
group_filter = group_filter + "(cn=%s)" % g
|
|
group_filter = group_filter + ")"
|
|
else:
|
|
group_filter = "(cn=%s)" % groups
|
|
|
|
group_filter = "(cn=octoprint,ou=Groups,ou=People,dc=edinburghhacklab,dc=com)"
|
|
|
|
#query = "(&(objectClass=groupOfNames)%s(uid=%s))" % (group_filter, userid)
|
|
|
|
query = '(&(objectClass=groupOfNames)(cn=octoprint)(member="uid=%s,ou=Accounts,ou=People,dc=edinburghhacklab,dc=com"))' % userid
|
|
|
|
self._logger.error("LDAP-AUTH QUERY:" + query)
|
|
group_result = connection.search_s("ou=Groups,ou=People,dc=edinburghhacklab,dc=com", ldap.SCOPE_SUBTREE, query)
|
|
|
|
if group_result is None or len(group_result) == 0:
|
|
print("LDAP-AUTH: Group not found")
|
|
return None
|
|
|
|
self._logger.error("LDAP-AUTH: Group matched!")
|
|
|
|
#disconnect
|
|
connection.unbind_s()
|
|
|
|
#Get the DN of first user found
|
|
dn, data = result[0]
|
|
return dn
|
|
|
|
except ldap.NO_SUCH_OBJECT:
|
|
self._logger.error("LDAP-AUTH: NO_SUCH_OBJECT")
|
|
return None
|
|
|
|
except ldap.LDAPError, e:
|
|
if type(e.message) == dict:
|
|
for (k, v) in e.message.iteritems():
|
|
self._logger.error("%s: %sn" % (k, v))
|
|
else:
|
|
self._logger.error(e.message)
|
|
return None
|
|
|
|
def escapeLDAP(self, str):
|
|
reservedStrings = ['+','=','\\','\r','\n','#',',','>','<','"',';']
|
|
for ch in reservedStrings:
|
|
if ch in str:
|
|
str = str.replace(ch, '\\' + ch)
|
|
return str
|
|
|
|
def getLDAPClient(self):
|
|
ldap_server = settings().get(["accessControl", "ldap_uri"])
|
|
ldap_verifypeer = settings().get(["accessControl", "ldap_tls_reqcert"])
|
|
if ldap_server is None:
|
|
self._logger.error("LDAP conf error")
|
|
Exception("LDAP conf error, server is missing")
|
|
|
|
connection = ldap.initialize(ldap_server)
|
|
if (ldap_server.startswith('ldaps://')):
|
|
verifypeer = ldap.OPT_X_TLS_NEVER
|
|
if ldap_verifypeer == 'demand':
|
|
verifypeer = ldap.OPT_X_TLS_DEMAND
|
|
connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, verifypeer)
|
|
try:
|
|
connection.start_tls_s()
|
|
except:
|
|
pass
|
|
|
|
return connection
|
|
|
|
# Softwareupdate hook
|
|
|
|
def get_update_information(self):
|
|
return dict(
|
|
filamentmanager=dict(
|
|
displayName="Auth LDAP",
|
|
displayVersion=self._plugin_version,
|
|
|
|
# version check: github repository
|
|
type="github_release",
|
|
user="gillg",
|
|
repo="OctoPrint-LDAP",
|
|
current=self._plugin_version,
|
|
|
|
# update method: pip
|
|
pip="https://github.com/gillg/OctoPrint-LDAP/archive/{target_version}.zip"
|
|
)
|
|
)
|
|
|
|
# UserManager hook
|
|
|
|
def ldap_user_factory(components, settings, *args, **kwargs):
|
|
return LDAPUserManager()
|
|
|
|
# SettingsPlugin
|
|
|
|
def get_settings_defaults(self):
|
|
return dict(
|
|
accessControl=dict(
|
|
ldap_uri=None,
|
|
ldap_tls_reqcert='demand',
|
|
ldap_search_base=None,
|
|
groups=None
|
|
)
|
|
)
|
|
|
|
# TemplatePlugin
|
|
|
|
def get_template_configs(self):
|
|
return [
|
|
dict(type="settings", template="settings.jinja2")
|
|
]
|
|
|
|
|
|
__plugin_name__ = "Auth LDAP"
|
|
|
|
def __plugin_load__():
|
|
global __plugin_implementation__
|
|
__plugin_implementation__ = LDAPUserManager()
|
|
|
|
global __plugin_hooks__
|
|
__plugin_hooks__ = {
|
|
"octoprint.users.factory": __plugin_implementation__.ldap_user_factory,
|
|
"octoprint.plugin.softwareupdate.check_config": __plugin_implementation__.get_update_information,
|
|
}
|
|
|
|
|
|
#@TODO Command clean LDAP users deleted
|