如何在你的 API 服務或後端驗證存取權杖 (Access token)
驗證存取權杖 (Access token) 是在 Logto 中實施 角色型存取控制 (RBAC, Role-based access control) 的關鍵步驟。本指南將帶你逐步驗證 Logto 發行的 JWT,包括檢查簽章、簽發者 (Issuer)、受眾 (Audience)、過期時間、權限 (Scopes) 以及組織 (Organization) 上下文。
開始之前
- 本指南假設你已熟悉 Logto 的 RBAC 概念。
- 如果你要保護 API 資源,請先完成 保護全域 API 資源 指南。
- 如果你要保護應用程式內功能或流程(非 API 權限),請先完成 保護組織(非 API)權限 指南。
- 如果你要保護組織層級 API 資源,請先完成 保護組織層級 API 資源 指南。
步驟 1:初始化常數與工具函式
在你的程式碼中定義必要的常數與工具函式,用於處理權杖提取與驗證。有效的請求必須包含 `Authorization` 標頭,格式為 `Bearer <access_token>`。
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
import { IncomingHttpHeaders } from 'http';
// Tenant-specific OIDC endpoints. They are exported so that other modules
// (for example the JWT validator) can import them instead of redefining them.
export const JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks';
export const ISSUER = 'https://your-tenant.logto.app/oidc';
export class AuthInfo {
constructor(
public sub: string,
public clientId?: string,
public organizationId?: string,
public scopes: string[] = [],
public audience: string[] = []
) {}
}
export class AuthorizationError extends Error {
name = 'AuthorizationError';
constructor(
message: string,
public status = 403
) {
super(message);
}
}
/**
 * Pulls the raw access token out of the `Authorization` request header.
 *
 * @param headers - Incoming HTTP headers; only `authorization` is read.
 * @returns The header value with the leading `Bearer ` removed.
 * @throws AuthorizationError with status 401 when the header is absent or
 *   does not start with `Bearer `.
 */
export function extractBearerTokenFromHeaders({ authorization }: IncomingHttpHeaders): string {
  const scheme = 'Bearer ';

  if (!authorization) {
    throw new AuthorizationError('Authorization header is missing', 401);
  }

  if (authorization.startsWith(scheme)) {
    return authorization.substring(scheme.length);
  }

  throw new AuthorizationError(`Authorization header must start with "${scheme}"`, 401);
}
JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks'
ISSUER = 'https://your-tenant.logto.app/oidc'
class AuthInfo:
    """Authentication details extracted from a validated access token."""

    def __init__(self, sub: str, client_id: str = None, organization_id: str = None,
                 scopes: list = None, audience: list = None):
        """
        Args:
            sub: Subject identifier of the token.
            client_id: Client identifier, if any.
            organization_id: Organization identifier, if any.
            scopes: Granted scopes; a missing or empty value becomes ``[]``.
            audience: Token audiences; a missing or empty value becomes ``[]``.
        """
        self.sub = sub
        self.client_id = client_id
        self.organization_id = organization_id
        # `or []` gives every instance its own fresh list when nothing is passed.
        self.scopes = scopes or []
        self.audience = audience or []

    def to_dict(self):
        """Return the fields as a plain dictionary."""
        return {
            'sub': self.sub,
            'client_id': self.client_id,
            'organization_id': self.organization_id,
            'scopes': self.scopes,
            'audience': self.audience
        }
class AuthorizationError(Exception):
    """Error carrying the HTTP status code that should be returned to the client."""

    def __init__(self, message: str, status: int = 403):
        """
        Args:
            message: Human-readable description of the failure.
            status: HTTP status code; defaults to 403.
        """
        self.message = message
        self.status = status
        super().__init__(self.message)
def extract_bearer_token_from_headers(headers: dict) -> str:
    """
    Extract the bearer token from HTTP headers.

    Note: FastAPI and Django REST Framework have built-in token extraction,
    so this function is mainly intended for Flask and other frameworks.

    Args:
        headers: Mapping of header names to values.

    Returns:
        The Authorization header value with the leading ``Bearer `` removed.

    Raises:
        AuthorizationError: With status 401 when the header is missing or does
            not start with ``Bearer ``.
    """
    bearer_prefix = 'Bearer '

    # HTTP header names are case-insensitive, so accept any casing of the name
    # rather than only 'authorization' and 'Authorization'.
    authorization = next(
        (value for name, value in headers.items()
         if name.lower() == 'authorization' and value),
        None,
    )

    if not authorization:
        raise AuthorizationError('缺少 Authorization 標頭', 401)

    if not authorization.startswith(bearer_prefix):
        raise AuthorizationError('Authorization 標頭必須以 "Bearer " 開頭', 401)

    return authorization[len(bearer_prefix):]  # Remove the 'Bearer ' prefix
package main
import (
"fmt"
"net/http"
"strings"
)
const (
JWKS_URI = "https://your-tenant.logto.app/oidc/jwks"
ISSUER = "https://your-tenant.logto.app/oidc"
)
type 授權錯誤 (AuthorizationError) struct {
Message string
Status int
}
func (e *授權錯誤 (AuthorizationError)) Error() string {
return e.Message
}
func 新授權錯誤 (NewAuthorizationError)(message string, status ...int) *授權錯誤 (AuthorizationError) {
statusCode := http.StatusForbidden // 預設為 403 Forbidden
if len(status) > 0 {
statusCode = status[0]
}
return &授權錯誤 (AuthorizationError){
Message: message,
Status: statusCode,
}
}
func 從標頭擷取 Bearer 權杖 (extractBearerTokenFromHeaders)(r *http.Request) (string, error) {
const bearerPrefix = "Bearer "
authorization := r.Header.Get("Authorization")
if authorization == "" {
return "", 新授權錯誤 (NewAuthorizationError)("Authorization 標頭遺失", http.StatusUnauthorized)
}
if !strings.HasPrefix(authorization, bearerPrefix) {
return "", 新授權錯誤 (NewAuthorizationError)(fmt.Sprintf("Authorization 標頭必須以 \"%s\" 開頭", bearerPrefix), http.StatusUnauthorized)
}
return strings.TrimPrefix(authorization, bearerPrefix), nil
}
public final class AuthConstants {
public static final String JWKS_URI = "https://your-tenant.logto.app/oidc/jwks";
public static final String ISSUER = "https://your-tenant.logto.app/oidc";
private AuthConstants() {
// 防止實例化
}
}
/**
 * Unchecked exception that carries the HTTP status code to respond with.
 */
public class AuthorizationException extends RuntimeException {
    /** Status used when the caller does not supply one (403 Forbidden). */
    private static final int DEFAULT_STATUS_CODE = 403;

    private final int statusCode;

    /**
     * Creates an exception with the default 403 status.
     *
     * @param message description of the failure
     */
    public AuthorizationException(String message) {
        this(message, DEFAULT_STATUS_CODE);
    }

    /**
     * Creates an exception with an explicit status.
     *
     * @param message    description of the failure
     * @param statusCode HTTP status code to report
     */
    public AuthorizationException(String message, int statusCode) {
        super(message);
        this.statusCode = statusCode;
    }

    /** @return the HTTP status code associated with this failure */
    public int getStatusCode() {
        return this.statusCode;
    }
}
namespace YourApiNamespace
{
public static class AuthConstants
{
public const string Issuer = "https://your-tenant.logto.app/oidc";
}
}
namespace YourApiNamespace.Exceptions
{
    /// <summary>
    /// Exception that carries the HTTP status code to respond with.
    /// </summary>
    public class AuthorizationException : Exception
    {
        /// <summary>HTTP status code associated with the failure.</summary>
        public int StatusCode { get; }

        /// <summary>Creates the exception.</summary>
        /// <param name="message">Description of the failure.</param>
        /// <param name="statusCode">HTTP status code; defaults to 403.</param>
        public AuthorizationException(string message, int statusCode = 403) : base(message)
        {
            StatusCode = statusCode;
        }
    }
}
<?php
class AuthConstants
{
public const JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks';
public const ISSUER = 'https://your-tenant.logto.app/oidc';
}
<?php
class AuthInfo
{
public function __construct(
public readonly string $sub,
public readonly ?string $clientId = null,
public readonly ?string $organizationId = null,
public readonly array $scopes = [],
public readonly array $audience = []
) {}
public function toArray(): array
{
return [
'sub' => $this->sub,
'client_id' => $this->clientId,
'organization_id' => $this->organizationId,
'scopes' => $this->scopes,
'audience' => $this->audience,
];
}
}
<?php
class AuthorizationException extends Exception
{
public function __construct(
string $message,
public readonly int $statusCode = 403
) {
parent::__construct($message);
}
}
<?php
/**
 * Helper methods for reading the bearer token from request headers.
 */
trait AuthHelpers
{
    /**
     * Returns the Authorization header value without its "Bearer " prefix.
     *
     * @param array $headers Header map whose values are lists; the first
     *                       entry of 'authorization' or 'Authorization' is read.
     * @return string The remainder of the header after "Bearer ".
     * @throws AuthorizationException With status 401 when the header is
     *                                missing or does not start with "Bearer ".
     */
    protected function extractBearerToken(array $headers): string
    {
        $prefix = 'Bearer ';
        $authorization = $headers['authorization'][0] ?? $headers['Authorization'][0] ?? null;

        if (!$authorization) {
            throw new AuthorizationException('Authorization 標頭缺失 (Authorization header is missing)', 401);
        }

        $hasPrefix = strncmp($authorization, $prefix, strlen($prefix)) === 0;
        if (!$hasPrefix) {
            throw new AuthorizationException('Authorization 標頭必須以 "Bearer " 開頭 (Authorization header must start with "Bearer ")', 401);
        }

        // Drop the 'Bearer ' prefix.
        return substr($authorization, strlen($prefix));
    }
}
module AuthConstants
JWKS_URI = 'https://your-tenant.logto.app/oidc/jwks'
ISSUER = 'https://your-tenant.logto.app/oidc'
end
# Authentication details extracted from a validated access token.
class AuthInfo
  attr_accessor :sub, :client_id, :organization_id, :scopes, :audience

  # sub is required; client_id and organization_id default to nil,
  # scopes and audience default to a new empty array.
  def initialize(sub, client_id = nil, organization_id = nil, scopes = [], audience = [])
    @sub, @client_id, @organization_id = sub, client_id, organization_id
    @scopes, @audience = scopes, audience
  end

  # Returns the fields as a hash keyed by symbol.
  def to_h
    result = {}
    result[:sub] = @sub
    result[:client_id] = @client_id
    result[:organization_id] = @organization_id
    result[:scopes] = @scopes
    result[:audience] = @audience
    result
  end
end
class AuthorizationError < StandardError
attr_reader :status
def initialize(message, status = 403)
super(message)
@status = status
end
end
# Helper methods for reading the bearer token from a request.
module AuthHelpers
  # Returns the Authorization header value without its 'Bearer ' prefix.
  # Raises AuthorizationError (status 401) when the header is missing or
  # does not start with 'Bearer '.
  def extract_bearer_token(request)
    prefix = 'Bearer '
    authorization = request.headers['Authorization']

    if !authorization
      raise AuthorizationError.new('Authorization header is missing', 401)
    end

    if !authorization.start_with?(prefix)
      raise AuthorizationError.new('Authorization header must start with "Bearer "', 401)
    end

    # Drop the 'Bearer ' prefix.
    authorization[prefix.length..-1]
  end
end
use serde::{Deserialize, Serialize};
use std::fmt;
pub const JWKS_URI: &str = "https://your-tenant.logto.app/oidc/jwks";
pub const ISSUER: &str = "https://your-tenant.logto.app/oidc";
/// Authentication details associated with an access token.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthInfo {
    /// Subject identifier.
    pub sub: String,
    /// Client identifier, if present.
    pub client_id: Option<String>,
    /// Organization identifier, if present.
    pub organization_id: Option<String>,
    /// Scopes associated with the token.
    pub scopes: Vec<String>,
    /// Audiences associated with the token.
    pub audience: Vec<String>,
}
impl AuthInfo {
pub fn new(
sub: String,
client_id: Option<String>,
organization_id: Option<String>,
scopes: Vec<String>,
audience: Vec<String>,
) -> Self {
Self {
sub,
client_id,
organization_id,
scopes,
audience,
}
}
}
/// Error describing why a request could not be authorized, together with the
/// HTTP status code to respond with.
#[derive(Debug)]
pub struct AuthorizationError {
    /// Human-readable description of the failure.
    pub message: String,
    /// HTTP status code to report.
    pub status_code: u16,
}

impl AuthorizationError {
    /// Creates an error with the default status code 403.
    pub fn new(message: impl Into<String>) -> Self {
        Self::with_status(message, 403)
    }

    /// Creates an error with an explicit status code.
    pub fn with_status(message: impl Into<String>, status_code: u16) -> Self {
        Self {
            message: message.into(),
            status_code,
        }
    }
}

impl fmt::Display for AuthorizationError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.message)
    }
}

impl std::error::Error for AuthorizationError {}

/// Returns the token from an `Authorization` header value with the `Bearer `
/// prefix removed.
///
/// # Errors
///
/// Returns an [`AuthorizationError`] with status 401 when `authorization` is
/// `None` or does not start with `Bearer `.
pub fn extract_bearer_token(authorization: Option<&str>) -> Result<&str, AuthorizationError> {
    let auth_header = authorization
        .ok_or_else(|| AuthorizationError::with_status("Authorization header is missing", 401))?;

    // `strip_prefix` checks and removes the prefix in one step, so no byte
    // offset has to be kept in sync with the prefix length.
    auth_header.strip_prefix("Bearer ").ok_or_else(|| {
        AuthorizationError::with_status("Authorization header must start with \"Bearer \"", 401)
    })
}
步驟 2:取得你的 Logto 租戶資訊
你需要以下資訊來驗證 Logto 發行的權杖:
- JSON Web Key Set (JWKS) URI:Logto 公鑰的網址,用於驗證 JWT 簽章。
- 簽發者 (Issuer):預期的簽發者值(Logto 的 OIDC URL)。
首先,找到你的 Logto 租戶端點。你可以在以下位置找到:
- 在 Logto Console,設定 → 網域。
- 在任何你於 Logto 設定過的應用程式設定中,設定 → 端點與憑證。
從 OpenID Connect 探索端點取得
這些值可以從 Logto 的 OpenID Connect 探索端點取得:
https://<your-logto-endpoint>/oidc/.well-known/openid-configuration
以下為範例回應(省略其他欄位):
{
"jwks_uri": "https://your-tenant.logto.app/oidc/jwks",
"issuer": "https://your-tenant.logto.app/oidc"
}
在程式碼中硬編碼(不建議)
由於 Logto 不允許自訂 JWKS URI 或簽發者 (Issuer),你可以直接在程式碼中硬編碼這些值。但不建議在正式環境這麼做,因為未來若有設定變動會增加維護成本。
- JWKS URI:
https://<your-logto-endpoint>/oidc/jwks
- 簽發者 (Issuer):
https://<your-logto-endpoint>/oidc
步驟 3:驗證權杖與權限
提取權杖並取得 OIDC 設定後,請驗證下列項目:
- 簽章 (Signature): JWT 必須有效且由 Logto(透過 JWKS)簽署。
- 簽發者 (Issuer): 必須與你的 Logto 租戶簽發者一致。
- 受眾 (Audience): 必須與在 Logto 註冊的 API 資源標示符 (Resource indicator) 或組織上下文相符(如適用)。
- 過期時間 (Expiration): 權杖不得過期。
- 權限 (Scopes): 權杖必須包含你 API / 行為所需的權限範圍。Scopes 會以空格分隔字串出現在 `scope` 宣告 (Claim)。
- 組織上下文 (Organization context): 若保護的是組織層級 API 資源,需驗證 `organization_id` 宣告 (Claim)。
想了解更多 JWT 結構與宣告,請參閱 JSON Web Token。
各權限模型需檢查的項目
不同權限模型下,宣告與驗證規則如下:
權限模型 | 受眾宣告 (aud ) | 組織宣告 (organization_id ) | 權限(需檢查的 scopes) (scope ) |
---|---|---|---|
全域 API 資源 | API 資源標示符 (Resource indicator) | 無 | API 資源權限 |
組織(非 API)權限 | urn:logto:organization:<id> (組織上下文於 aud 宣告) | 無 | 組織權限 |
組織層級 API 資源 | API 資源標示符 (Resource indicator) | 組織 ID(必須與請求相符) | API 資源權限 |
對於非 API 組織權限,組織上下文以 `aud` 宣告表示(例如 `urn:logto:organization:abc123`)。`organization_id` 宣告僅存在於組織層級 API 資源權杖中。
務必同時驗證權限(scopes)與上下文(受眾、組織)以確保多租戶 API 的安全性。
新增驗證邏輯
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
本範例中我們使用 jose 來驗證 JWT。如尚未安裝,請先安裝:
npm install jose
或使用你偏好的套件管理工具(例如 pnpm
或 yarn
)。
首先,新增這些共用工具來處理 JWT 驗證:
import { createRemoteJWKSet, jwtVerify, JWTPayload } from 'jose';
import { AuthInfo, AuthorizationError } from './auth-middleware.js';
const jwks = createRemoteJWKSet(new URL(JWKS_URI));
/**
 * Verifies the token with `jwtVerify` against `jwks`, requiring the issuer to
 * equal `ISSUER`, then passes the payload to `verifyPayload`.
 *
 * @param token - The raw JWT string.
 * @returns The verified payload.
 */
export async function validateJwtToken(token: string): Promise<JWTPayload> {
  const verification = await jwtVerify(token, jwks, { issuer: ISSUER });
  const claims = verification.payload;

  verifyPayload(claims);

  return claims;
}
/**
 * Builds an `AuthInfo` from a JWT payload.
 *
 * `scope` is split on single spaces (empty list when absent) and `aud` is
 * normalised to an array (empty when absent or falsy).
 *
 * @param payload - The JWT payload.
 * @returns The corresponding `AuthInfo`.
 */
export function createAuthInfo(payload: JWTPayload): AuthInfo {
  const { aud } = payload;

  let audience: string[];
  if (Array.isArray(aud)) {
    audience = aud;
  } else if (aud) {
    audience = [aud];
  } else {
    audience = [];
  }

  const scopes = (payload.scope as string)?.split(' ') ?? [];

  return new AuthInfo(
    payload.sub!,
    payload.client_id as string,
    payload.organization_id as string,
    scopes,
    audience
  );
}
function verifyPayload(payload: JWTPayload): void {
// 根據權限模型實作你的驗證邏輯
// 相關內容會在下方權限模型章節展示
}
接著,實作中介軟體來驗證存取權杖 (Access token):
- Express.js
- Koa.js
- Fastify
- Hapi.js
- NestJS
import { Request, Response, NextFunction } from 'express';
import { AuthInfo, extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';

// Extend the Express Request interface to include `auth`.
// `AuthInfo` must be imported above for this augmentation to type-check.
declare global {
  namespace Express {
    interface Request {
      auth?: AuthInfo;
    }
  }
}

/**
 * Express middleware that extracts the bearer token, validates it and stores
 * the resulting `AuthInfo` on `req.auth`.
 *
 * On failure it responds with the error's `status` (401 when the error has
 * none) and a JSON body of the form `{ error: message }`.
 */
export async function verifyAccessToken(req: Request, res: Response, next: NextFunction) {
  try {
    const token = extractBearerTokenFromHeaders(req.headers);
    const payload = await validateJwtToken(token);

    // Store the auth info on the request for downstream handlers.
    req.auth = createAuthInfo(payload);

    next();
  } catch (err: any) {
    return res.status(err.status ?? 401).json({ error: err.message });
  }
}
import { Context, Next } from 'koa';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * Koa middleware that extracts the bearer token, validates it and stores the
 * resulting `AuthInfo` on `ctx.state.auth` before calling the next middleware.
 *
 * On failure it sets the response status to the error's `status` (401 when
 * the error has none) and the body to `{ error: message }`.
 */
export async function koaVerifyAccessToken(ctx: Context, next: Next) {
  try {
    const accessToken = extractBearerTokenFromHeaders(ctx.request.headers);
    const claims = await validateJwtToken(accessToken);

    // Store the auth info in state for downstream middleware.
    ctx.state.auth = createAuthInfo(claims);

    await next();
  } catch (err: any) {
    const { status = undefined, message } = err;
    ctx.status = status ?? 401;
    ctx.body = { error: message };
  }
}
import { FastifyRequest, FastifyReply } from 'fastify';
import { AuthInfo, extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';

// Extend the Fastify Request interface to include `auth`.
// `AuthInfo` must be imported above for this augmentation to type-check.
declare module 'fastify' {
  interface FastifyRequest {
    auth?: AuthInfo;
  }
}

/**
 * Fastify hook that extracts the bearer token, validates it and stores the
 * resulting `AuthInfo` on `request.auth`.
 *
 * On failure it replies with the error's `status` (401 when the error has
 * none) and a body of the form `{ error: message }`.
 */
export async function fastifyVerifyAccessToken(request: FastifyRequest, reply: FastifyReply) {
  try {
    const token = extractBearerTokenFromHeaders(request.headers);
    const payload = await validateJwtToken(token);

    // Store the auth info on the request for downstream handlers.
    request.auth = createAuthInfo(payload);
  } catch (err: any) {
    // Returning the reply from an async hook signals that the response has
    // already been sent, so the request is not processed any further.
    return reply.code(err.status ?? 401).send({ error: err.message });
  }
}
import { Request, ResponseToolkit } from '@hapi/hapi';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * Hapi handler extension that extracts the bearer token, validates it and
 * stores the resulting `AuthInfo` on `request.app.auth`.
 *
 * Returns `h.continue` on success. On failure it takes over the response with
 * the error's `status` (401 when the error has none) and `{ error: message }`.
 */
export async function hapiVerifyAccessToken(request: Request, h: ResponseToolkit) {
  try {
    const accessToken = extractBearerTokenFromHeaders(request.headers);
    const claims = await validateJwtToken(accessToken);

    // Store the auth info in request.app for downstream handlers.
    request.app.auth = createAuthInfo(claims);

    return h.continue;
  } catch (err: any) {
    const failure = h.response({ error: err.message });
    return failure.code(err.status ?? 401).takeover();
  }
}
import {
Injectable,
CanActivate,
ExecutionContext,
UnauthorizedException,
ForbiddenException,
} from '@nestjs/common';
import { extractBearerTokenFromHeaders, AuthorizationError } from './auth-middleware.js';
import { validateJwtToken, createAuthInfo } from './jwt-validator.js';
/**
 * NestJS guard that extracts the bearer token, validates it and stores the
 * resulting `AuthInfo` on the request's `auth` property.
 *
 * Errors whose `status` is 401 become `UnauthorizedException`; every other
 * error becomes `ForbiddenException`.
 */
@Injectable()
export class AccessTokenGuard implements CanActivate {
  async canActivate(context: ExecutionContext): Promise<boolean> {
    const request = context.switchToHttp().getRequest();

    try {
      const accessToken = extractBearerTokenFromHeaders(request.headers);
      const claims = await validateJwtToken(accessToken);

      // Store the auth info on the request for downstream handlers.
      request.auth = createAuthInfo(claims);
      return true;
    } catch (err: any) {
      const HttpError = err.status === 401 ? UnauthorizedException : ForbiddenException;
      throw new HttpError(err.message);
    }
  }
}
根據你的權限模型,在 jwt-validator.ts
中實作相應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
/**
 * Payload checks for global API resources: the audience must contain the API
 * resource indicator and the scope claim must contain every required scope.
 *
 * @throws AuthorizationError when either check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud } = payload;

  // The audience claim must contain your API resource indicator.
  const audienceList = Array.isArray(aud) ? aud : aud ? [aud] : [];
  const audienceMatches = audienceList.includes('https://your-api-resource-indicator');
  if (!audienceMatches) {
    throw new AuthorizationError('Invalid audience');
  }

  // Every scope required for the global API resource must be granted.
  const neededScopes = ['api:read', 'api:write']; // Replace with your actual required scopes
  const grantedScopes = (payload.scope as string)?.split(' ') ?? [];
  const isMissingScope = neededScopes.some((needed) => !grantedScopes.includes(needed));
  if (isMissingScope) {
    throw new AuthorizationError('Insufficient scope');
  }
}
/**
 * Payload checks for organization (non-API) permissions: the audience must
 * use the organization format, must match the expected organization, and the
 * scope claim must contain every required scope.
 *
 * @throws AuthorizationError when any check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud } = payload;
  const audienceList = Array.isArray(aud) ? aud : aud ? [aud] : [];

  // At least one audience must use the organization format.
  const orgPrefix = 'urn:logto:organization:';
  const orgAudience = audienceList.find((entry) => entry.startsWith(orgPrefix));
  if (orgAudience === undefined) {
    throw new AuthorizationError('Invalid audience for organization permissions');
  }

  // The organization must match the one expected for this request.
  const expectedOrgId = 'your-organization-id'; // Extract from request context
  const audienceMatches = audienceList.includes(`urn:logto:organization:${expectedOrgId}`);
  if (!audienceMatches) {
    throw new AuthorizationError('Organization ID mismatch');
  }

  // Every required organization scope must be granted.
  const neededScopes = ['invite:users', 'manage:settings']; // Replace with your actual required scopes
  const grantedScopes = (payload.scope as string)?.split(' ') ?? [];
  const isMissingScope = neededScopes.some((needed) => !grantedScopes.includes(needed));
  if (isMissingScope) {
    throw new AuthorizationError('Insufficient organization scope');
  }
}
/**
 * Payload checks for organization-level API resources: the audience must
 * contain the API resource indicator, `organization_id` must equal the
 * expected organization, and the scope claim must contain every required scope.
 *
 * @throws AuthorizationError when any check fails.
 */
function verifyPayload(payload: JWTPayload): void {
  const { aud } = payload;

  // The audience claim must contain your API resource indicator.
  const audienceList = Array.isArray(aud) ? aud : aud ? [aud] : [];
  const audienceMatches = audienceList.includes('https://your-api-resource-indicator');
  if (!audienceMatches) {
    throw new AuthorizationError('Invalid audience for organization-level API resources');
  }

  // The organization ID must match the one expected for this request.
  const expectedOrgId = 'your-organization-id'; // Extract from request context
  const tokenOrgId = payload.organization_id as string;
  const organizationMatches = expectedOrgId === tokenOrgId;
  if (!organizationMatches) {
    throw new AuthorizationError('Organization ID mismatch');
  }

  // Every scope required for the organization-level API resource must be granted.
  const neededScopes = ['api:read', 'api:write']; // Replace with your actual required scopes
  const grantedScopes = (payload.scope as string)?.split(' ') ?? [];
  const isMissingScope = neededScopes.some((needed) => !grantedScopes.includes(needed));
  if (isMissingScope) {
    throw new AuthorizationError('Insufficient organization-level API scopes');
  }
}
我們使用 PyJWT 來驗證 JWT。如尚未安裝,請先執行:
pip install "pyjwt[crypto]"
首先,新增這些共用工具來處理 JWT 驗證:
import jwt
from jwt import PyJWKClient
from typing import Dict, Any
from auth_middleware import AuthInfo, AuthorizationError, JWKS_URI, ISSUER
jwks_client = PyJWKClient(JWKS_URI)
def validate_jwt_token(token: str) -> Dict[str, Any]:
    """
    Validate a JWT and return its payload.

    The signature is checked with the signing key resolved by ``jwks_client``
    and the issuer must equal ``ISSUER``. Audience verification is disabled
    in ``jwt.decode`` because it is expected to happen in ``verify_payload``.

    Raises:
        AuthorizationError: With status 401 when the token cannot be decoded
            or validated; errors raised by ``verify_payload`` keep their own
            status.
    """
    try:
        signing_key = jwks_client.get_signing_key_from_jwt(token)

        # NOTE(review): only RS256 is accepted here; confirm this matches the
        # algorithm of the tenant's signing key.
        payload = jwt.decode(
            token,
            signing_key.key,
            algorithms=['RS256'],
            issuer=ISSUER,
            options={'verify_aud': False}  # Audience is verified manually
        )

        verify_payload(payload)
        return payload

    except AuthorizationError:
        # Raised by verify_payload: re-raise untouched so that its status
        # (403 by default) is not overwritten with 401 by the handler below.
        raise
    except jwt.InvalidTokenError as e:
        raise AuthorizationError(f'Invalid token: {str(e)}', 401)
    except Exception as e:
        raise AuthorizationError(f'Token validation failed: {str(e)}', 401)
def create_auth_info(payload: Dict[str, Any]) -> AuthInfo:
    """
    Create an AuthInfo from a JWT payload.

    The ``scope`` claim is split on single spaces (empty list when missing or
    empty) and a string ``aud`` claim is wrapped in a list.
    """
    raw_scope = payload.get('scope')
    scopes = raw_scope.split(' ') if raw_scope else []

    audience = payload.get('aud', [])
    if isinstance(audience, str):
        audience = [audience]

    return AuthInfo(
        sub=payload.get('sub'),
        client_id=payload.get('client_id'),
        organization_id=payload.get('organization_id'),
        scopes=scopes,
        audience=audience
    )
def verify_payload(payload: Dict[str, Any]) -> None:
"""根據權限模型驗證 payload"""
# 請根據權限模型實作驗證邏輯
# 相關內容會在下方權限模型區段說明
pass
接著,實作中介軟體(middleware)來驗證存取權杖 (Access token):
- FastAPI
- Flask
- Django
- Django REST Framework
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from auth_middleware import AuthInfo, AuthorizationError
from jwt_validator import validate_jwt_token, create_auth_info

security = HTTPBearer()


async def verify_access_token(credentials: HTTPAuthorizationCredentials = Depends(security)) -> AuthInfo:
    """
    FastAPI dependency that validates the bearer token and returns its AuthInfo.

    Raises:
        HTTPException: With the status and message of the AuthorizationError
            raised during validation.
    """
    try:
        token = credentials.credentials
        payload = validate_jwt_token(token)
        return create_auth_info(payload)

    except AuthorizationError as e:
        raise HTTPException(status_code=e.status, detail=str(e))
from functools import wraps
from flask import request, jsonify, g
from auth_middleware import AuthorizationError, extract_bearer_token_from_headers
from jwt_validator import validate_jwt_token, create_auth_info


def verify_access_token(f):
    """
    Flask view decorator that validates the bearer token before calling the view.

    On success the AuthInfo is stored on ``g.auth``. On AuthorizationError a
    JSON body ``{'error': message}`` is returned with the error's status.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        try:
            token = extract_bearer_token_from_headers(dict(request.headers))
            payload = validate_jwt_token(token)

            # Store the auth info on Flask's g object for downstream access.
            g.auth = create_auth_info(payload)

            return f(*args, **kwargs)

        except AuthorizationError as e:
            return jsonify({'error': str(e)}), e.status

    return decorated_function
from functools import wraps
from django.http import JsonResponse
from auth_middleware import AuthorizationError, extract_bearer_token_from_headers
from jwt_validator import validate_jwt_token, create_auth_info


def require_access_token(view_func):
    """
    Django view decorator that validates the bearer token before calling the view.

    On success the AuthInfo is attached as ``request.auth``. On
    AuthorizationError a JSON body ``{'error': message}`` is returned with the
    error's status.
    """
    @wraps(view_func)  # Keep the wrapped view's name and docstring
    def wrapper(request, *args, **kwargs):
        try:
            # Rebuild header names from META keys: strip only the leading
            # 'HTTP_' marker, then turn e.g. 'AUTHORIZATION' into 'authorization'.
            headers = {
                key[len('HTTP_'):].replace('_', '-').lower(): value
                for key, value in request.META.items()
                if key.startswith('HTTP_')
            }

            token = extract_bearer_token_from_headers(headers)
            payload = validate_jwt_token(token)

            # Attach the auth info to the request for downstream access.
            request.auth = create_auth_info(payload)

            return view_func(request, *args, **kwargs)

        except AuthorizationError as e:
            return JsonResponse({'error': str(e)}, status=e.status)

    return wrapper
from rest_framework.authentication import TokenAuthentication
from rest_framework import exceptions
from auth_middleware import AuthorizationError
from jwt_validator import validate_jwt_token, create_auth_info


class AccessTokenAuthentication(TokenAuthentication):
    """DRF authentication class that validates JWT access tokens."""

    keyword = 'Bearer'  # Use 'Bearer' instead of 'Token'

    def authenticate_credentials(self, key):
        """
        Authenticate by validating the JWT.

        Returns:
            A ``(user, key)`` tuple where ``user`` is a minimal user-like
            object exposing the AuthInfo as ``user.auth``.

        Raises:
            AuthenticationFailed: When validation fails with status 401.
            PermissionDenied: When validation fails with any other status.
        """
        try:
            payload = validate_jwt_token(key)
            auth_info = create_auth_info(payload)

            # Build a user-like object holding the auth info for downstream access.
            user = type('User', (), {
                'auth': auth_info,
                'is_authenticated': True,
                'is_anonymous': False,
                'is_active': True,
            })()

            return (user, key)

        except AuthorizationError as e:
            if e.status == 401:
                raise exceptions.AuthenticationFailed(str(e))
            else:  # 403
                raise exceptions.PermissionDenied(str(e))
根據你的權限模型,在 jwt_validator.py
中實作對應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
def verify_payload(payload: Dict[str, Any]) -> None:
    """
    Verify the payload for global API resources.

    Raises:
        AuthorizationError: When the audience does not contain the API
            resource indicator or a required scope is missing.
    """
    # The audience claim must contain your API resource indicator.
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]

    if 'https://your-api-resource-indicator' not in audiences:
        raise AuthorizationError('Invalid audience')

    # Every scope required for the global API resource must be granted.
    required_scopes = ['api:read', 'api:write']  # Replace with your actual required scopes
    raw_scope = payload.get('scope')
    scopes = raw_scope.split(' ') if raw_scope else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('Insufficient scope')
def verify_payload(payload: Dict[str, Any]) -> None:
    """
    Verify the payload for organization (non-API) permissions.

    Raises:
        AuthorizationError: When no audience uses the organization format,
            the expected organization audience is absent, or a required scope
            is missing.
    """
    # At least one audience must use the organization format.
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]

    has_org_audience = any(aud.startswith('urn:logto:organization:') for aud in audiences)
    if not has_org_audience:
        raise AuthorizationError('Invalid audience for organization permissions')

    # The organization must match the one expected for this request.
    expected_org_id = 'your-organization-id'  # Extract from request context
    expected_aud = f'urn:logto:organization:{expected_org_id}'
    if expected_aud not in audiences:
        raise AuthorizationError('Organization ID mismatch')

    # Every required organization scope must be granted.
    required_scopes = ['invite:users', 'manage:settings']  # Replace with your actual required scopes
    raw_scope = payload.get('scope')
    scopes = raw_scope.split(' ') if raw_scope else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('Insufficient organization scope')
def verify_payload(payload: Dict[str, Any]) -> None:
    """
    Verify the payload for organization-level API resources.

    Raises:
        AuthorizationError: When the audience does not contain the API
            resource indicator, ``organization_id`` differs from the expected
            organization, or a required scope is missing.
    """
    # The audience claim must contain your API resource indicator.
    audiences = payload.get('aud', [])
    if isinstance(audiences, str):
        audiences = [audiences]

    if 'https://your-api-resource-indicator' not in audiences:
        raise AuthorizationError('Invalid audience for organization-level API resources')

    # The organization ID must match the one expected for this request.
    expected_org_id = 'your-organization-id'  # Extract from request context
    org_id = payload.get('organization_id')
    if expected_org_id != org_id:
        raise AuthorizationError('Organization ID mismatch')

    # Every scope required for the organization-level API resource must be granted.
    required_scopes = ['api:read', 'api:write']  # Replace with your actual required scopes
    raw_scope = payload.get('scope')
    scopes = raw_scope.split(' ') if raw_scope else []
    if not all(scope in scopes for scope in required_scopes):
        raise AuthorizationError('Insufficient organization-level API scopes')
我們使用 github.com/lestrrat-go/jwx 來驗證 JWT。若尚未安裝,請先安裝:
go mod init your-project
go get github.com/lestrrat-go/jwx/v3
首先,將這些共用元件加入你的 auth_middleware.go
:
import (
"context"
"strings"
"time"
"github.com/lestrrat-go/jwx/v3/jwk"
"github.com/lestrrat-go/jwx/v3/jwt"
)
var jwkSet jwk.Set
func init() {
// 初始化 JWKS 快取
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var err error
jwkSet, err = jwk.Fetch(ctx, JWKS_URI)
if err != nil {
panic("無法取得 JWKS: " + err.Error())
}
}
// validateJWTToken parses the token against jwkSet, checks that its issuer
// equals ISSUER, runs verifyPayload and returns the parsed token. Parse and
// issuer failures are reported as *AuthorizationError with status 401.
func validateJWTToken(tokenString string) (jwt.Token, error) {
	token, err := jwt.Parse([]byte(tokenString), jwt.WithKeySet(jwkSet))
	if err != nil {
		return nil, NewAuthorizationError("無效的權杖: "+err.Error(), http.StatusUnauthorized)
	}

	// Verify the issuer. In jwx v3 the claim accessors return (value, ok),
	// so a missing issuer is rejected as well as a mismatching one.
	if issuer, ok := token.Issuer(); !ok || issuer != ISSUER {
		return nil, NewAuthorizationError("無效的簽發者", http.StatusUnauthorized)
	}

	if err := verifyPayload(token); err != nil {
		return nil, err
	}

	return token, nil
}
// getStringClaim returns the string value of the named claim, or "" when the
// claim is missing or cannot be assigned to a string.
func getStringClaim(token jwt.Token, key string) string {
	// jwx v3: Get stores the claim into the destination pointer and returns
	// an error when the claim is absent or has an incompatible type.
	var value string
	if err := token.Get(key, &value); err != nil {
		return ""
	}
	return value
}
// getScopesFromToken splits the "scope" claim on single spaces. It returns an
// empty slice when the claim is missing, not a string, or empty.
func getScopesFromToken(token jwt.Token) []string {
	// jwx v3: Get stores the claim into the destination pointer and returns
	// an error when the claim is absent or has an incompatible type.
	var scope string
	if err := token.Get("scope", &scope); err != nil || scope == "" {
		return []string{}
	}
	return strings.Split(scope, " ")
}
// getAudienceFromToken returns the token's audiences (nil when the claim is absent).
func getAudienceFromToken(token jwt.Token) []string {
	// jwx v3: Audience returns (value, ok); the ok flag is not needed because
	// a missing claim yields a nil slice.
	audiences, _ := token.Audience()
	return audiences
}
接著,實作中介軟體來驗證存取權杖 (Access token):
- Gin
- Echo
- Fiber
- Chi
import "github.com/gin-gonic/gin"
// VerifyAccessToken returns a Gin middleware that extracts and validates the
// bearer token and stores the parsed token in the context under "auth".
// On failure it writes a JSON error with the error's status and aborts.
func VerifyAccessToken() gin.HandlerFunc {
	// reject writes the JSON error response and stops the handler chain.
	reject := func(c *gin.Context, err error) {
		authErr := err.(*AuthorizationError)
		c.JSON(authErr.Status, gin.H{"error": authErr.Message})
		c.Abort()
	}

	return func(c *gin.Context) {
		rawToken, err := extractBearerTokenFromHeaders(c.Request)
		if err != nil {
			reject(c, err)
			return
		}

		parsed, err := validateJWTToken(rawToken)
		if err != nil {
			reject(c, err)
			return
		}

		// Store the token in the context for later use.
		c.Set("auth", parsed)
		c.Next()
	}
}
import "github.com/labstack/echo/v4"
// VerifyAccessToken is an Echo middleware that extracts and validates the
// bearer token and stores the parsed token in the context under "auth".
// On failure it responds with a JSON error using the error's status.
func VerifyAccessToken(next echo.HandlerFunc) echo.HandlerFunc {
	// reject writes the JSON error response.
	reject := func(c echo.Context, err error) error {
		authErr := err.(*AuthorizationError)
		return c.JSON(authErr.Status, echo.Map{"error": authErr.Message})
	}

	return func(c echo.Context) error {
		rawToken, err := extractBearerTokenFromHeaders(c.Request())
		if err != nil {
			return reject(c, err)
		}

		parsed, err := validateJWTToken(rawToken)
		if err != nil {
			return reject(c, err)
		}

		// Store the token in the context for later use.
		c.Set("auth", parsed)
		return next(c)
	}
}
import (
"net/http"
"github.com/gofiber/fiber/v2"
)
// VerifyAccessToken is a Fiber handler that extracts and validates the bearer
// token and stores the parsed token in locals under "auth". On failure it
// responds with a JSON error using the error's status.
func VerifyAccessToken(c *fiber.Ctx) error {
	// reject writes the JSON error response.
	reject := func(err error) error {
		authErr := err.(*AuthorizationError)
		return c.Status(authErr.Status).JSON(fiber.Map{"error": authErr.Message})
	}

	// Wrap the Authorization header in an http.Request so the shared
	// extractor can be reused.
	header := make(http.Header)
	header.Set("Authorization", c.Get("Authorization"))
	req := &http.Request{Header: header}

	rawToken, err := extractBearerTokenFromHeaders(req)
	if err != nil {
		return reject(err)
	}

	parsed, err := validateJWTToken(rawToken)
	if err != nil {
		return reject(err)
	}

	// Store the token in locals for later use.
	c.Locals("auth", parsed)
	return c.Next()
}
import (
"context"
"encoding/json"
"net/http"
)
type contextKey string
const AuthContextKey contextKey = "auth"
func VerifyAccessToken(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
tokenString, err := extractBearerTokenFromHeaders(r)
if err != nil {
authErr := err.(*AuthorizationError)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(authErr.Status)
json.NewEncoder(w).Encode(map[string]string{"error": authErr.Message})
return
}
token, err := validateJWTToken(tokenString)
if err != nil {
authErr := err.(*AuthorizationError)
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(authErr.Status)
json.NewEncoder(w).Encode(map[string]string{"error": authErr.Message})
return
}
// 將權杖存入 context 以便後續使用
ctx := context.WithValue(r.Context(), AuthContextKey, token)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
根據你的權限模型,你可能需要採用不同的 verifyPayload
邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
// verifyPayload checks a token for global API resources: the audience must
// contain the API resource indicator and all required scopes must be present.
func verifyPayload(token jwt.Token) error {
	// Replace with your actual required scopes.
	requiredScopes := []string{"api:read", "api:write"}

	switch {
	case !hasAudience(token, "https://your-api-resource-indicator"):
		// The audience claim must match your API resource indicator.
		return NewAuthorizationError("無效的受眾 (audience)")
	case !hasRequiredScopes(token, requiredScopes):
		// Every scope required for the global API resource must be granted.
		return NewAuthorizationError("權限範圍不足")
	default:
		return nil
	}
}
// verifyPayload checks a token for organization (non-API) permissions: the
// audience must use the organization format, must match the expected
// organization, and all required scopes must be present.
func verifyPayload(token jwt.Token) error {
	expectedOrgID := "your-organization-id" // Extract from request context
	// Replace with your actual required scopes.
	requiredScopes := []string{"invite:users", "manage:settings"}

	switch {
	case !hasOrganizationAudience(token):
		// The audience claim must use the organization format.
		return NewAuthorizationError("組織權限的受眾 (audience) 無效")
	case !hasMatchingOrganization(token, expectedOrgID):
		// The organization must match the one expected for this request.
		return NewAuthorizationError("組織 ID 不符")
	case !hasRequiredScopes(token, requiredScopes):
		// Every required organization scope must be granted.
		return NewAuthorizationError("組織權限範圍不足")
	default:
		return nil
	}
}
// verifyPayload checks a token for organization-level API resources: the
// audience must contain the API resource indicator, organization_id must
// match the expected organization, and all required scopes must be present.
func verifyPayload(token jwt.Token) error {
	expectedOrgID := "your-organization-id" // Extract from request context
	// Replace with your actual required scopes.
	requiredScopes := []string{"api:read", "api:write"}

	switch {
	case !hasAudience(token, "https://your-api-resource-indicator"):
		// The audience claim must match your API resource indicator.
		return NewAuthorizationError("組織層級 API 資源的受眾 (audience) 無效")
	case !hasMatchingOrganizationID(token, expectedOrgID):
		// The organization ID must match the one expected for this request.
		return NewAuthorizationError("組織 ID 不符")
	case !hasRequiredScopes(token, requiredScopes):
		// Every scope required for the organization-level API resource must be granted.
		return NewAuthorizationError("組織層級 API 權限範圍不足")
	default:
		return nil
	}
}
新增這些輔助函式以驗證 payload:
// hasAudience reports whether the token's audiences contain expectedAud.
func hasAudience(token jwt.Token, expectedAud string) bool {
	// jwx v3: Audience returns (value, ok); a missing claim yields a nil
	// slice, which the loop below simply skips.
	audiences, _ := token.Audience()
	for _, aud := range audiences {
		if aud == expectedAud {
			return true
		}
	}
	return false
}
// hasOrganizationAudience reports whether any of the token's audiences starts
// with the organization prefix "urn:logto:organization:".
func hasOrganizationAudience(token jwt.Token) bool {
	// jwx v3: Audience returns (value, ok); a missing claim yields a nil
	// slice, which the loop below simply skips.
	audiences, _ := token.Audience()
	for _, aud := range audiences {
		if strings.HasPrefix(aud, "urn:logto:organization:") {
			return true
		}
	}
	return false
}
// hasRequiredScopes reports whether every entry of requiredScopes appears in
// the scopes returned by getScopesFromToken.
func hasRequiredScopes(token jwt.Token, requiredScopes []string) bool {
	// Index the granted scopes once so each required scope is a map lookup.
	granted := make(map[string]struct{})
	for _, scope := range getScopesFromToken(token) {
		granted[scope] = struct{}{}
	}

	for _, required := range requiredScopes {
		if _, ok := granted[required]; !ok {
			return false
		}
	}
	return true
}
// hasMatchingOrganization reports whether the token's audiences contain the
// organization audience built from expectedOrgID.
func hasMatchingOrganization(token jwt.Token, expectedOrgID string) bool {
	return hasAudience(token, fmt.Sprintf("urn:logto:organization:%s", expectedOrgID))
}
// hasMatchingOrganizationID reports whether the token's organization_id claim
// equals expectedOrgID.
func hasMatchingOrganizationID(token jwt.Token, expectedOrgID string) bool {
	return getStringClaim(token, "organization_id") == expectedOrgID
}
我們會根據不同框架使用不同的 JWT 函式庫。請安裝所需的相依套件:
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
在你的 pom.xml
中新增:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-resource-server</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.security</groupId>
<artifactId>spring-security-oauth2-jose</artifactId>
</dependency>
在你的 pom.xml
中新增:
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-reactive</artifactId>
</dependency>
在你的 pom.xml
中新增:
<dependency>
<groupId>io.micronaut.security</groupId>
<artifactId>micronaut-security-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.micronaut</groupId>
<artifactId>micronaut-http-server-netty</artifactId>
</dependency>
在你的 pom.xml
中新增:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-auth-jwt</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web-client</artifactId>
</dependency>
接著,實作中介軟體來驗證存取權杖 (Access token):
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
public class JwtSecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(authz -> authz
.requestMatchers("/api/protected/**").authenticated()
.anyRequest().permitAll()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.decoder(jwtDecoder()))
);
return http.build();
}
@Bean
public JwtDecoder jwtDecoder() {
return NimbusJwtDecoder.withJwkSetUri(AuthConstants.JWKS_URI)
.build();
}
}
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
// Spring component intended to hold the payload checks for a decoded access token.
@Component
public class JwtValidator {
// Placeholder: the body is empty, so as written it accepts every token it is given.
public void verifyPayload(Jwt jwt) {
// Implement your verification logic here based on the permission model
// This will be shown in the permission models section below
}
}
import org.eclipse.microprofile.jwt.JsonWebToken;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.container.ContainerRequestContext;
import jakarta.ws.rs.container.ContainerRequestFilter;
import jakarta.ws.rs.ext.Provider;
/**
 * JAX-RS request filter that runs the payload checks for every request whose
 * path starts with {@code /api/protected} and aborts the request with a JSON
 * error body when a check fails.
 */
@Provider
@ApplicationScoped
public class JwtVerificationFilter implements ContainerRequestFilter {
    @Inject
    JsonWebToken jwt;

    @Override
    public void filter(ContainerRequestContext requestContext) {
        String requestPath = requestContext.getUriInfo().getPath();
        if (!requestPath.startsWith("/api/protected")) {
            // Unprotected route: nothing to verify
            return;
        }

        try {
            verifyPayload(jwt);
            // Expose the JWT to the resource methods through the request context
            requestContext.setProperty("auth", jwt);
        } catch (AuthorizationException e) {
            String body = "{\"error\": \"" + e.getMessage() + "\"}";
            requestContext.abortWith(
                jakarta.ws.rs.core.Response.status(e.getStatusCode()).entity(body).build());
        } catch (Exception e) {
            // Any other failure is reported as an invalid token
            requestContext.abortWith(
                jakarta.ws.rs.core.Response.status(401)
                    .entity("{\"error\": \"Invalid token\"}")
                    .build());
        }
    }

    private void verifyPayload(JsonWebToken jwt) {
        // Implement your verification logic here based on the permission model
        // This will be shown in the permission models section below
    }
}
# JWKS-based signature validation for bearer tokens
micronaut:
  security:
    authentication: bearer
    token:
      jwt:
        signatures:
          jwks:
            logto:
              url: ${JWKS_URI:https://your-tenant.logto.app/oidc/jwks}
import io.micronaut.security.token.Claims;
import io.micronaut.security.token.validator.TokenValidator;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
// Singleton bean that runs the payload checks on the claims of a token.
// NOTE(review): confirm that this validateToken signature matches the TokenValidator
// interface of the Micronaut Security version in use - it cannot be verified from here.
@Singleton
public class JwtClaimsValidator implements TokenValidator {
@Override
public Publisher<Boolean> validateToken(String token, Claims claims) {
// The check runs lazily when the returned publisher is subscribed to
return Mono.fromCallable(() -> {
try {
verifyPayload(claims);
return true;
} catch (AuthorizationException e) {
// Rethrown as an unchecked exception carrying only the message
throw new RuntimeException(e.getMessage());
}
});
}
// Placeholder: the body is empty, so as written every set of claims passes.
private void verifyPayload(Claims claims) {
// Implement your verification logic here based on the permission model
// This will be shown in the permission models section below
}
}
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.auth.jwt.JWTAuth;
import io.vertx.ext.auth.jwt.JWTAuthOptions;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.client.WebClient;

/**
 * Route handler that authenticates the bearer token of a request against the
 * keys published at the JWKS endpoint and then runs the payload checks.
 */
public class JwtAuthHandler implements Handler<RoutingContext> {
    private final WebClient webClient;
    // Assigned once the JWKS has been downloaded; stays null until then
    private volatile JWTAuth jwtAuth;

    public JwtAuthHandler(io.vertx.core.Vertx vertx) {
        this.webClient = WebClient.create(vertx);
        // Fetch the JWKS and build the JWT provider from the published keys
        fetchJWKS().onSuccess(jwks -> {
            JWTAuthOptions options = new JWTAuthOptions();
            JsonArray keys = jwks.getJsonArray("keys", new JsonArray());
            for (int i = 0; i < keys.size(); i++) {
                options.addJwk(keys.getJsonObject(i));
            }
            // NOTE(review): assumes AuthConstants declares ISSUER next to JWKS_URI - confirm.
            options.getJWTOptions().setIssuer(AuthConstants.ISSUER);
            this.jwtAuth = JWTAuth.create(vertx, options);
        });
    }

    @Override
    public void handle(RoutingContext context) {
        String authHeader = context.request().getHeader("Authorization");
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            sendError(context, 401, "Authorization header missing or invalid");
            return;
        }

        JWTAuth provider = this.jwtAuth;
        if (provider == null) {
            // The JWKS has not been loaded (yet), so no signature can be verified
            sendError(context, 503, "Signing keys are not available");
            return;
        }

        String token = authHeader.substring(7);
        provider.authenticate(new JsonObject().put("token", token))
            .onSuccess(user -> {
                try {
                    JsonObject principal = user.principal();
                    verifyPayload(principal);
                    // Store the JWT principal in the context for later handlers
                    context.put("auth", principal);
                    context.next();
                } catch (AuthorizationException e) {
                    sendError(context, e.getStatusCode(), e.getMessage());
                } catch (Exception e) {
                    sendError(context, 401, "Invalid token");
                }
            })
            .onFailure(err -> sendError(context, 401, "Invalid token"));
    }

    /** Ends the response with the given status and a JSON body of the form {"error": "..."}. */
    private void sendError(RoutingContext context, int statusCode, String message) {
        context.response()
            .setStatusCode(statusCode)
            .putHeader("Content-Type", "application/json")
            .end("{\"error\": \"" + message + "\"}");
    }

    /** Downloads the JWKS document and parses it as a JSON object. */
    private Future<JsonObject> fetchJWKS() {
        return webClient.getAbs(AuthConstants.JWKS_URI)
            .send()
            .map(response -> response.bodyAsJsonObject());
    }

    private void verifyPayload(JsonObject principal) {
        // Implement your verification logic here based on the permission model
        // This will be shown in the permission models section below
    }
}
根據你的權限模型,可能需要採用不同的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Checks a token for a global API resource: the audience must contain the
 * resource indicator and the scope claim must contain every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(Jwt jwt) {
    // Check audience claim matches your API resource indicator.
    // getAudience() may return null when the token carries no "aud" claim.
    List<String> audiences = jwt.getAudience();
    if (audiences == null || !audiences.contains("https://your-api-resource-indicator")) {
        throw new AuthorizationException("Invalid audience");
    }

    // Check required scopes for global API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = jwt.getClaimAsString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient scope");
    }
}
/**
 * Checks a token for a global API resource: the audience must contain the
 * resource indicator and the scope claim must contain every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(JsonWebToken jwt) {
    // Check audience claim matches your API resource indicator.
    // getAudience() may return null when the token carries no "aud" claim.
    if (jwt.getAudience() == null
            || !jwt.getAudience().contains("https://your-api-resource-indicator")) {
        throw new AuthorizationException("Invalid audience");
    }

    // Check required scopes for global API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = jwt.getClaim("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient scope");
    }
}
/**
 * Checks the claims for a global API resource: "aud" must contain the
 * resource indicator and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(Claims claims) {
    // The audience must include your API resource indicator
    List<String> aud = (List<String>) claims.get("aud");
    boolean audienceMatches = aud != null && aud.contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // Every scope required for the global API resource must be granted
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String rawScope = (String) claims.get("scope");
    List<String> grantedScopes = rawScope == null ? List.of() : Arrays.asList(rawScope.split(" "));
    for (String required : requiredScopes) {
        if (!grantedScopes.contains(required)) {
            throw new AuthorizationException("Insufficient scope");
        }
    }
}
/**
 * Checks the principal for a global API resource: "aud" must match the
 * resource indicator and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // Check audience claim matches your API resource indicator.
    // "aud" may be a single string or an array; getJsonArray("aud") would throw a
    // ClassCastException for the string form, so the raw value is inspected instead.
    String expectedAudience = "https://your-api-resource-indicator";
    Object audClaim = principal.getValue("aud");
    boolean audienceMatches = expectedAudience.equals(audClaim)
        || (audClaim instanceof JsonArray && ((JsonArray) audClaim).contains(expectedAudience));
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience");
    }

    // Check required scopes for global API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient scope");
    }
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Checks a token for organization (non-API) permissions: the audience must be
 * the expected organization audience and the scope claim must contain every
 * required organization scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(Jwt jwt) {
    // Check audience claim matches organization format.
    // getAudience() may return null when the token carries no "aud" claim.
    List<String> audiences = jwt.getAudience();
    boolean hasOrgAudience = audiences != null && audiences.stream()
        .anyMatch(aud -> aud.startsWith("urn:logto:organization:"));
    if (!hasOrgAudience) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String expectedAud = "urn:logto:organization:" + expectedOrgId;
    if (!audiences.contains(expectedAud)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required organization scopes
    List<String> requiredScopes = Arrays.asList("invite:users", "manage:settings"); // Replace with your actual required scopes
    String scopes = jwt.getClaimAsString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
/**
 * Checks a token for organization (non-API) permissions: the audience must be
 * the expected organization audience and the scope claim must contain every
 * required organization scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(JsonWebToken jwt) {
    // Check audience claim matches organization format.
    // getAudience() may return null when the token carries no "aud" claim.
    boolean hasOrgAudience = jwt.getAudience() != null && jwt.getAudience().stream()
        .anyMatch(aud -> aud.startsWith("urn:logto:organization:"));
    if (!hasOrgAudience) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String expectedAud = "urn:logto:organization:" + expectedOrgId;
    if (!jwt.getAudience().contains(expectedAud)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required organization scopes
    List<String> requiredScopes = Arrays.asList("invite:users", "manage:settings"); // Replace with your actual required scopes
    String scopes = jwt.getClaim("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
/**
 * Checks the claims for organization (non-API) permissions: "aud" must contain
 * the expected organization audience and "scope" must contain every required
 * organization scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(Claims claims) {
    // At least one audience entry must use the organization format
    List<String> aud = (List<String>) claims.get("aud");
    boolean orgScoped = false;
    if (aud != null) {
        for (String entry : aud) {
            if (entry.startsWith("urn:logto:organization:")) {
                orgScoped = true;
                break;
            }
        }
    }
    if (!orgScoped) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // The organization in the audience must be the one this request targets
    String expectedOrgId = "your-organization-id"; // Extract from request context
    if (!aud.contains("urn:logto:organization:" + expectedOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Every required organization scope must be granted
    List<String> requiredScopes = Arrays.asList("invite:users", "manage:settings"); // Replace with your actual required scopes
    String rawScope = (String) claims.get("scope");
    List<String> grantedScopes = rawScope == null ? List.of() : Arrays.asList(rawScope.split(" "));
    for (String required : requiredScopes) {
        if (!grantedScopes.contains(required)) {
            throw new AuthorizationException("Insufficient organization scope");
        }
    }
}
/**
 * Checks the principal for organization (non-API) permissions: "aud" must be
 * the expected organization audience and "scope" must contain every required
 * organization scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // "aud" may be a single string or an array; getJsonArray("aud") would throw a
    // ClassCastException for the string form, so both forms are normalized to an array.
    Object audClaim = principal.getValue("aud");
    JsonArray audiences;
    if (audClaim instanceof JsonArray) {
        audiences = (JsonArray) audClaim;
    } else if (audClaim instanceof String) {
        audiences = new JsonArray().add(audClaim);
    } else {
        audiences = new JsonArray();
    }

    // Check audience claim matches organization format
    boolean hasOrgAudience = false;
    for (Object aud : audiences) {
        if (aud.toString().startsWith("urn:logto:organization:")) {
            hasOrgAudience = true;
            break;
        }
    }
    if (!hasOrgAudience) {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String expectedAud = "urn:logto:organization:" + expectedOrgId;
    if (!audiences.contains(expectedAud)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required organization scopes
    List<String> requiredScopes = Arrays.asList("invite:users", "manage:settings"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
/**
 * Checks a token for an organization-level API resource: the audience must
 * contain the resource indicator, "organization_id" must equal the expected
 * organization and the scope claim must contain every required scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(Jwt jwt) {
    // Check audience claim matches your API resource indicator.
    // getAudience() may return null when the token carries no "aud" claim.
    List<String> audiences = jwt.getAudience();
    if (audiences == null || !audiences.contains("https://your-api-resource-indicator")) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String orgId = jwt.getClaimAsString("organization_id");
    if (!expectedOrgId.equals(orgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required scopes for organization-level API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = jwt.getClaimAsString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
/**
 * Checks a token for an organization-level API resource: the audience must
 * contain the resource indicator, "organization_id" must equal the expected
 * organization and the scope claim must contain every required scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(JsonWebToken jwt) {
    // Check audience claim matches your API resource indicator.
    // getAudience() may return null when the token carries no "aud" claim.
    if (jwt.getAudience() == null
            || !jwt.getAudience().contains("https://your-api-resource-indicator")) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String orgId = jwt.getClaim("organization_id");
    if (!expectedOrgId.equals(orgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required scopes for organization-level API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = jwt.getClaim("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
/**
 * Checks the claims for an organization-level API resource: "aud" must contain
 * the resource indicator, "organization_id" must equal the expected
 * organization and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(Claims claims) {
    // The audience must include your API resource indicator
    List<String> aud = (List<String>) claims.get("aud");
    boolean audienceMatches = aud != null && aud.contains("https://your-api-resource-indicator");
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The token must have been issued for the organization this request targets
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String tokenOrgId = (String) claims.get("organization_id");
    if (!expectedOrgId.equals(tokenOrgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Every scope required for the organization-level API resource must be granted
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String rawScope = (String) claims.get("scope");
    List<String> grantedScopes = rawScope == null ? List.of() : Arrays.asList(rawScope.split(" "));
    for (String required : requiredScopes) {
        if (!grantedScopes.contains(required)) {
            throw new AuthorizationException("Insufficient organization-level API scopes");
        }
    }
}
/**
 * Checks the principal for an organization-level API resource: "aud" must
 * match the resource indicator, "organization_id" must equal the expected
 * organization and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private void verifyPayload(JsonObject principal) {
    // Check audience claim matches your API resource indicator.
    // "aud" may be a single string or an array; getJsonArray("aud") would throw a
    // ClassCastException for the string form, so the raw value is inspected instead.
    String expectedAudience = "https://your-api-resource-indicator";
    Object audClaim = principal.getValue("aud");
    boolean audienceMatches = expectedAudience.equals(audClaim)
        || (audClaim instanceof JsonArray && ((JsonArray) audClaim).contains(expectedAudience));
    if (!audienceMatches) {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    String expectedOrgId = "your-organization-id"; // Extract from request context
    String orgId = principal.getString("organization_id");
    if (!expectedOrgId.equals(orgId)) {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required scopes for organization-level API resources
    List<String> requiredScopes = Arrays.asList("api:read", "api:write"); // Replace with your actual required scopes
    String scopes = principal.getString("scope");
    List<String> tokenScopes = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();
    if (!tokenScopes.containsAll(requiredScopes)) {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
新增 JWT 驗證 (Authentication) 所需的 NuGet 套件:
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="8.0.0" />
建立驗證服務以處理權杖驗證:
using System.Security.Claims;
using Microsoft.AspNetCore.Authentication.JwtBearer;
using YourApiNamespace.Exceptions;
namespace YourApiNamespace.Services
{
    /// <summary>
    /// Runs the application-specific payload checks once the JWT bearer
    /// handler has validated a token.
    /// </summary>
    public interface IJwtValidationService
    {
        Task ValidateTokenAsync(TokenValidatedContext context);
    }

    public class JwtValidationService : IJwtValidationService
    {
        /// <summary>
        /// Validates the principal of the given context. An
        /// <see cref="AuthorizationException"/> propagates unchanged; any other
        /// exception is wrapped in an <see cref="AuthorizationException"/> with status 401.
        /// </summary>
        public async Task ValidateTokenAsync(TokenValidatedContext context)
        {
            var principal = context.Principal!;

            try
            {
                // Add your validation logic here based on the permission model
                ValidatePayload(principal);
            }
            catch (Exception ex) when (ex is not AuthorizationException)
            {
                throw new AuthorizationException($"Token validation failed: {ex.Message}", 401);
            }
        }

        private void ValidatePayload(ClaimsPrincipal principal)
        {
            // Implement your verification logic here based on the permission model
            // This will be shown in the permission models section below
        }
    }
}
在你的 Program.cs
中設定 JWT 驗證 (Authentication):
using Microsoft.AspNetCore.Authentication.JwtBearer;
using Microsoft.IdentityModel.Tokens;
using YourApiNamespace.Services;
using YourApiNamespace.Exceptions;
var builder = WebApplication.CreateBuilder(args);

// Add services to the container
builder.Services.AddControllers();
builder.Services.AddScoped<IJwtValidationService, JwtValidationService>();

// Configure JWT authentication
builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme)
    .AddJwtBearer(options =>
    {
        options.Authority = AuthConstants.Issuer;
        // The OIDC discovery document is served at "openid-configuration" (hyphen, not underscore)
        options.MetadataAddress = $"{AuthConstants.Issuer}/.well-known/openid-configuration";
        options.RequireHttpsMetadata = true;
        options.TokenValidationParameters = new TokenValidationParameters
        {
            ValidateIssuer = true,
            ValidIssuer = AuthConstants.Issuer,
            ValidateAudience = false, // Audience is validated manually based on the permission model
            ValidateLifetime = true,
            ValidateIssuerSigningKey = true,
            ClockSkew = TimeSpan.FromMinutes(5)
        };
        options.Events = new JwtBearerEvents
        {
            OnTokenValidated = async context =>
            {
                var validationService = context.HttpContext.RequestServices
                    .GetRequiredService<IJwtValidationService>();
                await validationService.ValidateTokenAsync(context);
            },
            // The JSON 401 body is written from the challenge event: the JwtBearer
            // AuthenticationFailedContext has no HandleResponse(), whereas the challenge
            // context does, which suppresses the default challenge response.
            OnChallenge = async context =>
            {
                context.HandleResponse();
                if (context.Response.HasStarted)
                {
                    return;
                }
                context.Response.StatusCode = 401;
                context.Response.ContentType = "application/json";
                await context.Response.WriteAsync("{\"error\": \"Invalid token\"}");
            }
        };
    });
builder.Services.AddAuthorization();

var app = builder.Build();

// Global error handling for authentication / authorization failures
app.Use(async (context, next) =>
{
    try
    {
        await next();
    }
    catch (AuthorizationException ex)
    {
        context.Response.StatusCode = ex.StatusCode;
        context.Response.ContentType = "application/json";
        await context.Response.WriteAsync($"{{\"error\": \"{ex.Message}\"}}");
    }
});

// Configure the HTTP request pipeline
app.UseAuthentication();
app.UseAuthorization();
app.MapControllers();
app.Run();
根據你的權限 (Permission) 模型,在 JwtValidationService
中實作相應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
/// <summary>
/// Checks a principal for a global API resource: an "aud" claim must equal the
/// resource indicator and the "scope" claim must contain every required scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // The audience must include your API resource indicator
    var audienceMatches = principal.FindAll("aud")
        .Any(claim => claim.Value == "https://your-api-resource-indicator");
    if (!audienceMatches)
    {
        throw new AuthorizationException("Invalid audience");
    }

    // Every scope required for the global API resource must be granted
    var requiredScopes = new[] { "api:read", "api:write" }; // Replace with your actual required scopes
    var grantedScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    if (requiredScopes.Except(grantedScopes).Any())
    {
        throw new AuthorizationException("Insufficient scope");
    }
}
/// <summary>
/// Checks a principal for organization (non-API) permissions: an "aud" claim
/// must be the expected organization audience and the "scope" claim must
/// contain every required organization scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // Check audience claim matches organization format.
    // Ordinal comparison keeps the prefix check independent of the current culture.
    var audiences = principal.FindAll("aud").Select(c => c.Value).ToList();
    var hasOrgAudience = audiences.Any(aud =>
        aud.StartsWith("urn:logto:organization:", StringComparison.Ordinal));
    if (!hasOrgAudience)
    {
        throw new AuthorizationException("Invalid audience for organization permissions");
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    var expectedOrgId = "your-organization-id"; // Extract from request context
    var expectedAud = $"urn:logto:organization:{expectedOrgId}";
    if (!audiences.Contains(expectedAud))
    {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Check required organization scopes
    var requiredScopes = new[] { "invite:users", "manage:settings" }; // Replace with your actual required scopes
    var tokenScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    if (!requiredScopes.All(scope => tokenScopes.Contains(scope)))
    {
        throw new AuthorizationException("Insufficient organization scope");
    }
}
/// <summary>
/// Checks a principal for an organization-level API resource: an "aud" claim
/// must equal the resource indicator, "organization_id" must equal the expected
/// organization and the "scope" claim must contain every required scope.
/// </summary>
private void ValidatePayload(ClaimsPrincipal principal)
{
    // The audience must include your API resource indicator
    var audienceMatches = principal.FindAll("aud")
        .Any(claim => claim.Value == "https://your-api-resource-indicator");
    if (!audienceMatches)
    {
        throw new AuthorizationException("Invalid audience for organization-level API resources");
    }

    // The token must have been issued for the organization this request targets
    var expectedOrgId = "your-organization-id"; // Extract from request context
    var tokenOrgId = principal.FindFirst("organization_id")?.Value;
    if (!string.Equals(expectedOrgId, tokenOrgId))
    {
        throw new AuthorizationException("Organization ID mismatch");
    }

    // Every scope required for the organization-level API resource must be granted
    var requiredScopes = new[] { "api:read", "api:write" }; // Replace with your actual required scopes
    var grantedScopes = principal.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
    if (requiredScopes.Except(grantedScopes).Any())
    {
        throw new AuthorizationException("Insufficient organization-level API scopes");
    }
}
我們使用 firebase/php-jwt 來驗證 JWT。請使用 Composer 安裝:
- Laravel
- Symfony
- Slim
composer require firebase/php-jwt
composer require firebase/php-jwt
composer require firebase/php-jwt slim/slim:"4.*" slim/psr7
首先,新增這些共用工具來處理 JWT 驗證:
<?php
use Firebase\JWT\JWT;
use Firebase\JWT\JWK;
use Firebase\JWT\Key;
/**
 * Shared helpers for fetching the JWKS and validating access tokens.
 */
class JwtValidator
{
    use AuthHelpers;

    /** JWKS document cached for the lifetime of the process. */
    private static ?array $jwks = null;

    /**
     * Returns the decoded JWKS document, downloading it on first use.
     *
     * @throws AuthorizationException when the document cannot be fetched or is not valid JSON
     */
    public static function fetchJwks(): array
    {
        if (self::$jwks === null) {
            $jwksData = file_get_contents(AuthConstants::JWKS_URI);
            if ($jwksData === false) {
                throw new AuthorizationException('Failed to fetch JWKS', 401);
            }

            // json_decode() returns null for malformed JSON; returning that from a method
            // typed `array` would raise a TypeError that the callers do not catch.
            $decoded = json_decode($jwksData, true);
            if (!is_array($decoded)) {
                throw new AuthorizationException('Failed to parse JWKS', 401);
            }
            self::$jwks = $decoded;
        }

        return self::$jwks;
    }

    /**
     * Verifies the token signature against the JWKS, checks the issuer and runs
     * the payload checks.
     *
     * @return array the token payload
     * @throws AuthorizationException when any check fails
     */
    public static function validateJwtToken(string $token): array
    {
        try {
            $jwks = self::fetchJwks();
            $keys = JWK::parseKeySet($jwks);
            $decoded = JWT::decode($token, $keys);
            $payload = (array) $decoded;

            // Verify issuer
            if (($payload['iss'] ?? '') !== AuthConstants::ISSUER) {
                throw new AuthorizationException('Invalid issuer', 401);
            }

            self::verifyPayload($payload);

            return $payload;
        } catch (AuthorizationException $e) {
            throw $e;
        } catch (Exception $e) {
            throw new AuthorizationException('Invalid token: ' . $e->getMessage(), 401);
        }
    }

    /**
     * Builds an AuthInfo from a payload; "scope" is split on spaces and a string
     * "aud" is wrapped into a one-element array.
     */
    public static function createAuthInfo(array $payload): AuthInfo
    {
        $scopes = !empty($payload['scope']) ? explode(' ', $payload['scope']) : [];
        $audience = $payload['aud'] ?? [];
        if (is_string($audience)) {
            $audience = [$audience];
        }

        return new AuthInfo(
            sub: $payload['sub'],
            clientId: $payload['client_id'] ?? null,
            organizationId: $payload['organization_id'] ?? null,
            scopes: $scopes,
            audience: $audience
        );
    }

    private static function verifyPayload(array $payload): void
    {
        // Implement your verification logic here based on the permission model
        // This will be shown in the permission models section below
    }
}
接著,實作中介軟體(middleware)來驗證存取權杖 (Access token):
- Laravel
- Symfony
- Slim
<?php
namespace App\Http\Middleware;

use Closure;
use Illuminate\Http\Request;
// The Symfony base class covers every response type that can be returned here,
// including the JsonResponse produced by response()->json();
// Illuminate\Http\Response does not.
use Symfony\Component\HttpFoundation\Response;

/**
 * Middleware that validates the bearer access token and stores the resulting
 * auth info in the request attributes under "auth".
 */
class VerifyAccessToken
{
    use AuthHelpers;

    public function handle(Request $request, Closure $next): Response
    {
        try {
            $token = $this->extractBearerToken($request->headers->all());
            $payload = JwtValidator::validateJwtToken($token);

            // Store auth info in request attributes for generic use
            $request->attributes->set('auth', JwtValidator::createAuthInfo($payload));

            return $next($request);
        } catch (AuthorizationException $e) {
            return response()->json(['error' => $e->getMessage()], $e->statusCode);
        }
    }
}
在 app/Http/Kernel.php
註冊中介軟體:
protected $middlewareAliases = [
// ... 其他中介軟體
'auth.token' => \App\Http\Middleware\VerifyAccessToken::class,
];
<?php
namespace App\Security;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Security\Core\Authentication\Token\TokenInterface;
use Symfony\Component\Security\Core\Exception\AuthenticationException;
use Symfony\Component\Security\Http\Authenticator\AbstractAuthenticator;
use Symfony\Component\Security\Http\Authenticator\Passport\Badge\UserBadge;
use Symfony\Component\Security\Http\Authenticator\Passport\Passport;
use Symfony\Component\Security\Http\Authenticator\Passport\SelfValidatingPassport;
/**
 * Security authenticator that validates the bearer access token of a request
 * and exposes the resulting auth info through the "auth" request attribute.
 */
class JwtAuthenticator extends AbstractAuthenticator
{
    use AuthHelpers;

    /** Only requests that carry an Authorization header are handled. */
    public function supports(Request $request): ?bool
    {
        return $request->headers->has('authorization');
    }

    public function authenticate(Request $request): Passport
    {
        try {
            $bearer = $this->extractBearerToken($request->headers->all());
            $claims = JwtValidator::validateJwtToken($bearer);

            // Keep the auth info on the request so controllers can read it
            $request->attributes->set('auth', JwtValidator::createAuthInfo($claims));

            return new SelfValidatingPassport(new UserBadge($claims['sub']));
        } catch (AuthorizationException $e) {
            throw new AuthenticationException($e->getMessage());
        }
    }

    public function onAuthenticationSuccess(Request $request, TokenInterface $token, string $firewallName): ?Response
    {
        // Returning null lets the request continue to the controller
        return null;
    }

    public function onAuthenticationFailure(Request $request, AuthenticationException $exception): ?Response
    {
        $body = ['error' => $exception->getMessage()];

        return new JsonResponse($body, Response::HTTP_UNAUTHORIZED);
    }
}
在 config/packages/security.yaml
配置 security:
security:
    firewalls:
        api:
            pattern: ^/api/protected
            stateless: true
            custom_authenticators:
                - App\Security\JwtAuthenticator
<?php
namespace App\Middleware;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Psr\Http\Server\MiddlewareInterface;
use Psr\Http\Server\RequestHandlerInterface;
use Slim\Psr7\Response;
/**
 * PSR-15 middleware that validates the bearer access token and passes the
 * resulting auth info on as the "auth" request attribute.
 */
class JwtMiddleware implements MiddlewareInterface
{
    use AuthHelpers;

    public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
    {
        try {
            $bearer = $this->extractBearerToken($request->getHeaders());
            $claims = JwtValidator::validateJwtToken($bearer);

            // Attach the auth info so downstream handlers can read it
            $authInfo = JwtValidator::createAuthInfo($claims);

            return $handler->handle($request->withAttribute('auth', $authInfo));
        } catch (AuthorizationException $e) {
            $errorResponse = new Response();
            $errorResponse->getBody()->write(json_encode(['error' => $e->getMessage()]));

            return $errorResponse
                ->withHeader('Content-Type', 'application/json')
                ->withStatus($e->statusCode);
        }
    }
}
根據你的權限模型,在 JwtValidator
中實作對應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
/**
 * Checks a payload for a global API resource: "aud" must contain the resource
 * indicator and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience or the scopes do not match
 */
private static function verifyPayload(array $payload): void
{
    // Check audience claim matches your API resource indicator
    $audiences = $payload['aud'] ?? [];
    if (is_string($audiences)) {
        $audiences = [$audiences];
    }
    // Strict comparison avoids PHP's loose matching of numeric-looking strings
    if (!in_array('https://your-api-resource-indicator', $audiences, true)) {
        throw new AuthorizationException('Invalid audience');
    }

    // Check required scopes for global API resources
    $requiredScopes = ['api:read', 'api:write']; // Replace with your actual required scopes
    $scopes = !empty($payload['scope']) ? explode(' ', $payload['scope']) : [];
    foreach ($requiredScopes as $scope) {
        if (!in_array($scope, $scopes, true)) {
            throw new AuthorizationException('Insufficient scope');
        }
    }
}
/**
 * Checks a payload for organization (non-API) permissions: "aud" must be the
 * expected organization audience and "scope" must contain every required
 * organization scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private static function verifyPayload(array $payload): void
{
    // Check audience claim matches organization format
    $audiences = $payload['aud'] ?? [];
    if (is_string($audiences)) {
        $audiences = [$audiences];
    }

    $hasOrgAudience = false;
    foreach ($audiences as $aud) {
        // Non-string entries can never be an organization audience
        if (is_string($aud) && str_starts_with($aud, 'urn:logto:organization:')) {
            $hasOrgAudience = true;
            break;
        }
    }
    if (!$hasOrgAudience) {
        throw new AuthorizationException('Invalid audience for organization permissions');
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    $expectedOrgId = 'your-organization-id'; // Extract from request context
    $expectedAud = "urn:logto:organization:{$expectedOrgId}";
    // Strict comparison avoids PHP's loose matching of numeric-looking strings
    if (!in_array($expectedAud, $audiences, true)) {
        throw new AuthorizationException('Organization ID mismatch');
    }

    // Check required organization scopes
    $requiredScopes = ['invite:users', 'manage:settings']; // Replace with your actual required scopes
    $scopes = !empty($payload['scope']) ? explode(' ', $payload['scope']) : [];
    foreach ($requiredScopes as $scope) {
        if (!in_array($scope, $scopes, true)) {
            throw new AuthorizationException('Insufficient organization scope');
        }
    }
}
/**
 * Checks a payload for an organization-level API resource: "aud" must contain
 * the resource indicator, "organization_id" must equal the expected
 * organization and "scope" must contain every required scope.
 *
 * @throws AuthorizationException when the audience, organization or scopes do not match
 */
private static function verifyPayload(array $payload): void
{
    // Check audience claim matches your API resource indicator
    $audiences = $payload['aud'] ?? [];
    if (is_string($audiences)) {
        $audiences = [$audiences];
    }
    // Strict comparison avoids PHP's loose matching of numeric-looking strings
    if (!in_array('https://your-api-resource-indicator', $audiences, true)) {
        throw new AuthorizationException('Invalid audience for organization-level API resources');
    }

    // Check organization ID matches the context (you may need to extract this from request context)
    $expectedOrgId = 'your-organization-id'; // Extract from request context
    $orgId = $payload['organization_id'] ?? null;
    if ($expectedOrgId !== $orgId) {
        throw new AuthorizationException('Organization ID mismatch');
    }

    // Check required scopes for organization-level API resources
    $requiredScopes = ['api:read', 'api:write']; // Replace with your actual required scopes
    $scopes = !empty($payload['scope']) ? explode(' ', $payload['scope']) : [];
    foreach ($requiredScopes as $scope) {
        if (!in_array($scope, $scopes, true)) {
            throw new AuthorizationException('Insufficient organization-level API scopes');
        }
    }
}
我們使用 jwt gem 來驗證 JWT。請將其加入你的 Gemfile:
gem 'jwt'
# net-http 是 Ruby 2.7 起標準函式庫的一部分,無需額外加入
然後執行:
bundle install
首先,新增這些共用工具來處理 JWKS 與權杖驗證:
require 'jwt'
require 'net/http'
require 'json'
# Shared helpers for fetching the JWKS and validating access tokens.
class JwtValidator
  include AuthHelpers

  # Returns the JWKS as a JWT::JWK::Set; the result is memoized after the
  # first successful download.
  #
  # Raises AuthorizationError (401) when the HTTP response is not a success.
  def self.fetch_jwks
    @jwks ||= begin
      uri = URI(AuthConstants::JWKS_URI)
      response = Net::HTTP.get_response(uri)
      raise AuthorizationError.new('Failed to fetch JWKS', 401) unless response.is_a?(Net::HTTPSuccess)

      jwks_data = JSON.parse(response.body)
      JWT::JWK::Set.new(jwks_data)
    end
  end

  # Decodes and verifies the token against the JWKS and the expected issuer,
  # and returns the payload (first element of the JWT.decode result).
  def self.validate_jwt_token(token)
    jwks = fetch_jwks

    # Let JWT library handle algorithm detection from JWKS
    decoded_token = JWT.decode(token, nil, true, {
      iss: AuthConstants::ISSUER,
      verify_iss: true,
      verify_aud: false, # Audience is verified manually based on the permission model
      jwks: jwks
    })[0]

    decoded_token
  end

  # Builds an AuthInfo from a payload; "scope" is split on spaces.
  def self.create_auth_info(payload)
    scopes = payload['scope']&.split(' ') || []
    # "aud" may be a single string or an array; Array() turns both (and nil)
    # into an array so AuthInfo always receives a list of audiences.
    audience = Array(payload['aud'])

    AuthInfo.new(
      payload['sub'],
      payload['client_id'],
      payload['organization_id'],
      scopes,
      audience
    )
  end

  def self.verify_payload(payload)
    # Implement your verification logic here based on the permission model
    # This will be shown in the permission models section below
  end
end
接著,實作中介軟體來驗證存取權杖 (Access token):
- Rails
- Sinatra
- Grape
# Controller concern that verifies the bearer access token before the
# protected action runs and keeps the resulting auth info in @auth.
module JwtAuthentication
  extend ActiveSupport::Concern
  include AuthHelpers

  included do
    before_action :verify_access_token, only: [:protected_action] # List the actions to protect
  end

  private

  # Renders a JSON error when the token is missing, invalid or fails the
  # payload checks.
  def verify_access_token
    bearer = extract_bearer_token(request)
    claims = JwtValidator.validate_jwt_token(bearer)
    JwtValidator.verify_payload(claims)

    # Keep the auth info around for generic use
    @auth = JwtValidator.create_auth_info(claims)
  rescue AuthorizationError => e
    render json: { error: e.message }, status: e.status
  rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
    render json: { error: 'Invalid token' }, status: 401
  end
end
# Rack middleware that verifies the bearer access token for paths starting
# with /api/protected and stores the resulting auth info in env['auth'].
class AuthMiddleware
  include AuthHelpers

  def initialize(app)
    @app = app
  end

  def call(env)
    request = Rack::Request.new(env)

    # Everything outside the protected prefix passes straight through
    return @app.call(env) unless request.path.start_with?('/api/protected')

    begin
      bearer = extract_bearer_token(request)
      claims = JwtValidator.validate_jwt_token(bearer)
      JwtValidator.verify_payload(claims)

      # Keep the auth info in env for generic use
      env['auth'] = JwtValidator.create_auth_info(claims)
    rescue AuthorizationError => e
      return [e.status, { 'Content-Type' => 'application/json' }, [{ error: e.message }.to_json]]
    rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
      return [401, { 'Content-Type' => 'application/json' }, [{ error: 'Invalid token' }.to_json]]
    end

    @app.call(env)
  end
end
# Grape helpers that verify the bearer access token and expose the resulting
# auth info through #auth.
module GrapeAuthHelpers
  include AuthHelpers

  # Auth info stored by authenticate_user! (nil until it has run successfully)
  attr_reader :auth

  # Halts the request with a JSON error when the token is missing, invalid or
  # fails the payload checks.
  def authenticate_user!
    bearer = extract_bearer_token(request)
    claims = JwtValidator.validate_jwt_token(bearer)
    JwtValidator.verify_payload(claims)

    # Keep the auth info around for generic use
    @auth = JwtValidator.create_auth_info(claims)
  rescue AuthorizationError => e
    error!({ error: e.message }, e.status)
  rescue JWT::DecodeError, JWT::VerificationError, JWT::ExpiredSignature
    error!({ error: 'Invalid token' }, 401)
  end
end
根據你的權限模型,在 JwtValidator
中實作相應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
# Verifies the claims of an access token issued for a global API resource.
#
# payload - Hash of decoded JWT claims.
#
# Raises AuthorizationError when the audience or the scopes do not match.
def self.verify_payload(payload)
  # Check that the audience claim matches your API resource indicator.
  # The "aud" claim may be a single string or an array; Array() normalizes both
  # (and nil) to an array so include? does exact matching, not substring matching.
  audiences = Array(payload['aud'])
  unless audiences.include?('https://your-api-resource-indicator')
    raise AuthorizationError.new('Invalid audience')
  end

  # Check the scopes required for the global API resource.
  required_scopes = ['api:read', 'api:write'] # Replace with the scopes you actually require
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient scope')
  end
end
# Verifies the claims of an organization token (non-API organization permissions).
#
# payload - Hash of decoded JWT claims.
#
# Raises AuthorizationError when the audience, organization or scopes do not match.
def self.verify_payload(payload)
  # Check that the audience claim follows the organization format.
  # The "aud" claim may be a single string or an array; Array() normalizes both
  # (and nil) to an array so the enumerable calls below are always valid.
  audiences = Array(payload['aud'])
  has_org_audience = audiences.any? { |aud| aud.to_s.start_with?('urn:logto:organization:') }
  unless has_org_audience
    raise AuthorizationError.new('Invalid audience for organization permissions')
  end

  # Check that the organization ID matches the context
  # (you may need to obtain it from the request context).
  expected_org_id = 'your-organization-id' # Obtain from the request context
  expected_aud = "urn:logto:organization:#{expected_org_id}"
  unless audiences.include?(expected_aud)
    raise AuthorizationError.new('Organization ID mismatch')
  end

  # Check the required organization scopes.
  required_scopes = ['invite:users', 'manage:settings'] # Replace with the scopes you actually require
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient organization scope')
  end
end
# Verifies the claims of an access token for an organization-level API resource.
#
# payload - Hash of decoded JWT claims.
#
# Raises AuthorizationError when the audience, organization or scopes do not match.
def self.verify_payload(payload)
  # Check that the audience claim matches your API resource indicator.
  # The "aud" claim may be a single string or an array; Array() normalizes both
  # (and nil) to an array so include? does exact matching, not substring matching.
  audiences = Array(payload['aud'])
  unless audiences.include?('https://your-api-resource-indicator')
    raise AuthorizationError.new('Invalid audience for organization-level API resources')
  end

  # Check that the organization ID matches the context
  # (you may need to obtain it from the request context).
  expected_org_id = 'your-organization-id' # Obtain from the request context
  org_id = payload['organization_id']
  unless expected_org_id == org_id
    raise AuthorizationError.new('Organization ID mismatch')
  end

  # Check the scopes required for the organization-level API resource.
  required_scopes = ['api:read', 'api:write'] # Replace with the scopes you actually require
  token_scopes = payload['scope']&.split(' ') || []
  unless required_scopes.all? { |scope| token_scopes.include?(scope) }
    raise AuthorizationError.new('Insufficient organization-level API scopes')
  end
end
我們使用 jsonwebtoken 來驗證 JWT。請在你的 Cargo.toml
中加入所需相依套件:
- Axum
- Actix Web
- Rocket
# Dependencies for the Axum variant of this guide.
[dependencies]
axum = "0.7"
tokio = { version = "1.0", features = ["full"] }
tower = "0.4"
tower-http = { version = "0.5", features = ["cors"] }
# JWT decoding and signature verification
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# HTTP client used to download the JWKS
reqwest = { version = "0.11", features = ["json"] }
# Dependencies for the Actix Web variant of this guide.
[dependencies]
actix-web = "4.0"
tokio = { version = "1.0", features = ["full"] }
# Provides `future::{ok, Ready}` and `LocalBoxFuture` used by the Actix middleware
futures = "0.3"
# JWT decoding and signature verification
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# HTTP client used to download the JWKS
reqwest = { version = "0.11", features = ["json"] }
# Dependencies for the Rocket variant of this guide.
[dependencies]
rocket = { version = "0.5", features = ["json"] }
tokio = { version = "1.0", features = ["full"] }
# JWT decoding and signature verification
jsonwebtoken = "9.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
# HTTP client used to download the JWKS
reqwest = { version = "0.11", features = ["json"] }
首先,新增這些共用工具來處理 JWT 驗證:
use crate::{AuthInfo, AuthorizationError, ISSUER, JWKS_URI};
use jsonwebtoken::{decode, decode_header, Algorithm, DecodingKey, Validation};
use serde_json::Value;
use std::collections::HashMap;
pub struct JwtValidator {
jwks: HashMap<String, DecodingKey>,
}
impl JwtValidator {
pub async fn new() -> Result<Self, AuthorizationError> {
let jwks = Self::fetch_jwks().await?;
Ok(Self { jwks })
}
async fn fetch_jwks() -> Result<HashMap<String, DecodingKey>, AuthorizationError> {
let response = reqwest::get(JWKS_URI).await.map_err(|e| {
AuthorizationError::with_status(format!("Failed to fetch JWKS: {}", e), 401)
})?;
let jwks: Value = response.json().await.map_err(|e| {
AuthorizationError::with_status(format!("Failed to parse JWKS: {}", e), 401)
})?;
let mut keys = HashMap::new();
if let Some(keys_array) = jwks["keys"].as_array() {
for key in keys_array {
if let (Some(kid), Some(kty), Some(n), Some(e)) = (
key["kid"].as_str(),
key["kty"].as_str(),
key["n"].as_str(),
key["e"].as_str(),
) {
if kty == "RSA" {
if let Ok(decoding_key) = DecodingKey::from_rsa_components(n, e) {
keys.insert(kid.to_string(), decoding_key);
}
}
}
}
}
if keys.is_empty() {
return Err(AuthorizationError::with_status("No valid keys found in JWKS", 401));
}
Ok(keys)
}
pub fn validate_jwt_token(&self, token: &str) -> Result<AuthInfo, AuthorizationError> {
let header = decode_header(token).map_err(|e| {
AuthorizationError::with_status(format!("Invalid token header: {}", e), 401)
})?;
let kid = header.kid.ok_or_else(|| {
AuthorizationError::with_status("Token missing kid claim", 401)
})?;
let key = self.jwks.get(&kid).ok_or_else(|| {
AuthorizationError::with_status("Unknown key ID", 401)
})?;
let mut validation = Validation::new(Algorithm::RS256);
validation.set_issuer(&[ISSUER]);
validation.validate_aud = false; // 受眾 (Audience) 會由我們手動驗證
let token_data = decode::<Value>(token, key, &validation).map_err(|e| {
AuthorizationError::with_status(format!("Invalid token: {}", e), 401)
})?;
let claims = token_data.claims;
self.verify_payload(&claims)?;
Ok(self.create_auth_info(claims))
}
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
// 根據你的權限模型實作驗證邏輯
// 相關內容會在下方權限模型區段說明
Ok(())
}
fn create_auth_info(&self, claims: Value) -> AuthInfo {
let scopes = claims["scope"]
.as_str()
.map(|s| s.split(' ').map(|s| s.to_string()).collect())
.unwrap_or_default();
let audience = match &claims["aud"] {
Value::Array(arr) => arr.iter().filter_map(|v| v.as_str().map(|s| s.to_string())).collect(),
Value::String(s) => vec![s.clone()],
_ => vec![],
};
AuthInfo::new(
claims["sub"].as_str().unwrap_or_default().to_string(),
claims["client_id"].as_str().map(|s| s.to_string()),
claims["organization_id"].as_str().map(|s| s.to_string()),
scopes,
audience,
)
}
}
接著,實作中介軟體 (Middleware) 來驗證存取權杖 (Access token):
- Axum
- Actix Web
- Rocket
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use axum::{
extract::Request,
http::{HeaderMap, StatusCode},
middleware::Next,
response::{IntoResponse, Response},
Extension, Json,
};
use serde_json::json;
use std::sync::Arc;
/// Axum middleware that validates the bearer access token of the request and
/// stores the resulting `AuthInfo` in the request extensions.
///
/// # Errors
/// Returns the `AuthorizationError` produced by token extraction or validation.
pub async fn jwt_middleware(
    Extension(validator): Extension<Arc<JwtValidator>>,
    headers: HeaderMap,
    mut request: Request,
    next: Next,
) -> Result<Response, AuthorizationError> {
    let header_value = headers
        .get("authorization")
        .and_then(|value| value.to_str().ok());

    let auth_info = extract_bearer_token(header_value)
        .and_then(|token| validator.validate_jwt_token(token))?;

    // Make the auth info available to downstream handlers.
    request.extensions_mut().insert(auth_info);

    let response = next.run(request).await;
    Ok(response)
}
impl IntoResponse for AuthorizationError {
fn into_response(self) -> Response {
let status = StatusCode::from_u16(self.status_code).unwrap_or(StatusCode::FORBIDDEN);
(status, Json(json!({ "error": self.message }))).into_response()
}
}
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use actix_web::{
dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform},
web, Error, HttpMessage, HttpResponse,
};
use futures::future::{ok, Ready};
use std::sync::Arc;
/// Actix Web middleware factory holding the shared JWT validator.
pub struct JwtMiddleware {
    validator: Arc<JwtValidator>,
}

impl JwtMiddleware {
    /// Creates the middleware from a shared validator.
    pub fn new(validator: Arc<JwtValidator>) -> Self {
        JwtMiddleware { validator }
    }
}
impl<S, B> Transform<S, ServiceRequest> for JwtMiddleware
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type InitError = ();
type Transform = JwtMiddlewareService<S>;
type Future = Ready<Result<Self::Transform, Self::InitError>>;
fn new_transform(&self, service: S) -> Self::Future {
ok(JwtMiddlewareService {
service,
validator: self.validator.clone(),
})
}
}
pub struct JwtMiddlewareService<S> {
service: S,
validator: Arc<JwtValidator>,
}
impl<S, B> Service<ServiceRequest> for JwtMiddlewareService<S>
where
S: Service<ServiceRequest, Response = ServiceResponse<B>, Error = Error>,
S::Future: 'static,
B: 'static,
{
type Response = ServiceResponse<B>;
type Error = Error;
type Future = futures::future::LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
forward_ready!(service);
fn call(&self, req: ServiceRequest) -> Self::Future {
let validator = self.validator.clone();
Box::pin(async move {
let authorization = req
.headers()
.get("authorization")
.and_then(|h| h.to_str().ok());
match extract_bearer_token(authorization)
.and_then(|token| validator.validate_jwt_token(token))
{
Ok(auth_info) => {
// 將驗證資訊存入 request extensions 以便後續使用
req.extensions_mut().insert(auth_info);
let fut = self.service.call(req);
fut.await
}
Err(e) => {
let response = HttpResponse::build(
actix_web::http::StatusCode::from_u16(e.status_code)
.unwrap_or(actix_web::http::StatusCode::FORBIDDEN),
)
.json(serde_json::json!({ "error": e.message }));
Ok(req.into_response(response))
}
}
})
}
}
use crate::{AuthInfo, AuthorizationError, extract_bearer_token};
use crate::jwt_validator::JwtValidator;
use rocket::{
http::Status,
outcome::Outcome,
request::{self, FromRequest, Request},
State,
};
#[rocket::async_trait]
impl<'r> FromRequest<'r> for AuthInfo {
type Error = AuthorizationError;
async fn from_request(req: &'r Request<'_>) -> request::Outcome<Self, Self::Error> {
let validator = match req.guard::<&State<JwtValidator>>().await {
Outcome::Success(validator) => validator,
Outcome::Failure((status, _)) => {
return Outcome::Failure((
status,
AuthorizationError::with_status("JWT validator not found", 500),
))
}
Outcome::Forward(()) => {
return Outcome::Forward(())
}
};
let authorization = req.headers().get_one("authorization");
match extract_bearer_token(authorization)
.and_then(|token| validator.validate_jwt_token(token))
{
Ok(auth_info) => Outcome::Success(auth_info),
Err(e) => {
let status = Status::from_code(e.status_code).unwrap_or(Status::Forbidden);
Outcome::Failure((status, e))
}
}
}
}
根據你的權限模型,在 JwtValidator
中實作對應的驗證邏輯:
- 全域 API 資源 (Global API resources)
- 組織(非 API)權限 (Organization (non-API) permissions)
- 組織層級 API 資源 (Organization-level API resources)
/// Verifies the claims of an access token issued for a global API resource.
///
/// # Errors
/// Returns an `AuthorizationError` when the audience or the scopes do not match.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // "aud" may be a single string or an array of strings.
    let audiences: Vec<&str> = match &claims["aud"] {
        Value::String(single) => vec![single.as_str()],
        Value::Array(items) => items.iter().filter_map(Value::as_str).collect(),
        _ => Vec::new(),
    };
    // The audience must contain your API resource indicator.
    let audience_ok = audiences
        .iter()
        .any(|aud| *aud == "https://your-api-resource-indicator");
    if !audience_ok {
        return Err(AuthorizationError::new("Invalid audience"));
    }

    // Scopes required for the global API resource.
    let required_scopes = ["api:read", "api:write"]; // Replace with the scopes you actually require
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient scope"));
    }
    Ok(())
}
/// Verifies the claims of an organization token (non-API organization permissions).
///
/// # Errors
/// Returns an `AuthorizationError` when the audience, organization or scopes
/// do not match.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // "aud" may be a single string or an array of strings.
    let audiences: Vec<&str> = match &claims["aud"] {
        Value::String(single) => vec![single.as_str()],
        Value::Array(items) => items.iter().filter_map(Value::as_str).collect(),
        _ => Vec::new(),
    };
    // The audience must follow the organization format.
    if !audiences
        .iter()
        .any(|aud| aud.starts_with("urn:logto:organization:"))
    {
        return Err(AuthorizationError::new("Invalid audience for organization permissions"));
    }

    // The organization must match the context (you may need to take it from the request).
    let expected_org_id = "your-organization-id"; // Obtain from the request context
    let expected_aud = format!("urn:logto:organization:{}", expected_org_id);
    if !audiences.iter().any(|aud| *aud == expected_aud.as_str()) {
        return Err(AuthorizationError::new("Organization ID mismatch"));
    }

    // Required organization scopes.
    let required_scopes = ["invite:users", "manage:settings"]; // Replace with the scopes you actually require
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient organization scope"));
    }
    Ok(())
}
/// Verifies the claims of an access token for an organization-level API resource.
///
/// # Errors
/// Returns an `AuthorizationError` when the audience, organization or scopes
/// do not match.
fn verify_payload(&self, claims: &Value) -> Result<(), AuthorizationError> {
    // "aud" may be a single string or an array of strings.
    let audiences: Vec<&str> = match &claims["aud"] {
        Value::String(single) => vec![single.as_str()],
        Value::Array(items) => items.iter().filter_map(Value::as_str).collect(),
        _ => Vec::new(),
    };
    // The audience must contain your API resource indicator.
    let audience_ok = audiences
        .iter()
        .any(|aud| *aud == "https://your-api-resource-indicator");
    if !audience_ok {
        return Err(AuthorizationError::new("Invalid audience for organization-level API resources"));
    }

    // The organization must match the context (you may need to take it from the request).
    let expected_org_id = "your-organization-id"; // Obtain from the request context
    let token_org_id = claims["organization_id"].as_str().unwrap_or_default();
    if token_org_id != expected_org_id {
        return Err(AuthorizationError::new("Organization ID mismatch"));
    }

    // Scopes required for the organization-level API resource.
    let required_scopes = ["api:read", "api:write"]; // Replace with the scopes you actually require
    let granted: Vec<&str> = claims["scope"]
        .as_str()
        .map(|raw| raw.split(' ').collect())
        .unwrap_or_default();
    if required_scopes.iter().any(|needed| !granted.contains(needed)) {
        return Err(AuthorizationError::new("Insufficient organization-level API scopes"));
    }
    Ok(())
}
步驟 4:將中介軟體 (Middleware) 套用到你的 API
將中介軟體 (Middleware) 套用到你要保護的 API 路由。
- Node.js
- Python
- Go
- Java
- .NET
- PHP
- Ruby
- Rust
- Express.js
- Koa.js
- Fastify
- Hapi.js
- NestJS
import express from 'express';
import { verifyAccessToken } from './auth-middleware.js';

const app = express();

// Responds with the auth info that the middleware stored on req.auth.
function showAuth(req, res) {
  res.json({ auth: req.auth });
}

// Your protected endpoint logic.
function showDetailed(req, res) {
  const payload = {
    auth: req.auth,
    message: '成功存取受保護資料',
  };
  res.json(payload);
}

app.get('/api/protected', verifyAccessToken, showAuth);
app.get('/api/protected/detailed', verifyAccessToken, showDetailed);

app.listen(3000);
import Koa from 'koa';
import Router from '@koa/router';
import { koaVerifyAccessToken } from './auth-middleware.js';

const app = new Koa();
const router = new Router();

// Responds with the auth info that the middleware stored on ctx.state.auth.
const showAuth = (ctx) => {
  ctx.body = { auth: ctx.state.auth };
};

// Your protected endpoint logic.
const showDetailed = (ctx) => {
  ctx.body = {
    auth: ctx.state.auth,
    message: '成功存取受保護資料',
  };
};

router.get('/api/protected', koaVerifyAccessToken, showAuth);
router.get('/api/protected/detailed', koaVerifyAccessToken, showDetailed);

app.use(router.routes());
app.listen(3000);
import Fastify from 'fastify';
import { fastifyVerifyAccessToken } from './auth-middleware.js';

const fastify = Fastify();

// Route options shared by every protected route: validate the token first.
const guarded = { preHandler: fastifyVerifyAccessToken };

fastify.get('/api/protected', guarded, (request, reply) => {
  // The auth info is available directly on request.auth.
  reply.send({ auth: request.auth });
});

fastify.get('/api/protected/detailed', guarded, (request, reply) => {
  // Your protected endpoint logic.
  const payload = {
    auth: request.auth,
    message: '成功存取受保護資料',
  };
  reply.send(payload);
});

fastify.listen({ port: 3000 });
import Hapi from '@hapi/hapi';
import { hapiVerifyAccessToken } from './auth-middleware.js';

const server = Hapi.server({ port: 3000 });

// Pre-handler list shared by every protected route: validate the token first.
const verifyToken = [{ method: hapiVerifyAccessToken }];

// Returns the auth info that the pre-handler stored on request.app.auth.
const showAuth = (request, h) => ({ auth: request.app.auth });

// Your protected endpoint logic.
const showDetailed = (request, h) => ({
  auth: request.app.auth,
  message: '成功存取受保護資料',
});

server.route({
  method: 'GET',
  path: '/api/protected',
  options: { pre: verifyToken, handler: showAuth },
});

server.route({
  method: 'GET',
  path: '/api/protected/detailed',
  options: { pre: verifyToken, handler: showDetailed },
});

await server.start();
import { Controller, Get, UseGuards, Req } from '@nestjs/common';
import { AccessTokenGuard } from './access-token.guard.js';
/**
 * Controller whose routes are all guarded by AccessTokenGuard; the handlers
 * read the auth info from req.auth.
 */
@Controller('api')
@UseGuards(AccessTokenGuard)
export class ProtectedController {
  /** Returns the auth info of the current request. */
  @Get('protected')
  getProtected(@Req() req: any) {
    const { auth } = req;
    return { auth };
  }

  /** Your protected endpoint logic. */
  @Get('protected/detailed')
  getDetailedProtected(@Req() req: any) {
    const { auth } = req;
    return { auth, message: '成功存取受保護資料' };
  }
}
- FastAPI
- Flask
- Django
- Django REST Framework
from fastapi import FastAPI, Depends
from auth_middleware import verify_access_token, AuthInfo

app = FastAPI()


@app.get("/api/protected")
async def protected_endpoint(auth: AuthInfo = Depends(verify_access_token)):
    """Return the auth info injected through the ``auth`` dependency."""
    body = {"auth": auth.to_dict()}
    return body


@app.get("/api/protected/detailed")
async def detailed_endpoint(auth: AuthInfo = Depends(verify_access_token)):
    """Your protected endpoint logic."""
    body = {"auth": auth.to_dict()}
    body["message"] = "成功存取受保護資料"
    return body
from flask import Flask, g, jsonify
from auth_middleware import verify_access_token

app = Flask(__name__)


@app.route('/api/protected', methods=['GET'])
@verify_access_token
def protected_endpoint():
    """Return the auth info read from ``g.auth``."""
    auth = g.auth.to_dict()
    return jsonify({"auth": auth})


@app.route('/api/protected/detailed', methods=['GET'])
@verify_access_token
def detailed_endpoint():
    """Your protected endpoint logic."""
    body = {"auth": g.auth.to_dict()}
    body["message"] = "成功存取受保護資料"
    return jsonify(body)
from django.http import JsonResponse
from auth_middleware import require_access_token


@require_access_token
def protected_view(request):
    """Return the auth info read from ``request.auth``."""
    auth = request.auth.to_dict()
    return JsonResponse({"auth": auth})


@require_access_token
def detailed_view(request):
    """Your protected endpoint logic."""
    body = {"auth": request.auth.to_dict()}
    body["message"] = "成功存取受保護資料"
    return JsonResponse(body)
from rest_framework.decorators import api_view, authentication_classes
from rest_framework.response import Response
from auth_middleware import AccessTokenAuthentication


@api_view(['GET'])
@authentication_classes([AccessTokenAuthentication])
def protected_view(request):
    """Return the auth info read from ``request.user.auth``."""
    auth = request.user.auth.to_dict()
    return Response({"auth": auth})


@api_view(['GET'])
@authentication_classes([AccessTokenAuthentication])
def detailed_view(request):
    """Your protected endpoint logic."""
    body = {"auth": request.user.auth.to_dict()}
    body["message"] = "成功存取受保護資料"
    return Response(body)
或使用類別型視圖:
from rest_framework.views import APIView
from rest_framework.response import Response
from auth_middleware import AccessTokenAuthentication


class ProtectedView(APIView):
    """Returns the auth info read from ``request.user.auth``."""

    authentication_classes = [AccessTokenAuthentication]

    def get(self, request):
        auth = request.user.auth.to_dict()
        return Response({"auth": auth})


class DetailedView(APIView):
    """Your protected endpoint logic."""

    authentication_classes = [AccessTokenAuthentication]

    def get(self, request):
        body = {"auth": request.user.auth.to_dict()}
        body["message"] = "成功存取受保護資料"
        return Response(body)
- Gin
- Echo
- Fiber
- Chi
package main
import (
"net/http"
"github.com/gin-gonic/gin"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
r := gin.Default()
// 將中介軟體套用到受保護路由
r.GET("/api/protected", VerifyAccessToken(), func(c *gin.Context) {
// 直接從 context 取得存取權杖 (Access token) 資訊
tokenInterface, exists := c.Get("auth")
if !exists {
c.JSON(http.StatusInternalServerError, gin.H{"error": "Token not found"})
return
}
token := tokenInterface.(jwt.Token)
c.JSON(http.StatusOK, gin.H{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
})
})
r.Run(":8080")
}
package main
import (
"net/http"
"github.com/labstack/echo/v4"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
e := echo.New()
// 將中介軟體套用到受保護路由
e.GET("/api/protected", func(c echo.Context) error {
// 直接從 context 取得存取權杖 (Access token) 資訊
tokenInterface := c.Get("auth")
if tokenInterface == nil {
return c.JSON(http.StatusInternalServerError, echo.Map{"error": "Token not found"})
}
token := tokenInterface.(jwt.Token)
return c.JSON(http.StatusOK, echo.Map{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
})
}, VerifyAccessToken)
e.Start(":8080")
}
或使用路由群組:
package main
import (
"github.com/labstack/echo/v4"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
e := echo.New()
// 建立受保護路由群組
api := e.Group("/api", VerifyAccessToken)
api.GET("/protected", func(c echo.Context) error {
// 直接從 context 取得存取權杖 (Access token) 資訊
token := c.Get("auth").(jwt.Token)
return c.JSON(200, echo.Map{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
"message": "成功存取受保護資料 (Protected data accessed successfully)",
})
})
e.Start(":8080")
}
package main
import (
"github.com/gofiber/fiber/v2"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
app := fiber.New()
// 將中介軟體套用到受保護路由
app.Get("/api/protected", VerifyAccessToken, func(c *fiber.Ctx) error {
// 直接從 locals 取得存取權杖 (Access token) 資訊
tokenInterface := c.Locals("auth")
if tokenInterface == nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "Token not found"})
}
token := tokenInterface.(jwt.Token)
return c.JSON(fiber.Map{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
})
})
app.Listen(":8080")
}
或使用路由群組:
package main
import (
"github.com/gofiber/fiber/v2"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
app := fiber.New()
// 建立受保護路由群組
api := app.Group("/api", VerifyAccessToken)
api.Get("/protected", func(c *fiber.Ctx) error {
// 直接從 locals 取得存取權杖 (Access token) 資訊
token := c.Locals("auth").(jwt.Token)
return c.JSON(fiber.Map{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
"message": "成功存取受保護資料 (Protected data accessed successfully)",
})
})
app.Listen(":8080")
}
package main
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
r := chi.NewRouter()
// 將中介軟體套用到受保護路由
r.With(VerifyAccessToken).Get("/api/protected", func(w http.ResponseWriter, r *http.Request) {
// 直接從 context 取得存取權杖 (Access token) 資訊
tokenInterface := r.Context().Value(AuthContextKey)
if tokenInterface == nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
json.NewEncoder(w).Encode(map[string]string{"error": "Token not found"})
return
}
token := tokenInterface.(jwt.Token)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
})
})
http.ListenAndServe(":8080", r)
}
或使用路由群組:
package main
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/lestrrat-go/jwx/v3/jwt"
)
func main() {
r := chi.NewRouter()
// 建立受保護路由群組
r.Route("/api", func(r chi.Router) {
r.Use(VerifyAccessToken)
r.Get("/protected", func(w http.ResponseWriter, r *http.Request) {
// 直接從 context 取得存取權杖 (Access token) 資訊
token := r.Context().Value(AuthContextKey).(jwt.Token)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"sub": token.Subject(),
"client_id": getStringClaim(token, "client_id"),
"organization_id": getStringClaim(token, "organization_id"),
"scopes": getScopesFromToken(token),
"audience": getAudienceFromToken(token),
"message": "成功存取受保護資料 (Protected data accessed successfully)",
})
})
})
http.ListenAndServe(":8080", r)
}
- Spring Boot
- Quarkus
- Micronaut
- Vert.x Web
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that expose the claims of the validated access token.
 *
 * <p>Responses are built with a mutable map because optional claims such as
 * {@code organization_id} may be absent, and {@code Map.of} rejects null values.
 */
@RestController
public class ProtectedController {
    /** Returns the main claims of the access token, with scopes split into a list. */
    @GetMapping("/api/protected")
    public Map<String, Object> protectedEndpoint(@AuthenticationPrincipal Jwt jwt) {
        // Read the access token information directly from the JWT.
        String scopes = jwt.getClaimAsString("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();

        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.getClaimAsString("client_id"));
        body.put("organization_id", jwt.getClaimAsString("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", jwt.getAudience());
        return body;
    }

    /** Your protected endpoint logic. */
    @GetMapping("/api/protected/detailed")
    public Map<String, Object> detailedEndpoint(@AuthenticationPrincipal Jwt jwt) {
        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.getClaimAsString("client_id"));
        body.put("organization_id", jwt.getClaimAsString("organization_id"));
        body.put("scopes", jwt.getClaimAsString("scope"));
        body.put("audience", jwt.getAudience());
        body.put("message", "成功存取受保護資料");
        return body;
    }
}
import org.eclipse.microprofile.jwt.JsonWebToken;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.container.ContainerRequestContext;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that expose the claims of the validated access token.
 *
 * <p>Responses are built with a mutable map because optional claims such as
 * {@code organization_id} may be absent, and {@code Map.of} rejects null values.
 */
@Path("/api")
public class ProtectedResource {
    @Inject
    JsonWebToken jwt;

    /** Returns the main claims of the access token, with scopes split into a list. */
    @GET
    @Path("/protected")
    @Produces(MediaType.APPLICATION_JSON)
    public Map<String, Object> protectedEndpoint(@Context ContainerRequestContext requestContext) {
        // Read the JWT from the request context, falling back to the injected one.
        JsonWebToken token = (JsonWebToken) requestContext.getProperty("auth");
        if (token == null) {
            token = jwt;
        }
        String scopes = token.getClaim("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();

        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", token.getSubject());
        body.put("client_id", token.<String>getClaim("client_id"));
        body.put("organization_id", token.<String>getClaim("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", token.getAudience());
        return body;
    }

    /** Your protected endpoint logic. */
    @GET
    @Path("/protected/detailed")
    @Produces(MediaType.APPLICATION_JSON)
    public Map<String, Object> detailedEndpoint() {
        String scopes = jwt.getClaim("scope");

        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", jwt.getSubject());
        body.put("client_id", jwt.<String>getClaim("client_id"));
        body.put("organization_id", jwt.<String>getClaim("organization_id"));
        body.put("scopes", scopes);
        body.put("audience", jwt.getAudience());
        body.put("message", "成功存取受保護資料");
        return body;
    }
}
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import io.micronaut.security.annotation.Secured;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.rules.SecurityRule;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
/**
 * Protected endpoints that expose the claims of the validated access token.
 *
 * <p>Responses are built with a mutable map because optional attributes such as
 * {@code organization_id} may be absent, and {@code Map.of} rejects null values.
 */
@Controller("/api")
@Secured(SecurityRule.IS_AUTHENTICATED)
public class ProtectedController {
    /** Returns the main claims of the access token, with scopes split into a list. */
    @Get("/protected")
    public Map<String, Object> protectedEndpoint(Authentication authentication) {
        // Read the access token information directly from the Authentication.
        Map<String, Object> attributes = authentication.getAttributes();
        String scopes = (String) attributes.get("scope");
        List<String> scopeList = scopes != null ? Arrays.asList(scopes.split(" ")) : List.of();

        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", authentication.getName());
        body.put("client_id", attributes.get("client_id"));
        body.put("organization_id", attributes.get("organization_id"));
        body.put("scopes", scopeList);
        body.put("audience", attributes.get("aud"));
        return body;
    }

    /** Your protected endpoint logic. */
    @Get("/protected/detailed")
    public Map<String, Object> detailedEndpoint(Authentication authentication) {
        Map<String, Object> attributes = authentication.getAttributes();

        Map<String, Object> body = new java.util.LinkedHashMap<>();
        body.put("sub", authentication.getName());
        body.put("client_id", attributes.get("client_id"));
        body.put("organization_id", attributes.get("organization_id"));
        body.put("scopes", attributes.get("scope"));
        body.put("audience", attributes.get("aud"));
        body.put("message", "成功存取受保護資料");
        return body;
    }
}
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
public class MainVerticle extends AbstractVerticle {
@Override
public void start(Promise<Void> startPromise) throws Exception {
Router router = Router.router(vertx);
// 對受保護路由套用中介軟體
router.route("/api/protected*").handler(new JwtAuthHandler(vertx));
router.get("/api/protected").handler(this::protectedEndpoint);
router.get("/api/protected/detailed").handler(this::detailedEndpoint);
vertx.createHttpServer()
.requestHandler(router)
.listen(8080, result -> {
if (result.succeeded()) {
startPromise.complete();
} else {
startPromise.fail(result.cause());
}
});
}
private void protectedEndpoint(RoutingContext context) {
// 直接從 context 取得 JWT principal
JsonObject principal = context.get("auth");
if (principal == null) {
context.response()
.setStatusCode(500)
.putHeader("Content-Type", "application/json")
.end("{\"error\": \"找不到 JWT principal\"}");
return;
}
String scopes = principal.getString("scope");
JsonObject response = new JsonObject()
.put("sub", principal.getString("sub"))
.put("client_id", principal.getString("client_id"))
.put("organization_id", principal.getString("organization_id"))
.put("scopes", scopes != null ? scopes.split(" ") : new String[0])
.put("audience", principal.getJsonArray("aud"));
context.response()
.putHeader("Content-Type", "application/json")
.end(response.encode());
}
private void detailedEndpoint(RoutingContext context) {
// 你的受保護端點邏輯
JsonObject principal = context.get("auth");
JsonObject response = new JsonObject()
.put("sub", principal.getString("sub"))
.put("client_id", principal.getString("client_id"))
.put("organization_id", principal.getString("organization_id"))
.put("scopes", principal.getString("scope"))
.put("audience", principal.getJsonArray("aud"))
.put("message", "成功存取受保護資料");
context.response()
.putHeader("Content-Type", "application/json")
.end(response.encode());
}
}
我們已在前述章節設定好驗證 (Authentication) 與授權 (Authorization) 中介軟體。現在可以建立一個受保護的控制器,用來驗證存取權杖 (Access token) 並從已驗證的請求中擷取宣告 (Claims)。
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.Security.Claims;
namespace YourApiNamespace.Controllers
{
    /// <summary>
    /// Protected endpoints that expose the claims of the authenticated request.
    /// </summary>
    [ApiController]
    [Route("api/[controller]")]
    [Authorize] // Every action of this controller requires authentication
    public class ProtectedController : ControllerBase
    {
        /// <summary>Returns the main access token claims of the current user.</summary>
        [HttpGet]
        public IActionResult GetProtectedData()
        {
            // Read the access token information directly from the User claims.
            var subject = User.FindFirst(ClaimTypes.NameIdentifier)?.Value
                ?? User.FindFirst("sub")?.Value;
            var scopeList = User.FindFirst("scope")?.Value?.Split(' ') ?? Array.Empty<string>();
            var audiences = User.FindAll("aud").Select(claim => claim.Value).ToArray();

            var payload = new
            {
                sub = subject,
                client_id = User.FindFirst("client_id")?.Value,
                organization_id = User.FindFirst("organization_id")?.Value,
                scopes = scopeList,
                audience = audiences
            };
            return Ok(payload);
        }

        /// <summary>Returns every claim of the current user, for debugging and inspection.</summary>
        [HttpGet("claims")]
        public IActionResult GetAllClaims()
        {
            var claims = User.Claims
                .Select(claim => new { claim.Type, claim.Value })
                .ToList();
            return Ok(new { claims });
        }
    }
}
- Laravel
- Symfony
- Slim
<?php

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Route;

// Every route in this group runs behind the 'auth.token' middleware.
Route::middleware('auth.token')->group(function () {
    // Returns the auth info read from the request attributes.
    Route::get('/api/protected', fn (Request $request) => [
        'auth' => $request->attributes->get('auth')->toArray(),
    ]);

    // Your protected endpoint logic.
    Route::get('/api/protected/detailed', fn (Request $request) => [
        'auth' => $request->attributes->get('auth')->toArray(),
        'message' => '成功存取受保護資料',
    ]);
});
或使用 controller:
<?php

namespace App\Http\Controllers\Api;

use App\Http\Controllers\Controller;
use Illuminate\Http\Request;

/**
 * Controller whose actions all run behind the 'auth.token' middleware.
 */
class ProtectedController extends Controller
{
    public function __construct()
    {
        $this->middleware('auth.token');
    }

    /**
     * Returns the auth info read from the request attributes.
     */
    public function index(Request $request)
    {
        $authInfo = $request->attributes->get('auth')->toArray();

        return ['auth' => $authInfo];
    }

    /**
     * Your protected endpoint logic.
     */
    public function show(Request $request)
    {
        $payload = ['auth' => $request->attributes->get('auth')->toArray()];
        $payload['message'] = '成功存取受保護資料';

        return $payload;
    }
}
<?php
namespace App\Controller\Api;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Routing\Annotation\Route;
use Symfony\Component\Security\Http\Attribute\IsGranted;
/**
 * Protected API controller mounted under /api/protected; requires IS_AUTHENTICATED_FULLY.
 */
#[Route('/api/protected')]
#[IsGranted('IS_AUTHENTICATED_FULLY')]
class ProtectedController extends AbstractController
{
    /**
     * Returns the authentication info stored on the request.
     */
    #[Route('', methods: ['GET'])]
    public function index(Request $request): JsonResponse
    {
        return $this->json($this->authPayload($request));
    }

    /**
     * Returns the authentication info together with a success message.
     */
    #[Route('/detailed', methods: ['GET'])]
    public function detailed(Request $request): JsonResponse
    {
        // Your protected endpoint logic
        $payload = $this->authPayload($request);
        $payload['message'] = '成功存取受保護資料';

        return $this->json($payload);
    }

    /**
     * Builds the ['auth' => ...] array from the 'auth' request attribute.
     */
    private function authPayload(Request $request): array
    {
        return ['auth' => $request->attributes->get('auth')->toArray()];
    }
}
<?php
use App\Middleware\JwtMiddleware;
use Psr\Http\Message\ResponseInterface as Response;
use Psr\Http\Message\ServerRequestInterface as Request;
use Slim\Factory\AppFactory;
require __DIR__ . '/../vendor/autoload.php';
$app = AppFactory::create();

// Shared helper: writes $payload as JSON into the response body and sets the content type.
$writeJson = function (Response $response, array $payload): Response {
    $response->getBody()->write(json_encode($payload));

    return $response->withHeader('Content-Type', 'application/json');
};

// Protected routes: JwtMiddleware is attached to the whole group.
$app->group('/api/protected', function ($group) use ($writeJson) {
    $group->get('', function (Request $request, Response $response) use ($writeJson) {
        // The authentication info is read from the 'auth' request attribute.
        return $writeJson($response, ['auth' => $request->getAttribute('auth')->toArray()]);
    });

    $group->get('/detailed', function (Request $request, Response $response) use ($writeJson) {
        // Your protected endpoint logic
        return $writeJson($response, [
            'auth' => $request->getAttribute('auth')->toArray(),
            'message' => '成功存取受保護資料',
        ]);
    });
})->add(new JwtMiddleware());

// Public endpoint (registered outside the protected group)
$app->get('/', function (Request $request, Response $response) use ($writeJson) {
    return $writeJson($response, ['message' => '公開端點']);
});

$app->run();
或使用更結構化的方式:
<?php
namespace App\Controllers;
use Psr\Http\Message\ResponseInterface as Response;
use Psr\Http\Message\ServerRequestInterface as Request;
/**
 * Controller-style handlers for the protected Slim routes.
 */
class ProtectedController
{
    /**
     * Writes the authentication info from the 'auth' request attribute as JSON.
     */
    public function index(Request $request, Response $response): Response
    {
        $authInfo = $request->getAttribute('auth');

        return $this->json($response, ['auth' => $authInfo->toArray()]);
    }

    /**
     * Writes the authentication info plus a success message as JSON.
     */
    public function detailed(Request $request, Response $response): Response
    {
        // Your protected endpoint logic
        $authInfo = $request->getAttribute('auth');

        return $this->json($response, [
            'auth' => $authInfo->toArray(),
            'message' => '成功存取受保護資料',
        ]);
    }

    /**
     * Encodes $payload into the response body and sets the JSON content type.
     */
    private function json(Response $response, array $payload): Response
    {
        $response->getBody()->write(json_encode($payload));

        return $response->withHeader('Content-Type', 'application/json');
    }
}
- Rails
- Sinatra
- Grape
# Base controller that includes the JwtAuthentication module.
class ApplicationController < ActionController::API # API-only applications
# class ApplicationController < ActionController::Base # full Rails applications
include JwtAuthentication
end
# Protected API controller; verify_access_token runs before every action.
class Api::ProtectedController < ApplicationController
  before_action :verify_access_token

  # Renders the authentication info held in @auth.
  def index
    render json: auth_payload
  end

  # Renders the authentication info together with a success message.
  def show
    # Your protected endpoint logic
    render json: auth_payload.merge(message: "成功存取受保護資料")
  end

  private

  # Hash form of @auth under the :auth key.
  def auth_payload
    { auth: @auth.to_h }
  end
end
# Routes: index and show of the :protected resource under the :api namespace.
Rails.application.routes.draw do
  namespace(:api) { resources :protected, only: %i[index show] }
end
require 'sinatra'
require 'json'
require_relative 'auth_middleware'
require_relative 'auth_constants'
require_relative 'auth_info'
require_relative 'authorization_error'
require_relative 'auth_helpers'
require_relative 'jwt_validator'
# Register the authentication middleware.
use AuthMiddleware

get '/api/protected' do
  content_type :json
  # The authentication info is read from env under the 'auth' key.
  payload = { auth: env['auth'].to_h }
  payload.to_json
end

get '/api/protected/:id' do
  content_type :json
  # Your protected endpoint logic
  auth_info = env['auth']
  payload = { auth: auth_info.to_h, id: params[:id], message: "成功存取受保護資料" }
  payload.to_json
end

# Public endpoint (intended to be unprotected; NOTE(review): confirm AuthMiddleware skips this path).
get '/' do
  content_type :json
  payload = { message: "公開端點" }
  payload.to_json
end
require 'grape'
require_relative 'auth_helpers'
require_relative 'auth_constants'
require_relative 'auth_info'
require_relative 'authorization_error'
require_relative 'jwt_validator'
# Grape API: /api/protected endpoints call authenticate_user! first; /public does not.
class API < Grape::API
  format :json
  helpers GrapeAuthHelpers

  namespace :api do
    namespace :protected do
      before { authenticate_user! }

      get do
        # `auth` comes from the authentication helpers.
        { auth: auth.to_h }
      end

      get ':id' do
        # Your protected endpoint logic
        { auth: auth.to_h, id: params[:id], message: "成功存取受保護資料" }
      end
    end
  end

  # Public endpoint (declared outside the protected namespace)
  get(:public) { { message: "公開端點" } }
end
# Load the sibling 'api' file, then pass API to `run` (presumably a Rack config.ru — confirm).
require_relative 'api'
run API
- Axum
- Actix Web
- Rocket
use axum::{
extract::Extension,
http::StatusCode,
middleware,
response::Json,
routing::get,
Router,
};
use serde_json::{json, Value};
use std::sync::Arc;
use tower_http::cors::CorsLayer;
mod lib;
mod jwt_validator;
mod middleware as jwt_middleware;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
/// Builds the Axum router with the JWT middleware and serves it on `0.0.0.0:3000`.
///
/// # Panics
/// Panics if the validator cannot be initialized, the listener cannot bind, or serving fails.
#[tokio::main]
async fn main() {
    let jwt_validator = JwtValidator::new()
        .await
        .expect("Failed to initialize JWT validator");
    let shared_validator = Arc::new(jwt_validator);

    let protected_routes = Router::new()
        .route("/api/protected", get(protected_handler))
        .route("/api/protected/detailed", get(detailed_handler));

    // Layer order is unchanged: JWT middleware, then the validator extension, then CORS.
    let router = protected_routes
        .layer(middleware::from_fn(jwt_middleware::jwt_middleware))
        .layer(Extension(shared_validator))
        .layer(CorsLayer::permissive());

    let tcp_listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(tcp_listener, router).await.unwrap();
}
/// Responds with the caller's `AuthInfo` as JSON under the `auth` key.
async fn protected_handler(Extension(auth): Extension<AuthInfo>) -> Json<Value> {
    // `auth` is taken straight from the `Extension` extractor.
    let body = json!({ "auth": auth });
    Json(body)
}
/// Responds with the caller's `AuthInfo` and a success message as JSON.
async fn detailed_handler(Extension(auth): Extension<AuthInfo>) -> Json<Value> {
    // Your protected endpoint logic
    let body = json!({
        "auth": auth,
        "message": "成功存取受保護資料"
    });
    Json(body)
}
use actix_web::{middleware::Logger, web, App, HttpMessage, HttpRequest, HttpServer, Result};
use serde_json::{json, Value};
use std::sync::Arc;
mod lib;
mod jwt_validator;
mod middleware as jwt_middleware;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
use jwt_middleware::JwtMiddleware;
/// Starts the Actix Web server on `127.0.0.1:8080` with the `/api/protected` scope
/// wrapped in `JwtMiddleware`.
///
/// # Errors
/// Returns the I/O error from binding or running the server.
///
/// # Panics
/// Panics if the JWT validator cannot be initialized.
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let jwt_validator = JwtValidator::new()
        .await
        .expect("Failed to initialize JWT validator");
    let validator = Arc::new(jwt_validator);

    let server = HttpServer::new(move || {
        // Only this scope is wrapped in the JWT middleware.
        let protected_scope = web::scope("/api/protected")
            .wrap(JwtMiddleware::new(validator.clone()))
            .route("", web::get().to(protected_handler))
            .route("/detailed", web::get().to(detailed_handler));

        App::new()
            .app_data(web::Data::new(validator.clone()))
            .wrap(Logger::default())
            .service(protected_scope)
    })
    .bind("127.0.0.1:8080")?;

    server.run().await
}
async fn protected_handler(req: HttpRequest) -> Result<web::Json<Value>> {
// 從 request extensions 取得驗證 (Authentication) 資訊
let auth = req.extensions().get::<AuthInfo>().unwrap();
Ok(web::Json(json!({ "auth": auth })))
}
async fn detailed_handler(req: HttpRequest) -> Result<web::Json<Value>> {
// 你的受保護端點邏輯
let auth = req.extensions().get::<AuthInfo>().unwrap();
Ok(web::Json(json!({
"auth": auth,
"message": "成功存取受保護資料"
})))
}
use rocket::{get, launch, routes, serde::json::Json, State};
use serde_json::{json, Value};
mod lib;
mod jwt_validator;
mod guards;
use lib::AuthInfo;
use jwt_validator::JwtValidator;
/// Responds with the caller's `AuthInfo` as JSON under the `auth` key.
#[get("/api/protected")]
fn protected_handler(auth: AuthInfo) -> Json<Value> {
    // `auth` is supplied as a handler argument by the request guard.
    let body = json!({ "auth": auth });
    Json(body)
}
/// Responds with the caller's `AuthInfo` and a success message as JSON.
#[get("/api/protected/detailed")]
fn detailed_handler(auth: AuthInfo) -> Json<Value> {
    // Your protected endpoint logic
    let body = json!({
        "auth": auth,
        "message": "成功存取受保護資料"
    });
    Json(body)
}
/// Builds the Rocket instance with the JWT validator as managed state and both
/// protected routes mounted at `/`.
///
/// # Panics
/// Panics if the JWT validator cannot be initialized.
#[launch]
async fn rocket() -> _ {
    let jwt_validator = JwtValidator::new()
        .await
        .expect("Failed to initialize JWT validator");
    let protected_routes = routes![protected_handler, detailed_handler];

    rocket::build()
        .manage(jwt_validator)
        .mount("/", protected_routes)
}
步驟 5:測試你的實作
使用有效與無效權杖測試你的 API,確保:
- 有效權杖可通過並取得存取權。
- 無效或缺少權杖時回傳 `401 Unauthorized`。若權杖有效但缺少必要權限或上下文,回傳 `403 Forbidden`。
相關資源
- 自訂權杖宣告 (Claim)
- JSON Web Token (JWT)
- OpenID Connect 探索 (Discovery)
- RFC 8707:資源標示符 (Resource Indicators)