Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ down-sampling occurring

- `treeDag.json` and `forktree.json` files are no longer hidden and are now stored in the
`resources` directory
- `trimmomatic` now forces `-phred33` when the encoding can't be determined. If it still fails, the encoding is
changed to `-phred64` and the run is retried.

## 1.4.2

Expand Down
61 changes: 60 additions & 1 deletion flowcraft/generator/components/metagenomics.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, **kwargs):
self.input_type = "fasta"
self.output_type = "fasta"

self.link_end.append({"link": "__fastq", "alias": "_LAST_fastq"})
self.link_end.append({"link": "SIDE_max_len", "alias": "SIDE_max_len"})

self.params = {
"clusters": {
Expand Down Expand Up @@ -561,3 +561,62 @@ def __init__(self, **kwargs):
self.status_channels = [
"split_assembly"
]


class Vamb(Process):
    """Process template interface for Vamb.

    Vamb performs taxonomy-independent binning of metagenomic
    assemblies.

    Attributes set by this process:

        - ``input_type``: fasta
        - ``output_type``: fasta

    It declares one **dependency process**:

        - ``assembly_mapping``: supplies the BAM file produced by the
          assembly mapping step.

    """

    def __init__(self, **kwargs):

        super().__init__(**kwargs)

        # The process consumes and emits the same data type.
        self.input_type = "fasta"
        self.output_type = "fasta"

        self.dependencies = ["assembly_mapping"]

        # User-facing parameters, declared one at a time and then collected
        # in the order they are exposed.
        min_contig = {
            "default": 2000,
            "description": "Ignore contigs shorter than this. Default: 2000"
        }
        min_align_score = {
            "default": 50,
            "description":
                "Ignore reads with alignment score below this. Default: 50"
        }
        clear_input = {
            "default": "false",
            "description":
                "Permanently removes temporary input files. "
                "This option is only useful to remove temporary files "
                "in large workflows and prevents nextflow's resume "
                "functionality. Use with caution."
        }
        self.params = {
            "minContig": min_contig,
            "minAlignScore": min_align_score,
            "clearInput": clear_input
        }

        # Execution directives for the single "vamb" nextflow process.
        vamb_directives = {
            "container": "flowcraft/vamb",
            "version": "1.0.1-1",
            "cpus": 4,
            "memory": "{ 5.GB * task.attempt }"
        }
        self.directives = {"vamb": vamb_directives}

        self.status_channels = ["vamb"]
50 changes: 50 additions & 0 deletions flowcraft/generator/templates/vamb.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Value channels carrying the minContig and minAlignScore parameters
// into the vamb process.
IN_min_contig_{{ pid }} = Channel.value(params.minContig{{ param_id }})
IN_min_align_score_{{ pid }} = Channel.value(params.minAlignScore{{ param_id }})

// Normalise the clearInput parameter to the string "true" or "false" and
// expose it as a value channel.
clear = params.clearInput{{ param_id }} ? "true" : "false"
checkpointClear_{{ pid }} = Channel.value(clear)

// Runs run.py on each (assembly, BAM, BAM index) tuple and records whether
// any bins were produced in bin_status.txt.
process vamb_{{ pid }} {

// Send POST request to platform
{% include "post.txt" ignore missing %}

tag { sample_id }

//publishDir "results/assembly/binning/vamb_{{ pid }}/${sample_id}/"

input:
set sample_id, file(assembly), file(bam_file), file(bam_index) from {{ input_channel }}
val length_threshold from IN_min_contig_{{ pid }}
val min_score from IN_min_align_score_{{ pid }}
// NOTE(review): clear is received here but is not referenced in the script
// below - confirm whether input clean-up is still to be implemented.
val clear from checkpointClear_{{ pid }}

// NOTE(review): no output channels are declared in this template other than
// whatever the included compiler_channels template provides - confirm that
// downstream processes do not need the binning results.
output:

{% with task_name="vamb"%}
{%- include "compiler_channels.txt" ignore missing -%}
{% endwith %}

// NOTE(review): the shell comment and the *metabat-bins* globs in the script
// refer to METABAT2, while the command that is run is run.py with a results/
// output directory - this looks carried over from another template; confirm
// the directory name the binning step actually writes to.
// Any failure inside the first brace group writes "fail" to .status.
script:
"""
{
# run METABAT2
run.py results/ ${assembly} ${bam_file} -m ${length_threshold} -s ${min_score}

# In case no sequences are binned
if [ -z "\$(ls -A *metabat-bins*/)" ]; then
echo "false" > false_bin.fa
mv false_bin.fa *metabat-bins*/
echo "false" > bin_status.txt;
else
echo "true" > bin_status.txt
fi

} || {
echo fail > .status
}
"""
}


{{ forks }}
90 changes: 62 additions & 28 deletions flowcraft/templates/trimmomatic.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

# TODO: More control over read trimming
# TODO: Add option to remove adapters
# TODO: What to do when there is encoding failure
# TODO: What to do when there is encoding failure - forcing phred33 at the moment

__version__ = "1.0.3"
__build__ = "29062018"
Expand Down Expand Up @@ -283,6 +283,43 @@ def merge_default_adapters():
return filepath


def run_trimmomatic(cli, logfile, sample_id):
    """Run the trimmomatic command and return its exit status.

    The command is executed as a subprocess, its STDOUT and STDERR are
    written to the logger, and the trimmomatic log file is then handed to
    ``trimmomatic_log``.

    Parameters
    ----------
    cli : list
        List containing the trimmomatic command and its arguments.
    logfile : str
        Path to the file where trimmomatic writes its log.
    sample_id : str
        Sample identification string.

    Returns
    -------
    int
        Return code of the trimmomatic subprocess.
    """

    def _to_text(stream):
        # Subprocess output arrives as bytes. Decode it for readable logs
        # and fall back to plain string coercion when decoding is not
        # possible.
        try:
            return stream.decode("utf8")
        except (UnicodeDecodeError, AttributeError):
            return str(stream)

    logger.debug("Running trimmomatic subprocess with command: {}".format(cli))

    p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
    stdout, stderr = p.communicate()

    # Both streams are decoded so that neither is logged as a bytes repr
    stdout = _to_text(stdout)
    stderr = _to_text(stderr)

    logger.info("Finished trimmomatic subprocess with STDOUT:\\n"
                "======================================\\n{}".format(stdout))
    logger.info("Finished trimmomatic subprocess with STDERR:\\n"
                "======================================\\n{}".format(stderr))
    logger.info("Finished trimmomatic with return code: {}".format(
        p.returncode))

    trimmomatic_log(logfile, sample_id)

    return p.returncode


@MainWrapper
def main(sample_id, fastq_pair, trim_range, trim_opts, phred, adapters_file,
clear):
Expand Down Expand Up @@ -329,10 +366,12 @@ def main(sample_id, fastq_pair, trim_range, trim_opts, phred, adapters_file,
phred = int(phred)
phred_flag = "-phred{}".format(str(phred))
cli += [phred_flag]
# Could not detect phred encoding. Do not add explicit encoding to
# trimmomatic and let it guess
# Could not detect phred encoding.
# Forcing as phred33 to avoid encoding errors
except ValueError:
pass
logger.info("Could not detect quality encoding. Setting it to phred33")
phred_flag = "-phred33"
cli += [phred_flag]

# Add input samples to CLI
cli += fastq_pair
Expand Down Expand Up @@ -378,37 +417,32 @@ def main(sample_id, fastq_pair, trim_range, trim_opts, phred, adapters_file,
logfile
]

logger.debug("Running trimmomatic subprocess with command: {}".format(cli))

p = subprocess.Popen(cli, stdout=PIPE, stderr=PIPE)
stdout, stderr = p.communicate()

# Attempt to decode STDERR output from bytes. If unsuccessful, coerce to
# string
try:
stderr = stderr.decode("utf8")
except (UnicodeDecodeError, AttributeError):
stderr = str(stderr)

logger.info("Finished trimmomatic subprocess with STDOUT:\\n"
"======================================\\n{}".format(stdout))
logger.info("Finished trimmomatic subprocesswith STDERR:\\n"
"======================================\\n{}".format(stderr))
logger.info("Finished trimmomatic with return code: {}".format(
p.returncode))

trimmomatic_log(logfile, sample_id)
returncode = run_trimmomatic(cli, logfile, sample_id)

if p.returncode == 0 and os.path.exists("{}_1_trim.fastq.gz".format(
if returncode == 0 and os.path.exists("{}_1_trim.fastq.gz".format(
SAMPLE_ID)):
clean_up(fastq_pair, clear)

# Check if trimmomatic ran successfully. If not, write the error message
# to the status channel and exit.
with open(".status", "w") as status_fh:
if p.returncode != 0:
status_fh.write("fail")
return
if returncode != 0:
# retry to run trimmomatic by changing the encoding from phred33 to phred64
if "-phred33" in cli:

logger.info("Trimmomatic failed while running with phred33. Setting it to phred64 and trying again...")
cli[7] = "-phred64"

returncode = run_trimmomatic(cli, logfile, sample_id)

if returncode != 0:
status_fh.write("fail")
return
else:
status_fh.write("pass")
else:
status_fh.write("fail")
return
else:
status_fh.write("pass")

Expand Down