Source code for pydna.snapgene_history_parser

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright 2013-2023 by Björn Johansson.  All rights reserved.
# This code is part of the Python-dna distribution and governed by its
# license.  Please see the LICENSE.txt file that should have been included
# as part of this package.
"""Parse SnapGene .dna files and reconstruct their cloning history as
:class:`~pydna.dseqrecord.Dseqrecord` objects.

The single public entry point is :func:`parse_snapgene_history`, which reads a
``.dna`` file, resolves every step recorded in its SnapGene history, and
returns a ``Dseqrecord`` whose ``source`` attribute tree mirrors the full
cloning provenance.
"""

from sgffp import SgffReader, SgffObject, SgffSegment, SgffFeature
from sgffp.models.history import (
    SgffHistoryNode,
    SgffHistoryTreeNode,
    SgffInputSummary,
    SgffHistoryOligo,
)
from pydna.dseq import Dseq
import re
from pydna.assembly2 import (
    gibson_assembly,
    pcr_assembly,
    restriction_ligation_assembly,
    gateway_assembly,
    in_fusion_assembly,
    ligation_assembly,
    fusion_pcr_assembly,
)
from pydna.oligonucleotide_hybridization import oligonucleotide_hybridization
from pydna.primer import Primer
from Bio.SeqFeature import SeqFeature, SimpleLocation, CompoundLocation
from pydna.dseqrecord import Dseqrecord
import os
from pydna.opencloning_models import (
    AssemblyFragment,
    Source,
    AddgeneIdSource,
    UploadedFileSource,
)
from Bio.Restriction.Restriction_Dictionary import rest_dict
from Bio.Restriction import RestrictionBatch
from pydna.parsers import parse_snapgene
from pydna.utils import flatten
import itertools
import warnings
from sgffp.models.notes import SgffNotes

STRAND_MAP = {"+": 1, "-": -1, ".": 0, "=": 0}
UNSUPPORTED_OPERATIONS = [
    "gcCloning",
    "taCloning",
    "topoTA",
    "topoDirectional",
    "topoBlunt",
    "destroyRestrictionFragment",
]
GIBSON_LIKE_FUNCTION_DICT = {
    "gibsonAssembly": gibson_assembly,
    "inFusionCloning": in_fusion_assembly,
    "hifiAssembly": gibson_assembly,
}


[docs] class SnapgeneHistoryParserWarning(Warning): pass
def _segments_to_location( segments: list[SgffSegment], strand_int: int, seq_len: int, circular: bool ) -> SimpleLocation | CompoundLocation: """Convert SgffSegment list to a Biopython location.""" locations = [] for seg in segments: if seg.type == "gap": continue # Handle wrap-around for circular sequences if circular and seg.start > seg.end: locations.append(SimpleLocation(seg.start, seq_len, strand=strand_int)) locations.append(SimpleLocation(0, seg.end, strand=strand_int)) else: locations.append(SimpleLocation(seg.start, seg.end, strand=strand_int)) if len(locations) == 1: return locations[0] # For reverse strand, reverse order if strand_int == -1: locations = locations[::-1] return CompoundLocation(locations) def _feature_to_seqfeature( feature: SgffFeature, seq_len: int, circular: bool ) -> SeqFeature: """Convert an SgffFeature to a Biopython SeqFeature.""" strand_int = STRAND_MAP.get(feature.strand, 0) location = _segments_to_location(feature.segments, strand_int, seq_len, circular) # Convert qualifiers: Biopython expects lists as values qualifiers = { k: [v] if not isinstance(v, list) else v for k, v in feature.qualifiers.items() } if feature.name and "label" not in qualifiers: qualifiers["label"] = [feature.name] return SeqFeature(location=location, type=feature.type, qualifiers=qualifiers) def _dseq_from_seq_properties(sequence: str, circular: bool, seq_props: dict) -> Dseq: if circular: return Dseq(sequence, circular=True) elif ( seq_props is not None and "UpstreamStickiness" in seq_props and "DownstreamStickiness" in seq_props ): left_ovhg = -int(seq_props.get("UpstreamStickiness")) right_ovhg = -int(seq_props.get("DownstreamStickiness")) try: return Dseq.from_full_sequence_and_overhangs( sequence, left_ovhg, right_ovhg ) except ValueError as e: raise NotImplementedError(f"Sequence not supported: {sequence}") from e else: # pragma: no cover (I don't expect this to happen) return Dseq(sequence) def _history_node_to_dseqrecord(sgff_object: SgffObject, node_id: str) -> Dseqrecord: """Convert a history node to a Dseqrecord. Sequence comes from the history node, metadata from the tree node. """ node: SgffHistoryNode = sgff_object.history.nodes[node_id] tree_node = sgff_object.history.get_tree_node(node_id) circular = tree_node.circular if tree_node else False seq_props = node.properties.get("AdditionalSequenceProperties") seq = _dseq_from_seq_properties(node.sequence, circular, seq_props) seq_len = node.length name = tree_node.name if tree_node else f"node_{node_id}" name = re.sub(r"\s+", "_", name) # Replace whitespace with underscores features = [] if seq_len != 0: # Convert features from the node's content, only if sequence is present, # else the sequence is created by _source_from_tree_node features = [ _feature_to_seqfeature(feat, seq_len, circular) for feat in node.features ] annotations = {} annotations["topology"] = "circular" if circular else "linear" annotations["molecule_type"] = "DNA" record = Dseqrecord( record=seq, id=name, name=name, description=name, features=features, annotations=annotations, ) return record def _filter_assembly_fragments_that_are_sequences( input_value: list[AssemblyFragment], ) -> list[AssemblyFragment]: return [ fragment for fragment in input_value if isinstance(fragment.sequence, Dseqrecord) ] def _get_restriction_batch_from_enzyme_names( enzyme_names: list[str], ) -> RestrictionBatch: if all(enz_name in rest_dict.keys() for enz_name in enzyme_names): return RestrictionBatch(first=[e for e in enzyme_names]) else: # pragma: no cover (I don't expect this to happen) raise ValueError(f"Unknown enzymes: {enzyme_names}") def _get_enzyme_batch_from_input_summaries( input_summaries: list[SgffInputSummary], ) -> RestrictionBatch: enzyme_names = set( flatten([input_summary.enzyme_names for input_summary in input_summaries]) ) # Sometimes enzymes come with < > around, we remove them enzyme_names = set( enz_name.replace("<", "").replace(">", "") for enz_name in enzyme_names ) # Sometimes they have Start or End tags, we remove them enzyme_names = enzyme_names.difference({"Start", "End"}) return _get_restriction_batch_from_enzyme_names(enzyme_names) def _get_sequence_inputs(source: Source) -> list[Dseqrecord]: """Return the most ancestral sequences used as inputs. These will typically be the immediate inputs, but when a restriction-ligation is split into restriction then ligation (see _get_restriction_input_combinations), we walk up the tree to find the most ancestral sequences. """ out_value = list() for input_value in _filter_assembly_fragments_that_are_sequences(source.input): if ( input_value.sequence.source is None or len(input_value.sequence.source.input) == 0 ): out_value.append(input_value.sequence) else: out_value.extend(_get_sequence_inputs(input_value.sequence.source)) return out_value def _parse_oligos(oligos: list[SgffHistoryOligo]) -> list[Primer]: return [ Primer(oligo.sequence, name=oligo.name or f"oligo_{i + 1}") for i, oligo in enumerate(oligos) ] def _get_restriction_input_combinations( input_sequences: list[Dseqrecord], node: SgffHistoryTreeNode ) -> list[list[Dseqrecord]]: """Get all possible combinations of input sequences after digestion. This is used to try different combinations of inputs to see if we can find the expected product. """ digestion_products = list() for input_sequence, input_summary in zip(input_sequences, node.input_summaries): rb = _get_enzyme_batch_from_input_summaries([input_summary]) digestion = input_sequence.cut(rb) if len(digestion) == 0: digestion_products.append([input_sequence]) else: digestion_products.append(digestion) return list(itertools.product(*digestion_products)) def _handle_circularize_operation( input_sequences: list[Dseqrecord], node: SgffHistoryTreeNode, sgff_object: SgffObject, expected_product: Dseqrecord, ) -> tuple[list[Dseqrecord], list[Dseqrecord]]: """Handle the circularize operation.""" # This operation does not come with a sequence assert len(input_sequences[0].seq) == 0 history_node = sgff_object.history.nodes[node.children[0].id] seq_props = history_node.content.properties.get("AdditionalSequenceProperties") if not ( seq_props.get("UpstreamEnzymeName") and seq_props.get("DownstreamEnzymeName") ): # pragma: no cover (I don't expect this to happen) warnings.warn( "Stopped at circularize operation without enzymes", category=SnapgeneHistoryParserWarning, ) return None, [] rb = _get_restriction_batch_from_enzyme_names( [seq_props.get("UpstreamEnzymeName"), seq_props.get("DownstreamEnzymeName")] ) original_fragments = expected_product.cut(rb) if len(original_fragments) != 1: # pragma: no cover (I don't expect this to happen) warnings.warn( "Stopped at circularize operation not coming from a single fragment", category=SnapgeneHistoryParserWarning, ) return None, [] input_sequences = [original_fragments[0]] input_sequences[0].source = None products = ligation_assembly(input_sequences, allow_blunt=True) return input_sequences, products def _restriction_ligation_with_fallbacks( input_sequences: list[Dseqrecord], node: SgffHistoryTreeNode, find: callable, ) -> list[Dseqrecord]: """Try restriction-ligation, then plain ligation, then per-input digest + ligation.""" rb = _get_enzyme_batch_from_input_summaries(node.input_summaries) products = restriction_ligation_assembly(input_sequences, rb) if find(products) is not None: return products products = ligation_assembly(input_sequences) if find(products) is not None: return products # Last resort: digest each input individually, then ligate # (happens when ligation would leave overhangs) for combination in _get_restriction_input_combinations(input_sequences, node): products = ligation_assembly(combination) if find(products) is not None: return products return products def _source_from_tree_node( # noqa: C901 expected_product: Dseqrecord, node: SgffHistoryTreeNode, sgff_object: SgffObject ) -> tuple[Source | None | int, list[SgffHistoryNode]]: input_sequences = [ _history_node_to_dseqrecord(sgff_object, child.id) for child in node.children ] expected_seguid = expected_product.seq.seguid() expected_dseq_and_rc = ( expected_product.seq, expected_product.seq.reverse_complement(), ) def find_expected_product(products: list[Dseqrecord]) -> Dseqrecord | None: if expected_product.circular: return next( (p for p in products if p.seq.seguid() == expected_seguid), None ) else: return next((p for p in products if p.seq in expected_dseq_and_rc), None) if node.operation in UNSUPPORTED_OPERATIONS: raise NotImplementedError(f"Operation {node.operation} not supported") elif node.operation == "amplifyFragment": primers = _parse_oligos(node.oligos) products = pcr_assembly(input_sequences[0], *primers, limit=12) elif node.operation == "primerDirectedMutagenesis": fwd_primer, *_ = _parse_oligos(node.oligos) rvs_primer = Primer( fwd_primer.seq.reverse_complement(), name=f"rvs_{fwd_primer.name}" ) pcr_products = pcr_assembly( input_sequences[0], fwd_primer, rvs_primer, limit=10 ) # They bundle also the fusion pcr on the same step products = list() for pcr_product in pcr_products: pcr_product.name = "mutagenesis_pcr_product" products.extend(fusion_pcr_assembly([pcr_product], limit=6)) elif node.operation in [ "changeStrandedness", "editDNAEnds", "changeMethylation", "changePhosphorylation", "setOrigin", # Rotation of circular sequences "newFileFromSelection", # Selection of sequences from a file ]: return -1, None elif node.operation == "changeTopology": if expected_product.circular: input_sequences = [expected_product[: len(expected_product)]] products = ligation_assembly(input_sequences, True) if len(products) == 0: # pragma: no cover (I don't expect this to happen) warnings.warn( "Stopped at change topology operation", category=SnapgeneHistoryParserWarning, ) return None, [] else: warnings.warn( "Stopped at change topology operation", category=SnapgeneHistoryParserWarning, ) return None, [] elif node.operation in [ "insertFragment", "goldenGateAssembly", "insertFragments", "ligateFragments", ]: products = _restriction_ligation_with_fallbacks( input_sequences, node, find_expected_product ) elif node.operation == "linearize": # Here the input_sequence comes empty, I guess because it can be inferred input_sequences = [expected_product.looped()] input_sequences[0].source = None rb = _get_enzyme_batch_from_input_summaries(node.input_summaries) if len(rb) == 0: warnings.warn( "Stopped at linearize operation without enzymes", category=SnapgeneHistoryParserWarning, ) return None, [] products = input_sequences[0].cut(rb) elif node.operation == "circularize": input_sequences, products = _handle_circularize_operation( input_sequences, node, sgff_object, expected_product ) elif node.operation == "removeRestrictionFragment": rb = _get_enzyme_batch_from_input_summaries(node.input_summaries) products = restriction_ligation_assembly(input_sequences, rb) if find_expected_product(products) is None: # This is using blunting as described here: https://www.neb.com/en-gb/applications/cloning-and-synthetic-biology/dna-end-modification/blunting raise NotImplementedError(f"Blunting not supported for {node.operation}") elif node.operation == "gatewayLRCloning": products = gateway_assembly(input_sequences, "LR") elif node.operation == "gatewayBPCloning": products = gateway_assembly(input_sequences, "BP") elif node.operation in ["gibsonAssembly", "inFusionCloning", "hifiAssembly"]: # It is possible that some inputs are digested prior to the assembly., for combination in _get_restriction_input_combinations(input_sequences, node): products = GIBSON_LIKE_FUNCTION_DICT[node.operation](combination, 10) if find_expected_product(products) is not None: break elif node.operation == "overlapFragments": products = fusion_pcr_assembly(input_sequences, limit=10) elif node.operation == "annealOligos": primers = _parse_oligos(node.oligos) products = oligonucleotide_hybridization(*primers, 10) elif node.operation == "invalid": return None, [] elif node.operation in [ "remove", "insert", "replace", "insertReverseTranslation", "insertCodons", "insertRestrictionSite", "insertFeature", ]: warnings.warn( "Manual editing of sequences not supported", category=SnapgeneHistoryParserWarning, ) return None, [] elif node.operation == "flip": # Here the input_sequence comes empty, I guess because it can be inferred input_sequences = [expected_product.reverse_complement()] input_sequences[0].source = None input_sequences[0].name = node.name products = [input_sequences[0].reverse_complement()] else: # pragma: no cover (Tests will be updated when more are encountered) raise ValueError(f"Unknown operation: {node.operation}") correct_product = find_expected_product(products) if correct_product is None: raise ValueError(f"No product found for expected SEGUID {expected_seguid}") # Return the children nodes in the same order as the inputs input_index = {id(seq): i for i, seq in enumerate(input_sequences)} out_nodes = [node.children[input_index.get(id(seq))] for seq in input_sequences] return correct_product.source, out_nodes def _parse_history( root_record: Dseqrecord, root_node: SgffHistoryTreeNode, sgff_object: SgffObject ) -> None: """Parse the history of a Dseqrecord, editing it in place.""" repeat = True while repeat: source, out_nodes = _source_from_tree_node(root_record, root_node, sgff_object) repeat = source == -1 if repeat: root_node = root_node.children[0] root_record.source = source if source is None: # We may be able to get the source from the metadata (e.g. AddGene ID or NCBI accession number) if root_node.id in sgff_object.history.nodes: root_record.source = _source_from_metadata( sgff_object.history.nodes[root_node.id].content.notes ) # Else, set the default source if root_record.source is None: root_record.source = _get_default_source(root_node.name) return for input_value in _get_sequence_inputs(source): node = out_nodes.pop(0) _parse_history(input_value, node, sgff_object) def _source_from_metadata(notes: SgffNotes) -> None | Source: if notes.get("Comments") and ( match := re.search(r"https://www.addgene.org/(\d+)", notes.get("Comments")) ): return AddgeneIdSource(repository_id=match.group(1)) # This works for sequences imported from NCBI in SnapGene # The problem is that sequences coming from genbank # files will also have an AccessionNumber, even if that is arbitrary (e.g. the name of # the sequence). I don't think there is a reliable way to distinguish between the two # without making a request to NCBI. # elif notes.get("AccessionNumber"): # return NCBISequenceSource(repository_id=notes.get("AccessionNumber")) else: return None def _get_default_source(file_name: str) -> UploadedFileSource: return UploadedFileSource( file_name=file_name, sequence_file_format="snapgene", index_in_file=0, )
[docs] def parse_snapgene_history(filepath: str) -> Dseqrecord: """Parse a SnapGene ``.dna`` file and return a :class:`~pydna.dseqrecord.Dseqrecord` whose ``source`` attribute tree encodes the full cloning history. Parameters ---------- filepath: Path to the ``.dna`` file to parse. Returns ------- :class:`~pydna.dseqrecord.Dseqrecord` The root sequence with its provenance tree resolved. Raises ------ NotImplementedError If the file contains a cloning operation that is not yet supported. ValueError If a recorded operation cannot be reproduced (no matching product found). """ root_record = parse_snapgene(filepath)[0] sgff_object = SgffReader.from_file(filepath) name = os.path.basename(filepath) root_record.name = re.sub(r"\s+", "_", name) seq_props = sgff_object.properties.get("AdditionalSequenceProperties") # The biopython parser does not handle overhangs root_record.seq = _dseq_from_seq_properties( str(root_record.seq), root_record.circular, seq_props ) if not sgff_object.has_history: root_record.source = _source_from_metadata(sgff_object.notes) else: _parse_history(root_record, sgff_object.history.tree.root, sgff_object) if root_record.source is None: root_record.source = _get_default_source(name) root_record = root_record.normalize_history() root_record.validate_history() return root_record