proxy, ui: add pending requests count to the main dashboard (#516)
add a real time counter of pending (inflight) requests to the UI.
This commit is contained in:
@@ -8,6 +8,7 @@ const ConfigFileChangedEventID = 0x03
|
||||
const LogDataEventID = 0x04
|
||||
const TokenMetricsEventID = 0x05
|
||||
const ModelPreloadedEventID = 0x06
|
||||
const InFlightRequestsEventID = 0x07
|
||||
|
||||
type ProcessStateChangeEvent struct {
|
||||
ProcessName string
|
||||
@@ -58,3 +59,11 @@ type ModelPreloadedEvent struct {
|
||||
func (e ModelPreloadedEvent) Type() uint32 {
|
||||
return ModelPreloadedEventID
|
||||
}
|
||||
|
||||
type InFlightRequestsEvent struct {
|
||||
Total int
|
||||
}
|
||||
|
||||
func (e InFlightRequestsEvent) Type() uint32 {
|
||||
return InFlightRequestsEventID
|
||||
}
|
||||
|
||||
+65
-19
@@ -28,6 +28,40 @@ const (
|
||||
|
||||
type proxyCtxKey string
|
||||
|
||||
type InflightCounter struct {
|
||||
mu sync.Mutex
|
||||
total int
|
||||
}
|
||||
|
||||
func newInflightCounter() *InflightCounter {
|
||||
return &InflightCounter{}
|
||||
}
|
||||
|
||||
func (ic *InflightCounter) Current() int {
|
||||
ic.mu.Lock()
|
||||
total := ic.total
|
||||
ic.mu.Unlock()
|
||||
return total
|
||||
}
|
||||
|
||||
func (ic *InflightCounter) Increment() int {
|
||||
ic.mu.Lock()
|
||||
ic.total++
|
||||
total := ic.total
|
||||
ic.mu.Unlock()
|
||||
return total
|
||||
}
|
||||
|
||||
func (ic *InflightCounter) Decrement() int {
|
||||
ic.mu.Lock()
|
||||
if ic.total > 0 {
|
||||
ic.total--
|
||||
}
|
||||
total := ic.total
|
||||
ic.mu.Unlock()
|
||||
return total
|
||||
}
|
||||
|
||||
type ProxyManager struct {
|
||||
sync.Mutex
|
||||
|
||||
@@ -43,6 +77,8 @@ type ProxyManager struct {
|
||||
|
||||
processGroups map[string]*ProcessGroup
|
||||
|
||||
inFlightCounter *InflightCounter
|
||||
|
||||
// shutdown signaling
|
||||
shutdownCtx context.Context
|
||||
shutdownCancel context.CancelFunc
|
||||
@@ -155,6 +191,8 @@ func New(proxyConfig config.Config) *ProxyManager {
|
||||
|
||||
processGroups: make(map[string]*ProcessGroup),
|
||||
|
||||
inFlightCounter: newInflightCounter(),
|
||||
|
||||
shutdownCtx: shutdownCtx,
|
||||
shutdownCancel: shutdownCancel,
|
||||
|
||||
@@ -276,37 +314,37 @@ func (pm *ProxyManager) setupGinEngine() {
|
||||
|
||||
// Set up routes using the Gin engine
|
||||
// Protected routes use pm.apiKeyAuth() middleware
|
||||
pm.ginEngine.POST("/v1/chat/completions", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/responses", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/chat/completions", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/responses", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
// Support legacy /v1/completions api, see issue #12
|
||||
pm.ginEngine.POST("/v1/completions", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/completions", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
// Support anthropic /v1/messages (added https://github.com/ggml-org/llama.cpp/pull/17570)
|
||||
pm.ginEngine.POST("/v1/messages", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/messages", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
// Support anthropic count_tokens API (Also added in the above PR)
|
||||
pm.ginEngine.POST("/v1/messages/count_tokens", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/messages/count_tokens", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
|
||||
// Support embeddings and reranking
|
||||
pm.ginEngine.POST("/v1/embeddings", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/embeddings", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
|
||||
// llama-server's /reranking endpoint + aliases
|
||||
pm.ginEngine.POST("/reranking", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/rerank", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/rerank", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/reranking", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/reranking", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/rerank", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/rerank", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/reranking", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
|
||||
// llama-server's /infill endpoint for code infilling
|
||||
pm.ginEngine.POST("/infill", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/infill", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
|
||||
// llama-server's /completion endpoint
|
||||
pm.ginEngine.POST("/completion", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/completion", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
|
||||
// Support audio/speech endpoint
|
||||
pm.ginEngine.POST("/v1/audio/speech", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/audio/voices", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.GET("/v1/audio/voices", pm.apiKeyAuth(), pm.proxyGETModelHandler)
|
||||
pm.ginEngine.POST("/v1/audio/transcriptions", pm.apiKeyAuth(), pm.proxyOAIPostFormHandler)
|
||||
pm.ginEngine.POST("/v1/images/generations", pm.apiKeyAuth(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/images/edits", pm.apiKeyAuth(), pm.proxyOAIPostFormHandler)
|
||||
pm.ginEngine.POST("/v1/audio/speech", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/audio/voices", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.GET("/v1/audio/voices", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyGETModelHandler)
|
||||
pm.ginEngine.POST("/v1/audio/transcriptions", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyOAIPostFormHandler)
|
||||
pm.ginEngine.POST("/v1/images/generations", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyInferenceHandler)
|
||||
pm.ginEngine.POST("/v1/images/edits", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyOAIPostFormHandler)
|
||||
|
||||
pm.ginEngine.GET("/v1/models", pm.apiKeyAuth(), pm.listModelsHandler)
|
||||
|
||||
@@ -325,7 +363,7 @@ func (pm *ProxyManager) setupGinEngine() {
|
||||
pm.ginEngine.GET("/upstream", func(c *gin.Context) {
|
||||
c.Redirect(http.StatusFound, "/ui/models")
|
||||
})
|
||||
pm.ginEngine.Any("/upstream/*upstreamPath", pm.apiKeyAuth(), pm.proxyToUpstream)
|
||||
pm.ginEngine.Any("/upstream/*upstreamPath", pm.apiKeyAuth(), pm.trackInflight(), pm.proxyToUpstream)
|
||||
pm.ginEngine.GET("/unload", pm.apiKeyAuth(), pm.unloadAllModelsHandler)
|
||||
pm.ginEngine.GET("/running", pm.apiKeyAuth(), pm.listRunningProcessesHandler)
|
||||
pm.ginEngine.GET("/health", func(c *gin.Context) {
|
||||
@@ -389,6 +427,14 @@ func (pm *ProxyManager) setupGinEngine() {
|
||||
gin.DisableConsoleColor()
|
||||
}
|
||||
|
||||
func (pm *ProxyManager) trackInflight() gin.HandlerFunc {
|
||||
return func(c *gin.Context) {
|
||||
event.Emit(InFlightRequestsEvent{Total: pm.inFlightCounter.Increment()})
|
||||
defer event.Emit(InFlightRequestsEvent{Total: pm.inFlightCounter.Decrement()})
|
||||
c.Next()
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP implements http.Handler interface
|
||||
func (pm *ProxyManager) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
pm.ginEngine.ServeHTTP(w, r)
|
||||
|
||||
@@ -107,6 +107,7 @@ const (
|
||||
msgTypeModelStatus messageType = "modelStatus"
|
||||
msgTypeLogData messageType = "logData"
|
||||
msgTypeMetrics messageType = "metrics"
|
||||
msgTypeInFlight messageType = "inflight"
|
||||
)
|
||||
|
||||
type messageEnvelope struct {
|
||||
@@ -166,6 +167,18 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
sendInFlight := func(total int) {
|
||||
jsonData, err := json.Marshal(gin.H{"total": total})
|
||||
if err == nil {
|
||||
select {
|
||||
case sendBuffer <- messageEnvelope{Type: msgTypeInFlight, Data: string(jsonData)}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Send updated models list
|
||||
*/
|
||||
@@ -193,11 +206,19 @@ func (pm *ProxyManager) apiSendEvents(c *gin.Context) {
|
||||
sendMetrics([]TokenMetrics{e.Metrics})
|
||||
})()
|
||||
|
||||
/**
|
||||
* Send in-flight request stats related to token stats "Waiting: N" count.
|
||||
*/
|
||||
defer event.On(func(e InFlightRequestsEvent) {
|
||||
sendInFlight(e.Total)
|
||||
})()
|
||||
|
||||
// send initial batch of data
|
||||
sendLogData("proxy", pm.proxyLogger.GetHistory())
|
||||
sendLogData("upstream", pm.upstreamLogger.GetHistory())
|
||||
sendModels()
|
||||
sendMetrics(pm.metricsMonitor.getMetrics())
|
||||
sendInFlight(pm.inFlightCounter.Current())
|
||||
|
||||
for {
|
||||
select {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
<script lang="ts">
|
||||
import { metrics } from "../stores/api";
|
||||
import { inFlightRequests, metrics } from "../stores/api";
|
||||
import TokenHistogram from "./TokenHistogram.svelte";
|
||||
|
||||
interface HistogramData {
|
||||
@@ -15,7 +15,14 @@
|
||||
let stats = $derived.by(() => {
|
||||
const totalRequests = $metrics.length;
|
||||
if (totalRequests === 0) {
|
||||
return { totalRequests: 0, totalInputTokens: 0, totalOutputTokens: 0, tokenStats: { p99: "0", p95: "0", p50: "0" }, histogramData: null };
|
||||
return {
|
||||
totalRequests: 0,
|
||||
totalInputTokens: 0,
|
||||
totalOutputTokens: 0,
|
||||
inFlightRequests: $inFlightRequests,
|
||||
tokenStats: { p99: "0", p95: "0", p50: "0" },
|
||||
histogramData: null,
|
||||
};
|
||||
}
|
||||
|
||||
const totalInputTokens = $metrics.reduce((sum, m) => sum + m.input_tokens, 0);
|
||||
@@ -24,7 +31,14 @@
|
||||
// Calculate token statistics using output_tokens and duration_ms
|
||||
const validMetrics = $metrics.filter((m) => m.duration_ms > 0 && m.output_tokens > 0);
|
||||
if (validMetrics.length === 0) {
|
||||
return { totalRequests, totalInputTokens, totalOutputTokens, tokenStats: { p99: "0", p95: "0", p50: "0" }, histogramData: null };
|
||||
return {
|
||||
totalRequests,
|
||||
totalInputTokens,
|
||||
totalOutputTokens,
|
||||
inFlightRequests: $inFlightRequests,
|
||||
tokenStats: { p99: "0", p95: "0", p50: "0" },
|
||||
histogramData: null,
|
||||
};
|
||||
}
|
||||
|
||||
// Calculate tokens/second for each valid metric
|
||||
@@ -63,6 +77,7 @@
|
||||
totalRequests,
|
||||
totalInputTokens,
|
||||
totalOutputTokens,
|
||||
inFlightRequests: $inFlightRequests,
|
||||
tokenStats: {
|
||||
p99: p99.toFixed(2),
|
||||
p95: p95.toFixed(2),
|
||||
@@ -95,7 +110,12 @@
|
||||
|
||||
<tbody class="bg-surface divide-y divide-card-border-inner">
|
||||
<tr class="hover:bg-secondary">
|
||||
<td class="px-4 py-4 text-sm font-semibold text-gray-900 dark:text-white">{stats.totalRequests}</td>
|
||||
<td class="px-4 py-4 text-sm font-semibold text-gray-900 dark:text-white">
|
||||
<div class="flex flex-col gap-1">
|
||||
<span class="text-xs font-medium text-gray-500 dark:text-gray-400">Completed: {nf.format(stats.totalRequests)}</span>
|
||||
<span class="text-xs font-medium text-gray-500 dark:text-gray-400">Waiting: {nf.format(stats.inFlightRequests)}</span>
|
||||
</div>
|
||||
</td>
|
||||
|
||||
<td class="px-4 py-4 text-sm text-gray-700 dark:text-gray-300 border-l border-gray-200 dark:border-white/10">
|
||||
<div class="flex items-center gap-2">
|
||||
|
||||
@@ -38,8 +38,12 @@ export interface LogData {
|
||||
data: string;
|
||||
}
|
||||
|
||||
export interface InFlightStats {
|
||||
total: number;
|
||||
}
|
||||
|
||||
export interface APIEventEnvelope {
|
||||
type: "modelStatus" | "logData" | "metrics";
|
||||
type: "modelStatus" | "logData" | "metrics" | "inflight";
|
||||
data: string;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { writable } from "svelte/store";
|
||||
import type { Model, Metrics, VersionInfo, LogData, APIEventEnvelope, ReqRespCapture } from "../lib/types";
|
||||
import type { Model, Metrics, VersionInfo, LogData, APIEventEnvelope, ReqRespCapture, InFlightStats } from "../lib/types";
|
||||
import { connectionState } from "./theme";
|
||||
|
||||
const LOG_LENGTH_LIMIT = 1024 * 100; /* 100KB of log data */
|
||||
@@ -9,6 +9,7 @@ export const models = writable<Model[]>([]);
|
||||
export const proxyLogs = writable<string>("");
|
||||
export const upstreamLogs = writable<string>("");
|
||||
export const metrics = writable<Metrics[]>([]);
|
||||
export const inFlightRequests = writable<number>(0);
|
||||
export const versionInfo = writable<VersionInfo>({
|
||||
build_date: "unknown",
|
||||
commit: "unknown",
|
||||
@@ -29,6 +30,7 @@ export function enableAPIEvents(enabled: boolean): void {
|
||||
apiEventSource?.close();
|
||||
apiEventSource = null;
|
||||
metrics.set([]);
|
||||
inFlightRequests.set(0);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -46,6 +48,7 @@ export function enableAPIEvents(enabled: boolean): void {
|
||||
proxyLogs.set("");
|
||||
upstreamLogs.set("");
|
||||
metrics.set([]);
|
||||
inFlightRequests.set(0);
|
||||
models.set([]);
|
||||
retryCount = 0;
|
||||
connectionState.set("connected");
|
||||
@@ -83,6 +86,11 @@ export function enableAPIEvents(enabled: boolean): void {
|
||||
metrics.update((prevMetrics) => [...newMetrics, ...prevMetrics]);
|
||||
break;
|
||||
}
|
||||
case "inflight": {
|
||||
const stats = JSON.parse(message.data) as InFlightStats;
|
||||
inFlightRequests.set(stats.total ?? 0);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(e.data, err);
|
||||
|
||||
Reference in New Issue
Block a user