Source code for baboossh.user

import hashlib
from baboossh import Db
from baboossh.utils import Unique

[docs]class User(metaclass=Unique): """A username to authenticate with on servers. Attributes: name (str): the username id (int): the User's id scope (bool): whether the User is in the scope or not found (:class:`.Endpoint`): the endpoint the user was discovered on """
[docs] def __init__(self, name): self.name = name self.id = None self.scope = True self.found = None cursor = Db.get().cursor() cursor.execute('SELECT id, scope, found FROM users WHERE username=?', (self.name, )) saved_user = cursor.fetchone() cursor.close() if saved_user is not None: self.id = saved_user[0] self.scope = saved_user[1] != 0 if saved_user[2] is not None: from baboossh import Endpoint self.found = Endpoint.find_one(endpoint_id=saved_user[2])
@classmethod def get_id(cls, name): return hashlib.sha256(name.encode()).hexdigest()
[docs] def save(self): """Save the user in database If the User object has an id it means it is already stored in database, so it is updated. Else it is inserted and the id is set in the object. """ cursor = Db.get().cursor() if self.id is not None: #If we have an ID, the user is already saved in the database : UPDATE cursor.execute('''UPDATE users SET username = ?, scope = ?, found = ? WHERE id = ?''', (self.name, self.scope, self.found.id if self.found is not None else None, self.id)) else: #The user doesn't exists in database : INSERT cursor.execute('''INSERT INTO users(username, scope, found) VALUES (?, ?, ?) ''', (self.name, self.scope, self.found.id if self.found is not None else None)) cursor.close() cursor = Db.get().cursor() cursor.execute('SELECT id FROM users WHERE username=?', (self.name, )) self.id = cursor.fetchone()[0] cursor.close() Db.get().commit()
[docs] def delete(self): """Delete a User from the :class:`.Workspace`""" from baboossh import Connection if self.id is None: return {} from baboossh.utils import unstore_targets_merge del_data = {} for connection in Connection.find_all(user=self): unstore_targets_merge(del_data, connection.delete()) cursor = Db.get().cursor() cursor.execute('DELETE FROM users WHERE id = ?', (self.id, )) cursor.close() Db.get().commit() unstore_targets_merge(del_data, {"User":[type(self).get_id(self.name)]}) return del_data
[docs] @classmethod def find_all(cls, scope=None, found=None): """Find all Users corresponding to criteria Args: scope (bool): List Users in scope (`True`), out of scope (`False`), or both (`None`) found (:class:`.Endpoint`): the `Endpoint` the users were discovered on Returns: A list of all `User` s in the :class:`.Workspace` matching the criteria """ ret = [] cursor = Db.get().cursor() if found is None: if scope is None: req = cursor.execute('SELECT username FROM users') else: req = cursor.execute('SELECT username FROM users WHERE scope=?', (scope, )) else: if scope is None: req = cursor.execute('SELECT username FROM users WHERE found=?', (found.id if found is not None else None, )) else: req = cursor.execute('SELECT username FROM users WHERE found=? AND scope=?', (found.id if found is not None else None, scope)) for row in req: ret.append(User(row[0])) cursor.close() return ret
[docs] @classmethod def find_one(cls, user_id=None, name=None): """Find a user matching the criteria Args: user_id (int): the user id to search name (str): the username to search Returns: A single `User` or `None`. """ cursor = Db.get().cursor() if user_id is not None: cursor.execute('''SELECT username FROM users WHERE id=?''', (user_id, )) elif name is not None: cursor.execute('''SELECT username FROM users WHERE username=?''', (name, )) else: cursor.close() return None row = cursor.fetchone() cursor.close() if row is None: return None return User(row[0])
def __str__(self): return self.name