Skip to content

SSO 单点登录

概述

系统支持基于 OAuth 2.0 协议的单点登录(SSO),允许用户通过第三方平台账号快速登录,无需单独注册。支持 10+ 主流 OAuth 平台,包括 Gitee、GitHub、QQ、Google、微信、钉钉、飞书、企业微信等。

支持的 OAuth 平台

平台Provider配置前缀说明
GiteegiteeGITEE_Gitee 开放平台
GitHubgithubGITHUB_GitHub OAuth Apps
QQqqQQ_QQ 互联
GooglegoogleGOOGLE_Google OAuth 2.0
微信wechatWECHAT_微信开放平台
MicrosoftmicrosoftMICROSOFT_Microsoft 账户
钉钉dingtalkDINGTALK_钉钉开放平台
飞书feishuFEISHU_飞书开放平台
企业微信wecomWECOM_企业微信

架构设计

核心组件

backend-fastapi/core/oauth/
├── base_oauth_service.py    # OAuth 基础服务类(抽象)
├── service.py                # 各平台 OAuth 服务实现
├── api.py                    # OAuth API 接口
└── schema.py                 # OAuth 数据模型

设计模式

采用 模板方法模式 + 策略模式

  • BaseOAuthService:抽象基类,定义 OAuth 标准流程
  • 各平台服务类:继承基类,实现平台特定逻辑
  • OAUTH_PROVIDERS:服务注册表,统一管理所有平台
python
# 服务注册表
OAUTH_PROVIDERS = {
    'gitee': GiteeOAuthService,
    'github': GitHubOAuthService,
    'qq': QQOAuthService,
    'google': GoogleOAuthService,
    'wechat': WeChatOAuthService,
    'microsoft': MicrosoftOAuthService,
    'dingtalk': DingTalkOAuthService,
    'feishu': FeishuOAuthService,
    'wecom': WeComOAuthService,
}

OAuth 登录流程

流程图

┌─────────┐                    ┌─────────┐                    ┌─────────┐
│  前端    │                    │  后端    │                    │ OAuth   │
│         │                    │         │                    │ 平台    │
└────┬────┘                    └────┬────┘                    └────┬────┘
     │                              │                              │
     │ 1. GET /oauth/{provider}/authorize                         │
     │ ──────────────────────────→ │                              │
     │                              │                              │
     │ 2. 返回授权 URL + CSRF state │                              │
     │ ←────────────────────────── │                              │
     │                              │                              │
     │ 3. 重定向到授权页面                                           │
     │ ─────────────────────────────────────────────────────────→ │
     │                              │                              │
     │                              │      4. 用户授权              │
     │                              │                              │
     │ 5. 回调到前端(带 code + state)                             │
     │ ←───────────────────────────────────────────────────────── │
     │                              │                              │
     │ 6. POST /oauth/{provider}/callback                         │
     │    { code, state }           │                              │
     │ ──────────────────────────→ │                              │
     │                              │                              │
     │                              │ 7. 验证 CSRF state            │
     │                              │                              │
     │                              │ 8. 用 code 换取 access_token  │
     │                              │ ──────────────────────────→ │
     │                              │                              │
     │                              │ 9. 返回 access_token         │
     │                              │ ←────────────────────────── │
     │                              │                              │
     │                              │ 10. 获取用户信息              │
     │                              │ ──────────────────────────→ │
     │                              │                              │
     │                              │ 11. 返回用户信息              │
     │                              │ ←────────────────────────── │
     │                              │                              │
     │                              │ 12. 创建/更新用户             │
     │                              │ 13. 生成 JWT Token           │
     │                              │ 14. 缓存用户信息到 Redis      │
     │                              │                              │
     │ 15. 返回 JWT Token + 用户信息 │                              │
     │ ←────────────────────────── │                              │
     │                              │                              │

详细步骤

1. 获取授权 URL

请求

http
GET /api/core/oauth/{provider}/authorize?state={redirect_info}

响应

json
{
  "authorize_url": "https://github.com/login/oauth/authorize?client_id=xxx&redirect_uri=xxx&state=xxx"
}

后端处理

  1. 生成 32 字节 CSRF token
  2. 将 CSRF token 存入 Redis(5 分钟过期)
  3. 将 CSRF token 和前端 state 合并为 JSON:{"csrf": "xxx", "payload": "redirect_info"}
  4. 构造授权 URL 并返回

2. 用户授权

前端将用户重定向到授权 URL,用户在 OAuth 平台完成授权。

3. 处理回调

请求

http
POST /api/core/oauth/{provider}/callback
Content-Type: application/json

{
  "code": "authorization_code",
  "state": "{\"csrf\":\"xxx\",\"payload\":\"redirect_info\"}"
}

响应

json
{
  "access_token": "eyJhbGciOi...",
  "refresh_token": "eyJhbGciOi...",
  "expire": 86400,
  "user_info": {
    "id": "uuid",
    "username": "user123",
    "name": "张三",
    "email": "user@example.com",
    "avatar": "https://...",
    "user_type": 1,
    "is_superuser": false
  }
}

后端处理

  1. 验证 CSRF:检查 state 中的 CSRF token 是否存在于 Redis
  2. 换取 Access Token:使用 code 向 OAuth 平台换取 access_token
  3. 获取用户信息:使用 access_token 获取用户信息
  4. 标准化用户信息:将不同平台的用户信息格式统一
  5. 创建/更新用户
    • 根据 {provider}_id 查找用户
    • 不存在则创建新用户(自动生成唯一用户名)
    • 存在则更新邮箱、头像等信息
  6. 生成 JWT Token:生成 access_token 和 refresh_token
  7. 缓存用户信息:将角色、部门等信息缓存到 Redis
  8. 记录登录日志:记录 IP、设备、登录方式等

BaseOAuthService 基类

所有 OAuth 服务类都继承自 BaseOAuthService,提供统一的接口。

抽象方法(子类必须实现)

python
class BaseOAuthService(ABC):
    # 子类必须定义
    PROVIDER_NAME: str = None      # 如 'gitee', 'github'
    AUTHORIZE_URL: str = None      # 授权 URL
    TOKEN_URL: str = None          # 获取 token 的 URL
    USER_INFO_URL: str = None      # 获取用户信息的 URL
    
    @classmethod
    @abstractmethod
    def get_client_config(cls) -> Dict[str, str]:
        """获取客户端配置(client_id, client_secret, redirect_uri)"""
        pass
    
    @classmethod
    @abstractmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        """使用 access_token 获取用户信息"""
        pass
    
    @classmethod
    @abstractmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        """标准化用户信息为统一格式"""
        pass

标准化用户信息格式

python
{
    'provider_id': str,    # 平台用户 ID(必填)
    'username': str,       # 用户名(必填)
    'name': str,           # 显示名称(必填)
    'email': str,          # 邮箱(可选)
    'avatar': str,         # 头像 URL(可选)
    'bio': str,            # 个人简介(可选)
}

通用方法(子类可选覆盖)

python
@classmethod
def get_authorize_url(cls, state: str = None) -> str:
    """构造授权 URL"""
    pass

@classmethod
def get_extra_authorize_params(cls) -> Dict[str, str]:
    """获取额外的授权参数(如 scope)"""
    return {}

@classmethod
async def get_access_token(cls, code: str) -> Optional[str]:
    """使用 code 换取 access_token"""
    pass

@classmethod
def get_token_request_headers(cls) -> Dict[str, str]:
    """获取 token 请求的 headers"""
    return {}

@classmethod
async def handle_oauth_login(cls, db, code, ip_address, ...) -> Tuple:
    """处理完整的 OAuth 登录流程"""
    pass

平台特定实现示例

Gitee OAuth

python
class GiteeOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'gitee'
    AUTHORIZE_URL = "https://gitee.com/oauth/authorize"
    TOKEN_URL = "https://gitee.com/oauth/token"
    USER_INFO_URL = "https://gitee.com/api/v5/user"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.GITEE_CLIENT_ID,
            'client_secret': settings.GITEE_CLIENT_SECRET,
            'redirect_uri': settings.GITEE_REDIRECT_URI,
        }
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        params = {'access_token': access_token}
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, params=params)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        return {
            'provider_id': str(raw_user_info.get('id')),
            'username': raw_user_info.get('login'),
            'name': raw_user_info.get('name', raw_user_info.get('login')),
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('avatar_url'),
            'bio': raw_user_info.get('bio'),
        }

GitHub OAuth

python
class GitHubOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'github'
    AUTHORIZE_URL = "https://github.com/login/oauth/authorize"
    TOKEN_URL = "https://github.com/login/oauth/access_token"
    USER_INFO_URL = "https://api.github.com/user"
    
    @classmethod
    def get_extra_authorize_params(cls) -> Dict[str, str]:
        return {'scope': 'user:email'}
    
    @classmethod
    def get_token_request_headers(cls) -> Dict[str, str]:
        return {'Accept': 'application/json'}
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Accept': 'application/json',
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, headers=headers)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        return {
            'provider_id': str(raw_user_info.get('id')),
            'username': raw_user_info.get('login'),
            'name': raw_user_info.get('name') or raw_user_info.get('login'),
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('avatar_url'),
            'bio': raw_user_info.get('bio'),
        }

QQ OAuth

QQ 互联的响应格式比较特殊(URL 参数格式和 JSONP 格式),需要特殊处理:

python
class QQOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'qq'
    AUTHORIZE_URL = "https://graph.qq.com/oauth2.0/authorize"
    TOKEN_URL = "https://graph.qq.com/oauth2.0/token"
    USER_INFO_URL = "https://graph.qq.com/user/get_user_info"
    OPENID_URL = "https://graph.qq.com/oauth2.0/me"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.QQ_APP_ID,
            'client_secret': settings.QQ_APP_KEY,
            'redirect_uri': settings.QQ_REDIRECT_URI,
        }
    
    @classmethod
    async def get_access_token(cls, code: str) -> Optional[str]:
        # QQ 返回 URL 参数格式: access_token=xxx&expires_in=xxx
        config = cls.get_client_config()
        params = {
            'grant_type': 'authorization_code',
            'client_id': config['client_id'],
            'client_secret': config['client_secret'],
            'code': code,
            'redirect_uri': config['redirect_uri'],
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.TOKEN_URL, params=params)
            response_text = response.text
            match = re.search(r'access_token=([^&]+)', response_text)
            return match.group(1) if match else None
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        async with httpx.AsyncClient() as client:
            # 1. 获取 openid(JSONP 格式)
            openid_response = await client.get(cls.OPENID_URL, params={'access_token': access_token})
            openid_text = openid_response.text
            # 解析 JSONP: callback( {"client_id":"xxx","openid":"xxx"} );
            match = re.search(r'callback\(\s*(\{.*?\})\s*\)', openid_text)
            openid_data = json.loads(match.group(1))
            openid = openid_data.get('openid')
            
            # 2. 使用 openid 获取用户信息
            config = cls.get_client_config()
            user_response = await client.get(cls.USER_INFO_URL, params={
                'access_token': access_token,
                'oauth_consumer_key': config['client_id'],
                'openid': openid
            })
            user_info = user_response.json()
            user_info['openid'] = openid
            return user_info
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        return {
            'provider_id': raw_user_info.get('openid'),
            'username': raw_user_info.get('nickname', '').replace(' ', '_'),
            'name': raw_user_info.get('nickname'),
            'email': None,  # QQ 不返回邮箱
            'avatar': raw_user_info.get('figureurl_qq_2') or raw_user_info.get('figureurl_qq_1'),
            'bio': None,
        }

Google OAuth

python
class GoogleOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'google'
    AUTHORIZE_URL = "https://accounts.google.com/o/oauth2/v2/auth"
    TOKEN_URL = "https://oauth2.googleapis.com/token"
    USER_INFO_URL = "https://www.googleapis.com/oauth2/v2/userinfo"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.GOOGLE_CLIENT_ID,
            'client_secret': settings.GOOGLE_CLIENT_SECRET,
            'redirect_uri': settings.GOOGLE_REDIRECT_URI,
        }
    
    @classmethod
    def get_extra_authorize_params(cls) -> Dict[str, str]:
        return {
            'scope': 'openid email profile',
            'access_type': 'offline',
            'response_type': 'code',
        }
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        headers = {'Authorization': f'Bearer {access_token}'}
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, headers=headers)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        return {
            'provider_id': raw_user_info.get('id'),
            'username': raw_user_info.get('email', '').split('@')[0],
            'name': raw_user_info.get('name') or raw_user_info.get('email'),
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('picture'),
            'bio': None,
        }

微信 OAuth

微信的参数名称与标准 OAuth 2.0 不同,且需要同时传递 access_token 和 openid:

python
class WeChatOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'wechat'
    AUTHORIZE_URL = "https://open.weixin.qq.com/connect/qrconnect"
    TOKEN_URL = "https://api.weixin.qq.com/sns/oauth2/access_token"
    USER_INFO_URL = "https://api.weixin.qq.com/sns/userinfo"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.WECHAT_APP_ID,
            'client_secret': settings.WECHAT_APP_SECRET,
            'redirect_uri': settings.WECHAT_REDIRECT_URI,
        }
    
    @classmethod
    def get_user_id_field(cls) -> str:
        return 'wechat_unionid'  # 使用 unionid 作为唯一标识
    
    @classmethod
    def get_authorize_url(cls, state: str = None) -> str:
        # 微信使用 appid 而不是 client_id
        config = cls.get_client_config()
        params = {
            'appid': config['client_id'],
            'redirect_uri': config['redirect_uri'],
            'response_type': 'code',
            'scope': 'snsapi_login',
        }
        if state:
            params['state'] = state
        query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
        return f"{cls.AUTHORIZE_URL}?{query_string}#wechat_redirect"
    
    @classmethod
    async def get_access_token(cls, code: str) -> Optional[Dict]:
        # 微信返回 Dict(含 access_token + openid)
        config = cls.get_client_config()
        params = {
            'appid': config['client_id'],
            'secret': config['client_secret'],
            'code': code,
            'grant_type': 'authorization_code',
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.TOKEN_URL, params=params)
            return response.json()  # 返回 Dict,包含 access_token 和 openid
    
    @classmethod
    async def get_user_info(cls, access_token: str, openid: str = None) -> Optional[Dict]:
        params = {
            'access_token': access_token,
            'openid': openid,
            'lang': 'zh_CN',
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, params=params)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        provider_id = raw_user_info.get('unionid') or raw_user_info.get('openid')
        nickname = raw_user_info.get('nickname', '')
        username = nickname.replace(' ', '_')[:30] if nickname else f"wechat_{provider_id[:8]}"
        return {
            'provider_id': provider_id,
            'username': username,
            'name': nickname or username,
            'email': None,  # 微信不返回邮箱
            'avatar': raw_user_info.get('headimgurl'),
            'bio': None,
        }

Microsoft OAuth

python
class MicrosoftOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'microsoft'
    AUTHORIZE_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/authorize"
    TOKEN_URL = "https://login.microsoftonline.com/common/oauth2/v2.0/token"
    USER_INFO_URL = "https://graph.microsoft.com/v1.0/me"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.MICROSOFT_CLIENT_ID,
            'client_secret': settings.MICROSOFT_CLIENT_SECRET,
            'redirect_uri': settings.MICROSOFT_REDIRECT_URI,
        }
    
    @classmethod
    def get_extra_authorize_params(cls) -> Dict[str, str]:
        return {
            'scope': 'openid email profile User.Read',
            'response_type': 'code',
            'response_mode': 'query',
        }
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        headers = {'Authorization': f'Bearer {access_token}'}
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, headers=headers)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        user_principal_name = raw_user_info.get('userPrincipalName', '')
        username = user_principal_name.split('@')[0] if '@' in user_principal_name else user_principal_name
        email = raw_user_info.get('mail') or raw_user_info.get('userPrincipalName')
        return {
            'provider_id': raw_user_info.get('id'),
            'username': username or f"ms_{raw_user_info.get('id', '')[:8]}",
            'name': raw_user_info.get('displayName') or username,
            'email': email,
            'avatar': None,  # Microsoft Graph 需要额外请求获取头像
            'bio': raw_user_info.get('jobTitle'),
        }

钉钉 OAuth

钉钉使用 JSON body 请求 token,且 header 使用特殊的 x-acs-dingtalk-access-token

python
class DingTalkOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'dingtalk'
    AUTHORIZE_URL = "https://login.dingtalk.com/oauth2/auth"
    TOKEN_URL = "https://api.dingtalk.com/v1.0/oauth2/userAccessToken"
    USER_INFO_URL = "https://api.dingtalk.com/v1.0/contact/users/me"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.DINGTALK_APP_ID,
            'client_secret': settings.DINGTALK_APP_SECRET,
            'redirect_uri': settings.DINGTALK_REDIRECT_URI,
        }
    
    @classmethod
    def get_extra_authorize_params(cls) -> Dict[str, str]:
        return {
            'response_type': 'code',
            'scope': 'openid',
            'prompt': 'consent',
        }
    
    @classmethod
    def get_user_id_field(cls) -> str:
        return 'dingtalk_unionid'
    
    @classmethod
    async def get_access_token(cls, code: str) -> Optional[str]:
        config = cls.get_client_config()
        data = {
            'clientId': config['client_id'],
            'clientSecret': config['client_secret'],
            'code': code,
            'grantType': 'authorization_code',
        }
        headers = {'Content-Type': 'application/json'}
        async with httpx.AsyncClient() as client:
            response = await client.post(cls.TOKEN_URL, json=data, headers=headers)
            result = response.json()
            return result.get('accessToken')
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        headers = {
            'x-acs-dingtalk-access-token': access_token,
            'Content-Type': 'application/json',
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, headers=headers)
            return response.json()
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        provider_id = raw_user_info.get('unionId', '')
        nick = raw_user_info.get('nick', '')
        return {
            'provider_id': provider_id,
            'username': nick if nick else f"dingtalk_{provider_id[:8]}",
            'name': nick or f"dingtalk_{provider_id[:8]}",
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('avatarUrl'),
            'bio': f"钉钉用户 - {nick}" if nick else "钉钉用户",
        }

飞书 OAuth

飞书需要先获取 app_access_token,再用它来换取 user_access_token:

python
class FeishuOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'feishu'
    AUTHORIZE_URL = "https://open.feishu.cn/open-apis/authen/v1/authorize"
    TOKEN_URL = "https://open.feishu.cn/open-apis/authen/v1/oidc/access_token"
    USER_INFO_URL = "https://open.feishu.cn/open-apis/authen/v1/user_info"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.FEISHU_APP_ID,
            'client_secret': settings.FEISHU_APP_SECRET,
            'redirect_uri': settings.FEISHU_REDIRECT_URI,
        }
    
    @classmethod
    def get_extra_authorize_params(cls) -> Dict[str, str]:
        return {
            'response_type': 'code',
            'scope': 'contact:user.base:readonly',
        }
    
    @classmethod
    def get_user_id_field(cls) -> str:
        return 'feishu_union_id'
    
    @classmethod
    async def _get_app_access_token(cls) -> Optional[str]:
        """获取应用级别的 access_token"""
        config = cls.get_client_config()
        url = "https://open.feishu.cn/open-apis/auth/v3/app_access_token/internal"
        data = {
            'app_id': config['client_id'],
            'app_secret': config['client_secret'],
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=data)
            result = response.json()
            if result.get('code') == 0:
                return result.get('app_access_token')
            return None
    
    @classmethod
    async def get_access_token(cls, code: str) -> Optional[str]:
        app_access_token = await cls._get_app_access_token()
        if not app_access_token:
            return None
        
        data = {'grant_type': 'authorization_code', 'code': code}
        headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {app_access_token}',
        }
        async with httpx.AsyncClient() as client:
            response = await client.post(cls.TOKEN_URL, json=data, headers=headers)
            result = response.json()
            if result.get('code') == 0 and 'data' in result:
                return result['data'].get('access_token')
            return None
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json',
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.USER_INFO_URL, headers=headers)
            result = response.json()
            if result.get('code') == 0 and 'data' in result:
                return result['data']
            return None
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        provider_id = raw_user_info.get('union_id', '')
        name = raw_user_info.get('name', '')
        en_name = raw_user_info.get('en_name', '')
        username = (en_name or name or f"feishu_{provider_id[:8]}").replace(' ', '_')
        return {
            'provider_id': provider_id,
            'username': username,
            'name': name or username,
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('avatar_url') or raw_user_info.get('avatar_big'),
            'bio': f"飞书用户 - {name}" if name else "飞书用户",
        }

企业微信 OAuth

企业微信使用特殊的 SSO 登录页面,需要先获取 corp_access_token,再用 code 获取 userid,最后获取用户详情:

python
class WeComOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'wecom'
    AUTHORIZE_URL = "https://login.work.weixin.qq.com/wwlogin/sso/login"
    TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
    USER_INFO_URL = "https://qyapi.weixin.qq.com/cgi-bin/auth/getuserinfo"
    USER_DETAIL_URL = "https://qyapi.weixin.qq.com/cgi-bin/user/get"
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.WECOM_CORP_ID,
            'client_secret': settings.WECOM_APP_SECRET,
            'redirect_uri': settings.WECOM_REDIRECT_URI,
            'agent_id': settings.WECOM_AGENT_ID,
        }
    
    @classmethod
    def get_user_id_field(cls) -> str:
        return 'wecom_userid'
    
    @classmethod
    def get_authorize_url(cls, state: str = None) -> str:
        config = cls.get_client_config()
        params = {
            'login_type': 'CorpApp',
            'appid': config['client_id'],
            'agentid': config['agent_id'],
            'redirect_uri': config['redirect_uri'],
        }
        if state:
            params['state'] = state
        query_string = '&'.join([f"{k}={v}" for k, v in params.items()])
        return f"{cls.AUTHORIZE_URL}?{query_string}"
    
    @classmethod
    async def _get_corp_access_token(cls) -> Optional[str]:
        """获取企业级 access_token"""
        config = cls.get_client_config()
        params = {
            'corpid': config['client_id'],
            'corpsecret': config['client_secret'],
        }
        async with httpx.AsyncClient() as client:
            response = await client.get(cls.TOKEN_URL, params=params)
            result = response.json()
            if result.get('errcode') == 0:
                return result.get('access_token')
            return None
    
    @classmethod
    async def get_access_token(cls, code: str) -> Optional[str]:
        # 返回 "corp_access_token|code" 格式,在 get_user_info 中拆分使用
        corp_token = await cls._get_corp_access_token()
        if not corp_token:
            return None
        return f"{corp_token}|{code}"
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        parts = access_token.split('|', 1)
        if len(parts) != 2:
            return None
        corp_token, code = parts
        
        async with httpx.AsyncClient() as client:
            # 第一步:用 code 获取 userid
            params = {'access_token': corp_token, 'code': code}
            response = await client.get(cls.USER_INFO_URL, params=params)
            result = response.json()
            if result.get('errcode') != 0:
                return None
            userid = result.get('userid') or result.get('UserId')
            
            # 第二步:用 userid 获取用户详情
            detail_params = {'access_token': corp_token, 'userid': userid}
            detail_response = await client.get(cls.USER_DETAIL_URL, params=detail_params)
            user_detail = detail_response.json()
            if user_detail.get('errcode') != 0:
                return {'userid': userid, 'name': userid}
            return user_detail
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        userid = raw_user_info.get('userid', '')
        name = raw_user_info.get('name', '')
        email = raw_user_info.get('biz_mail') or raw_user_info.get('email')
        avatar = raw_user_info.get('thumb_avatar') or raw_user_info.get('avatar')
        position = raw_user_info.get('position', '')
        return {
            'provider_id': userid,
            'username': userid if userid else f"wecom_{name}",
            'name': name or userid,
            'email': email,
            'avatar': avatar,
            'bio': f"企业微信用户 - {position}" if position else "企业微信用户",
        }

用户创建与关联

用户字段映射

OAuth 用户在 User 表中的字段:

字段说明示例
gitee_idGitee 用户 ID"12345678"
github_idGitHub 用户 ID"87654321"
qq_idQQ OpenID"A1B2C3D4..."
google_idGoogle 用户 ID"1234567890"
wechat_id微信 OpenID"oX1Y2Z3..."
oauth_provider最后登录的 OAuth 平台"github"
last_login_type最后登录方式"github"

用户创建逻辑

python
# 1. 根据 provider_id 查找用户
user_id_field = f"{provider}_id"  # 如 'github_id'
user = await db.execute(
    select(User).where(getattr(User, user_id_field) == provider_id)
).scalar_one_or_none()

if user:
    # 用户已存在,更新信息
    if email and not user.email:
        user.email = email
else:
    # 用户不存在,创建新用户
    # 生成唯一用户名(避免冲突)
    unique_username = username
    counter = 1
    while await db.execute(select(User).where(User.username == unique_username)).scalar_one_or_none():
        unique_username = f"{username}_{counter}"
        counter += 1
    
    user = User(
        username=unique_username,
        name=name,
        email=email,
        **{user_id_field: provider_id},
        oauth_provider=provider,
        user_type=1,
        user_status=1,
        is_active=True,
        is_superuser=settings.GRANT_ADMIN_TO_OAUTH_USER,  # 可配置
    )
    db.add(user)

自动授予管理员权限

通过环境变量 GRANT_ADMIN_TO_OAUTH_USER 控制:

bash
# .env
GRANT_ADMIN_TO_OAUTH_USER=false  # 默认 false,OAuth 用户为普通用户

安全机制

1. CSRF 防护

使用 state 参数防止 CSRF 攻击:

python
# 生成 CSRF token
csrf_token = secrets.token_urlsafe(32)

# 存入 Redis(5 分钟过期)
await redis.set(
    f"oauth_state:{csrf_token}",
    "1",
    ex=300
)

# 合并前端 state
state_data = {"csrf": csrf_token, "payload": frontend_state}
combined_state = json.dumps(state_data)

回调时验证:

python
# 解析 state
state_data = json.loads(state)
csrf_token = state_data.get("csrf")

# 验证 CSRF token
exists = await redis.get(f"oauth_state:{csrf_token}")
if not exists:
    raise HTTPException(400, "授权请求已过期")

# 验证通过后删除(一次性使用)
await redis.delete(f"oauth_state:{csrf_token}")

2. Token 安全

  • Access Token:24 小时过期
  • Refresh Token:7 天过期
  • 设备绑定:Token 包含 device_id
  • 多设备控制:可配置单设备登录

3. 用户状态检查

python
if not user.is_active:
    raise ValueError("账户已被禁用")

if user.user_status == 0:
    raise ValueError("账户已被禁用")

if user.user_status == 2:
    raise ValueError("账户已被锁定")

配置说明

环境变量配置

每个 OAuth 平台需要配置 3 个环境变量:

bash
# Gitee
GITEE_CLIENT_ID=your_client_id
GITEE_CLIENT_SECRET=your_client_secret
GITEE_REDIRECT_URI=http://localhost:5173/oauth/gitee/callback

# GitHub
GITHUB_CLIENT_ID=your_client_id
GITHUB_CLIENT_SECRET=your_client_secret
GITHUB_REDIRECT_URI=http://localhost:5173/oauth/github/callback

# QQ(使用 APP_ID 和 APP_KEY)
QQ_APP_ID=your_app_id
QQ_APP_KEY=your_app_key
QQ_REDIRECT_URI=http://localhost:5173/oauth/qq/callback

# Google
GOOGLE_CLIENT_ID=your_client_id
GOOGLE_CLIENT_SECRET=your_client_secret
GOOGLE_REDIRECT_URI=http://localhost:5173/oauth/google/callback

OAuth 应用注册

Gitee

  1. 访问 https://gitee.com/oauth/applications
  2. 点击「创建应用」
  3. 填写应用信息:
  4. 获取 Client IDClient Secret
  5. 配置环境变量:
    bash
    GITEE_CLIENT_ID=your_client_id
    GITEE_CLIENT_SECRET=your_client_secret
    GITEE_REDIRECT_URI=http://localhost:5173/oauth/gitee/callback

GitHub

  1. 访问 https://github.com/settings/developers
  2. 点击「New OAuth App」
  3. 填写应用信息:
  4. 获取 Client IDClient Secret
  5. 配置环境变量:
    bash
    GITHUB_CLIENT_ID=your_client_id
    GITHUB_CLIENT_SECRET=your_client_secret
    GITHUB_REDIRECT_URI=http://localhost:5173/oauth/github/callback

QQ 互联

  1. 访问 https://connect.qq.com/
  2. 注册成为开发者(需要企业资质或个人实名认证)
  3. 创建应用:
  4. 获取 APP IDAPP Key
  5. 配置环境变量:
    bash
    QQ_APP_ID=your_app_id
    QQ_APP_KEY=your_app_key
    QQ_REDIRECT_URI=http://your-domain.com/oauth/qq/callback

注意:QQ 互联要求回调地址必须是已备案的域名,不支持 localhost。

Google

  1. 访问 https://console.cloud.google.com/
  2. 创建项目或选择现有项目
  3. 进入「API 和服务」→「凭据」
  4. 点击「创建凭据」→「OAuth 客户端 ID」
  5. 配置 OAuth 同意屏幕(首次需要)
  6. 选择应用类型「Web 应用」,填写:
  7. 获取 Client IDClient Secret
  8. 配置环境变量:
    bash
    GOOGLE_CLIENT_ID=your_client_id
    GOOGLE_CLIENT_SECRET=your_client_secret
    GOOGLE_REDIRECT_URI=http://localhost:5173/oauth/google/callback

微信开放平台

  1. 访问 https://open.weixin.qq.com/
  2. 注册成为开发者(需要企业资质)
  3. 创建网站应用:
    • 应用名称:你的应用名称
    • 应用官网:你的网站域名
  4. 提交审核并等待通过
  5. 获取 AppIDAppSecret
  6. 配置环境变量:
    bash
    WECHAT_APP_ID=your_app_id
    WECHAT_APP_SECRET=your_app_secret
    WECHAT_REDIRECT_URI=http://your-domain.com/oauth/wechat/callback

注意:微信开放平台需要企业资质,个人开发者无法使用。

Microsoft

  1. 访问 https://portal.azure.com/
  2. 进入「Azure Active Directory」→「应用注册」
  3. 点击「新注册」:
  4. 在「证书和密码」中创建客户端密码
  5. 获取「应用程序(客户端) ID」和「客户端密码」
  6. 配置环境变量:
    bash
    MICROSOFT_CLIENT_ID=your_client_id
    MICROSOFT_CLIENT_SECRET=your_client_secret
    MICROSOFT_REDIRECT_URI=http://localhost:5173/oauth/microsoft/callback

钉钉

  1. 访问 https://open.dingtalk.com/
  2. 创建应用:
    • 进入「开发者后台」→「应用开发」→「企业内部开发」
    • 点击「创建应用」
  3. 在「登录与分享」中配置:
    • 回调域名:your-domain.com
  4. 获取 AppKey(即 Client ID)和 AppSecret
  5. 配置环境变量:
    bash
    DINGTALK_APP_ID=your_app_key
    DINGTALK_APP_SECRET=your_app_secret
    DINGTALK_REDIRECT_URI=http://your-domain.com/oauth/dingtalk/callback

飞书

  1. 访问 https://open.feishu.cn/
  2. 创建企业自建应用:
    • 进入「开发者后台」→「创建企业自建应用」
  3. 在「安全设置」中配置:
  4. 在「权限管理」中申请权限:
    • contact:user.base:readonly(获取用户基本信息)
  5. 发布应用版本
  6. 获取 App IDApp Secret
  7. 配置环境变量:
    bash
    FEISHU_APP_ID=your_app_id
    FEISHU_APP_SECRET=your_app_secret
    FEISHU_REDIRECT_URI=http://localhost:5173/oauth/feishu/callback

企业微信

  1. 访问 https://work.weixin.qq.com/
  2. 登录企业微信管理后台
  3. 进入「应用管理」→「自建」→「创建应用」
  4. 在应用详情中获取:
    • AgentId:应用 ID
    • Secret:应用密钥
  5. 在「企业信息」中获取 企业 ID(CorpID)
  6. 在「网页授权及 JS-SDK」中配置:
    • 可信域名:your-domain.com
  7. 配置环境变量:
    bash
    WECOM_CORP_ID=your_corp_id
    WECOM_APP_SECRET=your_app_secret
    WECOM_AGENT_ID=your_agent_id
    WECOM_REDIRECT_URI=http://your-domain.com/oauth/wecom/callback

前端集成

发起登录

typescript
// 1. 获取授权 URL
const response = await fetch(`/api/core/oauth/github/authorize?state=${encodeURIComponent(redirectUrl)}`)
const { authorize_url } = await response.json()

// 2. 重定向到授权页面
window.location.href = authorize_url

处理回调

typescript
// OAuth 平台回调到前端(URL 参数包含 code 和 state)
const urlParams = new URLSearchParams(window.location.search)
const code = urlParams.get('code')
const state = urlParams.get('state')

// 发送到后端完成登录
const response = await fetch(`/api/core/oauth/github/callback`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ code, state })
})

const { access_token, refresh_token, user_info } = await response.json()

// 存储 token
localStorage.setItem('access_token', access_token)
localStorage.setItem('refresh_token', refresh_token)

// 跳转到应用
const stateData = JSON.parse(state)
const redirectUrl = stateData.payload || '/'
router.push(redirectUrl)

扩展新平台

1. 创建服务类

python
# core/oauth/service.py

class NewPlatformOAuthService(BaseOAuthService):
    PROVIDER_NAME = 'new_platform'
    AUTHORIZE_URL = "https://..."
    TOKEN_URL = "https://..."
    USER_INFO_URL = "https://..."
    
    @classmethod
    def get_client_config(cls) -> Dict[str, str]:
        return {
            'client_id': settings.NEW_PLATFORM_CLIENT_ID,
            'client_secret': settings.NEW_PLATFORM_CLIENT_SECRET,
            'redirect_uri': settings.NEW_PLATFORM_REDIRECT_URI,
        }
    
    @classmethod
    async def get_user_info(cls, access_token: str) -> Optional[Dict]:
        # 实现获取用户信息的逻辑
        pass
    
    @classmethod
    def normalize_user_info(cls, raw_user_info: Dict) -> Dict:
        # 标准化用户信息
        return {
            'provider_id': str(raw_user_info.get('id')),
            'username': raw_user_info.get('username'),
            'name': raw_user_info.get('name'),
            'email': raw_user_info.get('email'),
            'avatar': raw_user_info.get('avatar'),
            'bio': raw_user_info.get('bio'),
        }

2. 注册到服务表

python
# core/oauth/service.py

OAUTH_PROVIDERS = {
    # ... 现有平台
    'new_platform': NewPlatformOAuthService,
}

3. 添加数据库字段

python
# 数据库迁移
alembic revision --autogenerate -m "add new_platform_id to user"

# User 模型会自动包含新字段
new_platform_id = Column(String(255), nullable=True, comment="新平台用户ID")

4. 配置环境变量

bash
NEW_PLATFORM_CLIENT_ID=xxx
NEW_PLATFORM_CLIENT_SECRET=xxx
NEW_PLATFORM_REDIRECT_URI=http://localhost:5173/oauth/new_platform/callback

常见问题

Q1: OAuth 用户没有密码怎么办?

A: OAuth 用户的 password 字段为 NULL,允许空密码。用户只能通过 OAuth 登录,不能使用用户名密码登录。

Q2: 用户可以绑定多个 OAuth 平台吗?

A: 可以。同一个用户可以有多个 {provider}_id 字段,分别对应不同平台。

Q3: 如何处理 OAuth 平台返回的邮箱为空?

A: 邮箱字段为可选,为空时不影响用户创建。后续用户可以在个人设置中补充邮箱。

Q4: CSRF token 过期怎么办?

A: CSRF token 有效期为 5 分钟。如果用户授权超时,需要重新发起登录流程。

Q5: 如何调试 OAuth 登录?

A:

  1. 检查环境变量配置是否正确
  2. 查看后端日志(logger.info/error
  3. 使用浏览器开发者工具查看网络请求
  4. 确认回调 URL 与 OAuth 应用配置一致

API 参考

GET /api/core/oauth/{provider}/authorize

获取 OAuth 授权 URL

路径参数

  • provider: OAuth 平台(gitee/github/qq/google/wechat/microsoft/dingtalk/feishu/wecom)

查询参数

  • state: 前端传递的额外状态参数(可选)

响应

json
{
  "authorize_url": "https://..."
}

POST /api/core/oauth/{provider}/callback

处理 OAuth 回调

路径参数

  • provider: OAuth 平台

请求体

json
{
  "code": "authorization_code",
  "state": "{\"csrf\":\"xxx\",\"payload\":\"...\"}"
}

响应

json
{
  "access_token": "eyJhbGciOi...",
  "refresh_token": "eyJhbGciOi...",
  "expire": 86400,
  "user_info": {
    "id": "uuid",
    "username": "user123",
    "name": "张三",
    "email": "user@example.com",
    "avatar": "https://...",
    "user_type": 1,
    "is_superuser": false
  }
}

Released under the MIT License.