octoprint-ldap/octoprint_auth_ldap/__init__.py

256 lines
8.9 KiB
Python
Raw Normal View History

2017-11-06 23:58:01 +00:00
# coding=utf-8
from __future__ import absolute_import
import octoprint.plugin
2017-11-07 23:35:38 +00:00
from octoprint.users import FilebasedUserManager, User
2017-11-06 23:58:01 +00:00
from octoprint.settings import settings
import ldap
2019-11-11 23:43:39 +00:00
import ldap3
import ldap3.utils.dn
2019-11-11 23:49:34 +00:00
import ssl
2017-11-06 23:58:01 +00:00
import uuid
2019-11-11 23:43:39 +00:00
# Directory server used by LDAPUserManager.check_auth. Certificates are
# validated (CERT_REQUIRED) and the TLS context is created for TLS 1.2.
LDAP_SERVER = ldap3.Server(
    "pool.ldap.ehlab.uk",
    tls=ldap3.Tls(
        validate=ssl.CERT_REQUIRED,
        version=ssl.PROTOCOL_TLSv1_2,
    ),
)

# Search base passed to the account lookup in LDAPUserManager.check_auth.
LDAP_BASE = "dc=edinburghhacklab,dc=com"

# Short group names; LDAPUserManager.checkPassword expands them to full DNs
# and compares them against the account's memberOf attribute.
LDAP_GROUPS = ["octoprint"]
2017-11-06 23:58:01 +00:00
class LDAPUserManager(FilebasedUserManager,
                      octoprint.plugin.SettingsPlugin,
                      octoprint.plugin.TemplatePlugin):
    """OctoPrint user manager that checks passwords against an LDAP directory.

    Login phase, as driven by OctoPrint:
      - findUser is called; if it returns a user,
      - checkPassword is called; if it returns True,
      - login_user is called with the User returned by the earlier findUser.

    The active login path (check_auth / checkPassword) uses ldap3 together with
    the module-level LDAP_SERVER, LDAP_BASE and LDAP_GROUPS constants.
    findLDAPUser / getLDAPClient are the older python-ldap implementation; the
    call to findLDAPUser in checkPassword is commented out in this file.
    """

    def ldapify_groups(self, groups):
        """Return the full group DN for every short group name in *groups*."""
        return ['cn={},ou=Groups,ou=People,dc=edinburghhacklab,dc=com'.format(group)
                for group in groups]

    def check_auth(self, username, password):
        """Verify *username* / *password* against the LDAP directory.

        The account is first looked up by uid on a connection without
        credentials; a second connection then binds as the account's DN with
        the supplied password.

        Returns the account's attribute dict (requested attributes: 'uid' and
        'memberOf') when the bind succeeds. Returns None when a credential is
        empty, no account matches, or the bind is rejected.
        """
        # Function-scope import so the module's import block stays untouched.
        from ldap3.utils.conv import escape_filter_chars

        # Reject empty credentials before talking to the server: a simple bind
        # with an empty password is an "unauthenticated bind" (RFC 4513) that
        # some servers accept, which must never count as a login.
        if username in [None, ''] or password in [None, '']:
            return None

        lookup_conn = ldap3.Connection(
            LDAP_SERVER, auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND)
        try:
            lookup_conn.search(
                search_base=LDAP_BASE,
                # The username is untrusted input: escape it so it cannot
                # alter the structure of the search filter.
                search_filter='(&(objectClass=account)(uid={}))'.format(
                    escape_filter_chars(username)),
                search_scope=ldap3.SUBTREE,
                attributes=['uid', 'memberOf'])
            # Keep real entries only; search references carry no dn/attributes.
            entries = [entry for entry in (lookup_conn.response or [])
                       if entry.get('type') == 'searchResEntry']
        finally:
            lookup_conn.unbind()

        if not entries:
            return None
        account = entries[0]

        try:
            bind_conn = ldap3.Connection(
                LDAP_SERVER, user=account['dn'], password=password,
                auto_bind=ldap3.AUTO_BIND_TLS_BEFORE_BIND)
        except ldap3.core.exceptions.LDAPBindError:
            # Wrong password (or the bind was otherwise refused).
            return None

        try:
            return account['attributes'] if bind_conn.bound else None
        finally:
            bind_conn.unbind()

    def checkPassword(self, username, password):
        """Return True when the LDAP login succeeds and the user is authorised.

        The user must authenticate via check_auth and be a member of at least
        one group listed in LDAP_GROUPS. On the first successful login a local
        user record with a random password is created so that OctoPrint has a
        persistent entry for the account.

        Any unexpected error is logged and reported as a failed login.
        """
        try:
            data = self.check_auth(username, password)
            if data is None:
                self._logger.error("LDAP-CAMERON: user or password incorrect.")
                return False

            member_of = data.get('memberOf', [])
            allowed_groups = self.ldapify_groups(LDAP_GROUPS)
            if not any(group in member_of for group in allowed_groups):
                self._logger.error(
                    "LDAP-CAMERON: user is not a member of an allowed group.")
                return False

            # Look the user up in the file-based store directly; self.findUser
            # would hand back a placeholder for unknown users.
            if not FilebasedUserManager.findUser(self, username):
                self._logger.debug("Add new user")
                self.addUser(username, str(uuid.uuid4()), True)
            return True
        except ldap.INVALID_CREDENTIALS:
            # python-ldap exception kept from the earlier implementation; the
            # ldap3 path reports bad credentials through check_auth instead.
            self._logger.error("LDAP : Your username or password is incorrect.")
            return FilebasedUserManager.checkPassword(self, username, password)
        except Exception as e:
            self._log_exception(e)
            return False

    def _log_exception(self, error):
        """Log *error*; dict payloads (python-ldap style) go one item per line."""
        # Python 2 exceptions expose .message; elsewhere fall back to args[0].
        payload = getattr(error, "message", None)
        if not payload and error.args:
            payload = error.args[0]

        if isinstance(payload, dict):
            for key, value in payload.items():
                self._logger.error("%s: %s", key, value)
        else:
            self._logger.error("%s", payload if payload else error)

    def changeUserPassword(self, username, password):
        """Change the password of *username* in the local file-based store.

        Nothing happens (and None is returned) when the user has no local
        record; passwords held in LDAP are never modified.
        """
        if FilebasedUserManager.findUser(self, username) is None:
            return None
        return FilebasedUserManager.changeUserPassword(self, username, password)

    def findUser(self, userid=None, apikey=None, session=None):
        """Return the local user, or a placeholder User for an unknown userid.

        When *userid* is given but has no local record, a throw-away active
        User with the "user" role and a random password is returned so that the
        login flow proceeds to checkPassword. No LDAP query is made here.
        For lookups without a userid the file-based result is returned as is
        (possibly None).
        """
        local_user = FilebasedUserManager.findUser(self, userid, apikey, session)

        if userid and not local_user:
            # Placeholder instance; the real check happens in checkPassword.
            return User(userid, str(uuid.uuid4()), True, ["user"])

        if local_user:
            self._logger.debug("Local user found")
        return local_user

    def findLDAPUser(self, userid):
        """Legacy python-ldap lookup: return the DN of *userid*, or None.

        The user is searched by uid below the configured
        accessControl.ldap_search_base and must also be listed as a member of
        the "octoprint" groupOfNames entry. None is returned when the
        configuration is incomplete, the user or the membership is not found,
        or an LDAP error occurs.
        """
        # Function-scope import so the module's import block stays untouched.
        from ldap.filter import escape_filter_chars

        ldap_search_base = settings().get(["accessControl", "ldap_search_base"])
        if ldap_search_base is None:
            self._logger.error("LDAP conf error")
            return None

        connection = None
        try:
            connection = self.getLDAPClient()

            # Verify the user. The value sits in a search filter, so it needs
            # filter escaping (not DN escaping).
            result = connection.search_s(
                ldap_search_base, ldap.SCOPE_SUBTREE,
                "uid=" + escape_filter_chars(userid))
            if not result:
                return None
            self._logger.debug("LDAP-AUTH: User found!")

            # Check group membership. The member value is a DN: the uid is
            # DN-escaped first, then the whole DN is filter-escaped. It is not
            # wrapped in quotes, because quotes are literal characters inside a
            # filter value (RFC 4515) and would prevent any match.
            self._logger.debug("LDAP-AUTH: Checking Groups...")
            member_dn = ("uid=%s,ou=Accounts,ou=People,dc=edinburghhacklab,dc=com"
                         % self.escapeLDAP(userid))
            query = ("(&(objectClass=groupOfNames)(cn=octoprint)(member=%s))"
                     % escape_filter_chars(member_dn))
            self._logger.debug("LDAP-AUTH QUERY:" + query)
            group_result = connection.search_s(
                "ou=Groups,ou=People,dc=edinburghhacklab,dc=com",
                ldap.SCOPE_SUBTREE, query)
            if not group_result:
                self._logger.error("LDAP-AUTH: Group not found")
                return None
            self._logger.debug("LDAP-AUTH: Group matched!")

            # DN of the first user found.
            return result[0][0]
        except ldap.NO_SUCH_OBJECT:
            self._logger.error("LDAP-AUTH: NO_SUCH_OBJECT")
            return None
        except ldap.LDAPError as e:
            self._log_exception(e)
            return None
        finally:
            # Always release the connection, including on early returns.
            if connection is not None:
                try:
                    connection.unbind_s()
                except ldap.LDAPError:
                    self._logger.debug("LDAP-AUTH: unbind failed", exc_info=True)

    def escapeLDAP(self, str):
        """Return *str* with DN-reserved characters prefixed by a backslash.

        The parameter name shadows the builtin but is kept for compatibility
        with keyword callers.
        """
        reserved = frozenset(
            ['+', '=', '\\', '\r', '\n', '#', ',', '>', '<', '"', ';'])
        # Single pass over the input: replacing character by character in
        # sequence would re-escape the backslashes inserted for earlier ones.
        return ''.join('\\' + ch if ch in reserved else ch for ch in str)

    def getLDAPClient(self):
        """Create a python-ldap connection from the accessControl settings.

        For ldaps:// URIs the certificate requirement is set from
        accessControl.ldap_tls_reqcert ('demand' enforces verification, any
        other value disables it) and StartTLS is attempted on a best-effort
        basis.

        Raises Exception when accessControl.ldap_uri is not configured.
        """
        ldap_server = settings().get(["accessControl", "ldap_uri"])
        ldap_verifypeer = settings().get(["accessControl", "ldap_tls_reqcert"])
        if ldap_server is None:
            self._logger.error("LDAP conf error")
            raise Exception("LDAP conf error, server is missing")

        connection = ldap.initialize(ldap_server)
        # NOTE(review): the source's indentation was lost; the TLS setup below
        # is assumed to belong entirely to the ldaps:// branch - confirm.
        if ldap_server.startswith('ldaps://'):
            verifypeer = ldap.OPT_X_TLS_NEVER
            if ldap_verifypeer == 'demand':
                verifypeer = ldap.OPT_X_TLS_DEMAND
            connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, verifypeer)
            try:
                connection.start_tls_s()
            except ldap.LDAPError:
                # Best effort: keep the connection usable if StartTLS fails.
                self._logger.debug("LDAP: start_tls_s failed", exc_info=True)
        return connection

    # Softwareupdate hook
    def get_update_information(self):
        """Return the configuration consumed by the software update hook."""
        # NOTE(review): the top-level key "filamentmanager" looks copied from
        # another plugin; presumably it should be this plugin's identifier -
        # confirm before changing, existing update settings may depend on it.
        return dict(
            filamentmanager=dict(
                displayName="Auth LDAP",
                displayVersion=self._plugin_version,

                # version check: github repository
                type="github_release",
                user="gillg",
                repo="OctoPrint-LDAP",
                current=self._plugin_version,

                # update method: pip
                pip="https://github.com/gillg/OctoPrint-LDAP/archive/{target_version}.zip"
            )
        )

    # UserManager hook
    def ldap_user_factory(components, settings, *args, **kwargs):
        """Return a new LDAPUserManager; every argument is ignored.

        The function is declared without ``self``. When it is accessed through
        an instance (as __plugin_load__ in this file does), the instance is
        bound to ``components`` and the hook's own arguments shift by one
        position. The signature is left unchanged for compatibility.
        """
        return LDAPUserManager()

    # SettingsPlugin
    def get_settings_defaults(self):
        """Return the default values of the plugin's accessControl settings."""
        return dict(
            accessControl=dict(
                ldap_uri=None,
                ldap_tls_reqcert='demand',
                ldap_search_base=None,
                groups=None
            )
        )

    # TemplatePlugin
    def get_template_configs(self):
        """Return the template configuration for the plugin's settings page."""
        return [
            dict(type="settings", template="settings.jinja2")
        ]
2017-11-06 23:58:01 +00:00
__plugin_name__ = "Auth LDAP"


def __plugin_load__():
    """Create the plugin implementation and register its OctoPrint hooks.

    Sets the module globals __plugin_implementation__ (one LDAPUserManager
    instance) and __plugin_hooks__ (user-manager factory and software-update
    configuration, both taken from that instance).
    """
    global __plugin_implementation__, __plugin_hooks__

    manager = LDAPUserManager()
    __plugin_implementation__ = manager
    __plugin_hooks__ = {
        "octoprint.users.factory": manager.ldap_user_factory,
        "octoprint.plugin.softwareupdate.check_config": manager.get_update_information,
    }
# TODO: add a command to clean up local users that have been deleted from LDAP