From 5cf463bb8574b71aaa22964ed29cabd16116ef85 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 20 Oct 2025 21:06:02 +0800 Subject: [PATCH 1/3] 2025-10-20 21:06:02 --- pleth/core.py | 8 ++++---- pleth/ecdsa.py | 1 + pleth/rpc.py | 7 ++++--- pleth/secp256k1.py | 20 +++++++++++--------- pleth/wallet.py | 2 +- 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/pleth/core.py b/pleth/core.py index 6f8eee0..70bdc99 100644 --- a/pleth/core.py +++ b/pleth/core.py @@ -30,12 +30,12 @@ def json(self) -> typing.Dict: 'n': f'{self.n:064x}', } - def pubkey(self): + def pubkey(self) -> PubKey: pubkey = pleth.secp256k1.G * pleth.secp256k1.Fr(self.n) return PubKey(pubkey.x.x, pubkey.y.x) @classmethod - def random(cls) -> typing.Self: + def random(cls) -> PriKey: return PriKey(max(1, secrets.randbelow(pleth.secp256k1.N))) def sign(self, data: bytearray) -> bytearray: @@ -92,7 +92,7 @@ def pt(self) -> pleth.secp256k1.Pt: return pleth.secp256k1.Pt(pleth.secp256k1.Fq(self.x), pleth.secp256k1.Fq(self.y)) @classmethod - def pt_decode(cls, data: pleth.secp256k1.Pt) -> typing.Self: + def pt_decode(cls, data: pleth.secp256k1.Pt) -> PubKey: return PubKey(data.x.x, data.y.x) @@ -353,7 +353,7 @@ def hash(self) -> bytearray: data = '\x19Ethereum Signed Message:\n' data += str(len(self.data)) data += self.data - return pleth.core.hash(data.encode()) + return pleth.core.hash(bytearray(data.encode())) def pubkey(self, sig: bytearray) -> PubKey: m = pleth.secp256k1.Fr(int.from_bytes(self.hash())) diff --git a/pleth/ecdsa.py b/pleth/ecdsa.py index acfc509..096abd4 100644 --- a/pleth/ecdsa.py +++ b/pleth/ecdsa.py @@ -22,6 +22,7 @@ def sign(prikey: pleth.secp256k1.Fr, m: pleth.secp256k1.Fr) -> typing.Tuple[plet if R.x.x >= pleth.secp256k1.N: v |= 2 return r, s, v + raise Exception('unreachable') def verify(pubkey: pleth.secp256k1.Pt, m: pleth.secp256k1.Fr, r: pleth.secp256k1.Fr, s: pleth.secp256k1.Fr) -> bool: diff --git a/pleth/rpc.py b/pleth/rpc.py index c2032e3..d46425b 100644 --- a/pleth/rpc.py +++ b/pleth/rpc.py @@ -9,8 +9,9 @@ def call(method: str, params: typing.List) -> typing.Any: - call.rate = getattr(call, 'rate', pleth.rate.Limits(pleth.config.current.rpc.qps, 1)) - call.rate.wait(1) + if not hasattr(call, 'rate'): + setattr(call, 'rate', pleth.rate.Limits(pleth.config.current.rpc.qps, 1)) + getattr(call, 'rate').wait(1) r = requests.post(pleth.config.current.rpc.url, json={ 'id': random.randint(0x00000000, 0xffffffff), 'jsonrpc': '2.0', @@ -154,7 +155,7 @@ def eth_protocol_version() -> str: return call('eth_protocolVersion', []) -def eth_send_raw_transaction(tx: typing.Dict) -> str: +def eth_send_raw_transaction(tx: str) -> str: return call('eth_sendRawTransaction', [tx]) diff --git a/pleth/secp256k1.py b/pleth/secp256k1.py index 9048d23..75e16d6 100644 --- a/pleth/secp256k1.py +++ b/pleth/secp256k1.py @@ -20,7 +20,8 @@ def __add__(self, data: typing.Self) -> typing.Self: assert self.p == data.p return self.__class__(self.x + data.x) - def __eq__(self, data: typing.Self) -> bool: + def __eq__(self, data: object) -> bool: + assert isinstance(data, Fp) assert self.p == data.p return self.x == data.x @@ -94,7 +95,7 @@ def __init__(self, x: Fq, y: Fq) -> None: self.x = x self.y = y - def __add__(self, data: typing.Self) -> typing.Self: + def __add__(self, data: Pt) -> Pt: # https://www.cs.miami.edu/home/burt/learning/Csc609.142/ecdsa-cert.pdf # Don Johnson, Alfred Menezes and Scott Vanstone, The Elliptic Curve Digital Signature Algorithm (ECDSA) # 4.1 Elliptic Curves Over Fp @@ -116,13 +117,14 @@ def __add__(self, data: typing.Self) -> typing.Self: y3 = sk * (x1 - x3) - y1 return Pt(x3, y3) - def __eq__(self, data: typing.Self) -> bool: + def __eq__(self, data: object) -> bool: + assert isinstance(data, Pt) return all([ self.x == data.x, self.y == data.y, ]) - def __mul__(self, k: Fr) -> typing.Self: + def __mul__(self, k: Fr) -> Pt: # Point multiplication: Double-and-add # https://en.wikipedia.org/wiki/Elliptic_curve_point_multiplication n = k.x @@ -136,22 +138,22 @@ def __mul__(self, k: Fr) -> typing.Self: n = n >> 1 return result - def __neg__(self) -> typing.Self: + def __neg__(self) -> Pt: return Pt(self.x, -self.y) def __repr__(self) -> str: return json.dumps(self.json()) - def __sub__(self, data: typing.Self) -> typing.Self: + def __sub__(self, data: Pt) -> Pt: return self + data.__neg__() - def __truediv__(self, k: Fr) -> typing.Self: + def __truediv__(self, k: Fr) -> Pt: return self.__mul__(k ** -1) - def __pos__(self) -> typing.Self: + def __pos__(self) -> Pt: return Pt(self.x, +self.y) - def json(self) -> typing.Self: + def json(self) -> typing.Dict[str, str]: return { 'x': self.x.json(), 'y': self.y.json(), diff --git a/pleth/wallet.py b/pleth/wallet.py index 8eb2855..f7aa336 100644 --- a/pleth/wallet.py +++ b/pleth/wallet.py @@ -45,7 +45,7 @@ def json(self) -> typing.Dict: def balance(self) -> int: return int(pleth.rpc.eth_get_balance(f'0x{self.addr.hex()}', 'latest'), 0) - def contract_addr(self, hash: bytearray) -> str: + def contract_addr(self, hash: bytearray) -> bytearray: return bytearray.fromhex(pleth.rpc.eth_get_transaction_receipt(f'0x{hash.hex()}')['contractAddress'][2:]) def contract_call(self, addr: bytearray, data: bytearray) -> bytearray: From 8576deccf2433869582d117c11c2422bf5bcafe6 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 20 Oct 2025 22:23:25 +0800 Subject: [PATCH 2/3] 2025-10-20 22:23:25 --- pleth/core.py | 84 +++++++++++------------ pleth/rlp.py | 175 +++++++++++++++-------------------------------- test/test_rlp.py | 18 +---- 3 files changed, 99 insertions(+), 178 deletions(-) diff --git a/pleth/core.py b/pleth/core.py index 70bdc99..8942744 100644 --- a/pleth/core.py +++ b/pleth/core.py @@ -127,15 +127,15 @@ def __repr__(self) -> str: def envelope(self) -> bytearray: return pleth.rlp.encode([ - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_price), - pleth.rlp.put_uint(self.gas), + self.nonce, + self.gas_price, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, - pleth.rlp.put_uint(self.v), - pleth.rlp.put_uint(self.r), - pleth.rlp.put_uint(self.s), + self.v, + self.r, + self.s, ]) def hash(self) -> bytearray: @@ -157,15 +157,15 @@ def json(self) -> typing.Dict: def sign(self, prikey: PriKey) -> None: # EIP-155: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-155.md sign = prikey.sign(hash(pleth.rlp.encode([ - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_price), - pleth.rlp.put_uint(self.gas), + self.nonce, + self.gas_price, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, - pleth.rlp.put_uint(pleth.config.current.chain_id), - pleth.rlp.put_uint(0), - pleth.rlp.put_uint(0), + pleth.config.current.chain_id, + 0, + 0, ]))) self.r = int.from_bytes(sign[0x00:0x20]) self.s = int.from_bytes(sign[0x20:0x40]) @@ -205,17 +205,17 @@ def __repr__(self) -> str: def envelope(self) -> bytearray: return bytearray([0x01]) + pleth.rlp.encode([ - pleth.rlp.put_uint(self.chain_id), - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_price), - pleth.rlp.put_uint(self.gas), + self.chain_id, + self.nonce, + self.gas_price, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, self.access_list, - pleth.rlp.put_uint(self.v), - pleth.rlp.put_uint(self.r), - pleth.rlp.put_uint(self.s), + self.v, + self.r, + self.s, ]) def hash(self) -> bytearray: @@ -238,12 +238,12 @@ def json(self) -> typing.Dict: def sign(self, prikey: PriKey) -> None: sign = prikey.sign(hash(bytearray([0x01]) + pleth.rlp.encode([ - pleth.rlp.put_uint(self.chain_id), - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_price), - pleth.rlp.put_uint(self.gas), + self.chain_id, + self.nonce, + self.gas_price, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, self.access_list, ]))) @@ -290,18 +290,18 @@ def __repr__(self) -> str: def envelope(self) -> bytearray: return bytearray([0x02]) + pleth.rlp.encode([ - pleth.rlp.put_uint(self.chain_id), - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_tip_cap), - pleth.rlp.put_uint(self.gas_fee_cap), - pleth.rlp.put_uint(self.gas), + self.chain_id, + self.nonce, + self.gas_tip_cap, + self.gas_fee_cap, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, self.access_list, - pleth.rlp.put_uint(self.v), - pleth.rlp.put_uint(self.r), - pleth.rlp.put_uint(self.s), + self.v, + self.r, + self.s, ]) def hash(self) -> bytearray: @@ -325,13 +325,13 @@ def json(self) -> typing.Dict: def sign(self, prikey: PriKey) -> None: sign = prikey.sign(hash(bytearray([0x02]) + pleth.rlp.encode([ - pleth.rlp.put_uint(self.chain_id), - pleth.rlp.put_uint(self.nonce), - pleth.rlp.put_uint(self.gas_tip_cap), - pleth.rlp.put_uint(self.gas_fee_cap), - pleth.rlp.put_uint(self.gas), + self.chain_id, + self.nonce, + self.gas_tip_cap, + self.gas_fee_cap, + self.gas, self.to if self.to else bytearray(), - pleth.rlp.put_uint(self.value), + self.value, self.data, self.access_list, ]))) diff --git a/pleth/rlp.py b/pleth/rlp.py index f481e18..5e62fdb 100644 --- a/pleth/rlp.py +++ b/pleth/rlp.py @@ -1,137 +1,72 @@ import typing -def encode_byte(data: bytearray) -> bytearray: - body = bytearray() - if len(data) == 0x01 and data[0] <= 0x7f: - body.extend(data) - return body - if len(data) <= 0x37: - body.append(0x80 + len(data)) - body.extend(data) - return body - if len(data): +def put_uint(data: int) -> bytearray: + return bytearray(data.to_bytes(32)).lstrip(bytearray([0x00])) + + +def encode(data: int | bytearray | typing.List) -> bytearray: + if isinstance(data, int): + return encode(put_uint(data)) + if isinstance(data, bytearray): + body = bytearray() + if len(data) == 0x01 and data[0] <= 0x7f: + body.extend(data) + return body + if len(data) <= 0x37: + body.append(0x80 + len(data)) + body.extend(data) + return body size = put_uint(len(data)) body.append(0xb7 + len(size)) body.extend(size) body.extend(data) return body - - -def encode_list(data: typing.List[bytearray]) -> bytearray: - head = bytearray() - body = bytearray() - for e in data: - body.extend(encode(e)) - if len(body) <= 0x37: - head.append(0xc0 + len(body)) - return head + body - if len(body): + if isinstance(data, list): + head = bytearray() + body = bytearray() + for e in data: + body.extend(encode(e)) + if len(body) <= 0x37: + head.append(0xc0 + len(body)) + return head + body size = put_uint(len(body)) head.append(0xf7 + len(size)) head.extend(size) return head + body + raise Exception('unreachable') -def encode(data: bytearray | typing.List[bytearray]) -> bytearray: - if isinstance(data, bytearray): - return encode_byte(data) - if isinstance(data, list): - return encode_list(data) - - -def decode_byte(data: bytearray) -> bytearray: - head = data[0] - assert head <= 0xbf - if head <= 0x7f: - assert len(data) == 1 - body = data.copy() - return body - if head <= 0xb7: - size = head - 0x80 - assert len(data) == 1 + size - body = data[1:].copy() - return body - if head <= 0xbf: - nlen = head - 0xb7 +def decode(data: bytearray) -> bytearray | typing.List[bytearray]: + if data[0] <= 0x7f: + return bytearray([data[0]]) + if data[0] <= 0xb7: + size = data[0] - 0x80 + return data[1:1+size] + if data[0] <= 0xbf: + nlen = data[0] - 0xb7 size = int.from_bytes(data[1:1+nlen]) - assert len(data) == 1 + nlen + size - body = data[1+nlen:].copy() + body = data[1+nlen:1+nlen+size] return body - - -def decode_list(data: bytearray) -> typing.List[bytearray]: - head = data[0] - assert head >= 0xc0 - offs = 0 - body = [] - if offs == 0 and head <= 0xf7: - size = head - 0xc0 - assert len(data) == 1 + size - offs += 1 - if offs == 0 and head <= 0xff: - nlen = head - 0xf7 + if data[0] <= 0xf7: + size = data[0] - 0xc0 + body = data[1:1+size] + rets = [] + offs = 0 + while offs < len(body): + item = decode(body[offs:]) + rets.append(item) + offs += len(encode(item)) + return rets + if data[0] <= 0xff: + nlen = data[0] - 0xf7 size = int.from_bytes(data[1:1+nlen]) - assert len(data) == 1 + nlen + size - offs += 1 - offs += nlen - for _ in range(1 << 0xf): - if offs >= len(data): - break - head = data[offs] - if head < 0x80: - body.append(decode_byte(data[offs: offs+1])) - offs += 1 - continue - if head < 0xb8: - size = head - 0x80 - body.append(decode_byte(data[offs: offs+1+size])) - offs += 1 - offs += size - continue - if head < 0xc0: - nlen = head - 0xb7 - size = int.from_bytes(data[offs+1:offs+1+nlen]) - body.append(decode_byte(data[offs: offs+1+nlen+size])) - offs += 1 - offs += nlen - offs += size - continue - if head < 0xf8: - size = head - 0xc0 - body.append(decode_list(data[offs: offs+1+size])) - offs += 1 - offs += size - continue - if head: - nlen = head - 0xf7 - size = int.from_bytes(data[offs+1:offs+1+nlen]) - body.append(decode_list(data[offs: offs+1+nlen+size])) - offs += 1 - offs += nlen - offs += size - continue - return body - - -def decode(data: bytearray) -> bytearray | typing.List[bytearray]: - if data[0] <= 0xbf: - return decode_byte(data) - if data[0]: - return decode_list(data) - - -def get_bool(data: bytearray) -> bool: - return len(data) == 1 - - -def get_uint(data: bytearray) -> int: - return int.from_bytes(data) - - -def put_bool(data: bool) -> bytearray: - return bytearray([0x01]) if data else bytearray() - - -def put_uint(data: int) -> bytearray: - return bytearray(data.to_bytes(32)).lstrip(bytearray([0x00])) + body = data[1+nlen:1+nlen+size] + rets = [] + offs = 0 + while offs < len(body): + item = decode(body[offs:]) + rets.append(item) + offs += len(encode(item)) + return rets + raise Exception('unreachable') diff --git a/test/test_rlp.py b/test/test_rlp.py index c76d6c0..ad3350f 100644 --- a/test/test_rlp.py +++ b/test/test_rlp.py @@ -6,6 +6,7 @@ def test_encode(): assert pleth.rlp.encode([bytearray(b'cat'), bytearray(b'dog')]) == bytearray(b'\xc8\x83cat\x83dog') assert pleth.rlp.encode(bytearray()) == bytearray(b'\x80') assert pleth.rlp.encode([]) == bytearray(b'\xc0') + assert pleth.rlp.encode(0x00) == bytearray(b'\x80') assert pleth.rlp.encode(bytearray([0x00])) == bytearray(b'\x00') assert pleth.rlp.encode(bytearray([0x0f])) == bytearray(b'\x0f') assert pleth.rlp.encode(bytearray([0x04, 0x00])) == bytearray(b'\x82\x04\x00') @@ -18,24 +19,9 @@ def test_decode(): assert pleth.rlp.decode(bytearray(b'\xc8\x83cat\x83dog')) == [bytearray(b'cat'), bytearray(b'dog')] assert pleth.rlp.decode(bytearray(b'\x80')) == bytearray() assert pleth.rlp.decode(bytearray(b'\xc0')) == [] + assert pleth.rlp.decode(bytearray(b'\x80')) == bytearray() assert pleth.rlp.decode(bytearray(b'\x00')) == bytearray([0x00]) assert pleth.rlp.decode(bytearray(b'\x0f')) == bytearray([0x0f]) assert pleth.rlp.decode(bytearray(b'\x82\x04\x00')) == bytearray([0x04, 0x00]) assert pleth.rlp.decode(bytearray(b'\xc7\xc0\xc1\xc0\xc3\xc0\xc1\xc0')) == [[], [[]], [[], [[]]]] assert pleth.rlp.decode(bytearray([0xb8, 0x38] + [0x00] * 56)) == bytearray([0] * 56) - - -def test_bool(): - assert pleth.rlp.encode(pleth.rlp.put_bool(1)) == bytearray([0x01]) - assert pleth.rlp.encode(pleth.rlp.put_bool(0)) == bytearray([0x80]) - assert pleth.rlp.get_bool(pleth.rlp.decode(bytearray([0x01]))) == 1 - assert pleth.rlp.get_bool(pleth.rlp.decode(bytearray([0x80]))) == 0 - - -def test_uint(): - assert pleth.rlp.encode(pleth.rlp.put_uint(852456)) == bytearray([0x83, 0x0d, 0x01, 0xe8]) - assert pleth.rlp.encode(pleth.rlp.put_uint(0)) == bytearray([0x80]) - assert pleth.rlp.encode(pleth.rlp.put_uint(0x10000000000000001)) == bytearray([0x89, 0x01, 0, 0, 0, 0, 0, 0, 0, 0x01]) - assert pleth.rlp.get_uint(pleth.rlp.decode(bytearray([0x83, 0x0d, 0x01, 0xe8]))) == 852456 - assert pleth.rlp.get_uint(pleth.rlp.decode(bytearray([0x80]))) == 0 - assert pleth.rlp.get_uint(pleth.rlp.decode(bytearray([0x89, 0x01, 0, 0, 0, 0, 0, 0, 0, 0x01]))) == 0x10000000000000001 From cd835014038ef2cd404e1c38f9e9b9abe64ee671 Mon Sep 17 00:00:00 2001 From: mohanson Date: Mon, 20 Oct 2025 22:31:45 +0800 Subject: [PATCH 3/3] 2025-10-20 22:31:45 --- pleth/rlp.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pleth/rlp.py b/pleth/rlp.py index 5e62fdb..7f05dd0 100644 --- a/pleth/rlp.py +++ b/pleth/rlp.py @@ -1,13 +1,9 @@ import typing -def put_uint(data: int) -> bytearray: - return bytearray(data.to_bytes(32)).lstrip(bytearray([0x00])) - - def encode(data: int | bytearray | typing.List) -> bytearray: if isinstance(data, int): - return encode(put_uint(data)) + return encode(bytearray(data.to_bytes(32)).lstrip(bytearray([0x00]))) if isinstance(data, bytearray): body = bytearray() if len(data) == 0x01 and data[0] <= 0x7f: @@ -17,7 +13,7 @@ def encode(data: int | bytearray | typing.List) -> bytearray: body.append(0x80 + len(data)) body.extend(data) return body - size = put_uint(len(data)) + size = bytearray(len(data).to_bytes(32)).lstrip(bytearray([0x00])) body.append(0xb7 + len(size)) body.extend(size) body.extend(data) @@ -30,7 +26,7 @@ def encode(data: int | bytearray | typing.List) -> bytearray: if len(body) <= 0x37: head.append(0xc0 + len(body)) return head + body - size = put_uint(len(body)) + size = bytearray(len(body).to_bytes(32)).lstrip(bytearray([0x00])) head.append(0xf7 + len(size)) head.extend(size) return head + body