113 changes: 101 additions & 12 deletions tests/integration/defs/accuracy/test_disaggregated_serving.py
@@ -25,6 +25,69 @@
from .accuracy_core import (GSM8K, MMLU, JsonModeEval,
LlmapiAccuracyTestHarness, get_accuracy_task)

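# Cap on how many requests' perf metrics each server keeps; fed into the
# `perf_metrics_max_requests` knob of the server configs when debug_perf is on.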
MAX_PERF_METRICS_REQUESTS = 100


def get_worker_env_vars(kv_cache_perf_dir: str = None):
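    # Build the environment for a worker process. If a directory is given,
    # TRTLLM_KVCACHE_TIME_OUTPUT_PATH (presumably the kv-cache timing dump path)
    # points the worker's timing output there so show_kvcache_time can read it back.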
env = os.environ.copy()
if kv_cache_perf_dir:
env["TRTLLM_KVCACHE_TIME_OUTPUT_PATH"] = kv_cache_perf_dir
return env


def show_debug_perf(thread_pool: ThreadPoolExecutor,
kv_cache_perf_dir: str = None,
perf_metrics_url: str = None):
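    """Debug helper: wait for outstanding request futures, then dump the workers'
    kv-cache timing files and the server's /perf_metrics payload to the test log."""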

def wait_for_all_tasks_to_complete():
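        # ThreadPoolExecutor does not track its own futures; this relies on the
        # caller attaching a `futures` list to the pool (falls back to an empty list).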
        # thread_pool.shutdown(wait=True)
try:
print("Waiting for all tasks to complete")
for future in getattr(thread_pool, "futures", []):
try:
future.result(timeout=300)
except concurrent.futures.TimeoutError:
print("Timeout waiting for a future to complete.")
except Exception as e:
print(f"Future completed with error: {e}")
except Exception as e:
print(f"Error while waiting for futures: {e}")

def show_kvcache_time(kv_cache_perf_dir, max_lines=100):
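        # Print the tail (up to max_lines) of each kv-cache timing file the workers
        # wrote under TRTLLM_KVCACHE_TIME_OUTPUT_PATH (see get_worker_env_vars).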
for file in os.listdir(kv_cache_perf_dir):
print(f"{'-'*25} {file}:{max_lines} {'-'*25}")
with open(os.path.join(kv_cache_perf_dir, file), "r") as f:
for line in f.readlines()[-max_lines:]:
print(line.strip())

def show_perf_metrics(url):
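        # Fetch the /perf_metrics endpoint and pretty-print its JSON payload,
        # falling back to the raw text if the response is not valid JSON.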
perf_url = f"{url}/perf_metrics"
try:
print(f"Fetching perf metrics from {perf_url}")
resp = requests.get(perf_url, timeout=10)
if resp.status_code == 200:
try:
print("perf_metrics JSON:")
metrics = resp.json()
print(json.dumps(metrics, indent=2, ensure_ascii=False))
print("-" * 100)
except ValueError:
print("perf_metrics returned non-JSON response:", resp.text)
else:
print(
f"perf_metrics returned status {resp.status_code}: {resp.text}"
)
except requests.exceptions.RequestException as e:
print(f"Error fetching {perf_url}: {e}")

wait_for_all_tasks_to_complete()
if kv_cache_perf_dir:
show_kvcache_time(kv_cache_perf_dir)
if perf_metrics_url:
show_perf_metrics(perf_metrics_url)
# force failure to see the logs
assert False


class Result(GenerationResultBase):

Expand Down Expand Up @@ -76,15 +139,29 @@ def launch_disaggregated_llm(
ctx_model: str = None,
gen_model: str = None,
server_waiting_timeout: int = DEFAULT_SERVER_WAITING_TIMEOUT,
max_workers: int = 16):
max_workers: int = 16,
debug_perf: bool = False):
temp_dir = tempfile.TemporaryDirectory()
disaggregated_serving_config_path = os.path.join(
temp_dir.name, "disaggregated_serving_config.yaml")

if debug_perf:
kv_cache_perf_dir = os.path.join(temp_dir.name, "kvcache_perf")
os.makedirs(kv_cache_perf_dir, exist_ok=True)
else:
kv_cache_perf_dir = None
if tensor_parallel_size > 1:
        print(
            "Using the unified tp parameter for testing is not recommended. "
            "Please use the server configs instead.")
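    # With debug_perf, the ctx/gen servers return per-request perf metrics and every
    # server caps its /perf_metrics history at MAX_PERF_METRICS_REQUESTS.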
if debug_perf:
disaggregated_server_config[
"perf_metrics_max_requests"] = MAX_PERF_METRICS_REQUESTS
ctx_server_config["return_perf_metrics"] = True
ctx_server_config[
"perf_metrics_max_requests"] = MAX_PERF_METRICS_REQUESTS
gen_server_config["return_perf_metrics"] = True
gen_server_config[
"perf_metrics_max_requests"] = MAX_PERF_METRICS_REQUESTS

with open(disaggregated_serving_config_path, "w") as f:
yaml.dump(disaggregated_server_config, f)
@@ -144,7 +221,7 @@ def launch_disaggregated_llm(
current_gpu_offset = 0

for i, port in enumerate(ctx_ports):
env_ctx = os.environ.copy()
env_ctx = get_worker_env_vars(kv_cache_perf_dir=kv_cache_perf_dir)
gpu_range = range(current_gpu_offset,
current_gpu_offset + ctx_total_gpus)
env_ctx["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
@@ -164,7 +241,7 @@
gen_servers = []

for i, port in enumerate(gen_ports):
env_gen = os.environ.copy()
env_gen = get_worker_env_vars(kv_cache_perf_dir=kv_cache_perf_dir)
gpu_range = range(current_gpu_offset,
current_gpu_offset + gen_total_gpus)
env_gen["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, gpu_range))
@@ -287,6 +364,14 @@ def generate_async(prompt: str,
tokenizer = load_hf_tokenizer(model_name)
yield DuckLLM(args, tokenizer, generate_async)

    if debug_perf:
        show_debug_perf(
            thread_pool,
            kv_cache_perf_dir=kv_cache_perf_dir,
            # Assumes the disaggregated server listens on its default port 8000.
            perf_metrics_url="http://localhost:8000",
        )


def run_parallel_test(model_name: str,
model_path: str,
@@ -355,7 +440,7 @@ def run_parallel_test(model_name: str,
task.evaluate(llm)


@pytest.mark.timeout(DEFAULT_TEST_TIMEOUT)
@pytest.mark.timeout(DEFAULT_TEST_TIMEOUT * 5)
class TestLlama3_1_8BInstruct(LlmapiAccuracyTestHarness):
MODEL_NAME = "meta-llama/Llama-3.1-8B-Instruct"
MODEL_PATH = f"{llm_models_root()}/llama-3.1-model/Llama-3.1-8B-Instruct"
@@ -508,9 +593,13 @@ def test_eagle3(self, overlap_scheduler, eagle3_one_model):
"urls": ["localhost:8002"]
}
}
with launch_disaggregated_llm(disaggregated_server_config,
ctx_server_config, gen_server_config,
self.MODEL_PATH) as llm:
with launch_disaggregated_llm(
disaggregated_server_config,
ctx_server_config,
gen_server_config,
self.MODEL_PATH,
debug_perf=True,
) as llm:
task = GSM8K(self.MODEL_NAME)
task.evaluate(llm)

@@ -1071,12 +1160,12 @@ def test_auto_dtype(self, overlap_scheduler):
}
}
with launch_disaggregated_llm(disaggregated_server_config,
ctx_server_config, gen_server_config,
self.MODEL_PATH) as llm:
ctx_server_config,
gen_server_config,
self.MODEL_PATH,
debug_perf=True) as llm:
task = GSM8K(self.MODEL_NAME)
task.evaluate(llm)
task = MMLU(self.MODEL_NAME)
task.evaluate(llm)

def test_chunked_prefill(self):
ctx_server_config = {
2 changes: 0 additions & 2 deletions tests/integration/test_lists/waives.txt
@@ -377,9 +377,7 @@ unittest/_torch/modules SKIP (https://nvbugs/5637037)
accuracy/test_disaggregated_serving.py::TestQwen3_8B::test_auto_dtype[False] SKIP (https://nvbugs/5651854)
accuracy/test_disaggregated_serving.py::TestQwen3_8B::test_auto_dtype[True] SKIP (https://nvbugs/5651854)
disaggregated/test_disaggregated.py::test_disaggregated_deepseek_v3_lite_bf16_empty_batch[DeepSeek-V3-Lite-bf16] SKIP (https://nvbugs/5601682)
accuracy/test_disaggregated_serving.py::TestLlama3_1_8BInstruct::test_eagle3[eagle3_one_model=False-overlap_scheduler=False] SKIP (https://nvbugs/5655584)
accuracy/test_disaggregated_serving.py::TestQwen3_8B::test_chunked_prefill SKIP (https://nvbugs/5608930)
accuracy/test_disaggregated_serving.py::TestQwen3_8B::test_auto_dtype[False] SKIP (https://nvbugspro.nvidia.com/bug/5651854)
test_e2e.py::test_ptp_quickstart_multimodal_chunked_prefill[phi4-multimodal-instruct-fp4-multimodals/Phi-4-multimodal-instruct-FP4-0.8-image] SKIP (https://nvbugs/5568836)
test_e2e.py::test_ptp_quickstart_multimodal_chunked_prefill[phi4-multimodal-instruct-fp4-multimodals/Phi-4-multimodal-instruct-FP4-0.8-image] SKIP (https://nvbugs/5568836)
test_e2e.py::test_ptp_quickstart_multimodal_kv_cache_reuse[phi4-multimodal-instruct-fp4-multimodals/Phi-4-multimodal-instruct-FP4-0.8-image] SKIP (https://nvbugs/5568836)