# Source code for py2p.base

"""A library to store common functions and protocol definitions"""

from __future__ import absolute_import
from __future__ import print_function
from __future__ import with_statement

import inspect
import socket

from hashlib import (sha256, sha384)
from itertools import chain
from logging import (getLogger, INFO, DEBUG)
from sys import version_info
from threading import (Lock, Thread, current_thread)
from traceback import format_exc
from uuid import uuid4

from umsgpack import (packb, unpackb, UnsupportedTypeException)
from pyee import EventEmitter
from typing import (cast, Any, Callable, Dict, Iterable, List, NamedTuple,
                    Sequence, Tuple, Union)

from .utils import (getUTC, intersect, get_lan_ip, get_socket, sanitize_packet,
                    inherit_doc, log_entry)

# Type aliases describing what can be serialized into a message.  Type
# comments cannot express recursion here, so the alias is unrolled by hand:
# each level wraps the previous one in list / tuple / str-keyed dict.
_MsgPackable__ = Union[None, int, float, str, bytes]
_MsgPackable_ = Union[
    _MsgPackable__,
    List[_MsgPackable__],
    Tuple[_MsgPackable__, ...],
    Dict[str, _MsgPackable__],
]
_MsgPackable = Union[
    _MsgPackable_,
    List[_MsgPackable_],
    Tuple[_MsgPackable_, ...],
    Dict[str, _MsgPackable_],
]
MsgPackable = Union[
    _MsgPackable,
    List[_MsgPackable],
    Tuple[_MsgPackable, ...],
    Dict[str, _MsgPackable],
]

# Version components; ``version`` is "<protocol_version>.<node_policy_version>"
protocol_version = "0.6"
node_policy_version = "757"

version = '{}.{}'.format(protocol_version, node_policy_version)

# Lock held around print() calls (see BaseSocket.__print__)
plock = Lock()


class flags():
    """A namespace to hold protocol-defined flags"""

    # Reserved set of byte values (everything below 0x30)
    reserved = tuple(range(0x30))

    # ---- main flags ----
    broadcast = 0x00  # doubles as a sub-flag
    renegotiate = 0x01
    whisper = 0x02  # doubles as a sub-flag
    ping = 0x03  # reserved, currently unused
    pong = 0x04  # reserved, currently unused

    # ---- sub-flags ----
    # 0x00 (broadcast), 0x02 (whisper), 0x03 (ping) and 0x04 (pong) are
    # shared with the main flags above, so they are not repeated here.
    compression = 0x01
    handshake = 0x05
    notify = 0x06
    peers = 0x07
    request = 0x08
    resend = 0x09
    response = 0x0A
    store = 0x0B
    retrieve = 0x0C
    retrieved = 0x0D

    # ---- compression methods with an implementation in this module ----
    bz2 = 0x10
    gzip = 0x11
    lzma = 0x12
    zlib = 0x13
    snappy = 0x20

    # ---- compression methods without an implementation ----
    # (names based on the list from compressjs)
    bwtc = 0x14
    context1 = 0x15
    defsum = 0x16
    dmc = 0x17
    fenwick = 0x18
    huffman = 0x19
    lzjb = 0x1A
    lzjbr = 0x1B
    lzp3 = 0x1C
    mtf = 0x1D
    ppmd = 0x1E
    simple = 0x1F
# Random salt generated once at import time; BaseSocket mixes it into the
# hash that produces each socket's ID.
user_salt = '{}'.format(uuid4()).encode()
def compress(msg, method):
    #type: (bytes, int) -> bytes
    """Shortcut method for compression

    Args:
        msg:    The message you wish to compress, the type required is
                    defined by the requested method
        method: The compression method you wish to use. Handled here
                    (assuming the backing module is installed):

                    - :py:class:`~base.flags.gzip`
                    - :py:class:`~base.flags.zlib`
                    - :py:class:`~base.flags.bz2`
                    - :py:class:`~base.flags.lzma`
                    - :py:class:`~base.flags.snappy`

    Returns:
        Defined by the compression method, but typically the bytes of the
        compressed message

    Warning:
        The types fed are dependent on which compression method you use.
        Best to assume most values are :py:class:`bytes` or
        :py:class:`bytearray`

    Raises:
        A :py:class:`ValueError` if there is an unknown compression method,
        or a method-specific error
    """
    if method == flags.bz2:
        return bz2.compress(msg)
    if method == flags.lzma:
        return lzma.compress(msg)
    if method == flags.snappy:
        return snappy.compress(msg)
    if method not in (flags.gzip, flags.zlib):  # pragma: no cover
        raise ValueError('Unknown compression method')
    # gzip and zlib both go through the zlib module; the only difference is
    # the window-bits value (15 for zlib, 15 + 16 for gzip).
    window_bits = 15 + 16 if method == flags.gzip else 15
    deflater = zlib.compressobj(zlib.Z_DEFAULT_COMPRESSION, zlib.DEFLATED,
                                window_bits)
    head = deflater.compress(msg)
    return head + deflater.flush()
def decompress(msg, method):
    #type: (bytes, int) -> bytes
    """Shortcut method for decompression

    Args:
        msg:    The message you wish to decompress, the type required is
                    defined by the requested method
        method: The decompression method you wish to use. Handled here
                    (assuming the backing module is installed):

                    - :py:class:`~base.flags.gzip`
                    - :py:class:`~base.flags.zlib`
                    - :py:class:`~base.flags.bz2`
                    - :py:class:`~base.flags.lzma`
                    - :py:class:`~base.flags.snappy`

    Returns:
        Defined by the decompression method, but typically the bytes of the
        decompressed message

    Warning:
        The types fed are dependent on which decompression method you use.
        Best to assume most values are :py:class:`bytes` or
        :py:class:`bytearray`

    Raises:
        A :py:class:`ValueError` if there is an unknown compression method,
        or a method-specific error
    """
    if method == flags.bz2:
        return bz2.decompress(msg)
    if method == flags.lzma:
        return lzma.decompress(msg)
    if method == flags.snappy:
        return snappy.decompress(msg)
    if method not in (flags.gzip, flags.zlib):  # pragma: no cover
        raise ValueError('Unknown decompression method')
    # The same window-bits value is used for both gzip and zlib input
    window_bits = zlib.MAX_WBITS | 32
    return zlib.decompress(msg, window_bits)
# This should be in order of preference, with None being implied as last compression = [] # Compression testing section try: import snappy if hasattr(snappy, 'compress'): decompress(compress(b'test', flags.snappy), flags.snappy) compression.append(flags.snappy) except Exception: # pragma: no cover getLogger('py2p.base').info("Unable to load snappy compression") try: import zlib if hasattr(zlib, 'compressobj'): decompress(compress(b'test', flags.zlib), flags.zlib) decompress(compress(b'test', flags.gzip), flags.gzip) compression.extend((flags.zlib, flags.gzip)) except Exception: # pragma: no cover getLogger('py2p.base').info("Unable to load gzip/zlib compression") try: import bz2 if hasattr(bz2, 'compress'): decompress(compress(b'test', flags.bz2), flags.bz2) compression.append(flags.bz2) except Exception: # pragma: no cover getLogger('py2p.base').info("Unable to load bz2 compression") try: import lzma if hasattr(lzma, 'compress'): decompress(compress(b'test', flags.lzma), flags.lzma) compression.append(flags.lzma) except Exception: # pragma: no cover getLogger('py2p.base').info("Unable to load lzma compression")
def pack_value(l, i):
    #type: (int, int) -> bytes
    """Pack the value ``i`` into a big-endian byte string of length ``l``

    Args:
        l: A positive, integral value describing how long to make the
               packed array
        i: A positive, integral value to pack into said array

    Returns:
        A bytes object of length ``l`` containing the given value, padded
        with leading zero bytes

    Raises:
        ValueError: If ``l`` bytes are not enough to contain the value
                        provided
    """
    packed = bytearray(l)
    remaining = i
    position = l - 1
    # Fill from the least significant (last) byte towards the front
    while position >= 0:
        packed[position] = remaining & 0xFF
        remaining >>= 8
        if not remaining:
            break
        position -= 1
    if remaining:
        # Bits are left over after every byte has been written
        raise ValueError("Value not allocatable in size given")
    return bytes(packed)
def unpack_value(string):
    #type: (Union[bytes, bytearray, str]) -> int
    """Interpret a string as a big-endian, unsigned integer

    Args:
        string: A string or bytes-like object; it is passed through
                    ``sanitize_packet`` before being read byte by byte

    Returns:
        The integral value of the bytes, most significant byte first
        (``0`` for empty input)
    """
    total = 0
    for byte in bytearray(sanitize_packet(string)):
        # Shift the running value up one byte and drop the new byte in
        total = (total << 8) | byte
    return total
# Base-58 alphabet: digits, upper- and lower-case letters with the characters
# 0, I, O and l left out. A character's index is its digit value.
base_58 = b'123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'


def to_base_58(i):
    #type: (int) -> bytes
    """Takes an integer and returns its corresponding base_58 string

    Args:
        i: The non-negative integral value you wish to encode

    Returns:
        A :py:class:`bytes` object which contains the base_58 string. Zero
        encodes as the single zero digit ``b'1'``.

    Raises:
        TypeError:  If you feed a non-integral value
        ValueError: If you feed a negative value. (Floor division never
                        brings a negative number to zero, so this used to
                        loop forever.)
    """
    if not i:
        # Falsy input encodes as the zero digit, as it always has
        return base_58[0:1]
    if i < 0:
        raise ValueError("Cannot encode a negative value in base_58")
    digits = []
    while i:
        i, idx = divmod(i, 58)
        # Slice rather than index so the result is bytes on Python 2 and 3
        digits.append(base_58[idx:idx + 1])
    # Digits were produced least-significant first
    digits.reverse()
    return b''.join(digits)
def from_base_58(string):
    #type: (Union[bytes, bytearray, str]) -> int
    """Takes a base_58 string and returns its corresponding integer

    Args:
        string: The base_58 value you wish to decode (string, bytes, or
                    bytearray); it is passed through ``sanitize_packet``
                    first

    Returns:
        The integral value which corresponds to the fed string (``0`` for
        empty input)
    """
    total = 0
    for symbol in sanitize_packet(string):
        # A symbol's position in the alphabet is its digit value
        digit = base_58.index(cast(bytes, symbol))
        total = total * 58 + digit
    return total
class Protocol(
        NamedTuple("_Protocol", [('subnet', str), ('encryption', str)])):
    """Defines service variables so that you can reject connections looking
    for a different service

    Attributes:
        subnet:     The subnet flag this Protocol uses
        encryption: The encryption method this Protocol uses
        id:         The SHA-256 based ID of this Protocol
    """
    __slots__ = ()

    @property
    def id(self):
        #type: (Protocol) -> str
        """The SHA-256-based ID of the Protocol

        The hash covers the concatenated string form of every field,
        followed by the module's ``protocol_version``; the digest is
        returned base_58 encoded.
        """
        fields = ''.join(map(str, self))
        digest = sha256(fields.encode())
        digest.update(protocol_version.encode())
        as_int = int(digest.hexdigest(), 16)
        return to_base_58(as_int).decode()
# Protocol used when a socket is not given one explicitly: empty subnet,
# plaintext transport.
# NOTE(review): the original trailing comment read `# SSL")`, which suggests
# "SSL" was once the intended default - confirm before changing.
default_protocol = Protocol(subnet='', encryption="Plaintext")
class InternalMessage(object):
    """An object used to build and parse protocol-defined message structures

    The serialized form produced by :py:attr:`string` and consumed by
    :py:meth:`feed_string` is::

        <4-byte big-endian length> <body>

    where ``body`` is ``<32-byte SHA-256 of the packed packets> <msgpack
    packed packets>``, optionally run through one of the compression
    methods. ``packets`` is ``(msg_type, sender, time) + payload``.

    The serialized pieces are cached and invalidated whenever a field they
    depend on is assigned.
    """
    __slots__ = ('__msg_type', '__time', '__sender', '__payload', '__string',
                 '__compression', '__id', 'compression_fail', '__full_string')

    @classmethod
    def __sanitize_string(cls, string, sizeless=False):
        #type: (Any, Union[bytes, bytearray, str], bool) -> bytes
        """Removes the size header for further processing.
        Also checks if the header is valid.

        Args:
            string:     The string you wish to sanitize
            sizeless:   Whether this string is missing a size header
                            (default: ``False``)

        Returns:
            The fed string without the size header

        Raises:
            AttributeError: Fed a non-string, non-bytes argument
            AssertionError: Initial size header is incorrect
        """
        _string = sanitize_packet(string)
        if not sizeless:
            # The first four bytes must state the length of what follows
            if unpack_value(_string[:4]) != len(_string[4:]):
                raise AssertionError(
                    "Real message size {} != expected size {}. "
                    "Buffer given: {}".format(
                        len(_string), unpack_value(_string[:4]) + 4, _string))
            _string = _string[4:]
        return _string

    @classmethod
    def __decompress_string(cls, string, compressions=None):
        #type: (Any, bytes, Union[None, Iterable[int]]) -> Tuple[bytes, bool]
        """Returns a tuple containing the decompressed bytes and a boolean
        as to whether decompression failed or not

        Each method common to ``compressions`` and the module-level
        ``compression`` list is tried in turn; the first one that works
        wins. Errors raised by an individual method are swallowed and only
        reflected in the returned flag.

        Args:
            string:         The possibly-compressed message you wish to parse
            compressions:   A list of the standard compression methods this
                                message may be under (default: ``None``)

        Returns:
            ``(data, compression_fail)``. ``data`` is the decompressed
            message, or the input unchanged if no method succeeded.
            ``compression_fail`` is ``True`` only if at least one method was
            tried and none succeeded.

        Warning:
            Do not feed it with the size header, it will throw errors
        """
        compression_fail = False
        # second is module scope compression
        for method in intersect(compressions, compression):
            try:
                string = decompress(string, method)
                compression_fail = False
                break
            except Exception:
                # Deliberately best-effort: try the next method. Narrowed
                # from a bare ``except`` so KeyboardInterrupt/SystemExit
                # still propagate.
                compression_fail = True
                continue
        return (string, compression_fail)

    @classmethod
    def feed_string(cls, string, sizeless=False, compressions=None):
        #type: (Any, Union[bytes, bytearray, str], bool, Union[None, Iterable[int]]) -> InternalMessage
        """Constructs a InternalMessage from a string or bytes object.

        Args:
            string:         The string you wish to parse
            sizeless:       A boolean which describes whether this string has
                                its size header (default: it does)
            compressions:   A list containing the standardized compression
                                methods this message might be under
                                (default: ``None``)

        Returns:
            A base.InternalMessage from the given string

        Raises:
            AttributeError: Fed a non-string, non-bytes argument
            AssertionError: Initial size header is incorrect, or the
                                embedded checksum does not match the data
            IndexError:     Packet headers are incorrect OR
                                unrecognized compression
        """
        # First section checks size header
        _string = cls.__sanitize_string(string, sizeless)
        # Then we attempt to decompress
        _string, compression_fail = cls.__decompress_string(_string,
                                                            compressions)
        # The first 32 bytes are the SHA-256 digest of the remainder
        id_ = _string[0:32]
        serialized = _string[32:]
        checksum = sha256(serialized).digest()
        if id_ != checksum:
            # Raised explicitly rather than via ``assert`` so the integrity
            # check still runs under ``python -O``.
            raise AssertionError(
                "Checksum failed: {} != {}".format(id_, checksum))
        packets = unpackb(serialized)
        msg = cls(packets[0], packets[1], packets[3:],
                  compression=compressions)
        # Assigning time clears the caches, so it must come before the
        # cached id/string are filled in below.
        msg.time = packets[2]
        msg.compression_fail = compression_fail
        msg._InternalMessage__id = checksum
        msg._InternalMessage__string = serialized
        # msg.__string = _string
        return msg

    def __init__(
            self,  #type: InternalMessage
            msg_type,  #type: MsgPackable
            sender,  #type: bytes
            payload,  #type: Iterable[MsgPackable, ...]
            compression=None,  #type: Union[None, Iterable[int, ...]]
            timestamp=None  #type: Union[None, int]
    ):  #type: (...) -> None
        """Initializes a InternalMessage instance

        Args:
            msg_type:       A header for the message you wish to send
            sender:         A bytes-like sender ID the message is using
            payload:        A iterable of objects containing the payload of
                                the message
            compression:    A list of the compression methods this message
                                may use (default: none)
            timestamp:      The current UTC timestamp (as an integer)
                                (default: result of utils.getUTC()). Any
                                falsy value, including ``0``, is replaced by
                                the current time.

        Raises:
            TypeError: If ``payload`` or ``compression`` is not iterable
        """
        self.__msg_type = msg_type
        self.__sender = sender
        self.__payload = tuple(payload)
        self.__time = timestamp or getUTC()
        # Serialization caches, filled lazily by id / string
        self.__id = None  #type: Union[None, bytes]
        self.__string = None  #type: Union[None, bytes]
        self.__full_string = None  #type: Union[None, bytes]
        self.compression_fail = False

        if compression:
            self.__compression = tuple(compression)  #type: Tuple[int, ...]
        else:
            self.__compression = ()

    @property
    def payload(self):
        #type: (InternalMessage) -> Tuple[MsgPackable, ...]
        """Returns a :py:class:`tuple` containing the message payload"""
        return self.__payload

    @property
    def compression_used(self):
        #type: (InternalMessage) -> Union[None, int]
        """Returns the compression method this message is using

        This is the first method common to the module-level ``compression``
        list and this message's own list, or ``None`` if there is none.
        """
        for method in intersect(compression, self.compression):
            return method
        return None

    def __clear_cache(self):
        #type: (InternalMessage) -> None
        """Drops every cached serialization of this message"""
        self.__full_string = None
        self.__string = None
        self.__id = None

    @property
    def msg_type(self):
        #type: (InternalMessage) -> MsgPackable
        """The message type header; assigning it clears the caches"""
        return self.__msg_type

    @msg_type.setter
    def msg_type(self, val):
        #type: (InternalMessage, MsgPackable) -> None
        self.__clear_cache()
        self.__msg_type = val

    @property
    def sender(self):
        #type: (InternalMessage) -> bytes
        """The sender ID; assigning it clears the caches"""
        return self.__sender

    @sender.setter
    def sender(self, val):
        #type: (InternalMessage, bytes) -> None
        self.__clear_cache()
        self.__sender = val

    @property
    def compression(self):
        #type: (InternalMessage) -> Tuple[int, ...]
        """The compression methods this message may use"""
        return self.__compression

    @compression.setter
    def compression(self, val):
        #type: (InternalMessage, Iterable[int]) -> None
        new_comps = intersect(compression, val)
        old_comp = self.compression_used
        # Only the compressed form depends on the method list, so only that
        # cache is dropped, and only if the effective method changes.
        if (old_comp, ) != new_comps[0:1]:
            self.__full_string = None
        self.__compression = tuple(val)

    @property
    def time(self):
        #type: (InternalMessage) -> int
        """The message timestamp; assigning it clears the caches"""
        return self.__time

    @time.setter
    def time(self, val):
        #type: (InternalMessage, int) -> None
        self.__clear_cache()
        self.__time = val

    @property
    def time_58(self):
        #type: (InternalMessage) -> bytes
        """Returns this message's timestamp in base_58"""
        return to_base_58(self.__time)

    @property
    def id(self):
        #type: (InternalMessage) -> bytes
        """Returns the message id

        This is the SHA-256 digest of the packed packets, cached after the
        first computation.
        """
        if not self.__id:
            payload_hash = sha256(self.__non_len_string)
            self.__id = payload_hash.digest()
        return self.__id

    @property
    def packets(self):
        #type: (InternalMessage) -> Tuple[MsgPackable, ...]
        """Returns the full :py:class:`tuple` of packets in this message:
        ``(msg_type, sender, time)`` followed by the payload
        """
        return ((self.__msg_type, self.__sender, self.time) + self.payload)

    @property
    def __non_len_string(self):
        #type: (InternalMessage) -> bytes
        """Returns a :py:class:`bytes` object containing the packed packets,
        without the id or the total length header

        Raises:
            TypeError: If any of the arguments are not serializable. This
                        means your objects must be one of the following:

                        - :py:class:`bool`
                        - :py:class:`float`
                        - :py:class:`int` (if ``2**64 > x > -2**63``)
                        - :py:class:`str`
                        - :py:class:`bytes`
                        - :py:class:`unicode`
                        - :py:class:`tuple`
                        - :py:class:`list`
                        - :py:class:`dict` (if all keys are
                            :py:class:`unicode`)
        """
        if not self.__string:
            try:
                self.__string = packb(self.packets)
            except UnsupportedTypeException as e:
                # Re-raised as the builtin type promised in the docstring
                raise TypeError(*e.args)
        return self.__string

    @property
    def string(self):
        #type: (InternalMessage) -> bytes
        """Returns a :py:class:`bytes` representation of the message,
        including the length header

        Raises:
            TypeError: See :py:func:`~py2p.base.InternalMessage._InternalMessage__non_len_string`
        """
        if not all((self.__id, self.__string, self.__full_string)):
            id_ = self.id
            ret = b''.join((id_, self.__non_len_string))
            compression_used = self.compression_used
            if compression_used:
                ret = compress(ret, compression_used)
            # The length header counts the (possibly compressed) body
            self.__full_string = b''.join((pack_value(4, len(ret)), ret))
        return self.__full_string

    def __len__(self):
        #type: (InternalMessage) -> int
        """The length of the fully serialized message, header included"""
        return len(self.string)
class BaseConnection(object):
    """The base class for a connection

    Incoming bytes are accumulated in ``buffer``. ``expected`` is the number
    of buffered bytes needed before the next step: 4 while waiting for a
    length header, then header + body once the header has been read.
    ``active`` records whether a header has been read for the message
    currently being received.
    """
    __slots__ = ('sock', 'server', 'outgoing', 'buffer', 'id', 'time', 'addr',
                 'compression', 'last_sent', 'expected', 'active')

    @log_entry('py2p.base.BaseConnection.__init__', DEBUG)
    def __init__(self, sock, server, outgoing=False):
        #type: (BaseConnection, Any, BaseSocket, bool) -> None
        """Sets up a connection to another peer-to-peer socket

        Args:
            sock:       The connected socket object
            server:     A reference to your peer-to-peer socket
            outgoing:   Whether this connection is outgoing (default: False)
        """
        self.sock = sock
        self.server = server
        self.outgoing = outgoing
        self.buffer = bytearray()
        self.id = None  #type: Union[None, bytes]
        self.time = getUTC()
        self.addr = None  #type: Union[None, Tuple[str, int]]
        self.compression = []  #type: List[int]
        self.last_sent = ()  #type: Tuple[MsgPackable, ...]
        self.expected = 4
        self.active = False

    def send_InternalMessage(self, msg):
        #type: (BaseConnection, InternalMessage) -> InternalMessage
        """Sends a preconstructed message

        The message's compression list is overwritten with this
        connection's. If the message type is whisper or broadcast, its
        payload is remembered in ``last_sent``.

        Args:
            msg: The :py:class:`~py2p.base.InternalMessage` you wish to send

        Returns:
            the :py:class:`~py2p.base.InternalMessage` object you just sent,
            or ``None`` if the sending was unsuccessful (in which case the
            traceback is recorded and this connection is disconnected)
        """
        msg.compression = self.compression  #type: ignore
        if msg.msg_type in (flags.whisper, flags.broadcast):
            self.last_sent = msg.payload
        self.__print__("Sending %s to %s" % (msg.packets, self), level=4)
        if msg.compression_used:
            self.__print__(
                "Compressing with %s" % repr(msg.compression_used), level=4)
        try:
            self.sock.send(msg.string)
            return msg
        except (IOError, socket.error):  # pragma: no cover
            self.server.daemon.exceptions.append(format_exc())
            self.server.disconnect(self)

    def send(self, msg_type, *args, **kargs):
        #type: (BaseConnection, MsgPackable, *MsgPackable, **Union[bytes, int]) -> InternalMessage
        """Sends a message through its connection.

        Args:
            msg_type:  Message type, corresponds to the header in a
                           :py:class:`~py2p.base.InternalMessage` object
            *args:     The packets to send; they become the message payload
            **kargs:   There are two available keywords:
            id:        The ID this message should appear to be sent from
                           (default: your ID)
            time:      The time this message should appear to be sent from
                           (default: now in UTC)

        Returns:
            the :py:class:`~py2p.base.InternalMessage` object you just sent,
            or ``None`` if the sending was unsuccessful
        """
        # Latter is returned if key not found
        sender_id = kargs.get('id', self.server.id)
        timestamp = kargs.get('time') or getUTC()
        # Begin real method
        msg = InternalMessage(
            msg_type, sender_id, args, self.compression, timestamp=timestamp)
        return self.send_InternalMessage(msg)

    @property
    def protocol(self):
        #type: (BaseConnection) -> Protocol
        """Returns server.protocol"""
        return self.server.protocol

    def collect_incoming_data(self, data):
        #type: (BaseConnection, Union[bytes, bytearray]) -> bool
        """Collects incoming data

        Args:
            data:   The most recently received :py:class:`bytes`

        Returns:
            ``True`` if the data collection was successful, ``False`` if the
            connection was closed
        """
        if not bool(data):
            # An empty read means the peer closed; shut our side down too.
            try:
                self.sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                # Best effort: the socket may already be closed
                pass
            return False
        self.buffer.extend(data)
        self.time = getUTC()
        if not self.active and self.find_terminator():
            # A full length header has arrived: from now on wait for the
            # header plus the body it announces.
            self.__print__(
                self.buffer, self.expected, self.find_terminator(), level=4)
            self.expected = unpack_value(bytes(self.buffer[:4])) + 4
            self.active = True
        return True

    def find_terminator(self):
        #type: (BaseConnection) -> bool
        """Returns whether at least ``expected`` bytes are buffered"""
        return len(self.buffer) >= self.expected

    def found_terminator(self):
        #type: (BaseConnection) -> InternalMessage
        """Processes received messages

        Removes the first ``expected`` bytes from the buffer, prepares the
        state for whatever follows, and parses the removed bytes.

        Returns:
            The parsed :py:class:`~py2p.base.InternalMessage`

        Raises:
            Whatever :py:meth:`~py2p.base.InternalMessage.feed_string`
            raises for a malformed message
        """
        raw_msg, self.buffer = bytes(self.buffer[:self.expected]), \
            self.buffer[self.expected:]
        self.__print__("Received: %s" % repr(raw_msg), level=6)
        # Four leftover bytes are already a complete length header, so the
        # comparison must be inclusive. With ``> 4`` a leftover of exactly
        # four bytes left ``expected`` at 4, and that bare header was then
        # parsed as if it were a whole message.
        self.active = len(self.buffer) >= 4
        if self.active:
            self.expected = unpack_value(bytes(self.buffer[:4])) + 4
        else:
            self.expected = 4
        msg = InternalMessage.feed_string(raw_msg, False, self.compression)
        return msg

    def handle_renegotiate(self, packets):
        #type: (BaseConnection, Sequence[MsgPackable]) -> bool
        """The handler for connection renegotiations

        This is to deal with connection maintenance. For instance, it could
        be that a compression method fails to decode on the other end, and a
        node will need to renegotiate which methods it is using. Hence the
        name of the flag associated with it, "renegotiate".

        Args:
            packets:    A :py:class:`tuple` containing the packets received
                            in this message

        Returns:
            ``True`` if an action was taken, ``False`` if not
        """
        # NOTE(review): indices 4 and 5 assume a packet layout chosen by the
        # caller, which is not visible in this file - confirm against the
        # tuple callers pass in.
        if packets[0] == flags.renegotiate:
            if packets[4] == flags.compression:
                encoded_methods = packets[5]
                # Reply only if the peer's list differs from ours
                respond = (self.compression != encoded_methods)
                self.compression = list(cast(Iterable[int], encoded_methods))
                self.__print__(
                    "Compression methods changed to: %s" %
                    repr(self.compression),
                    level=2)
                if respond:
                    self.send(flags.renegotiate, flags.compression,
                              cast(Tuple[int, ...],
                                   intersect(compression, self.compression)))
                return True
            elif packets[4] == flags.resend:
                self.send(*self.last_sent)
                return True
        return False

    def fileno(self):
        #type: (BaseConnection) -> int
        """Mirror for the fileno() method of the connection's
        underlying socket
        """
        return self.sock.fileno()

    def __print__(self, *args, **kargs):
        #type: (BaseConnection, *Any, **int) -> None
        """Private method to print if level is <= self.server.debug_level

        Args:
            *args:   Each argument you wish to feed to the print method
            **kargs: One keyword is used here: level, which defines the
                         lowest value of self.server.debug_level at which
                         the message will be printed
        """
        self.server.__print__(*args, **kargs)
class BaseDaemon(object):
    """The base class for a daemon

    The constructor binds a listening socket and starts a thread running
    ``self.mainloop``. This class does not define ``mainloop``; subclasses
    are expected to.
    """
    __slots__ = ('server', 'sock', 'exceptions', 'alive', '_logger',
                 'main_thread', 'daemon', 'conn_type')

    @log_entry('py2p.base.BaseDaemon.__init__', DEBUG)
    def __init__(self, addr, port, server):
        #type: (Any, str, int, BaseSocket) -> None
        """Sets up a daemon process for your peer-to-peer socket

        Args:
            addr:   The address you wish to bind to
            port:   The port you wish to bind to
            server: A reference to the peer-to-peer socket

        Raises:
            socket.error: The address you wanted is already in use
            ValueError:   If your peer-to-peer socket is set up with an
                              unknown encryption method
        """
        self.server = server
        self.sock = get_socket(self.protocol, True)
        self.sock.bind((addr, port))
        self.sock.listen(5)
        self.sock.settimeout(0.1)
        self.exceptions = []  #type: List[str]
        self.alive = True
        self._logger = getLogger(
            '{}.{}.{}'.format(self.__class__.__module__,
                              self.__class__.__name__, self.server.id))
        self.main_thread = current_thread()
        self.daemon = Thread(target=self.mainloop)
        self.daemon.start()

    @property
    def protocol(self):
        #type: (BaseDaemon) -> Protocol
        """Returns server.protocol"""
        return self.server.protocol

    def kill_old_nodes(self, handler):
        #type: (BaseDaemon, BaseConnection) -> None
        """Cleans out connections which never finish a message

        A handler is disconnected if it is partway through a message
        (``active``) and its ``time`` is more than 60 behind ``getUTC()``
        (presumably seconds - confirm against ``utils.getUTC``).
        """
        if handler.active and handler.time < getUTC() - 60:
            self.server.disconnect(handler)

    def process_data(self, handler):
        #type: (BaseDaemon, BaseConnection) -> None
        """Collects incoming data from nodes

        Reads from the handler's socket until at least one complete message
        is buffered, then processes every complete message. Any failure
        other than a timeout disconnects the handler and asks the server to
        request new peers.
        """
        try:
            while not handler.find_terminator():
                if not handler.collect_incoming_data(handler.sock.recv(1024)):
                    self.__print__(
                        "disconnecting node %s while in loop" % handler.id,
                        level=6)
                    self.server.disconnect(handler)
                    self.server.request_peers()
                    return
            while handler.find_terminator():
                handler.found_terminator()
        except socket.timeout:  # pragma: no cover
            return  # Shouldn't happen with select, but if it does...
        except Exception as e:
            # These codes are reported as an ordinary disconnect; any other
            # exception is recorded for debugging. ``e.args`` is checked
            # first because a socket.error can carry no arguments.
            is_disconnect = (isinstance(e, socket.error) and
                             bool(e.args) and
                             e.args[0] in (9, 104, 10053, 10054, 10058))
            if is_disconnect:
                node_id = repr(handler.id or handler)
                self.__print__(
                    "Node %s has disconnected from the network" % node_id,
                    level=1)
            else:
                self.__print__(
                    "There was an unhandled exception with peer id %s. This "
                    "peer is being disconnected, and the relevant exception "
                    "is added to the debug queue. If you'd like to report "
                    "this, please post a copy of your MeshSocket.status to "
                    "git.p2p.today/issues." % handler.id,
                    level=0)
                self.exceptions.append(format_exc())
            self.server.disconnect(handler)
            self.server.request_peers()

    def __del__(self):
        #type: (BaseDaemon) -> None
        """Marks the daemon as dead and shuts the listening socket down"""
        self.alive = False
        try:
            self.sock.shutdown(socket.SHUT_RDWR)
        except Exception:  # pragma: no cover
            # Best effort: the socket may be closed already or never have
            # been created
            pass

    @inherit_doc(BaseConnection.__print__)
    def __print__(self, *args, **kargs):
        #type: (BaseDaemon, *Any, **int) -> None
        self.server.__print__(*args, **kargs)
class BaseSocket(EventEmitter, object):
    """
    The base class for a peer-to-peer socket abstractor

    This class refers to ``self.daemon`` and ``self.disconnect`` but defines
    neither; subclasses are expected to provide them.

    .. inheritance-diagram:: py2p.base.BaseSocket
    """
    __slots__ = ('protocol', 'debug_level', 'routing_table', 'awaiting_ids',
                 'out_addr', 'id', '_logger', '__handlers', '__closed')

    @log_entry('py2p.base.BaseSocket.__init__', DEBUG)
    def __init__(
            self,  #type: Any
            addr,  #type: str
            port,  #type: int
            prot=default_protocol,  #type: Protocol
            out_addr=None,  #type: Union[None, Tuple[str, int]]
            debug_level=0  #type: int
    ):  #type: (...) -> None
        """Initializes a peer to peer socket

        Args:
            addr:        The address you wish to bind to (ie: "192.168.1.1")
            port:        The port you wish to bind to (ie: 44565)
            prot:        The protocol you wish to operate over, defined by a
                             :py:class:`py2p.base.Protocol` object
            out_addr:    Your outward facing address. Only needed if you're
                             connecting over the internet. If you use
                             '0.0.0.0' for the addr argument, this will
                             automatically be set to your LAN address.
            debug_level: The verbosity you want this socket to use when
                             printing event data

        Raises:
            socket.error: The address you wanted could not be bound, or is
                              otherwise used
        """
        object.__init__(self)
        EventEmitter.__init__(self)
        self.protocol = prot
        self.debug_level = debug_level
        self.routing_table = {}  #type: Dict[bytes, BaseConnection]
        # In format {ID: handler}
        self.awaiting_ids = []  #type: List[BaseConnection]
        # Connected, but not handshook yet
        if out_addr:  # Outward facing address, if you're port forwarding
            self.out_addr = out_addr
        elif addr == '0.0.0.0':
            self.out_addr = get_lan_ip(), port
        else:
            self.out_addr = addr, port
        # The socket ID hashes the outward address, the protocol ID and the
        # module-level salt
        info = (str(self.out_addr).encode(), prot.id.encode(), user_salt)
        h = sha384(b''.join(info))
        self.id = to_base_58(int(h.hexdigest(), 16))  #type: bytes
        self._logger = getLogger('{}.{}.{}'.format(
            self.__class__.__module__, self.__class__.__name__, self.id))
        self.__handlers = [
        ]  #type: List[Callable[[Message, BaseConnection], Union[bool, None]]]
        self.__closed = False

    def close(self):
        #type: (BaseSocket) -> None
        """If the socket is not closed, close the socket

        Stops the daemon and waits for its thread, shuts the listening
        socket down, then disconnects every known connection.

        Raises:
            RuntimeError: The socket was already closed
        """
        if self.__closed:
            raise RuntimeError("Already closed")
        else:
            self.daemon.alive = False
            self.daemon.daemon.join()
            self.debug_level = 0
            try:
                self.daemon.sock.shutdown(socket.SHUT_RDWR)
            except Exception:
                # Best effort: the listening socket may already be closed
                pass
            # Both collections are snapshotted: disconnect() is defined
            # elsewhere and may remove entries from them while we iterate.
            for conn in chain(
                    tuple(self.routing_table.values()),
                    tuple(self.awaiting_ids)):
                self.disconnect(conn)
            self.__closed = True

    def register_handler(self, method):
        #type: (BaseSocket, Callable[[Message, BaseConnection], Union[bool, None]]) -> None
        """Register a handler for incoming method.

        Args:
            method: A function with two given arguments. Its signature
                        should be of the form ``handler(msg, handler)``,
                        where msg is a :py:class:`py2p.base.Message`
                        object, and handler is a
                        :py:class:`py2p.base.BaseConnection` object. It
                        should return ``True`` if it performed an action,
                        to reduce the number of handlers checked.

        Raises:
            ValueError: If the method signature doesn't parse correctly
        """
        if version_info >= (3, ):
            params = inspect.signature(method).parameters
            expected = 3 if params.get('self') else 2
            valid = (len(params) == expected)
        else:
            spec = inspect.getargspec(method)
            names = spec[0]
            # Slice instead of indexing so a zero-argument callable is
            # rejected with ValueError rather than raising IndexError
            expected = 3 if names[:1] == ['self'] else 2
            # No *args, **kwargs or defaults are allowed on Python 2
            valid = (spec[1:] == (None, None, None) and
                     len(names) == expected)
        if not valid:
            raise ValueError(
                "This method must contain exactly two arguments "
                "(or three if first is self)")
        self.__handlers.append(method)

    def handle_msg(self, msg, conn):
        #type: (BaseSocket, Message, BaseConnection) -> Union[bool, None]
        """Decides how to handle various message types, allowing some to be
        handled automatically

        Handlers are tried in registration order; the first one returning a
        truthy value stops the search.

        Args:
            msg:    A :py:class:`py2p.base.Message` object
            conn:   A :py:class:`py2p.base.BaseConnection` object

        Returns:
            True if an action was taken, None if not.
        """
        for handler in self.__handlers:
            self.__print__("Checking handler: %s" % handler.__name__, level=4)
            if handler(msg, conn):
                self.__print__(
                    "Breaking from handler: %s" % handler.__name__, level=4)
                return True

    @property
    def status(self):
        #type: (BaseSocket) -> Union[str, List[str]]
        """The status of the socket.

        Returns:
            ``"Nominal"`` if all is going well, or the daemon's list of
            recorded tracebacks if not
        """
        return self.daemon.exceptions or "Nominal"

    @property
    def outgoing(self):
        #type: (BaseSocket) -> Iterable[bytes]
        """IDs of outgoing connections"""
        return (handler.id for handler in self.routing_table.values()
                if handler.outgoing)

    @property
    def incoming(self):
        #type: (BaseSocket) -> Iterable[bytes]
        """IDs of incoming connections"""
        return (handler.id for handler in self.routing_table.values()
                if not handler.outgoing)

    def __print__(self, *args, **kargs):
        #type: (BaseSocket, *Any, **int) -> None
        """Private method to print if level is <= self.debug_level

        Output is prefixed with this socket's outward port and printed
        while holding the module-level ``plock``.

        Args:
            *args:   Each argument you wish to feed to the print method
            **kargs: One keyword is used here: level, which defines the
                         lowest value of self.debug_level at which the
                         message will be printed
        """
        if kargs.get('level', 0) <= self.debug_level:
            with plock:
                print(self.out_addr[1], *args)

    def __del__(self):
        #type: (BaseSocket) -> None
        """Closes the socket on garbage collection if still open"""
        if not self.__closed:
            self.close()
class Message(object):
    """An object which gets returned to a user, containing all necessary
    information to parse and reply to a message
    """
    __slots__ = ('msg', 'server')

    def __init__(self, msg, server):
        #type: (Message, InternalMessage, BaseSocket) -> None
        """Initializes a Message object

        Args:
            msg:    A :py:class:`py2p.base.InternalMessage` object
            server: A :py:class:`py2p.base.BaseSocket` object
        """
        self.msg = msg
        self.server = server

    @property
    def time(self):
        #type: (Message) -> int
        """The time this Message was sent at"""
        return self.msg.time

    @property  #type: ignore
    @inherit_doc(InternalMessage.time_58)
    def time_58(self):
        #type: (Message) -> bytes
        return self.msg.time_58

    @property
    def sender(self):
        #type: (Message) -> bytes
        """The ID of this Message's sender"""
        return self.msg.sender

    @property  #type: ignore
    @inherit_doc(InternalMessage.id)
    def id(self):
        #type: (Message) -> bytes
        return self.msg.id

    @property  #type: ignore
    @inherit_doc(InternalMessage.payload)
    def packets(self):
        #type: (Message) -> Tuple[MsgPackable, ...]
        return self.msg.payload

    @inherit_doc(InternalMessage.__len__)
    def __len__(self):
        #type: (Message) -> int
        return self.msg.__len__()

    def __repr__(self):
        #type: (Message) -> str
        """Shows the first packet as the type and the rest as the packets

        An empty payload is shown with ``type=None`` rather than raising
        IndexError.
        """
        packets = self.packets
        msg_type = packets[0] if packets else None
        return "Message(type={}, packets={}, sender={})".format(
            msg_type, packets[1:], self.sender)

    def reply(self, *args):
        #type: (Message, *MsgPackable) -> None
        """Replies to the sender if you're directly connected. Tries to make
        a connection otherwise

        Args:
            *args: Each argument given is a packet you wish to send. This is
                       prefixed with base.flags.whisper, so the other end
                       will receive ``[base.flags.whisper, *args]``
        """
        self.server._logger.debug(
            'Initiating a direct reply to Message ID {}'.format(self.id))
        # Look the connection up once, so it cannot disappear from the
        # routing table between the check and the send
        handler = self.server.routing_table.get(self.sender)
        if handler:
            handler.send(flags.whisper, flags.whisper, *args)
        else:
            self.server._logger.debug('Requesting connection for direct reply'
                                      ' to Message ID {}'.format(self.id))
            # The request ID hashes the sender and the current time
            request_hash = sha384(self.sender + to_base_58(
                getUTC())).hexdigest()
            request_id = to_base_58(int(request_hash, 16))
            self.server.send(request_id, self.sender, type=flags.request)
            to_send = (flags.whisper, flags.whisper
                       )  #type: Tuple[MsgPackable, ...]
            # Stored under the request ID in ``server.requests``; what
            # consumes it is not visible in this file
            self.server.requests[request_id] = to_send + args
            self.server._logger.critical(
                "You aren't connected to the original sender. This reply is "
                "not guarunteed, but we're trying to make a connection and "
                "put the message through.")