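SSO Single Sign-On
Overview
The system supports single sign-on (SSO) based on the OAuth 2.0 protocol, so users can sign in with a third-party platform account instead of registering separately. Nine mainstream OAuth platforms are supported: Gitee, GitHub, QQ, Google, WeChat, Microsoft, DingTalk, Feishu, and WeCom.
Supported OAuth platforms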
| Platform | Provider | Config prefix | Description |
|---|---|---|---|
| Gitee | gitee | GITEE_ | Gitee Open Platform |
| GitHub | github | GITHUB_ | GitHub OAuth Apps |
| QQ | qq | QQ_ | QQ Connect |
| Google | google | GOOGLE_ | Google OAuth 2.0 |
| WeChat | wechat | WECHAT_ | WeChat Open Platform |
| Microsoft | microsoft | MICROSOFT_ | Microsoft account |
| DingTalk | dingtalk | DINGTALK_ | DingTalk Open Platform |
| Feishu | feishu | FEISHU_ | Feishu Open Platform |
| WeCom | wecom | WECOM_ | WeCom (企业微信) |
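Architecture
Core components
backend-fastapi/core/oauth/
├── base_oauth_service.py # Abstract OAuth base service class
├── service.py # Per-platform OAuth service implementations
├── api.py # OAuth API endpoints
└── schema.py # OAuth data models
Design patterns
The design combines the template method pattern with the strategy pattern:
- BaseOAuthService: abstract base class that defines the standard OAuth flow
- Per-platform service classes: inherit from the base class and implement platform-specific logic
- OAUTH_PROVIDERS: service registry that manages all platforms in one place
# Service registry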
OAUTH_PROVIDERS = {
'gitee': GiteeOAuthService,
'github': GitHubOAuthService,
'qq': QQOAuthService,
'google': GoogleOAuthService,
'wechat': WeChatOAuthService,
'microsoft': MicrosoftOAuthService,
'dingtalk': DingTalkOAuthService,
'feishu': FeishuOAuthService,
'wecom': WeComOAuthService,
}
OAuth login flow
Flow diagram
┌─────────┐                                 ┌─────────┐                                 ┌─────────┐
│ Frontend│                                 │ Backend │                                 │  OAuth  │
│         │                                 │         │                                 │ platform│
└────┬────┘                                 └────┬────┘                                 └────┬────┘
     │                                           │                                           │
     │ 1. GET /oauth/{provider}/authorize        │                                           │
     │ ────────────────────────────────────────→ │                                           │
     │                                           │                                           │
     │ 2. Return authorization URL + CSRF state  │                                           │
     │ ←──────────────────────────────────────── │                                           │
     │                                           │                                           │
     │ 3. Redirect to the authorization page     │                                           │
     │ ────────────────────────────────────────────────────────────────────────────────────→ │
     │                                           │                                           │
     │                                           │ 4. User authorizes                        │
     │                                           │                                           │
     │ 5. Callback to frontend (code + state)    │                                           │
     │ ←──────────────────────────────────────────────────────────────────────────────────── │
     │                                           │                                           │
     │ 6. POST /oauth/{provider}/callback        │                                           │
     │    { code, state }                        │                                           │
     │ ────────────────────────────────────────→ │                                           │
     │                                           │                                           │
     │                                           │ 7. Verify CSRF state                      │
     │                                           │                                           │
     │                                           │ 8. Exchange code for access_token         │
     │                                           │ ────────────────────────────────────────→ │
     │                                           │                                           │
     │                                           │ 9. Return access_token                    │
     │                                           │ ←──────────────────────────────────────── │
     │                                           │                                           │
     │                                           │ 10. Fetch user info                       │
     │                                           │ ────────────────────────────────────────→ │
     │                                           │                                           │
     │                                           │ 11. Return user info                      │
     │                                           │ ←──────────────────────────────────────── │
     │                                           │                                           │
     │                                           │ 12. Create/update user                    │
     │                                           │ 13. Generate JWT tokens                   │
     │                                           │ 14. Cache user info in Redis              │
     │                                           │                                           │
     │ 15. Return JWT tokens + user info         │                                           │
     │ ←──────────────────────────────────────── │                                           │
     │                                           │                                           │
Detailed steps
1. Get the authorization URL
Request:
GET /api/core/oauth/{provider}/authorize?state={redirect_info}
Response:
{
  "authorize_url": "https://github.com/login/oauth/authorize?client_id=xxx&redirect_uri=xxx&state=xxx"
}
Backend processing:
- Generate a 32-byte CSRF token
- Store the CSRF token in Redis (expires after 5 minutes)
- Merge the CSRF token and the frontend state into JSON: {"csrf": "xxx", "payload": "redirect_info"}
- Build the authorization URL and return it
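The backend steps for building the authorization URL can be sketched as follows. This is a minimal sketch rather than the project's actual code: `build_state` and `build_authorize_url` are hypothetical helper names, the Redis write is left out, and the query string is encoded with the standard library's `urlencode` so that the JSON state travels as a single parameter.

```python
import json
import secrets
from typing import Dict, Optional, Tuple
from urllib.parse import urlencode


def build_state(frontend_state: Optional[str]) -> Tuple[str, str]:
    """Return (csrf_token, combined_state). Hypothetical helper."""
    csrf_token = secrets.token_urlsafe(32)  # 32 random bytes as URL-safe text
    combined_state = json.dumps({"csrf": csrf_token, "payload": frontend_state})
    return csrf_token, combined_state


def build_authorize_url(
    authorize_url: str,
    client_id: str,
    redirect_uri: str,
    state: str,
    extra_params: Optional[Dict[str, str]] = None,
) -> str:
    """Assemble the provider authorization URL with an encoded query string."""
    params = {
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "response_type": "code",
        "state": state,
    }
    # Platform-specific extras such as scope are merged last.
    params.update(extra_params or {})
    return f"{authorize_url}?{urlencode(params)}"
```

The CSRF token returned by `build_state` is the value that would be stored in Redis with a 5-minute expiry before the URL is handed to the frontend.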
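2. User authorization
The frontend redirects the user to the authorization URL, and the user completes authorization on the OAuth platform.
3. Handle the callback
Request:
POST /api/core/oauth/{provider}/callback
Content-Type: application/json
{
  "code": "authorization_code",
  "state": "{\"csrf\":\"xxx\",\"payload\":\"redirect_info\"}"
}
Response: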
{
"access_token": "eyJhbGciOi...",
"refresh_token": "eyJhbGciOi...",
"expire": 86400,
"user_info": {
"id": "uuid",
"username": "user123",
"name": "张三",
"email": "user@example.com",
"avatar": "https://...",
"user_type": 1,
"is_superuser": false
}
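}
Backend processing:
- Verify CSRF: check that the CSRF token in state exists in Redis
- Exchange for an access token: use code to obtain an access_token from the OAuth platform
- Fetch user info: use the access_token to request the user's profile
- Normalize user info: convert each platform's user info format into a unified one
- Create or update the user:
  - Look up the user by {provider}_id
  - If not found, create a new user (with an automatically generated unique username)
  - If found, update the email, avatar, and other fields
- Generate JWT tokens: issue an access_token and a refresh_token
- Cache user info: cache roles, department, and related data in Redis
- Record the login log: IP, device, login method, and so on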
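The provider-facing part of the callback (exchange the code, fetch the profile, normalize it) can be sketched with a stand-in service. Everything here is an illustration under stated assumptions: `FakeProviderService` and `run_callback_steps` are hypothetical names, no network calls are made, and CSRF verification, user persistence, JWT issuance, and logging are omitted.

```python
import asyncio
from typing import Dict, Optional


class FakeProviderService:
    """Hypothetical stand-in that mimics the three provider-specific hooks."""

    @classmethod
    async def get_access_token(cls, code: str) -> Optional[str]:
        # A real service would call the platform's token endpoint here.
        return f"token-for-{code}" if code else None

    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        # A real service would call the platform's user info endpoint here.
        return {"id": 42, "login": "octo", "name": None, "email": None}

    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        return {
            "provider_id": str(raw_user_info.get("id")),
            "username": raw_user_info.get("login"),
            "name": raw_user_info.get("name") or raw_user_info.get("login"),
            "email": raw_user_info.get("email"),
        }


async def run_callback_steps(service, code: str) -> Optional[Dict]:
    """Run the exchange -> fetch -> normalize steps, stopping at the first failure."""
    access_token = await service.get_access_token(code)
    if not access_token:
        return None
    raw_user_info = await service.get_user_info(access_token)
    if not raw_user_info:
        return None
    return service.normalize_user_info(raw_user_info)


def run_demo() -> Optional[Dict]:
    """Drive the async steps from synchronous code."""
    return asyncio.run(run_callback_steps(FakeProviderService, "abc"))
```

Returning `None` at the first failed step keeps the caller's error handling in one place, which matches the `Optional` return types used by the service methods.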
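BaseOAuthService base class
All OAuth service classes inherit from BaseOAuthService, which provides a unified interface.
Abstract methods (subclasses must implement)
from abc import ABC, abstractmethod
from typing import Dict, Optional

class BaseOAuthService(ABC):
    # Subclasses must define these
    PROVIDER_NAME: Optional[str] = None   # e.g. 'gitee', 'github'
    AUTHORIZE_URL: Optional[str] = None   # authorization URL
    TOKEN_URL: Optional[str] = None       # token endpoint URL
    USER_INFO_URL: Optional[str] = None   # user info endpoint URL

    @classmethod
    @abstractmethod
    def get_client_config(cls) -> Dict[str, str]:
        """Return the client configuration (client_id, client_secret, redirect_uri)."""
        pass

    @classmethod
    @abstractmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        """Fetch the user info with the access_token."""
        pass

    @classmethod
    @abstractmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        """Normalize the user info into the unified format."""
        pass
Normalized user info format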
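{
    'provider_id': str,  # platform user ID (required)
    'username': str,     # username (required)
    'name': str,         # display name (required)
    'email': str,        # email (optional)
    'avatar': str,       # avatar URL (optional)
    'bio': str,          # short bio (optional)
}
Common methods (subclasses may override)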
@classmethod
def get_authorize_url(cls, state: str = None) -> str:
    """Build the authorization URL."""
    pass
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
    """Return extra authorization parameters (such as scope)."""
    return {}
@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
    """Exchange the code for an access_token."""
    pass
@classmethod
def get_token_request_headers(cls) -> Dict[str, str]:
    """Return the headers for the token request."""
    return {}
@classmethod
async def handle_oauth_login(cls, db, code, ip_address, ...) -> Tuple:
    """Run the complete OAuth login flow."""
    pass
Platform-specific implementation examples
Gitee OAuth
class GiteeOAuthService(BaseOAuthService):
PROVIDER_NAME = 'gitee'
AUTHORIZE_URL = "https://gitee.com/oauth/authorize"
TOKEN_URL = "https://gitee.com/oauth/token"
USER_INFO_URL = "https://gitee.com/api/v5/user"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.GITEE_CLIENT_ID,
'client_secret': settings.GITEE_CLIENT_SECRET,
'redirect_uri': settings.GITEE_REDIRECT_URI,
}
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
params = {'access_token': access_token}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, params=params)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
return {
'provider_id': str(raw_user_info.get('id')),
'username': raw_user_info.get('login'),
'name': raw_user_info.get('name', raw_user_info.get('login')),
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('avatar_url'),
'bio': raw_user_info.get('bio'),
}
GitHub OAuth
class GitHubOAuthService(BaseOAuthService):
PROVIDER_NAME = 'github'
AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
USER_INFO_URL = "https://api.github.com/user"
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
return {'scope': 'user:email'}
@classmethod
def get_token_request_headers(cls) -> Dict[str, str]:
return {'Accept': 'application/json'}
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
headers = {
'Authorization': f'Bearer {access_token}',
'Accept': 'application/json',
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, headers=headers)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
return {
'provider_id': str(raw_user_info.get('id')),
'username': raw_user_info.get('login'),
'name': raw_user_info.get('name') or raw_user_info.get('login'),
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('avatar_url'),
'bio': raw_user_info.get('bio'),
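}
QQ OAuth
QQ Connect returns responses in unusual formats (URL-encoded parameters for the token and JSONP for the openid), so they need special handling: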
class QQOAuthService(BaseOAuthService):
PROVIDER_NAME = 'qq'
AUTHORIZE_URL = "https://graph.qq.com/oauth2.0/authorize"
TOKEN_URL = "https://graph.qq.com/oauth2.0/token"
USER_INFO_URL = "https://graph.qq.com/user/get_user_info"
OPENID_URL = "https://graph.qq.com/oauth2.0/me"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.QQ_APP_ID,
'client_secret': settings.QQ_APP_KEY,
'redirect_uri': settings.QQ_REDIRECT_URI,
}
@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
# QQ returns URL-encoded parameters: access_token=xxx&expires_in=xxx
config = cls.get_client_config()
params = {
'grant_type': 'authorization_code',
'client_id': config['client_id'],
'client_secret': config['client_secret'],
'code': code,
'redirect_uri': config['redirect_uri'],
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.TOKEN_URL, params=params)
response_text = response.text
match = re.search(r'access_token=([^&]+)', response_text)
return match.group(1) if match else None
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        async with httpx.AsyncClient() as client:
            # 1. Fetch the openid (JSONP response)
            openid_response = await client.get(cls.OPENID_URL, params={'access_token': access_token})
            openid_text = openid_response.text
            # Parse the JSONP: callback( {"client_id":"xxx","openid":"xxx"} );
            match = re.search(r'callback\(\s*(\{.*?\})\s*\)', openid_text)
            if not match:
                return None  # unexpected response format
            openid_data = json.loads(match.group(1))
            openid = openid_data.get('openid')
            # 2. Use the openid to fetch the user info
            config = cls.get_client_config()
            user_response = await client.get(cls.USER_INFO_URL, params={
                'access_token': access_token,
                'oauth_consumer_key': config['client_id'],
                'openid': openid
            })
            user_info = user_response.json()
            user_info['openid'] = openid
            return user_info
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
return {
'provider_id': raw_user_info.get('openid'),
'username': raw_user_info.get('nickname', '').replace(' ', '_'),
'name': raw_user_info.get('nickname'),
'email': None, # QQ does not return an email address
'avatar': raw_user_info.get('figureurl_qq_2') or raw_user_info.get('figureurl_qq_1'),
'bio': None,
}
Google OAuth
class GoogleOAuthService(BaseOAuthService):
PROVIDER_NAME = 'google'
AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"
TOKEN_URL = "https://oauth2.googleapis.com/token"
USER_INFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.GOOGLE_CLIENT_ID,
'client_secret': settings.GOOGLE_CLIENT_SECRET,
'redirect_uri': settings.GOOGLE_REDIRECT_URI,
}
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
return {
'scope': 'openid email profile',
'access_type': 'offline',
'response_type': 'code',
}
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, headers=headers)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
return {
'provider_id': raw_user_info.get('id'),
'username': raw_user_info.get('email', '').split('@')[0],
'name': raw_user_info.get('name') or raw_user_info.get('email'),
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('picture'),
'bio': None,
}
WeChat OAuth
WeChat's parameter names differ from standard OAuth 2.0, and the user info request needs both the access_token and the openid:
class WeChatOAuthService(BaseOAuthService):
PROVIDER_NAME = 'wechat'
AUTHORIZE_URL = "https://open.weixin.qq.com/connect/qrconnect"
TOKEN_URL = "https://api.weixin.qq.com/sns/oauth2/access_token"
USER_INFO_URL = "https://api.weixin.qq.com/sns/userinfo"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.WECHAT_APP_ID,
'client_secret': settings.WECHAT_APP_SECRET,
'redirect_uri': settings.WECHAT_REDIRECT_URI,
}
@classmethod
def get_user_id_field(cls) -> str:
return 'wechat_unionid' # use the unionid as the unique identifier
@classmethod
def get_authorize_url(cls, state: str = None) -> str:
# WeChat uses appid instead of client_id
config = cls.get_client_config()
params = {
'appid': config['client_id'],
'redirect_uri': config['redirect_uri'],
'response_type': 'code',
'scope': 'snsapi_login',
}
if state:
params['state'] = state
query_string = urlencode(params) # percent-encodes redirect_uri and state; requires: from urllib.parse import urlencode
return f"{cls.AUTHORIZE_URL}?{query_string}#wechat_redirect"
@classmethod
async def get_access_token(cls, code: str) -> Optional[Dict]:
# WeChat returns a Dict (containing access_token and openid)
config = cls.get_client_config()
params = {
'appid': config['client_id'],
'secret': config['client_secret'],
'code': code,
'grant_type': 'authorization_code',
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.TOKEN_URL, params=params)
return response.json() # a Dict containing access_token and openid
@classmethod
async def get_user_info(cls, access_token: str, openid: str = None) -> Optional[Dict]:
params = {
'access_token': access_token,
'openid': openid,
'lang': 'zh_CN',
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, params=params)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
provider_id = raw_user_info.get('unionid') or raw_user_info.get('openid')
nickname = raw_user_info.get('nickname', '')
username = nickname.replace(' ', '_')[:30] if nickname else f"wechat_{provider_id[:8]}"
return {
'provider_id': provider_id,
'username': username,
'name': nickname or username,
'email': None, # WeChat does not return an email address
'avatar': raw_user_info.get('headimgurl'),
'bio': None,
}
Microsoft OAuth
class MicrosoftOAuthService(BaseOAuthService):
PROVIDER_NAME = 'microsoft'
AUTHORIZE_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
TOKEN_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
USER_INFO_URL = "https://graph.microsoft.com/v1.0/me"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.MICROSOFT_CLIENT_ID,
'client_secret': settings.MICROSOFT_CLIENT_SECRET,
'redirect_uri': settings.MICROSOFT_REDIRECT_URI,
}
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
return {
'scope': 'openid email profile User.Read',
'response_type': 'code',
'response_mode': 'query',
}
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
headers = {'Authorization': f'Bearer {access_token}'}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, headers=headers)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
user_principal_name = raw_user_info.get('userPrincipalName', '')
username = user_principal_name.split('@')[0] if '@' in user_principal_name else user_principal_name
email = raw_user_info.get('mail') or raw_user_info.get('userPrincipalName')
return {
'provider_id': raw_user_info.get('id'),
'username': username or f"ms_{raw_user_info.get('id', '')[:8]}",
'name': raw_user_info.get('displayName') or username,
'email': email,
'avatar': None, # Microsoft Graph needs a separate request to fetch the avatar
'bio': raw_user_info.get('jobTitle'),
}
DingTalk OAuth
DingTalk expects the token request as a JSON body and uses the special x-acs-dingtalk-access-token header:
class DingTalkOAuthService(BaseOAuthService):
PROVIDER_NAME = 'dingtalk'
AUTHORIZE_URL = "https://login.dingtalk.com/oauth2/auth"
TOKEN_URL = "https://api.dingtalk.com/v1.0/oauth2/userAccessToken"
USER_INFO_URL = "https://api.dingtalk.com/v1.0/contact/users/me"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.DINGTALK_APP_ID,
'client_secret': settings.DINGTALK_APP_SECRET,
'redirect_uri': settings.DINGTALK_REDIRECT_URI,
}
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
return {
'response_type': 'code',
'scope': 'openid',
'prompt': 'consent',
}
@classmethod
def get_user_id_field(cls) -> str:
return 'dingtalk_unionid'
@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
config = cls.get_client_config()
data = {
'clientId': config['client_id'],
'clientSecret': config['client_secret'],
'code': code,
'grantType': 'authorization_code',
}
headers = {'Content-Type': 'application/json'}
async with httpx.AsyncClient() as client:
response = await client.post(cls.TOKEN_URL, json=data, headers=headers)
result = response.json()
return result.get('accessToken')
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
headers = {
'x-acs-dingtalk-access-token': access_token,
'Content-Type': 'application/json',
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, headers=headers)
return response.json()
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
provider_id = raw_user_info.get('unionId', '')
nick = raw_user_info.get('nick', '')
return {
'provider_id': provider_id,
'username': nick if nick else f"dingtalk_{provider_id[:8]}",
'name': nick or f"dingtalk_{provider_id[:8]}",
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('avatarUrl'),
'bio': f"钉钉用户 - {nick}" if nick else "钉钉用户",
}
Feishu OAuth
Feishu requires fetching an app_access_token first, which is then used to obtain the user_access_token:
class FeishuOAuthService(BaseOAuthService):
PROVIDER_NAME = 'feishu'
AUTHORIZE_URL = "https://open.feishu.cn/open-apis/authen/v1/authorize"
TOKEN_URL = "https://open.feishu.cn/open-apis/authen/v1/oidc/access_token"
USER_INFO_URL = "https://open.feishu.cn/open-apis/authen/v1/user_info"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.FEISHU_APP_ID,
'client_secret': settings.FEISHU_APP_SECRET,
'redirect_uri': settings.FEISHU_REDIRECT_URI,
}
@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
return {
'response_type': 'code',
'scope': 'contact:user.base:readonly',
}
@classmethod
def get_user_id_field(cls) -> str:
return 'feishu_union_id'
@classmethod
async def _get_app_access_token(cls) -> Optional[str]:
"""Fetch the application-level access_token."""
config = cls.get_client_config()
url = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal"
data = {
'app_id': config['client_id'],
'app_secret': config['client_secret'],
}
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
result = response.json()
if result.get('code') == 0:
return result.get('app_access_token')
return None
@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
app_access_token = await cls._get_app_access_token()
if not app_access_token:
return None
data = {'grant_type': 'authorization_code', 'code': code}
headers = {
'Content-Type': 'application/json',
'Authorization': f'Bearer {app_access_token}',
}
async with httpx.AsyncClient() as client:
response = await client.post(cls.TOKEN_URL, json=data, headers=headers)
result = response.json()
if result.get('code') == 0 and 'data' in result:
return result['data'].get('access_token')
return None
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json',
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.USER_INFO_URL, headers=headers)
result = response.json()
if result.get('code') == 0 and 'data' in result:
return result['data']
return None
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
provider_id = raw_user_info.get('union_id', '')
name = raw_user_info.get('name', '')
en_name = raw_user_info.get('en_name', '')
username = (en_name or name or f"feishu_{provider_id[:8]}").replace(' ', '_')
return {
'provider_id': provider_id,
'username': username,
'name': name or username,
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('avatar_url') or raw_user_info.get('avatar_big'),
'bio': f"飞书用户 - {name}" if name else "飞书用户",
}
WeCom OAuth
WeCom uses a dedicated SSO login page. The flow first fetches a corp_access_token, then uses the code to obtain the userid, and finally fetches the user details:
class WeComOAuthService(BaseOAuthService):
PROVIDER_NAME = 'wecom'
AUTHORIZE_URL = "https://login.work.weixin.qq.com/wwlogin/sso/login"
TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
USER_INFO_URL = "https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo"
USER_DETAIL_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.WECOM_CORP_ID,
'client_secret': settings.WECOM_APP_SECRET,
'redirect_uri': settings.WECOM_REDIRECT_URI,
'agent_id': settings.WECOM_AGENT_ID,
}
@classmethod
def get_user_id_field(cls) -> str:
return 'wecom_userid'
@classmethod
def get_authorize_url(cls, state: str = None) -> str:
config = cls.get_client_config()
params = {
'login_type': 'CorpApp',
'appid': config['client_id'],
'agentid': config['agent_id'],
'redirect_uri': config['redirect_uri'],
}
if state:
params['state'] = state
query_string = urlencode(params) # percent-encodes redirect_uri and state; requires: from urllib.parse import urlencode
return f"{cls.AUTHORIZE_URL}?{query_string}"
@classmethod
async def _get_corp_access_token(cls) -> Optional[str]:
"""Fetch the corporation-level access_token."""
config = cls.get_client_config()
params = {
'corpid': config['client_id'],
'corpsecret': config['client_secret'],
}
async with httpx.AsyncClient() as client:
response = await client.get(cls.TOKEN_URL, params=params)
result = response.json()
if result.get('errcode') == 0:
return result.get('access_token')
return None
@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
# Returns "corp_access_token|code"; get_user_info splits it apart again
corp_token = await cls._get_corp_access_token()
if not corp_token:
return None
return f"{corp_token}|{code}"
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
parts = access_token.split('|', 1)
if len(parts) != 2:
return None
corp_token, code = parts
async with httpx.AsyncClient() as client:
# Step 1: use the code to obtain the userid
params = {'access_token': corp_token, 'code': code}
response = await client.get(cls.USER_INFO_URL, params=params)
result = response.json()
if result.get('errcode') != 0:
return None
userid = result.get('userid') or result.get('UserId')
# Step 2: use the userid to fetch the user details
detail_params = {'access_token': corp_token, 'userid': userid}
detail_response = await client.get(cls.USER_DETAIL_URL, params=detail_params)
user_detail = detail_response.json()
if user_detail.get('errcode') != 0:
return {'userid': userid, 'name': userid}
return user_detail
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
userid = raw_user_info.get('userid', '')
name = raw_user_info.get('name', '')
email = raw_user_info.get('biz_mail') or raw_user_info.get('email')
avatar = raw_user_info.get('thumb_avatar') or raw_user_info.get('avatar')
position = raw_user_info.get('position', '')
return {
'provider_id': userid,
'username': userid if userid else f"wecom_{name}",
'name': name or userid,
'email': email,
'avatar': avatar,
'bio': f"企业微信用户 - {position}" if position else "企业微信用户",
}
User creation and linking
User field mapping
Fields used for OAuth users in the User table:
| Field | Description | Example |
|---|---|---|
| gitee_id | Gitee user ID | "12345678" |
| github_id | GitHub user ID | "87654321" |
| qq_id | QQ OpenID | "A1B2C3D4..." |
| google_id | Google user ID | "1234567890" |
| wechat_id | WeChat OpenID | "oX1Y2Z3..." |
| oauth_provider | OAuth platform used for the last login | "github" |
| last_login_type | Last login method | "github" |
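The platform classes shown earlier override get_user_id_field() for WeChat, DingTalk, Feishu, and WeCom, so the lookup column is not always {provider}_id. The sketch below assumes those overrides take precedence over the default naming; `USER_ID_FIELD_OVERRIDES` and `resolve_user_id_field` are hypothetical names used only for illustration.

```python
from typing import Dict

# Values copied from the get_user_id_field() overrides in the platform classes.
USER_ID_FIELD_OVERRIDES: Dict[str, str] = {
    "wechat": "wechat_unionid",
    "dingtalk": "dingtalk_unionid",
    "feishu": "feishu_union_id",
    "wecom": "wecom_userid",
}


def resolve_user_id_field(provider: str) -> str:
    """Return the User column that stores this provider's user ID."""
    return USER_ID_FIELD_OVERRIDES.get(provider, f"{provider}_id")
```

Whichever naming the project actually uses, the User model needs a matching column for every provider that can create accounts.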
User creation logic
# 1. Look up the user by provider_id
user_id_field = f"{provider}_id"  # e.g. 'github_id'
result = await db.execute(
    select(User).where(getattr(User, user_id_field) == provider_id)
)
user = result.scalar_one_or_none()
if user:
    # The user already exists: fill in missing information
    if email and not user.email:
        user.email = email
else:
    # The user does not exist: create a new one
    # Generate a unique username to avoid collisions
    unique_username = username
    counter = 1
    while (await db.execute(
        select(User).where(User.username == unique_username)
    )).scalar_one_or_none():
        unique_username = f"{username}_{counter}"
        counter += 1
    user = User(
        username=unique_username,
        name=name,
        email=email,
        **{user_id_field: provider_id},
        oauth_provider=provider,
        user_type=1,
        user_status=1,
        is_active=True,
        is_superuser=settings.GRANT_ADMIN_TO_OAUTH_USER,  # configurable
    )
    db.add(user)
Automatically granting administrator privileges
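This is controlled by the GRANT_ADMIN_TO_OAUTH_USER environment variable:
# .env
GRANT_ADMIN_TO_OAUTH_USER=false # Defaults to false: OAuth users are regular users
Security mechanisms
1. CSRF protection
The state parameter is used to prevent CSRF attacks: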
# Generate the CSRF token
csrf_token = secrets.token_urlsafe(32)
# Store it in Redis (expires after 5 minutes)
await redis.set(
    f"oauth_state:{csrf_token}",
    "1",
    ex=300
)
# Merge in the frontend state
state_data = {"csrf": csrf_token, "payload": frontend_state}
combined_state = json.dumps(state_data)
Verification on callback:
# Parse the state
state_data = json.loads(state)
csrf_token = state_data.get("csrf")
# Verify the CSRF token
exists = await redis.get(f"oauth_state:{csrf_token}")
if not exists:
    raise HTTPException(400, "授权请求已过期")  # "authorization request has expired"
# Delete after successful verification (one-time use)
await redis.delete(f"oauth_state:{csrf_token}")
2. Token security
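- Access token: expires after 24 hours
- Refresh token: expires after 7 days
- Device binding: the token contains a device_id
- Multi-device control: single-device login can be enabled through configuration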
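The lifetimes and device binding listed above can be expressed as claim sets that a JWT library would then sign. This is a sketch under assumptions: the claim names (`sub`, `type`, `iat`, `exp`, `device_id`) and the helper `build_token_claims` are illustrative choices, not the project's actual token layout, and signing is left out.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

ACCESS_TOKEN_TTL = timedelta(hours=24)
REFRESH_TOKEN_TTL = timedelta(days=7)


def build_token_claims(
    user_id: str, device_id: str, now: Optional[datetime] = None
) -> Dict[str, Dict]:
    """Return unsigned claim sets for the access and refresh tokens."""
    issued_at = now or datetime.now(timezone.utc)

    def claims(token_type: str, ttl: timedelta) -> Dict:
        return {
            "sub": user_id,
            "device_id": device_id,  # binds the token to one device
            "type": token_type,
            "iat": int(issued_at.timestamp()),
            "exp": int((issued_at + ttl).timestamp()),
        }

    return {
        "access": claims("access", ACCESS_TOKEN_TTL),
        "refresh": claims("refresh", REFRESH_TOKEN_TTL),
    }
```

The 24-hour access lifetime corresponds to the `"expire": 86400` value in the callback response.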
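3. User status checks
if not user.is_active:
    raise ValueError("账户已被禁用")  # "account is disabled"
if user.user_status == 0:
    raise ValueError("账户已被禁用")  # "account is disabled"
if user.user_status == 2:
    raise ValueError("账户已被锁定")  # "account is locked"
Configuration
Environment variables
Each OAuth platform needs three environment variables (client ID, client secret, and redirect URI); WeCom additionally needs WECOM_AGENT_ID: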
# Gitee
GITEE_CLIENT_ID=your_client_id
GITEE_CLIENT_SECRET=your_client_secret
GITEE_REDIRECT_URI=http://localhost:5173/oauth/gitee/callback
# GitHub
GITHUB_CLIENT_ID=your_client_id
GITHUB_CLIENT_SECRET=your_client_secret
GITHUB_REDIRECT_URI=http://localhost:5173/oauth/github/callback
# QQ (uses APP_ID and APP_KEY)
QQ_APP_ID=your_app_id
QQ_APP_KEY=your_app_key
QQ_REDIRECT_URI=http://localhost:5173/oauth/qq/callback
# Google
GOOGLE_CLIENT_ID=your_client_id
GOOGLE_CLIENT_SECRET=your_client_secret
GOOGLE_REDIRECT_URI=http://localhost:5173/oauth/google/callback
Registering OAuth applications
Gitee
- Visit https://gitee.com/oauth/applications
- Click "创建应用" (Create application)
- Fill in the application details:
  - Application name: your application's name
  - Application homepage: http://localhost:5173
  - Callback URL: http://localhost:5173/oauth/gitee/callback
  - Permissions: check user_info
- Obtain the Client ID and Client Secret
- Configure the environment variables:
GITEE_CLIENT_ID=your_client_id
GITEE_CLIENT_SECRET=your_client_secret
GITEE_REDIRECT_URI=http://localhost:5173/oauth/gitee/callback
GitHub
- Visit https://github.com/settings/developers
- Click "New OAuth App"
- Fill in the application details:
  - Application name: your application's name
  - Homepage URL: http://localhost:5173
  - Authorization callback URL: http://localhost:5173/oauth/github/callback
- Obtain the Client ID and Client Secret
- Configure the environment variables:
GITHUB_CLIENT_ID=your_client_id
GITHUB_CLIENT_SECRET=your_client_secret
GITHUB_REDIRECT_URI=http://localhost:5173/oauth/github/callback
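QQ Connect
- Visit https://connect.qq.com/
- Register as a developer (requires company credentials or personal real-name verification)
- Create an application:
  - Application name: your application's name
  - Application type: website application
  - Website address: your site's domain
  - Callback URL: http://your-domain.com/oauth/qq/callback
- Obtain the APP ID and APP Key
- Configure the environment variables:
QQ_APP_ID=your_app_id
QQ_APP_KEY=your_app_key
QQ_REDIRECT_URI=http://your-domain.com/oauth/qq/callback
Note: QQ Connect requires the callback URL to use a domain with an ICP filing; localhost is not supported.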
Google
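- Visit https://console.cloud.google.com/
- Create a project or select an existing one
- Go to "APIs & Services" → "Credentials"
- Click "Create credentials" → "OAuth client ID"
- Configure the OAuth consent screen (required the first time)
- Choose the application type "Web application" and fill in:
  - Name: your application's name
  - Authorized JavaScript origins: http://localhost:5173
  - Authorized redirect URIs: http://localhost:5173/oauth/google/callback
- Obtain the Client ID and Client Secret
- Configure the environment variables:
GOOGLE_CLIENT_ID=your_client_id
GOOGLE_CLIENT_SECRET=your_client_secret
GOOGLE_REDIRECT_URI=http://localhost:5173/oauth/google/callback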
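WeChat Open Platform
- Visit https://open.weixin.qq.com/
- Register as a developer (requires company credentials)
- Create a website application:
  - Application name: your application's name
  - Application website: your site's domain
- Submit it for review and wait for approval
- Obtain the AppID and AppSecret
- Configure the environment variables:
WECHAT_APP_ID=your_app_id
WECHAT_APP_SECRET=your_app_secret
WECHAT_REDIRECT_URI=http://your-domain.com/oauth/wechat/callback
Note: the WeChat Open Platform requires company credentials, so individual developers cannot use it.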
Microsoft
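- Visit https://portal.azure.com/
- Go to "Azure Active Directory" → "App registrations"
- Click "New registration":
  - Name: your application's name
  - Supported account types: accounts in any organizational directory and personal Microsoft accounts
  - Redirect URI: Web - http://localhost:5173/oauth/microsoft/callback
- Create a client secret under "Certificates & secrets"
- Obtain the "Application (client) ID" and the client secret
- Configure the environment variables:
MICROSOFT_CLIENT_ID=your_client_id
MICROSOFT_CLIENT_SECRET=your_client_secret
MICROSOFT_REDIRECT_URI=http://localhost:5173/oauth/microsoft/callback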
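DingTalk
- Visit https://open.dingtalk.com/
- Create an application:
  - Go to "开发者后台" (Developer console) → "应用开发" (App development) → "企业内部开发" (Internal enterprise development)
  - Click "创建应用" (Create application)
- Under "登录与分享" (Login and sharing), configure:
  - Callback domain: your-domain.com
- Obtain the AppKey (the Client ID) and AppSecret
- Configure the environment variables:
DINGTALK_APP_ID=your_app_key
DINGTALK_APP_SECRET=your_app_secret
DINGTALK_REDIRECT_URI=http://your-domain.com/oauth/dingtalk/callback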
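Feishu
- Visit https://open.feishu.cn/
- Create a custom enterprise app:
  - Go to "开发者后台" (Developer console) → "创建企业自建应用" (Create custom enterprise app)
- Under "安全设置" (Security settings), configure the redirect URL so that it matches FEISHU_REDIRECT_URI
- Under "权限管理" (Permission management), request the permission:
  - contact:user.base:readonly (read basic user information)
- Publish an app version
- Obtain the App ID and App Secret
- Configure the environment variables:
FEISHU_APP_ID=your_app_id
FEISHU_APP_SECRET=your_app_secret
FEISHU_REDIRECT_URI=http://localhost:5173/oauth/feishu/callback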
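WeCom
- Visit https://work.weixin.qq.com/
- Sign in to the WeCom admin console
- Go to "应用管理" (App management) → "自建" (Custom) → "创建应用" (Create application)
- From the application details, obtain:
  - AgentId: the application ID
  - Secret: the application secret
- Under "企业信息" (Company information), obtain the corporation ID (CorpID)
- Under "网页授权及 JS-SDK" (Web authorization and JS-SDK), configure:
  - Trusted domain: your-domain.com
- Configure the environment variables:
WECOM_CORP_ID=your_corp_id
WECOM_APP_SECRET=your_app_secret
WECOM_AGENT_ID=your_agent_id
WECOM_REDIRECT_URI=http://your-domain.com/oauth/wecom/callback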
Frontend integration
Starting the login
// 1. Get the authorization URL
const response = await fetch(`/api/core/oauth/github/authorize?state=${encodeURIComponent(redirectUrl)}`)
const { authorize_url } = await response.json()
// 2. Redirect to the authorization page
window.location.href = authorize_url
Handling the callback
// The OAuth platform redirects back to the frontend (the URL carries code and state)
const urlParams = new URLSearchParams(window.location.search)
const code = urlParams.get('code')
const state = urlParams.get('state')
// Send them to the backend to complete the login
const response = await fetch(`/api/core/oauth/github/callback`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ code, state })
})
const { access_token, refresh_token, user_info } = await response.json()
// Store the tokens
localStorage.setItem('access_token', access_token)
localStorage.setItem('refresh_token', refresh_token)
// Navigate into the application
const stateData = JSON.parse(state)
const redirectUrl = stateData.payload || '/'
router.push(redirectUrl)
Adding a new platform
1. Create the service class
# core/oauth/service.py
class NewPlatformOAuthService(BaseOAuthService):
PROVIDER_NAME = 'new_platform'
AUTHORIZE_URL = "https://..."
TOKEN_URL = "https://..."
USER_INFO_URL = "https://..."
@classmethod
def get_client_config(cls) -> Dict[str, str]:
return {
'client_id': settings.NEW_PLATFORM_CLIENT_ID,
'client_secret': settings.NEW_PLATFORM_CLIENT_SECRET,
'redirect_uri': settings.NEW_PLATFORM_REDIRECT_URI,
}
@classmethod
async def get_user_info(cls, access_token: str) -> Optional[Dict]:
# Implement the user info request here
pass
@classmethod
def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
# Normalize the user info
return {
'provider_id': str(raw_user_info.get('id')),
'username': raw_user_info.get('username'),
'name': raw_user_info.get('name'),
'email': raw_user_info.get('email'),
'avatar': raw_user_info.get('avatar'),
'bio': raw_user_info.get('bio'),
}
2. Register it in the service registry
# core/oauth/service.py
OAUTH_PROVIDERS = {
    # ... existing platforms
    'new_platform': NewPlatformOAuthService,
}
3. Add the database column
# First add the column to the User model
new_platform_id = Column(String(255), nullable=True, comment="New platform user ID")
# Then generate the migration, which picks up the new column
alembic revision --autogenerate -m "add new_platform_id to user"
4. Configure the environment variables
NEW_PLATFORM_CLIENT_ID=xxx
NEW_PLATFORM_CLIENT_SECRET=xxx
NEW_PLATFORM_REDIRECT_URI=http://localhost:5173/oauth/new_platform/callback
FAQ
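Q1: What happens when an OAuth user has no password?
A: The password field of an OAuth user is NULL (empty passwords are allowed). Such users can sign in only through OAuth, not with a username and password.
Q2: Can a user link multiple OAuth platforms?
A: Yes. One user can have several {provider}_id fields set, one for each platform.
Q3: What if the OAuth platform returns an empty email?
A: The email field is optional, so an empty value does not block user creation. The user can add an email later in their profile settings.
Q4: What if the CSRF token expires?
A: The CSRF token is valid for 5 minutes. If the user takes longer than that to authorize, the login flow has to be started again.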
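The expiry and one-time-use behavior can be illustrated with an in-memory stand-in for the Redis key. This is only a sketch for reasoning about the semantics: `OneTimeTokenStore` is a hypothetical class, and the real system stores the token in Redis with a 300-second expiry.

```python
import time
from typing import Callable, Dict


class OneTimeTokenStore:
    """In-memory stand-in for the oauth_state keys (illustrative only)."""

    def __init__(
        self, ttl_seconds: float = 300, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._expires_at: Dict[str, float] = {}

    def put(self, token: str) -> None:
        """Remember the token until its time-to-live elapses."""
        self._expires_at[token] = self._clock() + self._ttl

    def consume(self, token: str) -> bool:
        """Return True once for a live token; the token is removed either way."""
        expires_at = self._expires_at.pop(token, None)
        return expires_at is not None and self._clock() < expires_at
```

Removing the token in the same step that checks it means a replayed callback with the same state is rejected, just like an expired one.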
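Q5: How do I debug OAuth login?
A:
- Check that the environment variables are configured correctly
- Check the backend logs (logger.info/error)
- Use the browser developer tools to inspect the network requests
- Confirm that the callback URL matches the one configured in the OAuth application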
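The first debugging step can be automated with a small check that reports unset variables. The variable names below are the ones used in the configuration sections of this document; the helper `missing_oauth_env` and the `REQUIRED_ENV` table are hypothetical and not part of the project.

```python
import os
from typing import Dict, List, Mapping, Optional

REQUIRED_ENV: Dict[str, List[str]] = {
    "gitee": ["GITEE_CLIENT_ID", "GITEE_CLIENT_SECRET", "GITEE_REDIRECT_URI"],
    "github": ["GITHUB_CLIENT_ID", "GITHUB_CLIENT_SECRET", "GITHUB_REDIRECT_URI"],
    "qq": ["QQ_APP_ID", "QQ_APP_KEY", "QQ_REDIRECT_URI"],
    "google": ["GOOGLE_CLIENT_ID", "GOOGLE_CLIENT_SECRET", "GOOGLE_REDIRECT_URI"],
    "wechat": ["WECHAT_APP_ID", "WECHAT_APP_SECRET", "WECHAT_REDIRECT_URI"],
    "microsoft": ["MICROSOFT_CLIENT_ID", "MICROSOFT_CLIENT_SECRET", "MICROSOFT_REDIRECT_URI"],
    "dingtalk": ["DINGTALK_APP_ID", "DINGTALK_APP_SECRET", "DINGTALK_REDIRECT_URI"],
    "feishu": ["FEISHU_APP_ID", "FEISHU_APP_SECRET", "FEISHU_REDIRECT_URI"],
    "wecom": ["WECOM_CORP_ID", "WECOM_APP_SECRET", "WECOM_AGENT_ID", "WECOM_REDIRECT_URI"],
}


def missing_oauth_env(
    provider: str, env: Optional[Mapping[str, str]] = None
) -> List[str]:
    """Return the required variables that are unset or empty for a provider."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_ENV[provider] if not env.get(name)]
```

Calling `missing_oauth_env("github")` at startup and logging the result points directly at the variable to fix.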
API reference
GET /api/core/oauth/{provider}/authorize
Returns the OAuth authorization URL.
Path parameters:
provider: OAuth platform (gitee/github/qq/google/wechat/microsoft/dingtalk/feishu/wecom)
Query parameters:
state: extra state passed by the frontend (optional)
Response:
{
  "authorize_url": "https://..."
}
POST /api/core/oauth/{provider}/callback
Handles the OAuth callback.
Path parameters:
provider: OAuth platform
Request body:
{
  "code": "authorization_code",
  "state": "{\"csrf\":\"xxx\",\"payload\":\"...\"}"
}
Response:
{
"access_token": "eyJhbGciOi...",
"refresh_token": "eyJhbGciOi...",
"expire": 86400,
"user_info": {
"id": "uuid",
"username": "user123",
"name": "张三",
"email": "user@example.com",
"avatar": "https://...",
"user_type": 1,
"is_superuser": false
}
}
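For testing the callback endpoint without a browser, the request body and the post-login redirect target can be derived from a callback URL. This is a sketch mirroring the frontend logic; `build_callback_body` and `redirect_target` are hypothetical helper names, and sending the request is left to whichever HTTP client is in use.

```python
import json
from typing import Dict, Optional
from urllib.parse import parse_qs, urlparse


def build_callback_body(callback_url: str) -> Dict[str, Optional[str]]:
    """Extract code and state from the URL the OAuth platform redirected to."""
    query = parse_qs(urlparse(callback_url).query)
    return {
        "code": query.get("code", [None])[0],
        "state": query.get("state", [None])[0],
    }


def redirect_target(state: Optional[str]) -> str:
    """Return the payload stored in the combined state, or '/' as a fallback."""
    if not state:
        return "/"
    try:
        payload = json.loads(state).get("payload")
    except (ValueError, AttributeError):
        # Not JSON, or JSON that is not an object.
        return "/"
    return payload or "/"
```

The dictionary returned by `build_callback_body` has the same shape as the request body of POST /api/core/oauth/{provider}/callback.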