Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5fad24c16f | |||
| 8404244fab | |||
| 712cd01081 | |||
| 1f7aa359b1 | |||
| b138d6cf25 | |||
| fb7c808082 | |||
| a7e640b0f7 | |||
| 593604dfdc | |||
| b8f888f864 | |||
| 192b2ae621 | |||
| b7f8cb5094 | |||
| a23da6eb57 | |||
| 4c3aa40564 | |||
| 84e2c07a7e | |||
| 680af28bcc | |||
| d94db42ffe | |||
| 93cd83c55c | |||
| 5565fca3ac | |||
| d625ab8d92 | |||
| a3f82c140b |
@@ -16,3 +16,15 @@ builds:
|
|||||||
goarch: arm64
|
goarch: arm64
|
||||||
- goos: windows
|
- goos: windows
|
||||||
goarch: arm64
|
goarch: arm64
|
||||||
|
|
||||||
|
# use zip format for windows
|
||||||
|
archives:
|
||||||
|
- id: default
|
||||||
|
format: tar.gz
|
||||||
|
name_template: "{{ .ProjectName }}_{{ .Version }}_{{ .Os }}_{{ .Arch }}"
|
||||||
|
builds_info:
|
||||||
|
group: root
|
||||||
|
owner: root
|
||||||
|
format_overrides:
|
||||||
|
- goos: windows
|
||||||
|
format: zip
|
||||||
@@ -1,4 +1,7 @@
|
|||||||

|

|
||||||
|

|
||||||
|

|
||||||
|

|
||||||
|
|
||||||
# llama-swap
|
# llama-swap
|
||||||
|
|
||||||
@@ -64,13 +67,20 @@ models:
|
|||||||
# Default (and minimum) is 15 seconds
|
# Default (and minimum) is 15 seconds
|
||||||
healthCheckTimeout: 60
|
healthCheckTimeout: 60
|
||||||
|
|
||||||
# Write HTTP logs (useful for troubleshooting), defaults to false
|
# Valid log levels: debug, info (default), warn, error
|
||||||
logRequests: true
|
logLevel: info
|
||||||
|
|
||||||
# define valid model values and the upstream server start
|
# define valid model values and the upstream server start
|
||||||
models:
|
models:
|
||||||
"llama":
|
"llama":
|
||||||
cmd: llama-server --port 8999 -m Llama-3.2-1B-Instruct-Q4_K_M.gguf
|
# multiline for readability
|
||||||
|
cmd: >
|
||||||
|
llama-server --port 8999
|
||||||
|
--model path/to/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
|
||||||
|
|
||||||
|
# environment variables to pass to the command
|
||||||
|
env:
|
||||||
|
- "CUDA_VISIBLE_DEVICES=0"
|
||||||
|
|
||||||
# where to reach the server started by cmd, make sure the ports match
|
# where to reach the server started by cmd, make sure the ports match
|
||||||
proxy: http://127.0.0.1:8999
|
proxy: http://127.0.0.1:8999
|
||||||
@@ -91,16 +101,9 @@ models:
|
|||||||
# default: 0 = never unload model
|
# default: 0 = never unload model
|
||||||
ttl: 60
|
ttl: 60
|
||||||
|
|
||||||
"qwen":
|
# `useModelName` overrides the model name in the request
|
||||||
# environment variables to pass to the command
|
# and sends a specific name to the upstream server
|
||||||
env:
|
useModelName: "qwen:qwq"
|
||||||
- "CUDA_VISIBLE_DEVICES=0"
|
|
||||||
|
|
||||||
# multiline for readability
|
|
||||||
cmd: >
|
|
||||||
llama-server --port 8999
|
|
||||||
--model path/to/Qwen2.5-1.5B-Instruct-Q4_K_M.gguf
|
|
||||||
proxy: http://127.0.0.1:8999
|
|
||||||
|
|
||||||
# unlisted models do not show up in /v1/models or /upstream lists
|
# unlisted models do not show up in /v1/models or /upstream lists
|
||||||
# but they can still be requested as normal
|
# but they can still be requested as normal
|
||||||
@@ -117,14 +120,7 @@ models:
|
|||||||
ghcr.io/ggerganov/llama.cpp:server
|
ghcr.io/ggerganov/llama.cpp:server
|
||||||
--model '/models/Qwen2.5-Coder-0.5B-Instruct-Q4_K_M.gguf'
|
--model '/models/Qwen2.5-Coder-0.5B-Instruct-Q4_K_M.gguf'
|
||||||
|
|
||||||
# `useModelName` will send a specific model name to the upstream server
|
# profiles eliminates swapping by running multiple models at the same time
|
||||||
# overriding whatever was set in the request
|
|
||||||
"qwq":
|
|
||||||
proxy: http://127.0.0.1:11434
|
|
||||||
cmd: my-server
|
|
||||||
useModelName: "qwen:qwq"
|
|
||||||
|
|
||||||
# profiles make it easy to managing multi model (and gpu) configurations.
|
|
||||||
#
|
#
|
||||||
# Tips:
|
# Tips:
|
||||||
# - each model must be listening on a unique address and port
|
# - each model must be listening on a unique address and port
|
||||||
@@ -132,8 +128,8 @@ models:
|
|||||||
# - the profile will load and unload all models in the profile at the same time
|
# - the profile will load and unload all models in the profile at the same time
|
||||||
profiles:
|
profiles:
|
||||||
coding:
|
coding:
|
||||||
- "qwen"
|
|
||||||
- "llama"
|
- "llama"
|
||||||
|
- "qwen-unlisted"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Use Case Examples
|
### Use Case Examples
|
||||||
@@ -223,9 +219,15 @@ Of course, CLI access is also supported:
|
|||||||
# sends up to the last 10KB of logs
|
# sends up to the last 10KB of logs
|
||||||
curl http://host/logs'
|
curl http://host/logs'
|
||||||
|
|
||||||
# streams logs
|
# streams combined logs
|
||||||
curl -Ns 'http://host/logs/stream'
|
curl -Ns 'http://host/logs/stream'
|
||||||
|
|
||||||
|
# just llama-swap's logs
|
||||||
|
curl -Ns 'http://host/logs/stream/proxy'
|
||||||
|
|
||||||
|
# just upstream's logs
|
||||||
|
curl -Ns 'http://host/logs/stream/upstream'
|
||||||
|
|
||||||
# stream and filter logs with linux pipes
|
# stream and filter logs with linux pipes
|
||||||
curl -Ns http://host/logs/stream | grep 'eval time'
|
curl -Ns http://host/logs/stream | grep 'eval time'
|
||||||
|
|
||||||
@@ -267,8 +269,4 @@ WantedBy=multi-user.target
|
|||||||
|
|
||||||
## Star History
|
## Star History
|
||||||
|
|
||||||
<picture>
|
[](https://www.star-history.com/#mostlygeek/llama-swap&Date)
|
||||||
<source media="(prefers-color-scheme: dark)" srcset="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date&theme=dark" />
|
|
||||||
<source media="(prefers-color-scheme: light)" srcset="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date" />
|
|
||||||
<img alt="Star History Chart" src="https://api.star-history.com/svg?repos=mostlygeek/llama-swap&type=Date" />
|
|
||||||
</picture>
|
|
||||||
|
|||||||
+3
-3
@@ -1,9 +1,9 @@
|
|||||||
# Seconds to wait for llama.cpp to be available to serve requests
|
# Seconds to wait for llama.cpp to be available to serve requests
|
||||||
# Default (and minimum): 15 seconds
|
# Default (and minimum): 15 seconds
|
||||||
healthCheckTimeout: 15
|
healthCheckTimeout: 90
|
||||||
|
|
||||||
# Log HTTP requests helpful for troubleshoot, defaults to False
|
# valid log levels: debug, info (default), warn, error
|
||||||
logRequests: true
|
logLevel: debug
|
||||||
|
|
||||||
models:
|
models:
|
||||||
"llama":
|
"llama":
|
||||||
|
|||||||
@@ -0,0 +1,153 @@
|
|||||||
|
# aider, QwQ, Qwen-Coder 2.5 and llama-swap
|
||||||
|
|
||||||
|
This guide show how to use aider and llama-swap to get a 100% local coding co-pilot setup. The focus is on the trickest part which is configuring aider, llama-swap and llama-server to work together.
|
||||||
|
|
||||||
|
## Here's what you you need:
|
||||||
|
|
||||||
|
- aider - [installation docs](https://aider.chat/docs/install.html)
|
||||||
|
- llama-server - [download latest release](https://github.com/ggml-org/llama.cpp/releases)
|
||||||
|
- llama-swap - [download latest release](https://github.com/mostlygeek/llama-swap/releases)
|
||||||
|
- [QwQ 32B](https://huggingface.co/bartowski/Qwen_QwQ-32B-GGUF) and [Qwen Coder 2.5 32B](https://huggingface.co/bartowski/Qwen2.5-Coder-32B-Instruct-GGUF) models
|
||||||
|
- 24GB VRAM video card
|
||||||
|
|
||||||
|
## Running aider
|
||||||
|
|
||||||
|
The goal is getting this command line to work:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
aider --architect \
|
||||||
|
--no-show-model-warnings \
|
||||||
|
--model openai/QwQ \
|
||||||
|
--editor-model openai/qwen-coder-32B \
|
||||||
|
--model-settings-file aider.model.settings.yml \
|
||||||
|
--openai-api-key "sk-na" \
|
||||||
|
--openai-api-base "http://10.0.1.24:8080/v1" \
|
||||||
|
```
|
||||||
|
|
||||||
|
Set `--openai-api-base` to the IP and port where your llama-swap is running.
|
||||||
|
|
||||||
|
## Create an aider model settings file
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# aider.model.settings.yml
|
||||||
|
|
||||||
|
#
|
||||||
|
# !!! important: model names must match llama-swap configuration names !!!
|
||||||
|
#
|
||||||
|
|
||||||
|
- name: "openai/QwQ"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.95
|
||||||
|
top_k: 40
|
||||||
|
presence_penalty: 0.1
|
||||||
|
repetition_penalty: 1
|
||||||
|
num_ctx: 16384
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
weak_model_name: "openai/qwen-coder-32B"
|
||||||
|
editor_model_name: "openai/qwen-coder-32B"
|
||||||
|
|
||||||
|
- name: "openai/qwen-coder-32B"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.8
|
||||||
|
top_k: 20
|
||||||
|
repetition_penalty: 1.05
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
editor_edit_format: editor-diff
|
||||||
|
editor_model_name: "openai/qwen-coder-32B"
|
||||||
|
```
|
||||||
|
|
||||||
|
## llama-swap configuration
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# config.yaml
|
||||||
|
|
||||||
|
# The parameters are tweaked to fit model+context into 24GB VRAM GPUs
|
||||||
|
models:
|
||||||
|
"qwen-coder-32B":
|
||||||
|
proxy: "http://127.0.0.1:8999"
|
||||||
|
cmd: >
|
||||||
|
/path/to/llama-server
|
||||||
|
--host 127.0.0.1 --port 8999 --flash-attn --slots
|
||||||
|
--ctx-size 16000
|
||||||
|
--cache-type-k q8_0 --cache-type-v q8_0
|
||||||
|
-ngl 99
|
||||||
|
--model /path/to/Qwen2.5-Coder-32B-Instruct-Q4_K_M.gguf
|
||||||
|
|
||||||
|
"QwQ":
|
||||||
|
proxy: "http://127.0.0.1:9503"
|
||||||
|
cmd: >
|
||||||
|
/path/to/llama-server
|
||||||
|
--host 127.0.0.1 --port 9503 --flash-attn --metrics--slots
|
||||||
|
--cache-type-k q8_0 --cache-type-v q8_0
|
||||||
|
--ctx-size 32000
|
||||||
|
--samplers "top_k;top_p;min_p;temperature;dry;typ_p;xtc"
|
||||||
|
--temp 0.6 --repeat-penalty 1.1 --dry-multiplier 0.5
|
||||||
|
--min-p 0.01 --top-k 40 --top-p 0.95
|
||||||
|
-ngl 99
|
||||||
|
--model /mnt/nvme/models/bartowski/Qwen_QwQ-32B-Q4_K_M.gguf
|
||||||
|
```
|
||||||
|
|
||||||
|
## Advanced, Dual GPU Configuration
|
||||||
|
|
||||||
|
If you have _dual 24GB GPUs_ you can use llama-swap profiles to avoid swapping between QwQ and Qwen Coder.
|
||||||
|
|
||||||
|
In llama-swap's configuration file:
|
||||||
|
|
||||||
|
1. add a `profiles` section with `aider` as the profile name
|
||||||
|
2. using the `env` field to specify the GPU IDs for each model
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# config.yaml
|
||||||
|
|
||||||
|
# Add a profile for aider
|
||||||
|
profiles:
|
||||||
|
aider:
|
||||||
|
- qwen-coder-32B
|
||||||
|
- QwQ
|
||||||
|
|
||||||
|
models:
|
||||||
|
"qwen-coder-32B":
|
||||||
|
# manually set the GPU to run on
|
||||||
|
env:
|
||||||
|
- "CUDA_VISIBLE_DEVICES=0"
|
||||||
|
proxy: "http://127.0.0.1:8999"
|
||||||
|
cmd: /path/to/llama-server ...
|
||||||
|
|
||||||
|
"QwQ":
|
||||||
|
# manually set the GPU to run on
|
||||||
|
env:
|
||||||
|
- "CUDA_VISIBLE_DEVICES=1"
|
||||||
|
proxy: "http://127.0.0.1:9503"
|
||||||
|
cmd: /path/to/llama-server ...
|
||||||
|
```
|
||||||
|
|
||||||
|
Append the profile tag, `aider:`, to the model names in the model settings file
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
# aider.model.settings.yml
|
||||||
|
- name: "openai/aider:QwQ"
|
||||||
|
weak_model_name: "openai/aider:qwen-coder-32B-aider"
|
||||||
|
editor_model_name: "openai/aider:qwen-coder-32B-aider"
|
||||||
|
|
||||||
|
- name: "openai/aider:qwen-coder-32B"
|
||||||
|
editor_model_name: "openai/aider:qwen-coder-32B-aider"
|
||||||
|
```
|
||||||
|
|
||||||
|
Run aider with:
|
||||||
|
|
||||||
|
```sh
|
||||||
|
$ aider --architect \
|
||||||
|
--no-show-model-warnings \
|
||||||
|
--model openai/aider:QwQ \
|
||||||
|
--editor-model openai/aider:qwen-coder-32B \
|
||||||
|
--config aider.conf.yml \
|
||||||
|
--model-settings-file aider.model.settings.yml
|
||||||
|
--openai-api-key "sk-na" \
|
||||||
|
--openai-api-base "http://10.0.1.24:8080/v1"
|
||||||
|
```
|
||||||
@@ -0,0 +1,28 @@
|
|||||||
|
# this makes use of llama-swap's profile feature to
|
||||||
|
# keep the architect and editor models in VRAM on different GPUs
|
||||||
|
|
||||||
|
- name: "openai/aider:QwQ"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.95
|
||||||
|
top_k: 40
|
||||||
|
presence_penalty: 0.1
|
||||||
|
repetition_penalty: 1
|
||||||
|
num_ctx: 16384
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
weak_model_name: "openai/aider:qwen-coder-32B"
|
||||||
|
editor_model_name: "openai/aider:qwen-coder-32B"
|
||||||
|
|
||||||
|
- name: "openai/aider:qwen-coder-32B"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.8
|
||||||
|
top_k: 20
|
||||||
|
repetition_penalty: 1.05
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
editor_edit_format: editor-diff
|
||||||
|
editor_model_name: "openai/aider:qwen-coder-32B"
|
||||||
@@ -0,0 +1,26 @@
|
|||||||
|
- name: "openai/QwQ"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.95
|
||||||
|
top_k: 40
|
||||||
|
presence_penalty: 0.1
|
||||||
|
repetition_penalty: 1
|
||||||
|
num_ctx: 16384
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
weak_model_name: "openai/qwen-coder-32B"
|
||||||
|
editor_model_name: "openai/qwen-coder-32B"
|
||||||
|
|
||||||
|
- name: "openai/qwen-coder-32B"
|
||||||
|
edit_format: diff
|
||||||
|
extra_params:
|
||||||
|
max_tokens: 16384
|
||||||
|
top_p: 0.8
|
||||||
|
top_k: 20
|
||||||
|
repetition_penalty: 1.05
|
||||||
|
use_temperature: 0.6
|
||||||
|
reasoning_tag: think
|
||||||
|
editor_edit_format: editor-diff
|
||||||
|
editor_model_name: "openai/qwen-coder-32B"
|
||||||
|
|
||||||
@@ -0,0 +1,49 @@
|
|||||||
|
healthCheckTimeout: 300
|
||||||
|
logLevel: debug
|
||||||
|
|
||||||
|
profiles:
|
||||||
|
aider:
|
||||||
|
- qwen-coder-32B
|
||||||
|
- QwQ
|
||||||
|
|
||||||
|
models:
|
||||||
|
"qwen-coder-32B":
|
||||||
|
env:
|
||||||
|
- "CUDA_VISIBLE_DEVICES=0"
|
||||||
|
aliases:
|
||||||
|
- coder
|
||||||
|
proxy: "http://127.0.0.1:8999"
|
||||||
|
|
||||||
|
# set appropriate paths for your environment
|
||||||
|
cmd: >
|
||||||
|
/path/to/llama-server
|
||||||
|
--host 127.0.0.1 --port 8999 --flash-attn --slots
|
||||||
|
--ctx-size 16000
|
||||||
|
--ctx-size-draft 16000
|
||||||
|
--model /path/to/Qwen2.5-Coder-32B-Instruct-Q4_K_M.gguf
|
||||||
|
--model-draft /path/to/Qwen2.5-Coder-1.5B-Instruct-Q8_0.gguf
|
||||||
|
-ngl 99 -ngld 99
|
||||||
|
--draft-max 16 --draft-min 4 --draft-p-min 0.4
|
||||||
|
--cache-type-k q8_0 --cache-type-v q8_0
|
||||||
|
"QwQ":
|
||||||
|
env:
|
||||||
|
- "CUDA_VISIBLE_DEVICES=1"
|
||||||
|
proxy: "http://127.0.0.1:9503"
|
||||||
|
|
||||||
|
# set appropriate paths for your environment
|
||||||
|
cmd: >
|
||||||
|
/path/to/llama-server
|
||||||
|
--host 127.0.0.1 --port 9503
|
||||||
|
--flash-attn --metrics
|
||||||
|
--slots
|
||||||
|
--model /path/to/Qwen_QwQ-32B-Q4_K_M.gguf
|
||||||
|
--cache-type-k q8_0 --cache-type-v q8_0
|
||||||
|
--ctx-size 32000
|
||||||
|
--samplers "top_k;top_p;min_p;temperature;dry;typ_p;xtc"
|
||||||
|
--temp 0.6
|
||||||
|
--repeat-penalty 1.1
|
||||||
|
--dry-multiplier 0.5
|
||||||
|
--min-p 0.01
|
||||||
|
--top-k 40
|
||||||
|
--top-p 0.95
|
||||||
|
-ngl 99 -ngld 99
|
||||||
@@ -37,7 +37,7 @@ require (
|
|||||||
github.com/ugorji/go/codec v1.2.12 // indirect
|
github.com/ugorji/go/codec v1.2.12 // indirect
|
||||||
golang.org/x/arch v0.8.0 // indirect
|
golang.org/x/arch v0.8.0 // indirect
|
||||||
golang.org/x/crypto v0.36.0 // indirect
|
golang.org/x/crypto v0.36.0 // indirect
|
||||||
golang.org/x/net v0.37.0 // indirect
|
golang.org/x/net v0.38.0 // indirect
|
||||||
golang.org/x/sys v0.31.0 // indirect
|
golang.org/x/sys v0.31.0 // indirect
|
||||||
golang.org/x/text v0.23.0 // indirect
|
golang.org/x/text v0.23.0 // indirect
|
||||||
google.golang.org/protobuf v1.34.1 // indirect
|
google.golang.org/protobuf v1.34.1 // indirect
|
||||||
|
|||||||
@@ -86,6 +86,8 @@ golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
|
|||||||
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
|
||||||
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
|
golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c=
|
||||||
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
||||||
|
golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8=
|
||||||
|
golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8=
|
||||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
|
||||||
|
|||||||
BIN
Binary file not shown.
|
After Width: | Height: | Size: 351 KiB |
@@ -27,6 +27,7 @@ func (m *ModelConfig) SanitizedCommand() ([]string, error) {
|
|||||||
type Config struct {
|
type Config struct {
|
||||||
HealthCheckTimeout int `yaml:"healthCheckTimeout"`
|
HealthCheckTimeout int `yaml:"healthCheckTimeout"`
|
||||||
LogRequests bool `yaml:"logRequests"`
|
LogRequests bool `yaml:"logRequests"`
|
||||||
|
LogLevel string `yaml:"logLevel"`
|
||||||
Models map[string]ModelConfig `yaml:"models"`
|
Models map[string]ModelConfig `yaml:"models"`
|
||||||
Profiles map[string][]string `yaml:"profiles"`
|
Profiles map[string][]string `yaml:"profiles"`
|
||||||
|
|
||||||
|
|||||||
+187
-74
@@ -12,32 +12,65 @@
|
|||||||
flex-direction: column;
|
flex-direction: column;
|
||||||
font-family: "Courier New", Courier, monospace;
|
font-family: "Courier New", Courier, monospace;
|
||||||
}
|
}
|
||||||
#log-controls {
|
.log-container {
|
||||||
margin: 0.5em;
|
|
||||||
display: flex;
|
display: flex;
|
||||||
align-items: center;
|
|
||||||
justify-content: space-between; /* Spaces out elements evenly */
|
|
||||||
}
|
|
||||||
#log-controls input {
|
|
||||||
flex: 1;
|
|
||||||
}
|
|
||||||
#log-controls input:focus {
|
|
||||||
outline: none; /* Ensures no outline is shown when the input is focused */
|
|
||||||
}
|
|
||||||
#log-stream {
|
|
||||||
flex: 1;
|
flex: 1;
|
||||||
|
gap: 0.5em;
|
||||||
margin: 0.5em;
|
margin: 0.5em;
|
||||||
|
min-height: 0;
|
||||||
|
}
|
||||||
|
.log-column {
|
||||||
|
display: flex;
|
||||||
|
flex-direction: column;
|
||||||
|
flex: 1;
|
||||||
|
min-width: 0;
|
||||||
|
transition: flex 0.3s ease;
|
||||||
|
}
|
||||||
|
.log-column.minimized {
|
||||||
|
flex: 0.1;
|
||||||
|
max-width: 50px;
|
||||||
|
border: 1px solid #777;
|
||||||
|
color: green;
|
||||||
|
}
|
||||||
|
.log-controls {
|
||||||
|
display: grid;
|
||||||
|
grid-template-columns: 1fr auto;
|
||||||
|
gap: 0.5em;
|
||||||
|
margin-bottom: 0.5em;
|
||||||
|
}
|
||||||
|
.log-controls input {
|
||||||
|
width: 100%;
|
||||||
|
padding: 4px;
|
||||||
|
}
|
||||||
|
.log-controls input:focus {
|
||||||
|
outline: none;
|
||||||
|
}
|
||||||
|
.log-stream {
|
||||||
|
flex: 1;
|
||||||
padding: 1em;
|
padding: 1em;
|
||||||
background: #f4f4f4;
|
background: #f4f4f4;
|
||||||
overflow-y: auto;
|
overflow-y: auto;
|
||||||
white-space: pre-wrap; /* Ensures line wrapping */
|
white-space: pre-wrap;
|
||||||
word-wrap: break-word; /* Ensures long words wrap */
|
word-wrap: break-word;
|
||||||
|
min-height: 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
.regex-error {
|
.regex-error {
|
||||||
background-color: #ff0000 !important;
|
background-color: #ff0000 !important;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Make headers clickable and show pointer cursor */
|
||||||
|
h2 {
|
||||||
|
cursor: pointer;
|
||||||
|
user-select: none;
|
||||||
|
margin: 0 0 0.5em 0;
|
||||||
|
padding: 0.5em;
|
||||||
|
}
|
||||||
|
|
||||||
|
h2:hover {
|
||||||
|
background-color: rgba(0, 0, 0, 0.05);
|
||||||
|
}
|
||||||
|
|
||||||
/* Dark mode styles */
|
/* Dark mode styles */
|
||||||
@media (prefers-color-scheme: dark) {
|
@media (prefers-color-scheme: dark) {
|
||||||
body {
|
body {
|
||||||
@@ -45,101 +78,181 @@
|
|||||||
color: #fff;
|
color: #fff;
|
||||||
}
|
}
|
||||||
|
|
||||||
#log-stream {
|
.log-stream {
|
||||||
background: #444;
|
background: #444;
|
||||||
color: #fff;
|
color: #fff;
|
||||||
}
|
}
|
||||||
|
|
||||||
#log-controls input {
|
.log-controls input {
|
||||||
background: #555;
|
background: #555;
|
||||||
color: #fff;
|
color: #fff;
|
||||||
border: 1px solid #777;
|
border: 1px solid #777;
|
||||||
}
|
}
|
||||||
|
|
||||||
#log-controls button {
|
.log-controls button {
|
||||||
background: #555;
|
background: #555;
|
||||||
color: #fff;
|
color: #fff;
|
||||||
border: 1px solid #777;
|
border: 1px solid #777;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h2:hover {
|
||||||
|
background-color: rgba(255, 255, 255, 0.1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Hide content when minimized */
|
||||||
|
.log-column.minimized .log-controls,
|
||||||
|
.log-column.minimized .log-stream {
|
||||||
|
display: none;
|
||||||
|
}
|
||||||
|
|
||||||
|
.log-column.minimized h2 {
|
||||||
|
writing-mode: vertical-rl;
|
||||||
|
text-orientation: mixed;
|
||||||
|
transform: rotate(180deg);
|
||||||
|
white-space: nowrap;
|
||||||
|
margin: auto;
|
||||||
}
|
}
|
||||||
</style>
|
</style>
|
||||||
</head>
|
</head>
|
||||||
<body>
|
<body>
|
||||||
<pre id="log-stream">Waiting for logs...</pre>
|
<div class="log-container">
|
||||||
<div id="log-controls">
|
<div class="log-column">
|
||||||
<input type="text" id="filter-input" placeholder="regex filter">
|
<h2>Proxy Logs</h2>
|
||||||
<button id="clear-button">clear</button>
|
<div class="log-controls">
|
||||||
|
<input type="text" id="proxy-filter-input" placeholder="proxy regex filter">
|
||||||
|
<button id="proxy-clear-button">clear</button>
|
||||||
|
</div>
|
||||||
|
<pre class="log-stream" id="proxy-log-stream">Waiting for proxy logs...</pre>
|
||||||
|
</div>
|
||||||
|
<div class="log-column minimized">
|
||||||
|
<h2>Upstream Logs</h2>
|
||||||
|
<div class="log-controls">
|
||||||
|
<input type="text" id="upstream-filter-input" placeholder="upstream regex filter">
|
||||||
|
<button id="upstream-clear-button">clear</button>
|
||||||
|
</div>
|
||||||
|
<pre class="log-stream" id="upstream-log-stream">Waiting for upstream logs...</pre>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<script>
|
<script>
|
||||||
const logStream = document.getElementById('log-stream');
|
class LogStream {
|
||||||
const filterInput = document.getElementById('filter-input');
|
constructor(streamElement, filterInput, clearButton, endpoint) {
|
||||||
var logData = "";
|
this.streamElement = streamElement;
|
||||||
let regexFilter = null;
|
this.filterInput = filterInput;
|
||||||
|
this.clearButton = clearButton;
|
||||||
|
this.endpoint = endpoint;
|
||||||
|
this.logData = "";
|
||||||
|
this.regexFilter = null;
|
||||||
|
this.eventSource = null;
|
||||||
|
|
||||||
function setupEventSource() {
|
this.initialize();
|
||||||
if (typeof(EventSource) !== "undefined") {
|
|
||||||
const eventSource = new EventSource("/logs/streamSSE");
|
|
||||||
|
|
||||||
eventSource.onmessage = function(event) {
|
|
||||||
logData += event.data;
|
|
||||||
render()
|
|
||||||
};
|
|
||||||
|
|
||||||
eventSource.onerror = function(err) {
|
|
||||||
logData = "EventSource failed: " + err.message;
|
|
||||||
};
|
|
||||||
} else {
|
|
||||||
logData = "SSE Not supported by this browser."
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// poor-ai's react ¯\_(ツ)_/¯
|
initialize() {
|
||||||
function render() {
|
this.filterInput.addEventListener('input', () => this.updateFilter());
|
||||||
if (regexFilter) {
|
this.clearButton.addEventListener('click', () => {
|
||||||
const lines = logData.split('\n');
|
this.filterInput.value = "";
|
||||||
const filteredLines = lines.filter(line => {
|
this.regexFilter = null;
|
||||||
return regexFilter === null || regexFilter.test(line);
|
this.render();
|
||||||
});
|
});
|
||||||
|
this.setupEventSource();
|
||||||
if (filteredLines.length > 0) {
|
|
||||||
logStream.textContent = filteredLines.join('\n') + '\n';
|
|
||||||
} else {
|
|
||||||
logStream.textContent = "";
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
logStream.textContent = logData;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
logStream.scrollTop = logStream.scrollHeight;
|
setupEventSource() {
|
||||||
|
if (typeof(EventSource) === "undefined") {
|
||||||
|
this.logData = "SSE Not supported by this browser.";
|
||||||
|
this.render();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const connect = () => {
|
||||||
|
this.eventSource = new EventSource(this.endpoint);
|
||||||
|
|
||||||
|
this.eventSource.onmessage = (event) => {
|
||||||
|
this.logData += event.data;
|
||||||
|
this.render();
|
||||||
|
};
|
||||||
|
|
||||||
|
this.eventSource.onerror = (err) => {
|
||||||
|
// Close the current connection
|
||||||
|
this.eventSource.close();
|
||||||
|
|
||||||
|
this.logData += "\nConnection lost. Retrying in 5 seconds...\n";
|
||||||
|
this.render();
|
||||||
|
|
||||||
|
// Attempt to reconnect after 5 seconds
|
||||||
|
setTimeout(() => {
|
||||||
|
this.logData += "Attempting to reconnect...\n";
|
||||||
|
this.render();
|
||||||
|
connect();
|
||||||
|
}, 5000);
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
// Initial connection
|
||||||
|
connect();
|
||||||
|
}
|
||||||
|
|
||||||
|
render() {
|
||||||
|
let content = this.logData;
|
||||||
|
|
||||||
|
if (this.regexFilter) {
|
||||||
|
const lines = content.split('\n');
|
||||||
|
const filteredLines = lines.filter(line => this.regexFilter.test(line));
|
||||||
|
content = filteredLines.length > 0 ? filteredLines.join('\n') + '\n' : "";
|
||||||
|
}
|
||||||
|
|
||||||
|
this.streamElement.textContent = content;
|
||||||
|
this.streamElement.scrollTop = this.streamElement.scrollHeight;
|
||||||
|
}
|
||||||
|
|
||||||
|
updateFilter() {
|
||||||
|
const pattern = this.filterInput.value.trim();
|
||||||
|
this.filterInput.classList.remove('regex-error');
|
||||||
|
|
||||||
|
if (!pattern) {
|
||||||
|
this.regexFilter = null;
|
||||||
|
this.render();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
function updateFilter() {
|
|
||||||
const pattern = filterInput.value.trim();
|
|
||||||
filterInput.classList.remove('regex-error');
|
|
||||||
if (pattern) {
|
|
||||||
try {
|
try {
|
||||||
regexFilter = new RegExp(pattern);
|
this.regexFilter = new RegExp(pattern);
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error("Invalid regex pattern:", e);
|
console.error("Invalid regex pattern:", e);
|
||||||
regexFilter = null;
|
this.regexFilter = null;
|
||||||
filterInput.classList.add('regex-error');
|
this.filterInput.classList.add('regex-error');
|
||||||
return
|
return;
|
||||||
}
|
|
||||||
} else {
|
|
||||||
regexFilter = null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
render();
|
this.render();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
filterInput.addEventListener('input', updateFilter);
|
// Initialize both log streams
|
||||||
document.getElementById('clear-button').addEventListener('click', () => {
|
document.addEventListener('DOMContentLoaded', () => {
|
||||||
filterInput.value = "";
|
new LogStream(
|
||||||
regexFilter = null;
|
document.getElementById('proxy-log-stream'),
|
||||||
render();
|
document.getElementById('proxy-filter-input'),
|
||||||
|
document.getElementById('proxy-clear-button'),
|
||||||
|
"/logs/streamSSE/proxy"
|
||||||
|
);
|
||||||
|
|
||||||
|
new LogStream(
|
||||||
|
document.getElementById('upstream-log-stream'),
|
||||||
|
document.getElementById('upstream-filter-input'),
|
||||||
|
document.getElementById('upstream-clear-button'),
|
||||||
|
"/logs/streamSSE/upstream"
|
||||||
|
);
|
||||||
|
|
||||||
|
// Initialize clickable headers
|
||||||
|
document.querySelectorAll('h2').forEach(header => {
|
||||||
|
header.addEventListener('click', () => {
|
||||||
|
const column = header.closest('.log-column');
|
||||||
|
column.classList.toggle('minimized');
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
setupEventSource();
|
|
||||||
updateFilter();
|
|
||||||
</script>
|
</script>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
@@ -2,11 +2,21 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"container/ring"
|
"container/ring"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type LogLevel int
|
||||||
|
|
||||||
|
const (
|
||||||
|
LevelDebug LogLevel = iota
|
||||||
|
LevelInfo
|
||||||
|
LevelWarn
|
||||||
|
LevelError
|
||||||
|
)
|
||||||
|
|
||||||
type LogMonitor struct {
|
type LogMonitor struct {
|
||||||
clients map[chan []byte]bool
|
clients map[chan []byte]bool
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
@@ -15,6 +25,10 @@ type LogMonitor struct {
|
|||||||
|
|
||||||
// typically this can be os.Stdout
|
// typically this can be os.Stdout
|
||||||
stdout io.Writer
|
stdout io.Writer
|
||||||
|
|
||||||
|
// logging levels
|
||||||
|
level LogLevel
|
||||||
|
prefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewLogMonitor() *LogMonitor {
|
func NewLogMonitor() *LogMonitor {
|
||||||
@@ -26,6 +40,8 @@ func NewLogMonitorWriter(stdout io.Writer) *LogMonitor {
|
|||||||
clients: make(map[chan []byte]bool),
|
clients: make(map[chan []byte]bool),
|
||||||
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
|
buffer: ring.New(10 * 1024), // keep 10KB of buffered logs
|
||||||
stdout: stdout,
|
stdout: stdout,
|
||||||
|
level: LevelInfo,
|
||||||
|
prefix: "",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,3 +110,77 @@ func (w *LogMonitor) broadcast(msg []byte) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) SetPrefix(prefix string) {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
w.prefix = prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) SetLogLevel(level LogLevel) {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
w.level = level
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) formatMessage(level string, msg string) []byte {
|
||||||
|
prefix := ""
|
||||||
|
if w.prefix != "" {
|
||||||
|
prefix = fmt.Sprintf("[%s] ", w.prefix)
|
||||||
|
}
|
||||||
|
return []byte(fmt.Sprintf("%s[%s] %s\n", prefix, level, msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) log(level LogLevel, msg string) {
|
||||||
|
if level < w.level {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.Write(w.formatMessage(level.String(), msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Debug(msg string) {
|
||||||
|
w.log(LevelDebug, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Info(msg string) {
|
||||||
|
w.log(LevelInfo, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Warn(msg string) {
|
||||||
|
w.log(LevelWarn, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Error(msg string) {
|
||||||
|
w.log(LevelError, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Debugf(format string, args ...interface{}) {
|
||||||
|
w.log(LevelDebug, fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Infof(format string, args ...interface{}) {
|
||||||
|
w.log(LevelInfo, fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Warnf(format string, args ...interface{}) {
|
||||||
|
w.log(LevelWarn, fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *LogMonitor) Errorf(format string, args ...interface{}) {
|
||||||
|
w.log(LevelError, fmt.Sprintf(format, args...))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l LogLevel) String() string {
|
||||||
|
switch l {
|
||||||
|
case LevelDebug:
|
||||||
|
return "DEBUG"
|
||||||
|
case LevelInfo:
|
||||||
|
return "INFO"
|
||||||
|
case LevelWarn:
|
||||||
|
return "WARN"
|
||||||
|
case LevelError:
|
||||||
|
return "ERROR"
|
||||||
|
default:
|
||||||
|
return "UNKNOWN"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+160
-121
@@ -33,8 +33,15 @@ type Process struct {
|
|||||||
ID string
|
ID string
|
||||||
config ModelConfig
|
config ModelConfig
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
logMonitor *LogMonitor
|
|
||||||
|
// for p.cmd.Wait() select { ... }
|
||||||
|
cmdWaitChan chan error
|
||||||
|
|
||||||
|
processLogger *LogMonitor
|
||||||
|
proxyLogger *LogMonitor
|
||||||
|
|
||||||
healthCheckTimeout int
|
healthCheckTimeout int
|
||||||
|
healthCheckLoopInterval time.Duration
|
||||||
|
|
||||||
lastRequestHandled time.Time
|
lastRequestHandled time.Time
|
||||||
|
|
||||||
@@ -51,54 +58,70 @@ type Process struct {
|
|||||||
shutdownCancel context.CancelFunc
|
shutdownCancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, logMonitor *LogMonitor) *Process {
|
func NewProcess(ID string, healthCheckTimeout int, config ModelConfig, processLogger *LogMonitor, proxyLogger *LogMonitor) *Process {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
return &Process{
|
return &Process{
|
||||||
ID: ID,
|
ID: ID,
|
||||||
config: config,
|
config: config,
|
||||||
cmd: nil,
|
cmd: nil,
|
||||||
logMonitor: logMonitor,
|
cmdWaitChan: make(chan error, 1),
|
||||||
|
processLogger: processLogger,
|
||||||
|
proxyLogger: proxyLogger,
|
||||||
healthCheckTimeout: healthCheckTimeout,
|
healthCheckTimeout: healthCheckTimeout,
|
||||||
|
healthCheckLoopInterval: 5 * time.Second, /* default, can not be set by user - used for testing */
|
||||||
state: StateStopped,
|
state: StateStopped,
|
||||||
shutdownCtx: ctx,
|
shutdownCtx: ctx,
|
||||||
shutdownCancel: cancel,
|
shutdownCancel: cancel,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) setState(newState ProcessState) error {
|
// LogMonitor returns the log monitor associated with the process.
|
||||||
// enforce valid state transitions
|
func (p *Process) LogMonitor() *LogMonitor {
|
||||||
invalidTransition := false
|
return p.processLogger
|
||||||
if p.state == StateStopped {
|
}
|
||||||
// stopped -> starting
|
|
||||||
if newState != StateStarting {
|
// custom error types for swapping state
|
||||||
invalidTransition = true
|
var (
|
||||||
}
|
ErrExpectedStateMismatch = errors.New("expected state mismatch")
|
||||||
} else if p.state == StateStarting {
|
ErrInvalidStateTransition = errors.New("invalid state transition")
|
||||||
// starting -> ready | failed | stopping
|
)
|
||||||
if newState != StateReady && newState != StateFailed && newState != StateStopping {
|
|
||||||
invalidTransition = true
|
// swapState performs a compare and swap of the state atomically. It returns the current state
|
||||||
}
|
// and an error if the swap failed.
|
||||||
} else if p.state == StateReady {
|
func (p *Process) swapState(expectedState, newState ProcessState) (ProcessState, error) {
|
||||||
// ready -> stopping
|
p.stateMutex.Lock()
|
||||||
if newState != StateStopping {
|
defer p.stateMutex.Unlock()
|
||||||
invalidTransition = true
|
|
||||||
}
|
if p.state != expectedState {
|
||||||
} else if p.state == StateStopping {
|
p.proxyLogger.Warnf("swapState() Unexpected current state %s, expected %s", p.state, expectedState)
|
||||||
// stopping -> stopped | shutdown
|
return p.state, ErrExpectedStateMismatch
|
||||||
if newState != StateStopped && newState != StateShutdown {
|
|
||||||
invalidTransition = true
|
|
||||||
}
|
|
||||||
} else if p.state == StateFailed || p.state == StateShutdown {
|
|
||||||
invalidTransition = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if invalidTransition {
|
if !isValidTransition(p.state, newState) {
|
||||||
//panic(fmt.Sprintf("Invalid state transition from %s to %s", p.state, newState))
|
p.proxyLogger.Warnf("swapState() Invalid state transition from %s to %s", p.state, newState)
|
||||||
return fmt.Errorf("invalid state transition from %s to %s", p.state, newState)
|
return p.state, ErrInvalidStateTransition
|
||||||
}
|
}
|
||||||
|
|
||||||
p.state = newState
|
p.state = newState
|
||||||
return nil
|
p.proxyLogger.Debugf("swapState() State transitioned from %s to %s", expectedState, newState)
|
||||||
|
return p.state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to encapsulate transition rules
|
||||||
|
func isValidTransition(from, to ProcessState) bool {
|
||||||
|
switch from {
|
||||||
|
case StateStopped:
|
||||||
|
return to == StateStarting
|
||||||
|
case StateStarting:
|
||||||
|
return to == StateReady || to == StateFailed || to == StateStopping
|
||||||
|
case StateReady:
|
||||||
|
return to == StateStopping
|
||||||
|
case StateStopping:
|
||||||
|
return to == StateStopped || to == StateShutdown
|
||||||
|
case StateFailed, StateShutdown:
|
||||||
|
return false // No transitions allowed from these states
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) CurrentState() ProcessState {
|
func (p *Process) CurrentState() ProcessState {
|
||||||
@@ -116,68 +139,58 @@ func (p *Process) start() error {
|
|||||||
return fmt.Errorf("can not start(), upstream proxy missing")
|
return fmt.Errorf("can not start(), upstream proxy missing")
|
||||||
}
|
}
|
||||||
|
|
||||||
// multiple start() calls will wait for the one that is actually starting to
|
|
||||||
// complete before proceeding.
|
|
||||||
// ===========
|
|
||||||
curState := p.CurrentState()
|
|
||||||
|
|
||||||
if curState == StateReady {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if curState == StateStarting {
|
|
||||||
p.waitStarting.Wait()
|
|
||||||
|
|
||||||
if state := p.CurrentState(); state != StateReady {
|
|
||||||
return fmt.Errorf("start() failed current state: %v", state)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// ===========
|
|
||||||
|
|
||||||
// There is the possibility of a hard to replicate race condition where
|
|
||||||
// curState *WAS* StateStopped but by the time we get to the p.stateMutex.Lock()
|
|
||||||
// below, it's value has changed!
|
|
||||||
|
|
||||||
p.stateMutex.Lock()
|
|
||||||
defer p.stateMutex.Unlock()
|
|
||||||
|
|
||||||
// with the exclusive lock, check if p.state is StateStopped, which is the only valid state
|
|
||||||
// to transition from to StateReady
|
|
||||||
|
|
||||||
if p.state != StateStopped {
|
|
||||||
if p.state == StateReady {
|
|
||||||
return nil
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("start() can not proceed expected StateReady but process is in %v", p.state)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := p.setState(StateStarting); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
p.waitStarting.Add(1)
|
|
||||||
defer p.waitStarting.Done()
|
|
||||||
|
|
||||||
args, err := p.config.SanitizedCommand()
|
args, err := p.config.SanitizedCommand()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("unable to get sanitized command: %v", err)
|
return fmt.Errorf("unable to get sanitized command: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if curState, err := p.swapState(StateStopped, StateStarting); err != nil {
|
||||||
|
if err == ErrExpectedStateMismatch {
|
||||||
|
// already starting, just wait for it to complete and expect
|
||||||
|
// it to be be in the Ready start after. If not, return an error
|
||||||
|
if curState == StateStarting {
|
||||||
|
p.waitStarting.Wait()
|
||||||
|
if state := p.CurrentState(); state == StateReady {
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("process was already starting but wound up in state %v", state)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("processes was in state %v when start() was called", curState)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("failed to set Process state to starting: current state: %v, error: %v", curState, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
p.waitStarting.Add(1)
|
||||||
|
defer p.waitStarting.Done()
|
||||||
|
|
||||||
p.cmd = exec.Command(args[0], args[1:]...)
|
p.cmd = exec.Command(args[0], args[1:]...)
|
||||||
p.cmd.Stdout = p.logMonitor
|
p.cmd.Stdout = p.processLogger
|
||||||
p.cmd.Stderr = p.logMonitor
|
p.cmd.Stderr = p.processLogger
|
||||||
p.cmd.Env = p.config.Env
|
p.cmd.Env = p.config.Env
|
||||||
|
|
||||||
err = p.cmd.Start()
|
err = p.cmd.Start()
|
||||||
|
|
||||||
|
// Set process state to failed
|
||||||
if err != nil {
|
if err != nil {
|
||||||
p.setState(StateFailed)
|
if curState, swapErr := p.swapState(StateStarting, StateFailed); swapErr != nil {
|
||||||
|
return fmt.Errorf(
|
||||||
|
"failed to start command and state swap failed. command error: %v, current state: %v, state swap error: %v",
|
||||||
|
err, curState, swapErr,
|
||||||
|
)
|
||||||
|
}
|
||||||
return fmt.Errorf("start() failed: %v", err)
|
return fmt.Errorf("start() failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Capture the exit error for later signaling
|
||||||
|
go func() {
|
||||||
|
exitErr := p.cmd.Wait()
|
||||||
|
p.proxyLogger.Debugf("cmd.Wait() returned for [%s] error: %v", p.ID, exitErr)
|
||||||
|
p.cmdWaitChan <- exitErr
|
||||||
|
}()
|
||||||
|
|
||||||
// One of three things can happen at this stage:
|
// One of three things can happen at this stage:
|
||||||
// 1. The command exits unexpectedly
|
// 1. The command exits unexpectedly
|
||||||
// 2. The health check fails
|
// 2. The health check fails
|
||||||
@@ -209,31 +222,51 @@ func (p *Process) start() error {
|
|||||||
)
|
)
|
||||||
defer cancelHealthCheck()
|
defer cancelHealthCheck()
|
||||||
|
|
||||||
// Health check loop
|
|
||||||
loop:
|
loop:
|
||||||
|
// Ready Check loop
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-checkDeadline.Done():
|
case <-checkDeadline.Done():
|
||||||
p.setState(StateFailed)
|
if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
|
||||||
return fmt.Errorf("health check failed after %vs", maxDuration.Seconds())
|
return fmt.Errorf("health check timed out after %vs AND state swap failed: %v, current state: %v", maxDuration.Seconds(), err, curState)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("health check timed out after %vs", maxDuration.Seconds())
|
||||||
|
}
|
||||||
case <-p.shutdownCtx.Done():
|
case <-p.shutdownCtx.Done():
|
||||||
return errors.New("health check interrupted due to shutdown")
|
return errors.New("health check interrupted due to shutdown")
|
||||||
|
case exitErr := <-p.cmdWaitChan:
|
||||||
|
if exitErr != nil {
|
||||||
|
p.proxyLogger.Warnf("upstream command exited prematurely with error: %v", exitErr)
|
||||||
|
if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
|
||||||
|
return fmt.Errorf("upstream command exited unexpectedly: %s AND state swap failed: %v, current state: %v", exitErr.Error(), err, curState)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("upstream command exited unexpectedly: %s", exitErr.Error())
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
p.proxyLogger.Warnf("upstream command exited prematurely with no error")
|
||||||
|
if curState, err := p.swapState(StateStarting, StateFailed); err != nil {
|
||||||
|
return fmt.Errorf("upstream command exited prematurely with no error AND state swap failed: %v, current state: %v", err, curState)
|
||||||
|
} else {
|
||||||
|
return fmt.Errorf("upstream command exited prematurely with no error")
|
||||||
|
}
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
if err := p.checkHealthEndpoint(healthURL); err == nil {
|
if err := p.checkHealthEndpoint(healthURL); err == nil {
|
||||||
|
p.proxyLogger.Infof("Health check passed on %s", healthURL)
|
||||||
cancelHealthCheck()
|
cancelHealthCheck()
|
||||||
break loop
|
break loop
|
||||||
} else {
|
} else {
|
||||||
if strings.Contains(err.Error(), "connection refused") {
|
if strings.Contains(err.Error(), "connection refused") {
|
||||||
endTime, _ := checkDeadline.Deadline()
|
endTime, _ := checkDeadline.Deadline()
|
||||||
ttl := time.Until(endTime)
|
ttl := time.Until(endTime)
|
||||||
fmt.Fprintf(p.logMonitor, "!!! Connection refused on %s, ttl %.0fs\n", healthURL, ttl.Seconds())
|
p.proxyLogger.Infof("Connection refused on %s, giving up in %.0fs", healthURL, ttl.Seconds())
|
||||||
} else {
|
} else {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! Health check error: %v\n", err)
|
p.proxyLogger.Infof("Health check error on %s, %v", healthURL, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
<-time.After(5 * time.Second)
|
<-time.After(p.healthCheckLoopInterval)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -244,7 +277,7 @@ func (p *Process) start() error {
|
|||||||
maxDuration := time.Duration(p.config.UnloadAfter) * time.Second
|
maxDuration := time.Duration(p.config.UnloadAfter) * time.Second
|
||||||
|
|
||||||
for range time.Tick(time.Second) {
|
for range time.Tick(time.Second) {
|
||||||
if p.state != StateReady {
|
if p.CurrentState() != StateReady {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -252,7 +285,7 @@ func (p *Process) start() error {
|
|||||||
p.inFlightRequests.Wait()
|
p.inFlightRequests.Wait()
|
||||||
|
|
||||||
if time.Since(p.lastRequestHandled) > maxDuration {
|
if time.Since(p.lastRequestHandled) > maxDuration {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! Unloading model %s, TTL of %ds reached.\n", p.ID, p.config.UnloadAfter)
|
p.proxyLogger.Infof("Unloading model %s, TTL of %ds reached.", p.ID, p.config.UnloadAfter)
|
||||||
p.Stop()
|
p.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -260,26 +293,29 @@ func (p *Process) start() error {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.setState(StateReady)
|
if curState, err := p.swapState(StateStarting, StateReady); err != nil {
|
||||||
|
return fmt.Errorf("failed to set Process state to ready: current state: %v, error: %v", curState, err)
|
||||||
|
} else {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) Stop() {
|
func (p *Process) Stop() {
|
||||||
// wait for any inflight requests before proceeding
|
// wait for any inflight requests before proceeding
|
||||||
p.inFlightRequests.Wait()
|
p.inFlightRequests.Wait()
|
||||||
p.stateMutex.Lock()
|
p.proxyLogger.Debugf("Stopping process [%s]", p.ID)
|
||||||
defer p.stateMutex.Unlock()
|
|
||||||
|
|
||||||
// calling Stop() when state is invalid is a no-op
|
// calling Stop() when state is invalid is a no-op
|
||||||
if err := p.setState(StateStopping); err != nil {
|
if curState, err := p.swapState(StateReady, StateStopping); err != nil {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! Info - Stop() err: %v\n", err)
|
p.proxyLogger.Infof("Stop() Ready -> StateStopping err: %v, current state: %v", err, curState)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop the process with a graceful exit timeout
|
// stop the process with a graceful exit timeout
|
||||||
p.stopCommand(5 * time.Second)
|
p.stopCommand(5 * time.Second)
|
||||||
|
|
||||||
if err := p.setState(StateStopped); err != nil {
|
if curState, err := p.swapState(StateStopping, StateStopped); err != nil {
|
||||||
panic(fmt.Sprintf("Stop() failed to set state to stopped: %v", err))
|
p.proxyLogger.Infof("Stop() StateStopping -> StateStopped err: %v, current state: %v", err, curState)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -287,58 +323,53 @@ func (p *Process) Stop() {
|
|||||||
// of time for any inflight requests to complete before shutting down. If the Process
|
// of time for any inflight requests to complete before shutting down. If the Process
|
||||||
// is in the state of starting, it will cancel it and shut it down
|
// is in the state of starting, it will cancel it and shut it down
|
||||||
func (p *Process) Shutdown() {
|
func (p *Process) Shutdown() {
|
||||||
// cancel anything that can be interrupted by a shutdown (ie: healthcheck)
|
|
||||||
p.shutdownCancel()
|
p.shutdownCancel()
|
||||||
|
|
||||||
p.stateMutex.Lock()
|
|
||||||
defer p.stateMutex.Unlock()
|
|
||||||
p.setState(StateStopping)
|
|
||||||
|
|
||||||
// 5 seconds to stop the process
|
|
||||||
p.stopCommand(5 * time.Second)
|
p.stopCommand(5 * time.Second)
|
||||||
if err := p.setState(StateShutdown); err != nil {
|
p.state = StateShutdown
|
||||||
fmt.Printf("!!! Shutdown() failed to set state to shutdown: %v", err)
|
|
||||||
}
|
|
||||||
p.setState(StateShutdown)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// stopCommand will send a SIGTERM to the process and wait for it to exit.
|
// stopCommand will send a SIGTERM to the process and wait for it to exit.
|
||||||
// If it does not exit within 5 seconds, it will send a SIGKILL.
|
// If it does not exit within 5 seconds, it will send a SIGKILL.
|
||||||
func (p *Process) stopCommand(sigtermTTL time.Duration) {
|
func (p *Process) stopCommand(sigtermTTL time.Duration) {
|
||||||
|
stopStartTime := time.Now()
|
||||||
|
defer func() {
|
||||||
|
p.proxyLogger.Debugf("Process [%s] stopCommand took %v", p.ID, time.Since(stopStartTime))
|
||||||
|
}()
|
||||||
|
|
||||||
sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL)
|
sigtermTimeout, cancelTimeout := context.WithTimeout(context.Background(), sigtermTTL)
|
||||||
defer cancelTimeout()
|
defer cancelTimeout()
|
||||||
|
|
||||||
sigtermNormal := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
sigtermNormal <- p.cmd.Wait()
|
|
||||||
}()
|
|
||||||
|
|
||||||
if p.cmd == nil || p.cmd.Process == nil {
|
if p.cmd == nil || p.cmd.Process == nil {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] cmd or cmd.Process is nil", p.ID)
|
p.proxyLogger.Warnf("Process [%s] cmd or cmd.Process is nil", p.ID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
p.cmd.Process.Signal(syscall.SIGTERM)
|
if err := p.terminateProcess(); err != nil {
|
||||||
|
p.proxyLogger.Infof("Failed to gracefully terminate process [%s]: %v", p.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-sigtermTimeout.Done():
|
case <-sigtermTimeout.Done():
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] timed out waiting to stop, sending KILL signal\n", p.ID)
|
p.proxyLogger.Infof("Process [%s] timed out waiting to stop, sending KILL signal", p.ID)
|
||||||
p.cmd.Process.Kill()
|
p.cmd.Process.Kill()
|
||||||
case err := <-sigtermNormal:
|
case err := <-p.cmdWaitChan:
|
||||||
|
// Note: in start(), p.cmdWaitChan also has a select { ... }. That should be OK
|
||||||
|
// because if we make it here then the cmd has been successfully running and made it
|
||||||
|
// through the health check. There is a possibility that ithe cmd crashed after the health check
|
||||||
|
// succeeded but that's not a case llama-swap is handling for now.
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errno, ok := err.(syscall.Errno); ok {
|
if errno, ok := err.(syscall.Errno); ok {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] errno >> %v\n", p.ID, errno)
|
p.proxyLogger.Errorf("Process [%s] errno >> %v", p.ID, errno)
|
||||||
} else if exitError, ok := err.(*exec.ExitError); ok {
|
} else if exitError, ok := err.(*exec.ExitError); ok {
|
||||||
if strings.Contains(exitError.String(), "signal: terminated") {
|
if strings.Contains(exitError.String(), "signal: terminated") {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] stopped OK\n", p.ID)
|
p.proxyLogger.Infof("Process [%s] stopped OK", p.ID)
|
||||||
} else if strings.Contains(exitError.String(), "signal: interrupt") {
|
} else if strings.Contains(exitError.String(), "signal: interrupt") {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] interrupted OK\n", p.ID)
|
p.proxyLogger.Infof("Process [%s] interrupted OK", p.ID)
|
||||||
} else {
|
} else {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] ExitError >> %v, exit code: %d\n", p.ID, exitError, exitError.ExitCode())
|
p.proxyLogger.Warnf("Process [%s] ExitError >> %v, exit code: %d", p.ID, exitError, exitError.ExitCode())
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
fmt.Fprintf(p.logMonitor, "!!! process [%s] exited >> %v\n", p.ID, err)
|
p.proxyLogger.Errorf("Process [%s] exited >> %v", p.ID, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -370,6 +401,8 @@ func (p *Process) checkHealthEndpoint(healthURL string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
||||||
|
requestBeginTime := time.Now()
|
||||||
|
var startDuration time.Duration
|
||||||
|
|
||||||
// prevent new requests from being made while stopping or irrecoverable
|
// prevent new requests from being made while stopping or irrecoverable
|
||||||
currentState := p.CurrentState()
|
currentState := p.CurrentState()
|
||||||
@@ -386,11 +419,13 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
// start the process on demand
|
// start the process on demand
|
||||||
if p.CurrentState() != StateReady {
|
if p.CurrentState() != StateReady {
|
||||||
|
beginStartTime := time.Now()
|
||||||
if err := p.start(); err != nil {
|
if err := p.start(); err != nil {
|
||||||
errstr := fmt.Sprintf("unable to start process: %s", err)
|
errstr := fmt.Sprintf("unable to start process: %s", err)
|
||||||
http.Error(w, errstr, http.StatusBadGateway)
|
http.Error(w, errstr, http.StatusBadGateway)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
startDuration = time.Since(beginStartTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyTo := p.config.Proxy
|
proxyTo := p.config.Proxy
|
||||||
@@ -434,4 +469,8 @@ func (p *Process) ProxyRequest(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
totalTime := time.Since(requestBeginTime)
|
||||||
|
p.proxyLogger.Debugf("Process [%s] request %s - start: %v, total: %v",
|
||||||
|
p.ID, r.RequestURI, startDuration, totalTime)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,9 @@
|
|||||||
|
//go:build !windows
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import "syscall"
|
||||||
|
|
||||||
|
func (p *Process) terminateProcess() error {
|
||||||
|
return p.cmd.Process.Signal(syscall.SIGTERM)
|
||||||
|
}
|
||||||
@@ -0,0 +1,14 @@
|
|||||||
|
//go:build windows
|
||||||
|
|
||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os/exec"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (p *Process) terminateProcess() error {
|
||||||
|
pid := fmt.Sprintf("%d", p.cmd.Process.Pid)
|
||||||
|
cmd := exec.Command("taskkill", "/f", "/t", "/pid", pid)
|
||||||
|
return cmd.Run()
|
||||||
|
}
|
||||||
+69
-35
@@ -2,7 +2,6 @@ package proxy
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
"os"
|
"os"
|
||||||
@@ -13,13 +12,26 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
debugLogger = NewLogMonitorWriter(os.Stdout)
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
// flip to help with debugging tests
|
||||||
|
if false {
|
||||||
|
debugLogger.SetLogLevel(LevelDebug)
|
||||||
|
} else {
|
||||||
|
debugLogger.SetLogLevel(LevelError)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
|
func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
|
||||||
logMonitor := NewLogMonitorWriter(io.Discard)
|
|
||||||
expectedMessage := "testing91931"
|
expectedMessage := "testing91931"
|
||||||
config := getTestSimpleResponderConfig(expectedMessage)
|
config := getTestSimpleResponderConfig(expectedMessage)
|
||||||
|
|
||||||
// Create a process
|
// Create a process
|
||||||
process := NewProcess("test-process", 5, config, logMonitor)
|
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
|
||||||
defer process.Stop()
|
defer process.Stop()
|
||||||
|
|
||||||
req := httptest.NewRequest("GET", "/test", nil)
|
req := httptest.NewRequest("GET", "/test", nil)
|
||||||
@@ -52,11 +64,10 @@ func TestProcess_AutomaticallyStartsUpstream(t *testing.T) {
|
|||||||
// are all handled successfully, even though they all may ask for the process to .start()
|
// are all handled successfully, even though they all may ask for the process to .start()
|
||||||
func TestProcess_WaitOnMultipleStarts(t *testing.T) {
|
func TestProcess_WaitOnMultipleStarts(t *testing.T) {
|
||||||
|
|
||||||
logMonitor := NewLogMonitorWriter(io.Discard)
|
|
||||||
expectedMessage := "testing91931"
|
expectedMessage := "testing91931"
|
||||||
config := getTestSimpleResponderConfig(expectedMessage)
|
config := getTestSimpleResponderConfig(expectedMessage)
|
||||||
|
|
||||||
process := NewProcess("test-process", 5, config, logMonitor)
|
process := NewProcess("test-process", 5, config, debugLogger, debugLogger)
|
||||||
defer process.Stop()
|
defer process.Stop()
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -84,7 +95,7 @@ func TestProcess_BrokenModelConfig(t *testing.T) {
|
|||||||
CheckEndpoint: "/health",
|
CheckEndpoint: "/health",
|
||||||
}
|
}
|
||||||
|
|
||||||
process := NewProcess("broken", 1, config, NewLogMonitor())
|
process := NewProcess("broken", 1, config, debugLogger, debugLogger)
|
||||||
|
|
||||||
req := httptest.NewRequest("GET", "/", nil)
|
req := httptest.NewRequest("GET", "/", nil)
|
||||||
w := httptest.NewRecorder()
|
w := httptest.NewRecorder()
|
||||||
@@ -109,7 +120,7 @@ func TestProcess_UnloadAfterTTL(t *testing.T) {
|
|||||||
config.UnloadAfter = 3 // seconds
|
config.UnloadAfter = 3 // seconds
|
||||||
assert.Equal(t, 3, config.UnloadAfter)
|
assert.Equal(t, 3, config.UnloadAfter)
|
||||||
|
|
||||||
process := NewProcess("ttl_test", 2, config, NewLogMonitorWriter(io.Discard))
|
process := NewProcess("ttl_test", 2, config, debugLogger, debugLogger)
|
||||||
defer process.Stop()
|
defer process.Stop()
|
||||||
|
|
||||||
// this should take 4 seconds
|
// this should take 4 seconds
|
||||||
@@ -151,7 +162,7 @@ func TestProcess_LowTTLValue(t *testing.T) {
|
|||||||
config.UnloadAfter = 1 // second
|
config.UnloadAfter = 1 // second
|
||||||
assert.Equal(t, 1, config.UnloadAfter)
|
assert.Equal(t, 1, config.UnloadAfter)
|
||||||
|
|
||||||
process := NewProcess("ttl", 2, config, NewLogMonitorWriter(os.Stdout))
|
process := NewProcess("ttl", 2, config, debugLogger, debugLogger)
|
||||||
defer process.Stop()
|
defer process.Stop()
|
||||||
|
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
@@ -178,7 +189,7 @@ func TestProcess_HTTPRequestsHaveTimeToFinish(t *testing.T) {
|
|||||||
|
|
||||||
expectedMessage := "12345"
|
expectedMessage := "12345"
|
||||||
config := getTestSimpleResponderConfig(expectedMessage)
|
config := getTestSimpleResponderConfig(expectedMessage)
|
||||||
process := NewProcess("t", 10, config, NewLogMonitorWriter(os.Stdout))
|
process := NewProcess("t", 10, config, debugLogger, debugLogger)
|
||||||
defer process.Stop()
|
defer process.Stop()
|
||||||
|
|
||||||
results := map[string]string{
|
results := map[string]string{
|
||||||
@@ -225,39 +236,40 @@ func TestProcess_HTTPRequestsHaveTimeToFinish(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSetState(t *testing.T) {
|
func TestProcess_SwapState(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
currentState ProcessState
|
currentState ProcessState
|
||||||
|
expectedState ProcessState
|
||||||
newState ProcessState
|
newState ProcessState
|
||||||
expectedError error
|
expectedError error
|
||||||
expectedResult ProcessState
|
expectedResult ProcessState
|
||||||
}{
|
}{
|
||||||
{"Stopped to Starting", StateStopped, StateStarting, nil, StateStarting},
|
{"Stopped to Starting", StateStopped, StateStopped, StateStarting, nil, StateStarting},
|
||||||
{"Starting to Ready", StateStarting, StateReady, nil, StateReady},
|
{"Starting to Ready", StateStarting, StateStarting, StateReady, nil, StateReady},
|
||||||
{"Starting to Failed", StateStarting, StateFailed, nil, StateFailed},
|
{"Starting to Failed", StateStarting, StateStarting, StateFailed, nil, StateFailed},
|
||||||
{"Starting to Stopping", StateStarting, StateStopping, nil, StateStopping},
|
{"Starting to Stopping", StateStarting, StateStarting, StateStopping, nil, StateStopping},
|
||||||
{"Ready to Stopping", StateReady, StateStopping, nil, StateStopping},
|
{"Ready to Stopping", StateReady, StateReady, StateStopping, nil, StateStopping},
|
||||||
{"Stopping to Stopped", StateStopping, StateStopped, nil, StateStopped},
|
{"Stopping to Stopped", StateStopping, StateStopping, StateStopped, nil, StateStopped},
|
||||||
{"Stopping to Shutdown", StateStopping, StateShutdown, nil, StateShutdown},
|
{"Stopping to Shutdown", StateStopping, StateStopping, StateShutdown, nil, StateShutdown},
|
||||||
{"Stopped to Ready", StateStopped, StateReady, fmt.Errorf("invalid state transition from stopped to ready"), StateStopped},
|
{"Stopped to Ready", StateStopped, StateStopped, StateReady, ErrInvalidStateTransition, StateStopped},
|
||||||
{"Starting to Stopped", StateStarting, StateStopped, fmt.Errorf("invalid state transition from starting to stopped"), StateStarting},
|
{"Starting to Stopped", StateStarting, StateStarting, StateStopped, ErrInvalidStateTransition, StateStarting},
|
||||||
{"Ready to Starting", StateReady, StateStarting, fmt.Errorf("invalid state transition from ready to starting"), StateReady},
|
{"Ready to Starting", StateReady, StateReady, StateStarting, ErrInvalidStateTransition, StateReady},
|
||||||
{"Ready to Failed", StateReady, StateFailed, fmt.Errorf("invalid state transition from ready to failed"), StateReady},
|
{"Ready to Failed", StateReady, StateReady, StateFailed, ErrInvalidStateTransition, StateReady},
|
||||||
{"Stopping to Ready", StateStopping, StateReady, fmt.Errorf("invalid state transition from stopping to ready"), StateStopping},
|
{"Stopping to Ready", StateStopping, StateStopping, StateReady, ErrInvalidStateTransition, StateStopping},
|
||||||
{"Failed to Stopped", StateFailed, StateStopped, fmt.Errorf("invalid state transition from failed to stopped"), StateFailed},
|
{"Failed to Stopped", StateFailed, StateFailed, StateStopped, ErrInvalidStateTransition, StateFailed},
|
||||||
{"Failed to Starting", StateFailed, StateStarting, fmt.Errorf("invalid state transition from failed to starting"), StateFailed},
|
{"Failed to Starting", StateFailed, StateFailed, StateStarting, ErrInvalidStateTransition, StateFailed},
|
||||||
{"Shutdown to Stopped", StateShutdown, StateStopped, fmt.Errorf("invalid state transition from shutdown to stopped"), StateShutdown},
|
{"Shutdown to Stopped", StateShutdown, StateShutdown, StateStopped, ErrInvalidStateTransition, StateShutdown},
|
||||||
{"Shutdown to Starting", StateShutdown, StateStarting, fmt.Errorf("invalid state transition from shutdown to starting"), StateShutdown},
|
{"Shutdown to Starting", StateShutdown, StateShutdown, StateStarting, ErrInvalidStateTransition, StateShutdown},
|
||||||
|
{"Expected state mismatch", StateStopped, StateStarting, StateStarting, ErrExpectedStateMismatch, StateStopped},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
t.Run(test.name, func(t *testing.T) {
|
t.Run(test.name, func(t *testing.T) {
|
||||||
p := &Process{
|
p := NewProcess("test", 10, getTestSimpleResponderConfig("test"), debugLogger, debugLogger)
|
||||||
state: test.currentState,
|
p.state = test.currentState
|
||||||
}
|
|
||||||
|
|
||||||
err := p.setState(test.newState)
|
resultState, err := p.swapState(test.expectedState, test.newState)
|
||||||
if err != nil && test.expectedError == nil {
|
if err != nil && test.expectedError == nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
} else if err == nil && test.expectedError != nil {
|
} else if err == nil && test.expectedError != nil {
|
||||||
@@ -268,8 +280,8 @@ func TestSetState(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.state != test.expectedResult {
|
if resultState != test.expectedResult {
|
||||||
t.Errorf("Expected state: %v, got: %v", test.expectedResult, p.state)
|
t.Errorf("Expected state: %v, got: %v", test.expectedResult, resultState)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -280,7 +292,6 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) {
|
|||||||
t.Skip("skipping long shutdown test")
|
t.Skip("skipping long shutdown test")
|
||||||
}
|
}
|
||||||
|
|
||||||
logMonitor := NewLogMonitorWriter(io.Discard)
|
|
||||||
expectedMessage := "testing91931"
|
expectedMessage := "testing91931"
|
||||||
|
|
||||||
// make a config where the healthcheck will always fail because port is wrong
|
// make a config where the healthcheck will always fail because port is wrong
|
||||||
@@ -288,13 +299,16 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) {
|
|||||||
config.Proxy = "http://localhost:9998/test"
|
config.Proxy = "http://localhost:9998/test"
|
||||||
|
|
||||||
healthCheckTTLSeconds := 30
|
healthCheckTTLSeconds := 30
|
||||||
process := NewProcess("test-process", healthCheckTTLSeconds, config, logMonitor)
|
process := NewProcess("test-process", healthCheckTTLSeconds, config, debugLogger, debugLogger)
|
||||||
|
|
||||||
|
// make it a lot faster
|
||||||
|
process.healthCheckLoopInterval = time.Second
|
||||||
|
|
||||||
// start a goroutine to simulate a shutdown
|
// start a goroutine to simulate a shutdown
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
<-time.After(time.Second * 2)
|
<-time.After(time.Millisecond * 500)
|
||||||
process.Shutdown()
|
process.Shutdown()
|
||||||
}()
|
}()
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
@@ -306,3 +320,23 @@ func TestProcess_ShutdownInterruptsHealthCheck(t *testing.T) {
|
|||||||
assert.ErrorContains(t, err, "health check interrupted due to shutdown")
|
assert.ErrorContains(t, err, "health check interrupted due to shutdown")
|
||||||
assert.Equal(t, StateShutdown, process.CurrentState())
|
assert.Equal(t, StateShutdown, process.CurrentState())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestProcess_ExitInterruptsHealthCheck(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping Exit Interrupts Health Check test")
|
||||||
|
}
|
||||||
|
|
||||||
|
// should run and exit but interrupt the long checkHealthTimeout
|
||||||
|
checkHealthTimeout := 5
|
||||||
|
config := ModelConfig{
|
||||||
|
Cmd: "sleep 1",
|
||||||
|
Proxy: "http://127.0.0.1:9913",
|
||||||
|
CheckEndpoint: "/health",
|
||||||
|
}
|
||||||
|
|
||||||
|
process := NewProcess("sleepy", checkHealthTimeout, config, debugLogger, debugLogger)
|
||||||
|
process.healthCheckLoopInterval = time.Second // make it faster
|
||||||
|
err := process.start()
|
||||||
|
assert.Equal(t, "upstream command exited prematurely with no error", err.Error())
|
||||||
|
assert.Equal(t, process.CurrentState(), StateFailed)
|
||||||
|
}
|
||||||
|
|||||||
+59
-14
@@ -7,6 +7,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -27,19 +28,52 @@ type ProxyManager struct {
|
|||||||
|
|
||||||
config *Config
|
config *Config
|
||||||
currentProcesses map[string]*Process
|
currentProcesses map[string]*Process
|
||||||
logMonitor *LogMonitor
|
|
||||||
ginEngine *gin.Engine
|
ginEngine *gin.Engine
|
||||||
|
|
||||||
|
// logging
|
||||||
|
proxyLogger *LogMonitor
|
||||||
|
upstreamLogger *LogMonitor
|
||||||
|
muxLogger *LogMonitor
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(config *Config) *ProxyManager {
|
func New(config *Config) *ProxyManager {
|
||||||
|
// set up loggers
|
||||||
|
stdoutLogger := NewLogMonitorWriter(os.Stdout)
|
||||||
|
upstreamLogger := NewLogMonitorWriter(stdoutLogger)
|
||||||
|
proxyLogger := NewLogMonitorWriter(stdoutLogger)
|
||||||
|
|
||||||
|
if config.LogRequests {
|
||||||
|
proxyLogger.Warn("LogRequests configuration is deprecated. Use logLevel instead.")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch strings.ToLower(strings.TrimSpace(config.LogLevel)) {
|
||||||
|
case "debug":
|
||||||
|
proxyLogger.SetLogLevel(LevelDebug)
|
||||||
|
upstreamLogger.SetLogLevel(LevelDebug)
|
||||||
|
case "info":
|
||||||
|
proxyLogger.SetLogLevel(LevelInfo)
|
||||||
|
upstreamLogger.SetLogLevel(LevelInfo)
|
||||||
|
case "warn":
|
||||||
|
proxyLogger.SetLogLevel(LevelWarn)
|
||||||
|
upstreamLogger.SetLogLevel(LevelWarn)
|
||||||
|
case "error":
|
||||||
|
proxyLogger.SetLogLevel(LevelError)
|
||||||
|
upstreamLogger.SetLogLevel(LevelError)
|
||||||
|
default:
|
||||||
|
proxyLogger.SetLogLevel(LevelInfo)
|
||||||
|
upstreamLogger.SetLogLevel(LevelInfo)
|
||||||
|
}
|
||||||
|
|
||||||
pm := &ProxyManager{
|
pm := &ProxyManager{
|
||||||
config: config,
|
config: config,
|
||||||
currentProcesses: make(map[string]*Process),
|
currentProcesses: make(map[string]*Process),
|
||||||
logMonitor: NewLogMonitor(),
|
|
||||||
ginEngine: gin.New(),
|
ginEngine: gin.New(),
|
||||||
|
|
||||||
|
proxyLogger: proxyLogger,
|
||||||
|
muxLogger: stdoutLogger,
|
||||||
|
upstreamLogger: upstreamLogger,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.LogRequests {
|
|
||||||
pm.ginEngine.Use(func(c *gin.Context) {
|
pm.ginEngine.Use(func(c *gin.Context) {
|
||||||
// Start timer
|
// Start timer
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
@@ -58,9 +92,8 @@ func New(config *Config) *ProxyManager {
|
|||||||
statusCode := c.Writer.Status()
|
statusCode := c.Writer.Status()
|
||||||
bodySize := c.Writer.Size()
|
bodySize := c.Writer.Size()
|
||||||
|
|
||||||
fmt.Fprintf(pm.logMonitor, "[llama-swap] %s [%s] \"%s %s %s\" %d %d \"%s\" %v\n",
|
pm.proxyLogger.Infof("Request %s \"%s %s %s\" %d %d \"%s\" %v",
|
||||||
clientIP,
|
clientIP,
|
||||||
time.Now().Format("2006-01-02 15:04:05"),
|
|
||||||
method,
|
method,
|
||||||
path,
|
path,
|
||||||
c.Request.Proto,
|
c.Request.Proto,
|
||||||
@@ -70,16 +103,26 @@ func New(config *Config) *ProxyManager {
|
|||||||
duration,
|
duration,
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
// see: https://github.com/mostlygeek/llama-swap/issues/42
|
// see: issue: #81, #77 and #42 for CORS issues
|
||||||
// respond with permissive OPTIONS for any endpoint
|
// respond with permissive OPTIONS for any endpoint
|
||||||
pm.ginEngine.Use(func(c *gin.Context) {
|
pm.ginEngine.Use(func(c *gin.Context) {
|
||||||
if c.Request.Method == "OPTIONS" {
|
if c.Request.Method == "OPTIONS" {
|
||||||
c.Header("Access-Control-Allow-Origin", "*")
|
c.Header("Access-Control-Allow-Origin", "*")
|
||||||
c.Header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
|
c.Header("Access-Control-Allow-Methods", "GET, POST, PUT, PATCH, DELETE, OPTIONS")
|
||||||
c.Header("Access-Control-Allow-Headers", "Content-Type, Authorization")
|
|
||||||
c.AbortWithStatus(204)
|
// allow whatever the client requested by default
|
||||||
|
if headers := c.Request.Header.Get("Access-Control-Request-Headers"); headers != "" {
|
||||||
|
sanitized := SanitizeAccessControlRequestHeaderValues(headers)
|
||||||
|
c.Header("Access-Control-Allow-Headers", sanitized)
|
||||||
|
} else {
|
||||||
|
c.Header(
|
||||||
|
"Access-Control-Allow-Headers",
|
||||||
|
"Content-Type, Authorization, Accept, X-Requested-With",
|
||||||
|
)
|
||||||
|
}
|
||||||
|
c.Header("Access-Control-Max-Age", "86400")
|
||||||
|
c.AbortWithStatus(http.StatusNoContent)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
c.Next()
|
c.Next()
|
||||||
@@ -104,6 +147,8 @@ func New(config *Config) *ProxyManager {
|
|||||||
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
|
pm.ginEngine.GET("/logs", pm.sendLogsHandlers)
|
||||||
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
|
pm.ginEngine.GET("/logs/stream", pm.streamLogsHandler)
|
||||||
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
|
pm.ginEngine.GET("/logs/streamSSE", pm.streamLogsHandlerSSE)
|
||||||
|
pm.ginEngine.GET("/logs/stream/:logMonitorID", pm.streamLogsHandler)
|
||||||
|
pm.ginEngine.GET("/logs/streamSSE/:logMonitorID", pm.streamLogsHandlerSSE)
|
||||||
|
|
||||||
pm.ginEngine.GET("/upstream", pm.upstreamIndex)
|
pm.ginEngine.GET("/upstream", pm.upstreamIndex)
|
||||||
pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream)
|
pm.ginEngine.Any("/upstream/:model_id/*upstreamPath", pm.proxyToUpstream)
|
||||||
@@ -263,19 +308,20 @@ func (pm *ProxyManager) swapModel(requestedModel string) (*Process, error) {
|
|||||||
requestedProcessKey := ProcessKeyName(profileName, realModelName)
|
requestedProcessKey := ProcessKeyName(profileName, realModelName)
|
||||||
|
|
||||||
if process, found := pm.currentProcesses[requestedProcessKey]; found {
|
if process, found := pm.currentProcesses[requestedProcessKey]; found {
|
||||||
|
pm.proxyLogger.Debugf("No-swap, using existing process for model [%s]", requestedModel)
|
||||||
return process, nil
|
return process, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop all running models
|
// stop all running models
|
||||||
|
pm.proxyLogger.Infof("Swapping model to [%s]", requestedModel)
|
||||||
pm.stopProcesses()
|
pm.stopProcesses()
|
||||||
|
|
||||||
if profileName == "" {
|
if profileName == "" {
|
||||||
modelConfig, modelID, found := pm.config.FindConfig(realModelName)
|
modelConfig, modelID, found := pm.config.FindConfig(realModelName)
|
||||||
if !found {
|
if !found {
|
||||||
return nil, fmt.Errorf("could not find configuration for %s", realModelName)
|
return nil, fmt.Errorf("could not find configuration for %s", realModelName)
|
||||||
}
|
}
|
||||||
|
|
||||||
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor)
|
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.upstreamLogger, pm.proxyLogger)
|
||||||
processKey := ProcessKeyName(profileName, modelID)
|
processKey := ProcessKeyName(profileName, modelID)
|
||||||
pm.currentProcesses[processKey] = process
|
pm.currentProcesses[processKey] = process
|
||||||
} else {
|
} else {
|
||||||
@@ -286,7 +332,7 @@ func (pm *ProxyManager) swapModel(requestedModel string) (*Process, error) {
|
|||||||
return nil, fmt.Errorf("could not find configuration for %s in group %s", realModelName, profileName)
|
return nil, fmt.Errorf("could not find configuration for %s in group %s", realModelName, profileName)
|
||||||
}
|
}
|
||||||
|
|
||||||
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.logMonitor)
|
process := NewProcess(modelID, pm.config.HealthCheckTimeout, modelConfig, pm.upstreamLogger, pm.proxyLogger)
|
||||||
processKey := ProcessKeyName(profileName, modelID)
|
processKey := ProcessKeyName(profileName, modelID)
|
||||||
pm.currentProcesses[processKey] = process
|
pm.currentProcesses[processKey] = process
|
||||||
}
|
}
|
||||||
@@ -374,7 +420,6 @@ func (pm *ProxyManager) proxyOAIHandler(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
||||||
|
|||||||
@@ -9,7 +9,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) {
|
func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) {
|
||||||
|
|
||||||
accept := c.GetHeader("Accept")
|
accept := c.GetHeader("Accept")
|
||||||
if strings.Contains(accept, "text/html") {
|
if strings.Contains(accept, "text/html") {
|
||||||
// Set the Content-Type header to text/html
|
// Set the Content-Type header to text/html
|
||||||
@@ -28,7 +27,7 @@ func (pm *ProxyManager) sendLogsHandlers(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
c.Header("Content-Type", "text/plain")
|
c.Header("Content-Type", "text/plain")
|
||||||
history := pm.logMonitor.GetHistory()
|
history := pm.muxLogger.GetHistory()
|
||||||
_, err := c.Writer.Write(history)
|
_, err := c.Writer.Write(history)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.AbortWithError(http.StatusInternalServerError, err)
|
c.AbortWithError(http.StatusInternalServerError, err)
|
||||||
@@ -42,8 +41,14 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
|
|||||||
c.Header("Transfer-Encoding", "chunked")
|
c.Header("Transfer-Encoding", "chunked")
|
||||||
c.Header("X-Content-Type-Options", "nosniff")
|
c.Header("X-Content-Type-Options", "nosniff")
|
||||||
|
|
||||||
ch := pm.logMonitor.Subscribe()
|
logMonitorId := c.Param("logMonitorID")
|
||||||
defer pm.logMonitor.Unsubscribe(ch)
|
logger, err := pm.getLogger(logMonitorId)
|
||||||
|
if err != nil {
|
||||||
|
c.String(http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ch := logger.Subscribe()
|
||||||
|
defer logger.Unsubscribe(ch)
|
||||||
|
|
||||||
notify := c.Request.Context().Done()
|
notify := c.Request.Context().Done()
|
||||||
flusher, ok := c.Writer.(http.Flusher)
|
flusher, ok := c.Writer.(http.Flusher)
|
||||||
@@ -56,7 +61,7 @@ func (pm *ProxyManager) streamLogsHandler(c *gin.Context) {
|
|||||||
// Send history first if not skipped
|
// Send history first if not skipped
|
||||||
|
|
||||||
if !skipHistory {
|
if !skipHistory {
|
||||||
history := pm.logMonitor.GetHistory()
|
history := logger.GetHistory()
|
||||||
if len(history) != 0 {
|
if len(history) != 0 {
|
||||||
c.Writer.Write(history)
|
c.Writer.Write(history)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
@@ -85,15 +90,21 @@ func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
|
|||||||
c.Header("Connection", "keep-alive")
|
c.Header("Connection", "keep-alive")
|
||||||
c.Header("X-Content-Type-Options", "nosniff")
|
c.Header("X-Content-Type-Options", "nosniff")
|
||||||
|
|
||||||
ch := pm.logMonitor.Subscribe()
|
logMonitorId := c.Param("logMonitorID")
|
||||||
defer pm.logMonitor.Unsubscribe(ch)
|
logger, err := pm.getLogger(logMonitorId)
|
||||||
|
if err != nil {
|
||||||
|
c.String(http.StatusBadRequest, err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ch := logger.Subscribe()
|
||||||
|
defer logger.Unsubscribe(ch)
|
||||||
|
|
||||||
notify := c.Request.Context().Done()
|
notify := c.Request.Context().Done()
|
||||||
|
|
||||||
// Send history first if not skipped
|
// Send history first if not skipped
|
||||||
_, skipHistory := c.GetQuery("no-history")
|
_, skipHistory := c.GetQuery("no-history")
|
||||||
if !skipHistory {
|
if !skipHistory {
|
||||||
history := pm.logMonitor.GetHistory()
|
history := logger.GetHistory()
|
||||||
if len(history) != 0 {
|
if len(history) != 0 {
|
||||||
c.SSEvent("message", string(history))
|
c.SSEvent("message", string(history))
|
||||||
c.Writer.Flush()
|
c.Writer.Flush()
|
||||||
@@ -111,3 +122,21 @@ func (pm *ProxyManager) streamLogsHandlerSSE(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// getLogger searches for the appropriate logger based on the logMonitorId
|
||||||
|
func (pm *ProxyManager) getLogger(logMonitorId string) (*LogMonitor, error) {
|
||||||
|
var logger *LogMonitor
|
||||||
|
|
||||||
|
if logMonitorId == "" {
|
||||||
|
// maintain the default
|
||||||
|
logger = pm.muxLogger
|
||||||
|
} else if logMonitorId == "proxy" {
|
||||||
|
logger = pm.proxyLogger
|
||||||
|
} else if logMonitorId == "upstream" {
|
||||||
|
logger = pm.upstreamLogger
|
||||||
|
} else {
|
||||||
|
return nil, fmt.Errorf("invalid logger. Use 'proxy' or 'upstream'")
|
||||||
|
}
|
||||||
|
|
||||||
|
return logger, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ func TestProxyManager_SwapProcessCorrectly(t *testing.T) {
|
|||||||
"model1": getTestSimpleResponderConfig("model1"),
|
"model1": getTestSimpleResponderConfig("model1"),
|
||||||
"model2": getTestSimpleResponderConfig("model2"),
|
"model2": getTestSimpleResponderConfig("model2"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -62,6 +63,7 @@ func TestProxyManager_SwapMultiProcess(t *testing.T) {
|
|||||||
Profiles: map[string][]string{
|
Profiles: map[string][]string{
|
||||||
"test": {model1, model2},
|
"test": {model1, model2},
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -103,6 +105,7 @@ func TestProxyManager_SwapMultiProcessParallelRequests(t *testing.T) {
|
|||||||
"model2": getTestSimpleResponderConfig("model2"),
|
"model2": getTestSimpleResponderConfig("model2"),
|
||||||
"model3": getTestSimpleResponderConfig("model3"),
|
"model3": getTestSimpleResponderConfig("model3"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -153,6 +156,7 @@ func TestProxyManager_ListModelsHandler(t *testing.T) {
|
|||||||
"model2": getTestSimpleResponderConfig("model2"),
|
"model2": getTestSimpleResponderConfig("model2"),
|
||||||
"model3": getTestSimpleResponderConfig("model3"),
|
"model3": getTestSimpleResponderConfig("model3"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -230,6 +234,7 @@ func TestProxyManager_ProfileNonMember(t *testing.T) {
|
|||||||
Profiles: map[string][]string{
|
Profiles: map[string][]string{
|
||||||
"test": {model1},
|
"test": {model1},
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -278,6 +283,7 @@ func TestProxyManager_Shutdown(t *testing.T) {
|
|||||||
"model2": model2Config,
|
"model2": model2Config,
|
||||||
"model3": model3Config,
|
"model3": model3Config,
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -313,6 +319,7 @@ func TestProxyManager_Unload(t *testing.T) {
|
|||||||
Models: map[string]ModelConfig{
|
Models: map[string]ModelConfig{
|
||||||
"model1": getTestSimpleResponderConfig("model1"),
|
"model1": getTestSimpleResponderConfig("model1"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -339,6 +346,7 @@ func TestProxyManager_StripProfileSlug(t *testing.T) {
|
|||||||
Models: map[string]ModelConfig{
|
Models: map[string]ModelConfig{
|
||||||
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
|
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -365,6 +373,7 @@ func TestProxyManager_RunningEndpoint(t *testing.T) {
|
|||||||
Profiles: map[string][]string{
|
Profiles: map[string][]string{
|
||||||
"test": {"model1", "model2"},
|
"test": {"model1", "model2"},
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
// Define a helper struct to parse the JSON response.
|
// Define a helper struct to parse the JSON response.
|
||||||
@@ -472,6 +481,7 @@ func TestProxyManager_AudioTranscriptionHandler(t *testing.T) {
|
|||||||
Models: map[string]ModelConfig{
|
Models: map[string]ModelConfig{
|
||||||
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
|
"TheExpectedModel": getTestSimpleResponderConfig("TheExpectedModel"),
|
||||||
},
|
},
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -580,6 +590,8 @@ func TestProxyManager_UseModelName(t *testing.T) {
|
|||||||
Models: map[string]ModelConfig{
|
Models: map[string]ModelConfig{
|
||||||
"model1": modelConfig,
|
"model1": modelConfig,
|
||||||
},
|
},
|
||||||
|
|
||||||
|
LogLevel: "error",
|
||||||
}
|
}
|
||||||
|
|
||||||
proxy := New(config)
|
proxy := New(config)
|
||||||
@@ -639,5 +651,72 @@ func TestProxyManager_UseModelName(t *testing.T) {
|
|||||||
assert.Equal(t, upstreamModelName, response["model"])
|
assert.Equal(t, upstreamModelName, response["model"])
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestProxyManager_CORSOptionsHandler(t *testing.T) {
|
||||||
|
config := &Config{
|
||||||
|
HealthCheckTimeout: 15,
|
||||||
|
Models: map[string]ModelConfig{
|
||||||
|
"model1": getTestSimpleResponderConfig("model1"),
|
||||||
|
},
|
||||||
|
LogLevel: "error",
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
method string
|
||||||
|
requestHeaders map[string]string
|
||||||
|
expectedStatus int
|
||||||
|
expectedHeaders map[string]string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "OPTIONS with no headers",
|
||||||
|
method: "OPTIONS",
|
||||||
|
expectedStatus: http.StatusNoContent,
|
||||||
|
expectedHeaders: map[string]string{
|
||||||
|
"Access-Control-Allow-Origin": "*",
|
||||||
|
"Access-Control-Allow-Methods": "GET, POST, PUT, PATCH, DELETE, OPTIONS",
|
||||||
|
"Access-Control-Allow-Headers": "Content-Type, Authorization, Accept, X-Requested-With",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "OPTIONS with specific headers",
|
||||||
|
method: "OPTIONS",
|
||||||
|
requestHeaders: map[string]string{
|
||||||
|
"Access-Control-Request-Headers": "X-Custom-Header, Some-Other-Header",
|
||||||
|
},
|
||||||
|
expectedStatus: http.StatusNoContent,
|
||||||
|
expectedHeaders: map[string]string{
|
||||||
|
"Access-Control-Allow-Origin": "*",
|
||||||
|
"Access-Control-Allow-Methods": "GET, POST, PUT, PATCH, DELETE, OPTIONS",
|
||||||
|
"Access-Control-Allow-Headers": "X-Custom-Header, Some-Other-Header",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Non-OPTIONS request",
|
||||||
|
method: "GET",
|
||||||
|
expectedStatus: http.StatusNotFound, // Since we don't have a GET route defined
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
proxy := New(config)
|
||||||
|
defer proxy.StopProcesses()
|
||||||
|
|
||||||
|
req := httptest.NewRequest(tt.method, "/v1/chat/completions", nil)
|
||||||
|
for k, v := range tt.requestHeaders {
|
||||||
|
req.Header.Set(k, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
w := httptest.NewRecorder()
|
||||||
|
proxy.ginEngine.ServeHTTP(w, req)
|
||||||
|
|
||||||
|
assert.Equal(t, tt.expectedStatus, w.Code)
|
||||||
|
|
||||||
|
for header, expectedValue := range tt.expectedHeaders {
|
||||||
|
assert.Equal(t, expectedValue, w.Header().Get(header))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,43 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
func isTokenChar(r rune) bool {
|
||||||
|
switch {
|
||||||
|
case r >= 'a' && r <= 'z':
|
||||||
|
case r >= 'A' && r <= 'Z':
|
||||||
|
case r >= '0' && r <= '9':
|
||||||
|
case strings.ContainsRune("!#$%&'*+-.^_`|~", r):
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func SanitizeAccessControlRequestHeaderValues(headerValues string) string {
|
||||||
|
parts := strings.Split(headerValues, ",")
|
||||||
|
valid := make([]string, 0, len(parts))
|
||||||
|
|
||||||
|
for _, p := range parts {
|
||||||
|
v := strings.TrimSpace(p)
|
||||||
|
if v == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
validPart := true
|
||||||
|
for _, c := range v {
|
||||||
|
if !isTokenChar(c) {
|
||||||
|
validPart = false
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if validPart {
|
||||||
|
valid = append(valid, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(valid, ", ")
|
||||||
|
}
|
||||||
@@ -0,0 +1,77 @@
|
|||||||
|
package proxy
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestSanitizeAccessControlRequestHeaderValues(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
input string
|
||||||
|
expected string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty string",
|
||||||
|
input: "",
|
||||||
|
expected: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "whitespace only",
|
||||||
|
input: " ",
|
||||||
|
expected: "",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "single valid value",
|
||||||
|
input: "content-type",
|
||||||
|
expected: "content-type",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "multiple valid values",
|
||||||
|
input: "content-type, authorization, x-requested-with",
|
||||||
|
expected: "content-type, authorization, x-requested-with",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "values with extra spaces",
|
||||||
|
input: " content-type , authorization ",
|
||||||
|
expected: "content-type, authorization",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "values with tabs",
|
||||||
|
input: "content-type,\tauthorization",
|
||||||
|
expected: "content-type, authorization",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "values with invalid characters",
|
||||||
|
input: "content-type, auth\n, x-requested-with\r",
|
||||||
|
expected: "content-type, auth, x-requested-with",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "empty values in list",
|
||||||
|
input: "content-type,,authorization",
|
||||||
|
expected: "content-type, authorization",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "leading and trailing commas",
|
||||||
|
input: ",content-type,authorization,",
|
||||||
|
expected: "content-type, authorization",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mixed valid and invalid values",
|
||||||
|
input: "content-type, \x00invalid, x-requested-with",
|
||||||
|
expected: "content-type, x-requested-with",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "mixed case values",
|
||||||
|
input: "Content-Type, my-Valid-Header, Another-hEader",
|
||||||
|
expected: "Content-Type, my-Valid-Header, Another-hEader",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got := SanitizeAccessControlRequestHeaderValues(tt.input)
|
||||||
|
if got != tt.expected {
|
||||||
|
t.Errorf("SanitizeAccessControlRequestHeaderValues(%q) = %q, want %q",
|
||||||
|
tt.input, got, tt.expected)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user