Merge branch 'hermes/hermes-996d586b'
Some checks failed
CI / verify (push) Has been cancelled

This commit is contained in:
2026-05-08 11:23:57 +08:00
14 changed files with 1326 additions and 14 deletions

View File

@@ -1,11 +1,12 @@
use std::{
collections::BTreeSet,
fs,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
};
use axum::{
Json,
extract::{Extension, Request, State},
extract::{Extension, Query, Request, State},
http::{
HeaderMap, HeaderName, HeaderValue, Method, StatusCode,
header::{AUTHORIZATION, CONTENT_TYPE},
@@ -20,6 +21,7 @@ use shared_contracts::admin::{
AdminDatabaseOverviewPayload, AdminDatabaseTableStatPayload, AdminDebugHeaderInput,
AdminDebugHttpRequest, AdminDebugHttpResponse, AdminLoginRequest, AdminLoginResponse,
AdminMeResponse, AdminOverviewResponse, AdminServiceOverviewPayload, AdminSessionPayload,
AdminTrackingEventEntryPayload, AdminTrackingEventListQuery, AdminTrackingEventListResponse,
};
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
@@ -42,6 +44,8 @@ const BLOCKED_DEBUG_HEADERS: &[&str] = &[
// SpacetimeDB 2.x 的 schema HTTP API 要求显式传入 BSATN JSON 版本。
// 后台总览只读取表名,固定使用当前 CLI 2.1.0 兼容的版本参数即可。
const SPACETIME_SCHEMA_VERSION_QUERY: &str = "version=9";
const ADMIN_TRACKING_EVENT_DEFAULT_LIMIT: u32 = 200;
const ADMIN_TRACKING_EVENT_MAX_LIMIT: u32 = 1000;
#[derive(Clone, Debug)]
pub struct AuthenticatedAdmin {
@@ -153,6 +157,19 @@ pub async fn admin_debug_http(
Ok(json_success_body(Some(&request_context), response))
}
pub async fn admin_list_tracking_events(
State(state): State<AppState>,
Extension(request_context): Extension<RequestContext>,
Extension(_admin): Extension<AuthenticatedAdmin>,
Query(query): Query<AdminTrackingEventListQuery>,
) -> Result<Json<Value>, AppError> {
let entries = fetch_admin_tracking_events(&state, query).await?;
Ok(json_success_body(
Some(&request_context),
AdminTrackingEventListResponse { entries },
))
}
pub async fn require_admin_auth(
State(state): State<AppState>,
mut request: Request,
@@ -488,6 +505,290 @@ fn parse_count_value(value: &Value) -> Result<u64, String> {
}
}
async fn fetch_admin_tracking_events(
state: &AppState,
query: AdminTrackingEventListQuery,
) -> Result<Vec<AdminTrackingEventEntryPayload>, AppError> {
let client = Client::new();
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
let database = state.config.spacetime_database.trim();
let token = state
.config
.spacetime_token
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string)
.or_else(load_local_spacetime_cli_token);
let sql = build_admin_tracking_events_sql(&query)
.map_err(|error| AppError::from_status(StatusCode::BAD_REQUEST).with_message(error))?;
let payload = fetch_spacetime_sql_json(&client, server_root, database, token.as_deref(), &sql)
.await
.map_err(|error| {
AppError::from_status(StatusCode::BAD_GATEWAY)
.with_message(format!("埋点数据读取失败:{error}"))
})?;
parse_admin_tracking_events_sql_response(payload).map_err(|error| {
AppError::from_status(StatusCode::BAD_GATEWAY)
.with_message(format!("埋点数据解析失败:{error}"))
})
}
fn build_admin_tracking_events_sql(query: &AdminTrackingEventListQuery) -> Result<String, String> {
let mut conditions = Vec::new();
if let Some(value) = normalized_non_empty(query.event_key.as_deref()) {
conditions.push(format!("event_key = {}", quote_sql_string(value)));
}
if let Some(value) = normalized_non_empty(query.user_id.as_deref()) {
conditions.push(format!("user_id = {}", quote_sql_string(value)));
}
if let Some(value) = normalized_non_empty(query.scope_kind.as_deref()) {
let scope_kind = normalize_admin_tracking_scope_kind(value)?;
conditions.push(format!("scope_kind = {}", quote_sql_string(scope_kind)));
}
if let Some(value) = normalized_non_empty(query.scope_id.as_deref()) {
conditions.push(format!("scope_id = {}", quote_sql_string(value)));
}
let where_clause = if conditions.is_empty() {
String::new()
} else {
format!(" WHERE {}", conditions.join(" AND "))
};
let limit = clamp_admin_tracking_event_limit(query.limit);
Ok(format!(
"SELECT event_id, event_key, scope_kind, scope_id, day_key, user_id, owner_user_id, profile_id, module_key, metadata_json, occurred_at FROM tracking_event{where_clause} LIMIT {limit}"
))
}
fn normalized_non_empty(value: Option<&str>) -> Option<&str> {
value.map(str::trim).filter(|value| !value.is_empty())
}
fn load_local_spacetime_cli_token() -> Option<String> {
// 本地开发清库后会通过 `/v1/identity` 重新登录 CLI这里复用 CLI token确保 SQL 可读取 private 表。
let content = fs::read_to_string(".spacetimedb/local/config/cli.toml")
.or_else(|_| fs::read_to_string("server-rs/.spacetimedb/local/config/cli.toml"))
.ok()?;
content.lines().find_map(|line| {
let value = line.trim().strip_prefix("spacetimedb_token = ")?;
Some(value.trim().trim_matches('"').to_string()).filter(|token| !token.is_empty())
})
}
fn quote_sql_string(value: &str) -> String {
format!("'{}'", value.replace('\'', "''"))
}
fn normalize_admin_tracking_scope_kind(value: &str) -> Result<&'static str, String> {
match value.trim().to_ascii_lowercase().as_str() {
"site" => Ok("site"),
"work" => Ok("work"),
"module" => Ok("module"),
"user" => Ok("user"),
_ => Err("scopeKind 必须是 site/work/module/user".to_string()),
}
}
fn clamp_admin_tracking_event_limit(limit: Option<u32>) -> u32 {
limit
.unwrap_or(ADMIN_TRACKING_EVENT_DEFAULT_LIMIT)
.clamp(1, ADMIN_TRACKING_EVENT_MAX_LIMIT)
}
async fn fetch_spacetime_sql_json(
client: &Client,
server_root: &str,
database: &str,
token: Option<&str>,
sql: &str,
) -> Result<Value, String> {
let mut request = client
.post(format!("{server_root}/v1/database/{database}/sql"))
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
.body(sql.to_string());
if let Some(token) = token {
request = request.bearer_auth(token);
}
let response = request
.send()
.await
.map_err(|error| format!("SQL 请求失败:{error}"))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(format!("HTTP {}{}", status.as_u16(), trim_preview(&body)));
}
response
.json::<Value>()
.await
.map_err(|error| format!("SQL 响应解析失败:{error}"))
}
fn parse_admin_tracking_events_sql_response(
payload: Value,
) -> Result<Vec<AdminTrackingEventEntryPayload>, String> {
let rows = extract_first_sql_rows(payload)?;
rows.iter()
.map(parse_admin_tracking_event_row)
.collect::<Result<Vec<_>, _>>()
.map(|mut entries| {
// SpacetimeDB 2.2 的 HTTP SQL 暂不支持 ORDER BY后台在 API 层按发生时间倒序收口。
entries.sort_by(|left, right| right.occurred_at.cmp(&left.occurred_at));
entries
})
}
fn extract_first_sql_rows(payload: Value) -> Result<Vec<Value>, String> {
let statement = match payload {
Value::Array(statements) => statements
.into_iter()
.next()
.ok_or_else(|| "SQL 结果为空".to_string())?,
Value::Object(statement) => Value::Object(statement),
_ => return Err("SQL 响应格式非法".to_string()),
};
let Value::Object(mut statement) = statement else {
return Err("SQL statement 结果格式非法".to_string());
};
let rows = statement
.remove("rows")
.ok_or_else(|| "SQL 响应缺少 rows 字段".to_string())?;
match rows {
Value::Array(rows) => Ok(rows),
_ => Err("SQL rows 字段格式非法".to_string()),
}
}
fn parse_admin_tracking_event_row(row: &Value) -> Result<AdminTrackingEventEntryPayload, String> {
let columns = row.as_array().ok_or_else(|| "埋点行格式非法".to_string())?;
let event_key = required_string_column(columns, 1, "event_key")?;
Ok(AdminTrackingEventEntryPayload {
event_id: required_string_column(columns, 0, "event_id")?,
event_title: admin_tracking_event_title(&event_key).to_string(),
event_key,
scope_kind: tracking_scope_kind_to_string(
columns
.get(2)
.ok_or_else(|| "埋点行缺少 scope_kind".to_string())?,
)
.ok_or_else(|| "埋点行 scope_kind 类型非法".to_string())?,
scope_id: required_string_column(columns, 3, "scope_id")?,
day_key: required_i64_column(columns, 4, "day_key")?,
user_id: optional_string_column(columns, 5),
owner_user_id: optional_string_column(columns, 6),
profile_id: optional_string_column(columns, 7),
module_key: optional_string_column(columns, 8),
metadata_json: required_string_column(columns, 9, "metadata_json")?,
occurred_at: timestamp_to_display_string(
columns
.get(10)
.ok_or_else(|| "埋点行缺少 occurred_at".to_string())?,
)
.ok_or_else(|| "埋点行 occurred_at 不是字符串".to_string())?,
})
}
fn required_string_column(
columns: &[Value],
index: usize,
field_name: &str,
) -> Result<String, String> {
value_to_string(
columns
.get(index)
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?,
)
.ok_or_else(|| format!("埋点行 {field_name} 不是字符串"))
}
fn optional_string_column(columns: &[Value], index: usize) -> Option<String> {
columns.get(index).and_then(value_to_string)
}
fn required_i64_column(columns: &[Value], index: usize, field_name: &str) -> Result<i64, String> {
let value = columns
.get(index)
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?;
match value {
Value::Number(number) => number
.as_i64()
.ok_or_else(|| format!("埋点行 {field_name} 不是整数")),
Value::String(text) => text
.trim()
.parse::<i64>()
.map_err(|error| format!("埋点行 {field_name} 解析失败:{error}")),
_ => Err(format!("埋点行 {field_name} 类型非法")),
}
}
fn value_to_string(value: &Value) -> Option<String> {
match value {
Value::Null => None,
Value::String(text) => Some(text.clone()),
Value::Object(object) => object.get("some").and_then(value_to_string),
Value::Number(number) => Some(number.to_string()),
Value::Bool(value) => Some(value.to_string()),
Value::Array(items) => value_array_to_string(items),
}
}
fn value_array_to_string(items: &[Value]) -> Option<String> {
if items.len() == 2 {
if let Some(index) = items.first().and_then(Value::as_u64) {
if index == 0 {
return items.get(1).and_then(value_to_string);
}
if index == 1 && items.get(1).and_then(Value::as_array).is_some() {
return None;
}
}
}
Some(Value::Array(items.to_vec()).to_string())
}
fn tracking_scope_kind_to_string(value: &Value) -> Option<String> {
match value {
Value::String(text) => Some(text.clone()),
Value::Object(object) => object
.get("tag")
.or_else(|| object.get("variant"))
.or_else(|| object.get("name"))
.and_then(value_to_string),
Value::Array(items) => {
let index = items.first().and_then(Value::as_u64)?;
Some(
match index {
0 => "site",
1 => "work",
2 => "module",
3 => "user",
_ => return Some(Value::Array(items.to_vec()).to_string()),
}
.to_string(),
)
}
_ => value_to_string(value),
}
}
fn timestamp_to_display_string(value: &Value) -> Option<String> {
match value {
Value::Array(items) if items.len() == 1 => items.first().and_then(value_to_string),
_ => value_to_string(value),
}
}
fn admin_tracking_event_title(event_key: &str) -> &str {
match event_key {
"daily_login" => "每日登录",
_ => event_key,
}
}
async fn execute_admin_debug_http(
state: &AppState,
payload: AdminDebugHttpRequest,
@@ -648,12 +949,14 @@ fn build_admin_session_payload(session: crate::state::AdminSession) -> AdminSess
#[cfg(test)]
mod tests {
use super::{
build_body_preview, build_debug_base_url, build_spacetime_schema_url,
is_safe_spacetime_table_name, normalize_debug_path, normalize_table_count_error,
parse_spacetime_sql_count_response, trim_preview,
build_admin_tracking_events_sql, build_body_preview, build_debug_base_url,
build_spacetime_schema_url, clamp_admin_tracking_event_limit, is_safe_spacetime_table_name,
normalize_debug_path, normalize_table_count_error,
parse_admin_tracking_events_sql_response, parse_spacetime_sql_count_response, trim_preview,
};
use axum::{http::StatusCode, response::IntoResponse};
use serde_json::json;
use shared_contracts::admin::AdminTrackingEventListQuery;
#[test]
fn normalize_debug_path_rejects_absolute_url() {
@@ -816,6 +1119,136 @@ mod tests {
assert_eq!(count, 3);
}
#[test]
fn build_admin_tracking_events_sql_quotes_filters_and_clamps_limit() {
let sql = build_admin_tracking_events_sql(&AdminTrackingEventListQuery {
event_key: Some("daily'login".to_string()),
user_id: Some("user-1".to_string()),
scope_kind: Some("USER".to_string()),
scope_id: Some("scope-1".to_string()),
limit: Some(2000),
})
.expect("tracking sql should build");
assert!(sql.contains("event_key = 'daily''login'"));
assert!(sql.contains("user_id = 'user-1'"));
assert!(sql.contains("scope_kind = 'user'"));
assert!(sql.contains("scope_id = 'scope-1'"));
assert!(!sql.contains("ORDER BY"));
assert!(sql.ends_with("LIMIT 1000"));
}
#[test]
fn clamp_admin_tracking_event_limit_uses_default_and_bounds() {
assert_eq!(clamp_admin_tracking_event_limit(None), 200);
assert_eq!(clamp_admin_tracking_event_limit(Some(0)), 1);
assert_eq!(clamp_admin_tracking_event_limit(Some(1001)), 1000);
}
#[test]
fn parse_admin_tracking_events_sql_response_accepts_statement_array_rows() {
let payload = json!([
{
"rows": [[
"event-1",
"daily_login",
"user",
"user-1",
20580,
{"some": "user-1"},
null,
{"some": "profile-1"},
"profile",
"{\"source\":\"task\"}",
"2026-05-07T00:00:00Z"
]]
}
]);
let entries =
parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].event_id, "event-1");
assert_eq!(entries[0].event_title, "每日登录");
assert_eq!(entries[0].user_id.as_deref(), Some("user-1"));
assert_eq!(entries[0].profile_id.as_deref(), Some("profile-1"));
assert_eq!(entries[0].module_key.as_deref(), Some("profile"));
}
#[test]
fn parse_admin_tracking_events_sql_response_normalizes_sats_values() {
let payload = json!([
{
"rows": [[
"event-1",
"daily_login",
[3, []],
"user-1",
20580,
[0, "user-1"],
[1, []],
[0, "profile-1"],
[0, "profile"],
"{}",
[1778207451731746i64]
]]
}
]);
let entries =
parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");
assert_eq!(entries[0].scope_kind, "user");
assert_eq!(entries[0].user_id.as_deref(), Some("user-1"));
assert_eq!(entries[0].owner_user_id, None);
assert_eq!(entries[0].profile_id.as_deref(), Some("profile-1"));
assert_eq!(entries[0].module_key.as_deref(), Some("profile"));
assert_eq!(entries[0].occurred_at, "1778207451731746");
}
#[test]
fn parse_admin_tracking_events_sql_response_sorts_by_occurred_at_desc() {
let payload = json!([
{
"rows": [
[
"event-old",
"daily_login",
"user",
"user-1",
20580,
{"some": "user-1"},
null,
{"some": "profile-1"},
"profile",
"{}",
"2026-05-07T00:00:00Z"
],
[
"event-new",
"daily_login",
"user",
"user-1",
20580,
{"some": "user-1"},
null,
{"some": "profile-1"},
"profile",
"{}",
"2026-05-07T01:00:00Z"
]
]
}
]);
let entries =
parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");
assert_eq!(entries[0].event_id, "event-new");
assert_eq!(entries[1].event_id, "event-old");
}
#[test]
fn build_body_preview_handles_utf8() {
let preview = build_body_preview("后台测试".as_bytes());

View File

@@ -13,7 +13,10 @@ use tower_http::{
use tracing::{Level, Span, error, info, info_span, warn};
use crate::{
admin::{admin_debug_http, admin_login, admin_me, admin_overview, require_admin_auth},
admin::{
admin_debug_http, admin_list_tracking_events, admin_login, admin_me, admin_overview,
require_admin_auth,
},
ai_tasks::{
append_ai_text_chunk, attach_ai_result_reference, cancel_ai_task, complete_ai_stage,
complete_ai_task, create_ai_task, fail_ai_task, start_ai_task, start_ai_task_stage,
@@ -169,6 +172,13 @@ pub fn build_router(state: AppState) -> Router {
require_admin_auth,
)),
)
.route(
"/admin/api/tracking/events",
get(admin_list_tracking_events).route_layer(middleware::from_fn_with_state(
state.clone(),
require_admin_auth,
)),
)
.route(
"/admin/api/profile/redeem-codes",
get(admin_list_profile_redeem_codes)

View File

@@ -1418,8 +1418,9 @@ mod tests {
let object_path = object_key.map(str::trim).filter(|value| !value.is_empty());
let canonical_uri = build_oss_v4_canonical_uri(bucket, object_path);
let payload_hash = "UNSIGNED-PAYLOAD";
let canonical_headers =
format!("host:{bucket}.{endpoint}\nx-oss-content-sha256:{payload_hash}\nx-oss-date:{signed_at_text}\n");
let canonical_headers = format!(
"host:{bucket}.{endpoint}\nx-oss-content-sha256:{payload_hash}\nx-oss-date:{signed_at_text}\n"
);
let additional_headers = "host";
let canonical_request = format!(
"{}\n{}\n\n{}\n{}\n{}",

View File

@@ -1456,9 +1456,19 @@ fn build_aliyun_sms_url(endpoint: &str) -> Result<String, SmsProviderError> {
}
fn current_aliyun_timestamp() -> String {
OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
// 阿里云 OpenAPI ACS3 签名头 x-acs-date 要求使用不带小数秒的 UTC ISO 8601 格式,
// 即 yyyy-MM-dd'T'HH:mm:ss'Z'。time crate 的 Rfc3339 会保留纳秒,
// 形如 2026-05-07T14:23:59.364767Z,阿里云网关会判定为时间格式非法。
let now = OffsetDateTime::now_utc();
format!(
"{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
now.year(),
u8::from(now.month()),
now.day(),
now.hour(),
now.minute(),
now.second()
)
}
fn canonicalize_aliyun_form_params(params: &BTreeMap<String, String>) -> String {
@@ -1480,8 +1490,9 @@ fn build_aliyun_form_body(params: &BTreeMap<String, String>) -> String {
}
fn hmac_sha256_hex(key: &[u8], content: &[u8]) -> Result<String, SmsProviderError> {
let mut signer = HmacSha256::new_from_slice(key)
.map_err(|error| SmsProviderError::InvalidConfig(format!("初始化短信签名器失败:{error}")))?;
let mut signer = HmacSha256::new_from_slice(key).map_err(|error| {
SmsProviderError::InvalidConfig(format!("初始化短信签名器失败:{error}"))
})?;
signer.update(content);
Ok(hex_lower(&signer.finalize().into_bytes()))
}
@@ -2146,6 +2157,23 @@ mod tests {
assert!(headers.get("x-acs-content-sha256").is_some());
}
#[test]
fn current_aliyun_timestamp_uses_acs_iso8601_format_without_fractional_seconds() {
let timestamp = current_aliyun_timestamp();
assert_eq!(timestamp.len(), "2026-05-07T12:34:56Z".len());
assert_eq!(timestamp.as_bytes()[4], b'-');
assert_eq!(timestamp.as_bytes()[7], b'-');
assert_eq!(timestamp.as_bytes()[10], b'T');
assert_eq!(timestamp.as_bytes()[13], b':');
assert_eq!(timestamp.as_bytes()[16], b':');
assert!(timestamp.ends_with('Z'));
assert!(!timestamp.contains('.'));
assert!(timestamp.chars().enumerate().all(|(index, value)| {
matches!(index, 4 | 7 | 10 | 13 | 16 | 19) || value.is_ascii_digit()
}));
}
#[test]
fn aliyun_send_response_deserializes_pascal_case_fields() {
let payload = serde_json::from_str::<AliyunSendSmsVerifyCodeResponse>(

View File

@@ -105,3 +105,39 @@ pub struct AdminDebugHttpResponse {
pub body_text: String,
pub body_json: Option<Value>,
}
// 后台埋点明细查询参数只保留运营筛选需要的只读字段。
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AdminTrackingEventListQuery {
pub event_key: Option<String>,
pub user_id: Option<String>,
pub scope_kind: Option<String>,
pub scope_id: Option<String>,
pub limit: Option<u32>,
}
// 单条埋点原始事件明细,字段与 tracking_event 表一一对应并补充事件名称。
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AdminTrackingEventEntryPayload {
pub event_id: String,
pub event_key: String,
pub event_title: String,
pub scope_kind: String,
pub scope_id: String,
pub day_key: i64,
pub user_id: Option<String>,
pub owner_user_id: Option<String>,
pub profile_id: Option<String>,
pub module_key: Option<String>,
pub metadata_json: String,
pub occurred_at: String,
}
// 后台埋点明细列表响应,前端导出 Excel 时直接使用 entries。
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct AdminTrackingEventListResponse {
pub entries: Vec<AdminTrackingEventEntryPayload>,
}