Threading together fine-grained auth with Stytch and Cerbos

Published by Sam Lock on December 08, 2022

Stytch is an all-in-one platform for authentication and authorization. It provides a fully integrated suite of auth solutions, including magic links, passwords, OTP, session management, hosted logins and more. Cerbos is an open source access control system that can handle all of your complicated business logic through simple configuration. You plug it into your existing stack as a decoupled service.

In this demo, we'll be setting up a Python FastAPI service, using Stytch's Email Magic Links for authentication, and Session Management for "just-in-time" identity verification. We'll be rolling it all up with Cerbos' decoupled authorization for fine-grained access control.

Prerequisites

You'll need the following:

You can find the code for this demo here.

Setting up your Stytch project

This is easy-peasy:

  1. Go to the Stytch Dashboard, sign up (if you haven't) and log in.
  2. Retrieve your project_id and secret from the API keys page. Set them as the following environment variables, respectively:
    STYTCH_PROJECT_ID
    STYTCH_SECRET
    
  3. Go to the Redirect URLs page, and set the Login and Sign-up redirect URLs to http://localhost:3000/callback.
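
Since the app reads both values from the environment, it can help to fail fast when one of them is missing. Here's a minimal sketch of that idea; the helper name load_stytch_credentials is hypothetical and not part of the demo code:

```python
import os


def load_stytch_credentials(env=os.environ) -> tuple[str, str]:
    """Return (project_id, secret), failing fast if either variable is unset."""
    required = ("STYTCH_PROJECT_ID", "STYTCH_SECRET")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return env["STYTCH_PROJECT_ID"], env["STYTCH_SECRET"]
```

Passing a plain dict as env makes the helper easy to exercise without touching your real environment.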

A note on role management...

Stytch doesn't have an explicit method to store roles against an identity. What it does have, however, is support for arbitrary JSON objects within specific metadata fields.

It offers two types of metadata, trusted_metadata and untrusted_metadata. We only want Direct API integrations to be able to write to our metadata, therefore we use trusted_metadata. The metadata objects can store up to 20 top level keys. We'll use one of them to store a list of roles.
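
To make the shape concrete, here's a rough sketch of a trusted_metadata object holding roles under a single key. The build_trusted_metadata helper and the MAX_TOP_LEVEL_KEYS constant are made up for illustration; only the "roles" key and the 20-key limit come from the text above:

```python
import json
from typing import Optional

MAX_TOP_LEVEL_KEYS = 20  # the per-object limit mentioned above


def build_trusted_metadata(roles: set[str], extra: Optional[dict] = None) -> dict:
    """Build a trusted_metadata object that keeps all roles under one key."""
    metadata = dict(extra or {})
    # Sets are not JSON-serializable, so store the roles as a sorted list.
    metadata["roles"] = sorted(roles)
    if len(metadata) > MAX_TOP_LEVEL_KEYS:
        raise ValueError("trusted_metadata supports at most 20 top-level keys")
    return metadata


print(json.dumps({"trusted_metadata": build_trusted_metadata({"admin"})}))
# prints {"trusted_metadata": {"roles": ["admin"]}}
```

Keeping every role in one list means the roles only ever consume one of the available top-level keys.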

The code

Some initial points:

Let's take a look at some of the more interesting components in more detail below.

Streamlining (de)serialization

The User and TrustedMetadata classes here are Python dataclasses:

@dataclass_json
@dataclass
class TrustedMetadata:
    roles: set[str] = field(default_factory=set)


@dataclass_json
@dataclass
class User:
    user_id: str
    trusted_metadata: TrustedMetadata = field(default_factory=TrustedMetadata)

    @property
    def roles(self) -> set[str]:
        return self.trusted_metadata.roles

    def add_role(self, role: str):
        self.trusted_metadata.roles.add(role)

Notice how TrustedMetadata is nested as a child of User. Structuring our dataclasses like this allows us to map to the expected structure of requests and responses to and from Stytch. This in turn allows us to effortlessly serialize/deserialize (and validate) payloads into convenient data structures, via the User.from_dict(...) class method and the to_dict() method on a User instance. We'll see how this works below.
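
To illustrate the round trip without any third-party packages, here's a standard-library-only sketch. The from_dict and to_dict methods below are hand-written stand-ins for what the dataclass_json decorator generates in the demo, and the user ID is made up:

```python
from dataclasses import asdict, dataclass, field


@dataclass
class TrustedMetadata:
    roles: set[str] = field(default_factory=set)


@dataclass
class User:
    user_id: str
    trusted_metadata: TrustedMetadata = field(default_factory=TrustedMetadata)

    @classmethod
    def from_dict(cls, data: dict) -> "User":
        # Hand-rolled stand-in for the method dataclass_json would provide.
        roles = set(data.get("trusted_metadata", {}).get("roles", []))
        return cls(user_id=data["user_id"], trusted_metadata=TrustedMetadata(roles))

    def to_dict(self) -> dict:
        data = asdict(self)
        # Convert the set to a sorted list so the result is JSON-friendly.
        data["trusted_metadata"]["roles"] = sorted(self.trusted_metadata.roles)
        return data


# Hypothetical payload mirroring the nested structure described above.
payload = {"user_id": "user-test-123", "trusted_metadata": {"roles": ["admin"]}}
user = User.from_dict(payload)
assert user.trusted_metadata.roles == {"admin"}
assert user.to_dict() == payload
```

Because the dataclass nesting mirrors the payload nesting, the same dict can be unpacked straight into a client call with **user.to_dict().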

Login/sign-up

Navigating to the root page at localhost:3000/ will hit the default / route, and trigger the index function. If you're not already signed in, it'll load the sign-in page (otherwise, on successful authentication, you'll find yourself redirected to the /user page - more on that below).

You'll notice parameters in the login_or_create_user function signature that default to an instance of Form():

@app.post("/login_or_create_user")
async def login_or_create_user(request: Request, email: str = Form(), role: str = Form()):
    ...

FastAPI maps these parameter names to those in HTML:

<form action="{{ url_for('login_or_create_user') }}" method="POST">
  <input type="text" name="email" placeholder="example@email.com" />
  <label for="role">Role</label>
  <select name="role" id="role">
    <option value="user">User</option>
    <option value="admin">Admin</option>
  </select>
  <button type="submit">Sign In</button>
</form>

On submit, FastAPI retrieves the value of each field, and passes them in to the route handler function as the respective parameters. Magic 👍.
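
If you're curious what that mapping boils down to, here's a deliberately simplified, framework-free sketch. It is not how FastAPI is implemented; it only shows form field names lining up with keyword arguments. The dispatch_form helper is hypothetical, and the handler here is a stand-in that echoes its input:

```python
from urllib.parse import parse_qs


def login_or_create_user(email: str, role: str) -> dict:
    """Stand-in for the route handler; it just echoes what it received."""
    return {"email": email, "role": role}


def dispatch_form(body: str) -> dict:
    """Decode a form-encoded body and pass each field as a keyword argument."""
    # parse_qs returns a list of values per field; take the first of each.
    fields = {name: values[0] for name, values in parse_qs(body).items()}
    return login_or_create_user(**fields)


result = dispatch_form("email=example%40email.com&role=admin")
assert result == {"email": "example@email.com", "role": "admin"}
```

The name attribute of each input in the HTML form is what has to match the parameter name in the handler.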

Next, we need to call the login_or_create Stytch endpoint with the provided email. Behind the scenes, Stytch will then attempt to look up the user, or create a new one if they don't exist. It'll then send them a magic link by email. This is as simple as:

resp = stytch_client.magic_links.email.login_or_create(
    email=email,
    login_magic_link_url=MAGIC_LINK_URL,
    signup_magic_link_url=MAGIC_LINK_URL,
)

if resp.status_code != 200:
    raise Exception

...

After a successful call, we can infer that a user now exists within Stytch. It's at this point that we send a separate request to populate that user with an appropriate role, via the push_role_to_stytch function:

def push_role_to_stytch(user_id: str, role: str):
    u = User(user_id=user_id)
    u.add_role(role)
    try:
        resp = stytch_client.users.update(
            **u.to_dict(),
        )
        ...

You can see how a User instance is created, a role added to the roles set under the top-level roles key of trusted_metadata, and the User converted to a dict (the update method on the Stytch client expects keyword arguments, and handles JSON serialization itself).

Authentication callback

We now have a User stored in Stytch (with roles attributed), and an email with the magic link sitting in the User's inbox ✔️. What happens when the link is clicked, I hear you ask? Well, let me explain:

Stytch knows where to redirect to, because we specified the URLs in the configuration step above. Clicking the link redirects to our callback function. There are a few steps here, so I'll break them down.

The magic link token is passed as a query parameter named token. It's retrieved from the request object passed in to the route function:

@app.route("/callback")
async def callback(request: Request):
    token = request.query_params["token"]
    ...

We use this token to authenticate the user against Stytch's session auth API, along with a couple of other parameters:

try:
    data = {
        "token": request.query_params["token"],
        "session_duration_minutes": 60,
    }

    # If we already have a local session token, we send it with the authenticate request. Stytch
    # will refresh it if it's valid
    if (t := request.session.get("session_token")) is not None:
        data["session_token"] = t

    resp = stytch_client.magic_links.authenticate(**data)

    if resp.status_code != 200:
        raise Exception

except Exception:
    ...

You'll notice that we retrieve a locally stored session token, if available, and add that to the authentication payload. This isn't necessary (a successful request will return a new token), but it allows us to extend the duration of the existing token. The other argument is the session duration (how long the token will be valid for) - without it, a session will not be created.
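
The payload logic is easy to pull out into a small pure function, which makes it simple to test on its own. This is only a sketch; build_authenticate_payload is a hypothetical helper name, while the keys mirror those used in the snippet above:

```python
from typing import Optional


def build_authenticate_payload(
    token: str,
    existing_session_token: Optional[str] = None,
    session_duration_minutes: int = 60,
) -> dict:
    """Assemble keyword arguments for the magic link authenticate call."""
    data = {
        "token": token,
        "session_duration_minutes": session_duration_minutes,
    }
    # Only include the stored session token when we actually have one.
    if existing_session_token is not None:
        data["session_token"] = existing_session_token
    return data
```

The result can then be unpacked into the client call in the same way as the data dict above.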

After each successful authentication request, we set session_token in the session middleware, so we can use it to authenticate on protected endpoints later on. We then redirect to one of these protected endpoints, /user.

Protected endpoint -> /user

This route has a double-whammy of protection. Firstly, we check that the user is allowed to access the route at all, using Stytch's session authentication. We do this with a handy FastAPI feature called "dependables". Note the get_user_from_session function in the snippet below:

def get_user_from_session(request: Request) -> User:
    token = request.session.get("session_token")
    if token is None:
        raise HTTPException(
            status_code=status.HTTP_307_TEMPORARY_REDIRECT,
            headers={"Location": request.url_for("index")},
        )

    try:
        resp = stytch_client.sessions.authenticate(
            session_token=token,
        )
    except Exception:
        request.session.pop("session_token")
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Session token invalid"
        )

    return User.from_dict(resp.json()["user"])

@app.get("/user", response_class=HTMLResponse)
async def user(request: Request, user: User = Depends(get_user_from_session)):
    ...

A FastAPI "dependable" is a function that accepts the same parameters that the path operation function takes, and returns anything we might need within that function.

In the example above, each time the /user route is hit, it automatically invokes the get_user_from_session function, which retrieves the locally stored session token and verifies it against Stytch's session authentication API. If unsuccessful, the whole request will fail and return the appropriate error message, or redirect the user to where they need to go. If successful, the dependable returns an instance of the User dataclass as a dependency to the route handler.
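
Stripped of the framework, the control flow of that dependable looks roughly like the sketch below. Everything here is hypothetical scaffolding: resolve_user, NotSignedIn and InvalidSession are invented names, and the verify callable stands in for the call to Stytch's session authentication API:

```python
from typing import Callable


class NotSignedIn(Exception):
    """No session token is stored; the caller should redirect to sign-in."""


class InvalidSession(Exception):
    """A token was stored but could not be verified."""


def resolve_user(session: dict, verify: Callable[[str], dict]) -> dict:
    """Return the verified user data, or raise if the session is unusable."""
    token = session.get("session_token")
    if token is None:
        raise NotSignedIn()
    try:
        return verify(token)
    except Exception as exc:
        # Drop the stale token so the next request starts from a clean slate.
        session.pop("session_token", None)
        raise InvalidSession() from exc
```

In the real route, the two failure cases map to the redirect and the 403 response respectively.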

Cerbos policies

Cerbos policies are simple and extensible configuration files, written in YAML or JSON. Cerbos PDPs can load them from a number of sources, and automatically update themselves when the policies change. Cerbos is stateless, so once instantiated, any authorization requests made to it are evaluated using only the contextual information contained in the request.

Let's take a quick look at the policy defined in this demo (the policy directory is here):

apiVersion: api.cerbos.dev/v1
resourcePolicy:
  version: default
  resource: contact
  rules:
    - actions: ["read", "create"]
      effect: EFFECT_ALLOW
      roles:
        - admin
        - user

    - actions: ["update", "delete"]
      effect: EFFECT_ALLOW
      roles:
        - admin

    - actions: ["update", "delete"]
      effect: EFFECT_ALLOW
      roles:
        - user
      condition:
        match:
          expr: request.resource.attr.owner_id == request.principal.id

The resource: contact specifies that this particular policy applies to resources of type: "contact". Below that, you can see three separate rule groupings; for each, you might notice that they apply to roles of either admin or user, which are concrete roles often defined within your IdP (we stored them in trusted_metadata above). Take a look at the first grouping:

- actions: ["read", "create"]
  effect: EFFECT_ALLOW
  roles:
    - admin
    - user

This simply states that anybody/anything that has the role admin or user is able to carry out the actions read or create on a resource of type "contact". Similarly, if you take a look at the second grouping, it gives admin users the ability to update or delete contacts. The third grouping is a little more interesting:

- actions: ["update", "delete"]
  effect: EFFECT_ALLOW
  roles:
    - user
  condition:
    match:
      expr: request.resource.attr.owner_id == request.principal.id

The condition block is what's key here, and is a glimpse into what makes Cerbos as powerful as it is. The condition is evaluated at run-time against the contextual data passed in the request; it specifies that any user is given additional permissions (update or delete) on a resource provided that they are the owner of the resource. This is achieved by passing owner_id and other necessary metadata in the request. We'll see how this works below.
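
To make the three rules easier to reason about, here's a plain-Python model of the decisions this policy should produce. It's purely illustrative: Cerbos evaluates the YAML policy itself, and the is_allowed function below is a hypothetical sketch rather than part of the Cerbos SDK:

```python
def is_allowed(roles: set[str], action: str, principal_id: str, owner_id: str) -> bool:
    """Mirror the three rules of the contact policy shown above."""
    # Rule 1: admins and users may read or create.
    if action in {"read", "create"} and roles & {"admin", "user"}:
        return True
    # Rule 2: admins may update or delete any contact.
    if action in {"update", "delete"} and "admin" in roles:
        return True
    # Rule 3: users may update or delete contacts they own.
    if action in {"update", "delete"} and "user" in roles and owner_id == principal_id:
        return True
    # No rule matched, so the action is denied (Cerbos denies by default).
    return False


# A user can update their own contact, but not someone else's.
assert is_allowed({"user"}, "update", principal_id="u1", owner_id="u1")
assert not is_allowed({"user"}, "update", principal_id="u1", owner_id="u2")
```

Note that the ownership check only ever widens access for the user role; admins never need it.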

Authorization with Cerbos

Now that we know the user is who they say they are and what roles they have, we can check which permissions they have on a specific resource using the Cerbos PDP.

Firstly, we construct a Principal object using the User data:

principal = Principal(
    user.user_id,
    roles=user.roles,
)

And next, we create a ResourceList of the resources they are trying to access, along with the actions they need to perform (note: in normal operation, you'd likely retrieve your resource data from a datastore - here, we're just forming them in code for demo purposes):

actions = {"read", "update", "delete"}
resource_list = ResourceList(
    resources=[
        ResourceAction(
            Resource(
                "abc123",
                "contact",
                attr={
                    "owner_id": user.user_id,
                },
            ),
            actions=actions,
        ),
        ResourceAction(
            Resource(
                "def456",
                "contact",
                attr={
                    "owner_id": "other_user_id",
                },
            ),
            actions=actions,
        ),
    ]
)

Notice the owner_id attribute passed in each Resource. This is what is ultimately evaluated against the principal when determining if someone is the owner of the resource (i.e. the third rule of the policy above!).

The principal and resource_list objects are then used to construct the request to Cerbos:

with CerbosClient(host=CERBOS_HOST) as c:
    try:
        resp = c.check_resources(principal=principal, resources=resource_list)
        resp.raise_if_failed()
    except Exception:
        logger.exception("cerbos error")
        request.session.pop(SESSION_TOKEN_KEY, None)
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Unauthorized"
        )

Cerbos evaluates the principal against each resource in the list, and returns the access decisions in the response. And with that, we're done! 🎉


As always, if you have any questions or feedback, please join our friendly and active Slack community!
