nf-bactopia Plugin

The nf-bactopia Nextflow plugin provides utility functions that handle input collection, parameter validation, output gathering, and channel operations across all Bactopia workflows. Rather than duplicating this logic in every workflow file, the plugin centralizes it so that subworkflows and modules can focus on their analysis tasks.

Installation & Setup

Declare the plugin in your nextflow.config:

plugins {
    id 'nf-bactopia@2.0.3'
}

Then import functions in your Nextflow scripts:

include { gather } from 'plugin/nf-bactopia'
include { gatherCsvtk } from 'plugin/nf-bactopia'
include { filterWithData } from 'plugin/nf-bactopia'

Requires: Nextflow >= 26.03.1-edge
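
If you want Nextflow itself to enforce this version requirement, one option is the standard manifest.nextflowVersion setting. The fragment below is a sketch of how that could sit next to the plugin declaration; whether Bactopia's own configuration sets it this way is an assumption, not something stated here.

```groovy
// nextflow.config (sketch)
plugins {
    id 'nf-bactopia@2.0.3'
}

manifest {
    // The leading '!' makes Nextflow stop instead of only warning
    // when the running version does not satisfy the constraint.
    nextflowVersion = '!>=26.03.1-edge'
}
```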

Input Handling

These functions run at pipeline startup to validate parameters and collect sample inputs into standardized channel structures.

validateParameters

Checks all pipeline parameters against the JSON schema, catching type mismatches, missing required values, and invalid combinations before any processes run. Pass false for the main Bactopia pipeline, true for standalone Bactopia Tools.

include { validateParameters } from 'plugin/nf-bactopia'

// In the BACTOPIA_INIT subworkflow
def validation = validateParameters(false)
if (validation.hasErrors) {
    log.info(validation.error)
    error(" ")
} else {
    log.info(validation.logs)
}

Returns a map with hasErrors, error, logs, and data fields. The data field is what the main pipeline later passes to bactopiaInputs.

bactopiaInputs

Collects and organizes sample inputs (FASTQs from SRA/ENA, local files, assemblies) into a standardized list of sample records. Called after validateParameters in the main pipeline, using the validation result to determine the run type.

include { bactopiaInputs } from 'plugin/nf-bactopia'

def collectedInputs = bactopiaInputs(validation.data)
if (collectedInputs.hasErrors) {
    log.info(collectedInputs.error)
    error(" ")
} else {
    log.info(collectedInputs.logs)
}

// Convert to channel of records
def ch_samples = channel.fromList(collectedInputs.samples.collect { sample ->
    record(
        meta: sample.meta,
        r1_files: sample.r1.collect { fastq -> file(fastq) }.toSet(),
        r2_files: sample.r2.collect { fastq -> file(fastq) }.toSet(),
        se_files: sample.se.collect { fastq -> file(fastq) }.toSet(),
        lr_files: sample.lr.collect { fastq -> file(fastq) }.toSet(),
        fna_files: sample.assembly.collect { fna -> file(fna) }.toSet()
    )
})

bactopiaToolInputs

The equivalent of bactopiaInputs for standalone Bactopia Tools. Instead of collecting raw inputs, it reads from a previous Bactopia run directory and builds channels for assemblies, proteins, GFFs, BLAST databases, and other outputs that tools need.

include { bactopiaToolInputs } from 'plugin/nf-bactopia'

def collectedInputs = bactopiaToolInputs()
// Returns samples with: meta, fna, faa, gff, r1, r2, se, lr, blastdb, etc.

Gathering Outputs

These functions collect per-sample outputs into aggregated structures for merging or downstream analysis. They are the most commonly used plugin functions; nearly every subworkflow uses at least one.

gather

Collects a single field from all sample records into a Set, keeping the original field name. Used when a downstream process needs all samples' outputs together (e.g., building a heatmap from individual JSON results).

include { gather } from 'plugin/nf-bactopia'

// Collect all RGI JSON outputs for heatmap generation
ch_rgi_heatmap = RGI_HEATMAP(gather(ch_rgi_main, 'json', [name: 'rgi']))

The meta map must contain a name key, and all keys pass through to the output.
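
To make the gathering behaviour concrete, here is a minimal plain-Groovy model that works on a list of maps rather than a channel. It is not the plugin's implementation. The function name gatherModel, the sample file names, and the shape of the result (a meta entry next to the gathered field) are all assumptions made for illustration.

```groovy
// Hypothetical model of the behaviour described above; not the plugin's code.
def gatherModel(List<Map> records, String field, Map meta) {
    // The text says the meta map must contain a 'name' key.
    assert meta.containsKey('name') : "meta must contain a 'name' key"
    // Collect one field from every record into a Set, keeping the field name.
    def values = records.collect { rec -> rec[field] }.toSet()
    // Assumption: meta keys travel in a 'meta' entry beside the gathered field.
    return [meta: meta, (field): values]
}

// Illustrative records; the file names are made up.
def records = [
    [meta: [name: 'sample1'], json: 'sample1.json'],
    [meta: [name: 'sample2'], json: 'sample2.json'],
]

def gathered = gatherModel(records, 'json', [name: 'rgi'])
assert gathered.json == ['sample1.json', 'sample2.json'].toSet()
assert gathered.meta.name == 'rgi'
```

The point of the model is that many per-sample records collapse into one item, which is why a process such as RGI_HEATMAP can receive all JSON files in a single task.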

gatherCsvtk

Gathers a single field and renames it to csv, preparing it for CSVTK_CONCAT input. This is the most common gathering pattern, used whenever per-sample TSV/CSV results need to be concatenated into a single merged report.

include { gatherCsvtk } from 'plugin/nf-bactopia'

// Merge all per-sample AMR reports into one file
ch_csvtk_concat = CSVTK_CONCAT(
    gatherCsvtk(ch_amrfinderplus_run, 'report', [name: 'amrfinderplus']),
    'tsv',
    'tsv'
)

You can pass extra args through the meta map:

// For tools that don't include headers in their output
gatherCsvtk(ch_emmtyper, 'tsv', [name: 'emmtyper', args: '--no-header-row'])

gatherFields

Gathers multiple fields with explicit rename mapping. Used when a process needs gathered inputs under different names than the originals, such as renaming fna to query for a comparison tool.

include { gatherFields } from 'plugin/nf-bactopia'

// Gather assemblies and rename 'fna' to 'query' for FastANI
gatherFields(query, [fna: 'query'], [name: 'fastani'])
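
The rename mapping can be pictured with a small plain-Groovy model over a list of maps. This is a sketch under assumptions, not the plugin's code: gatherFieldsModel is a hypothetical name, the file names are invented, and the output layout (meta plus one Set per target name) is assumed rather than documented.

```groovy
// Hypothetical model of gathering with a rename mapping; not the plugin's code.
def gatherFieldsModel(List<Map> records, Map<String, String> renames, Map meta) {
    def out = [meta: meta]
    renames.each { source, target ->
        // Gather the source field from every record, store it under the target name.
        out[target] = records.collect { rec -> rec[source] }.toSet()
    }
    return out
}

// Illustrative records; the file names are made up.
def records = [
    [meta: [name: 'sample1'], fna: 'sample1.fna'],
    [meta: [name: 'sample2'], fna: 'sample2.fna'],
]

def gathered = gatherFieldsModel(records, [fna: 'query'], [name: 'fastani'])
assert gathered.query == ['sample1.fna', 'sample2.fna'].toSet()
assert !gathered.containsKey('fna')   // only the renamed key is present
```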

Channel Operations

filterWithData

Filters out records where all specified fields are null. Necessary because some samples may lack certain data types (e.g., a sample with only long reads has no r1/r2 files), and passing null paths to a process would cause it to fail.

include { filterWithData } from 'plugin/nf-bactopia'

// Only run Seroba on samples that have paired-end reads
ch_seroba_run = SEROBA_RUN(filterWithData(reads, ['r1', 'r2']))

// Filter for samples with any read type available
scrubbed = filterWithData(ch_sample_outputs, ['r1', 'r2', 'se', 'lr'])
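
The filtering rule (drop a record only when every listed field is null) can be modelled in a few lines of plain Groovy. This is a sketch, not the plugin's implementation: filterWithDataModel and the sample values are hypothetical, and treating only null (not empty collections) as "no data" is an assumption.

```groovy
// Hypothetical model of the filtering rule; not the plugin's code.
def filterWithDataModel(List<Map> records, List<String> fields) {
    // Keep a record if at least one of the listed fields is non-null.
    records.findAll { rec -> fields.any { field -> rec[field] != null } }
}

// Illustrative records; the file names are made up.
def records = [
    [meta: [name: 'illumina'], r1: 'a_R1.fastq.gz', r2: 'a_R2.fastq.gz', lr: null],
    [meta: [name: 'nanopore'], r1: null, r2: null, lr: 'b.fastq.gz'],
]

def paired = filterWithDataModel(records, ['r1', 'r2'])
assert paired.collect { it.meta.name } == ['illumina']

def anyReads = filterWithDataModel(records, ['r1', 'r2', 'se', 'lr'])
assert anyReads.size() == 2
```

With plain Nextflow operators, the same predicate could be passed to the filter operator; the plugin function saves repeating it in every subworkflow.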

combineWith

Creates a Cartesian product between a gathered channel and a multi-item channel, merging each item into the gathered map under a specified field name. Replaces the deprecated Nextflow each input qualifier.

include { combineWith } from 'plugin/nf-bactopia'
include { gatherFields } from 'plugin/nf-bactopia'

// Combine gathered query assemblies with each reference genome
ch_fastani = FASTANI_MODULE(
    combineWith(
        gatherFields(query, [fna: 'query'], [name: 'fastani']),
        ch_ref,
        'reference'
    )
)
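
As a rough picture of the Cartesian product, the plain-Groovy model below pairs one gathered map with each item of a list. It is a sketch rather than the plugin's implementation: combineWithModel is a hypothetical name, the file names are invented, and the gathered map's layout is assumed.

```groovy
// Hypothetical model of the product-and-merge step; not the plugin's code.
def combineWithModel(Map gathered, List items, String field) {
    // One output map per item: the gathered map plus the item under 'field'.
    items.collect { item -> gathered + [(field): item] }
}

// Illustrative inputs; the file names are made up.
def gathered = [meta: [name: 'fastani'], query: ['s1.fna', 's2.fna'].toSet()]
def refs = ['refA.fna', 'refB.fna']

def combined = combineWithModel(gathered, refs, 'reference')
assert combined.size() == 2
assert combined.collect { it.reference } == ['refA.fna', 'refB.fna']
assert combined.every { it.query == gathered.query }
```

Each resulting map would drive one task, so two references produce two FastANI runs that each see the full set of query assemblies.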

formatSamples

Adapts tuple sizes based on data availability. Takes a channel of 4-element tuples and trims them to 1, 2, or 3 elements depending on the dataTypes parameter.

include { formatSamples } from 'plugin/nf-bactopia'

// Trim to just [meta, inputs] (dataTypes=1)
ch_trimmed = formatSamples(ch_samples, 1)

Logging

collectNextflowLogs

Expands each record's nf_logs field into individual [meta, file] tuples suitable for publishing. Used in workflow publish blocks to write Nextflow execution logs alongside sample outputs.

include { collectNextflowLogs } from 'plugin/nf-bactopia'

publish:
    sample_nf_logs = collectNextflowLogs(ch_amrfinderplus.sample_outputs)
    run_nf_logs = collectNextflowLogs(ch_amrfinderplus.run_outputs)
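
The expansion into [meta, file] tuples can be sketched in plain Groovy as a flat-map over records. This is an illustrative model, not the plugin's code: collectLogsModel is a hypothetical name, and it assumes nf_logs holds a collection of log file paths (the names used here are invented).

```groovy
// Hypothetical model of expanding nf_logs into tuples; not the plugin's code.
def collectLogsModel(List<Map> records) {
    // Emit one [meta, file] pair for every log file of every record.
    records.collectMany { rec ->
        rec.nf_logs.collect { logFile -> [rec.meta, logFile] }
    }
}

// Illustrative records; the log file names are made up.
def records = [
    [meta: [name: 's1'], nf_logs: ['s1.out', 's1.err']],
    [meta: [name: 's2'], nf_logs: ['s2.out']],
]

def tuples = collectLogsModel(records)
assert tuples.size() == 3
assert tuples[0] == [[name: 's1'], 's1.out']
assert tuples[2] == [[name: 's2'], 's2.out']
```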