1865 lines
63 KiB
Rust
1865 lines
63 KiB
Rust
use std::{
|
||
collections::BTreeSet,
|
||
fs,
|
||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||
};
|
||
|
||
use axum::{
|
||
Json,
|
||
extract::{Extension, Query, Request, State},
|
||
http::{
|
||
HeaderMap, HeaderName, HeaderValue, Method, StatusCode,
|
||
header::{AUTHORIZATION, CONTENT_TYPE},
|
||
},
|
||
middleware::Next,
|
||
response::Response,
|
||
};
|
||
use reqwest::Client;
|
||
use serde::Deserialize;
|
||
use serde_json::{Map, Value};
|
||
use shared_contracts::admin::{
|
||
AdminCreationEntryConfigResponse, AdminCreationEntryTypeConfigPayload,
|
||
AdminDatabaseOverviewPayload, AdminDatabaseTableListResponse, AdminDatabaseTableRowPayload,
|
||
AdminDatabaseTableRowsQuery, AdminDatabaseTableRowsResponse, AdminDatabaseTableStatPayload,
|
||
AdminDebugHeaderInput, AdminDebugHttpRequest, AdminDebugHttpResponse, AdminLoginRequest,
|
||
AdminLoginResponse, AdminMeResponse, AdminOverviewResponse, AdminServiceOverviewPayload,
|
||
AdminSessionPayload, AdminTrackingEventEntryPayload, AdminTrackingEventListQuery,
|
||
AdminTrackingEventListResponse, AdminUpsertCreationEntryTypeConfigRequest,
|
||
};
|
||
use time::{OffsetDateTime, format_description::well_known::Rfc3339};
|
||
|
||
use crate::{
|
||
api_response::json_success_body,
|
||
http_error::AppError,
|
||
request_context::RequestContext,
|
||
state::{AdminRuntime, AppState},
|
||
};
|
||
|
||
/// Upper bound (128 KiB) on the request body accepted by the first version of
/// the debug console, so the admin backend cannot be used as a general-purpose
/// proxy for forwarding large payloads.
const MAX_DEBUG_BODY_BYTES: usize = 128 * 1024;

/// Lowercase header names on the debug console's block list.
// NOTE(review): the code consuming this list is outside this view — presumably
// these headers are refused or dropped from debug requests; confirm at the call site.
const BLOCKED_DEBUG_HEADERS: &[&str] =
    &["host", "content-length", "connection", "transfer-encoding", "expect"];

/// Query string appended to the SpacetimeDB schema endpoint.
///
/// The SpacetimeDB 2.x schema HTTP API requires an explicit BSATN JSON version.
/// The admin overview only reads table names, so pinning the value compatible
/// with the current CLI (2.1.0) is sufficient.
const SPACETIME_SCHEMA_VERSION_QUERY: &str = "version=9";

/// Default row limit for tracking-event listings.
const ADMIN_TRACKING_EVENT_DEFAULT_LIMIT: u32 = 200;
/// Largest row limit for tracking-event listings.
const ADMIN_TRACKING_EVENT_MAX_LIMIT: u32 = 1000;
/// Row limit applied to table browsing when the caller does not supply one.
const ADMIN_DATABASE_TABLE_DEFAULT_LIMIT: u32 = 100;
/// Largest row limit honoured for table browsing.
const ADMIN_DATABASE_TABLE_MAX_LIMIT: u32 = 500;
|
||
|
||
#[derive(Clone, Debug)]
|
||
pub struct AuthenticatedAdmin {
|
||
session: AdminSessionPayload,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
#[serde(rename_all = "camelCase")]
|
||
struct SpacetimeDatabaseInfoResponse {
|
||
database_identity: Option<String>,
|
||
owner_identity: Option<String>,
|
||
host_type: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct SpacetimeSchemaResponse {
|
||
tables: Option<Vec<SpacetimeSchemaTable>>,
|
||
}
|
||
|
||
#[derive(Debug, Deserialize)]
|
||
struct SpacetimeSchemaTable {
|
||
name: Option<String>,
|
||
}
|
||
|
||
impl AuthenticatedAdmin {
|
||
pub fn new(session: AdminSessionPayload) -> Self {
|
||
Self { session }
|
||
}
|
||
|
||
pub fn session(&self) -> &AdminSessionPayload {
|
||
&self.session
|
||
}
|
||
}
|
||
|
||
pub async fn admin_login(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Json(payload): Json<AdminLoginRequest>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let runtime = state.admin_runtime().ok_or_else(|| {
|
||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_message("后台管理未启用")
|
||
})?;
|
||
|
||
let expected_username = runtime.username().trim();
|
||
let expected_password = runtime.password().trim();
|
||
let submitted_username = payload.username.trim();
|
||
let submitted_password = payload.password.trim();
|
||
if expected_username.is_empty() || expected_password.is_empty() {
|
||
return Err(
|
||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_message("后台管理未启用")
|
||
);
|
||
}
|
||
|
||
if submitted_username != expected_username || submitted_password != expected_password {
|
||
return Err(
|
||
AppError::from_status(StatusCode::UNAUTHORIZED).with_message("管理员用户名或密码错误")
|
||
);
|
||
}
|
||
|
||
let now = OffsetDateTime::now_utc();
|
||
let claims = runtime.build_claims(now).map_err(|error| {
|
||
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_message(error)
|
||
})?;
|
||
let token = runtime.sign_token(&claims).map_err(|error| {
|
||
AppError::from_status(StatusCode::INTERNAL_SERVER_ERROR).with_message(error)
|
||
})?;
|
||
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
AdminLoginResponse {
|
||
token,
|
||
admin: build_admin_session_payload(runtime.build_session(&claims)),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn admin_me(
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(admin): Extension<AuthenticatedAdmin>,
|
||
) -> Json<Value> {
|
||
json_success_body(
|
||
Some(&request_context),
|
||
AdminMeResponse {
|
||
admin: admin.session().clone(),
|
||
},
|
||
)
|
||
}
|
||
|
||
pub async fn admin_overview(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let runtime = state.admin_runtime().ok_or_else(|| {
|
||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_message("后台管理未启用")
|
||
})?;
|
||
|
||
let overview = build_admin_overview(&state, runtime).await?;
|
||
Ok(json_success_body(Some(&request_context), overview))
|
||
}
|
||
|
||
pub async fn admin_debug_http(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
Json(payload): Json<AdminDebugHttpRequest>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let response = execute_admin_debug_http(&state, payload).await?;
|
||
Ok(json_success_body(Some(&request_context), response))
|
||
}
|
||
|
||
pub async fn admin_list_tracking_events(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
Query(query): Query<AdminTrackingEventListQuery>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let entries = fetch_admin_tracking_events(&state, query).await?;
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
AdminTrackingEventListResponse { entries },
|
||
))
|
||
}
|
||
|
||
pub async fn admin_list_database_tables(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let response = fetch_admin_database_table_list(&state).await?;
|
||
Ok(json_success_body(Some(&request_context), response))
|
||
}
|
||
|
||
pub async fn admin_list_database_table_rows(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
axum::extract::Path(table_name): axum::extract::Path<String>,
|
||
Query(query): Query<AdminDatabaseTableRowsQuery>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let response = fetch_admin_database_table_rows(&state, &table_name, query).await?;
|
||
Ok(json_success_body(Some(&request_context), response))
|
||
}
|
||
|
||
pub async fn admin_get_creation_entry_config(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let config = state
|
||
.get_creation_entry_config()
|
||
.await
|
||
.map_err(map_admin_spacetime_error)?;
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
AdminCreationEntryConfigResponse {
|
||
entries: config
|
||
.creation_types
|
||
.into_iter()
|
||
.map(map_admin_creation_entry_type_config)
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
pub async fn admin_upsert_creation_entry_config(
|
||
State(state): State<AppState>,
|
||
Extension(request_context): Extension<RequestContext>,
|
||
Extension(_admin): Extension<AuthenticatedAdmin>,
|
||
Json(payload): Json<AdminUpsertCreationEntryTypeConfigRequest>,
|
||
) -> Result<Json<Value>, AppError> {
|
||
let entry = validate_admin_creation_entry_config(payload)?;
|
||
let config = state
|
||
.upsert_creation_entry_type_config(entry)
|
||
.await
|
||
.map_err(map_admin_spacetime_error)?;
|
||
Ok(json_success_body(
|
||
Some(&request_context),
|
||
AdminCreationEntryConfigResponse {
|
||
entries: config
|
||
.creation_types
|
||
.into_iter()
|
||
.map(map_admin_creation_entry_type_config)
|
||
.collect(),
|
||
},
|
||
))
|
||
}
|
||
|
||
fn map_admin_creation_entry_type_config(
|
||
entry: shared_contracts::creation_entry_config::CreationEntryTypeResponse,
|
||
) -> AdminCreationEntryTypeConfigPayload {
|
||
AdminCreationEntryTypeConfigPayload {
|
||
id: entry.id,
|
||
title: entry.title,
|
||
subtitle: entry.subtitle,
|
||
badge: entry.badge,
|
||
image_src: entry.image_src,
|
||
visible: entry.visible,
|
||
open: entry.open,
|
||
sort_order: entry.sort_order,
|
||
updated_at_micros: entry.updated_at_micros,
|
||
}
|
||
}
|
||
|
||
fn validate_admin_creation_entry_config(
|
||
payload: AdminUpsertCreationEntryTypeConfigRequest,
|
||
) -> Result<module_runtime::CreationEntryTypeAdminUpsertInput, AppError> {
|
||
let id = payload.id.trim().to_string();
|
||
if id.is_empty() {
|
||
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("入口 ID 不能为空"));
|
||
}
|
||
let title = payload.title.trim().to_string();
|
||
if title.is_empty() {
|
||
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("入口标题不能为空"));
|
||
}
|
||
Ok(module_runtime::CreationEntryTypeAdminUpsertInput {
|
||
id,
|
||
title,
|
||
subtitle: payload.subtitle.trim().to_string(),
|
||
badge: payload.badge.trim().to_string(),
|
||
image_src: payload.image_src.trim().to_string(),
|
||
visible: payload.visible,
|
||
open: payload.open,
|
||
sort_order: payload.sort_order,
|
||
})
|
||
}
|
||
|
||
fn map_admin_spacetime_error(error: spacetime_client::SpacetimeClientError) -> AppError {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY).with_details(serde_json::json!({
|
||
"provider": "spacetimedb",
|
||
"message": error.to_string(),
|
||
}))
|
||
}
|
||
|
||
pub async fn require_admin_auth(
|
||
State(state): State<AppState>,
|
||
mut request: Request,
|
||
next: Next,
|
||
) -> Result<Response, AppError> {
|
||
// 后台鉴权必须同时满足:令牌验签通过、主体匹配当前管理员、roles 含 admin。
|
||
let runtime = state.admin_runtime().ok_or_else(|| {
|
||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE).with_message("后台管理未启用")
|
||
})?;
|
||
let bearer_token = extract_bearer_token(request.headers())?;
|
||
let claims = runtime
|
||
.verify_token(&bearer_token)
|
||
.map_err(|error| AppError::from_status(StatusCode::UNAUTHORIZED).with_message(error))?;
|
||
|
||
let admin_session = runtime
|
||
.validate_claims(&claims)
|
||
.map_err(|error| AppError::from_status(StatusCode::FORBIDDEN).with_message(error))?;
|
||
|
||
request
|
||
.extensions_mut()
|
||
.insert(AuthenticatedAdmin::new(build_admin_session_payload(
|
||
admin_session,
|
||
)));
|
||
Ok(next.run(request).await)
|
||
}
|
||
|
||
fn extract_bearer_token(headers: &HeaderMap) -> Result<String, AppError> {
|
||
let authorization = headers
|
||
.get(AUTHORIZATION)
|
||
.and_then(|value| value.to_str().ok())
|
||
.map(str::trim)
|
||
.ok_or_else(|| AppError::from_status(StatusCode::UNAUTHORIZED))?;
|
||
|
||
let token = authorization
|
||
.strip_prefix("Bearer ")
|
||
.or_else(|| authorization.strip_prefix("bearer "))
|
||
.map(str::trim)
|
||
.filter(|token| !token.is_empty())
|
||
.ok_or_else(|| AppError::from_status(StatusCode::UNAUTHORIZED))?;
|
||
|
||
Ok(token.to_string())
|
||
}
|
||
|
||
async fn build_admin_overview(
|
||
state: &AppState,
|
||
runtime: &AdminRuntime,
|
||
) -> Result<AdminOverviewResponse, AppError> {
|
||
let service = AdminServiceOverviewPayload {
|
||
bind_host: state.config.bind_host.clone(),
|
||
bind_port: state.config.bind_port,
|
||
jwt_issuer: state.config.jwt_issuer.clone(),
|
||
admin_enabled: runtime.is_enabled(),
|
||
spacetime_server_url: state.config.spacetime_server_url.clone(),
|
||
spacetime_database: state.config.spacetime_database.clone(),
|
||
};
|
||
let database = fetch_database_overview(state).await;
|
||
|
||
Ok(AdminOverviewResponse { service, database })
|
||
}
|
||
|
||
async fn fetch_database_overview(state: &AppState) -> AdminDatabaseOverviewPayload {
|
||
// 概览直接读取 SpacetimeDB HTTP API,保证后台看到的是真实数据库元信息而不是本地缓存。
|
||
let client = Client::new();
|
||
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
|
||
let database = state.config.spacetime_database.trim();
|
||
let token = state
|
||
.config
|
||
.spacetime_token
|
||
.as_deref()
|
||
.map(str::trim)
|
||
.filter(|value| !value.is_empty());
|
||
let mut fetch_errors = Vec::new();
|
||
|
||
let database_info = fetch_spacetime_json::<SpacetimeDatabaseInfoResponse>(
|
||
&client,
|
||
&format!("{server_root}/v1/database/{database}"),
|
||
token,
|
||
)
|
||
.await
|
||
.map_err(|error| fetch_errors.push(format!("数据库信息读取失败:{error}")))
|
||
.ok()
|
||
.flatten();
|
||
|
||
let schema = fetch_spacetime_json::<SpacetimeSchemaResponse>(
|
||
&client,
|
||
&build_spacetime_schema_url(server_root, database),
|
||
token,
|
||
)
|
||
.await
|
||
.map_err(|error| fetch_errors.push(format!("数据库 schema 读取失败:{error}")))
|
||
.ok()
|
||
.flatten();
|
||
|
||
let schema_table_names = extract_schema_table_names(schema.as_ref());
|
||
|
||
let mut table_stats = Vec::new();
|
||
for table_name in &schema_table_names {
|
||
if !is_safe_spacetime_table_name(table_name) {
|
||
table_stats.push(AdminDatabaseTableStatPayload {
|
||
table_name: table_name.clone(),
|
||
row_count: None,
|
||
error_message: Some("表名不适合 SQL 统计".to_string()),
|
||
});
|
||
continue;
|
||
}
|
||
|
||
let sql = format!("SELECT COUNT(*) AS row_count FROM {table_name}");
|
||
match fetch_spacetime_sql_count(&client, server_root, database, token, &sql).await {
|
||
Ok(row_count) => table_stats.push(AdminDatabaseTableStatPayload {
|
||
table_name: table_name.clone(),
|
||
row_count: Some(row_count),
|
||
error_message: None,
|
||
}),
|
||
Err(error) => {
|
||
table_stats.push(AdminDatabaseTableStatPayload {
|
||
table_name: table_name.clone(),
|
||
row_count: None,
|
||
error_message: Some(normalize_table_count_error(&error)),
|
||
});
|
||
}
|
||
}
|
||
}
|
||
|
||
AdminDatabaseOverviewPayload {
|
||
database_identity: database_info
|
||
.as_ref()
|
||
.and_then(|value| value.database_identity.clone()),
|
||
owner_identity: database_info
|
||
.as_ref()
|
||
.and_then(|value| value.owner_identity.clone()),
|
||
host_type: database_info
|
||
.as_ref()
|
||
.and_then(|value| value.host_type.clone()),
|
||
schema_table_names,
|
||
table_stats,
|
||
fetch_errors,
|
||
}
|
||
}
|
||
|
||
fn build_spacetime_schema_url(server_root: &str, database: &str) -> String {
|
||
format!("{server_root}/v1/database/{database}/schema?{SPACETIME_SCHEMA_VERSION_QUERY}")
|
||
}
|
||
|
||
/// Returns `true` when `table_name` is a plain ASCII identifier: a leading
/// letter or underscore followed by letters, digits or underscores.
///
/// Table names come from the schema, but a minimal identifier check is still
/// applied before they enter SQL, so the risk surface does not grow if the
/// schema source ever changes.
fn is_safe_spacetime_table_name(table_name: &str) -> bool {
    // Any non-ASCII character contributes bytes >= 0x80, which fail both checks.
    match table_name.as_bytes().split_first() {
        None => false,
        Some((head, tail)) => {
            let head_ok = *head == b'_' || head.is_ascii_alphabetic();
            head_ok
                && tail
                    .iter()
                    .all(|byte| *byte == b'_' || byte.is_ascii_alphanumeric())
        }
    }
}
|
||
|
||
/// Replaces "table not visible" style errors with a short, readable status.
///
/// Private tables appear as invisible through SpacetimeDB SQL; the console
/// shows an understandable state instead of the raw HTTP noise. Any other
/// error text is returned unchanged.
fn normalize_table_count_error(error: &str) -> String {
    const HIDDEN_TABLE_MARKERS: [&str; 2] = ["marked private", "no such table"];

    let lowered = error.to_ascii_lowercase();
    let is_hidden = HIDDEN_TABLE_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker));

    if is_hidden {
        "不可统计(private 或当前身份不可见)".to_string()
    } else {
        error.to_string()
    }
}
|
||
|
||
async fn fetch_spacetime_json<T>(
|
||
client: &Client,
|
||
url: &str,
|
||
token: Option<&str>,
|
||
) -> Result<Option<T>, String>
|
||
where
|
||
T: for<'de> Deserialize<'de>,
|
||
{
|
||
let mut request = client.get(url);
|
||
if let Some(token) = token {
|
||
request = request.bearer_auth(token);
|
||
}
|
||
|
||
let response = request
|
||
.send()
|
||
.await
|
||
.map_err(|error| format!("请求失败:{error}"))?;
|
||
if !response.status().is_success() {
|
||
let status = response.status();
|
||
let body = response.text().await.unwrap_or_default();
|
||
return Err(format!("HTTP {}:{}", status.as_u16(), trim_preview(&body)));
|
||
}
|
||
|
||
response
|
||
.json::<T>()
|
||
.await
|
||
.map(Some)
|
||
.map_err(|error| format!("响应解析失败:{error}"))
|
||
}
|
||
|
||
async fn fetch_spacetime_sql_count(
|
||
client: &Client,
|
||
server_root: &str,
|
||
database: &str,
|
||
token: Option<&str>,
|
||
sql: &str,
|
||
) -> Result<u64, String> {
|
||
let mut request = client
|
||
.post(format!("{server_root}/v1/database/{database}/sql"))
|
||
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
|
||
.body(sql.to_string());
|
||
if let Some(token) = token {
|
||
request = request.bearer_auth(token);
|
||
}
|
||
|
||
let response = request
|
||
.send()
|
||
.await
|
||
.map_err(|error| format!("SQL 请求失败:{error}"))?;
|
||
if !response.status().is_success() {
|
||
let status = response.status();
|
||
let body = response.text().await.unwrap_or_default();
|
||
return Err(format!("HTTP {}:{}", status.as_u16(), trim_preview(&body)));
|
||
}
|
||
|
||
let payload = response
|
||
.json::<Value>()
|
||
.await
|
||
.map_err(|error| format!("SQL 响应解析失败:{error}"))?;
|
||
parse_spacetime_sql_count_response(payload)
|
||
}
|
||
|
||
fn parse_spacetime_sql_count_response(payload: Value) -> Result<u64, String> {
|
||
match payload {
|
||
// SpacetimeDB 2.x /sql 返回 statement result 数组,每个 result 内含 schema 与 rows。
|
||
Value::Array(statements) => {
|
||
let statement = statements
|
||
.into_iter()
|
||
.next()
|
||
.ok_or_else(|| "SQL 结果为空".to_string())?;
|
||
extract_sql_count_from_statement(statement)
|
||
}
|
||
// 保留兼容旧对象形状,便于本地/远端 API 小版本差异时仍能读取计数。
|
||
Value::Object(statement) => extract_sql_count_from_statement(Value::Object(statement)),
|
||
_ => Err("SQL 响应格式非法".to_string()),
|
||
}
|
||
}
|
||
|
||
fn extract_sql_count_from_statement(statement: Value) -> Result<u64, String> {
|
||
let Value::Object(mut statement) = statement else {
|
||
return Err("SQL statement 结果格式非法".to_string());
|
||
};
|
||
|
||
let schema = statement.remove("schema");
|
||
let rows = statement
|
||
.remove("rows")
|
||
.ok_or_else(|| "SQL 响应缺少 rows 字段".to_string())?;
|
||
extract_sql_count_from_rows(rows, schema.as_ref())
|
||
}
|
||
|
||
fn extract_sql_count_from_rows(rows: Value, schema: Option<&Value>) -> Result<u64, String> {
|
||
let Value::Array(rows) = rows else {
|
||
return Err("SQL rows 字段格式非法".to_string());
|
||
};
|
||
let row = rows.first().ok_or_else(|| "SQL 结果为空".to_string())?;
|
||
extract_sql_count_from_row(row, schema)
|
||
}
|
||
|
||
fn extract_sql_count_from_row(row: &Value, schema: Option<&Value>) -> Result<u64, String> {
|
||
match row {
|
||
Value::Object(columns) => extract_sql_count(columns),
|
||
Value::Array(values) => {
|
||
let count_index = schema.and_then(find_sql_count_column_index).unwrap_or(0);
|
||
values
|
||
.get(count_index)
|
||
.ok_or_else(|| "SQL 结果缺少 count 字段".to_string())
|
||
.and_then(parse_count_value)
|
||
}
|
||
value => parse_count_value(value),
|
||
}
|
||
}
|
||
|
||
fn extract_sql_count(columns: &serde_json::Map<String, Value>) -> Result<u64, String> {
|
||
for key in ["row_count", "count", "COUNT(*)"] {
|
||
if let Some(value) = columns.get(key) {
|
||
return parse_count_value(value);
|
||
}
|
||
}
|
||
columns
|
||
.values()
|
||
.next()
|
||
.ok_or_else(|| "SQL 结果缺少 count 字段".to_string())
|
||
.and_then(parse_count_value)
|
||
}
|
||
|
||
fn find_sql_count_column_index(schema: &Value) -> Option<usize> {
|
||
let elements = schema.get("elements")?.as_array()?;
|
||
elements.iter().position(|element| {
|
||
element
|
||
.get("name")
|
||
.and_then(extract_sql_schema_name)
|
||
.map(|name| matches!(name, "row_count" | "count" | "COUNT(*)"))
|
||
.unwrap_or(false)
|
||
})
|
||
}
|
||
|
||
fn extract_sql_schema_name(value: &Value) -> Option<&str> {
|
||
match value {
|
||
Value::String(text) => Some(text.as_str()),
|
||
Value::Object(object) => object.get("some").and_then(Value::as_str),
|
||
_ => None,
|
||
}
|
||
}
|
||
|
||
fn parse_count_value(value: &Value) -> Result<u64, String> {
|
||
match value {
|
||
Value::Number(number) => number
|
||
.as_u64()
|
||
.ok_or_else(|| "count 字段不是无符号整数".to_string()),
|
||
Value::String(text) => text
|
||
.trim()
|
||
.parse::<u64>()
|
||
.map_err(|error| format!("count 字段解析失败:{error}")),
|
||
_ => Err("count 字段类型非法".to_string()),
|
||
}
|
||
}
|
||
|
||
async fn fetch_admin_database_table_list(
|
||
state: &AppState,
|
||
) -> Result<AdminDatabaseTableListResponse, AppError> {
|
||
let (_, tables, fetch_errors) = fetch_admin_database_schema_tables(state).await;
|
||
Ok(AdminDatabaseTableListResponse {
|
||
tables,
|
||
fetch_errors,
|
||
})
|
||
}
|
||
|
||
async fn fetch_admin_database_table_rows(
|
||
state: &AppState,
|
||
table_name: &str,
|
||
query: AdminDatabaseTableRowsQuery,
|
||
) -> Result<AdminDatabaseTableRowsResponse, AppError> {
|
||
let table_name = table_name.trim();
|
||
if !is_safe_spacetime_table_name(table_name) {
|
||
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("表名不合法"));
|
||
}
|
||
|
||
let (_, tables, _) = fetch_admin_database_schema_tables(state).await;
|
||
if !tables.iter().any(|name| name == table_name) {
|
||
return Err(AppError::from_status(StatusCode::NOT_FOUND).with_message("表不存在"));
|
||
}
|
||
|
||
let client = Client::new();
|
||
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
|
||
let database = state.config.spacetime_database.trim();
|
||
let token = resolve_admin_spacetime_sql_token(state);
|
||
let limit = clamp_admin_database_table_limit(query.limit);
|
||
let sql = format!("SELECT * FROM {table_name} LIMIT {limit}");
|
||
let payload = fetch_spacetime_sql_json(&client, server_root, database, token.as_deref(), &sql)
|
||
.await
|
||
.map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY).with_message(format!(
|
||
"表数据读取失败:{}",
|
||
normalize_table_count_error(&error)
|
||
))
|
||
})?;
|
||
let mut response = parse_admin_database_table_rows_sql_response(table_name, limit, payload)
|
||
.map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||
.with_message(format!("表数据解析失败:{error}"))
|
||
})?;
|
||
apply_admin_database_table_filters(&mut response.rows, &query)?;
|
||
response.total_returned = response.rows.len();
|
||
Ok(response)
|
||
}
|
||
|
||
async fn fetch_admin_database_schema_tables(
|
||
state: &AppState,
|
||
) -> (Option<SpacetimeSchemaResponse>, Vec<String>, Vec<String>) {
|
||
let client = Client::new();
|
||
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
|
||
let database = state.config.spacetime_database.trim();
|
||
let token = resolve_admin_spacetime_sql_token(state);
|
||
let mut fetch_errors = Vec::new();
|
||
let schema = fetch_spacetime_json::<SpacetimeSchemaResponse>(
|
||
&client,
|
||
&build_spacetime_schema_url(server_root, database),
|
||
token.as_deref(),
|
||
)
|
||
.await
|
||
.map_err(|error| fetch_errors.push(format!("数据库 schema 读取失败:{error}")))
|
||
.ok()
|
||
.flatten();
|
||
let tables = extract_schema_table_names(schema.as_ref());
|
||
(schema, tables, fetch_errors)
|
||
}
|
||
|
||
fn extract_schema_table_names(schema: Option<&SpacetimeSchemaResponse>) -> Vec<String> {
|
||
schema
|
||
.and_then(|value| value.tables.as_ref())
|
||
.map(|tables| {
|
||
tables
|
||
.iter()
|
||
.filter_map(|table| table.name.as_deref())
|
||
.map(str::trim)
|
||
.filter(|name| !name.is_empty())
|
||
.map(ToOwned::to_owned)
|
||
.collect::<BTreeSet<_>>()
|
||
.into_iter()
|
||
.collect::<Vec<_>>()
|
||
})
|
||
.unwrap_or_default()
|
||
}
|
||
|
||
fn resolve_admin_spacetime_sql_token(state: &AppState) -> Option<String> {
|
||
state
|
||
.config
|
||
.spacetime_token
|
||
.as_deref()
|
||
.map(str::trim)
|
||
.filter(|value| !value.is_empty())
|
||
.map(str::to_string)
|
||
.or_else(load_local_spacetime_cli_token)
|
||
}
|
||
|
||
fn clamp_admin_database_table_limit(limit: Option<u32>) -> u32 {
|
||
limit
|
||
.unwrap_or(ADMIN_DATABASE_TABLE_DEFAULT_LIMIT)
|
||
.clamp(1, ADMIN_DATABASE_TABLE_MAX_LIMIT)
|
||
}
|
||
|
||
fn parse_admin_database_table_rows_sql_response(
|
||
table_name: &str,
|
||
limit: u32,
|
||
payload: Value,
|
||
) -> Result<AdminDatabaseTableRowsResponse, String> {
|
||
let statement = extract_first_sql_statement(payload)?;
|
||
let columns = extract_sql_statement_columns(&statement);
|
||
let rows_value = statement
|
||
.get("rows")
|
||
.ok_or_else(|| "SQL 响应缺少 rows 字段".to_string())?;
|
||
let row_values = rows_value
|
||
.as_array()
|
||
.ok_or_else(|| "SQL rows 字段格式非法".to_string())?;
|
||
let rows = row_values
|
||
.iter()
|
||
.map(|row| build_admin_database_table_row_for_table(table_name, row, &columns))
|
||
.collect::<Vec<_>>();
|
||
Ok(AdminDatabaseTableRowsResponse {
|
||
table_name: table_name.to_string(),
|
||
columns,
|
||
total_returned: rows.len(),
|
||
rows,
|
||
limit,
|
||
})
|
||
}
|
||
|
||
fn extract_first_sql_statement(payload: Value) -> Result<Value, String> {
|
||
match payload {
|
||
Value::Array(statements) => statements
|
||
.into_iter()
|
||
.next()
|
||
.ok_or_else(|| "SQL 结果为空".to_string()),
|
||
Value::Object(statement) => Ok(Value::Object(statement)),
|
||
_ => Err("SQL 响应格式非法".to_string()),
|
||
}
|
||
}
|
||
|
||
fn extract_sql_statement_columns(statement: &Value) -> Vec<String> {
|
||
statement
|
||
.get("schema")
|
||
.and_then(|schema| schema.get("elements"))
|
||
.and_then(Value::as_array)
|
||
.map(|elements| {
|
||
elements
|
||
.iter()
|
||
.enumerate()
|
||
.map(|(index, element)| {
|
||
element
|
||
.get("name")
|
||
.and_then(extract_sql_schema_name)
|
||
.map(ToOwned::to_owned)
|
||
.unwrap_or_else(|| format!("col_{}", index + 1))
|
||
})
|
||
.collect::<Vec<_>>()
|
||
})
|
||
.unwrap_or_default()
|
||
}
|
||
|
||
fn build_admin_database_table_row(row: &Value, columns: &[String]) -> AdminDatabaseTableRowPayload {
|
||
build_admin_database_table_row_for_table("", row, columns)
|
||
}
|
||
|
||
fn build_admin_database_table_row_for_table(
|
||
table_name: &str,
|
||
row: &Value,
|
||
columns: &[String],
|
||
) -> AdminDatabaseTableRowPayload {
|
||
let raw = normalize_admin_database_table_row_raw(table_name, row, columns);
|
||
let mut cells = Map::new();
|
||
if let Some(values) = row.as_array() {
|
||
for (index, value) in values.iter().enumerate() {
|
||
let key = columns
|
||
.get(index)
|
||
.cloned()
|
||
.unwrap_or_else(|| format!("col_{}", index + 1));
|
||
cells.insert(
|
||
key.clone(),
|
||
normalize_admin_database_table_cell(table_name, &key, value),
|
||
);
|
||
}
|
||
} else if let Some(object) = row.as_object() {
|
||
for (key, value) in object {
|
||
cells.insert(
|
||
key.clone(),
|
||
normalize_admin_database_table_cell(table_name, key, value),
|
||
);
|
||
}
|
||
}
|
||
AdminDatabaseTableRowPayload {
|
||
cells: Value::Object(cells),
|
||
raw,
|
||
}
|
||
}
|
||
|
||
fn normalize_admin_database_table_row_raw(
|
||
table_name: &str,
|
||
row: &Value,
|
||
columns: &[String],
|
||
) -> Value {
|
||
if let Some(values) = row.as_array() {
|
||
return Value::Array(
|
||
values
|
||
.iter()
|
||
.enumerate()
|
||
.map(|(index, value)| {
|
||
let key = columns.get(index).map(String::as_str).unwrap_or_default();
|
||
normalize_admin_database_table_cell(table_name, key, value)
|
||
})
|
||
.collect(),
|
||
);
|
||
}
|
||
|
||
if let Some(object) = row.as_object() {
|
||
return Value::Object(
|
||
object
|
||
.iter()
|
||
.map(|(key, value)| {
|
||
(
|
||
key.clone(),
|
||
normalize_admin_database_table_cell(table_name, key, value),
|
||
)
|
||
})
|
||
.collect(),
|
||
);
|
||
}
|
||
|
||
normalize_admin_database_value(row)
|
||
}
|
||
|
||
fn normalize_admin_database_table_cell(
|
||
table_name: &str,
|
||
column_name: &str,
|
||
value: &Value,
|
||
) -> Value {
|
||
if let Some(enum_value) = normalize_admin_database_known_enum(table_name, column_name, value) {
|
||
return enum_value;
|
||
}
|
||
normalize_admin_database_value(value)
|
||
}
|
||
|
||
fn normalize_admin_database_known_enum(
|
||
table_name: &str,
|
||
column_name: &str,
|
||
value: &Value,
|
||
) -> Option<Value> {
|
||
let variant_index = extract_sats_enum_variant_index(value)?;
|
||
let label = match (table_name, column_name) {
|
||
("profile_recharge_order", "kind") => match variant_index {
|
||
0 => "points",
|
||
1 => "membership",
|
||
_ => return None,
|
||
},
|
||
("profile_recharge_order", "status") => match variant_index {
|
||
0 => "pending",
|
||
1 => "paid",
|
||
2 => "failed",
|
||
3 => "closed",
|
||
4 => "refunded",
|
||
_ => return None,
|
||
},
|
||
_ => return None,
|
||
};
|
||
Some(Value::String(label.to_string()))
|
||
}
|
||
|
||
fn extract_sats_enum_variant_index(value: &Value) -> Option<u64> {
|
||
let items = value.as_array()?;
|
||
if items.len() != 2 {
|
||
return None;
|
||
}
|
||
items.first()?.as_u64()
|
||
}
|
||
|
||
fn normalize_admin_database_value(value: &Value) -> Value {
|
||
match value {
|
||
Value::Array(items) if items.len() == 1 => normalize_admin_database_value(&items[0]),
|
||
Value::Array(items) if items.len() == 2 => {
|
||
if let Some(index) = items.first().and_then(Value::as_u64) {
|
||
if index == 0 {
|
||
return items
|
||
.get(1)
|
||
.map(normalize_admin_database_value)
|
||
.unwrap_or(Value::Null);
|
||
}
|
||
if index == 1 && items.get(1).and_then(Value::as_array).is_some() {
|
||
return Value::Null;
|
||
}
|
||
}
|
||
Value::Array(items.iter().map(normalize_admin_database_value).collect())
|
||
}
|
||
Value::Array(items) => {
|
||
Value::Array(items.iter().map(normalize_admin_database_value).collect())
|
||
}
|
||
Value::Object(object) => {
|
||
if let Some(value) = object.get("some") {
|
||
return normalize_admin_database_value(value);
|
||
}
|
||
Value::Object(
|
||
object
|
||
.iter()
|
||
.map(|(key, value)| (key.clone(), normalize_admin_database_value(value)))
|
||
.collect(),
|
||
)
|
||
}
|
||
_ => value.clone(),
|
||
}
|
||
}
|
||
|
||
fn apply_admin_database_table_filters(
|
||
rows: &mut Vec<AdminDatabaseTableRowPayload>,
|
||
query: &AdminDatabaseTableRowsQuery,
|
||
) -> Result<(), AppError> {
|
||
if let Some(search) = normalized_non_empty(query.search.as_deref()) {
|
||
let needle = search.to_ascii_lowercase();
|
||
rows.retain(|row| row.cells.to_string().to_ascii_lowercase().contains(&needle));
|
||
}
|
||
|
||
if let Some(filters) = normalized_non_empty(query.filters.as_deref()) {
|
||
let parsed = serde_json::from_str::<Value>(filters).map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_REQUEST)
|
||
.with_message(format!("筛选 JSON 解析失败:{error}"))
|
||
})?;
|
||
let object = parsed.as_object().ok_or_else(|| {
|
||
AppError::from_status(StatusCode::BAD_REQUEST)
|
||
.with_message("筛选条件必须是 JSON object")
|
||
})?;
|
||
rows.retain(|row| row_matches_admin_database_filters(row, object));
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
fn row_matches_admin_database_filters(
|
||
row: &AdminDatabaseTableRowPayload,
|
||
filters: &Map<String, Value>,
|
||
) -> bool {
|
||
let Some(cells) = row.cells.as_object() else {
|
||
return filters.is_empty();
|
||
};
|
||
filters.iter().all(|(key, expected)| {
|
||
cells
|
||
.get(key)
|
||
.map(|actual| admin_database_filter_value_matches(actual, expected))
|
||
.unwrap_or(false)
|
||
})
|
||
}
|
||
|
||
fn admin_database_filter_value_matches(actual: &Value, expected: &Value) -> bool {
|
||
if actual == expected {
|
||
return true;
|
||
}
|
||
if let Some(expected_text) = expected.as_str() {
|
||
return value_to_string(actual)
|
||
.map(|actual_text| actual_text == expected_text)
|
||
.unwrap_or(false);
|
||
}
|
||
false
|
||
}
|
||
|
||
async fn fetch_admin_tracking_events(
|
||
state: &AppState,
|
||
query: AdminTrackingEventListQuery,
|
||
) -> Result<Vec<AdminTrackingEventEntryPayload>, AppError> {
|
||
let client = Client::new();
|
||
let server_root = state.config.spacetime_server_url.trim_end_matches('/');
|
||
let database = state.config.spacetime_database.trim();
|
||
let token = state
|
||
.config
|
||
.spacetime_token
|
||
.as_deref()
|
||
.map(str::trim)
|
||
.filter(|value| !value.is_empty())
|
||
.map(str::to_string)
|
||
.or_else(load_local_spacetime_cli_token);
|
||
let sql = build_admin_tracking_events_sql(&query)
|
||
.map_err(|error| AppError::from_status(StatusCode::BAD_REQUEST).with_message(error))?;
|
||
|
||
let payload = fetch_spacetime_sql_json(&client, server_root, database, token.as_deref(), &sql)
|
||
.await
|
||
.map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||
.with_message(format!("埋点数据读取失败:{error}"))
|
||
})?;
|
||
parse_admin_tracking_events_sql_response(payload).map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||
.with_message(format!("埋点数据解析失败:{error}"))
|
||
})
|
||
}
|
||
|
||
fn build_admin_tracking_events_sql(query: &AdminTrackingEventListQuery) -> Result<String, String> {
|
||
let mut conditions = Vec::new();
|
||
if let Some(value) = normalized_non_empty(query.event_key.as_deref()) {
|
||
conditions.push(format!("event_key = {}", quote_sql_string(value)));
|
||
}
|
||
if let Some(value) = normalized_non_empty(query.user_id.as_deref()) {
|
||
conditions.push(format!("user_id = {}", quote_sql_string(value)));
|
||
}
|
||
if let Some(value) = normalized_non_empty(query.scope_kind.as_deref()) {
|
||
let scope_kind = normalize_admin_tracking_scope_kind(value)?;
|
||
conditions.push(format!("scope_kind = {}", quote_sql_string(scope_kind)));
|
||
}
|
||
if let Some(value) = normalized_non_empty(query.scope_id.as_deref()) {
|
||
conditions.push(format!("scope_id = {}", quote_sql_string(value)));
|
||
}
|
||
|
||
let where_clause = if conditions.is_empty() {
|
||
String::new()
|
||
} else {
|
||
format!(" WHERE {}", conditions.join(" AND "))
|
||
};
|
||
let limit = clamp_admin_tracking_event_limit(query.limit);
|
||
Ok(format!(
|
||
"SELECT event_id, event_key, scope_kind, scope_id, day_key, user_id, owner_user_id, profile_id, module_key, metadata_json, occurred_at FROM tracking_event{where_clause} LIMIT {limit}"
|
||
))
|
||
}
|
||
|
||
/// Trims `value` and returns it, or `None` when it is absent or blank after trimming.
fn normalized_non_empty(value: Option<&str>) -> Option<&str> {
    match value?.trim() {
        "" => None,
        trimmed => Some(trimmed),
    }
}
|
||
|
||
/// Reads the SpacetimeDB CLI token from the local CLI configuration file.
///
/// After a local database reset the CLI logs in again through `/v1/identity`; reusing
/// the CLI token here lets the SQL endpoint read private tables.
///
/// The file is looked up at `.spacetimedb/local/config/cli.toml` and then at
/// `server-rs/.spacetimedb/local/config/cli.toml`, both relative to the current
/// working directory. Returns `None` when neither file can be read or no non-empty
/// `spacetimedb_token` entry is present.
fn load_local_spacetime_cli_token() -> Option<String> {
    const CANDIDATE_PATHS: [&str; 2] = [
        ".spacetimedb/local/config/cli.toml",
        "server-rs/.spacetimedb/local/config/cli.toml",
    ];

    let content = CANDIDATE_PATHS
        .iter()
        .find_map(|path| fs::read_to_string(path).ok())?;
    parse_spacetime_cli_token(&content)
}

/// Extracts the first non-empty `spacetimedb_token` value from CLI config text.
///
/// Accepts any amount of whitespace around `=` and both double- and single-quoted
/// values, instead of requiring the exact `spacetimedb_token = "..."` spelling.
fn parse_spacetime_cli_token(content: &str) -> Option<String> {
    content.lines().find_map(|line| {
        // Split on the first `=` only, so a token containing `=` stays intact.
        let (key, value) = line.split_once('=')?;
        if key.trim() != "spacetimedb_token" {
            return None;
        }
        let token = value.trim().trim_matches(|ch| ch == '"' || ch == '\'');
        (!token.is_empty()).then(|| token.to_string())
    })
}
|
||
|
||
/// Wraps `value` in single quotes, doubling every embedded single quote.
fn quote_sql_string(value: &str) -> String {
    let mut quoted = String::with_capacity(value.len() + 2);
    quoted.push('\'');
    for ch in value.chars() {
        if ch == '\'' {
            // A quote inside the literal is escaped by writing it twice.
            quoted.push('\'');
        }
        quoted.push(ch);
    }
    quoted.push('\'');
    quoted
}
|
||
|
||
/// Maps a user-supplied scope kind to its canonical lowercase name.
///
/// Surrounding whitespace is ignored and ASCII case is not significant.
///
/// # Errors
///
/// Returns an error message when the value is not one of
/// `site`, `work`, `module` or `user`.
fn normalize_admin_tracking_scope_kind(value: &str) -> Result<&'static str, String> {
    const SCOPE_KINDS: [&str; 4] = ["site", "work", "module", "user"];

    let candidate = value.trim();
    SCOPE_KINDS
        .iter()
        .copied()
        .find(|kind| kind.eq_ignore_ascii_case(candidate))
        .ok_or_else(|| "scopeKind 必须是 site/work/module/user".to_string())
}
|
||
|
||
fn clamp_admin_tracking_event_limit(limit: Option<u32>) -> u32 {
|
||
limit
|
||
.unwrap_or(ADMIN_TRACKING_EVENT_DEFAULT_LIMIT)
|
||
.clamp(1, ADMIN_TRACKING_EVENT_MAX_LIMIT)
|
||
}
|
||
|
||
async fn fetch_spacetime_sql_json(
|
||
client: &Client,
|
||
server_root: &str,
|
||
database: &str,
|
||
token: Option<&str>,
|
||
sql: &str,
|
||
) -> Result<Value, String> {
|
||
let mut request = client
|
||
.post(format!("{server_root}/v1/database/{database}/sql"))
|
||
.header(CONTENT_TYPE, "text/plain; charset=utf-8")
|
||
.body(sql.to_string());
|
||
if let Some(token) = token {
|
||
request = request.bearer_auth(token);
|
||
}
|
||
|
||
let response = request
|
||
.send()
|
||
.await
|
||
.map_err(|error| format!("SQL 请求失败:{error}"))?;
|
||
if !response.status().is_success() {
|
||
let status = response.status();
|
||
let body = response.text().await.unwrap_or_default();
|
||
return Err(format!("HTTP {}:{}", status.as_u16(), trim_preview(&body)));
|
||
}
|
||
|
||
response
|
||
.json::<Value>()
|
||
.await
|
||
.map_err(|error| format!("SQL 响应解析失败:{error}"))
|
||
}
|
||
|
||
fn parse_admin_tracking_events_sql_response(
|
||
payload: Value,
|
||
) -> Result<Vec<AdminTrackingEventEntryPayload>, String> {
|
||
let rows = extract_first_sql_rows(payload)?;
|
||
rows.iter()
|
||
.map(parse_admin_tracking_event_row)
|
||
.collect::<Result<Vec<_>, _>>()
|
||
.map(|mut entries| {
|
||
// SpacetimeDB 2.2 的 HTTP SQL 暂不支持 ORDER BY;后台在 API 层按发生时间倒序收口。
|
||
entries.sort_by(|left, right| right.occurred_at.cmp(&left.occurred_at));
|
||
entries
|
||
})
|
||
}
|
||
|
||
fn extract_first_sql_rows(payload: Value) -> Result<Vec<Value>, String> {
|
||
let statement = match payload {
|
||
Value::Array(statements) => statements
|
||
.into_iter()
|
||
.next()
|
||
.ok_or_else(|| "SQL 结果为空".to_string())?,
|
||
Value::Object(statement) => Value::Object(statement),
|
||
_ => return Err("SQL 响应格式非法".to_string()),
|
||
};
|
||
let Value::Object(mut statement) = statement else {
|
||
return Err("SQL statement 结果格式非法".to_string());
|
||
};
|
||
let rows = statement
|
||
.remove("rows")
|
||
.ok_or_else(|| "SQL 响应缺少 rows 字段".to_string())?;
|
||
match rows {
|
||
Value::Array(rows) => Ok(rows),
|
||
_ => Err("SQL rows 字段格式非法".to_string()),
|
||
}
|
||
}
|
||
|
||
fn parse_admin_tracking_event_row(row: &Value) -> Result<AdminTrackingEventEntryPayload, String> {
|
||
let columns = row.as_array().ok_or_else(|| "埋点行格式非法".to_string())?;
|
||
let event_key = required_string_column(columns, 1, "event_key")?;
|
||
Ok(AdminTrackingEventEntryPayload {
|
||
event_id: required_string_column(columns, 0, "event_id")?,
|
||
event_title: admin_tracking_event_title(&event_key).to_string(),
|
||
event_key,
|
||
scope_kind: tracking_scope_kind_to_string(
|
||
columns
|
||
.get(2)
|
||
.ok_or_else(|| "埋点行缺少 scope_kind".to_string())?,
|
||
)
|
||
.ok_or_else(|| "埋点行 scope_kind 类型非法".to_string())?,
|
||
scope_id: required_string_column(columns, 3, "scope_id")?,
|
||
day_key: required_i64_column(columns, 4, "day_key")?,
|
||
user_id: optional_string_column(columns, 5),
|
||
owner_user_id: optional_string_column(columns, 6),
|
||
profile_id: optional_string_column(columns, 7),
|
||
module_key: optional_string_column(columns, 8),
|
||
metadata_json: required_string_column(columns, 9, "metadata_json")?,
|
||
occurred_at: timestamp_to_display_string(
|
||
columns
|
||
.get(10)
|
||
.ok_or_else(|| "埋点行缺少 occurred_at".to_string())?,
|
||
)
|
||
.ok_or_else(|| "埋点行 occurred_at 不是字符串".to_string())?,
|
||
})
|
||
}
|
||
|
||
fn required_string_column(
|
||
columns: &[Value],
|
||
index: usize,
|
||
field_name: &str,
|
||
) -> Result<String, String> {
|
||
value_to_string(
|
||
columns
|
||
.get(index)
|
||
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?,
|
||
)
|
||
.ok_or_else(|| format!("埋点行 {field_name} 不是字符串"))
|
||
}
|
||
|
||
fn optional_string_column(columns: &[Value], index: usize) -> Option<String> {
|
||
columns.get(index).and_then(value_to_string)
|
||
}
|
||
|
||
fn required_i64_column(columns: &[Value], index: usize, field_name: &str) -> Result<i64, String> {
|
||
let value = columns
|
||
.get(index)
|
||
.ok_or_else(|| format!("埋点行缺少 {field_name}"))?;
|
||
match value {
|
||
Value::Number(number) => number
|
||
.as_i64()
|
||
.ok_or_else(|| format!("埋点行 {field_name} 不是整数")),
|
||
Value::String(text) => text
|
||
.trim()
|
||
.parse::<i64>()
|
||
.map_err(|error| format!("埋点行 {field_name} 解析失败:{error}")),
|
||
_ => Err(format!("埋点行 {field_name} 类型非法")),
|
||
}
|
||
}
|
||
|
||
fn value_to_string(value: &Value) -> Option<String> {
|
||
match value {
|
||
Value::Null => None,
|
||
Value::String(text) => Some(text.clone()),
|
||
Value::Object(object) => object.get("some").and_then(value_to_string),
|
||
Value::Number(number) => Some(number.to_string()),
|
||
Value::Bool(value) => Some(value.to_string()),
|
||
Value::Array(items) => value_array_to_string(items),
|
||
}
|
||
}
|
||
|
||
fn value_array_to_string(items: &[Value]) -> Option<String> {
|
||
if items.len() == 2 {
|
||
if let Some(index) = items.first().and_then(Value::as_u64) {
|
||
if index == 0 {
|
||
return items.get(1).and_then(value_to_string);
|
||
}
|
||
if index == 1 && items.get(1).and_then(Value::as_array).is_some() {
|
||
return None;
|
||
}
|
||
}
|
||
}
|
||
Some(Value::Array(items.to_vec()).to_string())
|
||
}
|
||
|
||
fn tracking_scope_kind_to_string(value: &Value) -> Option<String> {
|
||
match value {
|
||
Value::String(text) => Some(text.clone()),
|
||
Value::Object(object) => object
|
||
.get("tag")
|
||
.or_else(|| object.get("variant"))
|
||
.or_else(|| object.get("name"))
|
||
.and_then(value_to_string),
|
||
Value::Array(items) => {
|
||
let index = items.first().and_then(Value::as_u64)?;
|
||
Some(
|
||
match index {
|
||
0 => "site",
|
||
1 => "work",
|
||
2 => "module",
|
||
3 => "user",
|
||
_ => return Some(Value::Array(items.to_vec()).to_string()),
|
||
}
|
||
.to_string(),
|
||
)
|
||
}
|
||
_ => value_to_string(value),
|
||
}
|
||
}
|
||
|
||
fn timestamp_to_display_string(value: &Value) -> Option<String> {
|
||
match value {
|
||
Value::Array(items) if items.len() == 1 => items.first().and_then(value_to_string),
|
||
_ => value_to_string(value),
|
||
}
|
||
}
|
||
|
||
/// Returns the display title for an event key; unknown keys are returned unchanged.
fn admin_tracking_event_title(event_key: &str) -> &str {
    if event_key == "daily_login" {
        "每日登录"
    } else {
        event_key
    }
}
|
||
|
||
async fn execute_admin_debug_http(
|
||
state: &AppState,
|
||
payload: AdminDebugHttpRequest,
|
||
) -> Result<AdminDebugHttpResponse, AppError> {
|
||
// 调试请求始终回打当前 api-server,同源受控,不允许作为外部代理使用。
|
||
let method = Method::from_bytes(payload.method.trim().as_bytes()).map_err(|_| {
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("HTTP 方法不合法")
|
||
})?;
|
||
let path = normalize_debug_path(&payload.path)?;
|
||
let base_url = build_debug_base_url(&state.config.bind_host, state.config.bind_port);
|
||
let target_url = format!("{base_url}{path}");
|
||
let body_text = payload.body.unwrap_or_default();
|
||
if body_text.len() > MAX_DEBUG_BODY_BYTES {
|
||
return Err(
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("调试请求体超过长度限制")
|
||
);
|
||
}
|
||
|
||
let client = Client::new();
|
||
let mut request = client.request(method, &target_url);
|
||
if !body_text.is_empty() {
|
||
request = request.body(body_text.clone());
|
||
}
|
||
|
||
for header in payload.headers.unwrap_or_default() {
|
||
let header_name = header.name.trim().to_ascii_lowercase();
|
||
if BLOCKED_DEBUG_HEADERS
|
||
.iter()
|
||
.any(|blocked| *blocked == header_name)
|
||
{
|
||
continue;
|
||
}
|
||
let name = HeaderName::from_bytes(header_name.as_bytes()).map_err(|_| {
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("调试请求头名称不合法")
|
||
})?;
|
||
let value = HeaderValue::from_str(header.value.trim()).map_err(|_| {
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("调试请求头值不合法")
|
||
})?;
|
||
request = request.header(name, value);
|
||
}
|
||
|
||
let response = request.send().await.map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||
.with_message(format!("调试请求失败:{error}"))
|
||
})?;
|
||
let status = response.status();
|
||
let headers = response
|
||
.headers()
|
||
.iter()
|
||
.map(|(name, value)| AdminDebugHeaderInput {
|
||
name: name.to_string(),
|
||
value: value.to_str().unwrap_or_default().to_string(),
|
||
})
|
||
.collect::<Vec<_>>();
|
||
let response_body = response.bytes().await.map_err(|error| {
|
||
AppError::from_status(StatusCode::BAD_GATEWAY)
|
||
.with_message(format!("调试响应读取失败:{error}"))
|
||
})?;
|
||
let body_preview = build_body_preview(&response_body);
|
||
let body_json = serde_json::from_slice::<Value>(&response_body).ok();
|
||
|
||
Ok(AdminDebugHttpResponse {
|
||
status: status.as_u16(),
|
||
status_text: status.canonical_reason().unwrap_or("Unknown").to_string(),
|
||
headers,
|
||
body_text: body_preview,
|
||
body_json,
|
||
})
|
||
}
|
||
|
||
fn build_debug_base_url(bind_host: &str, bind_port: u16) -> String {
|
||
let debug_host = resolve_debug_host(bind_host);
|
||
let authority_host = format_http_authority_host(&debug_host);
|
||
format!("http://{authority_host}:{bind_port}")
|
||
}
|
||
|
||
/// Chooses the host that debug requests should connect to for a given bind host.
///
/// A blank host or the IPv4 wildcard maps to the IPv4 loopback address; the IPv6
/// wildcard maps to the IPv6 loopback address. Any other IP address is returned in its
/// canonical textual form, and a value that is not an IP address is returned trimmed.
fn resolve_debug_host(bind_host: &str) -> String {
    let candidate = bind_host.trim();
    if candidate.is_empty() {
        return Ipv4Addr::LOCALHOST.to_string();
    }

    match candidate.parse::<IpAddr>() {
        Err(_) => candidate.to_string(),
        Ok(address) if !address.is_unspecified() => address.to_string(),
        // A wildcard address is not connectable; use the loopback of the same family.
        Ok(address) if address.is_ipv4() => Ipv4Addr::LOCALHOST.to_string(),
        Ok(_) => Ipv6Addr::LOCALHOST.to_string(),
    }
}
|
||
|
||
/// Formats a host for the authority part of a URL.
///
/// A bare IPv6 address is wrapped in square brackets; a host that is already
/// bracketed, and any other host, is returned unchanged.
fn format_http_authority_host(host: &str) -> String {
    let already_bracketed = host.starts_with('[') && host.ends_with(']');
    if !already_bracketed && host.parse::<Ipv6Addr>().is_ok() {
        format!("[{host}]")
    } else {
        host.to_string()
    }
}
|
||
|
||
fn normalize_debug_path(path: &str) -> Result<String, AppError> {
|
||
// 只允许 `/xxx` 形式的同源相对路径,明确拒绝绝对 URL 与后台登录接口。
|
||
let trimmed = path.trim();
|
||
if trimmed.is_empty() {
|
||
return Err(AppError::from_status(StatusCode::BAD_REQUEST).with_message("调试路径不能为空"));
|
||
}
|
||
if trimmed.starts_with("http://") || trimmed.starts_with("https://") {
|
||
return Err(
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("只允许调试同源相对路径")
|
||
);
|
||
}
|
||
if !trimmed.starts_with('/') {
|
||
return Err(
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("调试路径必须以 / 开头")
|
||
);
|
||
}
|
||
if trimmed == "/admin/api/login" {
|
||
return Err(
|
||
AppError::from_status(StatusCode::BAD_REQUEST).with_message("禁止调试后台登录接口")
|
||
);
|
||
}
|
||
Ok(trimmed.to_string())
|
||
}
|
||
|
||
fn build_body_preview(bytes: &[u8]) -> String {
|
||
if bytes.is_empty() {
|
||
return String::new();
|
||
}
|
||
let text = String::from_utf8_lossy(bytes).to_string();
|
||
trim_preview(&text)
|
||
}
|
||
|
||
/// Trims surrounding whitespace and keeps at most the first 4000 characters.
fn trim_preview(text: &str) -> String {
    let trimmed = text.trim();
    // The byte offset of the 4001st character, if any, is where the preview is cut.
    match trimmed.char_indices().nth(4000) {
        Some((cut, _)) => trimmed[..cut].to_string(),
        None => trimmed.to_string(),
    }
}
|
||
|
||
fn build_admin_session_payload(session: crate::state::AdminSession) -> AdminSessionPayload {
|
||
AdminSessionPayload {
|
||
subject: session.subject,
|
||
username: session.username,
|
||
display_name: session.display_name,
|
||
roles: session.roles,
|
||
issued_at: session
|
||
.issued_at
|
||
.format(&Rfc3339)
|
||
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()),
|
||
expires_at: session
|
||
.expires_at
|
||
.format(&Rfc3339)
|
||
.unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string()),
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
mod tests {
    //! Unit tests for the admin helpers: debug-path validation, debug base URL
    //! construction, preview trimming, SpacetimeDB SQL response parsing, table-row
    //! filtering and tracking-event SQL generation.

    use super::{
        apply_admin_database_table_filters, build_admin_database_table_row,
        build_admin_tracking_events_sql, build_body_preview, build_debug_base_url,
        build_spacetime_schema_url, clamp_admin_database_table_limit,
        clamp_admin_tracking_event_limit, is_safe_spacetime_table_name, normalize_debug_path,
        normalize_table_count_error, parse_admin_database_table_rows_sql_response,
        parse_admin_tracking_events_sql_response, parse_spacetime_sql_count_response, trim_preview,
    };
    use axum::{http::StatusCode, response::IntoResponse};
    use serde_json::{Value, json};
    use shared_contracts::admin::{AdminDatabaseTableRowsQuery, AdminTrackingEventListQuery};

    /// Column names shared by the wallet-style rows used in the filter tests.
    fn wallet_columns() -> Vec<String> {
        vec![
            "user_id".to_string(),
            "name".to_string(),
            "points".to_string(),
        ]
    }

    /// Builds one tracking-event SQL row in the object/`some` encoding.
    fn tracking_row(event_id: &str, occurred_at: &str) -> Value {
        json!([
            event_id,
            "daily_login",
            "user",
            "user-1",
            20580,
            {"some": "user-1"},
            null,
            {"some": "profile-1"},
            "profile",
            "{}",
            occurred_at
        ])
    }

    #[test]
    fn normalize_debug_path_rejects_absolute_url() {
        let error =
            normalize_debug_path("https://example.com/api").expect_err("absolute url should fail");

        assert_eq!(error.into_response().status(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn normalize_debug_path_rejects_admin_login_route() {
        let error =
            normalize_debug_path("/admin/api/login").expect_err("admin login route should fail");

        assert_eq!(error.into_response().status(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn normalize_debug_path_accepts_healthz() {
        let path = normalize_debug_path("/healthz").expect("healthz path should pass validation");

        assert_eq!(path, "/healthz");
    }

    #[test]
    fn build_debug_base_url_rewrites_wildcard_ipv4_to_loopback() {
        let url = build_debug_base_url("0.0.0.0", 3200);

        assert_eq!(url, "http://127.0.0.1:3200");
    }

    #[test]
    fn build_debug_base_url_wraps_ipv6_host() {
        let url = build_debug_base_url("::1", 3200);

        assert_eq!(url, "http://[::1]:3200");
    }

    #[test]
    fn trim_preview_limits_length() {
        let text = "a".repeat(5000);

        assert_eq!(trim_preview(&text).chars().count(), 4000);
    }

    #[test]
    fn build_spacetime_schema_url_includes_required_version_query() {
        let url = build_spacetime_schema_url("http://127.0.0.1:3101", "xushi-p4wfr");

        assert_eq!(
            url,
            "http://127.0.0.1:3101/v1/database/xushi-p4wfr/schema?version=9"
        );
    }

    #[test]
    fn is_safe_spacetime_table_name_accepts_schema_identifiers() {
        assert!(is_safe_spacetime_table_name("runtime_setting"));
        assert!(is_safe_spacetime_table_name("_private_table"));
        assert!(is_safe_spacetime_table_name("AiTaskStage2"));
    }

    #[test]
    fn is_safe_spacetime_table_name_rejects_sql_fragments() {
        assert!(!is_safe_spacetime_table_name(""));
        assert!(!is_safe_spacetime_table_name("bad-name"));
        assert!(!is_safe_spacetime_table_name("1bad"));
        assert!(!is_safe_spacetime_table_name("runtime_setting;DROP"));
    }

    #[test]
    fn normalize_table_count_error_hides_private_table_http_noise() {
        let error = "HTTP 400:no such table: `runtime_setting`. If the table exists, it may be marked private.";

        assert_eq!(
            normalize_table_count_error(error),
            "不可统计(private 或当前身份不可见)"
        );
    }

    #[test]
    fn normalize_table_count_error_keeps_other_errors() {
        let error = "SQL 请求失败:connection refused";

        assert_eq!(normalize_table_count_error(error), error);
    }

    #[test]
    fn parse_spacetime_sql_count_response_accepts_statement_array_rows() {
        let payload = json!([
            {
                "schema": {
                    "elements": [
                        {
                            "name": {
                                "some": "row_count"
                            },
                            "algebraic_type": {
                                "U64": []
                            }
                        }
                    ]
                },
                "rows": [[7]],
                "total_duration_micros": 116,
                "stats": {
                    "rows_inserted": 0,
                    "rows_deleted": 0,
                    "rows_updated": 0
                }
            }
        ]);

        let count =
            parse_spacetime_sql_count_response(payload).expect("statement array should parse");

        assert_eq!(count, 7);
    }

    #[test]
    fn parse_spacetime_sql_count_response_uses_schema_column_index() {
        let payload = json!([
            {
                "schema": {
                    "elements": [
                        {
                            "name": {
                                "some": "table_name"
                            }
                        },
                        {
                            "name": {
                                "some": "row_count"
                            }
                        }
                    ]
                },
                "rows": [["runtime_setting", "12"]]
            }
        ]);

        let count =
            parse_spacetime_sql_count_response(payload).expect("schema column index should parse");

        assert_eq!(count, 12);
    }

    #[test]
    fn parse_spacetime_sql_count_response_keeps_object_row_compatibility() {
        let payload = json!({
            "rows": [
                {
                    "row_count": "3"
                }
            ]
        });

        let count = parse_spacetime_sql_count_response(payload).expect("object row should parse");

        assert_eq!(count, 3);
    }

    #[test]
    fn clamp_admin_database_table_limit_uses_default_and_bounds() {
        assert_eq!(clamp_admin_database_table_limit(None), 100);
        assert_eq!(clamp_admin_database_table_limit(Some(0)), 1);
        assert_eq!(clamp_admin_database_table_limit(Some(800)), 500);
    }

    #[test]
    fn parse_admin_database_table_rows_sql_response_maps_schema_columns() {
        let payload = json!([
            {
                "schema": {
                    "elements": [
                        {"name": {"some": "user_id"}},
                        {"name": {"some": "points"}}
                    ]
                },
                "rows": [["u1", 12]]
            }
        ]);

        let response = parse_admin_database_table_rows_sql_response("profile_wallet", 100, payload)
            .expect("table rows should parse");

        assert_eq!(response.table_name, "profile_wallet");
        assert_eq!(response.columns, vec!["user_id", "points"]);
        assert_eq!(response.total_returned, 1);
        assert_eq!(response.rows[0].cells["user_id"], json!("u1"));
        assert_eq!(response.rows[0].cells["points"], json!(12));
    }

    #[test]
    fn parse_admin_database_table_rows_sql_response_maps_recharge_order_enum_cells() {
        let payload = json!([
            {
                "schema": {
                    "elements": [
                        {"name": {"some": "order_id"}},
                        {"name": {"some": "kind"}},
                        {"name": {"some": "status"}},
                        {"name": {"some": "paid_at"}}
                    ]
                },
                "rows": [[
                    "recharge:user_00000001:1778757456811099:points_60",
                    [0, []],
                    [0, []],
                    [1, []]
                ]]
            }
        ]);

        let response =
            parse_admin_database_table_rows_sql_response("profile_recharge_order", 100, payload)
                .expect("recharge order rows should parse");

        let cells = &response.rows[0].cells;
        assert_eq!(cells["kind"], json!("points"));
        assert_eq!(cells["status"], json!("pending"));
        assert_eq!(cells["paid_at"], json!(null));
        assert_eq!(
            response.rows[0].raw,
            json!([
                "recharge:user_00000001:1778757456811099:points_60",
                "points",
                "pending",
                null
            ])
        );
    }

    #[test]
    fn build_admin_database_table_row_normalizes_optional_sats_values() {
        let row = build_admin_database_table_row(
            &json!([[0, "u1"], [1, []]]),
            &["user_id".to_string(), "deleted_at".to_string()],
        );

        assert_eq!(row.cells["user_id"], json!("u1"));
        assert_eq!(row.cells["deleted_at"], json!(null));
    }

    #[test]
    fn apply_admin_database_table_filters_supports_search_and_json_filters() {
        let columns = wallet_columns();
        let mut rows = vec![
            build_admin_database_table_row(&json!(["u1", "alice", 12]), &columns),
            build_admin_database_table_row(&json!(["u2", "bob", 8]), &columns),
        ];

        apply_admin_database_table_filters(
            &mut rows,
            &AdminDatabaseTableRowsQuery {
                search: Some("ali".to_string()),
                filters: Some(r#"{"points":12}"#.to_string()),
                limit: None,
            },
        )
        .expect("filters should apply");

        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0].cells["user_id"], json!("u1"));
    }

    #[test]
    fn apply_admin_database_table_filters_rejects_non_object_filter() {
        let mut rows = vec![build_admin_database_table_row(
            &json!(["u1"]),
            &["user_id".to_string()],
        )];

        let error = apply_admin_database_table_filters(
            &mut rows,
            &AdminDatabaseTableRowsQuery {
                search: None,
                filters: Some("[]".to_string()),
                limit: None,
            },
        )
        .expect_err("non object filter should fail");

        assert_eq!(error.into_response().status(), StatusCode::BAD_REQUEST);
    }

    #[test]
    fn build_admin_tracking_events_sql_quotes_filters_and_clamps_limit() {
        let sql = build_admin_tracking_events_sql(&AdminTrackingEventListQuery {
            event_key: Some("daily'login".to_string()),
            user_id: Some("user-1".to_string()),
            scope_kind: Some("USER".to_string()),
            scope_id: Some("scope-1".to_string()),
            limit: Some(2000),
        })
        .expect("tracking sql should build");

        assert!(sql.contains("event_key = 'daily''login'"));
        assert!(sql.contains("user_id = 'user-1'"));
        assert!(sql.contains("scope_kind = 'user'"));
        assert!(sql.contains("scope_id = 'scope-1'"));
        assert!(!sql.contains("ORDER BY"));
        assert!(sql.ends_with("LIMIT 1000"));
    }

    #[test]
    fn clamp_admin_tracking_event_limit_uses_default_and_bounds() {
        assert_eq!(clamp_admin_tracking_event_limit(None), 200);
        assert_eq!(clamp_admin_tracking_event_limit(Some(0)), 1);
        assert_eq!(clamp_admin_tracking_event_limit(Some(1001)), 1000);
    }

    #[test]
    fn parse_admin_tracking_events_sql_response_accepts_statement_array_rows() {
        let payload = json!([
            {
                "rows": [[
                    "event-1",
                    "daily_login",
                    "user",
                    "user-1",
                    20580,
                    {"some": "user-1"},
                    null,
                    {"some": "profile-1"},
                    "profile",
                    "{\"source\":\"task\"}",
                    "2026-05-07T00:00:00Z"
                ]]
            }
        ]);

        let entries =
            parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");

        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].event_id, "event-1");
        assert_eq!(entries[0].event_title, "每日登录");
        assert_eq!(entries[0].user_id.as_deref(), Some("user-1"));
        assert_eq!(entries[0].profile_id.as_deref(), Some("profile-1"));
        assert_eq!(entries[0].module_key.as_deref(), Some("profile"));
    }

    #[test]
    fn parse_admin_tracking_events_sql_response_normalizes_sats_values() {
        let payload = json!([
            {
                "rows": [[
                    "event-1",
                    "daily_login",
                    [3, []],
                    "user-1",
                    20580,
                    [0, "user-1"],
                    [1, []],
                    [0, "profile-1"],
                    [0, "profile"],
                    "{}",
                    [1778207451731746i64]
                ]]
            }
        ]);

        let entries =
            parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");

        assert_eq!(entries[0].scope_kind, "user");
        assert_eq!(entries[0].user_id.as_deref(), Some("user-1"));
        assert_eq!(entries[0].owner_user_id, None);
        assert_eq!(entries[0].profile_id.as_deref(), Some("profile-1"));
        assert_eq!(entries[0].module_key.as_deref(), Some("profile"));
        assert_eq!(entries[0].occurred_at, "1778207451731746");
    }

    #[test]
    fn parse_admin_tracking_events_sql_response_sorts_by_occurred_at_desc() {
        let payload = json!([
            {
                "rows": [
                    tracking_row("event-old", "2026-05-07T00:00:00Z"),
                    tracking_row("event-new", "2026-05-07T01:00:00Z")
                ]
            }
        ]);

        let entries =
            parse_admin_tracking_events_sql_response(payload).expect("tracking rows should parse");

        assert_eq!(entries[0].event_id, "event-new");
        assert_eq!(entries[1].event_id, "event-old");
    }

    #[test]
    fn build_body_preview_handles_utf8() {
        let preview = build_body_preview("后台测试".as_bytes());

        assert_eq!(preview, "后台测试");
    }
}
|