补充 release SpacetimeDB 健康检查与巡检防回退
增加 SpacetimeDB 阶段化健康检查与 /readyz 阶段输出；记录 procedure/reducer/read 失败的阶段和耗时；补充 release 健康巡检 systemd timer 与生产 ops 预检；同步 API 构建部署、provision 脚本和运维文档。
This commit is contained in:
@@ -269,6 +269,7 @@ mod tests {
|
||||
};
|
||||
use reqwest::Client;
|
||||
use serde_json::Value;
|
||||
use spacetime_client::{SpacetimeClientHealthSnapshot, SpacetimeClientStage};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::net::TcpListener;
|
||||
use tower::ServiceExt;
|
||||
@@ -724,6 +725,45 @@ mod tests {
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn readyz_reports_spacetime_health_stage() {
|
||||
let state = AppState::new(AppConfig::default()).expect("state should build");
|
||||
state.set_test_spacetime_health(SpacetimeClientHealthSnapshot {
|
||||
ok: false,
|
||||
stage: SpacetimeClientStage::ProcedureResult,
|
||||
checked_at_micros: 1_713_680_000_000_000,
|
||||
elapsed_ms: 2_000,
|
||||
timeout_ms: 2_000,
|
||||
error: Some("SpacetimeDB procedure 调用超时".to_string()),
|
||||
last_success_at_micros: Some(1_713_679_999_000_000),
|
||||
last_error: Some("SpacetimeDB procedure 调用超时".to_string()),
|
||||
});
|
||||
let app = build_router(state);
|
||||
|
||||
let response = app
|
||||
.oneshot(
|
||||
Request::builder()
|
||||
.uri("/readyz")
|
||||
.header("x-request-id", "req-ready-spacetime")
|
||||
.body(Body::empty())
|
||||
.expect("readyz request should build"),
|
||||
)
|
||||
.await
|
||||
.expect("readyz request should succeed");
|
||||
|
||||
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
|
||||
let body = read_json_response(response).await;
|
||||
assert_eq!(body["error"]["details"]["reason"], "spacetime_unhealthy");
|
||||
assert_eq!(
|
||||
body["error"]["details"]["spacetime"]["stage"],
|
||||
"procedure_result"
|
||||
);
|
||||
assert_eq!(
|
||||
body["error"]["details"]["spacetime"]["timeoutMs"],
|
||||
Value::from(2_000)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn creative_agent_draft_edit_rejects_unconfirmed_template_session() {
|
||||
let app = build_internal_creative_agent_app();
|
||||
|
||||
@@ -12,6 +12,7 @@ use platform_speech::{
|
||||
|
||||
const DEFAULT_INTERNAL_API_SECRET: &str = "genarrative-dev-internal-bridge";
|
||||
const SPACETIME_LOCAL_CONFIG_FILE: &str = "spacetime.local.json";
|
||||
const DEFAULT_SPACETIME_HEALTH_CHECK_TIMEOUT_SECONDS: u64 = 2;
|
||||
pub(crate) const DEFAULT_VECTOR_ENGINE_IMAGE_REQUEST_TIMEOUT_MS: u64 = 1_000_000;
|
||||
|
||||
// 集中管理 api-server 的启动配置,避免入口层直接散落环境变量解析逻辑。
|
||||
@@ -118,6 +119,7 @@ pub struct AppConfig {
|
||||
pub spacetime_token: Option<String>,
|
||||
pub spacetime_pool_size: u32,
|
||||
pub spacetime_procedure_timeout: Duration,
|
||||
pub spacetime_health_check_timeout: Duration,
|
||||
pub llm_provider: LlmProvider,
|
||||
pub llm_base_url: String,
|
||||
pub llm_api_key: Option<String>,
|
||||
@@ -276,6 +278,9 @@ impl Default for AppConfig {
|
||||
spacetime_token: None,
|
||||
spacetime_pool_size: 4,
|
||||
spacetime_procedure_timeout: Duration::from_secs(30),
|
||||
spacetime_health_check_timeout: Duration::from_secs(
|
||||
DEFAULT_SPACETIME_HEALTH_CHECK_TIMEOUT_SECONDS,
|
||||
),
|
||||
llm_provider: LlmProvider::Ark,
|
||||
llm_base_url: String::new(),
|
||||
llm_api_key: None,
|
||||
@@ -704,6 +709,12 @@ impl AppConfig {
|
||||
config.spacetime_procedure_timeout =
|
||||
Duration::from_secs(spacetime_procedure_timeout_seconds);
|
||||
}
|
||||
if let Some(spacetime_health_check_timeout_seconds) =
|
||||
read_first_duration_seconds_env(&["GENARRATIVE_SPACETIME_HEALTH_CHECK_TIMEOUT_SECONDS"])
|
||||
{
|
||||
config.spacetime_health_check_timeout =
|
||||
Duration::from_secs(spacetime_health_check_timeout_seconds);
|
||||
}
|
||||
|
||||
if let Some(llm_provider) =
|
||||
read_first_llm_provider_env(&["GENARRATIVE_LLM_PROVIDER", "LLM_PROVIDER"])
|
||||
@@ -1610,6 +1621,26 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
/// `AppConfig::from_env` must pick up the health-check timeout from
/// `GENARRATIVE_SPACETIME_HEALTH_CHECK_TIMEOUT_SECONDS`.
#[test]
fn from_env_reads_spacetime_health_check_timeout() {
    const KEY: &str = "GENARRATIVE_SPACETIME_HEALTH_CHECK_TIMEOUT_SECONDS";

    /// Puts the variable back to its pre-test value on drop, so a failing
    /// assertion cannot leak the override into tests that run afterwards.
    struct RestoreEnv(Option<std::ffi::OsString>);

    impl Drop for RestoreEnv {
        fn drop(&mut self) {
            // SAFETY: this guard is declared after the `ENV_LOCK` guard and is
            // therefore dropped first, while the lock is still held. Assumes
            // every test that touches the environment takes the same lock.
            unsafe {
                match self.0.take() {
                    Some(previous) => std::env::set_var(KEY, previous),
                    None => std::env::remove_var(KEY),
                }
            }
        }
    }

    let _guard = ENV_LOCK
        .get_or_init(|| Mutex::new(()))
        .lock()
        .expect("env lock should not poison");
    let _restore = RestoreEnv(std::env::var_os(KEY));

    // SAFETY: `ENV_LOCK` is held for the whole test (same assumption as above).
    unsafe {
        std::env::set_var(KEY, "3");
    }

    let config = AppConfig::from_env();
    assert_eq!(config.spacetime_health_check_timeout.as_secs(), 3);
}
|
||||
|
||||
#[test]
|
||||
fn default_keeps_structured_llm_web_search_disabled() {
|
||||
let config = AppConfig::default();
|
||||
|
||||
@@ -10,6 +10,7 @@ use crate::{
|
||||
api_response::json_success_body, http_error::AppError, request_context::RequestContext,
|
||||
state::AppState,
|
||||
};
|
||||
use spacetime_client::SpacetimeClientHealthSnapshot;
|
||||
|
||||
pub async fn health_check(Extension(request_context): Extension<RequestContext>) -> Json<Value> {
|
||||
json_success_body(
|
||||
@@ -25,23 +26,49 @@ pub async fn readiness_check(
|
||||
State(state): State<AppState>,
|
||||
Extension(request_context): Extension<RequestContext>,
|
||||
) -> Response {
|
||||
if state.is_ready() {
|
||||
if !state.is_ready() {
|
||||
return AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
|
||||
.with_message("api-server 正在退出,不再接收新流量")
|
||||
.with_details(json!({
|
||||
"reason": "api_server_draining",
|
||||
"ready": false,
|
||||
}))
|
||||
.into_response_with_context(Some(&request_context));
|
||||
}
|
||||
|
||||
let spacetime_health = state.spacetime_health_check().await;
|
||||
if spacetime_health.ok {
|
||||
return json_success_body(
|
||||
Some(&request_context),
|
||||
json!({
|
||||
"ok": true,
|
||||
"ready": true,
|
||||
"service": "genarrative-api-server",
|
||||
"spacetime": spacetime_health_to_json(&spacetime_health),
|
||||
}),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
|
||||
AppError::from_status(StatusCode::SERVICE_UNAVAILABLE)
|
||||
.with_message("api-server 正在退出,不再接收新流量")
|
||||
.with_message("SpacetimeDB 连接健康检查失败,api-server 暂不接收新流量")
|
||||
.with_details(json!({
|
||||
"reason": "api_server_draining",
|
||||
"reason": "spacetime_unhealthy",
|
||||
"ready": false,
|
||||
"spacetime": spacetime_health_to_json(&spacetime_health),
|
||||
}))
|
||||
.into_response_with_context(Some(&request_context))
|
||||
}
|
||||
|
||||
fn spacetime_health_to_json(snapshot: &SpacetimeClientHealthSnapshot) -> Value {
|
||||
json!({
|
||||
"ok": snapshot.ok,
|
||||
"stage": snapshot.stage.as_str(),
|
||||
"checkedAtMicros": snapshot.checked_at_micros,
|
||||
"elapsedMs": snapshot.elapsed_ms,
|
||||
"timeoutMs": snapshot.timeout_ms,
|
||||
"error": snapshot.error,
|
||||
"lastSuccessAtMicros": snapshot.last_success_at_micros,
|
||||
"lastError": snapshot.last_error,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -31,7 +31,9 @@ use platform_wechat::{WechatClient, WechatConfig, pay::WechatPayClient};
|
||||
use serde_json::Value;
|
||||
use shared_contracts::creation_entry_config::CreationEntryConfigResponse;
|
||||
use shared_contracts::creative_agent::CreativeAgentSessionSnapshot;
|
||||
use spacetime_client::{SpacetimeClient, SpacetimeClientConfig, SpacetimeClientError};
|
||||
use spacetime_client::{
|
||||
SpacetimeClient, SpacetimeClientConfig, SpacetimeClientError, SpacetimeClientHealthSnapshot,
|
||||
};
|
||||
use time::OffsetDateTime;
|
||||
use tokio::sync::{Semaphore, broadcast};
|
||||
use tracing::{info, warn};
|
||||
@@ -242,6 +244,8 @@ pub struct AppStateInner {
|
||||
refresh_cookie_config: RefreshCookieConfig,
|
||||
#[cfg(test)]
|
||||
test_creation_entry_config: Arc<Mutex<Option<CreationEntryConfigResponse>>>,
|
||||
#[cfg(test)]
|
||||
test_spacetime_health: Arc<Mutex<Option<SpacetimeClientHealthSnapshot>>>,
|
||||
oss_client: Option<OssClient>,
|
||||
#[cfg_attr(test, allow(dead_code))]
|
||||
auth_store: InMemoryAuthStore,
|
||||
@@ -418,6 +422,10 @@ impl AppState {
|
||||
test_creation_entry_config: Arc::new(Mutex::new(Some(
|
||||
crate::creation_entry_config::test_creation_entry_config_response(),
|
||||
))),
|
||||
#[cfg(test)]
|
||||
test_spacetime_health: Arc::new(Mutex::new(Some(
|
||||
SpacetimeClientHealthSnapshot::healthy_for_test(),
|
||||
))),
|
||||
oss_client,
|
||||
auth_store,
|
||||
password_entry_service,
|
||||
@@ -467,6 +475,30 @@ impl AppState {
|
||||
self.ready.store(false, Ordering::Release);
|
||||
}
|
||||
|
||||
/// Probes the SpacetimeDB client using the configured health-check timeout.
///
/// In test builds an injected snapshot, when present, is returned instead of
/// running the probe.
pub async fn spacetime_health_check(&self) -> SpacetimeClientHealthSnapshot {
    #[cfg(test)]
    {
        let injected = self
            .test_spacetime_health
            .lock()
            .expect("test spacetime health should lock")
            .clone();
        if let Some(snapshot) = injected {
            return snapshot;
        }
    }

    let probe_timeout = self.config.spacetime_health_check_timeout;
    self.spacetime_client.health_check(probe_timeout).await
}
|
||||
|
||||
/// Test-only hook: stores the snapshot that `spacetime_health_check` will
/// return instead of probing the client.
#[cfg(test)]
pub(crate) fn set_test_spacetime_health(&self, snapshot: SpacetimeClientHealthSnapshot) {
    let mut injected = self
        .test_spacetime_health
        .lock()
        .expect("test spacetime health should lock");
    *injected = Some(snapshot);
}
|
||||
|
||||
pub async fn upsert_creation_entry_type_config(
|
||||
&self,
|
||||
input: module_runtime::CreationEntryTypeAdminUpsertInput,
|
||||
|
||||
@@ -105,8 +105,8 @@ pub mod auth;
|
||||
pub mod bark_battle;
|
||||
pub use bark_battle::{
|
||||
BarkBattleDraftConfigUpsertRecordInput, BarkBattleDraftCreateRecordInput,
|
||||
BarkBattleRunFinishRecordInput, BarkBattleRunStartRecordInput,
|
||||
BarkBattleWorkDeleteRecordInput, BarkBattleWorkPublishRecordInput,
|
||||
BarkBattleRunFinishRecordInput, BarkBattleRunStartRecordInput, BarkBattleWorkDeleteRecordInput,
|
||||
BarkBattleWorkPublishRecordInput,
|
||||
};
|
||||
pub mod big_fish;
|
||||
pub mod combat;
|
||||
@@ -132,7 +132,7 @@ use std::{
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::{Arc, Mutex},
|
||||
thread::JoinHandle,
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
use module_ai::{
|
||||
@@ -241,6 +241,7 @@ use tokio::{
|
||||
sync::{OwnedSemaphorePermit, RwLock, Semaphore, oneshot},
|
||||
time::timeout,
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::module_bindings::*;
|
||||
|
||||
@@ -253,6 +254,60 @@ pub struct SpacetimeClientConfig {
|
||||
pub procedure_timeout: Duration,
|
||||
}
|
||||
|
||||
/// Stage of a SpacetimeDB client operation, used to label where a failure
/// (or a health probe) ended up.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SpacetimeClientStage {
    /// No failure: the probe obtained a usable connection.
    Ready,
    /// Waiting for a pool permit.
    PoolAcquire,
    /// Building the connection.
    ConnectBuild,
    /// Waiting for the connect callback after the connection was built.
    ConnectHandshake,
    /// Waiting for a cached read-model subscription to be applied.
    ReadModelSubscribe,
    /// Waiting for a procedure result.
    ProcedureResult,
    /// Waiting for a reducer result.
    ReducerResult,
    /// Reading from the connection's cached tables.
    ReadCache,
}

impl SpacetimeClientStage {
    /// Returns the stable snake_case label emitted in logs and in the
    /// `/readyz` payload.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        match self {
            Self::Ready => "ready",
            Self::PoolAcquire => "pool_acquire",
            Self::ConnectBuild => "connect_build",
            Self::ConnectHandshake => "connect_handshake",
            Self::ReadModelSubscribe => "read_model_subscribe",
            Self::ProcedureResult => "procedure_result",
            Self::ReducerResult => "reducer_result",
            Self::ReadCache => "read_cache",
        }
    }
}

impl std::fmt::Display for SpacetimeClientStage {
    /// Formats the stage as its `as_str` label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct SpacetimeClientHealthSnapshot {
|
||||
pub ok: bool,
|
||||
pub stage: SpacetimeClientStage,
|
||||
pub checked_at_micros: i64,
|
||||
pub elapsed_ms: u64,
|
||||
pub timeout_ms: u64,
|
||||
pub error: Option<String>,
|
||||
pub last_success_at_micros: Option<i64>,
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
impl SpacetimeClientHealthSnapshot {
|
||||
pub fn healthy_for_test() -> Self {
|
||||
Self {
|
||||
ok: true,
|
||||
stage: SpacetimeClientStage::Ready,
|
||||
checked_at_micros: current_unix_micros(),
|
||||
elapsed_ms: 0,
|
||||
timeout_ms: 0,
|
||||
error: None,
|
||||
last_success_at_micros: Some(current_unix_micros()),
|
||||
last_error: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct AuthStoreSnapshotRecord {
|
||||
pub snapshot_json: Option<String>,
|
||||
@@ -270,6 +325,7 @@ pub struct AuthStoreSnapshotImportRecord {
|
||||
pub struct SpacetimeClient {
|
||||
config: SpacetimeClientConfig,
|
||||
pool: Arc<SpacetimeConnectionPool>,
|
||||
health_state: Arc<RwLock<SpacetimeClientHealthState>>,
|
||||
creation_entry_config_cache: Arc<RwLock<Option<CreationEntryConfigRecord>>>,
|
||||
custom_world_gallery_legacy_sync_attempted: Arc<AtomicBool>,
|
||||
}
|
||||
@@ -296,6 +352,24 @@ struct SpacetimeConnectionPool {
|
||||
permits: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
/// Health history the client keeps between probes; `health_check` updates it
/// and copies it into every snapshot it returns.
#[derive(Debug, Default)]
struct SpacetimeClientHealthState {
    /// `checked_at_micros` of the most recent successful probe, if any.
    last_success_at_micros: Option<i64>,
    /// Message of the most recent failed probe; reset to `None` on success.
    last_error: Option<String>,
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct SpacetimeStageError {
|
||||
stage: SpacetimeClientStage,
|
||||
error: SpacetimeClientError,
|
||||
}
|
||||
|
||||
impl SpacetimeStageError {
|
||||
fn new(stage: SpacetimeClientStage, error: SpacetimeClientError) -> Self {
|
||||
Self { stage, error }
|
||||
}
|
||||
}
|
||||
|
||||
struct PooledConnectionSlot {
|
||||
connection: Option<PooledConnection>,
|
||||
in_use: bool,
|
||||
@@ -341,6 +415,7 @@ impl SpacetimeClient {
|
||||
Self {
|
||||
config,
|
||||
pool,
|
||||
health_state: Arc::new(RwLock::new(SpacetimeClientHealthState::default())),
|
||||
creation_entry_config_cache: Arc::new(RwLock::new(None)),
|
||||
custom_world_gallery_legacy_sync_attempted: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
@@ -354,29 +429,58 @@ impl SpacetimeClient {
|
||||
where
|
||||
T: Send + 'static,
|
||||
{
|
||||
let started_at = Instant::now();
|
||||
let metrics_guard = telemetry::begin_procedure(procedure);
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
||||
let final_result = match self.acquire_connection().await {
|
||||
let final_result = match self
|
||||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||||
.await
|
||||
{
|
||||
Ok(lease) => {
|
||||
let result = if let Some(connection) = lease.connection.as_ref() {
|
||||
let (result, failed_stage) = if let Some(connection) = lease.connection.as_ref() {
|
||||
call(&connection.connection, result_sender.clone());
|
||||
match timeout(self.config.procedure_timeout, receiver).await {
|
||||
Ok(inner) => match inner {
|
||||
Ok(value) => value,
|
||||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||||
let stage = SpacetimeClientStage::ProcedureResult;
|
||||
(
|
||||
match timeout(self.config.procedure_timeout, receiver).await {
|
||||
Ok(inner) => match inner {
|
||||
Ok(value) => value,
|
||||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||||
},
|
||||
Err(_) => Err(Self::resolve_timeout_error(Some(connection), stage)),
|
||||
},
|
||||
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
|
||||
}
|
||||
stage,
|
||||
)
|
||||
} else {
|
||||
Err(SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||||
))
|
||||
(
|
||||
Err(SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||||
)),
|
||||
SpacetimeClientStage::ProcedureResult,
|
||||
)
|
||||
};
|
||||
self.release_connection(lease).await;
|
||||
if let Err(error) = &result {
|
||||
log_spacetime_client_failure(
|
||||
"procedure",
|
||||
procedure,
|
||||
failed_stage,
|
||||
started_at,
|
||||
error,
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
Err(error) => Err(error),
|
||||
Err(error) => {
|
||||
log_spacetime_client_failure(
|
||||
"procedure",
|
||||
procedure,
|
||||
error.stage,
|
||||
started_at,
|
||||
&error.error,
|
||||
);
|
||||
Err(error.error)
|
||||
}
|
||||
};
|
||||
|
||||
metrics_guard.finish(&final_result);
|
||||
@@ -388,29 +492,58 @@ impl SpacetimeClient {
|
||||
procedure: &'static str,
|
||||
call: impl FnOnce(&DbConnection, ReducerResultSender) + Send + 'static,
|
||||
) -> Result<(), SpacetimeClientError> {
|
||||
let started_at = Instant::now();
|
||||
let metrics_guard = telemetry::begin_procedure(procedure);
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
let result_sender = Arc::new(Mutex::new(Some(sender)));
|
||||
let final_result = match self.acquire_connection().await {
|
||||
let final_result = match self
|
||||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||||
.await
|
||||
{
|
||||
Ok(lease) => {
|
||||
let result = if let Some(connection) = lease.connection.as_ref() {
|
||||
let (result, failed_stage) = if let Some(connection) = lease.connection.as_ref() {
|
||||
call(&connection.connection, result_sender.clone());
|
||||
match timeout(self.config.procedure_timeout, receiver).await {
|
||||
Ok(inner) => match inner {
|
||||
Ok(value) => value,
|
||||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||||
let stage = SpacetimeClientStage::ReducerResult;
|
||||
(
|
||||
match timeout(self.config.procedure_timeout, receiver).await {
|
||||
Ok(inner) => match inner {
|
||||
Ok(value) => value,
|
||||
Err(_) => Err(SpacetimeClientError::ConnectDropped),
|
||||
},
|
||||
Err(_) => Err(Self::resolve_timeout_error(Some(connection), stage)),
|
||||
},
|
||||
Err(_) => Err(Self::resolve_timeout_error(Some(connection))),
|
||||
}
|
||||
stage,
|
||||
)
|
||||
} else {
|
||||
Err(SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||||
))
|
||||
(
|
||||
Err(SpacetimeClientError::Runtime(
|
||||
"SpacetimeDB 连接租约缺少连接".to_string(),
|
||||
)),
|
||||
SpacetimeClientStage::ReducerResult,
|
||||
)
|
||||
};
|
||||
self.release_connection(lease).await;
|
||||
if let Err(error) = &result {
|
||||
log_spacetime_client_failure(
|
||||
"reducer",
|
||||
procedure,
|
||||
failed_stage,
|
||||
started_at,
|
||||
error,
|
||||
);
|
||||
}
|
||||
result
|
||||
}
|
||||
Err(error) => Err(error),
|
||||
Err(error) => {
|
||||
log_spacetime_client_failure(
|
||||
"reducer",
|
||||
procedure,
|
||||
error.stage,
|
||||
started_at,
|
||||
&error.error,
|
||||
);
|
||||
Err(error.error)
|
||||
}
|
||||
};
|
||||
|
||||
metrics_guard.finish(&final_result);
|
||||
@@ -425,11 +558,22 @@ impl SpacetimeClient {
|
||||
where
|
||||
T: Send + 'static,
|
||||
{
|
||||
let started_at = Instant::now();
|
||||
let metrics_guard = telemetry::begin_read(read_name);
|
||||
let lease = match self.acquire_connection().await {
|
||||
let lease = match self
|
||||
.acquire_connection_with_timeout(self.config.procedure_timeout)
|
||||
.await
|
||||
{
|
||||
Ok(lease) => lease,
|
||||
Err(error) => {
|
||||
let final_result = Err(error);
|
||||
log_spacetime_client_failure(
|
||||
"read",
|
||||
read_name,
|
||||
error.stage,
|
||||
started_at,
|
||||
&error.error,
|
||||
);
|
||||
let final_result = Err(error.error);
|
||||
metrics_guard.finish(&final_result);
|
||||
return final_result;
|
||||
}
|
||||
@@ -443,6 +587,15 @@ impl SpacetimeClient {
|
||||
};
|
||||
self.release_connection(lease).await;
|
||||
|
||||
if let Err(error) = &final_result {
|
||||
log_spacetime_client_failure(
|
||||
"read",
|
||||
read_name,
|
||||
SpacetimeClientStage::ReadCache,
|
||||
started_at,
|
||||
error,
|
||||
);
|
||||
}
|
||||
metrics_guard.finish(&final_result);
|
||||
final_result
|
||||
}
|
||||
@@ -455,14 +608,75 @@ impl SpacetimeClient {
|
||||
self.creation_entry_config_cache.read().await.clone()
|
||||
}
|
||||
|
||||
async fn acquire_connection(&self) -> Result<PooledConnectionLease, SpacetimeClientError> {
|
||||
let permit = timeout(
|
||||
self.config.procedure_timeout,
|
||||
self.pool.permits.clone().acquire_owned(),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))?;
|
||||
pub async fn health_check(&self, probe_timeout: Duration) -> SpacetimeClientHealthSnapshot {
|
||||
let timeout = if probe_timeout.is_zero() {
|
||||
DEFAULT_PROCEDURE_TIMEOUT
|
||||
} else {
|
||||
probe_timeout
|
||||
};
|
||||
let started_at = Instant::now();
|
||||
let checked_at_micros = current_unix_micros();
|
||||
let result = self.acquire_connection_with_timeout(timeout).await;
|
||||
match result {
|
||||
Ok(lease) => {
|
||||
self.release_connection(lease).await;
|
||||
let mut health_state = self.health_state.write().await;
|
||||
health_state.last_success_at_micros = Some(checked_at_micros);
|
||||
health_state.last_error = None;
|
||||
SpacetimeClientHealthSnapshot {
|
||||
ok: true,
|
||||
stage: SpacetimeClientStage::Ready,
|
||||
checked_at_micros,
|
||||
elapsed_ms: duration_millis_u64(started_at.elapsed()),
|
||||
timeout_ms: duration_millis_u64(timeout),
|
||||
error: None,
|
||||
last_success_at_micros: health_state.last_success_at_micros,
|
||||
last_error: health_state.last_error.clone(),
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
log_spacetime_client_failure(
|
||||
"health_check",
|
||||
"spacetime_connection",
|
||||
error.stage,
|
||||
started_at,
|
||||
&error.error,
|
||||
);
|
||||
let mut health_state = self.health_state.write().await;
|
||||
let error_message = error.error.to_string();
|
||||
health_state.last_error = Some(error_message.clone());
|
||||
SpacetimeClientHealthSnapshot {
|
||||
ok: false,
|
||||
stage: error.stage,
|
||||
checked_at_micros,
|
||||
elapsed_ms: duration_millis_u64(started_at.elapsed()),
|
||||
timeout_ms: duration_millis_u64(timeout),
|
||||
error: Some(error_message),
|
||||
last_success_at_micros: health_state.last_success_at_micros,
|
||||
last_error: health_state.last_error.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn acquire_connection_with_timeout(
|
||||
&self,
|
||||
operation_timeout: Duration,
|
||||
) -> Result<PooledConnectionLease, SpacetimeStageError> {
|
||||
let permit = timeout(operation_timeout, self.pool.permits.clone().acquire_owned())
|
||||
.await
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::PoolAcquire,
|
||||
SpacetimeClientError::Timeout,
|
||||
)
|
||||
})?
|
||||
.map_err(|error| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::PoolAcquire,
|
||||
SpacetimeClientError::Runtime(error.to_string()),
|
||||
)
|
||||
})?;
|
||||
|
||||
loop {
|
||||
for (slot_index, slot) in self.pool.slots.iter().enumerate() {
|
||||
@@ -480,7 +694,7 @@ impl SpacetimeClient {
|
||||
let connection = if let Some(connection) = reusable_connection {
|
||||
connection
|
||||
} else {
|
||||
match self.build_pooled_connection().await {
|
||||
match self.build_pooled_connection(operation_timeout).await {
|
||||
Ok(connection) => connection,
|
||||
Err(error) => {
|
||||
let mut slot_guard = self.pool.slots[slot_index].lock().await;
|
||||
@@ -502,7 +716,10 @@ impl SpacetimeClient {
|
||||
}
|
||||
}
|
||||
|
||||
async fn build_pooled_connection(&self) -> Result<PooledConnection, SpacetimeClientError> {
|
||||
async fn build_pooled_connection(
|
||||
&self,
|
||||
operation_timeout: Duration,
|
||||
) -> Result<PooledConnection, SpacetimeStageError> {
|
||||
let config = self.config.clone();
|
||||
let broken = Arc::new(AtomicBool::new(false));
|
||||
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
||||
@@ -510,7 +727,7 @@ impl SpacetimeClient {
|
||||
let broken_flag = broken.clone();
|
||||
let disconnect_sender = connect_sender.clone();
|
||||
let connection = timeout(
|
||||
self.config.procedure_timeout,
|
||||
operation_timeout,
|
||||
tokio::task::spawn_blocking(move || {
|
||||
DbConnection::builder()
|
||||
.with_uri(config.server_url)
|
||||
@@ -534,17 +751,41 @@ impl SpacetimeClient {
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|error| SpacetimeClientError::Runtime(error.to_string()))??;
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ConnectBuild,
|
||||
SpacetimeClientError::Timeout,
|
||||
)
|
||||
})?
|
||||
.map_err(|error| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ConnectBuild,
|
||||
SpacetimeClientError::Runtime(error.to_string()),
|
||||
)
|
||||
})?
|
||||
.map_err(|error| SpacetimeStageError::new(SpacetimeClientStage::ConnectBuild, error))?;
|
||||
|
||||
let runner = connection.run_threaded();
|
||||
timeout(self.config.procedure_timeout, receiver)
|
||||
timeout(operation_timeout, receiver)
|
||||
.await
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ConnectHandshake,
|
||||
SpacetimeClientError::Timeout,
|
||||
)
|
||||
})?
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ConnectHandshake,
|
||||
SpacetimeClientError::ConnectDropped,
|
||||
)
|
||||
})?
|
||||
.map_err(|error| {
|
||||
SpacetimeStageError::new(SpacetimeClientStage::ConnectHandshake, error)
|
||||
})?;
|
||||
|
||||
let read_model_subscriptions = self
|
||||
.subscribe_cached_read_models(&connection, broken.clone())
|
||||
.subscribe_cached_read_models(&connection, broken.clone(), operation_timeout)
|
||||
.await?;
|
||||
|
||||
Ok(PooledConnection {
|
||||
@@ -559,7 +800,8 @@ impl SpacetimeClient {
|
||||
&self,
|
||||
connection: &DbConnection,
|
||||
broken: Arc<AtomicBool>,
|
||||
) -> Result<Vec<SubscriptionHandle>, SpacetimeClientError> {
|
||||
operation_timeout: Duration,
|
||||
) -> Result<Vec<SubscriptionHandle>, SpacetimeStageError> {
|
||||
let mut subscriptions = Vec::new();
|
||||
for query in [
|
||||
"SELECT * FROM public_work_gallery_entry",
|
||||
@@ -576,7 +818,13 @@ impl SpacetimeClient {
|
||||
"SELECT * FROM big_fish_gallery_view",
|
||||
] {
|
||||
let subscription = self
|
||||
.subscribe_cached_read_model_query(connection, broken.clone(), query, true)
|
||||
.subscribe_cached_read_model_query(
|
||||
connection,
|
||||
broken.clone(),
|
||||
query,
|
||||
true,
|
||||
operation_timeout,
|
||||
)
|
||||
.await?;
|
||||
subscriptions.push(subscription);
|
||||
}
|
||||
@@ -597,7 +845,13 @@ impl SpacetimeClient {
|
||||
"SELECT * FROM asset_object",
|
||||
] {
|
||||
if let Ok(subscription) = self
|
||||
.subscribe_cached_read_model_query(connection, broken.clone(), query, false)
|
||||
.subscribe_cached_read_model_query(
|
||||
connection,
|
||||
broken.clone(),
|
||||
query,
|
||||
false,
|
||||
operation_timeout,
|
||||
)
|
||||
.await
|
||||
{
|
||||
subscriptions.push(subscription);
|
||||
@@ -613,7 +867,8 @@ impl SpacetimeClient {
|
||||
broken: Arc<AtomicBool>,
|
||||
query: &'static str,
|
||||
mark_broken_on_error: bool,
|
||||
) -> Result<SubscriptionHandle, SpacetimeClientError> {
|
||||
operation_timeout: Duration,
|
||||
) -> Result<SubscriptionHandle, SpacetimeStageError> {
|
||||
let (sender, receiver) = oneshot::channel::<Result<(), SpacetimeClientError>>();
|
||||
let applied_sender = Arc::new(Mutex::new(Some(sender)));
|
||||
let on_applied_sender = applied_sender.clone();
|
||||
@@ -635,10 +890,23 @@ impl SpacetimeClient {
|
||||
})
|
||||
.subscribe(query);
|
||||
|
||||
timeout(self.config.procedure_timeout, receiver)
|
||||
timeout(operation_timeout, receiver)
|
||||
.await
|
||||
.map_err(|_| SpacetimeClientError::Timeout)?
|
||||
.map_err(|_| SpacetimeClientError::ConnectDropped)??;
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ReadModelSubscribe,
|
||||
SpacetimeClientError::Timeout,
|
||||
)
|
||||
})?
|
||||
.map_err(|_| {
|
||||
SpacetimeStageError::new(
|
||||
SpacetimeClientStage::ReadModelSubscribe,
|
||||
SpacetimeClientError::ConnectDropped,
|
||||
)
|
||||
})?
|
||||
.map_err(|error| {
|
||||
SpacetimeStageError::new(SpacetimeClientStage::ReadModelSubscribe, error)
|
||||
})?;
|
||||
|
||||
Ok(subscription)
|
||||
}
|
||||
@@ -658,7 +926,10 @@ impl SpacetimeClient {
|
||||
}
|
||||
|
||||
// 超时后必须统一归还租约;若连接已先一步断开则回传断线,否则标记坏连接并回传超时。
|
||||
fn resolve_timeout_error(connection: Option<&PooledConnection>) -> SpacetimeClientError {
|
||||
fn resolve_timeout_error(
|
||||
connection: Option<&PooledConnection>,
|
||||
_stage: SpacetimeClientStage,
|
||||
) -> SpacetimeClientError {
|
||||
if let Some(connection) = connection {
|
||||
if connection.is_broken() {
|
||||
return SpacetimeClientError::ConnectDropped;
|
||||
@@ -681,6 +952,27 @@ fn current_public_work_day() -> i64 {
|
||||
current_unix_micros().div_euclid(PUBLIC_WORK_PLAY_DAY_MICROS)
|
||||
}
|
||||
|
||||
/// Converts a duration to whole milliseconds, saturating at `u64::MAX`
/// instead of truncating when the value does not fit.
fn duration_millis_u64(duration: Duration) -> u64 {
    u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
}
|
||||
|
||||
fn log_spacetime_client_failure(
|
||||
operation_kind: &'static str,
|
||||
operation_name: &'static str,
|
||||
stage: SpacetimeClientStage,
|
||||
started_at: Instant,
|
||||
error: &SpacetimeClientError,
|
||||
) {
|
||||
warn!(
|
||||
operation_kind,
|
||||
operation_name,
|
||||
spacetime_stage = stage.as_str(),
|
||||
elapsed_ms = duration_millis_u64(started_at.elapsed()),
|
||||
error = %error,
|
||||
"SpacetimeDB client operation failed"
|
||||
);
|
||||
}
|
||||
|
||||
fn public_work_recent_play_counts(
|
||||
connection: &DbConnection,
|
||||
source_type: &str,
|
||||
|
||||
Reference in New Issue
Block a user