werkzeug icon indicating copy to clipboard operation
werkzeug copied to clipboard

Improvements to authentication

Open jab opened this issue 6 years ago • 9 comments

Thanks for reviewing #1755, @davidism. Now that that's merged, I'm following up as promised to pose whether we should make any changes to werkzeug.auth.WWWAuthenticateMixin / werkzeug.datastructures.WWWAuthenticate, to improve Werkzeug's support for authentication.

It looks like the current APIs were designed for a simpler time:

  1. They assume that a service will only support a single authentication scheme (so they don't actually suffer from the interop issue in #1755).
  2. They only support using either Basic or Digest auth. The docstring for WWWAuthenticate.type even says "HTTP currently specifies Basic and Digest", which is ~10 years out of date (the spec now points to the IANA Authentication Scheme Registry, which lists several more schemes, and in particular Bearer and Negotiate have been quite common for a while now).

On the other hand, it may be telling that no one has asked about this until now. Please feel free to close this issue without further discussion if there is too little evidence of demand for these changes at this time.

jab avatar Mar 21 '20 14:03 jab

I'm definitely ok with revisiting this. Fairly sure I've left some comments about not having Bearer support in the past.

davidism avatar Mar 21 '20 14:03 davidism

Thanks, good to know! I should have said, this is not actually something I need at this time, since I'm handling the unsupported authentication schemes in some lightweight middleware that doesn't require Werkzeug. So I probably won't submit a PR for this anytime soon (unless this year's PyCon sprint is still happening virtually and I can make it). But I'd be happy to help review any proposed new designs or implementations if someone else wants to work on it sooner.

jab avatar Mar 21 '20 15:03 jab

@jab do you have any ideas for what the API should look like? If we can figure that out, someone else might be able to pick this up and implement it.

davidism avatar Jan 21 '21 17:01 davidism

One thing I noticed about the current API is that we have only a single type that's trying to represent all supported Authorization headers – and likewise for WWW-Authenticate – that just unions together all the properties for all the supported authentication schemes. The rest of this comment probably writes itself, but just to be explicit, we currently have something like:

class Authorization:
    def username(...):   # used for basic auth and digest auth
    def password(...):   # only used for basic auth
    def nonce(...):      # only used for digest auth
    def nc(...):         # only used for digest auth
    def cnonce(...):     # only used for digest auth
    def response(...):   # only used for digest auth
    def opaque(...):     # only used for digest auth
    def qop(...):        # only used for digest auth

    def to_header(self):
        if self.type == "basic":
            return ...
        elif self.type == "digest":
            return ...

I wonder if using subclasses would improve this design, something more like:

class Authorization(ABC):
    @abstractmethod
    def to_header(self):
        ...


class BasicAuth(Authorization):
    def username(...): ...
    def password(...): ...
    def to_header(self):
        # basic auth serialization logic goes here


class DigestAuth(Authorization):
    def username(...): ...
    def nonce(...): ...
    ...
    def to_header(self):
        # digest auth serialization logic goes here

Especially if we're now considering adding support for more schemes.

In case it's helpful, https://github.com/deshaw/wsgi-kerberos/blob/master/wsgi_kerberos.py is some open source middleware I've worked on that implements Kerberos authentication in WSGI middleware. In that case, WWW-Authenticate headers look like Negotiate, and Authorization headers just look like negotiate <token>. If WSGI-Kerberos ever wanted to use Werkzeug to parse/serialize these, it seems better if Werkzeug provided a datastructure that wasn't bloated with the APIs of all the other supported authentication schemes combined.

What do you think?

jab avatar Jan 29 '21 02:01 jab

Splitting it up makes sense. I would only expect the type attribute to be common between them all, and for users to look at that (currently) for whether to use the other attributes.

We also need to parse the headers, and I'm trying to move towards having parse in classes instead of separate http functions. Maybe Authorization.parse still has to do if value.startswith("Basic "): ... elif startswith(...): ....

davidism avatar Jan 29 '21 14:01 davidism

To support user subclasses so we don't have to implement every possible scheme, we'd need Authorization.parse to do something like the following:

class Authorization:
    @classmethod
    def parse(cls, value):
        for sub in cls.__subclasses__():
            if value.startswith(sub.prefix):  # every class needs to define what their value starts with
                return sub.parse(value)

If there's a lot of subclasses it will get a bit slower, and it requires each having a unique prefix, which I'm not 100% sure about. But I can't think of a better way to make this extensible, and I don't anticipate there being so many subclasses that it becomes a problem. And users could always override the authorization property to use their own parse logic, for example if they know they will only ever use Kerberos.

davidism avatar Jan 29 '21 15:01 davidism

Wonder if it's worth using __init_subclass__() here instead. A super simplified version might look like:

from abc import ABC, abstractmethod
from base64 import b64encode, b64decode


class Authorization(ABC):
    __subclass_by_scheme = {}

    def __init_subclass__(cls):
        super().__init_subclass__()
        Authorization.__subclass_by_scheme[cls.scheme] = cls

    @classmethod
    @abstractmethod
    def from_header(cls, header):
        scheme = header.split(None, 1)[0]
        return Authorization.__subclass_by_scheme[scheme].from_header(header)

    @abstractmethod
    def to_header(self):
        raise NotImplementedError


class BasicAuth(Authorization):
    scheme = "Basic"

    def __init__(self, username, password):
        self.username = username
        self.password = password

    @classmethod
    def from_header(cls, header):
        auth_info = header.split(None, 1)[1]
        username, password = b64decode(auth_info).split(b":", 1)
        return cls(username.decode("utf8"), password.decode("utf8"))

    def to_header(self):
        encoded = b64encode(f"{self.username}:{self.password}".encode("utf8")).decode("utf8")
        return f"Basic {encoded}"


>>> basic = Authorization.from_header("Basic dXNlcm5hbWU6cGFzc3dvcmQ=")
>>> basic
<auth.BasicAuth object at 0x1096bc0d0>
>>> basic.to_header()
'Basic dXNlcm5hbWU6cGFzc3dvcmQ='

jab avatar Jan 30 '21 15:01 jab

Oh nice, I forgot they made it possible to do registration without metaclasses, and it's 3.6+.

davidism avatar Jan 30 '21 15:01 davidism

+1 on this improvement. I tried to extend WWWAuthenticate to support the bearer challenge but it was not straightforward to plug in restrictions on character ranges as defined in https://datatracker.ietf.org/doc/html/rfc6750#section-3.

Values for the "scope" attribute (specified in Appendix A.4 of [RFC6749]) MUST NOT include characters outside the set %x21 / %x23-5B / %x5D-7E for representing scope values and %x20 for delimiters between scope values. Values for the "error" and "error_description" attributes (specified in Appendixes A.7 and A.8 of [RFC6749]) MUST NOT include characters outside the set %x20-21 / %x23-5B / %x5D-7E. Values for the "error_uri" attribute (specified in Appendix A.9 of [RFC6749]) MUST conform to the URI-reference syntax and thus MUST NOT include characters outside the set %x21 / %x23-5B / %x5D-7E.

For example, in response to a protected resource request without authentication:

 HTTP/1.1 401 Unauthorized
 WWW-Authenticate: Bearer realm="example"

And in response to a protected resource request with an authentication attempt using an expired access token:

 HTTP/1.1 401 Unauthorized
 WWW-Authenticate: Bearer realm="example",
                   error="invalid_token",
                   error_description="The access token expired"

yhack avatar Apr 05 '22 00:04 yhack

I'm working on this and trying to figure out how to make it nice for type checking. Haven't come up with any good ideas yet. The problem is that we need to parse a string and return a specific subclass, but the string itself is not typing information. So MyPy can't know that Authorization.parse_header("basic asdlkfjadsa") will return a BasicAuth instance. And there's no good way to make request.authorization more specific either. I think users will just have to check the type, this should satisfy MyPy:

auth = request.authorization

if isinstance(auth, BasicAuth):
    return get_user(auth.username, auth.password)
else:
    raise Unauthorized(www_authenticate=WWWAuthenticate("basic"))

davidism avatar Mar 06 '23 22:03 davidism

I've been working on this all day, and had a pretty good interface for different Authorization and WWWAuthenticate subclasses, but now I'm wondering if that's actually a good idea. I wonder if we should go the opposite direction, and make the whole thing less specialized.

In the new system I have, plain Authorization instances don't exist anymore. You (or Werkzeug) call the Authorization.from_header class method, and get back a specific subclass like BasicAuth based on the scheme in the header value. There are various compatibilities in place to deprecate old behavior like checking auth.type or accessing digest params if it's actually basic, etc.

However, this requires defining a different subclass for each auth scheme you might encounter. A client might send a scheme you know nothing about. Should you ignore it? Raise a 400 error? Or maybe you actually do know what it is, but didn't write a whole class for it, because the current system still lets you access auth.type and all the parameters using auth[key].

I think what we should actually do is deprecate all the attributes, and only use the dict interface. We can add an extra attribute called token for schemes like Bearer which only send a token and not parameters; basic auth also does this but we can special case that since we know what the token represents. Given that Werkzeug doesn't actually do anything with auth, I think we should just expose the parsed header and let the user or an extension take care of doing anything with that data. The same goes for the WWWAuthenticate class.

davidism avatar Mar 07 '23 23:03 davidism

Just for reference, here's what I was working on to split into multiple specialized classes, but I didn't end up going this direction.

from __future__ import annotations

import base64
import typing as t
import warnings
from urllib.parse import quote

from werkzeug.datastructures import ImmutableDictMixin
from werkzeug.exceptions import BadRequest
from werkzeug.http import dump_header
from werkzeug.http import parse_dict_header
from werkzeug.http import quote_header_value


class Authorization(ImmutableDictMixin, dict):
    _registry: t.ClassVar[dict[str, t.Type[Authorization]]] = {}
    _scheme = t.ClassVar[str]
    _dict_keys: t.ClassVar[t.Tuple[str]]

    def __init_subclass__(
        cls, scheme: str, _keys: t.Tuple[str] = (), **kwargs: t.Any
    ) -> None:
        super().__init_subclass__(**kwargs)

        if _keys and scheme not in {"basic", "digest"}:
            raise TypeError("Cannot setup legacy dict interface for new scheme.")

        cls._registry[scheme] = cls
        cls._scheme = scheme
        cls._dict_keys = _keys

    def __init__(
        self,
        auth_type: str | None = None,
        data: t.Dict[str, str] | None = None
    ) -> None:
        if auth_type is not None:
            warnings.warn(
                "Creating the base 'Authorization' type is deprecated and will not be"
                " supported in Werkzeug 2.4. Use 'Authorization.from_header' instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            cls = self._registry[auth_type.lower()]
            self.__class__ = cls

            if data is not None:
                cls.__init__(self, **data)

        super().__init__()

        if self._dict_keys:
            dict.update(self, {k: getattr(self, k) for k in self._dict_keys})

    @classmethod
    def from_header(cls, value: str | None) -> Authorization | None:
        scheme, _, parameters = value.partition(" ")
        scheme = scheme.lower()

        if scheme not in cls._registry:
            return None

        return cls._registry[scheme].from_parameters(parameters)

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        raise NotImplementedError

    def to_header(self) -> str:
        raise NotImplementedError

    @property
    def type(self) -> str:
        """The authentication scheme.

        .. deprecated:: 2.3
            Will be removed in Werkzeug 2.4. Use ``isinstance`` to check for a specific
            auth class instead.
        """
        warnings.warn(
            "'Authorization.type' is deprecated and will be removed in Werkzeug 2.4."
            f" Use 'isinstance(auth, {type(self).__name__})' to check for this type in"
            " a way that static type checking tools understand.",
            DeprecationWarning,
            stacklevel=2,
        )
        return self._scheme

    def __str__(self) -> str:
        # Show as object instead of deprecated dict.
        return object.__str__(self)

    def __repr__(self) -> str:
        # Show as object instead of deprecated dict.
        return object.__repr__(self)

    def __getattr__(self, name: str) -> t.Any:
        if name not in BasicAuth._dict_keys + DigestAuth._dict_keys:
            raise AttributeError(name)

        warnings.warn(
            f"'{name}' is not an attribute of this auth scheme. Accessing attributes of"
            " other schemes is deprecated and will be removed in Werkzeug 2.4.",
            DeprecationWarning,
            stacklevel=2,
        )
        return None


def _deprecated_dict_method(f):
    def wrapper(*args, **kwargs):
        warnings.warn(
            "Treating 'Authorization' as a dict is deprecated and will"
            " be removed in Werkzeug 2.4.",
            DeprecationWarning,
            stacklevel=2,
        )
        return f(*args, **kwargs)

    return wrapper


for name in (
    "__iter__",
    "__len__",
    "__getitem__",
    "__contains__",
    "clear",
    "copy",
    "get",
    "items",
    "keys",
    "__reversed__",
    "values",
    "__or__",
    "__ror__",
):
    setattr(Authorization, name, _deprecated_dict_method(getattr(dict, name)))


class BasicAuth(Authorization, scheme="basic", _keys=("username", "password")):
    def __init__(self, username: str, password: str) -> None:
        self.username = username
        self.password = password
        super().__init__()

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        value = base64.b64decode(value).decode()
        username, _, password = value.partition(":")
        return cls(username, password)

    def to_header(self) -> str:
        parameters = base64.b64encode(
            f"{self.username}:{self.password}".encode()
        ).decode()
        return f"Basic {parameters}"


class DigestAuth(
    Authorization,
    scheme="digest",
    _keys=(
        "username",
        "realm",
        "uri",
        "algorithm",
        "nonce",
        "nc",
        "cnonce",
        "qop",
        "response",
        "opaque",
        "userhash"
    )
):
    def __init__(
        self,
        *,
        username: str,
        realm: str,
        uri: str,
        nonce: str,
        response: str,
        nc: int | None = None,
        cnonce: str | None = None,
        algorithm: str | None,
        qop: str | None = None,
        opaque: str | None = None,
        userhash: bool = False
    ) -> None:
        self.username = username
        self.realm = realm
        self.uri = uri
        self.algorithm = algorithm
        self.nonce = nonce
        self.nc = nc
        self.cnonce = cnonce
        self.qop = qop
        self.response = response
        self.opaque = opaque
        self.userhash = userhash
        super().__init__()

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        data: t.Dict[str, t.Any] = parse_dict_header(value)

        if "userhash" in data:
            data["userhash"] = data["userhash"] == "true"

        try:
            return cls(**data)
        except TypeError:
            raise BadRequest("Missing parameters for Digest auth.")

    def to_header(self) -> str:
        try:
            self.username.encode("ascii")
            parameters = {"username": self.username}
        except UnicodeError:
            parameters = {"username*": f"UTF-8''{quote(self.username, safe='')}"}

        parameters.update(
            realm=self.realm,
            uri=self.uri,
            algorithm=self.algorithm,
            nonce=self.nonce,
            nc=self.nc,
            cnonce=self.cnonce,
            qop=self.qop,
            response=self.response,
            opaque=self.opaque,
            userhash="true" if self.userhash else "false"
        )

        items = []

        for key, value in parameters.items():
            if value is None:
                continue

            quoted = quote_header_value(value)

            if (
                key in {"realm", "uri", "nonce", "cnonce", "response", "opaque"}
                and value == quoted
            ):
                quoted = f'"{value}"'

            items.append(f"{key}={quoted}")

        return f"Digest {', '.join(items)}"


class BearerAuth(Authorization, scheme="bearer"):
    def __init__(self, token: str) -> None:
        self.token = token
        super().__init__()

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        return cls(value)

    def to_header(self) -> str:
        return f"Bearer {self.token}"


class WWWAuthenticate(dict):
    _registry: t.ClassVar[t.Dict[str, WWWAuthenticate]] = {}

    def to_header(self) -> str:
        raise NotImplementedError

    @classmethod
    def from_header(cls, value: str) -> WWWAuthenticate:
        scheme, _, parameters = value.partition(" ")
        return cls._registry[scheme].from_parameters(value)

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        raise NotImplementedError


class BasicChallenge(WWWAuthenticate):
    def __init__(self, realm: str) -> None:
        self.realm = realm
        super().__init__()

    def to_header(self):
        parameters = dump_header({"realm": self.realm, "charset": "UTF-8"})
        return f"Basic {parameters}"

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        parameters = parse_dict_header(value)
        return cls(parameters["realm"])


class DigestChallenge(WWWAuthenticate):
    def __init__(
        self,
        realm: str | None,
        algorithm: str | None,
        domain: list[str] | str | None = None,
        nonce: str | None = None,
        opaque: str | None = None,
        stale: bool | None = None,
        qop: list[str] | str | None = None,
        userhash: bool = False,
    ) -> None:
        self.realm = realm

        if domain is None:
            domain = []
        elif isinstance(domain, str):
            domain = domain.split()

        self.domain = domain
        self.nonce = nonce
        self.opaque = opaque
        self.stale = stale
        self.algorithm = algorithm

        if qop is None:
            qop = ["auth"]
        elif isinstance(qop, str):
            qop = qop.split()

        self.qop: t.List[str] = qop
        self.userhash = userhash
        super().__init__()

    def to_header(self) -> str:
        pass

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        parameters = parse_dict_header(value)

        try:
            return cls(**parameters)
        except TypeError:
            raise BadRequest("Unknown parameters for Digest challenge.")


class BearerChallenge(WWWAuthenticate):
    def __init__(
        self,
        realm: str | None = None,
        scope: str | None = None,
        error: str | None = None,
        error_description: str | None = None,
        error_uri: str | None = None,
        **kwargs: t.Any,
    ):
        self.realm = realm
        self.scope = scope
        self.error = error
        self.error_description = error_description
        self.error_uri = error_uri
        self.additional = kwargs
        super().__init__()

    def to_header(self) -> str:
        parameters = {}

        for key in "realm", "scope", "error", "error_description", "error_uri":
            value = getattr(self, key)

            if value is None:
                continue

            parameters[key] = quote_header_value(value)

        parameters.update(self.additional)
        return f"Bearer {dump_header(parameters)}"

    @classmethod
    def from_parameters(cls, value: str) -> t.Self:
        parameters = parse_dict_header(value)
        return cls(**parameters)

davidism avatar Mar 10 '23 15:03 davidism