实现Azure Data Factory安全地请求企业内部API返回数据
需要配置一个Web Activity组件在Azure云上的Azure Data Factory运行,它需要访问企业内部的API获取JSON格式的数据,企业有网关和防火墙,API有公司的okta身份认证,通过公司的域账号来授权访问,现在需要创建一个专用的域账号,让Web Activity组件访问Azure Key Vault,来获取账号密码,然后通过配置访问公司内部API的数据,请写出所有的开发配置步骤,以及完成这一功能的所有的Python源代码,需要确保安全性和可靠性。
分步配置及开发流程
1. 创建专用域账号(企业AD账户)
- 联系企业AD管理员创建专用服务账号(如
svc_adf_api
),确保账号具备最小权限访问目标API。 - 记录账号的 Client ID(用于OAuth认证)并设置强密码。
2. 配置Okta应用程序
- 登录Okta管理控制台,创建新应用:
- 应用类型:
Service
(机器到机器) 或Web
(根据API需求)。 - 认证方式: OAuth 2.0 客户端凭证(Client Credentials Grant)。
- 作用域(Scopes): 添加API所需的作用域(如
api_access
)。 - 客户端ID和密钥: 记录生成的Client ID和Secret(若适用,但此处使用AD密码而非Okta密钥)。
- 应用类型:
- 注意:若API使用Okta集成认证,可能需要调整配置,确保服务账号有权限。
3. 创建Azure Key Vault并存储凭据
- 创建Key Vault:
- 在Azure门户中新建Key Vault,如
kv-adf-secrets
。 - 在Key Vault中创建机密(Secret),名称如
internal-api-password
,值为服务账号的密码。
- 在Azure门户中新建Key Vault,如
- 配置访问策略:
- 添加策略,主体为Azure Data Factory的托管身份(系统分配)。
- 权限:授予 Get 和 List 机密权限。
4. 配置Azure Data Factory(ADF)
- 启用托管身份:
- 在ADF资源中,确保启用系统分配的托管身份。
- 链接服务到Key Vault:
- 创建新的 链接服务,类型为Azure Key Vault。
- 使用托管身份认证,连接到上一步创建的Key Vault。
5. 网络配置
- 允许ADF出站IP:
- 获取ADF所在区域的出站IP列表(参考Azure文档)。
- 在企业防火墙中将这些IP加入白名单,允许访问内部API。
- 备选方案:
- 使用Azure APIM(API管理)作为中介,配置私有终结点或VPN/ExpressRoute确保安全连接。
6. 开发ADF管道
- 步骤1:获取Key Vault中的密码
- 使用 Lookup活动 或 Web活动(调用Key Vault REST API)获取机密。
- 示例表达式(在动态内容中):
@activity('GetAPIPassword').output.value
- 步骤2:获取Okta访问令牌
- Web Activity配置:
- URL: Okta的令牌端点(如
https://{your-okta-domain}/oauth2/default/v1/token
)。 - 方法: POST
- Headers:
Content-Type: application/x-www-form-urlencoded
- Body:
grant_type=client_credentials&client_id={ClientID}&client_secret={Password}&scope=api_access
{ClientID}
: 替换为Okta应用的客户端ID。{Password}
: 使用动态内容引用Key Vault获取的密码。
- URL: Okta的令牌端点(如
- Web Activity配置:
- 步骤3:调用内部API
- Web Activity配置:
- URL: 内部API的端点(如
https://internal-api.company.com/data
)。 - 方法: GET/POST
- Headers:
Authorization: Bearer {AccessToken}
{AccessToken}
: 使用上一步输出的访问令牌(如@activity('GetOktaToken').output.access_token
)。
- URL: 内部API的端点(如
- Web Activity配置:
7. Python代码示例(备用方案:Azure Function)
若ADF原生活动无法满足复杂认证逻辑,可使用Azure Function处理认证,ADF调用该函数。
函数代码(HTTP触发器):
import logging
import os
import requests
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
import azure.functions as funcapp = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)@app.route(route="call_internal_api")
def http_trigger(req: func.HttpRequest) -> func.HttpResponse:try:# 从查询参数获取Client IDclient_id = req.params.get('client_id')if not client_id:return func.HttpResponse("Client ID is required.", status_code=400)# 从Key Vault获取密码key_vault_url = "https://kv-adf-secrets.vault.azure.net/"credential = DefaultAzureCredential()secret_client = SecretClient(vault_url=key_vault_url, credential=credential)secret_name = "internal-api-password"client_secret = secret_client.get_secret(secret_name).value# 获取Okta令牌token_url = "https://company.okta.com/oauth2/default/v1/token"data = {'grant_type': 'client_credentials','client_id': client_id,'client_secret': client_secret,'scope': 'api_access'}token_response = requests.post(token_url, data=data)token_response.raise_for_status()access_token = token_response.json()['access_token']# 调用内部APIapi_url = "https://internal-api.company.com/data"headers = {'Authorization': f'Bearer {access_token}'}api_response = requests.get(api_url, headers=headers)api_response.raise_for_status()return func.HttpResponse(api_response.text, status_code=200)except Exception as e:logging.error(f"Error: {str(e)}")return func.HttpResponse(f"Error occurred: {str(e)}", status_code=500)
ADF配置:
- 使用 Web Activity 调用上述Azure Function的URL,传递
client_id
作为参数。
安全性和可靠性增强措施
- 密钥轮换:
- 定期更新Key Vault中的密码,利用Key Vault的自动轮换策略。
- 监控与日志:
- 启用Key Vault和ADF的诊断日志,监控异常访问。
- 使用Azure Monitor跟踪管道运行状态。
- 网络隔离:
- 将Azure资源(如Key Vault、ADF)部署到虚拟网络,使用私有终结点。
- 最小权限原则:
- 确保服务账号仅拥有访问API所需的最小权限。
- HTTPS加密:
- 确保所有端点(Okta、API、Key Vault)均使用TLS加密。