Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/validators/crypto_addresses/_keccak.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
"""Pure-Python Keccak-256 implementation — no external dependencies.

Used as fallback when ``eth-hash`` is not installed.
Compatible with Ethereum's EIP-55 address checksum (original Keccak-256,
which differs from NIST SHA3-256 only in the padding byte).
"""

from __future__ import annotations

_KeccakF_RoundConstants = [
0x0000000000000001, 0x0000000000008082, 0x800000000000808A, 0x8000000080008000,
0x000000000000808B, 0x0000000080000001, 0x8000000080008081, 0x8000000000008009,
0x000000000000008A, 0x0000000000000088, 0x0000000080008009, 0x000000008000000A,
0x000000008000808B, 0x800000000000008B, 0x8000000000008089, 0x8000000000008003,
0x8000000000008002, 0x8000000000000080, 0x000000000000800A, 0x800000008000000A,
0x8000000080008081, 0x8000000000008080, 0x0000000080000001, 0x8000000080008008,
]

_KeccakF_RotationConstants = [
1, 3, 6, 10, 15, 21, 28, 36, 45, 55, 2, 14,
27, 41, 56, 8, 25, 43, 62, 18, 39, 61, 20, 44,
]

_KeccakF_PiLane = [
10, 7, 11, 17, 18, 3, 5, 16, 8, 21, 24, 4,
15, 23, 19, 13, 12, 2, 20, 14, 22, 9, 6, 1,
]

_MOD64 = (1 << 64) - 1


def _keccak_f(state: list[int]) -> list[int]:
for rc in _KeccakF_RoundConstants:
c = [state[x] ^ state[x + 5] ^ state[x + 10] ^ state[x + 15] ^ state[x + 20]
for x in range(5)]
d = [c[(x + 4) % 5] ^ ((c[(x + 1) % 5] << 1 | c[(x + 1) % 5] >> 63) & _MOD64)
for x in range(5)]
state = [state[x] ^ d[x % 5] for x in range(25)]
b = [0] * 25
b[0] = state[0]
for x, (y, r) in enumerate(zip(_KeccakF_PiLane, _KeccakF_RotationConstants), 1):
b[y] = ((state[x] << r | state[x] >> (64 - r)) & _MOD64)
state = [b[x] ^ ((~b[(x + 1) % 5 + (x // 5) * 5]) & b[(x + 2) % 5 + (x // 5) * 5])
for x in range(25)]
state[0] ^= rc
return state


def keccak256(data: bytes) -> bytes:
    """Compute Keccak-256 (Ethereum variant) of *data*.

    This is NOT the same as NIST SHA3-256; the padding byte differs (0x01 vs 0x06).

    Args:
        data: Raw bytes to hash.

    Returns:
        32-byte digest.
    """
    rate_bytes = 136  # Keccak-256: 1600 - 2*256 = 1088 bits = 136 bytes
    buf = bytearray(data)

    # Padding (pad10*1): a 0x01 byte, zero fill up to the next multiple of the
    # rate, then the top bit of the final byte set. The fill must be
    # ``-len % rate`` so that it is empty when the 0x01 byte already completes
    # a block; in that case 0x01 and 0x80 merge into a single 0x81 byte rather
    # than spilling into an extra all-padding block.
    buf.append(0x01)
    buf.extend(b"\x00" * (-len(buf) % rate_bytes))
    buf[-1] |= 0x80

    # Absorb: XOR each rate-sized block into the first 17 lanes (little-endian),
    # then permute.
    state: list[int] = [0] * 25
    for offset in range(0, len(buf), rate_bytes):
        block = buf[offset:offset + rate_bytes]
        for j in range(rate_bytes // 8):
            state[j] ^= int.from_bytes(block[j * 8:(j + 1) * 8], "little")
        state = _keccak_f(state)

    # Squeeze: the 256-bit digest is the first four lanes, little-endian.
    return b"".join(word.to_bytes(8, "little") for word in state[:4])
89 changes: 62 additions & 27 deletions src/validators/crypto_addresses/eth_address.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,60 @@
# local
from validators.utils import validator

_keccak_flag = True
# Try providers in order: eth-hash (fast, C ext) → pycryptodome → unavailable
_keccak_fn = None

try:
# external
from eth_hash.auto import keccak
from eth_hash.auto import keccak as _eth_keccak # type: ignore

def _keccak_fn(data: bytes) -> bytes: # type: ignore[no-redef]
return _eth_keccak.new(data).digest()

except ImportError:
_keccak_flag = False
pass

if _keccak_fn is None:
try:
from Crypto.Hash import keccak as _pycrypto_keccak # type: ignore

def _validate_eth_checksum_address(addr: str):
"""Validate ETH type checksum address."""
addr = addr.replace("0x", "")
addr_hash = keccak.new(addr.lower().encode("ascii")).digest().hex() # type: ignore
def _keccak_fn(data: bytes) -> bytes: # type: ignore[no-redef]
k = _pycrypto_keccak.new(digest_bits=256)
k.update(data)
return k.digest()

except ImportError:
pass

_keccak_available = _keccak_fn is not None

_RE_ALL_LOWER = re.compile(r"^0x[0-9a-f]{40}$")
_RE_ALL_UPPER = re.compile(r"^0x[0-9A-F]{40}$")
_RE_ETH_ADDR = re.compile(r"^0x[0-9a-fA-F]{40}$")

if len(addr) != 40:
return False

for i in range(0, 40):
if (int(addr_hash[i], 16) > 7 and addr[i].upper() != addr[i]) or (
int(addr_hash[i], 16) <= 7 and addr[i].lower() != addr[i]
):
return False
return True
def _validate_eth_checksum_address(addr: str) -> bool:
    """Check the EIP-55 mixed-case checksum of a ``0x``-prefixed address.

    The first two characters of *addr* are dropped, the remainder is
    lower-cased and hashed with ``_keccak_fn``, and each of the 40 address
    characters is compared against the matching hex nibble of the digest:
    a nibble above 7 requires the character to equal its upper-case form,
    otherwise it must equal its lower-case form.
    """
    hex_part = addr[2:]  # remove 0x
    digest_hex = _keccak_fn(hex_part.lower().encode("ascii")).hex()  # type: ignore[misc]
    for pos in range(40):
        nibble = int(digest_hex[pos], 16)
        char = hex_part[pos]
        if nibble > 7:
            if char.upper() != char:
                return False
        elif char.lower() != char:
            return False
    return True


@validator
def eth_address(value: str, /):
"""Return whether or not given value is a valid ethereum address.

Full validation is implemented for ERC20 addresses.
Validates ERC-20 / EIP-55 addresses. Three address forms are accepted:

* **All-lowercase** ``0x`` + 40 hex chars — valid without checksum.
* **All-uppercase** ``0x`` + 40 uppercase hex chars — valid without checksum.
* **Mixed-case** (EIP-55 checksum) — requires ``eth-hash`` or
``pycryptodome`` to verify the Keccak-256 checksum. If neither
is available the address is rejected to avoid accepting corrupt
checksums silently.

Examples:
>>> eth_address('0x9cc14ba4f9f68ca159ea4ebf2c292a808aaeb598')
Expand All @@ -47,17 +72,27 @@ def eth_address(value: str, /):
Ethereum address string to validate.

Returns:
(Literal[True]): If `value` is a valid ethereum address.
(ValidationError): If `value` is an invalid ethereum address.
""" # noqa: E501
if not _keccak_flag:
raise ImportError(
"Do `pip install validators[crypto-eth-addresses]` to perform `eth_address` validation."
)
(Literal[True]): If ``value`` is a valid ethereum address.
(ValidationError): If ``value`` is an invalid ethereum address.

Note:
For full mixed-case checksum validation install either
``pip install validators[crypto-eth-addresses]``
or ``pip install pycryptodome``.
"""
if not value:
return False

return re.compile(r"^0x[0-9a-f]{40}$|^0x[0-9A-F]{40}$").match(
value
) or _validate_eth_checksum_address(value)
if not _RE_ETH_ADDR.match(value):
return False

# Pure-lowercase or pure-uppercase: structurally valid, no checksum needed
if _RE_ALL_LOWER.match(value) or _RE_ALL_UPPER.match(value):
return True

# Mixed-case requires EIP-55 checksum verification
if not _keccak_available:
# Cannot verify checksum — reject to avoid silently accepting bad checksums
return False

return _validate_eth_checksum_address(value)
20 changes: 12 additions & 8 deletions src/validators/encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
# local
from .utils import validator

# Perf: compile regex at module level — avoids recompilation on every call
_RE_BASE16 = re.compile(r"^[0-9A-Fa-f]+$")
_RE_BASE32 = re.compile(r"^[A-Z2-7]+=*$")
_RE_BASE58 = re.compile(r"^[1-9A-HJ-NP-Za-km-z]+$")
_RE_BASE64 = re.compile(
r"^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$"
)


@validator
def base16(value: str, /):
Expand All @@ -25,7 +33,7 @@ def base16(value: str, /):
(Literal[True]): If `value` is a valid base16 encoding.
(ValidationError): If `value` is an invalid base16 encoding.
"""
return re.match(r"^[0-9A-Fa-f]+$", value) if value else False
return _RE_BASE16.match(value) if value else False


@validator
Expand All @@ -46,7 +54,7 @@ def base32(value: str, /):
(Literal[True]): If `value` is a valid base32 encoding.
(ValidationError): If `value` is an invalid base32 encoding.
"""
return re.match(r"^[A-Z2-7]+=*$", value) if value else False
return _RE_BASE32.match(value) if value else False


@validator
Expand All @@ -67,7 +75,7 @@ def base58(value: str, /):
(Literal[True]): If `value` is a valid base58 encoding.
(ValidationError): If `value` is an invalid base58 encoding.
"""
return re.match(r"^[1-9A-HJ-NP-Za-km-z]+$", value) if value else False
return _RE_BASE58.match(value) if value else False


@validator
Expand All @@ -88,8 +96,4 @@ def base64(value: str, /):
(Literal[True]): If `value` is a valid base64 encoding.
(ValidationError): If `value` is an invalid base64 encoding.
"""
return (
re.match(r"^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$", value)
if value
else False
)
return _RE_BASE64.match(value) if value else False
45 changes: 29 additions & 16 deletions src/validators/finance.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def _cusip_checksum(cusip: str):
else:
return False

# Check digit (position 8) must be strictly numeric per CUSIP spec
if idx == 8 and not (c >= "0" and c <= "9"):
return False

if idx & 1:
val += val

Expand All @@ -31,24 +35,33 @@ def _cusip_checksum(cusip: str):
return (check % 10) == 0


def _isin_checksum(value: str):
check, val = 0, None
def _isin_checksum(value: str) -> bool:
"""Validate ISIN checksum per ISO 6166 using the Luhn algorithm.

for idx in range(12):
c = value[idx]
if c >= "0" and c <= "9" and idx > 1:
val = ord(c) - ord("0")
elif c >= "A" and c <= "Z":
val = 10 + ord(c) - ord("A")
elif c >= "a" and c <= "z":
val = 10 + ord(c) - ord("a")
Each character is expanded to its numeric value (A=10, B=11, …, Z=35),
then the Luhn check is applied to the resulting digit string.
"""
# Expand each character to digit(s)
digits = ""
for c in value:
if c.isdigit():
digits += c
elif c.isupper():
digits += str(ord(c) - ord("A") + 10)
else:
return False

if idx & 1:
val += val

return (check % 10) == 0
return False # lowercase or invalid char

# Luhn check over the expanded digit string
total, alt = 0, False
for d in reversed(digits):
n = int(d)
if alt:
n *= 2
if n > 9:
n -= 9
total += n
alt = not alt
return total % 10 == 0


@validator
Expand Down
20 changes: 14 additions & 6 deletions src/validators/hashes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@
# local
from .utils import validator

# Perf: compile regex at module level — avoids recompilation on every call
_RE_MD5 = re.compile(r"^[0-9a-f]{32}$", re.IGNORECASE)
_RE_SHA1 = re.compile(r"^[0-9a-f]{40}$", re.IGNORECASE)
_RE_SHA224 = re.compile(r"^[0-9a-f]{56}$", re.IGNORECASE)
_RE_SHA256 = re.compile(r"^[0-9a-f]{64}$", re.IGNORECASE)
_RE_SHA384 = re.compile(r"^[0-9a-f]{96}$", re.IGNORECASE)
_RE_SHA512 = re.compile(r"^[0-9a-f]{128}$", re.IGNORECASE)


@validator
def md5(value: str, /):
Expand All @@ -25,7 +33,7 @@ def md5(value: str, /):
(Literal[True]): If `value` is a valid MD5 hash.
(ValidationError): If `value` is an invalid MD5 hash.
"""
return re.match(r"^[0-9a-f]{32}$", value, re.IGNORECASE) if value else False
return _RE_MD5.match(value) if value else False


@validator
Expand All @@ -46,7 +54,7 @@ def sha1(value: str, /):
(Literal[True]): If `value` is a valid SHA1 hash.
(ValidationError): If `value` is an invalid SHA1 hash.
"""
return re.match(r"^[0-9a-f]{40}$", value, re.IGNORECASE) if value else False
return _RE_SHA1.match(value) if value else False


@validator
Expand All @@ -67,7 +75,7 @@ def sha224(value: str, /):
(Literal[True]): If `value` is a valid SHA224 hash.
(ValidationError): If `value` is an invalid SHA224 hash.
"""
return re.match(r"^[0-9a-f]{56}$", value, re.IGNORECASE) if value else False
return _RE_SHA224.match(value) if value else False


@validator
Expand All @@ -91,7 +99,7 @@ def sha256(value: str, /):
(Literal[True]): If `value` is a valid SHA256 hash.
(ValidationError): If `value` is an invalid SHA256 hash.
"""
return re.match(r"^[0-9a-f]{64}$", value, re.IGNORECASE) if value else False
return _RE_SHA256.match(value) if value else False


@validator
Expand All @@ -115,7 +123,7 @@ def sha384(value: str, /):
(Literal[True]): If `value` is a valid SHA384 hash.
(ValidationError): If `value` is an invalid SHA384 hash.
"""
return re.match(r"^[0-9a-f]{96}$", value, re.IGNORECASE) if value else False
return _RE_SHA384.match(value) if value else False


@validator
Expand All @@ -140,4 +148,4 @@ def sha512(value: str, /):
(Literal[True]): If `value` is a valid SHA512 hash.
(ValidationError): If `value` is an invalid SHA512 hash.
"""
return re.match(r"^[0-9a-f]{128}$", value, re.IGNORECASE) if value else False
return _RE_SHA512.match(value) if value else False
Loading