Source code for bencoding.decoder

from collections import OrderedDict
from .tokens import *


[docs]class DecoderError(Exception): pass
[docs]class Decoder: """Use to decode date(bytes).""" def __init__(self, data): self.data = data self._index = 0 self._max_index = len(self.data) - 1
[docs] def decode(self): """Return OrderedDict of self data""" c = self._peek() if c is None: raise DecoderError('unexpected eof') elif c == TOKEN_INTEGER: self._consume() return self._decode_int() elif c == TOKEN_LIST: self._consume() return self._decode_list() elif c == TOKEN_DICT: self._consume() return self._decode_dict() elif c == TOKEN_END: return None elif c in b'0123456789': return self._decode_string() else: raise DecoderError('invalid token:{}'.format(c))
def _peek(self): if self._index >= self._max_index: return None return self.data[self._index:self._index + 1] def _consume(self): self._index += 1 def _read(self, length): if self._index + length > self._max_index: raise DecoderError('out of range') result = self.data[self._index:self._index + length] self._index += length return result def _read_until(self, token): try: lowest_index = self.data.index(token, self._index) result = self.data[self._index:lowest_index] self._index = lowest_index + 1 return result except ValueError: raise DecoderError('unable to find token:{}'.format(token.decode())) def _decode_int(self): return int(self._read_until(TOKEN_END)) def _decode_list(self): result = [] while self.data[self._index:self._index + 1] != TOKEN_END: result.append(self.decode()) self._consume() return result def _decode_dict(self): result = OrderedDict() while self.data[self._index:self._index + 1] != TOKEN_END: key = self.decode() value = self.decode() result[key] = value self._consume() return result def _decode_string(self): length = int(self._read_until(TOKEN_STRING_SEPARATOR)) data = self._read(length) return data