Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions nats-jwt/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[build-system]
requires = ["setuptools>=61.0"]
build-backend = "setuptools.build_meta"

[project]
name = "nats-jwt"
version = "0.0.0"
description = "Python library for NATS JWT claim generation and validation"
readme = "README.md"
requires-python = ">=3.11"
license = "Apache-2.0"
keywords = ["nats", "jwt", "claims", "authentication"]
authors = [
{ name = "Casper Beyer", email = "[email protected]" },
]
classifiers = [
"Development Status :: 4 - Beta",
"Programming Language :: Python",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: Implementation :: CPython",
"Programming Language :: Python :: Implementation :: PyPy",
]
dependencies = []

[project.optional-dependencies]
nkeys = ["nkeys"]

[project.urls]
Documentation = "https://github.com/nats-io/nats.py"
Issues = "https://github.com/nats-io/nats.py/issues"
Source = "https://github.com/nats-io/nats.py"

[tool.setuptools.packages.find]
where = ["src"]
namespaces = true

[tool.uv]
dev-dependencies = [
"pytest>=7.0.0",
"nkeys",
]

[tool.pytest.ini_options]
testpaths = ["tests"]

[tool.ruff.lint]
ignore = ["TRY301", "S104"]
Empty file.
Empty file.
103 changes: 103 additions & 0 deletions nats-jwt/src/nats/jwt/accounts/limits.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
from dataclasses import dataclass
from typing import Dict, Optional

from nats.jwt.flatten_model import FlatteningModel


@dataclass
class NatsLimits:
data: Optional[int] = None
payload: Optional[int] = None
subs: Optional[int] = None


class AccountLimits(FlatteningModel):
imports: Optional[
int] # `json:"imports,omitempty"` // Max number of imports
exports: Optional[
int] # `json:"exports,omitempty"` // Max number of exports
wildcards: Optional[
bool
] # `json:"wildcards,omitempty"` // Are wildcards allowed in exports
disallow_bearer: Optional[
bool
] # `json:"disallow_bearer,omitempty"` // User JWT can't be bearer token
conn: Optional[
int
] # `json:"conn,omitempty"` // Max number of active connections
leaf: Optional[
int
] # `json:"leaf,omitempty"` // Max number of active leaf node connections

def __init__(
self,
imports: Optional[int] = None,
exports: Optional[int] = None,
wildcards: Optional[bool] = None,
disallow_bearer: Optional[bool] = None,
conn: Optional[int] = None,
leaf: Optional[int] = None,
):
self.imports = imports
self.exports = exports
self.wildcards = wildcards
self.disallow_bearer = disallow_bearer
self.conn = conn
self.leaf = leaf


class JetStreamLimits(FlatteningModel):
mem_storage: Optional[int] = None
disk_storage: Optional[int] = None
streams: Optional[int] = None
consumer: Optional[int] = None
mem_max_stream_bytes: Optional[int] = None
disk_max_stream_bytes: Optional[int] = None
max_bytes_required: Optional[bool] = None
max_ack_pending: Optional[int] = None

def __init__(
self,
mem_storage: Optional[int] = None,
disk_storage: Optional[int] = None,
streams: Optional[int] = None,
consumer: Optional[int] = None,
mem_max_stream_bytes: Optional[int] = None,
disk_max_stream_bytes: Optional[int] = None,
max_bytes_required: Optional[bool] = None,
max_ack_pending: Optional[int] = None,
):
self.mem_storage = mem_storage
self.disk_storage = disk_storage
self.streams = streams
self.consumer = consumer
self.mem_max_stream_bytes = mem_max_stream_bytes
self.disk_max_stream_bytes = disk_max_stream_bytes
self.max_bytes_required = max_bytes_required
self.max_ack_pending = max_ack_pending


JetStreamTieredLimits = Dict[str, JetStreamLimits]


class OperatorLimits(FlatteningModel):
nats_limits: NatsLimits
account_limits: AccountLimits
jetstream_limits: Optional[JetStreamLimits]
tiered_limits: Optional[JetStreamTieredLimits
] # `json:"tiered_limits,omitempty"`

def __init__(
self,
nats_limits: NatsLimits,
account_limits: AccountLimits,
jetstream_limits: Optional[JetStreamLimits] = None,
tiered_limits: Optional[JetStreamTieredLimits] = None,
):
self.nats_limits = nats_limits
self.account_limits = account_limits
self.jetstream_limits = jetstream_limits
self.tiered_limits = tiered_limits

class META:
unflattened_fields = [('tiered_limits', 'tiered_limits')]
104 changes: 104 additions & 0 deletions nats-jwt/src/nats/jwt/accounts/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from dataclasses import dataclass, field, fields
from typing import Dict, List, Optional, Union

from nats.jwt.accounts.limits import OperatorLimits
from nats.jwt.claims.generic import GenericFields
from nats.jwt.exports import Exports
from nats.jwt.flatten_model import FlatteningModel
from nats.jwt.imports import Imports
from nats.jwt.signingkeys import SigningKeys
from nats.jwt.types import Info, Permissions, Types


@dataclass
class WeightedMapping:
subject: str
weight: Optional[
int] = None # uint8 is mapped to int, with Optional for omitempty
cluster: Optional[str] = None


@dataclass
class ExternalAuthorization:
auth_users: Optional[List[str]]
allowed_accounts: Optional[List[str]]
xkey: Optional[str]


@dataclass
class MsgTrace:
# Destination is the subject the server will send message traces to
# if the inbound message contains the "traceparent" header and has
# its sampled field indicating that the trace should be triggered.
dest: Optional[str] # `json:"dest,omitempty"`

# Sampling is used to set the probability sampling, that is, the
# server will get a random number between 1 and 100 and trigger
# the trace if the number is lower than this Sampling value.
# The valid range is [1..100]. If the value is not set Validate()
# will set the value to 100.
sampling: Optional[int] # `json:"sampling,omitempty"`


class Account(FlatteningModel):
imports: Optional[Imports] # `json:"imports,omitempty"`
exports: Optional[Exports] # `json:"exports,omitempty"`
limits: Optional[OperatorLimits] # `json:"limits,omitempty"`
signing_keys: Optional[SigningKeys] # `json:"signing_keys,omitempty"`
revocations: Optional[Dict[str, int]] # `json:"revocations,omitempty"`
default_permissions: Optional[Permissions
] # `json:"default_permissions,omitempty"`
mappings: Optional[Dict[str,
WeightedMapping]] # `json:"mappings,omitempty"`
authorization: Optional[ExternalAuthorization
] # `json:"authorization,omitempty"`
trace: Optional[MsgTrace] # `json:"trace,omitempty"`
cluster_traffic: Optional[str] # `json:"cluster_traffic,omitempty"`

info: Info = field(default_factory=Info)
generic_fields: GenericFields = field(default_factory=GenericFields)

def __init__(
self,
imports: Optional[Imports] = None,
exports: Optional[Exports] = None,
limits: Optional[OperatorLimits] = None,
signing_keys: Optional[SigningKeys] = None,
revocations: Optional[Dict[str, int]] = None,
default_permissions: Optional[Permissions] = None,
mappings: Optional[Dict[str, WeightedMapping]] = None,
authorization: Optional[ExternalAuthorization] = None,
trace: Optional[MsgTrace] = None,
cluster_traffic: Optional[str] = None,
info: Optional[Info] = None,
generic_fields: Optional[GenericFields] = None
):
self.imports = imports
self.exports = exports
self.limits = limits
self.signing_keys = signing_keys
self.revocations = revocations
self.default_permissions = default_permissions
self.mappings = mappings
self.authorization = authorization
self.trace = trace
self.cluster_traffic = cluster_traffic

self.info = info if info else Info()
self.generic_fields = generic_fields if generic_fields else GenericFields(
type=Types.Account, version=2
)

class Meta:
unflatten_fields = [
("imports", "imports"),
("exports", "exports"),
("limits", "limits"),
("signing_keys", "signing_keys"),
("revocations", "revocations"),
("default_permissions", "default_permissions"),
("mappings", "mappings"),
("authorization", "authorization"),
("trace", "trace"),
("cluster_traffic", "cluster_traffic"),
]
Empty file.
20 changes: 20 additions & 0 deletions nats-jwt/src/nats/jwt/claims/generic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from typing import List, Optional

from nats.jwt.flatten_model import FlatteningModel
from nats.jwt.types import Types


class GenericFields(FlatteningModel):
tags: Optional[List[str]]
type: Optional[Types]
version: Optional[int]

def __init__(
self,
tags: Optional[List[str]] = None,
type: Optional[Types] = None,
version: Optional[int] = None,
):
self.tags = tags
self.type = type
self.version = version
60 changes: 60 additions & 0 deletions nats-jwt/src/nats/jwt/claims/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from typing import List, Optional, Union

from nats.jwt.accounts.models import Account
from nats.jwt.flatten_model import FlatteningModel
from nats.jwt.operator.models import Operator
from nats.jwt.types import Types
from nats.jwt.users.models import User


class Claims(FlatteningModel):
# Claims Data
exp: Optional[int] # Expires int64 `json:"exp,omitempty"`
jti: Optional[str] # ID string `json:"jti,omitempty"`
iat: Optional[int] # IssuedAt int64 `json:"iat,omitempty"`
iss: Optional[str] # Issuer string `json:"iss,omitempty"`
name: Optional[str] # Name string `json:"name,omitempty"`
nbf: Optional[int] # NotBefore int64 `json:"nbf,omitempty"`
sub: Optional[str] # Subject string `json:"sub,omitempty"`

# Nats Data
nats: Optional[Union[User, Account, Operator]]
issuer_account: Optional[Union[str, bytes]]

def __init__(
self,
exp: Optional[int] = None,
jti: Optional[str] = None,
iat: Optional[int] = None,
iss: Optional[str] = None,
name: Optional[str] = None,
nbf: Optional[int] = None,
sub: Optional[str] = None,
nats: Optional[Union[User, Account, Operator]] = None,
issuer_account: Optional[Union[str, bytes]] = None,
):
self.exp: Optional[int] = exp
self.jti: Optional[str] = jti
self.iat: Optional[int] = iat
self.iss: Optional[str] = iss
self.name: Optional[str] = name
self.nbf: Optional[int] = nbf
self.sub: Optional[str] = sub

self.nats = nats
self.issuer_account = issuer_account

class Meta:
unflatten_fields = [('nats', 'nats')]


class UserClaims(Claims):
pass


class AccountClaims(Claims):
pass


class OperatorClaims(Claims):
pass
Loading
Loading