From f7e9ee97b7ee04cc1c84c352cd6ffb073ec44712 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Wed, 18 Jun 2025 19:19:19 -0700 Subject: [PATCH 1/8] rename to register model --- .../models/deepseek_v3/infra/parallelize.py | 26 +++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index f8090683c1..669826356f 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -9,8 +9,9 @@ from torch.distributed.device_mesh import DeviceMesh -from torchtitan.config_manager import JobConfig +from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP from torchtitan.distributed import ParallelDims +from torchtitan.tools.logging import logger def parallelize_deepseekv3( @@ -19,5 +20,26 @@ def parallelize_deepseekv3( parallel_dims: ParallelDims, job_config: JobConfig, ): - # TODO: Add support for parallelizing the model, this is a placeholder function for now + if job_config.activation_checkpoint.mode != "none": + apply_ac(model, job_config.activation_checkpoint) + + if parallel_dims.dp_shard_enabled: # apply FSDP or HSDP + if parallel_dims.dp_replicate_enabled: + dp_mesh_dim_names = ("dp_replicate", "dp_shard") + else: + dp_mesh_dim_names = ("dp_shard",) + + apply_fsdp( + model, + world_mesh[tuple(dp_mesh_dim_names)], + param_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_param], + reduce_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_reduce], + cpu_offload=job_config.training.enable_cpu_offload, + ) + + if parallel_dims.dp_replicate_enabled: + logger.info("Applied HSDP to the model") + else: + logger.info("Applied FSDP to the model") + return model From f1bb6b8874971976835feb584c85be584c87ca58 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 19 Jun 2025 08:18:26 -0700 Subject: [PATCH 2/8] forward and backward --- .../models/deepseek_v3/infra/parallelize.py | 24 +------------------ torchtitan/models/deepseek_v3/model/args.py | 1 - torchtitan/models/deepseek_v3/model/model.py | 4 ++-- 3 files changed, 3 insertions(+), 26 deletions(-) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 669826356f..5985c03ced 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -4,7 +4,6 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. 
- import torch.nn as nn from torch.distributed.device_mesh import DeviceMesh @@ -20,26 +19,5 @@ def parallelize_deepseekv3( parallel_dims: ParallelDims, job_config: JobConfig, ): - if job_config.activation_checkpoint.mode != "none": - apply_ac(model, job_config.activation_checkpoint) - - if parallel_dims.dp_shard_enabled: # apply FSDP or HSDP - if parallel_dims.dp_replicate_enabled: - dp_mesh_dim_names = ("dp_replicate", "dp_shard") - else: - dp_mesh_dim_names = ("dp_shard",) - - apply_fsdp( - model, - world_mesh[tuple(dp_mesh_dim_names)], - param_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_param], - reduce_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_reduce], - cpu_offload=job_config.training.enable_cpu_offload, - ) - - if parallel_dims.dp_replicate_enabled: - logger.info("Applied HSDP to the model") - else: - logger.info("Applied FSDP to the model") - + # TODO: Add support for parallelizing the model, this is a placeholder function for now return model diff --git a/torchtitan/models/deepseek_v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py index c0134bf548..09e882764f 100644 --- a/torchtitan/models/deepseek_v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -111,7 +111,6 @@ def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, in nparams_dense = 0 for name, p in model.named_parameters(): - print(name) if "embedding" in name: nparams_embedding += p.numel() nparams_dense += p.numel() diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index c5ee02327a..b294d7eea1 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -28,8 +28,8 @@ def precompute_freqs_cis(args: DeepSeekV3ModelArgs) -> torch.Tensor: Returns: torch.Tensor: Precomputed complex exponential values for positional embeddings. 
""" - dim = args.qk_rope_head_dim - seqlen = args.max_seq_len + dim = args.qk_rope_head_dim # 64 + seqlen = args.max_seq_len # 2048 beta_fast = args.beta_fast beta_slow = args.beta_slow base = args.rope_theta From ea262c47afcb4720b48ba9c3a30c5743b2afcb73 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Thu, 19 Jun 2025 08:42:23 -0700 Subject: [PATCH 3/8] lint --- torchtitan/models/deepseek_v3/infra/parallelize.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 5985c03ced..40df7fc7db 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -8,9 +8,8 @@ from torch.distributed.device_mesh import DeviceMesh -from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP +from torchtitan.config_manager import JobConfig from torchtitan.distributed import ParallelDims -from torchtitan.tools.logging import logger def parallelize_deepseekv3( From 0b56b964b4ccb9044db9dec5281ffd74a0fadc8b Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Fri, 20 Jun 2025 11:58:14 -0700 Subject: [PATCH 4/8] remove useless comments --- torchtitan/models/deepseek_v3/model/args.py | 1 + torchtitan/models/deepseek_v3/model/model.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/torchtitan/models/deepseek_v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py index 09e882764f..c0134bf548 100644 --- a/torchtitan/models/deepseek_v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -111,6 +111,7 @@ def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, in nparams_dense = 0 for name, p in model.named_parameters(): + print(name) if "embedding" in name: nparams_embedding += p.numel() nparams_dense += p.numel() diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index b294d7eea1..c5ee02327a 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -28,8 +28,8 @@ def precompute_freqs_cis(args: DeepSeekV3ModelArgs) -> torch.Tensor: Returns: torch.Tensor: Precomputed complex exponential values for positional embeddings. 
""" - dim = args.qk_rope_head_dim # 64 - seqlen = args.max_seq_len # 2048 + dim = args.qk_rope_head_dim + seqlen = args.max_seq_len beta_fast = args.beta_fast beta_slow = args.beta_slow base = args.rope_theta From 5cbafadde9cf73e7e62e712fa14bdfd98c9ee7c4 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Fri, 20 Jun 2025 13:05:48 -0700 Subject: [PATCH 5/8] 16b config --- .../train_configs/deepseek_v3_16b.toml | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml new file mode 100644 index 0000000000..20858930c1 --- /dev/null +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -0,0 +1,69 @@ +# torchtitan Config.toml + +[job] +dump_folder = "./outputs" +description = "DeepSeek-V3 debug training" +print_args = false +use_for_integration_test = true + +[profiling] +enable_profiling = false +save_traces_folder = "profile_trace" +profile_freq = 10 +enable_memory_snapshot = false +save_memory_snapshot_folder = "memory_snapshot" + +[metrics] +log_freq = 1 +disable_color_printing = false +enable_tensorboard = false +save_tb_folder = "tb" +enable_wandb = false + +[model] +name = "deepseek_v3" +flavor = "16B" +# test tokenizer.model, for debug purpose only +tokenizer_path = "./tests/assets/test_tiktoken.model" +# converters = ["float8"] + +[optimizer] +name = "AdamW" +lr = 8e-4 +eps = 1e-8 + +[lr_scheduler] +warmup_steps = 2 # lr scheduler warm up, normally 20% of the train steps +decay_ratio = 0.8 # lr scheduler decay ratio, 80% of the train steps +decay_type = "linear" +lr_min = 0.0 + +[training] +local_batch_size = 8 +seq_len = 2048 +max_norm = 1.0 # grad norm clipping +steps = 10 +compile = false +dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) + +[parallelism] +data_parallel_replicate_degree = 1 +data_parallel_shard_degree = -1 +fsdp_reshard_after_forward = "default" # default / never / always + +[checkpoint] +enable_checkpoint = false +folder = "checkpoint" +interval = 10 +last_save_model_weights_only = false +export_dtype = "float32" +async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem"] + +[activation_checkpoint] +mode = "none" # ["none", "selective", "full"] +selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy + +[float8] +enable_fsdp_float8_all_gather = false +precompute_float8_dynamic_scale_for_fsdp = false +filter_fqns = ["output"] From 11d3b38a9f9f4a500e1fbefd39f201310678a750 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Mon, 23 Jun 2025 13:00:38 -0700 Subject: [PATCH 6/8] fsdp --- torchtitan/models/deepseek_v3/__init__.py | 1 + .../models/deepseek_v3/infra/parallelize.py | 34 +++++++++++++++++-- torchtitan/models/deepseek_v3/model/args.py | 1 - torchtitan/models/deepseek_v3/model/model.py | 9 +++-- torchtitan/models/deepseek_v3/model/moe.py | 2 ++ .../train_configs/deepseek_v3_16b.toml | 14 ++++---- 6 files changed, 44 insertions(+), 17 deletions(-) diff --git a/torchtitan/models/deepseek_v3/__init__.py b/torchtitan/models/deepseek_v3/__init__.py index 8a21e53ddc..7eb16a1f3f 100644 --- a/torchtitan/models/deepseek_v3/__init__.py +++ b/torchtitan/models/deepseek_v3/__init__.py @@ -11,6 +11,7 @@ from torchtitan.components.optimizer import build_optimizers from torchtitan.datasets.hf_datasets import build_hf_dataloader from 
torchtitan.datasets.tokenizer.tiktoken import build_tiktoken_tokenizer + from torchtitan.protocols.train_spec import register_train_spec, TrainSpec from .infra.parallelize import parallelize_deepseekv3 diff --git a/torchtitan/models/deepseek_v3/infra/parallelize.py b/torchtitan/models/deepseek_v3/infra/parallelize.py index 40df7fc7db..99338663f6 100644 --- a/torchtitan/models/deepseek_v3/infra/parallelize.py +++ b/torchtitan/models/deepseek_v3/infra/parallelize.py @@ -5,11 +5,12 @@ # LICENSE file in the root directory of this source tree. import torch.nn as nn - from torch.distributed.device_mesh import DeviceMesh -from torchtitan.config_manager import JobConfig +from torchtitan.config_manager import JobConfig, TORCH_DTYPE_MAP from torchtitan.distributed import ParallelDims +from torchtitan.models.llama3.infra.parallelize import apply_ac, apply_fsdp +from torchtitan.tools.logging import logger def parallelize_deepseekv3( @@ -18,5 +19,32 @@ def parallelize_deepseekv3( parallel_dims: ParallelDims, job_config: JobConfig, ): - # TODO: Add support for parallelizing the model, this is a placeholder function for now + if job_config.activation_checkpoint.mode != "none": + apply_ac(model, job_config.activation_checkpoint) + + dp_mesh: DeviceMesh | None = None + if ( + parallel_dims.dp_shard_enabled + ): # apply FSDP or HSDP, potentially with Context Parallel + if parallel_dims.dp_replicate_enabled: + dp_mesh_dim_names = ("dp_replicate", "dp_shard") + else: + dp_mesh_dim_names = ("dp_shard",) + dp_mesh = world_mesh[tuple(dp_mesh_dim_names)] + + apply_fsdp( + model, + dp_mesh, + param_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_param], + reduce_dtype=TORCH_DTYPE_MAP[job_config.training.mixed_precision_reduce], + pp_enabled=parallel_dims.pp_enabled, + cpu_offload=job_config.training.enable_cpu_offload, + reshard_after_forward_policy=job_config.parallelism.fsdp_reshard_after_forward, + ) + + if parallel_dims.dp_replicate_enabled: + logger.info("Applied HSDP to the model") + else: + logger.info("Applied FSDP to the model") + return model diff --git a/torchtitan/models/deepseek_v3/model/args.py b/torchtitan/models/deepseek_v3/model/args.py index c0134bf548..09e882764f 100644 --- a/torchtitan/models/deepseek_v3/model/args.py +++ b/torchtitan/models/deepseek_v3/model/args.py @@ -111,7 +111,6 @@ def get_nparams_and_flops(self, model: nn.Module, seq_len: int) -> tuple[int, in nparams_dense = 0 for name, p in model.named_parameters(): - print(name) if "embedding" in name: nparams_embedding += p.numel() nparams_dense += p.numel() diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index c5ee02327a..ebce21ed9c 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -310,11 +310,10 @@ def __init__(self, model_args: DeepSeekV3ModelArgs): "freqs_cis", precompute_freqs_cis(model_args), persistent=False ) - self.layers = torch.nn.ModuleList() + self.layers = torch.nn.ModuleDict() for layer_id in range(model_args.n_layers): - self.layers.append( - TransformerBlock(layer_id=layer_id, model_args=model_args) - ) + self.layers[str(layer_id)] = TransformerBlock(layer_id, model_args) + self.norm = nn.RMSNorm(model_args.dim) self.output = nn.Linear( model_args.dim, model_args.vocab_size, dtype=torch.get_default_dtype() @@ -333,7 +332,7 @@ def forward(self, tokens: torch.Tensor): """ h = self.tok_embeddings(tokens) - for layer in self.layers: + for layer in self.layers.values(): h = layer(h, 
self.freqs_cis) h = self.norm(h) output = self.output(h) # (batch_size, seq_len, dim) diff --git a/torchtitan/models/deepseek_v3/model/moe.py b/torchtitan/models/deepseek_v3/model/moe.py index 3e17968e11..8772ba6cdb 100644 --- a/torchtitan/models/deepseek_v3/model/moe.py +++ b/torchtitan/models/deepseek_v3/model/moe.py @@ -317,6 +317,8 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: else: out = torch.zeros_like(x.reshape(bs * slen, dim)) + # Convert the routed_output to the same dtype as out + routed_output = routed_output.to(out.dtype) out = out.scatter_add(dim=0, index=token_indices, src=routed_output) out = out.reshape(bs, slen, dim) return out diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index 20858930c1..7fffe303de 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -2,9 +2,8 @@ [job] dump_folder = "./outputs" -description = "DeepSeek-V3 debug training" +description = "DeepSeek-V3 16B model training" print_args = false -use_for_integration_test = true [profiling] enable_profiling = false @@ -39,12 +38,12 @@ decay_type = "linear" lr_min = 0.0 [training] -local_batch_size = 8 +local_batch_size = 32 seq_len = 2048 max_norm = 1.0 # grad norm clipping -steps = 10 +steps = 20 compile = false -dataset = "c4_test" # supported datasets: c4_test (2K), c4 (177M) +dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) [parallelism] data_parallel_replicate_degree = 1 @@ -57,11 +56,10 @@ folder = "checkpoint" interval = 10 last_save_model_weights_only = false export_dtype = "float32" -async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem"] +async_mode = "disabled" # ["disabled", "async", "async_with_pinned_mem]" [activation_checkpoint] -mode = "none" # ["none", "selective", "full"] -selective_ac_option = '2' # 'int' = ac every positive int layer or 'op', ac based on ops policy +mode = "full" # ["none", "selective", "full"] [float8] enable_fsdp_float8_all_gather = false From b98683a3a06256638938fcb7c05fbc52353019b8 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Mon, 23 Jun 2025 14:11:49 -0700 Subject: [PATCH 7/8] fix to conversion --- torchtitan/models/deepseek_v3/model/moe.py | 6 +++--- .../models/deepseek_v3/train_configs/deepseek_v3_16b.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/torchtitan/models/deepseek_v3/model/moe.py b/torchtitan/models/deepseek_v3/model/moe.py index 8772ba6cdb..c9217c8be8 100644 --- a/torchtitan/models/deepseek_v3/model/moe.py +++ b/torchtitan/models/deepseek_v3/model/moe.py @@ -307,7 +307,9 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: # shape (bs*slen*top_k, dim) routed_output = self.experts(routed_input, num_local_tokens_per_expert) - routed_output = routed_output * top_scores.unsqueeze(-1) + routed_output = (routed_output.to(torch.float32) * top_scores.unsqueeze(-1)).to( + x.dtype + ) # shared expert if self.shared_expert is not None: @@ -317,8 +319,6 @@ def forward(self, x: torch.Tensor) -> torch.Tensor: else: out = torch.zeros_like(x.reshape(bs * slen, dim)) - # Convert the routed_output to the same dtype as out - routed_output = routed_output.to(out.dtype) out = out.scatter_add(dim=0, index=token_indices, src=routed_output) out = out.reshape(bs, slen, dim) return out diff --git a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml 
b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml index 7fffe303de..4f08fb0982 100644 --- a/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml +++ b/torchtitan/models/deepseek_v3/train_configs/deepseek_v3_16b.toml @@ -41,7 +41,7 @@ lr_min = 0.0 local_batch_size = 32 seq_len = 2048 max_norm = 1.0 # grad norm clipping -steps = 20 +steps = 10 compile = false dataset = "c4" # supported datasets: c4_test (2K), c4 (177M) From dfe2b6148e9a0fe80227b878aa0e854b8848c098 Mon Sep 17 00:00:00 2001 From: Jiani Wang Date: Tue, 24 Jun 2025 10:48:47 -0700 Subject: [PATCH 8/8] fix the transpose --- torchtitan/models/deepseek_v3/model/model.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/torchtitan/models/deepseek_v3/model/model.py b/torchtitan/models/deepseek_v3/model/model.py index ebce21ed9c..3eb0f2fbc6 100644 --- a/torchtitan/models/deepseek_v3/model/model.py +++ b/torchtitan/models/deepseek_v3/model/model.py @@ -217,6 +217,10 @@ def forward( [k_nope, k_pe.expand(-1, -1, self.n_heads, -1)], dim=-1 ) # (bsz, seqlen, n_heads, qk_head_dim) + q = q.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) + k = k.transpose(1, 2) # (bsz, n_heads, seqlen, qk_head_dim) + v = v.transpose(1, 2) # (bsz, n_heads, seqlen, v_head_dim) + # TODO: Need to pass softmax_scale to sdpa() interface. # For mask, DeepseekV3 uses causal mask, so we can use the default mask in sdpa # https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/model.py#L17
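
A quick illustration of why the final patch adds the transposes before attention: torch.nn.functional.scaled_dot_product_attention expects inputs laid out as (bsz, n_heads, seqlen, head_dim), while the q/k/v tensors built above are (bsz, seqlen, n_heads, head_dim). The sketch below is a minimal, self-contained example with made-up sizes (not the model's real dimensions), and it passes SDPA's scale argument in the spirit of the softmax_scale TODO in the hunk above; the actual scale used by the model may differ.

import torch
import torch.nn.functional as F

# Hypothetical sizes for illustration only; not taken from the 16B config.
bsz, seqlen, n_heads, qk_head_dim, v_head_dim = 2, 16, 4, 192, 128
softmax_scale = qk_head_dim ** -0.5  # placeholder scale; the real model computes its own

q = torch.randn(bsz, seqlen, n_heads, qk_head_dim)  # (bsz, seqlen, n_heads, qk_head_dim)
k = torch.randn(bsz, seqlen, n_heads, qk_head_dim)
v = torch.randn(bsz, seqlen, n_heads, v_head_dim)

# SDPA wants (bsz, n_heads, seqlen, head_dim), hence the transpose(1, 2) calls;
# is_causal=True applies the default causal mask the comment above refers to.
out = F.scaled_dot_product_attention(
    q.transpose(1, 2),
    k.transpose(1, 2),
    v.transpose(1, 2),
    is_causal=True,
    scale=softmax_scale,
)
print(out.shape)  # torch.Size([2, 4, 16, 128]) -> (bsz, n_heads, seqlen, v_head_dim)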