# Source code for baboossh.path

import hashlib
from baboossh.exceptions import NoPathError
from baboossh import Db, Endpoint, Host
from baboossh.utils import Unique

class Path(metaclass=Unique):
    """Indicates an endpoint is reachable from a host

    A path is created when an :class:`.Endpoint` can be reached from a
    :class:`.Host`, either by being on the same network or by not having
    filtering between both. This is used to pivot through compromised hosts to
    reach endpoints which could not be accessed from the user's position in
    the network (`"Local"`)

    This class provides various functions to get paths to Endpoints through
    several pivots, or to discover new paths to the host.

    Attributes:
        src (:class:`.Host` or `None`): the starting Host of the path. If
            `None`, the path starts at `"Local"`, the user's current position
            in the network.
        dst (:class:`.Endpoint`): the destination Endpoint of the path
        id (int): The path id, or `None` while the path is not stored in the
            database
    """

    def __init__(self, src, dst):
        """Create a Path and load its id from the database if it is saved

        Args:
            src (:class:`.Host` or `None`): the source Host, `None` for
                `"Local"`
            dst (:class:`.Endpoint`): the destination Endpoint

        Raises:
            ValueError: if `src` and `dst` have the same string representation
        """
        if str(src) == str(dst):
            raise ValueError("Can't create path to self")
        self.src = src
        self.dst = dst
        self.id = None
        cursor = Db.get().cursor()
        try:
            cursor.execute('SELECT id FROM paths WHERE src=? AND dst=?',
                           (self._src_id, self.dst.id))
            saved_path = cursor.fetchone()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
        if saved_path is not None:
            self.id = saved_path[0]

    @property
    def _src_id(self):
        """Value stored in the `src` column: the Host id, or 0 when src is `None`"""
        return self.src.id if self.src is not None else 0

    @classmethod
    def get_id(cls, src, dst):
        """Return the SHA-256 hex digest of `str(src)` followed by `str(dst)`"""
        return hashlib.sha256((str(src)+str(dst)).encode()).hexdigest()

    @property
    def scope(self):
        """Whether the path is in scope

        `False` if the destination is out of scope; otherwise `True` when the
        path starts at `"Local"` (`src` is `None`), else the scope of `src`.
        """
        if not self.dst.scope:
            return False
        if self.src is None:
            return True
        return self.src.scope

    def save(self):
        """Save the Path in database

        If the Path object has an id it means it is already stored in database,
        so it is updated. Else it is inserted and the id is set in the object.
        """
        cursor = Db.get().cursor()
        try:
            if self.id is not None:
                # Already stored: UPDATE the existing row.
                cursor.execute('''UPDATE paths SET src = ?, dst = ? WHERE id = ?''',
                               (self._src_id, self.dst.id, self.id))
            else:
                # Not stored yet: INSERT, then read back the id of the new row.
                cursor.execute('''INSERT INTO paths(src, dst) VALUES (?, ?) ''',
                               (self._src_id, self.dst.id))
                cursor.close()
                cursor = Db.get().cursor()
                cursor.execute('SELECT id FROM paths WHERE src=? AND dst=?',
                               (self._src_id, self.dst.id))
                self.id = cursor.fetchone()[0]
        finally:
            # Release whichever cursor is current, even when a query fails.
            cursor.close()
        Db.get().commit()

    def delete(self):
        """Delete a Path from the :class:`.Workspace`

        Returns:
            An empty `dict` if the path has no id (nothing is deleted),
            otherwise `{"Path": [<value of get_id(src, dst)>]}`.
        """
        if self.id is None:
            return {}
        cursor = Db.get().cursor()
        try:
            cursor.execute('DELETE FROM paths WHERE id = ?', (self.id, ))
        finally:
            cursor.close()
        Db.get().commit()
        return {"Path":[type(self).get_id(self.src, self.dst)]}

    @classmethod
    def find_all(cls, src=None, dst=None):
        """Find all Paths

        Args:
            src (:class:`.Host` or `None`): the Host to use as source,
                `"Local"` if `(int)0`; no filtering on the source if `None`
            dst (:class:`.Endpoint`): the Endpoint to use as destination; no
                filtering on the destination if `None`

        Returns:
            A list of all matching `Path` s in the :class:`.Workspace`
        """
        if src is None:
            if dst is None:
                query, params = 'SELECT src, dst FROM paths', ()
            else:
                query, params = 'SELECT src, dst FROM paths WHERE dst=?', (dst.id, )
        else:
            # The integer 0 stands for "Local"; anything else must expose `.id`.
            src_id = 0 if src == 0 else src.id
            if dst is None:
                query, params = 'SELECT src, dst FROM paths WHERE src=?', (src_id, )
            else:
                query, params = ('SELECT src, dst FROM paths WHERE src=? AND dst=?',
                                 (src_id, dst.id))
        cursor = Db.get().cursor()
        try:
            return [Path(Host.find_one(host_id=row[0]),
                         Endpoint.find_one(endpoint_id=row[1]))
                    for row in cursor.execute(query, params)]
        finally:
            cursor.close()

    @classmethod
    def find_one(cls, path_id=None):
        """Find a path by its id

        Args:
            path_id (int): the path id to search

        Returns:
            A single `Path` or `None`.
        """
        if path_id is None:
            return None
        cursor = Db.get().cursor()
        try:
            cursor.execute('''SELECT src, dst FROM paths WHERE id=?''', (path_id, ))
            row = cursor.fetchone()
        finally:
            cursor.close()
        if row is None:
            return None
        return Path(Host.find_one(host_id=row[0]), Endpoint.find_one(endpoint_id=row[1]))

    @classmethod
    def direct(cls, dst):
        """Check if there is a direct Path from `"Local"` to an Endpoint

        Args:
            dst (:class:`.Endpoint`): the Endpoint to use as destination

        Returns:
            `True` if there is a `Path` from `"Local"` to dst, `False` otherwise
        """
        cursor = Db.get().cursor()
        try:
            cursor.execute('''SELECT id FROM paths WHERE src=0 and dst=?''', (dst.id, ))
            row = cursor.fetchone()
        finally:
            cursor.close()
        return row is not None

    @classmethod
    def get(cls, dst, first=True):
        """Get the chain of paths from `"Local"` to an `Endpoint`

        Args:
            dst (:class:`Endpoint`): the destination `Endpoint`
            first (bool): `True` for the outermost call; the recursive calls
                made for intermediate hops pass `False`

        Returns:
            A `List` forming a chain from `"Local"` to dst. When no previous
            hop is found the list is `[None, dst.host]`. Otherwise it is the
            chain to the previous hop's `closest_endpoint`, followed by `dst`
            itself if `first` is `True`, or by `dst.host` if it is `False`.

        Raises:
            NoPathError: if no path could be found to `dst` (propagated from
                `Host.find_one`)
        """
        previous_hop = Host.find_one(prev_hop_to=dst)
        if previous_hop is None:
            # NOTE(review): this base case ends with dst.host even when
            # `first` is True, unlike the multi-hop case below which ends
            # with dst -- confirm this is what callers expect.
            return [None, dst.host]
        chain = cls.get(previous_hop.closest_endpoint, first=False)
        chain.append(dst if first else dst.host)
        return chain

    def __str__(self):
        """Return `"<src> -> <dst>"`, using `"local"` when `src` is `None`

        The destination is shown as its host when the endpoint has one, else
        as the endpoint itself.
        """
        if self.src is not None:
            src = str(self.src)
        else:
            src = "local"
        if self.dst.host is not None:
            dst = str(self.dst.host)
        else:
            dst = str(self.dst)
        return src+" -> "+dst