bio-workflow-management-nextflow-pipelines


Nextflow Pipelines

Basic Pipeline Structure

// main.nf
nextflow.enable.dsl=2

params.reads = "data/*_{1,2}.fq.gz"
params.outdir = "results"

process FASTQC {
    input:
    tuple val(sample_id), path(reads)

    output:
    path("*.html"), emit: html
    path("*.zip"), emit: zip

    script:
    """
    fastqc ${reads}
    """
}

workflow {
    Channel.fromFilePairs(params.reads) | FASTQC
}
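
To launch, pass each params value as a double-dash flag, and quote the glob so the shell hands it to Nextflow unexpanded:

nextflow run main.nf --reads 'data/*_{1,2}.fq.gz' --outdir results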

DSL2 Modules

// modules/fastqc.nf
process FASTQC {
    tag "${sample_id}"
    publishDir "${params.outdir}/qc", mode: 'copy'

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("*.html"), emit: html
    tuple val(sample_id), path("*.zip"), emit: zip

    script:
    """
    fastqc -t ${task.cpus} ${reads}
    """
}

// main.nf
include { FASTQC } from './modules/fastqc'
include { ALIGN } from './modules/align'

workflow {
    reads_ch = Channel.fromFilePairs(params.reads)
    FASTQC(reads_ch)
    ALIGN(reads_ch)
}
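
The ALIGN module is included above but never defined. A minimal sketch of what modules/align.nf could look like, assuming bwa-mem2 and samtools are available and reusing params.genome from the config below; only the process name is fixed by the include, everything else here is illustrative:

// modules/align.nf (illustrative sketch)
process ALIGN {
    tag "${sample_id}"
    publishDir "${params.outdir}/bam", mode: 'copy'

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("${sample_id}.bam"), emit: bam

    script:
    """
    bwa-mem2 mem -t ${task.cpus} ${params.genome} ${reads} \
        | samtools sort -@ ${task.cpus} -o ${sample_id}.bam -
    """
}

In practice you would stage the reference and its index files through an input channel rather than referencing params.genome directly in the script.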

Config File

// nextflow.config
params {
    reads = "data/*_{1,2}.fq.gz"
    outdir = "results"
    genome = "ref/genome.fa"
}

process {
    cpus = 4
    memory = '8 GB'
    time = '2h'

    withName: 'ALIGN' {
        cpus = 16
        memory = '32 GB'
    }
}

profiles {
    docker { docker.enabled = true }
    singularity { singularity.enabled = true }
    slurm { process.executor = 'slurm' }
}
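
Profiles are chosen at run time with -profile, and several can be combined with a comma:

nextflow run main.nf -profile docker
nextflow run main.nf -profile singularity,slurm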

Container Support

process SALMON_QUANT {
    container 'quay.io/biocontainers/salmon:1.10.0--h7e5ed60_0'

    input:
    tuple val(sample_id), path(reads)
    path(index)

    output:
    tuple val(sample_id), path("${sample_id}"), emit: quant

    script:
    """
    salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
        -o ${sample_id} --threads ${task.cpus}
    """
}
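
The container directive only takes effect when an engine is enabled (for example via the docker profile above). A default image can also be set globally in nextflow.config, with per-process container directives taking precedence:

// nextflow.config
docker.enabled = true
process.container = 'quay.io/biocontainers/salmon:1.10.0--h7e5ed60_0'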

Channel Operations

// From file pairs
Channel.fromFilePairs("data/*_{1,2}.fq.gz")
    .set { reads_ch }

// From path
Channel.fromPath("data/*.bam")
    .map { file -> tuple(file.baseName, file) }
    .set { bam_ch }

// From samplesheet
Channel.fromPath(params.samplesheet)
    .splitCsv(header: true)
    .map { row -> tuple(row.sample, file(row.fastq_1), file(row.fastq_2)) }
    .set { samples_ch }

// Combine channels
reads_ch.combine(reference_ch)
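
The splitCsv example assumes a header row whose column names match the row accessors. A matching samplesheet might look like this (paths are illustrative):

sample,fastq_1,fastq_2
sampleA,data/sampleA_1.fq.gz,data/sampleA_2.fq.gz
sampleB,data/sampleB_1.fq.gz,data/sampleB_2.fq.gz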

Subworkflows

// subworkflows/qc.nf
include { FASTQC } from '../modules/fastqc'
include { MULTIQC } from '../modules/multiqc'

workflow QC {
    take:
    reads

    main:
    FASTQC(reads)
    MULTIQC(FASTQC.out.zip.collect())

    emit:
    qc_report = MULTIQC.out.report
}

// main.nf
include { QC } from './subworkflows/qc'
include { ALIGN } from './subworkflows/align'

workflow {
    reads = Channel.fromFilePairs(params.reads)
    QC(reads)
    ALIGN(reads)
}
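
Named emissions are reachable from the caller through .out, for example to inspect the report path:

workflow {
    reads = Channel.fromFilePairs(params.reads)
    QC(reads)
    QC.out.qc_report.view()
}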

Cluster Execution

// nextflow.config for SLURM
process {
    executor = 'slurm'
    queue = 'normal'
    clusterOptions = '--account=myproject'

    withLabel: 'high_memory' {
        memory = '128 GB'
        queue = 'highmem'
    }
}

executor {
    name = 'slurm'
    queueSize = 100
    submitRateLimit = '10 sec'
}
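
The withLabel selector only matches processes that declare the label, for example (hypothetical process name):

process MERGE_BAMS {
    label 'high_memory'
    // picks up memory = '128 GB' and queue = 'highmem' from the config above
    // ...
}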

AWS/Cloud Execution

// nextflow.config for AWS Batch
process {
    executor = 'awsbatch'
    queue = 'my-batch-queue'
}

aws {
    region = 'us-east-1'
    batch { cliPath = '/usr/local/bin/aws' }
}

Run on AWS

nextflow run main.nf -profile awsbatch -bucket-dir s3://my-bucket/work
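
The S3 work directory can also be fixed in the config instead of on the command line, to the same effect:

// nextflow.config
workDir = 's3://my-bucket/work'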

Resource Labels

process {
    withLabel: 'process_low' {
        cpus = 2
        memory = '4 GB'
        time = '1h'
    }
    withLabel: 'process_medium' {
        cpus = 8
        memory = '16 GB'
        time = '4h'
    }
    withLabel: 'process_high' {
        cpus = 16
        memory = '64 GB'
        time = '12h'
    }
}

process ALIGN {
    label 'process_high'
    // ...
}

Error Handling

process RISKY_PROCESS {
    errorStrategy 'retry'
    maxRetries 3
    memory { 8.GB * task.attempt }

    script:
    """
    memory_intensive_command
    """
}

process OPTIONAL_PROCESS {
    errorStrategy 'ignore'
    // ...
}
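
errorStrategy also accepts a closure, so retries can be limited to failures that look like resource kills (exit 137 is the usual out-of-memory SIGKILL). A common sketch:

process RISKY_PROCESS {
    errorStrategy { task.exitStatus in [137, 140] ? 'retry' : 'finish' }
    maxRetries 3
    memory { 8.GB * task.attempt }
    // ...
}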

Caching and Resume

Resume from last run

nextflow run main.nf -resume

Clean work directory

nextflow clean -f

Show execution history

nextflow log
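
Per-task resource usage can be captured for any run with the built-in reporting flags:

nextflow run main.nf -resume -with-report report.html -with-trace trace.txt -with-timeline timeline.html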

Complete RNA-seq Pipeline

nextflow.enable.dsl=2

params.reads = "data/*_{1,2}.fq.gz"
params.salmon_index = "ref/salmon_index"
params.outdir = "results"

process FASTP {
    tag "${sample_id}"
    publishDir "${params.outdir}/trimmed", mode: 'copy'

    input:
    tuple val(sample_id), path(reads)

    output:
    tuple val(sample_id), path("${sample_id}_{1,2}.trimmed.fq.gz"), emit: reads
    path("${sample_id}.json"), emit: json

    script:
    """
    fastp -i ${reads[0]} -I ${reads[1]} \
        -o ${sample_id}_1.trimmed.fq.gz -O ${sample_id}_2.trimmed.fq.gz \
        --json ${sample_id}.json --thread ${task.cpus}
    """
}

process SALMON_QUANT {
    tag "${sample_id}"
    publishDir "${params.outdir}/salmon", mode: 'copy'

    input:
    tuple val(sample_id), path(reads)
    path(index)

    output:
    tuple val(sample_id), path("${sample_id}"), emit: quant

    script:
    """
    salmon quant -i ${index} -l A -1 ${reads[0]} -2 ${reads[1]} \
        -o ${sample_id} --threads ${task.cpus}
    """
}

process MULTIQC {
    publishDir "${params.outdir}", mode: 'copy'

    input:
    path('*')

    output:
    path("multiqc_report.html")

    script:
    """
    multiqc .
    """
}

workflow {
    reads_ch = Channel.fromFilePairs(params.reads)
    index_ch = Channel.fromPath(params.salmon_index)

    FASTP(reads_ch)
    // .first() converts the index to a value channel so it is reused for every sample
    SALMON_QUANT(FASTP.out.reads, index_ch.first())

    // drop the sample_id from the quant tuples, then gather all QC inputs for MultiQC
    qc_files = FASTP.out.json
        .mix(SALMON_QUANT.out.quant.map { sample_id, quant_dir -> quant_dir })
    MULTIQC(qc_files.collect())
}
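
A full run of this pipeline, assuming a docker profile like the one defined earlier:

nextflow run main.nf --reads 'data/*_{1,2}.fq.gz' --salmon_index ref/salmon_index -profile docker -resume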

Related Skills

  • workflow-management/snakemake-workflows - Snakemake alternative

  • workflows/rnaseq-to-de - End-to-end RNA-seq

  • read-qc/fastp-workflow - QC processes
