如何在你的 API 服务或后端校验访问令牌 (Access token)
校验访问令牌 (Access token) 是在 Logto 中实施基于角色的访问控制 (RBAC)的关键步骤。本指南将带你完成在后端 / API 中验证 Logto 签发的 JWT,包括校验签名、发行者 (Issuer)、受众 (Audience)、过期时间、权限 (Scopes) 以及组织 (Organization) 上下文。
开始之前
- 本指南假设你已熟悉 Logto 的基于角色的访问控制 (RBAC) 概念。
- 如果你正在保护 API 资源,本指南假设你已阅读 保护全局 API 资源 指南。
- 如果你正在保护应用内功能或流程(非 API 权限),本指南假设你已阅读 保护组织 (Organization)(非 API)权限 指南。
- 如果你正在保护组织级 API 资源,本指南假设你已阅读 保护组织级 API 资源 指南。
步骤 1:初始化常量和工具方法
在你的代码中定义必要的常量和工具方法,用于处理令牌提取和校验。一个有效的请求必须包含 `Authorization` 头,格式为 `Bearer <access_token>`。
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
import { IncomingHttpHeaders } from 'http';
// Logto tenant endpoints. Exported so sibling modules (such as the JWT validator,
// which reads both values) can import them instead of redefining them.
export const JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks';
export const ISSUER = 'https://your-tenant.logto.app/oidc';
/**
 * Caller details taken from an access token: the subject, the optional client
 * and organization ids, and the scope / audience lists (both default to `[]`).
 */
export class AuthInfo {
constructor(
public sub: string,
public clientId?: string,
public organizationId?: string,
public scopes: string[] = [],
public audience: string[] = []
) {}
}
/**
 * Error raised when a request cannot be authorized.
 * `status` is the HTTP status to answer with; it defaults to 403.
 */
export class AuthorizationError extends Error {
name = 'AuthorizationError';
constructor(
message: string,
public status = 403
) {
super(message);
}
}
/**
 * Reads the bearer token from the `Authorization` request header.
 *
 * @param headers Incoming request headers; only `authorization` is used.
 * @returns The header value with the leading `Bearer ` removed.
 * @throws AuthorizationError with status 401 when the header is missing or
 *         does not start with `Bearer `.
 */
export function extractBearerTokenFromHeaders({ authorization }: IncomingHttpHeaders): string {
  const scheme = 'Bearer ';

  if (!authorization) {
    throw new AuthorizationError('Authorization header is missing', 401);
  }

  if (authorization.startsWith(scheme)) {
    return authorization.slice(scheme.length);
  }

  throw new AuthorizationError(`Authorization header must start with "${scheme}"`, 401);
}
# Logto tenant endpoints: the JWKS document location and the expected token issuer.
JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks'
ISSUER = 'https://your-tenant.logto.app/oidc'
class AuthInfo:
    """Caller details taken from an access token.

    ``scopes`` and ``audience`` fall back to empty lists when they are omitted
    or falsy; ``client_id`` and ``organization_id`` default to ``None``.
    """

    def __init__(self, sub: str, client_id: str = None, organization_id: str = None,
                 scopes: list = None, audience: list = None):
        self.sub = sub
        self.client_id = client_id
        self.organization_id = organization_id
        self.scopes = scopes or []
        self.audience = audience or []

    def to_dict(self):
        """Return the fields as a plain dict keyed by attribute name."""
        return {
            'sub': self.sub,
            'client_id': self.client_id,
            'organization_id': self.organization_id,
            'scopes': self.scopes,
            'audience': self.audience
        }
class AuthorizationError(Exception):
    """Raised when a request cannot be authorized.

    ``status`` is the HTTP status to answer with; it defaults to 403.
    """

    def __init__(self, message: str, status: int = 403):
        self.message = message
        self.status = status
        super().__init__(self.message)
def extract_bearer_token_from_headers(headers: dict) -> str:
    """Extract the bearer token from HTTP headers.

    Note: FastAPI and Django REST Framework ship their own token extraction,
    so this helper is mainly for Flask and other frameworks.

    Looks up ``authorization`` first, then ``Authorization``. Raises
    ``AuthorizationError`` with status 401 when the header is missing or does
    not start with ``Bearer ``.
    """
    bearer_prefix = 'Bearer '
    authorization = headers.get('authorization') or headers.get('Authorization')
    if not authorization:
        raise AuthorizationError('Authorization header 缺失', 401)
    if not authorization.startswith(bearer_prefix):
        raise AuthorizationError('Authorization header 必须以 "Bearer " 开头', 401)
    # Drop the prefix by its length rather than a hard-coded offset.
    return authorization[len(bearer_prefix):]
package main
import (
"fmt"
"net/http"
"strings"
)
// Logto tenant endpoints: the JWKS document location and the expected token issuer.
const (
JWKS_URI = "https://your-tenant.logto.app/oidc/jwks"
ISSUER = "https://your-tenant.logto.app/oidc"
)
// AuthorizationError is returned when a request cannot be authorized.
// Status is the HTTP status code to answer with.
type AuthorizationError struct {
	Message string
	Status  int
}

// Error implements the error interface by returning the message.
func (e *AuthorizationError) Error() string {
	return e.Message
}

// NewAuthorizationError builds an AuthorizationError. The optional status
// argument overrides the default of 403 Forbidden; only its first value is used.
func NewAuthorizationError(message string, status ...int) *AuthorizationError {
	statusCode := http.StatusForbidden // default: 403 Forbidden
	if len(status) > 0 {
		statusCode = status[0]
	}
	return &AuthorizationError{
		Message: message,
		Status:  statusCode,
	}
}
func extractBearerTokenFromHeaders(r *http.Request) (string, error) {
const bearerPrefix = "Bearer "
authorization := r.Header.Get("Authorization")
if authorization == "" {
return "", New授权 (Authorization)Error("Authorization header is missing", http.StatusUnauthorized)
}
if !strings.HasPrefix(authorization, bearerPrefix) {
return "", New授权 (Authorization)Error(fmt.Sprintf("Authorization header must start with \"%s\"", bearerPrefix), http.StatusUnauthorized)
}
return strings.TrimPrefix(authorization, bearerPrefix), nil
}
/** Logto tenant endpoints (JWKS location and expected issuer); not instantiable. */
public final class AuthConstants {
public static final String JWKS_URI = "https://your-tenant.logto.app/oidc/jwks";
public static final String ISSUER = "https://your-tenant.logto.app/oidc";
private AuthConstants() {
// Prevent instantiation
}
}
/**
 * Unchecked exception raised when a request cannot be authorized.
 * Carries the HTTP status code to answer with.
 */
public class AuthorizationException extends RuntimeException {
private final int statusCode;
/** Creates an exception with the default status code 403. */
public AuthorizationException(String message) {
this(message, 403); // Defaults to 403 Forbidden
}
/** Creates an exception with an explicit HTTP status code. */
public AuthorizationException(String message, int statusCode) {
super(message);
this.statusCode = statusCode;
}
/** Returns the HTTP status code given at construction time. */
public int getStatusCode() {
return statusCode;
}
}
namespace YourApiNamespace
{
/// <summary>Logto tenant settings; holds the expected token issuer.</summary>
public static class AuthConstants
{
public const string Issuer = "https://your-tenant.logto.app/oidc";
}
}
namespace YourApiNamespace.Exceptions
{
/// <summary>
/// Exception raised when a request cannot be authorized.
/// <c>StatusCode</c> is the HTTP status to answer with (403 unless specified).
/// </summary>
public class AuthorizationException : Exception
{
public int StatusCode { get; }
public AuthorizationException(string message, int statusCode = 403) : base(message)
{
StatusCode = statusCode;
}
}
}
<?php
/** Logto tenant endpoints: the JWKS document location and the expected token issuer. */
class AuthConstants
{
public const JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks';
public const ISSUER = 'https://your-tenant.logto.app/oidc';
}
<?php
/**
 * Read-only caller details: subject, optional client and organization ids,
 * and the scope / audience lists (both default to empty arrays).
 */
class AuthInfo
{
public function __construct(
public readonly string $sub,
public readonly ?string $clientId = null,
public readonly ?string $organizationId = null,
public readonly array $scopes = [],
public readonly array $audience = []
) {}
/** Returns the fields as an array with snake_case keys. */
public function toArray(): array
{
return [
'sub' => $this->sub,
'client_id' => $this->clientId,
'organization_id' => $this->organizationId,
'scopes' => $this->scopes,
'audience' => $this->audience,
];
}
}
<?php
/**
 * Exception raised when a request cannot be authorized.
 * $statusCode is the HTTP status to answer with; it defaults to 403.
 */
class AuthorizationException extends Exception
{
public function __construct(
string $message,
public readonly int $statusCode = 403
) {
parent::__construct($message);
}
}
<?php
/** Shared helpers for pulling credentials out of a request. */
trait AuthHelpers
{
    /**
     * Returns the bearer token from the first value of the `authorization`
     * (or `Authorization`) header.
     *
     * @param array $headers Header name => list of values.
     * @throws AuthorizationException With status 401 when the header is
     *                                missing or does not start with "Bearer ".
     */
    protected function extractBearerToken(array $headers): string
    {
        $prefix = 'Bearer ';
        $headerValue = $headers['authorization'][0] ?? $headers['Authorization'][0] ?? null;

        if (!$headerValue) {
            throw new AuthorizationException('Authorization header is missing', 401);
        }

        if (str_starts_with($headerValue, $prefix)) {
            // Drop the "Bearer " prefix.
            return substr($headerValue, strlen($prefix));
        }

        throw new AuthorizationException('Authorization header must start with "Bearer "', 401);
    }
}
# Logto tenant endpoints: the JWKS document location and the expected token issuer.
module AuthConstants
JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks'
ISSUER = 'https://your-tenant.logto.app/oidc'
end
# Caller details: subject, optional client and organization ids, and the
# scope / audience lists (both default to empty arrays).
class AuthInfo
attr_accessor :sub, :client_id, :organization_id, :scopes, :audience
def initialize(sub, client_id = nil, organization_id = nil, scopes = [], audience = [])
@sub = sub
@client_id = client_id
@organization_id = organization_id
@scopes = scopes
@audience = audience
end
# Returns the fields as a Hash with symbol keys.
def to_h
{
sub: @sub,
client_id: @client_id,
organization_id: @organization_id,
scopes: @scopes,
audience: @audience
}
end
end
# Raised when a request cannot be authorized.
# +status+ is the HTTP status to answer with; it defaults to 403.
class AuthorizationError < StandardError
attr_reader :status
def initialize(message, status = 403)
super(message)
@status = status
end
end
# Shared helpers for pulling credentials out of a request.
module AuthHelpers
  # Returns the bearer token from the request's Authorization header.
  # Raises AuthorizationError (401) when the header is missing or does not
  # start with "Bearer ".
  def extract_bearer_token(request)
    bearer_prefix = 'Bearer '
    header_value = request.headers['Authorization']

    if !header_value
      raise AuthorizationError.new('Authorization header is missing', 401)
    end

    if !header_value.start_with?(bearer_prefix)
      raise AuthorizationError.new('Authorization header must start with "Bearer "', 401)
    end

    # Drop the "Bearer " prefix.
    header_value[bearer_prefix.length..-1]
  end
end
use serde::{Deserialize, Serialize};
use std::fmt;
/// Location of the Logto tenant's JSON Web Key Set.
pub const JWKS_URI: &str = "https://your-tenant.logto.app/oidc/jwks";
/// Issuer value expected in tokens from the Logto tenant.
pub const ISSUER: &str = "https://your-tenant.logto.app/oidc";
/// Caller details: the subject, the optional client and organization ids,
/// and the scope / audience lists.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthInfo {
pub sub: String,
pub client_id: Option<String>,
pub organization_id: Option<String>,
pub scopes: Vec<String>,
pub audience: Vec<String>,
}
impl AuthInfo {
/// Builds an `AuthInfo` from its five fields, stored as given.
pub fn new(
sub: String,
client_id: Option<String>,
organization_id: Option<String>,
scopes: Vec<String>,
audience: Vec<String>,
) -> Self {
Self {
sub,
client_id,
organization_id,
scopes,
audience,
}
}
}
/// Error returned when a request cannot be authorized.
///
/// `status_code` is the HTTP status to answer with.
#[derive(Debug)]
pub struct AuthorizationError {
    pub message: String,
    pub status_code: u16,
}

impl AuthorizationError {
    /// Creates an error with the default status code 403.
    pub fn new(message: impl Into<String>) -> Self {
        Self::with_status(message, 403)
    }

    /// Creates an error with an explicit HTTP status code.
    pub fn with_status(message: impl Into<String>, status_code: u16) -> Self {
        let message = message.into();
        Self {
            message,
            status_code,
        }
    }
}

impl fmt::Display for AuthorizationError {
    /// Writes the message only; the status code is not part of the text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for AuthorizationError {}
/// Extracts the bearer token from an `Authorization` header value.
///
/// Returns the part of the header after the `Bearer ` prefix, borrowed from
/// the input.
///
/// # Errors
///
/// Returns an [`AuthorizationError`] with status 401 when `authorization` is
/// `None` or does not start with `Bearer `.
pub fn extract_bearer_token(authorization: Option<&str>) -> Result<&str, AuthorizationError> {
    const BEARER_PREFIX: &str = "Bearer ";

    let auth_header = authorization
        .ok_or_else(|| AuthorizationError::with_status("Authorization header is missing", 401))?;

    // `strip_prefix` checks and removes the prefix in one step, so no
    // hard-coded byte offset is needed.
    auth_header.strip_prefix(BEARER_PREFIX).ok_or_else(|| {
        AuthorizationError::with_status("Authorization header must start with \"Bearer \"", 401)
    })
}
步骤 2:获取你的 Logto 租户信息
你需要以下信息来校验 Logto 签发的令牌:
- JSON Web Key Set (JWKS) URI:Logto 公钥的 URL,用于验证 JWT 签名。
- 发行者 (Issuer):期望的发行者值(Logto 的 OIDC URL)。
首先,找到你的 Logto 租户端点。你可以在以下位置找到:
- 在 Logto 控制台,设置 → 域名。
- 在你配置的任意应用设置中,设置 → 端点与凭据。
从 OpenID Connect 发现端点获取
这些值可以通过 Logto 的 OpenID Connect 发现端点获取:
https://<your-logto-endpoint>/oidc/.well-known/openid-configuration
以下是一个示例响应(为简洁省略了其他字段):
{
"jwks_uri": "https://your-tenant.logto.app/oidc/jwks",
"issuer": "https://your-tenant.logto.app/oidc"
}
在代码中硬编码(不推荐)
由于 Logto 不允许自定义 JWKS URI 或发行者 (Issuer),你可以在代码中硬编码这些值。但不推荐在生产环境中这样做,因为如果将来有配置变更,会增加维护成本。
- JWKS URI:
https://<your-logto-endpoint>/oidc/jwks
- 发行者 (Issuer):
https://<your-logto-endpoint>/oidc
步骤 3:校验令牌和权限 (Permissions)
在提取令牌并获取 OIDC 配置后,校验以下内容:
- 签名:JWT 必须有效且由 Logto(通过 JWKS)签名。
- 发行者 (Issuer):必须与你的 Logto 租户发行者一致。
- 受众 (Audience):必须与在 Logto 注册的 API 资源指示器 (Resource indicator) 匹配,或在适用时匹配组织上下文。
- 过期时间:令牌必须未过期。
- 权限 (Scopes):令牌必须包含你 API / 操作所需的权限 (Scopes)。Scopes 是 `scope` 声明中以空格分隔的字符串。
- 组织 (Organization) 上下文:如果保护的是组织级 API 资源,需要校验 `organization_id` 声明。
参见 JSON Web Token 了解更多关于 JWT 结构和声明 (Claims) 的信息。
不同权限模型下的校验要点
不同权限模型下,声明 (Claims) 和校验规则有所不同:
权限模型 | 受众声明 (aud ) | 组织声明 (organization_id ) | 需校验的权限 (Scopes) (scope ) |
---|---|---|---|
全局 API 资源 | API 资源指示器 (Resource indicator) | 无 | API 资源权限 |
组织(非 API)权限 | urn:logto:organization:<id> (组织上下文在 aud 声明中) | 无 | 组织权限 |
组织级 API 资源 | API 资源指示器 (Resource indicator) | 组织 ID(需与请求匹配) | API 资源权限 |
对于非 API 的组织权限,组织上下文通过 `aud` 声明表示(如 `urn:logto:organization:abc123`)。`organization_id` 声明仅在组织级 API 资源令牌中存在。
对于多租户 API,务必同时校验权限 (Scopes) 和上下文(受众、组织)以确保安全。
添加校验逻辑
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
在本示例中,我们使用 jose 来验证 JWT。如果你还没有安装,请先安装:
npm install jose
或者使用你喜欢的包管理器(如 `pnpm` 或 `yarn`)。
首先,添加这些用于处理 JWT 验证的通用工具:
import { createRemoteJWKSet, jwtVerify, JWTPayload } from 'jose';
import { AuthInfo, AuthorizationError } from './auth-middleware.js';
const jwks = createRemoteJWKSet(new URL(JWKS_URI));
/**
 * Verifies the token with `jwtVerify` against the remote JWKS and the expected
 * issuer, then runs `verifyPayload` on the decoded payload.
 *
 * @returns The payload of the verified token.
 */
export async function validateJwtToken(token: string): Promise<JWTPayload> {
  const verified = await jwtVerify(token, jwks, { issuer: ISSUER });
  const claims = verified.payload;

  verifyPayload(claims);

  return claims;
}
/**
 * Builds an `AuthInfo` from a token payload.
 *
 * The `scope` claim is split on single spaces; `aud` is normalized to an
 * array whether it arrives as a string, an array, or not at all.
 */
export function createAuthInfo(payload: JWTPayload): AuthInfo {
  const { sub, aud, scope, client_id: clientId, organization_id: organizationId } = payload;

  let audience: string[];
  if (Array.isArray(aud)) {
    audience = aud;
  } else if (aud) {
    audience = [aud];
  } else {
    audience = [];
  }

  const scopes = (scope as string)?.split(' ') ?? [];

  return new AuthInfo(sub!, clientId as string, organizationId as string, scopes, audience);
}
/** Placeholder for the permission-model checks applied to a decoded payload. */
function verifyPayload(payload: JWTPayload): void {
// Implement your validation logic here, based on your permission model
// The concrete checks are shown in the permission model sections below
}
然后,实现用于验证访问令牌 (Access token) 的中间件:
- Express.js
- Koa.js
- Fastify
- Hapi.js
- NestJS
import { Request, Response, NextFunction } from 'express';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
// Extend the Express Request interface with an optional `auth` property
declare global {
namespace Express {
interface Request {
auth?: AuthInfo;
}
}
}
/**
 * Express middleware: validates the bearer token, stores the resulting
 * `AuthInfo` on `req.auth` and calls `next()`. On failure it answers with a
 * JSON error using the error's `status`, or 401 when the error has none.
 */
export async function verifyAccessToken(req: Request, res: Response, next: NextFunction) {
  try {
    const bearerToken = extractBearerTokenFromHeaders(req.headers);
    const claims = await validateJwtToken(bearerToken);

    // Keep the auth details on the request for later handlers
    req.auth = createAuthInfo(claims);
    next();
  } catch (err: any) {
    const statusCode = err.status ?? 401;
    return res.status(statusCode).json({ error: err.message });
  }
}
import { Context, Next } from 'koa';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * Koa middleware: validates the bearer token and stores the resulting
 * `AuthInfo` in `ctx.state.auth`. On failure it answers with a JSON error
 * using the error's `status`, or 401 when the error has none.
 */
export async function koaVerifyAccessToken(ctx: Context, next: Next) {
  try {
    const token = extractBearerTokenFromHeaders(ctx.request.headers);
    const payload = await validateJwtToken(token);
    // Keep the auth details in state for later middleware
    ctx.state.auth = createAuthInfo(payload);
  } catch (err: any) {
    ctx.status = err.status ?? 401;
    ctx.body = { error: err.message };
    return;
  }

  // Run downstream middleware outside the try block so that errors thrown by
  // later handlers are not reported as authentication failures.
  await next();
}
import { FastifyRequest, FastifyReply } from 'fastify';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
// Extend the Fastify request interface with an optional `auth` property
declare module 'fastify' {
interface FastifyRequest {
auth?: AuthInfo;
}
}
/**
 * Fastify hook: validates the bearer token and stores the resulting
 * `AuthInfo` on `request.auth`. On failure it sends a JSON error using the
 * error's `status`, or 401 when the error has none.
 */
export async function fastifyVerifyAccessToken(request: FastifyRequest, reply: FastifyReply) {
  try {
    const token = extractBearerTokenFromHeaders(request.headers);
    const payload = await validateJwtToken(token);
    // Keep the auth details on the request for later handlers
    request.auth = createAuthInfo(payload);
  } catch (err: any) {
    // Return the reply so an async hook signals that the response has been sent.
    return reply.code(err.status ?? 401).send({ error: err.message });
  }
}
import { Request, ResponseToolkit } from '@hapi/hapi';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * Hapi handler extension: validates the bearer token, stores the resulting
 * `AuthInfo` on `request.app.auth` and continues. On failure it takes over
 * the response with a JSON error using the error's `status`, or 401 when the
 * error has none.
 */
export async function hapiVerifyAccessToken(request: Request, h: ResponseToolkit) {
  try {
    const bearerToken = extractBearerTokenFromHeaders(request.headers);
    const claims = await validateJwtToken(bearerToken);

    // Keep the auth details on request.app for later handlers
    request.app.auth = createAuthInfo(claims);
    return h.continue;
  } catch (err: any) {
    const statusCode = err.status ?? 401;
    const body = { error: err.message };
    return h.response(body).code(statusCode).takeover();
  }
}
import {
Injectable,
CanActivate,
ExecutionContext,
UnauthorizedException,
ForbiddenException,
} from '@nestjs/common';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * NestJS guard: validates the bearer token and stores the resulting
 * `AuthInfo` on the request.
 *
 * Throws `ForbiddenException` only for errors whose `status` is 403; every
 * other failure becomes `UnauthorizedException`.
 */
@Injectable()
export class AccessTokenGuard implements CanActivate {
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const req = context.switchToHttp().getRequest();

    try {
      const token = extractBearerTokenFromHeaders(req.headers);
      const payload = await validateJwtToken(token);

      // Keep the auth details on the request for later handlers
      req.auth = createAuthInfo(payload);
      return true;
    } catch (err: any) {
      // Errors without a `status` must map to 401, as in the other framework
      // examples (`err.status ?? 401`), rather than falling through to 403.
      if (err.status === 403) throw new ForbiddenException(err.message);
      throw new UnauthorizedException(err.message);
    }
  }
}
根据你的权限模型,在 `jwt-validator.ts` 中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限
- 组织级 API 资源
/**
 * Global API resource checks: the audience must contain the API resource
 * indicator and the `scope` claim must contain every required scope.
 *
 * @throws AuthorizationError when either check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud, scope } = payload;

  // The audience claim must match your API resource indicator
  const audiences = Array.isArray(aud) ? aud : aud ? [aud] : [];
  const audienceMatches = audiences.includes('https://your-api-resource-indicator');
  if (!audienceMatches) {
    throw new AuthorizationError('Invalid audience');
  }

  // Scopes required for the global API resource
  const requiredScopes = ['api:read', 'api:write']; // Replace with the scopes you actually need
  const grantedScopes = (scope as string)?.split(' ') ?? [];
  const missingScope = requiredScopes.some((required) => !grantedScopes.includes(required));
  if (missingScope) {
    throw new AuthorizationError('Insufficient scope');
  }
}
/**
 * Organization (non-API) permission checks: the audience must use the
 * organization URN format, must name the expected organization, and the
 * `scope` claim must contain every required organization scope.
 *
 * @throws AuthorizationError when any check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud, scope } = payload;
  const orgAudiencePrefix = 'urn:logto:organization:';

  // The audience claim must use the organization format
  const audiences = Array.isArray(aud) ? aud : aud ? [aud] : [];
  if (!audiences.some((entry) => entry.startsWith(orgAudiencePrefix))) {
    throw new AuthorizationError('Invalid audience for organization permissions');
  }

  // The organization must match the context (you may need to read it from the request)
  const expectedOrgId = 'your-organization-id'; // Take this from the request context
  const expectedAud = `urn:logto:organization:${expectedOrgId}`;
  if (!audiences.includes(expectedAud)) {
    throw new AuthorizationError('Organization ID mismatch');
  }

  // Required organization scopes
  const requiredScopes = ['invite:users', 'manage:settings']; // Replace with the scopes you actually need
  const grantedScopes = (scope as string)?.split(' ') ?? [];
  const missingScope = requiredScopes.some((required) => !grantedScopes.includes(required));
  if (missingScope) {
    throw new AuthorizationError('Insufficient organization scope');
  }
}
/**
 * Organization-level API resource checks: the audience must contain the API
 * resource indicator, `organization_id` must equal the expected organization,
 * and the `scope` claim must contain every required scope.
 *
 * @throws AuthorizationError when any check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud, scope } = payload;

  // The audience claim must match your API resource indicator
  const audiences = Array.isArray(aud) ? aud : aud ? [aud] : [];
  if (!audiences.includes('https://your-api-resource-indicator')) {
    throw new AuthorizationError('Invalid audience for organization-level API resources');
  }

  // The organization must match the context (you may need to read it from the request)
  const expectedOrgId = 'your-organization-id'; // Take this from the request context
  const tokenOrgId = payload.organization_id as string;
  if (tokenOrgId !== expectedOrgId) {
    throw new AuthorizationError('Organization ID mismatch');
  }

  // Scopes required for the organization-level API resource
  const requiredScopes = ['api:read', 'api:write']; // Replace with the scopes you actually need
  const grantedScopes = (scope as string)?.split(' ') ?? [];
  const missingScope = requiredScopes.some((required) => !grantedScopes.includes(required));
  if (missingScope) {
    throw new AuthorizationError('Insufficient organization-level API scopes');
  }
}
我们使用 PyJWT 来验证 JWT。如果你还没有安装,请先安装:
pip install pyjwt[crypto]
首先,添加这些用于处理 JWT 验证的通用工具:
import jwt
from jwt import PyJWKClient
from typing import Dict, Any
from auth_middleware import AuthInfo, AuthorizationError, JWKS_URI, ISSUER
jwks_client = PyJWKClient(JWKS_URI)
def validate_jwt_token(token: str) -> Dict[str, Any]:
    """Validate a JWT and return its payload.

    Decodes the token with the signing key looked up from the JWKS client and
    the expected issuer, then runs ``verify_payload`` on the result.

    Raises:
        AuthorizationError: with status 401 when the token cannot be decoded
            or verified; errors raised by ``verify_payload`` propagate with
            their own status.
    """
    try:
        signing_key = jwks_client.get_signing_key_from_jwt(token)
        payload = jwt.decode(
            token,
            signing_key.key,
            algorithms=['RS256'],
            issuer=ISSUER,
            options={'verify_aud': False}  # The audience is checked manually
        )
        verify_payload(payload)
        return payload
    except AuthorizationError:
        # Raised by verify_payload(): keep its status instead of letting the
        # generic handler below turn a 403 into a 401.
        raise
    except jwt.InvalidTokenError as e:
        raise AuthorizationError(f'无效令牌: {str(e)}', 401)
    except Exception as e:
        raise AuthorizationError(f'令牌验证失败: {str(e)}', 401)
def create_auth_info(payload: Dict[str, Any]) -> AuthInfo:
    """Build an ``AuthInfo`` from a JWT payload.

    The ``scope`` claim is split on single spaces (empty list when missing or
    empty); a string ``aud`` claim is wrapped in a list.
    """
    scope_claim = payload.get('scope')
    scopes = scope_claim.split(' ') if scope_claim else []

    audience = payload.get('aud', [])
    if isinstance(audience, str):
        audience = [audience]

    return AuthInfo(
        sub=payload.get('sub'),
        client_id=payload.get('client_id'),
        organization_id=payload.get('organization_id'),
        scopes=scopes,
        audience=audience
    )
def verify_payload(payload: Dict[str, Any]) -> None:
    """Validate the payload according to the permission model (placeholder)."""
    # Implement your validation logic here, based on your permission model.
    # The concrete checks are shown in the permission model sections below.
    pass
然后,实现中间件以验证访问令牌 (Access token):
- FastAPI
- Flask
- Django
- Django REST Framework
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jwt_validator import validate_jwt_token, create_auth_info
security = HTTPBearer()


async def verify_access_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> AuthInfo:
    """FastAPI dependency: validate the bearer token and return its ``AuthInfo``.

    An ``AuthorizationError`` is converted into an ``HTTPException`` that
    carries the same status and message.
    """
    try:
        token = credentials.credentials
        payload = validate_jwt_token(token)
        return create_auth_info(payload)
    except AuthorizationError as e:
        raise HTTPException(status_code=e.status, detail=str(e))
from functools import wraps
from flask import request, jsonify, g
from jwt_validator import validate_jwt_token, create_auth_info
def verify_access_token(f):
    """Flask decorator: validate the bearer token before calling the view.

    On success the ``AuthInfo`` is stored on ``g.auth``; on an
    ``AuthorizationError`` a JSON error is returned with the error's status.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            token = extract_bearer_token_from_headers(dict(request.headers))
            payload = validate_jwt_token(token)
            # Keep the auth details on Flask's g object for later use
            g.auth = create_auth_info(payload)
            return f(*args, **kwargs)
        except AuthorizationError as e:
            return jsonify({'error': str(e)}), e.status
    return decorated_function
from django.http import JsonResponse
from jwt_validator import validate_jwt_token, create_auth_info
def require_access_token(view_func):
    """Django view decorator: validate the bearer token before calling the view.

    On success the ``AuthInfo`` is attached as ``request.auth``; on an
    ``AuthorizationError`` a ``JsonResponse`` with the error's status is returned.
    """
    # Local import: the import lines of this snippet do not include functools.
    from functools import wraps

    @wraps(view_func)  # keep the wrapped view's name and docstring, as the Flask decorator does
    def wrapper(request, *args, **kwargs):
        try:
            # Rebuild header names from META keys: HTTP_AUTHORIZATION -> authorization
            headers = {key.replace('HTTP_', '').replace('_', '-').lower(): value
                       for key, value in request.META.items() if key.startswith('HTTP_')}
            token = extract_bearer_token_from_headers(headers)
            payload = validate_jwt_token(token)
            # Attach the auth details to the request for later use
            request.auth = create_auth_info(payload)
            return view_func(request, *args, **kwargs)
        except AuthorizationError as e:
            return JsonResponse({'error': str(e)}, status=e.status)
    return wrapper
from rest_framework.authentication import TokenAuthentication
from rest_framework import exceptions
from jwt_validator import validate_jwt_token, create_auth_info
class AccessTokenAuthentication(TokenAuthentication):
    """DRF authentication class that treats the bearer credential as a JWT."""

    keyword = 'Bearer'  # Use 'Bearer' instead of 'Token'

    def authenticate_credentials(self, key):
        """Validate ``key`` as a JWT and return a ``(user, key)`` pair.

        A 401 ``AuthorizationError`` becomes ``AuthenticationFailed``; any
        other status becomes ``PermissionDenied``.
        """
        try:
            payload = validate_jwt_token(key)
            auth_info = create_auth_info(payload)
            # Build a minimal user-like object that carries the auth details
            user = type('User', (), {
                'auth': auth_info,
                'is_authenticated': True,
                'is_anonymous': False,
                'is_active': True,
            })()
            return (user, key)
        except AuthorizationError as e:
            if e.status == 401:
                raise exceptions.AuthenticationFailed(str(e))
            else:  # 403
                raise exceptions.PermissionDenied(str(e))
根据你的权限 (Permission) 模型,在 `jwt_validator.py` 中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限 (Permissions)
- 组织级 API 资源
def verify_payload(payload: Dict[str, Any]) -> None:
    """Validate a payload for a global API resource.

    Raises ``AuthorizationError`` when the audience does not contain the API
    resource indicator or a required scope is missing.
    """
    # The audience claim must match your API resource indicator
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]
    if 'https://your-api-resource-indicator' not in audiences:
        raise AuthorizationError('无效的受众 (Audience)')

    # Scopes required for the global API resource
    required_scopes = ['api:read', 'api:write']  # Replace with the scopes you actually need
    scopes = payload.get('scope', '').split(' ') if payload.get('scope') else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('权限 (Scope) 不足')
def verify_payload(payload: Dict[str, Any]) -> None:
    """Validate a payload for organization (non-API) permissions.

    Raises ``AuthorizationError`` when the audience is not in organization
    format, does not name the expected organization, or a required scope is
    missing.
    """
    # The audience claim must use the organization format
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]
    has_org_audience = any(aud.startswith('urn:logto:organization:') for aud in audiences)
    if not has_org_audience:
        raise AuthorizationError('组织权限 (Permissions) 的受众 (Audience) 无效')

    # The organization must match the context (you may need to read it from the request)
    expected_org_id = 'your-organization-id'  # Take this from the request context
    expected_aud = f'urn:logto:organization:{expected_org_id}'
    if expected_aud not in audiences:
        raise AuthorizationError('组织 ID 不匹配')

    # Required organization scopes
    required_scopes = ['invite:users', 'manage:settings']  # Replace with the scopes you actually need
    scopes = payload.get('scope', '').split(' ') if payload.get('scope') else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('组织权限 (Scope) 不足')
def verify_payload(payload: Dict[str, Any]) -> None:
    """Validate a payload for an organization-level API resource.

    Raises ``AuthorizationError`` when the audience does not contain the API
    resource indicator, ``organization_id`` differs from the expected
    organization, or a required scope is missing.
    """
    # The audience claim must match your API resource indicator
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]
    if 'https://your-api-resource-indicator' not in audiences:
        raise AuthorizationError('组织级 API 资源的受众 (Audience) 无效')

    # The organization must match the context (you may need to read it from the request)
    expected_org_id = 'your-organization-id'  # Take this from the request context
    org_id = payload.get('organization_id')
    if expected_org_id != org_id:
        raise AuthorizationError('组织 ID 不匹配')

    # Scopes required for the organization-level API resource
    required_scopes = ['api:read', 'api:write']  # Replace with the scopes you actually need
    scopes = payload.get('scope', '').split(' ') if payload.get('scope') else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('组织级 API 权限 (Scope) 不足')
我们使用 github.com/lestrrat-go/jwx 来验证 JWT。如果你还没有安装它,请先安装:
go mod init your-project
go get github.com/lestrrat-go/jwx/v3
首先,将这些共享组件添加到你的 `auth_middleware.go` 文件中:
import (
"context"
"strings"
"time"
"github.com/lestrrat-go/jwx/v3/jwk"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// jwkSet holds the key set fetched from JWKS_URI; it is assigned in init.
var jwkSet jwk.Set
// init fetches the JWKS with a 10-second timeout and panics if the fetch fails.
// NOTE(review): the set is fetched only here, at startup — confirm whether key
// rotation requires a refreshing cache.
func init() {
// Initialize the JWKS cache
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var err error
jwkSet, err = jwk.Fetch(ctx, JWKS_URI)
if err != nil {
panic("获取 JWKS 失败: " + err.Error())
}
}
// validateJWTToken parses the token against the fetched key set, checks the
// issuer and then runs verifyPayload. Parse and issuer failures are returned
// as *AuthorizationError with status 401.
func validateJWTToken(tokenString string) (jwt.Token, error) {
	token, err := jwt.Parse([]byte(tokenString), jwt.WithKeySet(jwkSet))
	if err != nil {
		return nil, NewAuthorizationError("无效令牌: "+err.Error(), http.StatusUnauthorized)
	}

	// Verify the issuer. jwx v3 accessors return (value, ok); a token without
	// an issuer is rejected just like one with the wrong issuer.
	issuer, ok := token.Issuer()
	if !ok || issuer != ISSUER {
		return nil, NewAuthorizationError("无效发行者 (Issuer)", http.StatusUnauthorized)
	}

	if err := verifyPayload(token); err != nil {
		return nil, err
	}

	return token, nil
}
// Helper functions for reading token data.

// getStringClaim returns the named claim as a string, or "" when the claim is
// missing or is not a string.
func getStringClaim(token jwt.Token, key string) string {
	// jwx v3: Get stores the claim into the destination pointer and returns an
	// error when the claim is absent or cannot be assigned to it.
	var value string
	if err := token.Get(key, &value); err != nil {
		return ""
	}
	return value
}
// getScopesFromToken splits the "scope" claim on single spaces. It returns an
// empty slice when the claim is missing, not a string, or empty.
func getScopesFromToken(token jwt.Token) []string {
	// jwx v3: Get stores the claim into the destination pointer.
	var scope string
	if err := token.Get("scope", &scope); err != nil || scope == "" {
		return []string{}
	}
	return strings.Split(scope, " ")
}
// getAudienceFromToken returns the token's "aud" values (nil when the claim is absent).
func getAudienceFromToken(token jwt.Token) []string {
	// jwx v3: Audience returns (values, ok); a missing claim yields a nil slice.
	audiences, _ := token.Audience()
	return audiences
}
然后,实现中间件以验证访问令牌 (Access token):
- Gin
- Echo
- Fiber
- Chi
import "github.com/gin-gonic/gin"
// VerifyAccessToken returns a Gin middleware that validates the bearer token.
// On success the parsed token is stored in the context under "auth"; on
// failure the request is aborted with the error's status and a JSON body.
func VerifyAccessToken() gin.HandlerFunc {
	// reject writes the JSON error and stops the handler chain.
	reject := func(c *gin.Context, err error) {
		authErr := err.(*AuthorizationError)
		c.JSON(authErr.Status, gin.H{"error": authErr.Message})
		c.Abort()
	}

	return func(c *gin.Context) {
		rawToken, err := extractBearerTokenFromHeaders(c.Request)
		if err != nil {
			reject(c, err)
			return
		}

		parsed, err := validateJWTToken(rawToken)
		if err != nil {
			reject(c, err)
			return
		}

		// Keep the token in the context for later handlers
		c.Set("auth", parsed)
		c.Next()
	}
}
import "github.com/labstack/echo/v4"
// VerifyAccessToken is an Echo middleware that validates the bearer token.
// On success the parsed token is stored in the context under "auth"; on
// failure it answers with the error's status and a JSON body.
func VerifyAccessToken(next echo.HandlerFunc) echo.HandlerFunc {
	// reject writes the JSON error for an authorization failure.
	reject := func(c echo.Context, err error) error {
		authErr := err.(*AuthorizationError)
		return c.JSON(authErr.Status, echo.Map{"error": authErr.Message})
	}

	return func(c echo.Context) error {
		rawToken, err := extractBearerTokenFromHeaders(c.Request())
		if err != nil {
			return reject(c, err)
		}

		parsed, err := validateJWTToken(rawToken)
		if err != nil {
			return reject(c, err)
		}

		// Keep the token in the context for later handlers
		c.Set("auth", parsed)
		return next(c)
	}
}
import (
"net/http"
"github.com/gofiber/fiber/v2"
)
// VerifyAccessToken is a Fiber middleware that validates the bearer token.
// On success the parsed token is stored in locals under "auth"; on failure it
// answers with the error's status and a JSON body.
func VerifyAccessToken(c *fiber.Ctx) error {
	// reject writes the JSON error for an authorization failure.
	reject := func(err error) error {
		authErr := err.(*AuthorizationError)
		return c.Status(authErr.Status).JSON(fiber.Map{"error": authErr.Message})
	}

	// Wrap the header in an http.Request so the shared extractor can be reused
	header := make(http.Header)
	header.Set("Authorization", c.Get("Authorization"))
	req := &http.Request{Header: header}

	rawToken, err := extractBearerTokenFromHeaders(req)
	if err != nil {
		return reject(err)
	}

	parsed, err := validateJWTToken(rawToken)
	if err != nil {
		return reject(err)
	}

	// Keep the token in locals for later handlers
	c.Locals("auth", parsed)
	return c.Next()
}
import (
"context"
"encoding/json"
"net/http"
)
type contextKey string
const AuthContextKey contextKey = "auth"
func VerifyAccessToken(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokenString, err := extractBearerTokenFromHeaders(r)
if err != nil {
authErr := err.(*AuthorizationError)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(authErr.Status)
json.NewEncoder(w).Encode(map[string]string{"error": authErr.Message})
return
}
token, err := validateJWTToken(tokenString)
if err != nil {
authErr := err.(*AuthorizationError)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(authErr.Status)
json.NewEncoder(w).Encode(map[string]string{"error": authErr.Message})
return
}
// 在上下文中存储令牌以便通用使用
ctx := context.WithValue(r.Context(), AuthContextKey, token)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
根据你的权限 (Permission) 模型,你可能需要采用不同的 `verifyPayload` 逻辑:
- 全局 API 资源 (API resources)
- 组织 (Organization)(非 API)权限 (Permissions)
- 组织级 API 资源 (Organization-level API resources)
// verifyPayload checks a token for a global API resource: the audience must
// contain the API resource indicator and every required scope must be present.
func verifyPayload(token jwt.Token) error {
	// Scopes required for the global API resource
	requiredScopes := []string{"api:read", "api:write"} // Replace with the scopes you actually need

	switch {
	// The audience claim must match your API resource indicator
	case !hasAudience(token, "https://your-api-resource-indicator"):
		return NewAuthorizationError("无效受众 (Audience)")
	case !hasRequiredScopes(token, requiredScopes):
		return NewAuthorizationError("权限 (Scope) 不足")
	default:
		return nil
	}
}
// verifyPayload checks a token for organization (non-API) permissions: the
// audience must be in organization format, must name the expected
// organization, and every required organization scope must be present.
func verifyPayload(token jwt.Token) error {
	// The organization must match the context (you may need to read it from the request)
	expectedOrgID := "your-organization-id" // Take this from the request context
	// Required organization scopes
	requiredScopes := []string{"invite:users", "manage:settings"} // Replace with the scopes you actually need

	switch {
	// The audience claim must use the organization format
	case !hasOrganizationAudience(token):
		return NewAuthorizationError("组织 (Organization) 权限 (Permissions) 的受众 (Audience) 无效")
	case !hasMatchingOrganization(token, expectedOrgID):
		return NewAuthorizationError("组织 (Organization) ID 不匹配")
	case !hasRequiredScopes(token, requiredScopes):
		return NewAuthorizationError("组织 (Organization) 权限 (Scope) 不足")
	default:
		return nil
	}
}
// verifyPayload checks a token for an organization-level API resource: the
// audience must contain the API resource indicator, organization_id must equal
// the expected organization, and every required scope must be present.
func verifyPayload(token jwt.Token) error {
	// The organization must match the context (you may need to read it from the request)
	expectedOrgID := "your-organization-id" // Take this from the request context
	// Scopes required for the organization-level API resource
	requiredScopes := []string{"api:read", "api:write"} // Replace with the scopes you actually need

	switch {
	// The audience claim must match your API resource indicator
	case !hasAudience(token, "https://your-api-resource-indicator"):
		return NewAuthorizationError("组织级 API 资源 (Organization-level API resources) 的受众 (Audience) 无效")
	case !hasMatchingOrganizationID(token, expectedOrgID):
		return NewAuthorizationError("组织 (Organization) ID 不匹配")
	case !hasRequiredScopes(token, requiredScopes):
		return NewAuthorizationError("组织级 API 权限 (Scope) 不足")
	default:
		return nil
	}
}
为 payload 验证添加这些辅助函数:
// hasAudience reports whether the token's audience contains expectedAud.
func hasAudience(token jwt.Token, expectedAud string) bool {
	// jwx v3: Audience returns (values, ok); a missing claim yields no values.
	audiences, _ := token.Audience()
	for _, aud := range audiences {
		if aud == expectedAud {
			return true
		}
	}
	return false
}
// hasOrganizationAudience reports whether any audience value uses the
// "urn:logto:organization:" prefix.
func hasOrganizationAudience(token jwt.Token) bool {
	// jwx v3: Audience returns (values, ok); a missing claim yields no values.
	audiences, _ := token.Audience()
	for _, aud := range audiences {
		if strings.HasPrefix(aud, "urn:logto:organization:") {
			return true
		}
	}
	return false
}
// hasRequiredScopes reports whether the token's scopes include every entry of
// requiredScopes. An empty requiredScopes list always passes.
func hasRequiredScopes(token jwt.Token, requiredScopes []string) bool {
	granted := make(map[string]struct{})
	for _, scope := range getScopesFromToken(token) {
		granted[scope] = struct{}{}
	}

	for _, required := range requiredScopes {
		if _, ok := granted[required]; !ok {
			return false
		}
	}
	return true
}
// hasMatchingOrganization reports whether the token's audience contains the
// organization URN built from expectedOrgID.
func hasMatchingOrganization(token jwt.Token, expectedOrgID string) bool {
	return hasAudience(token, fmt.Sprintf("urn:logto:organization:%s", expectedOrgID))
}
// hasMatchingOrganizationID reports whether the token's organization_id claim
// equals expectedOrgID.
func hasMatchingOrganizationID(token jwt.Token, expectedOrgID string) bool {
	return getStringClaim(token, "organization_id") == expectedOrgID
}
我们会根据不同的框架使用不同的 JWT 库。请安装所需的依赖:
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
在你的 pom.xml
中添加:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-resource-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-jose</artifactId>
</dependency>
在你的 pom.xml
中添加:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
在你的 pom.xml
中添加:
<dependency>
<groupId>io.micronaut.security</groupId>
<artifactId>micronaut-security-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
</dependency>
在你的 pom.xml
中添加:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-auth-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
然后,实现中间件以验证访问令牌 (Access token):
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
public class JwtSecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(authz -> authz
.requestMatchers("/api/protected/**").authenticated()
.anyRequest().permitAll()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.decoder(jwtDecoder()))
);
return http.build();
}
@Bean
public JwtDecoder jwtDecoder() {
return NimbusJwtDecoder.withJwkSetUri(AuthConstants.JWKS_URI)
.build();
}
}
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
 * Spring component that holds the payload (claims) validation for decoded JWTs.
 * The method body is a placeholder to be filled in per permission model.
 */
@Component
public class JwtValidator {
public void verifyPayload(Jwt jwt) {
// Implement your verification logic here based on your permission model.
// The concrete checks are shown in the permission model section below.
}
}
import org.eclipse.microprofile.jwt.JsonWebToken;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.ext.Provider;
/**
 * Request filter that validates the injected JWT for every request whose path
 * starts with "/api/protected" and aborts the request with a JSON error body
 * when validation fails.
 */
@Provider
@ApplicationScoped
public class JwtVerificationFilter implements ContainerRequestFilter {
    @Inject
    JsonWebToken jwt;

    @Override
    public void filter(ContainerRequestContext requestContext) {
        // Requests outside the protected prefix pass through untouched.
        boolean isProtected = requestContext.getUriInfo().getPath().startsWith("/api/protected");
        if (!isProtected) {
            return;
        }

        try {
            verifyPayload(jwt);
            // Expose the validated token to downstream resources.
            requestContext.setProperty("auth", jwt);
        } catch (AuthorizationException e) {
            // Validation failures carry their own HTTP status.
            requestContext.abortWith(
                jakarta.ws.rs.core.Response.status(e.getStatusCode())
                    .entity("{\"error\": \"" + e.getMessage() + "\"}")
                    .build()
            );
        } catch (Exception e) {
            // Anything else is reported as an invalid token.
            requestContext.abortWith(
                jakarta.ws.rs.core.Response.status(401)
                    .entity("{\"error\": \"Invalid token\"}")
                    .build()
            );
        }
    }

    /** Placeholder for the permission-model specific claim checks. */
    private void verifyPayload(JsonWebToken jwt) {
        // Implement your verification logic here based on your permission model.
        // The concrete checks are shown in the permission model section below.
    }
}
micronaut:
  security:
    authentication: bearer
    token:
      jwt:
        signatures:
          jwks:
            logto:
              url: ${JWKS_URI:https://your-tenant.logto.app/oidc/jwks}
import io.micronaut.security.token.Claims;
import io.micronaut.security.token.validator.TokenValidator;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
/**
 * Token validator that runs the permission-model specific claim checks.
 * NOTE(review): confirm the validateToken signature against the Micronaut
 * Security version in use.
 */
@Singleton
public class JwtClaimsValidator implements TokenValidator {

    /**
     * Emits {@code true} when the claims pass {@code verifyPayload}; a failed
     * check surfaces as an error carrying the original message.
     */
    @Override
    public Publisher<Boolean> validateToken(String token, Claims claims) {
        return Mono.fromCallable(() -> {
            try {
                verifyPayload(claims);
                return true;
            } catch (AuthorizationException e) {
                // Keep the original exception as the cause so the failing check
                // and its stack trace are not lost.
                throw new RuntimeException(e.getMessage(), e);
            }
        });
    }

    /** Placeholder for the permission-model specific claim checks. */
    private void verifyPayload(Claims claims) {
        // Implement your verification logic here based on your permission model.
        // The concrete checks are shown in the permission model section below.
    }
}
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.client.WebClient;
/**
 * Route handler that extracts the bearer token from the Authorization header,
 * authenticates it through {@link JWTAuth}, runs the payload checks and stores
 * the principal on the routing context under "auth".
 */
public class JwtAuthHandler implements Handler<RoutingContext> {
    private static final String BEARER_PREFIX = "Bearer ";
    private static final String INVALID_TOKEN_BODY = "{\"error\": \"Invalid token\"}";

    private final JWTAuth jwtAuth;
    private final WebClient webClient;

    public JwtAuthHandler(io.vertx.core.Vertx vertx) {
        this.webClient = WebClient.create(vertx);
        this.jwtAuth = JWTAuth.create(vertx, new JWTAuthOptions());
        // The JWKS document is fetched here, but the callback is a placeholder:
        // as written, the fetched keys are not applied to jwtAuth.
        fetchJWKS().onSuccess(jwks -> {
            // Configure the JWKS (simplified - a proper JWKS parser may be needed).
        });
    }

    @Override
    public void handle(RoutingContext context) {
        String header = context.request().getHeader("Authorization");
        boolean hasBearer = header != null && header.startsWith(BEARER_PREFIX);
        if (!hasBearer) {
            sendJson(context, 401, "{\"error\": \"Authorization header missing or invalid\"}");
            return;
        }

        String rawToken = header.substring(BEARER_PREFIX.length());
        JsonObject credentials = new JsonObject().put("token", rawToken);
        jwtAuth.authenticate(credentials)
            .onSuccess(user -> {
                try {
                    JsonObject principal = user.principal();
                    verifyPayload(principal);
                    // Expose the principal to downstream handlers.
                    context.put("auth", principal);
                    context.next();
                } catch (AuthorizationException e) {
                    sendJson(context, e.getStatusCode(), "{\"error\": \"" + e.getMessage() + "\"}");
                } catch (Exception e) {
                    sendJson(context, 401, INVALID_TOKEN_BODY);
                }
            })
            .onFailure(err -> sendJson(context, 401, INVALID_TOKEN_BODY));
    }

    /** Ends the response with the given status and a JSON body. */
    private static void sendJson(RoutingContext context, int status, String body) {
        context.response()
            .setStatusCode(status)
            .putHeader("Content-Type", "application/json")
            .end(body);
    }

    /** Requests the JWKS document and maps the response body to a JsonObject. */
    private Future<JsonObject> fetchJWKS() {
        return webClient.getAbs(AuthConstants.JWKS_URI)
            .send()
            .map(response -> response.bodyAsJsonObject());
    }

    /** Placeholder for the permission-model specific claim checks. */
    private void verifyPayload(JsonObject principal) {
        // Implement your verification logic here based on your permission model.
        // The concrete checks are shown in the permission model section below.
    }
}
根据你的权限模型,你可能需要采用不同的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限
- 组织级 API 资源
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Global API resource check: the token must target the API resource indicator
 * and carry every required scope.
 */
private void verifyPayload(Jwt jwt) {
    // Reject tokens that were not issued for this API resource.
    boolean audienceMatches = jwt.getAudience().contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaimAsString("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient scope");
        }
    }
}
/**
 * Global API resource check: the token must target the API resource indicator
 * and carry every required scope.
 */
private void verifyPayload(JsonWebToken jwt) {
    // Reject tokens that were not issued for this API resource.
    boolean audienceMatches = jwt.getAudience().contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaim("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient scope");
        }
    }
}
/**
 * Global API resource check: the token must target the API resource indicator
 * and carry every required scope.
 */
private void verifyPayload(Claims claims) {
    // Reject tokens with no audience or one that does not include this API resource.
    List<String> tokenAudiences = (List<String>) claims.get("aud");
    boolean audienceMatches = tokenAudiences != null
        && tokenAudiences.contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = (String) claims.get("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient scope");
        }
    }
}
/**
 * Global API resource check: the token must target the API resource indicator
 * and carry every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // "aud" may be a single string or an array of strings. getJsonArray() would
    // throw ClassCastException for the single-string form, so inspect the raw value.
    String expectedAudience = "https://your-api-resource-indicator";
    Object audClaim = principal.getValue("aud");
    boolean audienceMatches = audClaim instanceof JsonArray
        ? ((JsonArray) audClaim).contains(expectedAudience)
        : expectedAudience.equals(audClaim);
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // Check the scopes required for the global API resource.
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient scope");
    }
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Organization (non-API) permission check: the audience must be the expected
 * organization URN and the token must carry every required organization scope.
 */
private void verifyPayload(Jwt jwt) {
    List<String> tokenAudiences = jwt.getAudience();

    // At least one audience must use the organization URN format.
    boolean orgAudienceFound = false;
    for (String audience : tokenAudiences) {
        if (audience.startsWith("urn:logto:organization:")) {
            orgAudienceFound = true;
            break;
        }
    }
    if (!orgAudienceFound) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // The audience must name the organization this request is about.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    if (!tokenAudiences.contains("urn:logto:organization:" + expectedOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaimAsString("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the organization scopes you actually require.
    for (String requiredScope : Arrays.asList("invite:users", "manage:settings")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization scope");
        }
    }
}
/**
 * Organization (non-API) permission check: the audience must be the expected
 * organization URN and the token must carry every required organization scope.
 */
private void verifyPayload(JsonWebToken jwt) {
    // At least one audience must use the organization URN format.
    boolean orgAudienceFound = false;
    for (String audience : jwt.getAudience()) {
        if (audience.startsWith("urn:logto:organization:")) {
            orgAudienceFound = true;
            break;
        }
    }
    if (!orgAudienceFound) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // The audience must name the organization this request is about.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    if (!jwt.getAudience().contains("urn:logto:organization:" + expectedOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaim("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the organization scopes you actually require.
    for (String requiredScope : Arrays.asList("invite:users", "manage:settings")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization scope");
        }
    }
}
/**
 * Organization (non-API) permission check: the audience must be the expected
 * organization URN and the token must carry every required organization scope.
 */
private void verifyPayload(Claims claims) {
    List<String> tokenAudiences = (List<String>) claims.get("aud");

    // At least one audience must use the organization URN format.
    boolean orgAudienceFound = false;
    if (tokenAudiences != null) {
        for (String audience : tokenAudiences) {
            if (audience.startsWith("urn:logto:organization:")) {
                orgAudienceFound = true;
                break;
            }
        }
    }
    if (!orgAudienceFound) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // The audience must name the organization this request is about.
    // tokenAudiences cannot be null here: a null list fails the check above.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    if (!tokenAudiences.contains("urn:logto:organization:" + expectedOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = (String) claims.get("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the organization scopes you actually require.
    for (String requiredScope : Arrays.asList("invite:users", "manage:settings")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization scope");
        }
    }
}
/**
 * Organization (non-API) permission check: the audience must be the expected
 * organization URN and the token must carry every required organization scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // "aud" may be a single string or an array of strings. getJsonArray() would
    // throw ClassCastException for the single-string form, so normalize the raw
    // value into a JsonArray without mutating the principal.
    Object audClaim = principal.getValue("aud");
    JsonArray audiences;
    if (audClaim instanceof JsonArray) {
        audiences = (JsonArray) audClaim;
    } else {
        audiences = new JsonArray();
        if (audClaim instanceof String) {
            audiences.add(audClaim);
        }
    }

    // At least one audience must use the organization URN format.
    boolean hasOrgAudience = false;
    for (Object aud : audiences) {
        if (aud.toString().startsWith("urn:logto:organization:")) {
            hasOrgAudience = true;
            break;
        }
    }
    if (!hasOrgAudience) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // Check that the organization ID matches the context (extract it from the request).
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    String expectedAud = "urn:logto:organization:" + expectedOrgId;
    if (!audiences.contains(expectedAud)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check the required organization scopes.
    List<String> requiredScopes = Arrays.asList("invite:users", "manage:settings"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Organization-level API resource check: the token must target the API
 * resource indicator, belong to the expected organization and carry every
 * required scope.
 */
private void verifyPayload(Jwt jwt) {
    // Reject tokens that were not issued for this API resource.
    boolean audienceMatches = jwt.getAudience().contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The "organization_id" claim must name the organization this request is about.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    String tokenOrgId = jwt.getClaimAsString("organization_id");
    if (!expectedOrgId.equals(tokenOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaimAsString("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization-level API scopes");
        }
    }
}
/**
 * Organization-level API resource check: the token must target the API
 * resource indicator, belong to the expected organization and carry every
 * required scope.
 */
private void verifyPayload(JsonWebToken jwt) {
    // Reject tokens that were not issued for this API resource.
    boolean audienceMatches = jwt.getAudience().contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The "organization_id" claim must name the organization this request is about.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    String tokenOrgId = jwt.getClaim("organization_id");
    if (!expectedOrgId.equals(tokenOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = jwt.getClaim("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization-level API scopes");
        }
    }
}
/**
 * Organization-level API resource check: the token must target the API
 * resource indicator, belong to the expected organization and carry every
 * required scope.
 */
private void verifyPayload(Claims claims) {
    // Reject tokens with no audience or one that does not include this API resource.
    List<String> tokenAudiences = (List<String>) claims.get("aud");
    boolean audienceMatches = tokenAudiences != null
        && tokenAudiences.contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The "organization_id" claim must name the organization this request is about.
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    String tokenOrgId = (String) claims.get("organization_id");
    if (!expectedOrgId.equals(tokenOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    String scopeClaim = (String) claims.get("scope");
    List<String> grantedScopes = scopeClaim == null ? List.of() : Arrays.asList(scopeClaim.split(" "));
    // Replace with the scopes your API actually requires.
    for (String requiredScope : Arrays.asList("api:read", "api:write")) {
        if (!grantedScopes.contains(requiredScope)) {
            throw new AuthorizationException("Insufficient organization-level API scopes");
        }
    }
}
/**
 * Organization-level API resource check: the token must target the API
 * resource indicator, belong to the expected organization and carry every
 * required scope.
 *
 * @throws AuthorizationException when audience, organization or scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // "aud" may be a single string or an array of strings. getJsonArray() would
    // throw ClassCastException for the single-string form, so inspect the raw value.
    String expectedAudience = "https://your-api-resource-indicator";
    Object audClaim = principal.getValue("aud");
    boolean audienceMatches = audClaim instanceof JsonArray
        ? ((JsonArray) audClaim).contains(expectedAudience)
        : expectedAudience.equals(audClaim);
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // Check that the organization ID matches the context (extract it from the request).
    String expectedOrgId = "your-organization-id"; // Extract from the request context
    String orgId = principal.getString("organization_id");
    if (!expectedOrgId.equals(orgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check the scopes required for the organization-level API resource.
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
为 JWT 认证 (Authentication) 添加所需的 NuGet 包:
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.0" />
创建一个验证服务用于处理令牌验证:
using System.Security.Claims;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using YourApiNamespace.Exceptions;
namespace YourApiNamespace.Services
{
    /// <summary>
    /// Runs the application-specific checks on a token that the JWT bearer
    /// handler has already validated.
    /// </summary>
    public interface IJwtValidationService
    {
        Task ValidateTokenAsync(TokenValidatedContext context);
    }

    /// <summary>
    /// Default implementation: delegates to <c>ValidatePayload</c> and reports
    /// any unexpected failure as a 401 <c>AuthorizationException</c>.
    /// </summary>
    public class JwtValidationService : IJwtValidationService
    {
        public async Task ValidateTokenAsync(TokenValidatedContext context)
        {
            var principal = context.Principal!;

            try
            {
                // Permission-model specific checks live in ValidatePayload.
                ValidatePayload(principal);
            }
            catch (Exception ex) when (ex is not AuthorizationException)
            {
                // Authorization failures propagate unchanged; everything else
                // is wrapped so callers see a single exception type.
                throw new AuthorizationException($"Token validation failed: {ex.Message}", 401);
            }
        }

        /// <summary>Placeholder for the permission-model specific claim checks.</summary>
        private void ValidatePayload(ClaimsPrincipal principal)
        {
            // Implement your verification logic here based on your permission model.
            // The concrete checks are shown in the permission model section below.
        }
    }
}
在你的 Program.cs
中配置 JWT 认证 (Authentication):
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using YourApiNamespace.Services;
using YourApiNamespace.Exceptions;
var builder = WebApplication.CreateBuilder(args);

// Register controllers and the per-request token validation service.
builder.Services.AddControllers();
builder.Services.AddScoped<IJwtValidationService, JwtValidationService>();

// Configure JWT bearer authentication.
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        options.Authority = AuthConstants.Issuer;
        // The OIDC discovery document is served at ".well-known/openid-configuration"
        // (hyphen, not underscore).
        options.MetadataAddress = $"{AuthConstants.Issuer}/.well-known/openid-configuration";
        options.RequireHttpsMetadata = true;
        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidIssuer = AuthConstants.Issuer,
            ValidateAudience = false, // The audience is validated manually per permission model
            ValidateLifetime = true,
            ValidateIssuerSigningKey = true,
            ClockSkew = TimeSpan.FromMinutes(5)
        };
        options.Events = new JwtBearerEvents
        {
            // Run the application-specific claim checks once the token itself is valid.
            OnTokenValidated = async context =>
            {
                var validationService = context.HttpContext.RequestServices
                    .GetRequiredService<IJwtValidationService>();
                await validationService.ValidateTokenAsync(context);
            },
            // Write the JSON 401 body from the challenge event: HandleResponse()
            // is available on the challenge context, and it suppresses the
            // default challenge so the response is written exactly once.
            OnChallenge = async context =>
            {
                context.HandleResponse();
                context.Response.StatusCode = 401;
                context.Response.ContentType = "application/json";
                await context.Response.WriteAsync("{\"error\": \"Invalid token\"}");
            }
        };
    });
builder.Services.AddAuthorization();

var app = builder.Build();

// Global error handling for AuthorizationException thrown further down the pipeline.
app.Use(async (context, next) =>
{
    try
    {
        await next();
    }
    catch (AuthorizationException ex)
    {
        context.Response.StatusCode = ex.StatusCode;
        context.Response.ContentType = "application/json";
        await context.Response.WriteAsync($"{{\"error\": \"{ex.Message}\"}}");
    }
});

// Configure the HTTP request pipeline.
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers();
app.Run();
根据你的权限 (Permission) 模型,在 JwtValidationService
中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限 (Permissions)
- 组织级 API 资源
/// <summary>
/// Global API resource check: the token must target the API resource indicator
/// and carry every required scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // Reject tokens that were not issued for this API resource.
    if (!principal.HasClaim("aud", "https://your-api-resource-indicator"))
    {
        throw new AuthorizationException("Invalid audience");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    var grantedScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    var requiredScopes = new[] { "api:read", "api:write" }; // Replace with your actual required scopes
    var missingScopes = requiredScopes.Except(grantedScopes);
    if (missingScopes.Any())
    {
        throw new AuthorizationException("Insufficient scope");
    }
}
/// <summary>
/// Organization (non-API) permission check: the audience must be the expected
/// organization URN and the token must carry every required organization scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // At least one audience must use the organization URN format.
    var orgAudiencePresent = principal.FindAll("aud")
        .Any(claim => claim.Value.StartsWith("urn:logto:organization:"));
    if (!orgAudiencePresent)
    {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // The audience must name the organization this request is about.
    var expectedOrgId = "your-organization-id"; // Extract from the request context
    if (!principal.HasClaim("aud", $"urn:logto:organization:{expectedOrgId}"))
    {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    var grantedScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    var requiredScopes = new[] { "invite:users", "manage:settings" }; // Replace with your actual required scopes
    var missingScopes = requiredScopes.Except(grantedScopes);
    if (missingScopes.Any())
    {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
/// <summary>
/// Organization-level API resource check: the token must target the API
/// resource indicator, belong to the expected organization and carry every
/// required scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // Reject tokens that were not issued for this API resource.
    if (!principal.HasClaim("aud", "https://your-api-resource-indicator"))
    {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The "organization_id" claim must name the organization this request is about.
    var expectedOrgId = "your-organization-id"; // Extract from the request context
    var tokenOrgId = principal.FindFirst("organization_id")?.Value;
    if (tokenOrgId != expectedOrgId)
    {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // The "scope" claim is a space-delimited list; a missing claim means no scopes.
    var grantedScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    var requiredScopes = new[] { "api:read", "api:write" }; // Replace with your actual required scopes
    var missingScopes = requiredScopes.Except(grantedScopes);
    if (missingScopes.Any())
    {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
我们使用 firebase/php-jwt 来验证 JWT。使用 Composer 安装:
- Laravel
- Symfony
- Slim
composer require firebase/php-jwt
composer require firebase/php-jwt
composer require firebase/php-jwt slim/slim:"4.*" slim/psr7
首先,添加这些通用工具来处理 JWT 验证:
<?php
use Firebase\JWT\JWT;
use Firebase\JWT\JWK;
use Firebase\JWT\Key;
/**
 * Shared JWT helpers: fetches and caches the JWKS, decodes and validates
 * access tokens, and maps the payload to an AuthInfo object.
 */
class JwtValidator
{
    use AuthHelpers;

    /** @var array|null Decoded JWKS document, cached in a static for reuse. */
    private static ?array $jwks = null;

    /**
     * Returns the decoded JWKS document, fetching it on first use.
     *
     * @throws AuthorizationException (401) when the document cannot be fetched
     *                                or is not valid JSON
     */
    public static function fetchJwks(): array
    {
        if (self::$jwks === null) {
            $jwksData = file_get_contents(AuthConstants::JWKS_URI);
            if ($jwksData === false) {
                throw new AuthorizationException('获取 JWKS 失败', 401);
            }
            $decoded = json_decode($jwksData, true);
            // json_decode() returns null for a malformed body. Fail with the same
            // 401 instead of a TypeError on the array return type, and do not
            // cache the bad result.
            if (!is_array($decoded)) {
                throw new AuthorizationException('获取 JWKS 失败', 401);
            }
            self::$jwks = $decoded;
        }
        return self::$jwks;
    }

    /**
     * Decodes the token against the JWKS, checks the issuer and runs the
     * permission-model specific payload checks.
     *
     * @return array the token payload
     * @throws AuthorizationException when any check fails
     */
    public static function validateJwtToken(string $token): array
    {
        try {
            $jwks = self::fetchJwks();
            $keys = JWK::parseKeySet($jwks);
            $decoded = JWT::decode($token, $keys);
            $payload = (array) $decoded;

            // Verify the issuer.
            if (($payload['iss'] ?? '') !== AuthConstants::ISSUER) {
                throw new AuthorizationException('发行者 (Issuer) 无效', 401);
            }

            self::verifyPayload($payload);
            return $payload;
        } catch (AuthorizationException $e) {
            // Already in the shape callers expect; pass through unchanged.
            throw $e;
        } catch (Exception $e) {
            throw new AuthorizationException('令牌无效: ' . $e->getMessage(), 401);
        }
    }

    /**
     * Maps a token payload to an AuthInfo; "scope" is split on spaces and a
     * single-string "aud" becomes a one-element list.
     */
    public static function createAuthInfo(array $payload): AuthInfo
    {
        $scopes = !empty($payload['scope']) ? explode(' ', $payload['scope']) : [];
        $audience = $payload['aud'] ?? [];
        if (is_string($audience)) {
            $audience = [$audience];
        }

        return new AuthInfo(
            sub: $payload['sub'],
            clientId: $payload['client_id'] ?? null,
            organizationId: $payload['organization_id'] ?? null,
            scopes: $scopes,
            audience: $audience
        );
    }

    /** Placeholder for the permission-model specific payload checks. */
    private static function verifyPayload(array $payload): void
    {
        // Implement your verification logic here based on your permission model.
        // The concrete checks are shown in the permission model section below.
    }
}
然后,实现中间件以验证访问令牌 (Access token):
- Laravel
- Symfony
- Slim
<?php
namespace App\Http\Middleware;
use Closure;
use Illuminate\Http\Request;
use Illuminate\Http\Response;
/**
 * Laravel middleware that validates the bearer access token and stores the
 * resulting AuthInfo on the request under the "auth" attribute.
 */
class VerifyAccessToken
{
    use AuthHelpers;

    /**
     * Validates the token, then passes the request on; a failed check is
     * answered with a JSON error and the exception's status code.
     *
     * The return type is the Symfony base response because response()->json()
     * returns a JsonResponse, which is not an Illuminate\Http\Response and
     * would fail that narrower return type at runtime.
     */
    public function handle(Request $request, Closure $next): \Symfony\Component\HttpFoundation\Response
    {
        try {
            $token = $this->extractBearerToken($request->headers->all());
            $payload = JwtValidator::validateJwtToken($token);

            // Store the auth info on the request attributes for generic use.
            $request->attributes->set('auth', JwtValidator::createAuthInfo($payload));

            return $next($request);
        } catch (AuthorizationException $e) {
            return response()->json(['error' => $e->getMessage()], $e->statusCode);
        }
    }
}
在 app/Http/Kernel.php
中注册中间件:
// Registers the access-token middleware under the "auth.token" alias.
protected $middlewareAliases = [
// ... other middleware
'auth.token' => \App\Http\Middleware\VerifyAccessToken::class,
];
<?php
namespace App\Security;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Security\Core\Authentication\Token\TokenInterface;
use Symfony\Component\Security\Core\Exception\AuthenticationException;
use Symfony\Component\Security\Http\Authenticator\AbstractAuthenticator;
use Symfony\Component\Security\Http\Authenticator\Passport\Badge\UserBadge;
use Symfony\Component\Security\Http\Authenticator\Passport\Passport;
use Symfony\Component\Security\Http\Authenticator\Passport\SelfValidatingPassport;
/**
 * Symfony authenticator that validates the bearer access token and stores the
 * resulting AuthInfo on the request under the "auth" attribute.
 */
class JwtAuthenticator extends AbstractAuthenticator
{
    use AuthHelpers;

    /** Only requests that carry an Authorization header are handled. */
    public function supports(Request $request): ?bool
    {
        return $request->headers->has('authorization');
    }

    /**
     * Validates the token and builds a passport for the token subject.
     *
     * @throws AuthenticationException carrying the HTTP status as its code
     */
    public function authenticate(Request $request): Passport
    {
        try {
            $token = $this->extractBearerToken($request->headers->all());
            $payload = JwtValidator::validateJwtToken($token);
            $authInfo = JwtValidator::createAuthInfo($payload);

            // Store the auth info on the request attributes for generic use.
            $request->attributes->set('auth', $authInfo);

            return new SelfValidatingPassport(new UserBadge($payload['sub']));
        } catch (AuthorizationException $e) {
            // Carry the HTTP status as the exception code and keep the original
            // exception as "previous", so the failure handler can answer with
            // the same status the other framework integrations return.
            throw new AuthenticationException($e->getMessage(), $e->statusCode, $e);
        }
    }

    public function onAuthenticationSuccess(Request $request, TokenInterface $token, string $firewallName): ?Response
    {
        return null; // Continue to the controller
    }

    /**
     * Answers with a JSON error. Uses the status carried by the exception when
     * it is a 4xx/5xx code, otherwise falls back to 401.
     */
    public function onAuthenticationFailure(Request $request, AuthenticationException $exception): ?Response
    {
        $status = $exception->getCode();
        if (!is_int($status) || $status < 400 || $status > 599) {
            $status = Response::HTTP_UNAUTHORIZED;
        }

        return new JsonResponse(['error' => $exception->getMessage()], $status);
    }
}
在 config/packages/security.yaml
配置安全策略:
security:
  firewalls:
    api:
      pattern: ^/api/protected
      stateless: true
      custom_authenticators:
        - App\Security\JwtAuthenticator
<?php
namespace App\Middleware;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Slim\Psr7\Response;
/**
 * PSR-15 middleware for Slim that validates the bearer access token and
 * attaches the resulting AuthInfo to the request as the "auth" attribute.
 */
class JwtMiddleware implements MiddlewareInterface
{
    use AuthHelpers;

    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        try {
            $token = $this->extractBearerToken($request->getHeaders());
            $claims = JwtValidator::validateJwtToken($token);
            $authInfo = JwtValidator::createAuthInfo($claims);

            // Downstream handlers read the auth info from the request attribute.
            return $handler->handle($request->withAttribute('auth', $authInfo));
        } catch (AuthorizationException $e) {
            // Answer with a JSON error and the status carried by the exception.
            $errorBody = json_encode(['error' => $e->getMessage()]);
            $response = new Response();
            $response->getBody()->write($errorBody);

            return $response
                ->withHeader('Content-Type', 'application/json')
                ->withStatus($e->statusCode);
        }
    }
}
根据你的权限 (Permission) 模型,在 JwtValidator
中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (Organization)(非 API)权限 (Permissions)
- 组织级 API 资源
/**
 * Global API resource check: the token must target the API resource indicator
 * and carry every required scope.
 */
private static function verifyPayload(array $payload): void
{
    // A single-string audience is treated as a one-element list.
    $audClaim = $payload['aud'] ?? [];
    $audienceList = is_string($audClaim) ? [$audClaim] : $audClaim;
    if (!in_array('https://your-api-resource-indicator', $audienceList)) {
        throw new AuthorizationException('受众 (Audience) 无效');
    }

    // The "scope" claim is a space-delimited list; an empty claim means no scopes.
    $grantedScopes = empty($payload['scope']) ? [] : explode(' ', $payload['scope']);
    // Replace with the scopes your API actually requires.
    $missingScopes = array_filter(
        ['api:read', 'api:write'],
        fn ($required) => !in_array($required, $grantedScopes)
    );
    if (count($missingScopes) > 0) {
        throw new AuthorizationException('权限 (Scope) 不足');
    }
}
/**
 * Organization (non-API) permission check: the audience must be the expected
 * organization URN and the token must carry every required organization scope.
 */
private static function verifyPayload(array $payload): void
{
    // A single-string audience is treated as a one-element list.
    $audClaim = $payload['aud'] ?? [];
    $audienceList = is_string($audClaim) ? [$audClaim] : $audClaim;

    // At least one audience must use the organization URN format.
    $orgAudienceFound = false;
    foreach ($audienceList as $candidate) {
        if (str_starts_with($candidate, 'urn:logto:organization:')) {
            $orgAudienceFound = true;
            break;
        }
    }
    if (!$orgAudienceFound) {
        throw new AuthorizationException('组织 (Organization) 权限 (Permissions) 的受众 (Audience) 无效');
    }

    // The audience must name the organization this request is about.
    $expectedOrgId = 'your-organization-id'; // Extract from the request context
    if (!in_array("urn:logto:organization:{$expectedOrgId}", $audienceList)) {
        throw new AuthorizationException('组织 (Organization) ID 不匹配');
    }

    // The "scope" claim is a space-delimited list; an empty claim means no scopes.
    $grantedScopes = empty($payload['scope']) ? [] : explode(' ', $payload['scope']);
    // Replace with the organization scopes you actually require.
    $missingScopes = array_filter(
        ['invite:users', 'manage:settings'],
        fn ($required) => !in_array($required, $grantedScopes)
    );
    if (count($missingScopes) > 0) {
        throw new AuthorizationException('组织 (Organization) 权限 (Scope) 不足');
    }
}
/**
 * Organization-level API resource check: the token must target the API
 * resource indicator, belong to the expected organization and carry every
 * required scope.
 */
private static function verifyPayload(array $payload): void
{
    // A single-string audience is treated as a one-element list.
    $audClaim = $payload['aud'] ?? [];
    $audienceList = is_string($audClaim) ? [$audClaim] : $audClaim;
    if (!in_array('https://your-api-resource-indicator', $audienceList)) {
        throw new AuthorizationException('组织级 API 资源的受众 (Audience) 无效');
    }

    // The "organization_id" claim must name the organization this request is about.
    $expectedOrgId = 'your-organization-id'; // Extract from the request context
    $tokenOrgId = $payload['organization_id'] ?? null;
    if ($tokenOrgId !== $expectedOrgId) {
        throw new AuthorizationException('组织 (Organization) ID 不匹配');
    }

    // The "scope" claim is a space-delimited list; an empty claim means no scopes.
    $grantedScopes = empty($payload['scope']) ? [] : explode(' ', $payload['scope']);
    // Replace with the scopes your API actually requires.
    $missingScopes = array_filter(
        ['api:read', 'api:write'],
        fn ($required) => !in_array($required, $grantedScopes)
    );
    if (count($missingScopes) > 0) {
        throw new AuthorizationException('组织级 API 权限 (Scope) 不足');
    }
}
我们使用 jwt gem 来验证 JWT。请将其添加到你的 Gemfile:
gem 'jwt'
# net-http 是 Ruby 2.7 及以上版本的标准库,无需显式添加
然后运行:
bundle install
首先,添加这些用于处理 JWKS 和令牌验证的通用工具:
require 'jwt'
require 'net/http'
require 'json'
# Shared JWT helpers: fetches and memoizes the JWKS, decodes access tokens and
# maps the payload to an AuthInfo object.
class JwtValidator
  include AuthHelpers

  # Signing algorithms accepted when decoding. JWT.decode does not infer the
  # algorithm from the JWKS; without an explicit list it only accepts its
  # default, so asymmetric tokens would be rejected.
  # NOTE(review): Logto tenants are expected to sign with ES384 (EC keys) or
  # RS256 (RSA keys) - confirm against your tenant's JWKS.
  ALLOWED_ALGORITHMS = %w[ES384 RS256].freeze

  # Returns the JWKS as a JWT::JWK::Set, fetching it on first use.
  # Raises AuthorizationError (401) when the HTTP request does not succeed.
  def self.fetch_jwks
    @jwks ||= begin
      uri = URI(AuthConstants::JWKS_URI)
      response = Net::HTTP.get_response(uri)
      raise AuthorizationError.new('Failed to fetch JWKS', 401) unless response.is_a?(Net::HTTPSuccess)

      jwks_data = JSON.parse(response.body)
      JWT::JWK::Set.new(jwks_data)
    end
  end

  # Verifies signature and issuer and returns the token payload.
  # The audience is checked later in verify_payload, per permission model.
  def self.validate_jwt_token(token)
    payload, _header = JWT.decode(token, nil, true, {
      algorithms: ALLOWED_ALGORITHMS,
      iss: AuthConstants::ISSUER,
      verify_iss: true,
      verify_aud: false,
      jwks: fetch_jwks
    })
    payload
  end

  # Maps a token payload to an AuthInfo; "scope" is split on spaces and "aud"
  # is always returned as an array, whether the claim was a string or a list.
  def self.create_auth_info(payload)
    scopes = payload['scope']&.split(' ') || []
    audience = Array(payload['aud'])

    AuthInfo.new(
      payload['sub'],
      payload['client_id'],
      payload['organization_id'],
      scopes,
      audience
    )
  end

  # Placeholder for the permission-model specific payload checks.
  def self.verify_payload(payload)
    # Implement your verification logic here based on your permission model.
    # The concrete checks are shown in the permission model section below.
  end
end
接下来,实现中间件以验证访问令牌 (Access token):
- Rails
- Sinatra
- Grape
# Rails controller concern that validates the bearer access token before the
# listed actions and exposes the result as @auth.
module JwtAuthentication
  extend ActiveSupport::Concern
  include AuthHelpers

  included do
    before_action :verify_access_token, only: [:protected_action] # List the actions that need protection
  end

  private

  # Validates the token and stores the auth info; renders a JSON error with the
  # matching status when validation fails.
  def verify_access_token
    bearer = extract_bearer_token(request)
    claims = JwtValidator.validate_jwt_token(bearer)
    JwtValidator.verify_payload(claims)

    # Store the auth info for generic use.
    @auth = JwtValidator.create_auth_info(claims)
  rescue AuthorizationError => e
    render json: { error: e.message }, status: e.status
  rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
    render json: { error: 'Invalid token' }, status: 401
  end
end
# Rack middleware that validates the bearer access token for paths under
# "/api/protected" and stores the auth info in env['auth'].
class AuthMiddleware
  include AuthHelpers

  def initialize(app)
    @app = app
  end

  def call(env)
    request = Rack::Request.new(env)
    # Only the protected prefix is checked; everything else passes through.
    return @app.call(env) unless request.path.start_with?('/api/protected')

    begin
      bearer = extract_bearer_token(request)
      claims = JwtValidator.validate_jwt_token(bearer)
      JwtValidator.verify_payload(claims)

      # Store the auth info in env for generic use.
      env['auth'] = JwtValidator.create_auth_info(claims)
    rescue AuthorizationError => e
      return [e.status, { 'Content-Type' => 'application/json' }, [{ error: e.message }.to_json]]
    rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
      return [401, { 'Content-Type' => 'application/json' }, [{ error: 'Invalid token' }.to_json]]
    end

    @app.call(env)
  end
end
# Grape helpers that validate the bearer access token and expose the result
# through #auth.
module GrapeAuthHelpers
  include AuthHelpers

  # Validates the token and stores the auth info; halts with a JSON error and
  # the matching status when validation fails.
  def authenticate_user!
    bearer = extract_bearer_token(request)
    claims = JwtValidator.validate_jwt_token(bearer)
    JwtValidator.verify_payload(claims)

    # Store the auth info for generic use.
    @auth = JwtValidator.create_auth_info(claims)
  rescue AuthorizationError => e
    error!({ error: e.message }, e.status)
  rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
    error!({ error: 'Invalid token' }, 401)
  end

  # Auth info stored by authenticate_user!, or nil when it has not run.
  def auth
    @auth
  end
end
根据你的权限模型,在 JwtValidator
中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限
- 组织级 API 资源
# Verifies the decoded JWT payload for a global API resource.
#
# @param payload [Hash] decoded JWT claims (string keys)
# @raise [AuthorizationError] when the audience or the scopes do not match
def self.verify_payload(payload)
  # The `aud` claim may be a single String or an Array. Normalise to an Array
  # so that include? compares whole values; String#include? would perform a
  # substring match and accept look-alike audiences.
  audiences = Array(payload['aud'])
  unless audiences.include?('https://your-api-resource-indicator')
    raise AuthorizationError.new('Invalid audience')
  end

  # Check the scopes required for the global API resource.
  required_scopes = ['api:read', 'api:write'] # Replace with the scopes you actually need
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient scope')
  end
end
# Verifies the decoded JWT payload for organization (non-API) permissions.
#
# @param payload [Hash] decoded JWT claims (string keys)
# @raise [AuthorizationError] when audience, organization or scopes do not match
def self.verify_payload(payload)
  # The `aud` claim may be a single String or an Array. Normalise to an Array
  # so the enumerable checks below work for both shapes (String has no any?).
  audiences = Array(payload['aud'])
  has_org_audience = audiences.any? { |aud| aud.to_s.start_with?('urn:logto:organization:') }
  unless has_org_audience
    raise AuthorizationError.new('Invalid audience for organization permissions')
  end

  # Check that the organization ID matches the context.
  expected_org_id = 'your-organization-id' # Extract from the request context
  expected_aud = "urn:logto:organization:#{expected_org_id}"
  unless audiences.include?(expected_aud)
    raise AuthorizationError.new('Organization ID mismatch')
  end

  # Check the required organization scopes.
  required_scopes = ['invite:users', 'manage:settings'] # Replace with the scopes you actually need
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient organization scope')
  end
end
# Verifies the decoded JWT payload for an organization-level API resource.
#
# @param payload [Hash] decoded JWT claims (string keys)
# @raise [AuthorizationError] when audience, organization or scopes do not match
def self.verify_payload(payload)
  # The `aud` claim may be a single String or an Array. Normalise to an Array
  # so that include? compares whole values; String#include? would perform a
  # substring match and accept look-alike audiences.
  audiences = Array(payload['aud'])
  unless audiences.include?('https://your-api-resource-indicator')
    raise AuthorizationError.new('Invalid audience for organization-level API resources')
  end

  # Check that the organization ID matches the context.
  expected_org_id = 'your-organization-id' # Extract from the request context
  org_id = payload['organization_id']
  unless expected_org_id == org_id
    raise AuthorizationError.new('Organization ID mismatch')
  end

  # Check the scopes required for the organization-level API resource.
  required_scopes = ['api:read', 'api:write'] # Replace with the scopes you actually need
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient organization-level API scopes')
  end
end
我们使用 jsonwebtoken 来验证 JWT。请在你的 `Cargo.toml` 中添加所需依赖:
- Axum
- Actix Web
- Rocket
[dependencies]
axum = "0.7"
tokio = { version = "1.0", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors"] }
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
[dependencies]
actix-web = "4.0"
tokio = { version = "1.0", features = ["full"] }
# Required by the Actix Web middleware, which imports `futures::future::{ok, Ready, LocalBoxFuture}`.
futures = "0.3"
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
[dependencies]
rocket = { version = "0.5", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
reqwest = { version = "0.11", features = ["json"] }
首先,添加这些通用工具以处理 JWT 验证:
use crate::{AuthInfo, AuthorizationError, ISSUER, JWKS_URI};
use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation};
use serde_json::Value;
use std::collections::HashMap;
/// Validates access tokens against the signing keys published at `JWKS_URI`.
pub struct JwtValidator {
    /// RSA decoding keys from the JWKS document, indexed by key ID (`kid`).
    jwks: HashMap<String, DecodingKey>,
}

impl JwtValidator {
    /// Builds a validator by downloading the JWKS once.
    ///
    /// # Errors
    /// Returns an error when the JWKS cannot be fetched, cannot be parsed, or
    /// contains no usable key.
    pub async fn new() -> Result<Self, AuthorizationError> {
        let jwks = Self::fetch_jwks().await?;
        Ok(Self { jwks })
    }

    /// Downloads the JWKS document and converts every RSA key into a `DecodingKey`.
    ///
    /// Failures here are server-side problems (network, identity provider,
    /// configuration), not bad client credentials, so they are reported with
    /// status 500 rather than 401.
    async fn fetch_jwks() -> Result<HashMap<String, DecodingKey>, AuthorizationError> {
        let response = reqwest::get(JWKS_URI).await.map_err(|e| {
            AuthorizationError::with_status(format!("Failed to fetch JWKS: {}", e), 500)
        })?;
        let jwks: Value = response.json().await.map_err(|e| {
            AuthorizationError::with_status(format!("Failed to parse JWKS: {}", e), 500)
        })?;

        // NOTE(review): only `kty == "RSA"` keys are loaded and tokens are checked
        // as RS256 below; confirm this matches the tenant's signing key algorithm.
        // Entries that miss a field or fail to parse are skipped.
        let keys: HashMap<String, DecodingKey> = jwks["keys"]
            .as_array()
            .into_iter()
            .flatten()
            .filter_map(|key| {
                let kid = key["kid"].as_str()?;
                let kty = key["kty"].as_str()?;
                let n = key["n"].as_str()?;
                let e = key["e"].as_str()?;
                if kty != "RSA" {
                    return None;
                }
                let decoding_key = DecodingKey::from_rsa_components(n, e).ok()?;
                Some((kid.to_string(), decoding_key))
            })
            .collect();

        if keys.is_empty() {
            return Err(AuthorizationError::with_status("No valid keys found in JWKS", 500));
        }
        Ok(keys)
    }

    /// Decodes and verifies `token`, then returns the auth info derived from its claims.
    ///
    /// # Errors
    /// Returns a 401 error when the header is malformed, the `kid` is missing
    /// or unknown, or signature/issuer validation fails; propagates any error
    /// from `verify_payload`.
    pub fn validate_jwt_token(&self, token: &str) -> Result<AuthInfo, AuthorizationError> {
        let header = decode_header(token).map_err(|e| {
            AuthorizationError::with_status(format!("Invalid token header: {}", e), 401)
        })?;
        let kid = header.kid.ok_or_else(|| {
            AuthorizationError::with_status("Token missing kid claim", 401)
        })?;
        let key = self.jwks.get(&kid).ok_or_else(|| {
            AuthorizationError::with_status("Unknown key ID", 401)
        })?;

        let mut validation = Validation::new(Algorithm::RS256);
        validation.set_issuer(&[ISSUER]);
        validation.validate_aud = false; // The audience is verified manually in verify_payload

        let token_data = decode::<Value>(token, key, &validation).map_err(|e| {
            AuthorizationError::with_status(format!("Invalid token: {}", e), 401)
        })?;
        let claims = token_data.claims;
        self.verify_payload(&claims)?;
        Ok(self.create_auth_info(claims))
    }

    /// Placeholder for the permission-model specific checks.
    fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
        // Implement your verification logic here based on the permission model.
        // The concrete variants are shown in the permission model section below.
        Ok(())
    }

    /// Builds an `AuthInfo` from the verified claims.
    fn create_auth_info(&self, claims: Value) -> AuthInfo {
        // split_whitespace skips empty entries, so an empty or doubly spaced
        // scope string does not produce "" scopes.
        let scopes = claims["scope"]
            .as_str()
            .map(|s| s.split_whitespace().map(|s| s.to_string()).collect())
            .unwrap_or_default();
        // `aud` may be a single string or an array of strings.
        let audience = match &claims["aud"] {
            Value::Array(arr) => arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect(),
            Value::String(s) => vec![s.clone()],
            _ => vec![],
        };
        AuthInfo::new(
            claims["sub"].as_str().unwrap_or_default().to_string(),
            claims["client_id"].as_str().map(|s| s.to_string()),
            claims["organization_id"].as_str().map(|s| s.to_string()),
            scopes,
            audience,
        )
    }
}
然后,实现中间件以验证访问令牌 (Access token):
- Axum
- Actix Web
- Rocket
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use axum::{
extract::Request,
http::{HeaderMap, StatusCode},
middleware::Next,
response::{IntoResponse, Response},
Extension, Json,
};
use serde_json::json;
use std::sync::Arc;
/// Axum middleware: validates the bearer token and, on success, makes the
/// resulting `AuthInfo` available through the request extensions.
///
/// # Errors
/// Returns the `AuthorizationError` from token extraction or validation.
pub async fn jwt_middleware(
    Extension(validator): Extension<Arc<JwtValidator>>,
    headers: HeaderMap,
    mut request: Request,
    next: Next,
) -> Result<Response, AuthorizationError> {
    // A header value that is not valid visible ASCII is treated as absent.
    let bearer = headers
        .get("authorization")
        .and_then(|value| value.to_str().ok());
    let auth_info = extract_bearer_token(bearer)
        .and_then(|token| validator.validate_jwt_token(token))?;

    // Store the auth info in the request extensions for generic use.
    request.extensions_mut().insert(auth_info);
    let response = next.run(request).await;
    Ok(response)
}
impl IntoResponse for AuthorizationError {
fn into_response(self) -> Response {
let status = StatusCode::from_u16(self.status_code).unwrap_or(StatusCode::FORBIDDEN);
(status, Json(json!({ "error": self.message }))).into_response()
}
}
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use actix_web::{
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
web, Error, HttpMessage, HttpResponse,
};
use futures::future::{ok, Ready};
use std::sync::Arc;
/// Actix Web middleware factory holding the shared JWT validator.
pub struct JwtMiddleware {
    /// Validator handed to every service created by this factory.
    validator: Arc<JwtValidator>,
}

impl JwtMiddleware {
    /// Creates the middleware around a shared validator.
    pub fn new(validator: Arc<JwtValidator>) -> Self {
        JwtMiddleware { validator }
    }
}
impl<S, B> Transform<S, ServiceRequest> for JwtMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = JwtMiddlewareService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(JwtMiddlewareService {
service,
validator: self.validator.clone(),
})
}
}
pub struct JwtMiddlewareService<S> {
service: S,
validator: Arc<JwtValidator>,
}
impl<S, B> Service<ServiceRequest> for JwtMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = futures::future::LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
let validator = self.validator.clone();
Box::pin(async move {
let authorization = req
.headers()
.get("authorization")
.and_then(|h| h.to_str().ok());
match extract_bearer_token(authorization)
.and_then(|token| validator.validate_jwt_token(token))
{
Ok(auth_info) => {
// 将认证信息存储到请求扩展中,便于通用使用
req.extensions_mut().insert(auth_info);
let fut = self.service.call(req);
fut.await
}
Err(e) => {
let response = HttpResponse::build(
actix_web::http::StatusCode::from_u16(e.status_code)
.unwrap_or(actix_web::http::StatusCode::FORBIDDEN),
)
.json(serde_json::json!({ "error": e.message }));
Ok(req.into_response(response))
}
}
})
}
}
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use rocket::{
http::Status,
outcome::Outcome,
request::{self, FromRequest, Request},
State,
};
#[rocket::async_trait]
impl<'r> FromRequest<'r> for AuthInfo {
type Error = AuthorizationError;
async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
let validator = match req.guard::<&State<JwtValidator>>().await {
Outcome::Success(validator) => validator,
Outcome::Failure((status, _)) => {
return Outcome::Failure((
status,
AuthorizationError::with_status("JWT validator not found", 500),
))
}
Outcome::Forward(()) => {
return Outcome::Forward(())
}
};
let authorization = req.headers().get_one("authorization");
match extract_bearer_token(authorization)
.and_then(|token| validator.validate_jwt_token(token))
{
Ok(auth_info) => Outcome::Success(auth_info),
Err(e) => {
let status = Status::from_code(e.status_code).unwrap_or(Status::Forbidden);
Outcome::Failure((status, e))
}
}
}
}
根据你的权限 (Permission) 模型,在 `JwtValidator` 中实现相应的验证逻辑:
- 全局 API 资源
- 组织 (非 API) 权限 (Permissions)
- 组织级 API 资源
/// Checks the claims of a token for a global API resource: the audience must
/// contain the API resource indicator and every required scope must be granted.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // `aud` may be an array of strings or a single string.
    let aud_claim = &claims["aud"];
    let audiences: Vec<&str> = if let Some(list) = aud_claim.as_array() {
        list.iter().filter_map(Value::as_str).collect()
    } else if let Some(single) = aud_claim.as_str() {
        vec![single]
    } else {
        Vec::new()
    };
    if !audiences.iter().any(|aud| *aud == "https://your-api-resource-indicator") {
        return Err(AuthorizationError::new("Invalid audience"));
    }

    // Scopes required for the global API resource; replace with your own.
    let required_scopes = ["api:read", "api:write"];
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient scope"));
    }
    Ok(())
}
/// Checks the claims of an organization token: the audience must use the
/// organization format, name the expected organization, and every required
/// organization scope must be granted.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // `aud` may be an array of strings or a single string.
    let aud_claim = &claims["aud"];
    let audiences: Vec<&str> = if let Some(list) = aud_claim.as_array() {
        list.iter().filter_map(Value::as_str).collect()
    } else if let Some(single) = aud_claim.as_str() {
        vec![single]
    } else {
        Vec::new()
    };
    let org_prefixed = audiences
        .iter()
        .filter(|aud| aud.starts_with("urn:logto:organization:"))
        .count();
    if org_prefixed == 0 {
        return Err(AuthorizationError::new("Invalid audience for organization permissions"));
    }

    // The organization must match the context (extract it from the request).
    let expected_org_id = "your-organization-id";
    let expected_aud = format!("urn:logto:organization:{}", expected_org_id);
    if !audiences.iter().any(|aud| *aud == expected_aud.as_str()) {
        return Err(AuthorizationError::new("Organization ID mismatch"));
    }

    // Required organization scopes; replace with your own.
    let required_scopes = ["invite:users", "manage:settings"];
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient organization scope"));
    }
    Ok(())
}
/// Checks the claims of a token for an organization-level API resource: the
/// audience must contain the API resource indicator, `organization_id` must
/// match the expected organization, and every required scope must be granted.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // `aud` may be an array of strings or a single string.
    let aud_claim = &claims["aud"];
    let audiences: Vec<&str> = if let Some(list) = aud_claim.as_array() {
        list.iter().filter_map(Value::as_str).collect()
    } else if let Some(single) = aud_claim.as_str() {
        vec![single]
    } else {
        Vec::new()
    };
    if !audiences.iter().any(|aud| *aud == "https://your-api-resource-indicator") {
        return Err(AuthorizationError::new("Invalid audience for organization-level API resources"));
    }

    // The organization must match the context (extract it from the request).
    let expected_org_id = "your-organization-id";
    let token_org_id = claims["organization_id"].as_str().unwrap_or("");
    if token_org_id != expected_org_id {
        return Err(AuthorizationError::new("Organization ID mismatch"));
    }

    // Scopes required for the organization-level API resource; replace with your own.
    let required_scopes = ["api:read", "api:write"];
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient organization-level API scopes"));
    }
    Ok(())
}
步骤 4:为你的 API 应用中间件
将中间件应用到你需要保护的 API 路由。
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
- Express.js
- Koa.js
- Fastify
- Hapi.js
- NestJS
import express from 'express';
import { verifyAccessToken } from './auth-middleware.js';
const app = express();

// Replies with the auth info read from req.auth.
const sendAuth = (req, res) => {
  res.json({ auth: req.auth });
};

// Your protected endpoint logic: auth info plus a confirmation message.
const sendAuthDetailed = (req, res) => {
  const { auth } = req;
  res.json({ auth, message: '受保护数据访问成功' });
};

// verifyAccessToken runs before each handler.
app.get('/api/protected', verifyAccessToken, sendAuth);
app.get('/api/protected/detailed', verifyAccessToken, sendAuthDetailed);

app.listen(3000);
import Koa from 'koa';
import Router from '@koa/router';
import { koaVerifyAccessToken } from './auth-middleware.js';
const app = new Koa();
const router = new Router();

// Replies with the auth info read from ctx.state.auth.
const showAuth = (ctx) => {
  ctx.body = { auth: ctx.state.auth };
};

// Your protected endpoint logic: auth info plus a confirmation message.
const showAuthDetailed = (ctx) => {
  const { auth } = ctx.state;
  ctx.body = { auth, message: '受保护数据访问成功' };
};

// koaVerifyAccessToken runs before each handler.
router.get('/api/protected', koaVerifyAccessToken, showAuth);
router.get('/api/protected/detailed', koaVerifyAccessToken, showAuthDetailed);

app.use(router.routes());
app.listen(3000);
import Fastify from 'fastify';
import { fastifyVerifyAccessToken } from './auth-middleware.js';
const fastify = Fastify();

// Builds fresh route options that run the token check as a preHandler.
const guarded = () => ({ preHandler: fastifyVerifyAccessToken });

// Replies with the auth info read from request.auth.
const sendAuth = (request, reply) => {
  reply.send({ auth: request.auth });
};

// Your protected endpoint logic: auth info plus a confirmation message.
const sendAuthDetailed = (request, reply) => {
  const { auth } = request;
  reply.send({ auth, message: '受保护数据访问成功' });
};

fastify.get('/api/protected', guarded(), sendAuth);
fastify.get('/api/protected/detailed', guarded(), sendAuthDetailed);

fastify.listen({ port: 3000 });
import Hapi from '@hapi/hapi';
import { hapiVerifyAccessToken } from './auth-middleware.js';
const server = Hapi.server({ port: 3000 });

// Builds a GET route whose handler runs after the access-token pre-handler.
const guardedGet = (path, handler) => ({
  method: 'GET',
  path,
  options: {
    pre: [{ method: hapiVerifyAccessToken }],
    handler,
  },
});

// Replies with the auth info read from request.app.auth.
server.route(guardedGet('/api/protected', (request, h) => ({ auth: request.app.auth })));

// Your protected endpoint logic: auth info plus a confirmation message.
server.route(
  guardedGet('/api/protected/detailed', (request, h) => ({
    auth: request.app.auth,
    message: '受保护数据访问成功',
  }))
);

await server.start();
import { Controller, Get, UseGuards, Req } from '@nestjs/common';
import { AccessTokenGuard } from './access-token.guard.js';
// Every handler of this controller runs behind AccessTokenGuard.
@Controller('api')
@UseGuards(AccessTokenGuard)
export class ProtectedController {
  // Replies with the auth info read from req.auth.
  @Get('protected')
  getProtected(@Req() req: any) {
    const { auth } = req;
    return { auth };
  }

  // Your protected endpoint logic: auth info plus a confirmation message.
  @Get('protected/detailed')
  getDetailedProtected(@Req() req: any) {
    const { auth } = req;
    return { auth, message: '受保护数据访问成功' };
  }
}
- FastAPI
- Flask
- Django
- Django REST Framework
from fastapi import FastAPI, Depends
from auth_middleware import verify_access_token, AuthInfo
app = FastAPI()


@app.get("/api/protected")
async def protected_endpoint(auth: AuthInfo = Depends(verify_access_token)):
    # The auth info arrives through the verify_access_token dependency.
    payload = {"auth": auth.to_dict()}
    return payload


@app.get("/api/protected/detailed")
async def detailed_endpoint(auth: AuthInfo = Depends(verify_access_token)):
    # Your protected endpoint logic.
    payload = {"auth": auth.to_dict()}
    payload["message"] = "受保护数据访问成功"
    return payload
from flask import Flask, g, jsonify
from auth_middleware import verify_access_token
app = Flask(__name__)


@app.route('/api/protected', methods=['GET'])
@verify_access_token
def protected_endpoint():
    # The auth info is read from g.auth.
    auth_dict = g.auth.to_dict()
    return jsonify({"auth": auth_dict})


@app.route('/api/protected/detailed', methods=['GET'])
@verify_access_token
def detailed_endpoint():
    # Your protected endpoint logic.
    body = {"auth": g.auth.to_dict()}
    body["message"] = "受保护数据访问成功"
    return jsonify(body)
from django.http import JsonResponse
from auth_middleware import require_access_token
@require_access_token
def protected_view(request):
    # The auth info is read from request.auth.
    auth_dict = request.auth.to_dict()
    return JsonResponse({"auth": auth_dict})


@require_access_token
def detailed_view(request):
    # Your protected endpoint logic.
    body = {"auth": request.auth.to_dict()}
    body["message"] = "受保护数据访问成功"
    return JsonResponse(body)
from rest_framework.decorators import api_view, authentication_classes
from rest_framework.response import Response
from auth_middleware import AccessTokenAuthentication
@api_view(['GET'])
@authentication_classes([AccessTokenAuthentication])
def protected_view(request):
    # The auth info is read from request.user.auth.
    auth_dict = request.user.auth.to_dict()
    return Response({"auth": auth_dict})


@api_view(['GET'])
@authentication_classes([AccessTokenAuthentication])
def detailed_view(request):
    # Your protected endpoint logic.
    body = {"auth": request.user.auth.to_dict()}
    body["message"] = "受保护数据访问成功"
    return Response(body)
或者使用基于类的视图:
from rest_framework.views import APIView
from rest_framework.response import Response
from auth_middleware import AccessTokenAuthentication
class ProtectedView(APIView):
    authentication_classes = [AccessTokenAuthentication]

    def get(self, request):
        # The auth info is read from request.user.auth.
        auth_dict = request.user.auth.to_dict()
        return Response({"auth": auth_dict})


class DetailedView(APIView):
    authentication_classes = [AccessTokenAuthentication]

    def get(self, request):
        # Your protected endpoint logic.
        body = {"auth": request.user.auth.to_dict()}
        body["message"] = "受保护数据访问成功"
        return Response(body)
- Gin
- Echo
- Fiber
- Chi
package main
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves /api/protected behind the VerifyAccessToken middleware (Gin).
func main() {
	router := gin.Default()

	// showAuth reads the token stored under "auth" in the context and echoes its claims.
	showAuth := func(c *gin.Context) {
		raw, found := c.Get("auth")
		if !found {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "Token not found"})
			return
		}
		token := raw.(jwt.Token)
		body := gin.H{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
		}
		c.JSON(http.StatusOK, body)
	}

	// Apply the middleware to the protected route.
	router.GET("/api/protected", VerifyAccessToken(), showAuth)
	router.Run(":8080")
}
package main
import (
"net/http"
"github.com/labstack/echo/v4"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves /api/protected behind the VerifyAccessToken middleware (Echo).
func main() {
	server := echo.New()

	// showAuth reads the token stored under "auth" in the context and echoes its claims.
	showAuth := func(c echo.Context) error {
		raw := c.Get("auth")
		if raw == nil {
			return c.JSON(http.StatusInternalServerError, echo.Map{"error": "Token not found"})
		}
		token := raw.(jwt.Token)
		body := echo.Map{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
		}
		return c.JSON(http.StatusOK, body)
	}

	// Apply the middleware to the protected route.
	server.GET("/api/protected", showAuth, VerifyAccessToken)
	server.Start(":8080")
}
或者使用路由分组:
package main
import (
"github.com/labstack/echo/v4"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves a protected /api route group (Echo).
func main() {
	server := echo.New()

	// Every route of this group runs behind VerifyAccessToken.
	protected := server.Group("/api", VerifyAccessToken)

	// showAuth reads the token stored under "auth" in the context and echoes its claims.
	showAuth := func(c echo.Context) error {
		token := c.Get("auth").(jwt.Token)
		body := echo.Map{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
			"message":         "Protected data accessed successfully",
		}
		return c.JSON(200, body)
	}

	protected.GET("/protected", showAuth)
	server.Start(":8080")
}
package main
import (
"github.com/gofiber/fiber/v2"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves /api/protected behind the VerifyAccessToken middleware (Fiber).
func main() {
	server := fiber.New()

	// showAuth reads the token stored under "auth" in the locals and echoes its claims.
	showAuth := func(c *fiber.Ctx) error {
		raw := c.Locals("auth")
		if raw == nil {
			return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Token not found"})
		}
		token := raw.(jwt.Token)
		body := fiber.Map{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
		}
		return c.JSON(body)
	}

	// Apply the middleware to the protected route.
	server.Get("/api/protected", VerifyAccessToken, showAuth)
	server.Listen(":8080")
}
或者使用路由分组:
package main
import (
"github.com/gofiber/fiber/v2"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves a protected /api route group (Fiber).
func main() {
	server := fiber.New()

	// Every route of this group runs behind VerifyAccessToken.
	protected := server.Group("/api", VerifyAccessToken)

	// showAuth reads the token stored under "auth" in the locals and echoes its claims.
	showAuth := func(c *fiber.Ctx) error {
		token := c.Locals("auth").(jwt.Token)
		body := fiber.Map{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
			"message":         "Protected data accessed successfully",
		}
		return c.JSON(body)
	}

	protected.Get("/protected", showAuth)
	server.Listen(":8080")
}
package main
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves /api/protected behind the VerifyAccessToken middleware (Chi).
func main() {
	router := chi.NewRouter()

	// showAuth reads the token stored under AuthContextKey and echoes its claims.
	showAuth := func(w http.ResponseWriter, req *http.Request) {
		raw := req.Context().Value(AuthContextKey)
		if raw == nil {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusInternalServerError)
			json.NewEncoder(w).Encode(map[string]string{"error": "Token not found"})
			return
		}
		token := raw.(jwt.Token)
		body := map[string]interface{}{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(body)
	}

	// Apply the middleware to the protected route.
	router.With(VerifyAccessToken).Get("/api/protected", showAuth)
	http.ListenAndServe(":8080", router)
}
或者使用路由分组:
package main
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/lestrrat-go/jwx/v3/jwt"
)
// main serves a protected /api route group (Chi).
func main() {
	router := chi.NewRouter()

	// showAuth reads the token stored under AuthContextKey and echoes its claims.
	showAuth := func(w http.ResponseWriter, req *http.Request) {
		token := req.Context().Value(AuthContextKey).(jwt.Token)
		body := map[string]interface{}{
			"sub":             token.Subject(),
			"client_id":       getStringClaim(token, "client_id"),
			"organization_id": getStringClaim(token, "organization_id"),
			"scopes":          getScopesFromToken(token),
			"audience":        getAudienceFromToken(token),
			"message":         "Protected data accessed successfully",
		}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(body)
	}

	// Every route of this group runs behind VerifyAccessToken.
	router.Route("/api", func(api chi.Router) {
		api.Use(VerifyAccessToken)
		api.Get("/protected", showAuth)
	})
	http.ListenAndServe(":8080", router)
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that echo the claims of the authenticated JWT.
 *
 * <p>Responses are built with a mutable map instead of {@code Map.of}:
 * {@code Map.of} throws {@code NullPointerException} for null values, and
 * claims such as {@code organization_id} or {@code client_id} may be absent.
 */
@RestController
public class ProtectedController {
    /** Returns the token's subject, client, organization, scope list and audience. */
    @GetMapping("/api/protected")
    public Map<String, Object> protectedEndpoint(@AuthenticationPrincipal Jwt jwt) {
        // Read the access token information directly from the JWT.
        String scopes = jwt.getClaimAsString("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.getClaimAsString("client_id"));
        body.put("organization_id", jwt.getClaimAsString("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", jwt.getAudience());
        return body;
    }

    /** Same claims (scope as the raw string) plus a confirmation message. */
    @GetMapping("/api/protected/detailed")
    public Map<String, Object> detailedEndpoint(@AuthenticationPrincipal Jwt jwt) {
        // Your protected endpoint logic.
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.getClaimAsString("client_id"));
        body.put("organization_id", jwt.getClaimAsString("organization_id"));
        body.put("scopes", jwt.getClaimAsString("scope"));
        body.put("audience", jwt.getAudience());
        body.put("message", "受保护数据访问成功");
        return body;
    }
}
import org.eclipse.microprofile.jwt.JsonWebToken;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.container.ContainerRequestContext;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that echo the claims of the authenticated JWT.
 *
 * <p>Responses are built with a mutable map instead of {@code Map.of}:
 * {@code Map.of} throws {@code NullPointerException} for null values, and
 * claims such as {@code organization_id} or {@code client_id} may be absent.
 */
@Path("/api")
public class ProtectedResource {
    @Inject
    JsonWebToken jwt;

    /** Returns the token's subject, client, organization, scope list and audience. */
    @GET
    @Path("/protected")
    @Produces(MediaType.APPLICATION_JSON)
    public Map<String, Object> protectedEndpoint(@Context ContainerRequestContext requestContext) {
        // Prefer the token stored on the request context under "auth".
        JsonWebToken token = (JsonWebToken) requestContext.getProperty("auth");
        if (token == null) {
            token = jwt; // Fall back to the injected JWT
        }
        String scopes = token.getClaim("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", token.getSubject());
        body.put("client_id", token.<String>getClaim("client_id"));
        body.put("organization_id", token.<String>getClaim("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", token.getAudience());
        return body;
    }

    /** Same claims (scope as the raw string) plus a confirmation message. */
    @GET
    @Path("/protected/detailed")
    @Produces(MediaType.APPLICATION_JSON)
    public Map<String, Object> detailedEndpoint() {
        // Your protected endpoint logic.
        String scopes = jwt.getClaim("scope");
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.<String>getClaim("client_id"));
        body.put("organization_id", jwt.<String>getClaim("organization_id"));
        body.put("scopes", scopes);
        body.put("audience", jwt.getAudience());
        body.put("message", "受保护数据访问成功");
        return body;
    }
}
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.security.annotation.Secured;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.rules.SecurityRule;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that echo the attributes of the authenticated principal.
 *
 * <p>Responses are built with a mutable map instead of {@code Map.of}:
 * {@code Map.of} throws {@code NullPointerException} for null values, and
 * attributes such as {@code organization_id} or {@code client_id} may be absent.
 */
@Controller("/api")
@Secured(SecurityRule.IS_AUTHENTICATED)
public class ProtectedController {
    /** Returns the subject, client, organization, scope list and audience. */
    @Get("/protected")
    public Map<String, Object> protectedEndpoint(Authentication authentication) {
        // Read the access token information directly from the Authentication.
        Map<String, Object> attributes = authentication.getAttributes();
        String scopes = (String) attributes.get("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", authentication.getName());
        body.put("client_id", attributes.get("client_id"));
        body.put("organization_id", attributes.get("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", attributes.get("aud"));
        return body;
    }

    /** Same attributes (scope as stored) plus a confirmation message. */
    @Get("/protected/detailed")
    public Map<String, Object> detailedEndpoint(Authentication authentication) {
        // Your protected endpoint logic.
        Map<String, Object> attributes = authentication.getAttributes();
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", authentication.getName());
        body.put("client_id", attributes.get("client_id"));
        body.put("organization_id", attributes.get("organization_id"));
        body.put("scopes", attributes.get("scope"));
        body.put("audience", attributes.get("aud"));
        body.put("message", "受保护数据访问成功");
        return body;
    }
}
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class MainVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
Router router = Router.router(vertx);
// 为受保护路由应用中间件
router.route("/api/protected*").handler(new JwtAuthHandler(vertx));
router.get("/api/protected").handler(this::protectedEndpoint);
router.get("/api/protected/detailed").handler(this::detailedEndpoint);
vertx.createHttpServer()
.requestHandler(router)
.listen(8080, result -> {
if (result.succeeded()) {
startPromise.complete();
} else {
startPromise.fail(result.cause());
}
});
}
private void protectedEndpoint(RoutingContext context) {
// 直接从 context 获取 JWT principal
JsonObject principal = context.get("auth");
if (principal == null) {
context.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end("{\"error\": \"未找到 JWT principal\"}");
return;
}
String scopes = principal.getString("scope");
JsonObject response = new JsonObject()
.put("sub", principal.getString("sub"))
.put("client_id", principal.getString("client_id"))
.put("organization_id", principal.getString("organization_id"))
.put("scopes", scopes != null ? scopes.split(" ") : new String[0])
.put("audience", principal.getJsonArray("aud"));
context.response()
.putHeader("Content-Type", "application/json")
.end(response.encode());
}
private void detailedEndpoint(RoutingContext context) {
// 你的受保护接口逻辑
JsonObject principal = context.get("auth");
JsonObject response = new JsonObject()
.put("sub", principal.getString("sub"))
.put("client_id", principal.getString("client_id"))
.put("organization_id", principal.getString("organization_id"))
.put("scopes", principal.getString("scope"))
.put("audience", principal.getJsonArray("aud"))
.put("message", "受保护数据访问成功");
context.response()
.putHeader("Content-Type", "application/json")
.end(response.encode());
}
}
我们已经在前面的章节中设置好了认证 (Authentication) 和授权 (Authorization) 中间件。现在,我们可以创建一个受保护的控制器,用于验证访问令牌 (Access token) 并从已认证 (Authenticated) 的请求中提取声明 (Claims)。
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Security.Claims;
namespace YourApiNamespace.Controllers
{
    /// <summary>
    /// Protected endpoints that expose the claims of the authenticated user.
    /// </summary>
    [ApiController]
    [Route("api/[controller]")]
    [Authorize] // Every action in this controller requires authentication
    public class ProtectedController : ControllerBase
    {
        /// <summary>Returns subject, client, organization, scopes and audience.</summary>
        [HttpGet]
        public IActionResult GetProtectedData()
        {
            // Read the access token information directly from the user's claims.
            var subject = User.FindFirst(ClaimTypes.NameIdentifier)?.Value ?? User.FindFirst("sub")?.Value;
            var client = User.FindFirst("client_id")?.Value;
            var organization = User.FindFirst("organization_id")?.Value;
            var scopeList = User.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
            var audiences = User.FindAll("aud").Select(claim => claim.Value).ToArray();

            var payload = new
            {
                sub = subject,
                client_id = client,
                organization_id = organization,
                scopes = scopeList,
                audience = audiences
            };
            return Ok(payload);
        }

        /// <summary>Returns every claim as a type/value pair, for debugging or inspection.</summary>
        [HttpGet("claims")]
        public IActionResult GetAllClaims()
        {
            var claims = User.Claims.Select(claim => new { claim.Type, claim.Value }).ToList();
            return Ok(new { claims });
        }
    }
}
- Laravel
- Symfony
- Slim
<?php
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Route;
// Both routes run behind the 'auth.token' middleware.
Route::middleware('auth.token')->group(function () {
    // Replies with the auth info read from the request attributes.
    Route::get('/api/protected', function (Request $request) {
        $authInfo = $request->attributes->get('auth')->toArray();

        return ['auth' => $authInfo];
    });

    // Your protected endpoint logic: auth info plus a confirmation message.
    Route::get('/api/protected/detailed', function (Request $request) {
        $payload = ['auth' => $request->attributes->get('auth')->toArray()];
        $payload['message'] = '受保护数据访问成功';

        return $payload;
    });
});
或者使用控制器:
<?php
namespace App\Http\Controllers\Api;
use App\Http\Controllers\Controller;
use Illuminate\Http\Request;
/**
 * Controller whose actions run behind the 'auth.token' middleware.
 */
class ProtectedController extends Controller
{
    public function __construct()
    {
        $this->middleware('auth.token');
    }

    /**
     * Replies with the auth info read from the request attributes.
     */
    public function index(Request $request)
    {
        $authInfo = $request->attributes->get('auth')->toArray();

        return ['auth' => $authInfo];
    }

    /**
     * Your protected endpoint logic: auth info plus a confirmation message.
     */
    public function show(Request $request)
    {
        $payload = ['auth' => $request->attributes->get('auth')->toArray()];
        $payload['message'] = '受保护数据访问成功';

        return $payload;
    }
}
<?php
namespace App\Controller\Api;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Routing\Annotation\Route;
use Symfony\Component\Security\Http\Attribute\IsGranted;
/**
 * Controller for /api/protected; requires a fully authenticated user.
 */
#[Route('/api/protected')]
#[IsGranted('IS_AUTHENTICATED_FULLY')]
class ProtectedController extends AbstractController
{
    /**
     * Replies with the auth info read from the request attributes.
     */
    #[Route('', methods: ['GET'])]
    public function index(Request $request): JsonResponse
    {
        $authInfo = $request->attributes->get('auth')->toArray();

        return $this->json(['auth' => $authInfo]);
    }

    /**
     * Your protected endpoint logic: auth info plus a confirmation message.
     */
    #[Route('/detailed', methods: ['GET'])]
    public function detailed(Request $request): JsonResponse
    {
        $payload = ['auth' => $request->attributes->get('auth')->toArray()];
        $payload['message'] = '受保护数据访问成功';

        return $this->json($payload);
    }
}
<?php
use App\Middleware\JwtMiddleware;
use Psr\Http\Message\ResponseInterface as Response;
use Psr\Http\Message\ServerRequestInterface as Request;
use Slim\Factory\AppFactory;
require __DIR__ . '/../vendor/autoload.php';
$app = AppFactory::create();

// Routes under /api/protected; JwtMiddleware is attached to the whole group.
$app->group('/api/protected', function ($group) {
    // Returns the authentication info read from the request attribute.
    $group->get('', function (Request $request, Response $response) {
        $payload = ['auth' => $request->getAttribute('auth')->toArray()];
        $response->getBody()->write(json_encode($payload));

        return $response->withHeader('Content-Type', 'application/json');
    });

    // Put your own protected endpoint logic here.
    $group->get('/detailed', function (Request $request, Response $response) {
        $payload = [
            'auth' => $request->getAttribute('auth')->toArray(),
            'message' => '受保护数据访问成功',
        ];
        $response->getBody()->write(json_encode($payload));

        return $response->withHeader('Content-Type', 'application/json');
    });
})->add(new JwtMiddleware());

// Public endpoint (registered outside the protected group).
$app->get('/', function (Request $request, Response $response) {
    $payload = ['message' => '公共接口'];
    $response->getBody()->write(json_encode($payload));

    return $response->withHeader('Content-Type', 'application/json');
});

$app->run();
或者使用更结构化的方式:
<?php
namespace App\Controllers;
use Psr\Http\Message\ResponseInterface as Response;
use Psr\Http\Message\ServerRequestInterface as Request;
/**
 * Controller-style variant of the protected Slim handlers.
 */
class ProtectedController
{
    /**
     * Returns the authentication info read from the request attribute.
     */
    public function index(Request $request, Response $response): Response
    {
        return $this->writeJson($response, [
            'auth' => $request->getAttribute('auth')->toArray(),
        ]);
    }

    /**
     * Placeholder for your own protected endpoint logic.
     */
    public function detailed(Request $request, Response $response): Response
    {
        return $this->writeJson($response, [
            'auth' => $request->getAttribute('auth')->toArray(),
            'message' => '受保护数据访问成功',
        ]);
    }

    /**
     * Writes the JSON-encoded payload to the body and sets the JSON content type.
     */
    private function writeJson(Response $response, array $payload): Response
    {
        $response->getBody()->write(json_encode($payload));

        return $response->withHeader('Content-Type', 'application/json');
    }
}
- Rails
- Sinatra
- Grape
# Base controller that mixes in the JwtAuthentication concern.
# For a full (non API-only) Rails app, inherit from ActionController::Base instead:
# class ApplicationController < ActionController::Base
class ApplicationController < ActionController::API # API-only application
  include JwtAuthentication
end
# Example protected controller: verify_access_token runs before every action.
class Api::ProtectedController < ApplicationController
  before_action :verify_access_token

  # Renders the authentication info held in @auth.
  def index
    render json: { auth: @auth.to_h }
  end

  # Placeholder for your own protected endpoint logic.
  def show
    payload = { auth: @auth.to_h, message: "受保护数据访问成功" }
    render json: payload
  end
end
# Exposes only the index and show actions of the protected resource under /api.
Rails.application.routes.draw do
  namespace :api do
    resources :protected, only: %i[index show]
  end
end
require 'sinatra'
require 'json'
require_relative 'auth_middleware'
require_relative 'auth_constants'
require_relative 'auth_info'
require_relative 'authorization_error'
require_relative 'auth_helpers'
require_relative 'jwt_validator'
# Register the auth middleware
use AuthMiddleware

# Returns the authentication info read from the Rack env.
get '/api/protected' do
  content_type :json
  { auth: env['auth'].to_h }.to_json
end

# Placeholder for your own protected endpoint logic.
get '/api/protected/:id' do
  content_type :json
  payload = {
    auth: env['auth'].to_h,
    id: params[:id],
    message: "受保护数据访问成功"
  }
  payload.to_json
end

# Public endpoint (not protected by the middleware)
get '/' do
  content_type :json
  { message: "公共接口" }.to_json
end
require 'grape'
require_relative 'auth_helpers'
require_relative 'auth_constants'
require_relative 'auth_info'
require_relative 'authorization_error'
require_relative 'jwt_validator'
# Grape API with a protected /api/protected namespace and one public endpoint.
class API < Grape::API
  format :json
  helpers GrapeAuthHelpers

  namespace :api do
    namespace :protected do
      # Every endpoint in this namespace calls authenticate_user! first.
      before { authenticate_user! }

      # Returns the authentication info exposed by the `auth` helper.
      get do
        { auth: auth.to_h }
      end

      # Placeholder for your own protected endpoint logic.
      get ':id' do
        { auth: auth.to_h, id: params[:id], message: "受保护数据访问成功" }
      end
    end
  end

  # Public endpoint (unprotected)
  get :public do
    { message: "公共接口" }
  end
end
require_relative 'api'
# Hand the Grape API class to Rack as the application to run.
run API
- Axum
- Actix Web
- Rocket
use axum::{
extract::Extension,
http::StatusCode,
middleware,
response::Json,
routing::get,
Router,
};
use serde_json::{json, Value};
use std::sync::Arc;
use tower_http::cors::CorsLayer;
mod lib;
mod jwt_validator;
mod middleware as jwt_middleware;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
/// Builds the Axum application and serves it on `0.0.0.0:3000`.
///
/// # Panics
///
/// Panics if the JWT validator cannot be initialized, the listener cannot bind,
/// or the server exits with an error.
#[tokio::main]
async fn main() {
    let validator = JwtValidator::new()
        .await
        .map(Arc::new)
        .expect("Failed to initialize JWT validator");

    let protected_routes = Router::new()
        .route("/api/protected", get(protected_handler))
        .route("/api/protected/detailed", get(detailed_handler));

    // Layers added later wrap the earlier ones, so the shared validator extension is
    // already attached to the request when the JWT middleware runs.
    let app = protected_routes
        .layer(middleware::from_fn(jwt_middleware::jwt_middleware))
        .layer(Extension(validator))
        .layer(CorsLayer::permissive());

    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
/// Returns the caller's `AuthInfo`, taken from the request extension, under the `"auth"` key.
async fn protected_handler(Extension(auth): Extension<AuthInfo>) -> Json<Value> {
    let body = json!({ "auth": auth });
    Json(body)
}
/// Placeholder for your own protected endpoint logic: returns the caller's `AuthInfo`
/// together with a success message.
async fn detailed_handler(Extension(auth): Extension<AuthInfo>) -> Json<Value> {
    let body = json!({
        "auth": auth,
        "message": "受保护数据访问成功"
    });
    Json(body)
}
use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Result};
use serde_json::{json, Value};
use std::sync::Arc;
mod lib;
mod jwt_validator;
mod middleware as jwt_middleware;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
use jwt_middleware::JwtMiddleware;
/// Starts the Actix Web server on `127.0.0.1:8080`, with `/api/protected` wrapped
/// by `JwtMiddleware`.
///
/// # Errors
///
/// Returns the I/O error if binding the address or running the server fails.
///
/// # Panics
///
/// Panics if the JWT validator cannot be initialized.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let validator = JwtValidator::new()
        .await
        .map(Arc::new)
        .expect("Failed to initialize JWT validator");

    let server = HttpServer::new(move || {
        // Only this scope is wrapped by the JWT middleware.
        let protected_scope = web::scope("/api/protected")
            .wrap(JwtMiddleware::new(validator.clone()))
            .route("", web::get().to(protected_handler))
            .route("/detailed", web::get().to(detailed_handler));

        App::new()
            .app_data(web::Data::new(validator.clone()))
            .wrap(Logger::default())
            .service(protected_scope)
    });

    server.bind("127.0.0.1:8080")?.run().await
}
async fn protected_handler(req: HttpRequest) -> Result<web::Json<Value>> {
// 从请求扩展中获取认证信息
let auth = req.extensions().get::<AuthInfo>().unwrap();
Ok(web::Json(json!({ "auth": auth })))
}
async fn detailed_handler(req: HttpRequest) -> Result<web::Json<Value>> {
// 你的受保护端点逻辑
let auth = req.extensions().get::<AuthInfo>().unwrap();
Ok(web::Json(json!({
"auth": auth,
"message": "受保护数据访问成功"
})))
}
use rocket::{get, launch, routes, serde::json::Json, State};
use serde_json::{json, Value};
mod lib;
mod jwt_validator;
mod guards;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
/// Returns the caller's `AuthInfo`, supplied by the request guard, under the `"auth"` key.
#[get("/api/protected")]
fn protected_handler(auth: AuthInfo) -> Json<Value> {
    let body = json!({ "auth": auth });
    Json(body)
}
/// Placeholder for your own protected endpoint logic: returns the caller's `AuthInfo`
/// together with a success message.
#[get("/api/protected/detailed")]
fn detailed_handler(auth: AuthInfo) -> Json<Value> {
    let body = json!({
        "auth": auth,
        "message": "受保护数据访问成功"
    });
    Json(body)
}
/// Builds the Rocket instance: registers the JWT validator as managed state and mounts
/// both protected handlers at `/`.
///
/// # Panics
///
/// Panics if the JWT validator cannot be initialized.
#[launch]
async fn rocket() -> _ {
    let validator = JwtValidator::new()
        .await
        .expect("Failed to initialize JWT validator");
    let handlers = routes![protected_handler, detailed_handler];

    rocket::build().manage(validator).mount("/", handlers)
}
步骤 5:测试你的实现
使用有效和无效的令牌测试你的 API,确保:
- 有效令牌可以通过校验并获得访问权限。
- 对于无效 / 缺失令牌返回 `401 Unauthorized`,对于有效但缺少所需权限或上下文的令牌返回 `403 Forbidden`。
相关资源
- 自定义令牌声明 (Claims)
- JSON Web Token (JWT)
- OpenID Connect 发现
- RFC 8707:资源指示器 (Resource indicator)