LLM应用上线只是开始。生产中的大模型应用是一个黑箱——你不知道它为什么返回了那个答案也不知道哪个请求花了多少钱。本文系统讲解AI应用可观测性的完整工程体系。为什么AI应用的可观测性特别难传统服务监控有成熟的套路QPS、错误率、延迟P99、CPU/内存。这些对LLM应用同样适用但还远远不够。LLM应用独有的观测挑战1.输出不确定性同样的输入可能给出不同的输出如何判断这次输出是好的2.Token经济成本不是按请求数算而是按Token数算每个请求的成本差异可以达到100倍3.幻觉难检测模型给出了自信满满的错误答案传统监控根本发现不了4.多跳追踪一个用户请求可能触发5次LLM调用3次工具调用如何串联追踪5.Prompt漂移Prompt的微小改动可能导致输出质量的系统性变化—## 一、LLM应用可观测性的三层架构第三层业务指标├── 用户满意度点赞/点踩├── 任务完成率├── 用户留存/转化└── 每用户收益第二层AI质量指标├── 答案相关性Relevance├── 忠实度Faithfulness防幻觉├── 上下文利用率└── 拒绝率模型拒绝回答的比例第一层系统指标├── 请求延迟TTFT/TPOT/总时长├── Token消耗输入/输出/缓存命中├── 错误率超时/API错误/解析失败└── 成本每请求/每用户/每天—## 二、LLM Tracing追踪每一次AI调用使用OpenTelemetry标准对LLM应用做全链路追踪pythonfrom opentelemetry import tracefrom opentelemetry.sdk.trace import TracerProviderfrom opentelemetry.sdk.trace.export import BatchSpanProcessorfrom opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporterimport timeimport tiktoken# 初始化Tracerprovider TracerProvider()exporter OTLPSpanExporter(endpointhttp://jaeger:4317)provider.add_span_processor(BatchSpanProcessor(exporter))trace.set_tracer_provider(provider)tracer trace.get_tracer(llm.application)# Token计数工具def count_tokens(text: str, model: str gpt-4o) - int: encoding tiktoken.encoding_for_model(model) return len(encoding.encode(text))class TracedLLMClient: 带完整链路追踪的LLM客户端 def __init__(self, client, model: str): self.client client self.model model def chat(self, messages: list, session_id: str None, **kwargs) - str: with tracer.start_as_current_span(fllm.chat.{self.model}) as span: # 记录请求元数据 span.set_attribute(llm.model, self.model) span.set_attribute(llm.session_id, session_id or unknown) span.set_attribute(llm.messages.count, len(messages)) # 计算输入Token input_text .join([m[content] for m in messages if isinstance(m.get(content), str)]) input_tokens count_tokens(input_text, self.model) span.set_attribute(llm.usage.input_tokens, input_tokens) # 记录第一个用户消息便于调试 user_messages [m[content] for m in messages if m[role] user] if user_messages: span.set_attribute(llm.user_query, user_messages[-1][:500]) start_time time.time() try: response self.client.chat.completions.create( modelself.model, messagesmessages, **kwargs ) # 记录响应指标 duration time.time() - start_time output_text response.choices[0].message.content output_tokens response.usage.completion_tokens span.set_attribute(llm.usage.output_tokens, output_tokens) span.set_attribute(llm.usage.total_tokens, response.usage.total_tokens) span.set_attribute(llm.duration_seconds, duration) span.set_attribute(llm.tokens_per_second, output_tokens / duration if duration 0 else 0) span.set_attribute(llm.finish_reason, response.choices[0].finish_reason) # 估算成本 cost self.estimate_cost(response.usage.prompt_tokens, output_tokens) span.set_attribute(llm.estimated_cost_usd, cost) return output_text except Exception as e: span.set_attribute(llm.error, str(e)) span.set_attribute(llm.error_type, type(e).__name__) span.record_exception(e) raise def estimate_cost(self, input_tokens: int, output_tokens: int) - float: # 2026年参考价格 price_table { gpt-4o: {input: 0.0000025, output: 0.00001}, gpt-4o-mini: {input: 0.00000015, output: 0.0000006}, deepseek-chat: {input: 0.00000027, output: 0.0000011}, } prices price_table.get(self.model, {input: 0.000001, output: 0.000003}) return input_tokens * prices[input] output_tokens * prices[output]—## 三、TTFT vs TPOTLLM延迟的精细化度量LLM的延迟不是一个单一指标需要拆分为三个关键指标pythonimport asyncioimport timeclass StreamingLatencyTracker: 精细化追踪LLM流式响应的延迟 def __init__(self): self.request_start None self.first_token_time None self.last_token_time None self.token_count 0 self.inter_token_times [] async def track_streaming_response(self, stream): self.request_start time.perf_counter() async for chunk in stream: if chunk.choices[0].delta.content: current_time time.perf_counter() if self.first_token_time is None: self.first_token_time current_time ttft current_time - self.request_start # TTFT: Time to First Token从请求发出到收到第一个Token print(fTTFT: {ttft*1000:.1f}ms) else: # TPOT: Time Per Output TokenToken间隔 self.inter_token_times.append(current_time - self.last_token_time) self.last_token_time current_time self.token_count 1 yield chunk.choices[0].delta.content # 计算汇总指标 total_time self.last_token_time - self.request_start ttft self.first_token_time - self.request_start avg_tpot sum(self.inter_token_times) / len(self.inter_token_times) if self.inter_token_times else 0 throughput self.token_count / total_time if total_time 0 else 0 return { ttft_ms: ttft * 1000, # 首Token延迟影响感知响应速度 avg_tpot_ms: avg_tpot * 1000, # 平均Token间隔影响流式体验 total_time_s: total_time, # 总耗时 throughput_tps: throughput # 吞吐Token/秒 }# 行业参考值2026年优质服务基准LATENCY_BENCHMARKS { excellent: {ttft: 200, tpot: 30}, # ms good: {ttft: 500, tpot: 50}, acceptable: {ttft: 1000, tpot: 100}, poor: {ttft: 2000, tpot: 200}}—## 四、质量监控自动检测幻觉和低质量输出pythonfrom langchain_openai import ChatOpenAIfrom pydantic import BaseModelfrom typing import Optionalclass QualityScore(BaseModel): relevance: float # 0-1答案与问题相关性 faithfulness: float # 0-1答案是否基于上下文防幻觉 completeness: float # 0-1答案是否完整回答了问题 issues: list[str] # 发现的问题列表class LLMQualityMonitor: 生产环境LLM输出质量监控器 def __init__(self, judge_modelgpt-4o-mini): self.judge ChatOpenAI(modeljudge_model) self.alert_threshold 0.6 self.sample_rate 0.1 # 对10%的请求做质量评估全量评估太贵 def should_evaluate(self) - bool: import random return random.random() self.sample_rate def evaluate_response( self, question: str, answer: str, context: Optional[str] None ) - QualityScore: eval_prompt f 评估以下AI回答的质量。只返回JSON格式。 问题{question} {上下文参考文档 context if context else } AI回答{answer} 评分标准0-1 - relevance回答是否回答了问题 - faithfulness回答是否有超出上下文的捏造内容无上下文时评估事实准确性 - completeness回答是否完整是否遗漏重要信息 - issues列出发现的具体问题空列表如果没有问题 返回格式{{relevance: 0.9, faithfulness: 0.8, completeness: 0.7, issues: []}} result self.judge.invoke(eval_prompt).content import json scores json.loads(result) return QualityScore(**scores) def monitor_and_alert(self, question, answer, contextNone): if not self.should_evaluate(): return score self.evaluate_response(question, answer, context) # 上报到监控系统 metrics.gauge(llm.quality.relevance, score.relevance) metrics.gauge(llm.quality.faithfulness, score.faithfulness) metrics.gauge(llm.quality.completeness, score.completeness) # 低质量告警 if score.faithfulness self.alert_threshold: alert_manager.send( severitywarning, messagef潜在幻觉检测, details{ question: question[:200], answer: answer[:200], faithfulness_score: score.faithfulness, issues: score.issues } ) return score—## 五、成本监控与预算控制pythonimport redisfrom datetime import datetime, timedeltaclass TokenBudgetManager: Token预算管理器防止成本爆炸 def __init__(self, redis_client: redis.Redis): self.redis redis_client # 各维度成本限制美元 self.limits { user_daily: 1.0, # 单用户每日上限 user_monthly: 20.0, # 单用户每月上限 global_daily: 500.0, # 全局每日上限 global_monthly: 10000.0 # 全局每月上限 } def record_cost(self, user_id: str, cost_usd: float, tokens: int): 记录Token消耗和成本 today datetime.now().strftime(%Y-%m-%d) month datetime.now().strftime(%Y-%m) pipe self.redis.pipeline() # 更新用户日/月成本 pipe.incrbyfloat(fcost:user:{user_id}:daily:{today}, cost_usd) pipe.incrbyfloat(fcost:user:{user_id}:monthly:{month}, cost_usd) pipe.expire(fcost:user:{user_id}:daily:{today}, 86400 * 2) # 更新全局成本 pipe.incrbyfloat(fcost:global:daily:{today}, cost_usd) pipe.incrbyfloat(fcost:global:monthly:{month}, cost_usd) pipe.execute() def check_budget(self, user_id: str, estimated_cost: float) - tuple[bool, str]: 检查是否超出预算 today datetime.now().strftime(%Y-%m-%d) month datetime.now().strftime(%Y-%m) user_daily float(self.redis.get(fcost:user:{user_id}:daily:{today}) or 0) user_monthly float(self.redis.get(fcost:user:{user_id}:monthly:{month}) or 0) global_daily float(self.redis.get(fcost:global:daily:{today}) or 0) if user_daily estimated_cost self.limits[user_daily]: return False, f用户今日Token使用已达上限已用${user_daily:.2f} if user_monthly estimated_cost self.limits[user_monthly]: return False, f用户本月Token使用已达上限已用${user_monthly:.2f} if global_daily estimated_cost self.limits[global_daily] * 0.9: # 80%时告警90%时限流 if global_daily estimated_cost self.limits[global_daily]: return False, 全局每日预算已达上限服务暂时限流 return True, OK—## 六、LangFuseAI原生可观测平台实战pythonfrom langfuse import Langfusefrom langfuse.decorators import observe, langfuse_contextlangfuse Langfuse( public_keypk-xxx, secret_keysk-xxx, hosthttps://cloud.langfuse.com)observe(namerag_pipeline)def rag_answer(question: str, user_id: str) - str: # 记录用户信息到当前trace langfuse_context.update_current_trace( user_iduser_id, tags[rag, production], metadata{question_length: len(question)} ) # 检索步骤 with langfuse_context.observe_openai(nameretrieval): docs retriever.retrieve(question) langfuse_context.update_current_observation( metadata{retrieved_docs: len(docs)} ) # 生成步骤 context \n.join([d.page_content for d in docs]) observe(namellm_generation) def generate(q, ctx): return llm_client.chat([ {role: system, content: f基于以下上下文回答\n{ctx}}, {role: user, content: q} ]) answer generate(question, context) # 记录用于后续评估 langfuse_context.update_current_trace( outputanswer, metadata{context_length: len(context)} ) return answer# 收集用户反馈def record_user_feedback(trace_id: str, positive: bool, comment: str None): langfuse.score( trace_idtrace_id, nameuser_feedback, value1.0 if positive else 0.0, commentcomment )—## 总结AI应用可观测性建设路线第一阶段立即做- 接入Token计数和成本追踪- 记录每次请求的延迟TTFT/总时长- 建立错误率告警第二阶段1个月内- 接入LangFuse或类似工具做AI原生追踪- 实现抽样质量评估相关性忠实度- 收集用户反馈信号点赞/点踩第三阶段3个月内- 构建离线评估数据集Golden Set- 实现回归测试Prompt变更前后质量对比- 幻觉检测和自动告警- 成本预算管理系统可观测性不是LLM应用的奢侈品是生产可靠性的必要条件。没有监控的AI应用是在黑暗中飞行。