nf-bactopia Plugin
The nf-bactopia Nextflow plugin provides utility functions that handle input collection, parameter validation, output gathering, and channel operations across all Bactopia workflows. Rather than duplicating this logic in every workflow file, the plugin centralizes it so that subworkflows and modules can focus on their analysis tasks.
Installation & Setup
Declare the plugin in your nextflow.config:
plugins {
    id 'nf-bactopia@2.0.3'
}
Then import functions in your Nextflow scripts:
include { gather } from 'plugin/nf-bactopia'
include { gatherCsvtk } from 'plugin/nf-bactopia'
include { filterWithData } from 'plugin/nf-bactopia'
Requires: Nextflow >= 26.03.1-edge
Input Handling
These functions run at pipeline startup to validate parameters and collect sample inputs into standardized channel structures.
validateParameters
Checks all pipeline parameters against the JSON schema, catching type mismatches, missing
required values, and invalid combinations before any processes run. Pass false for the
main Bactopia pipeline, true for standalone Bactopia Tools.
include { validateParameters } from 'plugin/nf-bactopia'
// In the BACTOPIA_INIT subworkflow
def validation = validateParameters(false)
if (validation.hasErrors) {
    log.info(validation.error)
    error(" ")
} else {
    log.info(validation.logs)
}
Returns a map with hasErrors, error, logs, and data fields.
bactopiaInputs
Collects and organizes sample inputs (FASTQs from SRA/ENA, local files, assemblies)
into a standardized list of sample records. Called after validateParameters in the
main pipeline, using the validation result to determine the run type.
include { bactopiaInputs } from 'plugin/nf-bactopia'
def collectedInputs = bactopiaInputs(validation.data)
if (collectedInputs.hasErrors) {
    log.info(collectedInputs.error)
    error(" ")
} else {
    log.info(collectedInputs.logs)
}
// Convert to channel of records
def ch_samples = channel.fromList(collectedInputs.samples.collect { sample ->
    record(
        meta: sample.meta,
        r1_files: sample.r1.collect { fastq -> file(fastq) }.toSet(),
        r2_files: sample.r2.collect { fastq -> file(fastq) }.toSet(),
        se_files: sample.se.collect { fastq -> file(fastq) }.toSet(),
        lr_files: sample.lr.collect { fastq -> file(fastq) }.toSet(),
        fna_files: sample.assembly.collect { fna -> file(fna) }.toSet()
    )
})
bactopiaToolInputs
The equivalent of bactopiaInputs for standalone Bactopia Tools. Instead of collecting
raw inputs, it reads from a previous Bactopia run directory and builds channels for
assemblies, proteins, GFFs, BLAST databases, and other outputs that tools need.
include { bactopiaToolInputs } from 'plugin/nf-bactopia'
def collectedInputs = bactopiaToolInputs()
// Returns samples with: meta, fna, faa, gff, r1, r2, se, lr, blastdb, etc.
Gathering Outputs
These functions collect per-sample outputs into aggregated structures for merging or downstream analysis. They are the most commonly used plugin functions; nearly every subworkflow uses at least one.
gather
Collects a single field from all sample records into a Set, keeping the original field name. Used when a downstream process needs all samples' outputs together (e.g., building a heatmap from individual JSON results).
include { gather } from 'plugin/nf-bactopia'
// Collect all RGI JSON outputs for heatmap generation
ch_rgi_heatmap = RGI_HEATMAP(gather(ch_rgi_main, 'json', [name: 'rgi']))
The meta map must contain a name key, and all keys pass through to the output.
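To make the gathering step concrete, here is a minimal plain-Groovy sketch of the behaviour described above, using ordinary lists and maps instead of channels. It is not the plugin's implementation: `gatherSketch`, the sample records, and the exact shape of the output (a `meta` entry plus the gathered field) are assumptions made for illustration.

```groovy
// Hypothetical per-sample records; the names and file paths are illustrative only.
def records = [
    [meta: [name: 'sampleA'], json: 'sampleA.json'],
    [meta: [name: 'sampleB'], json: 'sampleB.json'],
]

// Plain-Groovy stand-in for the described behaviour, NOT the plugin's code.
def gatherSketch = { List recs, String field, Map meta ->
    // The documentation states that the meta map must carry a 'name' key
    assert meta.containsKey('name') : "meta must contain a 'name' key"
    // One aggregated record: the meta map plus every value of the field, as a Set
    return [meta: meta, (field): recs.collect { rec -> rec[field] }.toSet()]
}

def gathered = gatherSketch(records, 'json', [name: 'rgi'])
assert gathered.meta.name == 'rgi'
assert gathered.json == ['sampleA.json', 'sampleB.json'] as Set
```

The gathered field keeps its original name (`json` here), so a downstream process can declare its input under the same name that the per-sample process used for its output.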
gatherCsvtk
Gathers a single field and renames it to csv, preparing it for CSVTK_CONCAT input.
This is the most common gathering pattern, used whenever per-sample TSV/CSV results
need to be concatenated into a single merged report.
include { gatherCsvtk } from 'plugin/nf-bactopia'
// Merge all per-sample AMR reports into one file
ch_csvtk_concat = CSVTK_CONCAT(
    gatherCsvtk(ch_amrfinderplus_run, 'report', [name: 'amrfinderplus']),
    'tsv',
    'tsv'
)
You can pass extra args through the meta map:
// For tools that don't include headers in their output
gatherCsvtk(ch_emmtyper, 'tsv', [name: 'emmtyper', args: '--no-header-row'])
gatherFields
Gathers multiple fields with explicit rename mapping. Used when a process needs
gathered inputs under different names than the originals, such as renaming fna to
query for a comparison tool.
include { gatherFields } from 'plugin/nf-bactopia'
// Gather assemblies and rename 'fna' to 'query' for FastANI
gatherFields(query, [fna: 'query'], [name: 'fastani'])
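The rename mapping can be pictured with a small plain-Groovy sketch over lists and maps. This is only an illustration under assumptions: `gatherFieldsSketch`, the sample records, and the layout of the returned map are hypothetical and do not come from the plugin source.

```groovy
// Hypothetical per-sample records; the names and file paths are illustrative only.
def records = [
    [meta: [name: 'sampleA'], fna: 'sampleA.fna'],
    [meta: [name: 'sampleB'], fna: 'sampleB.fna'],
]

// Plain-Groovy stand-in for the described behaviour, NOT the plugin's code.
def gatherFieldsSketch = { List recs, Map renames, Map meta ->
    def out = [meta: meta]
    // For each source -> target pair, gather the source field under the target name
    renames.each { source, target ->
        out[target] = recs.collect { rec -> rec[source] }.toSet()
    }
    return out
}

def gathered = gatherFieldsSketch(records, [fna: 'query'], [name: 'fastani'])
assert gathered.query == ['sampleA.fna', 'sampleB.fna'] as Set
assert !gathered.containsKey('fna')
```

Under this reading, gatherCsvtk corresponds to the special case of a single mapping whose target name is `csv`.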
Channel Operations
filterWithData
Filters out records where all specified fields are null. Necessary because some samples
may lack certain data types (e.g., a sample with only long reads has no r1/r2 files),
and passing null paths to a process would cause it to fail.
include { filterWithData } from 'plugin/nf-bactopia'
// Only run Seroba on samples that have paired-end reads
ch_seroba_run = SEROBA_RUN(filterWithData(reads, ['r1', 'r2']))
// Filter for samples with any read type available
scrubbed = filterWithData(ch_sample_outputs, ['r1', 'r2', 'se', 'lr'])
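The filtering rule (drop a record only when every listed field is null) can be sketched in plain Groovy as follows. `filterWithDataSketch` and the sample records are hypothetical, and the real function may treat other "empty" values, such as empty collections, differently.

```groovy
// Hypothetical records: one short-read sample and one long-read-only sample.
def records = [
    [meta: [name: 'illumina'], r1: 'a_R1.fastq.gz', r2: 'a_R2.fastq.gz', lr: null],
    [meta: [name: 'nanopore'], r1: null, r2: null, lr: 'b.fastq.gz'],
]

// Plain-Groovy stand-in for the described behaviour, NOT the plugin's code.
// A record is kept when at least one of the listed fields is non-null.
def filterWithDataSketch = { List recs, List fields ->
    recs.findAll { rec -> fields.any { f -> rec[f] != null } }
}

// Only the sample with r1/r2 survives a paired-end filter
def paired = filterWithDataSketch(records, ['r1', 'r2'])
assert paired.collect { it.meta.name } == ['illumina']

// Both samples have at least one read type
def anyReads = filterWithDataSketch(records, ['r1', 'r2', 'lr'])
assert anyReads.size() == 2
```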
combineWith
Creates a Cartesian product between a gathered channel and a multi-item channel, merging
each item into the gathered map under a specified field name. Replaces the deprecated
Nextflow each input qualifier.
include { combineWith } from 'plugin/nf-bactopia'
include { gatherFields } from 'plugin/nf-bactopia'
// Combine gathered query assemblies with each reference genome
ch_fastani = FASTANI_MODULE(
    combineWith(
        gatherFields(query, [fna: 'query'], [name: 'fastani']),
        ch_ref,
        'reference'
    )
)
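As a rough picture of the Cartesian product, the plain-Groovy sketch below pairs one gathered map with each item of a list and stores the item under the requested field name. `combineWithSketch` and the file names are assumptions for illustration; the plugin operates on channels rather than lists.

```groovy
// Hypothetical gathered map (one item) and reference list (many items).
def gathered = [meta: [name: 'fastani'], query: ['a.fna', 'b.fna'] as Set]
def references = ['ref1.fna', 'ref2.fna']

// Plain-Groovy stand-in for the described behaviour, NOT the plugin's code.
// Each reference yields a copy of the gathered map with one extra field.
def combineWithSketch = { Map g, List items, String field ->
    items.collect { item -> g + [(field): item] }
}

def combined = combineWithSketch(gathered, references, 'reference')
assert combined.size() == 2
assert combined.collect { it.reference } == ['ref1.fna', 'ref2.fna']
assert combined.every { it.query == gathered.query }
```

Each resulting map carries the full set of queries plus exactly one reference, so the process runs once per reference.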
formatSamples
Adapts tuple sizes based on data availability. Takes a channel of 4-element tuples
and trims them to 1, 2, or 3 elements depending on the dataTypes parameter.
include { formatSamples } from 'plugin/nf-bactopia'
// Trim to just [meta, inputs] (dataTypes=1)
ch_trimmed = formatSamples(ch_samples, 1)
Logging
collectNextflowLogs
Expands each record's nf_logs field into individual [meta, file] tuples suitable
for publishing. Used in workflow publish blocks to write Nextflow execution logs
alongside sample outputs.
include { collectNextflowLogs } from 'plugin/nf-bactopia'
publish:
    sample_nf_logs = collectNextflowLogs(ch_amrfinderplus.sample_outputs)
    run_nf_logs = collectNextflowLogs(ch_amrfinderplus.run_outputs)
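The expansion of nf_logs into [meta, file] tuples can be sketched in plain Groovy as shown below. `collectLogsSketch`, the records, and the log file names are hypothetical, and the real function works on channels of records rather than lists.

```groovy
// Hypothetical records, each carrying a list of log files in nf_logs.
def records = [
    [meta: [name: 'sampleA'], nf_logs: ['.command.log', '.command.err']],
    [meta: [name: 'sampleB'], nf_logs: ['.command.log']],
]

// Plain-Groovy stand-in for the described behaviour, NOT the plugin's code.
// Every log file becomes its own [meta, file] pair.
def collectLogsSketch = { List recs ->
    recs.collectMany { rec -> rec.nf_logs.collect { logFile -> [rec.meta, logFile] } }
}

def tuples = collectLogsSketch(records)
assert tuples.size() == 3
assert tuples[0] == [[name: 'sampleA'], '.command.log']
```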