feat: add admin tracking events export
This commit is contained in:
@@ -5,7 +5,7 @@ use std::{
|
||||
|
||||
use axum::{
|
||||
Json,
|
||||
extract::{Extension, Request, State},
|
||||
extract::{Extension, Query, Request, State},
|
||||
http::{
|
||||
HeaderMap, HeaderName, HeaderValue, Method, StatusCode,
|
||||
header::{AUTHORIZATION, CONTENT_TYPE},
|
||||
@@ -20,6 +20,7 @@ use shared_contracts::admin::{
|
||||
AdminDatabaseOverviewPayload, AdminDatabaseTableStatPayload, AdminDebugHeaderInput,
|
||||
AdminDebugHttpRequest, AdminDebugHttpResponse, AdminLoginRequest, AdminLoginResponse,
|
||||
AdminMeResponse, AdminOverviewResponse, AdminServiceOverviewPayload, AdminSessionPayload,
|
||||
AdminTrackingEventEntryPayload, AdminTrackingEventListQuery, AdminTrackingEventListResponse,
|
||||
};
|
||||
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
|
||||
|
||||
@@ -42,6 +43,8 @@ const BLOCKED_DEBUG_HEADERS: &[&str] = &[
|
||||
// SpacetimeDB 2.x 的 schema HTTP API 要求显式传入 BSATN JSON 版本。
|
||||
// 后台总览只读取表名,固定使用当前 CLI 2.1.0 兼容的版本参数即可。
|
||||
const SPACETIME_SCHEMA_VERSION_QUERY: &str = "version=9";
|
||||
const ADMIN_TRACKING_EVENT_DEFAULT_LIMIT: u32 = 200;
|
||||
const ADMIN_TRACKING_EVENT_MAX_LIMIT: u32 = 1000;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct AuthenticatedAdmin {
|
||||
@@ -153,6 +156,19 @@ pub async fn admin_debug_http(
|
||||
Ok(json_success_body(Some(&request_context), response))
|
||||
}
|
||||
|
||||
pub async fn admin_list_tracking_events(
|
||||
State(state): State<AppState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||||
Query(query): Query<AdminTrackingEventListQuery>,
|
||||
) -> Result<Json<Value>, AppError> {
|
||||
let entries = fetch_admin_tracking_events(&state, query).await?;
|
||||
Ok(json_success_body(
|
||||
Some(&request_context),
|
||||
AdminTrackingEventListResponse { entries },
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn require_admin_auth(
|
||||
State(state): State<AppState>,
|
||||
mut request: Request,
|
||||
@@ -488,6 +504,216 @@ fn parse_count_value(value: &Value) -> Result<u64, String> {
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_admin_tracking_events(
|
||||
state: &AppState,
|
||||
query: AdminTrackingEventListQuery,
|
||||
) -> Result<Vec<AdminTrackingEventEntryPayload>, AppError> {
|
||||
let client = Client::new();
|
||||
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
|
||||
let database = state.config.spacetime_database.trim();
|
||||
let token = state
|
||||
.config
|
||||
.spacetime_token
|
||||
.as_deref()
|
||||
.map(str::trim)
|
||||
.filter(|value| !value.is_empty());
|
||||
let sql = build_admin_tracking_events_sql(&query)
|
||||
.map_err(|error| AppError::from_status(StatusCode::BAD_REQUEST).with_message(error))?;
|
||||
|
||||
let payload = fetch_spacetime_sql_json(&client, server_root, database, token, &sql)
|
||||
.await
|
||||
.map_err(|error| {
|
||||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||||
.with_message(format!("埋点数据读取失败:{error}"))
|
||||
})?;
|
||||
parse_admin_tracking_events_sql_response(payload).map_err(|error| {
|
||||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||||
.with_message(format!("埋点数据解析失败:{error}"))
|
||||
})
|
||||
}
|
||||
|
||||
fn build_admin_tracking_events_sql(query: &AdminTrackingEventListQuery) -> Result<String, String> {
|
||||
let mut conditions = Vec::new();
|
||||
if let Some(value) = normalized_non_empty(query.event_key.as_deref()) {
|
||||
conditions.push(format!("event_key = {}", quote_sql_string(value)));
|
||||
}
|
||||
if let Some(value) = normalized_non_empty(query.user_id.as_deref()) {
|
||||
conditions.push(format!("user_id = {}", quote_sql_string(value)));
|
||||
}
|
||||
if let Some(value) = normalized_non_empty(query.scope_kind.as_deref()) {
|
||||
let scope_kind = normalize_admin_tracking_scope_kind(value)?;
|
||||
conditions.push(format!("scope_kind = {}", quote_sql_string(scope_kind)));
|
||||
}
|
||||
if let Some(value) = normalized_non_empty(query.scope_id.as_deref()) {
|
||||
conditions.push(format!("scope_id = {}", quote_sql_string(value)));
|
||||
}
|
||||
|
||||
let where_clause = if conditions.is_empty() {
|
||||
String::new()
|
||||
} else {
|
||||
format!(" WHERE {}", conditions.join(" AND "))
|
||||
};
|
||||
let limit = clamp_admin_tracking_event_limit(query.limit);
|
||||
Ok(format!(
|
||||
"SELECT event_id, event_key, scope_kind, scope_id, day_key, user_id, owner_user_id, profile_id, module_key, metadata_json, occurred_at FROM tracking_event{where_clause} ORDER BY occurred_at DESC LIMIT {limit}"
|
||||
))
|
||||
}
|
||||
|
||||
fn normalized_non_empty(value: Option<&str>) -> Option<&str> {
|
||||
value.map(str::trim).filter(|value| !value.is_empty())
|
||||
}
|
||||
|
||||
fn quote_sql_string(value: &str) -> String {
|
||||
format!("'{}'", value.replace('\'', "''"))
|
||||
}
|
||||
|
||||
fn normalize_admin_tracking_scope_kind(value: &str) -> Result<&'static str, String> {
|
||||
match value.trim().to_ascii_lowercase().as_str() {
|
||||
"site" => Ok("site"),
|
||||
"work" => Ok("work"),
|
||||
"module" => Ok("module"),
|
||||
"user" => Ok("user"),
|
||||
_ => Err("scopeKind 必须是 site/work/module/user".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn clamp_admin_tracking_event_limit(limit: Option<u32>) -> u32 {
|
||||
limit
|
||||
.unwrap_or(ADMIN_TRACKING_EVENT_DEFAULT_LIMIT)
|
||||
.clamp(1, ADMIN_TRACKING_EVENT_MAX_LIMIT)
|
||||
}
|
||||
|
||||
async fn fetch_spacetime_sql_json(
|
||||
client: &Client,
|
||||
server_root: &str,
|
||||
database: &str,
|
||||
token: Option<&str>,
|
||||
sql: &str,
|
||||
) -> Result<Value, String> {
|
||||
let mut request = client
|
||||
.post(format!("{server_root}/v1/database/{database}/sql"))
|
||||
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
|
||||
.body(sql.to_string());
|
||||
if let Some(token) = token {
|
||||
request = request.bearer_auth(token);
|
||||
}
|
||||
|
||||
let response = request
|
||||
.send()
|
||||
.await
|
||||
.map_err(|error| format!("SQL 请求失败:{error}"))?;
|
||||
if !response.status().is_success() {
|
||||
let status = response.status();
|
||||
let body = response.text().await.unwrap_or_default();
|
||||
return Err(format!("HTTP {}:{}", status.as_u16(), trim_preview(&body)));
|
||||
}
|
||||
|
||||
response
|
||||
.json::<Value>()
|
||||
.await
|
||||
.map_err(|error| format!("SQL 响应解析失败:{error}"))
|
||||
}
|
||||
|
||||
fn parse_admin_tracking_events_sql_response(
|
||||
payload: Value,
|
||||
) -> Result<Vec<AdminTrackingEventEntryPayload>, String> {
|
||||
let rows = extract_first_sql_rows(payload)?;
|
||||
rows.iter()
|
||||
.map(parse_admin_tracking_event_row)
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
}
|
||||
|
||||
fn extract_first_sql_rows(payload: Value) -> Result<Vec<Value>, String> {
|
||||
let statement = match payload {
|
||||
Value::Array(statements) => statements
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or_else(|| "SQL 结果为空".to_string())?,
|
||||
Value::Object(statement) => Value::Object(statement),
|
||||
_ => return Err("SQL 响应格式非法".to_string()),
|
||||
};
|
||||
let Value::Object(mut statement) = statement else {
|
||||
return Err("SQL statement 结果格式非法".to_string());
|
||||
};
|
||||
let rows = statement
|
||||
.remove("rows")
|
||||
.ok_or_else(|| "SQL 响应缺少 rows 字段".to_string())?;
|
||||
match rows {
|
||||
Value::Array(rows) => Ok(rows),
|
||||
_ => Err("SQL rows 字段格式非法".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_admin_tracking_event_row(row: &Value) -> Result<AdminTrackingEventEntryPayload, String> {
|
||||
let columns = row.as_array().ok_or_else(|| "埋点行格式非法".to_string())?;
|
||||
let event_key = required_string_column(columns, 1, "event_key")?;
|
||||
Ok(AdminTrackingEventEntryPayload {
|
||||
event_id: required_string_column(columns, 0, "event_id")?,
|
||||
event_title: admin_tracking_event_title(&event_key).to_string(),
|
||||
event_key,
|
||||
scope_kind: required_string_column(columns, 2, "scope_kind")?,
|
||||
scope_id: required_string_column(columns, 3, "scope_id")?,
|
||||
day_key: required_i64_column(columns, 4, "day_key")?,
|
||||
user_id: optional_string_column(columns, 5),
|
||||
owner_user_id: optional_string_column(columns, 6),
|
||||
profile_id: optional_string_column(columns, 7),
|
||||
module_key: optional_string_column(columns, 8),
|
||||
metadata_json: required_string_column(columns, 9, "metadata_json")?,
|
||||
occurred_at: required_string_column(columns, 10, "occurred_at")?,
|
||||
})
|
||||
}
|
||||
|
||||
fn required_string_column(
|
||||
columns: &[Value],
|
||||
index: usize,
|
||||
field_name: &str,
|
||||
) -> Result<String, String> {
|
||||
value_to_string(
|
||||
columns
|
||||
.get(index)
|
||||
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?,
|
||||
)
|
||||
.ok_or_else(|| format!("埋点行 {field_name} 不是字符串"))
|
||||
}
|
||||
|
||||
fn optional_string_column(columns: &[Value], index: usize) -> Option<String> {
|
||||
columns.get(index).and_then(value_to_string)
|
||||
}
|
||||
|
||||
fn required_i64_column(columns: &[Value], index: usize, field_name: &str) -> Result<i64, String> {
|
||||
let value = columns
|
||||
.get(index)
|
||||
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?;
|
||||
match value {
|
||||
Value::Number(number) => number
|
||||
.as_i64()
|
||||
.ok_or_else(|| format!("埋点行 {field_name} 不是整数")),
|
||||
Value::String(text) => text
|
||||
.trim()
|
||||
.parse::<i64>()
|
||||
.map_err(|error| format!("埋点行 {field_name} 解析失败:{error}")),
|
||||
_ => Err(format!("埋点行 {field_name} 类型非法")),
|
||||
}
|
||||
}
|
||||
|
||||
fn value_to_string(value: &Value) -> Option<String> {
|
||||
match value {
|
||||
Value::Null => None,
|
||||
Value::String(text) => Some(text.clone()),
|
||||
Value::Object(object) => object.get("some").and_then(value_to_string),
|
||||
Value::Number(number) => Some(number.to_string()),
|
||||
Value::Bool(value) => Some(value.to_string()),
|
||||
_ => Some(value.to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
fn admin_tracking_event_title(event_key: &str) -> &str {
|
||||
match event_key {
|
||||
"daily_login" => "每日登录",
|
||||
_ => event_key,
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_admin_debug_http(
|
||||
state: &AppState,
|
||||
payload: AdminDebugHttpRequest,
|
||||
@@ -648,12 +874,14 @@ fn build_admin_session_payload(session: crate::state::AdminSession) -> AdminSess
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{
|
||||
build_body_preview, build_debug_base_url, build_spacetime_schema_url,
|
||||
is_safe_spacetime_table_name, normalize_debug_path, normalize_table_count_error,
|
||||
parse_spacetime_sql_count_response, trim_preview,
|
||||
build_admin_tracking_events_sql, build_body_preview, build_debug_base_url,
|
||||
build_spacetime_schema_url, clamp_admin_tracking_event_limit, is_safe_spacetime_table_name,
|
||||
normalize_debug_path, normalize_table_count_error,
|
||||
parse_admin_tracking_events_sql_response, parse_spacetime_sql_count_response, trim_preview,
|
||||
};
|
||||
use axum::{http::StatusCode, response::IntoResponse};
|
||||
use serde_json::json;
|
||||
use shared_contracts::admin::AdminTrackingEventListQuery;
|
||||
|
||||
#[test]
|
||||
fn normalize_debug_path_rejects_absolute_url() {
|
||||
@@ -816,6 +1044,61 @@ mod tests {
|
||||
assert_eq!(count, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_admin_tracking_events_sql_quotes_filters_and_clamps_limit() {
|
||||
let sql = build_admin_tracking_events_sql(&AdminTrackingEventListQuery {
|
||||
event_key: Some("daily'login".to_string()),
|
||||
user_id: Some("user-1".to_string()),
|
||||
scope_kind: Some("USER".to_string()),
|
||||
scope_id: Some("scope-1".to_string()),
|
||||
limit: Some(2000),
|
||||
})
|
||||
.expect("tracking sql should build");
|
||||
|
||||
assert!(sql.contains("event_key = 'daily''login'"));
|
||||
assert!(sql.contains("user_id = 'user-1'"));
|
||||
assert!(sql.contains("scope_kind = 'user'"));
|
||||
assert!(sql.contains("scope_id = 'scope-1'"));
|
||||
assert!(sql.ends_with("LIMIT 1000"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn clamp_admin_tracking_event_limit_uses_default_and_bounds() {
|
||||
assert_eq!(clamp_admin_tracking_event_limit(None), 200);
|
||||
assert_eq!(clamp_admin_tracking_event_limit(Some(0)), 1);
|
||||
assert_eq!(clamp_admin_tracking_event_limit(Some(1001)), 1000);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_admin_tracking_events_sql_response_accepts_statement_array_rows() {
|
||||
let payload = json!([
|
||||
{
|
||||
"rows": [[
|
||||
"event-1",
|
||||
"daily_login",
|
||||
"user",
|
||||
"user-1",
|
||||
20580,
|
||||
{"some": "user-1"},
|
||||
null,
|
||||
{"some": "profile-1"},
|
||||
"profile",
|
||||
"{\"source\":\"task\"}",
|
||||
"2026-05-07T00:00:00Z"
|
||||
]]
|
||||
}
|
||||
]);
|
||||
|
||||
let entries =
|
||||
parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");
|
||||
|
||||
assert_eq!(entries.len(), 1);
|
||||
assert_eq!(entries[0].event_id, "event-1");
|
||||
assert_eq!(entries[0].event_title, "每日登录");
|
||||
assert_eq!(entries[0].user_id.as_deref(), Some("user-1"));
|
||||
assert_eq!(entries[0].profile_id.as_deref(), Some("profile-1"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn build_body_preview_handles_utf8() {
|
||||
let preview = build_body_preview("后台测试".as_bytes());
|
||||
|
||||
@@ -13,7 +13,10 @@ use tower_http::{
|
||||
use tracing::{Level, Span, error, info, info_span, warn};
|
||||
|
||||
use crate::{
|
||||
admin::{admin_debug_http, admin_login, admin_me, admin_overview, require_admin_auth},
|
||||
admin::{
|
||||
admin_debug_http, admin_list_tracking_events, admin_login, admin_me, admin_overview,
|
||||
require_admin_auth,
|
||||
},
|
||||
ai_tasks::{
|
||||
append_ai_text_chunk, attach_ai_result_reference, cancel_ai_task, complete_ai_stage,
|
||||
complete_ai_task, create_ai_task, fail_ai_task, start_ai_task, start_ai_task_stage,
|
||||
@@ -168,6 +171,13 @@ pub fn build_router(state: AppState) -> Router {
|
||||
require_admin_auth,
|
||||
)),
|
||||
)
|
||||
.route(
|
||||
"/admin/api/tracking/events",
|
||||
get(admin_list_tracking_events).route_layer(middleware::from_fn_with_state(
|
||||
state.clone(),
|
||||
require_admin_auth,
|
||||
)),
|
||||
)
|
||||
.route(
|
||||
"/admin/api/profile/redeem-codes",
|
||||
get(admin_list_profile_redeem_codes)
|
||||
|
||||
Reference in New Issue
Block a user