Source code for pydna.opencloning_models

# -*- coding: utf-8 -*-
"""
This module provides classes that roughly map to the `OpenCloning <https://opencloning.org>`_
data model, which is defined using `LinkML <https://linkml.io>`_, and available as a python
package `opencloning-linkml <https://pypi.org/project/opencloning-linkml/>`_. These classes
are documented there, and the ones in this module essentially replace the fields pointing to
sequences and primers (which use ids in the data model) to ``Dseqrecord`` and ``Primer``
objects, respectively. Similarly, it uses Location from ``Biopython`` instead of a string,
which is what the data model uses.

When using pydna to plan cloning, it stores the provenance of ``Dseqrecord`` objects in
their ``source`` attribute. Not all methods generate sources so far, so refer to the
documentation notebooks for examples on how to use this feature. The ``history`` method of
``Dseqrecord`` objects can be used to get a string representation of the provenance of the
sequence. You can also use the ``CloningStrategy`` class to create a JSON representation of
the cloning strategy. That ``CloningStrategy`` can be loaded in the OpenCloning web interface
to see a representation of the cloning strategy.


Contributing
============

Not all fields can be readily serialized to be converted to regular types in pydantic. For
instance, the ``coordinates`` field of the ``GenomeCoordinatesSource`` class is a
``SimpleLocation`` object, or the ``input`` field of ``Source`` is a list of ``SourceInput``
objects, which can be ``Dseqrecord`` or ``Primer`` objects, or ``AssemblyFragment`` objects.
For these types of fields, you have to define a ``field_serializer`` method to serialize them
to the correct type.

"""

from __future__ import annotations

from typing import Optional, Union, Any, ClassVar, Type
from pydantic_core import core_schema
from contextlib import contextmanager
from threading import local

from pydantic import BaseModel, ConfigDict, Field, field_serializer, field_validator

from opencloning_linkml.datamodel import (
    CloningStrategy as _BaseCloningStrategy,
    DatabaseSource as _DatabaseSource,
    Primer as _PrimerModel,
    Source as _Source,
    TextFileSequence as _TextFileSequence,
    AssemblySource as _AssemblySource,
    SourceInput as _SourceInput,
    AssemblyFragment as _AssemblyFragment,
    ManuallyTypedSource as _ManuallyTypedSource,
    RestrictionAndLigationSource as _RestrictionAndLigationSource,
    GibsonAssemblySource as _GibsonAssemblySource,
    RestrictionEnzymeDigestionSource as _RestrictionEnzymeDigestionSource,
    SequenceCutSource as _SequenceCutSource,
    RestrictionSequenceCut as _RestrictionSequenceCut,
    SequenceCut as _SequenceCut,
    InFusionSource as _InFusionSource,
    OverlapExtensionPCRLigationSource as _OverlapExtensionPCRLigationSource,
    InVivoAssemblySource as _InVivoAssemblySource,
    LigationSource as _LigationSource,
    GatewaySource as _GatewaySource,
    GatewayReactionType,
    AnnotationTool,
    HomologousRecombinationSource as _HomologousRecombinationSource,
    CreLoxRecombinationSource as _CreLoxRecombinationSource,
    PCRSource as _PCRSource,
    CRISPRSource as _CRISPRSource,
    RepositoryIdSource as _RepositoryIdSource,
    UploadedFileSource as _UploadedFileSource,
    AddgeneIdSource as _AddgeneIdSource,
    AddgeneSequenceType,
    BenchlingUrlSource as _BenchlingUrlSource,
    SnapGenePlasmidSource as _SnapGenePlasmidSource,
    EuroscarfSource as _EuroscarfSource,
    WekWikGeneIdSource as _WekWikGeneIdSource,
    SEVASource as _SEVASource,
    IGEMSource as _IGEMSource,
    OpenDNACollectionsSource as _OpenDNACollectionsSource,
    GenomeCoordinatesSource as _GenomeCoordinatesSource,
    OligoHybridizationSource as _OligoHybridizationSource,
    PolymeraseExtensionSource as _PolymeraseExtensionSource,
    AnnotationSource as _AnnotationSource,
    AnnotationReport as _AnnotationReport,
    PlannotateAnnotationReport as _PlannotateAnnotationReport,
    ReverseComplementSource as _ReverseComplementSource,
    NCBISequenceSource as _NCBISequenceSource,
    RecombinaseSource as _RecombinaseSource,
    Recombinase as _Recombinase,
)
from Bio.SeqFeature import Location, LocationParserError, SimpleLocation
from Bio.Restriction.Restriction import AbstractCut
import networkx as nx
from typing import List

from Bio.SeqIO.InsdcIO import _insdc_location_string as format_feature_location

from pydna.types import CutSiteType, SubFragmentRepresentationAssembly
from pydna.utils import create_location, location_boundaries, shift_location
from typing import TYPE_CHECKING
import Bio.Restriction as _restr_module
import copy
import textwrap
from pydna._pretty import pretty_str

if TYPE_CHECKING:  # pragma: no cover
    from pydna.dseqrecord import Dseqrecord
    from pydna.primer import Primer
    from pydna.recombinase import RecombinaseCollection, Recombinase


# Thread-local storage for the id strategy: ``id_mode`` writes the flag here
# and ``get_id`` reads it.
_thread_local = local()


@contextmanager
def id_mode(use_python_internal_id: bool = True):
    """Context manager selecting how ids are assigned to objects when they
    are mapped to the OpenCloning data model.

    If ``use_python_internal_id`` is True, the built-in python ``id()``
    function is used to assign ids to objects.

    If ``use_python_internal_id`` is False, the object's ``.id`` attribute
    (must be a string integer) is used instead. This is useful when the
    objects already have meaningful ids, and you want to keep references to
    them in ``SourceInput`` objects (which sequences and primers are used in
    a particular source).

    The previous setting is restored when the ``with`` block exits, also when
    it exits through an exception.

    Parameters
    ----------
    use_python_internal_id: bool
        If True, use Python's built-in id() function.
        If False, use the object's .id attribute (must be a string integer).

    Examples
    --------
    >>> from pydna.dseqrecord import Dseqrecord
    >>> from pydna.opencloning_models import get_id, id_mode
    >>> dseqr = Dseqrecord("ATGC")
    >>> dseqr.name = "my_sequence"
    >>> dseqr.id = "123"
    >>> get_id(dseqr) == id(dseqr)
    True
    >>> with id_mode(use_python_internal_id=False):
    ...     get_id(dseqr)
    123
    """
    old_value = getattr(_thread_local, "use_python_internal_id", True)
    _thread_local.use_python_internal_id = use_python_internal_id
    try:
        yield
    finally:
        _thread_local.use_python_internal_id = old_value


def get_id(obj: "Primer | Dseqrecord") -> int:
    """Get ID using the current strategy from thread-local storage (see id_mode)

    Parameters
    ----------
    obj: Primer | Dseqrecord
        The object to get the id of

    Returns
    -------
    int: The id of the object

    Raises
    ------
    ValueError
        If the ``.id`` strategy is active and ``obj.id`` is not a string made
        only of decimal digits.
    """
    if getattr(_thread_local, "use_python_internal_id", True):
        return id(obj)
    # ``isdecimal`` rather than ``isdigit``: ``isdigit`` also accepts characters
    # such as "²" that ``int()`` cannot parse, which would surface as an
    # unrelated "invalid literal" error instead of the message below.
    if not isinstance(obj.id, str) or not obj.id.isdecimal():
        raise ValueError(
            f"If use_python_internal_id is False, id must be a string representing an integer, "
            f"but object {obj} has an invalid id: {obj.id}"
        )
    return int(obj.id)
class SequenceLocationStr(str):
    """A string representation of a sequence location, genbank-like."""

    @classmethod
    def from_biopython_location(cls, location: Location):
        """Build the string from a Biopython ``Location``."""
        return cls(format_feature_location(location, None))

    def to_biopython_location(self) -> Location:
        """Parse this string back into a Biopython ``Location``."""
        return Location.fromstring(self)

    @classmethod
    def field_validator(cls, v):
        """Return ``v`` as an instance of this class after checking it parses.

        Raises
        ------
        ValueError
            If ``v`` is not a string, or if Biopython cannot parse it as a
            location.
        """
        if not isinstance(v, str):
            raise ValueError(f"Location must be a string or a {cls.__name__}")
        value = cls(v)
        try:
            value.to_biopython_location()
        except LocationParserError as err:
            raise ValueError(f"Location {v!r} is not a valid location") from err
        return value

    @classmethod
    def __get_pydantic_core_schema__(
        cls,
        source_type,
        handler,
    ) -> core_schema.CoreSchema:
        """Generate Pydantic core schema for SequenceLocationStr."""
        return core_schema.with_info_after_validator_function(
            cls._validate,
            core_schema.str_schema(),
        )

    @classmethod
    def _validate(cls, value: str, info):
        """Validate and create SequenceLocationStr instance."""
        return cls.field_validator(value)

    @classmethod
    def from_start_and_end(
        cls, start: int, end: int, seq_len: int | None = None, strand: int | None = 1
    ):
        """Build the string from the location returned by ``create_location``."""
        return cls.from_biopython_location(create_location(start, end, seq_len, strand))

    def get_ncbi_format_coordinates(self) -> tuple[int, int, int | None]:
        """Return start, end, strand in the same format as the NCBI eutils API
        (1-based, inclusive).

        The value is a ``(start, end, strand)`` tuple; the string is parsed
        only once.
        """
        location = self.to_biopython_location()
        return (location.start + 1, location.end, location.strand)
class ConfiguredBaseModel(BaseModel):
    """Base class holding the pydantic configuration shared by its subclasses."""

    model_config = ConfigDict(
        arbitrary_types_allowed=True,
        extra="forbid",
        strict=False,
        use_enum_values=True,
        validate_assignment=True,
        validate_default=True,
    )
class TextFileSequence(_TextFileSequence):
    """``TextFileSequence`` model with a constructor taking a ``Dseqrecord``."""

    @classmethod
    def from_dseqrecord(cls, dseqr: "Dseqrecord"):
        """Build the model from ``dseqr``, storing it as genbank text.

        The id comes from ``get_id`` and the two overhang fields from
        ``dseqr.seq.ovhg`` and ``dseqr.seq.watson_ovhg``.
        """
        fields = {
            "id": get_id(dseqr),
            "sequence_file_format": "genbank",
            "overhang_crick_3prime": dseqr.seq.ovhg,
            "overhang_watson_3prime": dseqr.seq.watson_ovhg,
            "file_content": dseqr.format("genbank"),
        }
        return cls(**fields)
class PrimerModel(_PrimerModel):
    """``Primer`` model with a constructor taking a pydna ``Primer``."""

    @classmethod
    def from_primer(cls, primer: "Primer"):
        """Build the model from ``primer`` (id via ``get_id``, name, sequence as str)."""
        fields = {
            "id": get_id(primer),
            "name": primer.name,
            "sequence": str(primer.seq),
        }
        return cls(**fields)
class SourceInput(ConfiguredBaseModel):
    """Input of a source, holding the ``Dseqrecord`` or ``Primer`` object itself."""

    sequence: object

    @field_validator("sequence")
    @classmethod
    def _validate_sequence_field(cls, value: Any):
        """Separate validation to avoid circular imports."""
        from pydna.dseqrecord import Dseqrecord
        from pydna.primer import Primer

        if not isinstance(value, (Dseqrecord, Primer)):
            value_type = type(value)
            raise TypeError(
                f"sequence must be Dseqrecord or Primer; got {value_type.__module__}.{value_type.__name__}"
            )
        return value

    def to_pydantic_model(self) -> _SourceInput:
        """Return the data-model ``SourceInput``, referring to the sequence by id."""
        sequence_id = get_id(self.sequence)
        return _SourceInput(sequence=sequence_id)
class AssemblyFragment(SourceInput):
    """Source input that also records its left/right locations and orientation."""

    left_location: Optional[Location] = Field(default=None)
    right_location: Optional[Location] = Field(default=None)
    reverse_complemented: bool

    @staticmethod
    def from_biopython_location(location: Location | None):
        """Convert ``location`` to a ``SequenceLocationStr``; ``None`` stays ``None``."""
        if location is not None:
            return SequenceLocationStr.from_biopython_location(location)
        return None

    def to_pydantic_model(self) -> _AssemblyFragment:
        """Return the data-model ``AssemblyFragment`` (sequence by id, locations as strings)."""
        as_location_str = self.from_biopython_location
        return _AssemblyFragment(
            sequence=get_id(self.sequence),
            left_location=as_location_str(self.left_location),
            right_location=as_location_str(self.right_location),
            reverse_complemented=self.reverse_complemented,
        )
def _source_input_from_model(
    model: _SourceInput | _AssemblyFragment,
    sequences: dict[int, "Dseqrecord"],
    primers: dict[int, "Primer"],
) -> SourceInput | AssemblyFragment:
    """Convert a linkml SourceInput/AssemblyFragment model back to a pydna object.

    Parameters
    ----------
    model:
        The data-model input; its ``sequence`` field is an id.
    sequences, primers:
        Lookup tables from id to object. ``sequences`` is searched first.

    Raises
    ------
    ValueError
        If the id is found in neither table.
    """
    seq_id = model.sequence
    # Explicit ``is None`` checks so that the lookup never depends on the
    # truthiness of the stored objects.
    seq_obj = sequences.get(seq_id)
    if seq_obj is None:
        seq_obj = primers.get(seq_id)
    if seq_obj is None:
        raise ValueError(f"Sequence/Primer with id {seq_id} not found")

    if not isinstance(model, _AssemblyFragment):
        return SourceInput(sequence=seq_obj)

    def parse(location_str):
        # It's important to set stranded=False here, because that's
        # how assembly2 creates locations.
        return Location.fromstring(location_str, stranded=False) if location_str else None

    return AssemblyFragment(
        sequence=seq_obj,
        left_location=parse(model.left_location),
        right_location=parse(model.right_location),
        reverse_complemented=model.reverse_complemented,
    )
class Source(ConfiguredBaseModel):
    """Provenance of a sequence, as stored in the ``source`` attribute of a ``Dseqrecord``.

    ``TARGET_MODEL`` is the data-model class built by ``to_pydantic_model``.
    """

    TARGET_MODEL: ClassVar[Type[_Source]] = _Source

    input: list[Union[SourceInput, AssemblyFragment]] = Field(default_factory=list)
    database_id: Optional[int] = None

    @field_serializer("input")
    def serialize_input(
        self, input: list[Union[SourceInput, AssemblyFragment]]
    ) -> list[_SourceInput | _AssemblyFragment]:
        """Serialize each input through its own ``to_pydantic_model``."""
        serialized = []
        for entry in input:
            serialized.append(entry.to_pydantic_model())
        return serialized

    def to_pydantic_model(self, seq_id: int):
        """Dump this source and build ``TARGET_MODEL`` from it, with ``id`` set to ``seq_id``."""
        return self.TARGET_MODEL(**{**self.model_dump(), "id": seq_id})

    @classmethod
    def _get_deserialization_overrides(cls, model, sequences, primers) -> dict:
        """Return field overrides for from_pydantic_model. Subclasses override this
        to handle fields that require custom deserialization logic."""
        return {}

    @classmethod
    def from_pydantic_model(
        cls,
        model: _Source,
        sequences: dict[int, "Dseqrecord"],
        primers: dict[int, "Primer"],
    ) -> "Source":
        """Convert an opencloning_linkml Source model back to a pydna Source.

        For every field of ``cls`` the value is taken, in this order of
        precedence, from the subclass overrides, from the converted inputs
        (field ``input``), or directly from the same-named attribute of ``model``.
        """
        overrides = cls._get_deserialization_overrides(model, sequences, primers)

        def resolve(field_name):
            if field_name in overrides:
                return overrides[field_name]
            if field_name == "input":
                return [
                    _source_input_from_model(inp, sequences, primers)
                    for inp in (model.input or [])
                ]
            return getattr(model, field_name)

        return cls(**{name: resolve(name) for name in cls.__pydantic_fields__})

    def to_unserialized_dict(self):
        """
        Converts into a dictionary without serializing the fields.
        This is used to be able to recast.
        """
        unserialized = {}
        for field in self.__pydantic_fields__:
            unserialized[field] = getattr(self, field)
        return unserialized

    def add_to_history_graph(self, history_graph: nx.DiGraph, seq: "Dseqrecord"):
        """
        Add the source to the history graph.

        It does not use the get_id function, because it just uses it to have unique identifiers
        for graph nodes, not to store them anywhere.
        """
        from pydna.dseqrecord import Dseqrecord

        seq_node = id(seq)
        source_node = id(self)
        history_graph.add_node(seq_node, label=f"{seq.name} ({repr(seq)})")
        history_graph.add_node(source_node, label=str(self.TARGET_MODEL.__name__))
        history_graph.add_edge(seq_node, source_node)

        for fragment in self.input:
            parent = fragment.sequence
            # This could be a Primer as well, which doesn't have a source
            has_source = isinstance(parent, Dseqrecord) and parent.source is not None
            if has_source:
                parent.source.add_to_history_graph(history_graph, parent)
            else:
                history_graph.add_node(
                    id(parent),
                    label=f"{parent.name} ({repr(parent)})",
                )
            history_graph.add_edge(source_node, id(parent))

    def history_string(self, seq: "Dseqrecord"):
        """
        Returns a string representation of the cloning history of the sequence.
        See dseqrecord.history() for examples.
        """
        graph = nx.DiGraph()
        self.add_to_history_graph(graph, seq)
        lines = nx.generate_network_text(graph, with_labels=True, sources=[id(seq)])
        return "\n".join(lines)

    def _replay_products(self, **kwargs) -> list["Dseqrecord"]:
        """Replay the cloning operation and return all possible products.

        Subclasses must override this to provide the replay logic.
        """
        raise NotImplementedError(
            f"_replay_products() not implemented for {type(self).__name__}"
        )

    def _validate_result_in_products(
        self, result: "Dseqrecord", products: list["Dseqrecord"]
    ) -> None:
        """Raise ValueError unless a product has the same sequence as ``result``
        and a source equal to this one (ignoring ``database_id``)."""
        # This has to be a shallow copy, because the source object points to
        # parent sequences. We don't want to copy them, but also the equality
        # check would fail.
        expected_source = copy.copy(self)
        expected_source.database_id = None
        for candidate in products:
            if candidate.seq == result.seq and candidate.source == expected_source:
                return
        raise ValueError(
            f"Result sequence does not match any of the {len(products)} "
            f"product(s) from {type(self).__name__}"
        )

    def _find_product_by_seguid(
        self, result: "Dseqrecord", products: list["Dseqrecord"]
    ) -> "Dseqrecord":
        """Find and return the product whose seguid matches result's seguid."""
        wanted = result.seq.seguid()
        match = next((p for p in products if p.seq.seguid() == wanted), None)
        if match is None:
            raise ValueError(
                f"No product with matching seguid among {len(products)} "
                f"product(s) from {type(self).__name__}"
            )
        return match

    def validate(self, result: "Dseqrecord") -> None:
        """Replay the cloning operation and verify the result sequence matches.

        Raises ValueError if the result doesn't match any expected product.
        Raises NotImplementedError for source types that don't support validation yet.
        Sources with no inputs (external imports) return silently.
        """
        if self.input:
            self._validate_result_in_products(result, self._replay_products())

    def normalize(self, result: "Dseqrecord") -> "Dseqrecord":
        """Replay the cloning operation and return the product matching result by seguid.

        Uses seguid comparison which is invariant to rotation and orientation,
        so it finds the correct product even if inputs were rotated or
        reverse-complemented. Copies name and id from the original result
        onto the returned product.
        """
        if not self.input:
            return result
        products = self._replay_products()
        match = self._find_product_by_seguid(result, products)
        match.name = result.name
        match.id = result.id
        return match

    def figure(self, fig_type=None):
        """Return ``None``: the base source has no figure."""
        return None
class AssemblySource(Source):
    """Source for sequences produced by joining fragments.

    Requires at least one input. ``circular`` records whether the product
    is circular.
    """

    circular: bool
    input: list[Union[SourceInput, AssemblyFragment]] = Field(
        default_factory=list, min_length=1
    )

    TARGET_MODEL: ClassVar[Type[_AssemblySource]] = _AssemblySource

    @classmethod
    def from_subfragment_representation(
        cls,
        assembly: SubFragmentRepresentationAssembly,
        fragments: list["Dseqrecord"],
        is_circular: bool,
    ):
        """Build an ``AssemblySource`` from ``(index, left_loc, right_loc)`` triples.

        Indices are 1-based into ``fragments``; a negative index marks the
        fragment as reverse-complemented.
        """
        input_list = [
            AssemblyFragment(
                sequence=fragments[abs(f_index) - 1],
                left_location=loc1,
                right_location=loc2,
                reverse_complemented=f_index < 0,
            )
            for f_index, loc1, loc2 in assembly
        ]
        # NOTE(review): this always builds a plain AssemblySource, not ``cls``.
        return AssemblySource(input=input_list, circular=is_circular)

    def _get_input_sequences(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Return Dseqrecord inputs (excludes Primers), preserving order, and handling
        insertion assemblies (removing the last fragment if the first and last are the same)."""
        from pydna.dseqrecord import Dseqrecord

        seqs: list[Dseqrecord] = [
            inp.sequence for inp in self.input if isinstance(inp.sequence, Dseqrecord)
        ]
        # This is necessary for insertion assemblies, where the first and last fragment are the same.
        if handle_insertion and len(seqs) > 1 and seqs[0] == seqs[-1]:
            return seqs[:-1]
        return seqs

    def _get_input_primers(self) -> list["Primer"]:
        """Return Primer inputs, preserving order."""
        from pydna.primer import Primer

        return [inp.sequence for inp in self.input if isinstance(inp.sequence, Primer)]

    def _minimal_assembly_overlap(self) -> int:
        """Return the smallest overlap length across all AssemblyFragment locations.

        Raises ValueError if no AssemblyFragment input has a location.
        """
        overlaps: list[int] = []
        for inp in self.input:
            if not isinstance(inp, AssemblyFragment):
                continue
            if inp.left_location is not None:
                overlaps.append(len(inp.left_location))
            if inp.right_location is not None:
                overlaps.append(len(inp.right_location))
        if not overlaps:
            raise ValueError("Assembly is not complete")
        return min(overlaps)

    def validate(self, result: "Dseqrecord") -> None:
        """Validate ``result`` against the replayed products.

        The replay is tried with insertion handling first, and without it if
        that fails; the ValueError of the second attempt propagates.
        """
        products = self._replay_products(True)
        try:
            self._validate_result_in_products(result, products)
        except ValueError:
            products = self._replay_products(False)
            self._validate_result_in_products(result, products)

    def normalize(self, result: "Dseqrecord") -> "Dseqrecord":
        """Return the replayed product whose seguid matches ``result``.

        Same two-step replay as ``validate``. The name and id of ``result``
        are copied onto the returned product.
        """
        products = self._replay_products(True)
        try:
            product = self._find_product_by_seguid(result, products)
        except ValueError:
            products = self._replay_products(False)
            product = self._find_product_by_seguid(result, products)
        product.name = result.name
        product.id = result.id
        return product

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Defer to the base implementation; concrete assembly sources override this."""
        # ``return`` added so the declared return type holds if the base
        # implementation ever returns instead of raising.
        return super()._replay_products(handle_insertion=handle_insertion)

    def _detailed_figure(self) -> str:
        """Return a text figure with the fragment sequences laid out so that
        their overlaps are aligned."""
        shift = 0
        print_list = []
        # With this filter, we exclude things such as guide RNAs
        for i, inp in enumerate(
            filter(lambda x: isinstance(x, AssemblyFragment), self.input)
        ):
            inp: AssemblyFragment
            left_location = inp.left_location
            right_location = inp.right_location
            seq = inp.sequence.seq
            if inp.reverse_complemented:
                seq = seq.reverse_complement()
            if seq.circular:
                # Rotate circular inputs so that they start at the left overlap
                rotate = location_boundaries(left_location)[0]
                seq = seq.shifted(rotate)
                left_location = shift_location(left_location, -rotate, len(seq))
                right_location = shift_location(right_location, -rotate, len(seq))
            if left_location is not None:
                shift -= location_boundaries(left_location)[0]
                # For circular assemblies
                if i == 0:
                    print_list.append((-shift, "|" * len(left_location)))
                    shift = 0
            print_list.append((shift, seq))
            if right_location is not None:
                shift += location_boundaries(right_location)[0]
                overlap = str(right_location.extract(seq)).upper()
                print_list.append((shift, overlap))
        # Offsets can be negative; shift everything so the leftmost starts at column 0
        min_shift = min(print_list, key=lambda x: x[0])[0]
        fig = [
            "{}{}".format(" " * (shift - min_shift), seq) for shift, seq in print_list
        ]
        return pretty_str("\n".join(fig))

    def figure(self, fig_type=None):
        """Return a text figure of the assembly.

        With ``fig_type == "detailed"`` the sequence-level figure is returned;
        otherwise a compact figure with fragment names and overlap lengths.
        """
        if fig_type == "detailed":
            return self._detailed_figure()

        # One (left overlap length, name, right overlap length) entry per fragment
        print_list = [
            (
                len(inp.left_location) if inp.left_location else 0,
                inp.sequence.name,
                len(inp.right_location) if inp.right_location else 0,
            )
            for inp in self.input
            if isinstance(inp, AssemblyFragment)
        ]

        fig = []
        shift = 0
        for i, (left_overlap, name, right_overlap) in enumerate(print_list):
            left_part = str(left_overlap) + "|" if left_overlap else ""
            if self.circular and i == 0:
                left_part = "-|"
            left_part += name
            right_part = "|" + str(right_overlap) if right_overlap else ""
            fig.append("{}{}".format(" " * shift, left_part + right_part))
            # +1 for the eventual "|"
            shift += len(left_part) + 1
            if right_overlap:
                fig.append("{}{}".format(" " * shift, "\\/"))
                fig.append("{}{}".format(" " * shift, "/\\"))

        if self.circular:
            # Close the loop: a frame down the left side and along the bottom
            fig.append("{}{}".format(" " * shift, print_list[0][0]))
            fig[0] = " " + fig[0]
            fig[1:] = ["|" + line for line in fig[1:]]
            fig[-1] = fig[-1] + "-"
            fig.append("|" + " " * (len(fig[-1]) - 1) + "|")
            fig.append(" " + "-" * (len(fig[-1]) - 2))
        return pretty_str("\n".join(fig))
# Field definition reused as the ``input`` field of the sources below that
# take no inputs: both length bounds are 0, so only an empty list validates.
_empty_input_list = Field(
    default_factory=list,
    min_length=0,
    max_length=0,
    description="Must always be an empty list.",
)
class DatabaseSource(Source):
    """pydna counterpart of the ``DatabaseSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_DatabaseSource]] = _DatabaseSource

    # Restricted to an empty list by the ``_empty_input_list`` field definition.
    input: list[Union[SourceInput, AssemblyFragment]] = _empty_input_list
    # Optional in ``Source``; required here.
    database_id: int
class UploadedFileSource(Source):
    """pydna counterpart of the ``UploadedFileSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_UploadedFileSource]] = _UploadedFileSource

    # Restricted to an empty list by the ``_empty_input_list`` field definition.
    input: list[Union[SourceInput, AssemblyFragment]] = _empty_input_list
    file_name: str
    index_in_file: int
    sequence_file_format: str
class RepositoryIdSource(Source):
    """pydna counterpart of the ``RepositoryIdSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_RepositoryIdSource]] = _RepositoryIdSource

    # Restricted to an empty list by the ``_empty_input_list`` field definition.
    input: list[Union[SourceInput, AssemblyFragment]] = _empty_input_list
    repository_id: str
    # location: Location
class RepositoryIdSourceWithSequenceFileUrl(RepositoryIdSource):
    """
    Auxiliary class to avoid code duplication in the sources that have a sequence file url.
    """

    # Restricted to an empty list by the ``_empty_input_list`` field definition.
    input: list[Union[SourceInput, AssemblyFragment]] = _empty_input_list
    sequence_file_url: Optional[str] = None
class AddgeneIdSource(RepositoryIdSourceWithSequenceFileUrl):
    """pydna counterpart of the ``AddgeneIdSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_AddgeneIdSource]] = _AddgeneIdSource
    addgene_sequence_type: Optional[AddgeneSequenceType] = None
class BenchlingUrlSource(RepositoryIdSource):
    """pydna counterpart of the ``BenchlingUrlSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_BenchlingUrlSource]] = _BenchlingUrlSource
class SnapGenePlasmidSource(RepositoryIdSource):
    """pydna counterpart of the ``SnapGenePlasmidSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_SnapGenePlasmidSource]] = _SnapGenePlasmidSource
class EuroscarfSource(RepositoryIdSource):
    """pydna counterpart of the ``EuroscarfSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_EuroscarfSource]] = _EuroscarfSource
class WekWikGeneIdSource(RepositoryIdSourceWithSequenceFileUrl):
    """pydna counterpart of the ``WekWikGeneIdSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_WekWikGeneIdSource]] = _WekWikGeneIdSource
class SEVASource(RepositoryIdSourceWithSequenceFileUrl):
    """pydna counterpart of the ``SEVASource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_SEVASource]] = _SEVASource
class IGEMSource(RepositoryIdSourceWithSequenceFileUrl):
    """pydna counterpart of the ``IGEMSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_IGEMSource]] = _IGEMSource
class OpenDNACollectionsSource(RepositoryIdSourceWithSequenceFileUrl):
    """pydna counterpart of the ``OpenDNACollectionsSource`` data-model class."""

    TARGET_MODEL: ClassVar[Type[_OpenDNACollectionsSource]] = _OpenDNACollectionsSource
class NCBISequenceSource(RepositoryIdSource):
    """pydna counterpart of the ``NCBISequenceSource`` data-model class.

    ``coordinates`` is held as a Biopython location (or ``None``) and is
    converted to and from its string form on (de)serialization.
    """

    TARGET_MODEL: ClassVar[Type[_NCBISequenceSource]] = _NCBISequenceSource
    coordinates: SimpleLocation | None = None

    # The method must keep the name ``serialize_coordinates``: pydantic treats a
    # same-named serializer in a subclass as an override, whereas a differently
    # named second serializer for the same field is rejected.
    @field_serializer("coordinates")
    def serialize_coordinates(
        self, coordinates: Optional[SimpleLocation]
    ) -> Optional[str]:
        """Serialize ``coordinates`` to its location string; ``None`` stays ``None``.

        Without this the location object itself ends up in ``model_dump()``
        and is passed unchanged to ``TARGET_MODEL`` by ``to_pydantic_model``.
        """
        if coordinates is None:
            return None
        return SequenceLocationStr.from_biopython_location(coordinates)

    @classmethod
    def _get_deserialization_overrides(
        cls, model: "_NCBISequenceSource", sequences, primers
    ):
        """Parse the model's ``coordinates`` string back into a location (``None`` if empty)."""
        return {
            "coordinates": (
                Location.fromstring(model.coordinates) if model.coordinates else None
            )
        }
class GenomeCoordinatesSource(NCBISequenceSource):
    """pydna counterpart of the ``GenomeCoordinatesSource`` data-model class.

    Unlike in the parent class, ``coordinates`` is required here.
    """

    TARGET_MODEL: ClassVar[Type[_GenomeCoordinatesSource]] = _GenomeCoordinatesSource
    assembly_accession: Optional[str] = None
    locus_tag: Optional[str] = None
    gene_id: Optional[int] = None
    coordinates: SimpleLocation

    @field_serializer("coordinates")
    def serialize_coordinates(self, coordinates: SimpleLocation) -> str:
        """Serialize the location to its genbank-like string form."""
        location_str = SequenceLocationStr.from_biopython_location(coordinates)
        return location_str
class RestrictionAndLigationSource(AssemblySource):
    """Assembly source replayed with ``restriction_ligation_assembly``."""

    restriction_enzymes: list[AbstractCut]

    TARGET_MODEL: ClassVar[Type[_RestrictionAndLigationSource]] = (
        _RestrictionAndLigationSource
    )

    @field_serializer("restriction_enzymes")
    def serialize_restriction_enzymes(
        self, restriction_enzymes: list[AbstractCut]
    ) -> list[str]:
        """Serialize each enzyme as ``str(enzyme)``."""
        return list(map(str, restriction_enzymes))

    @classmethod
    def _get_deserialization_overrides(cls, model, sequences, primers):
        """Look every enzyme name up as an attribute of ``Bio.Restriction``."""
        enzymes = []
        for enzyme_name in model.restriction_enzymes:
            enzymes.append(getattr(_restr_module, enzyme_name))
        return {"restriction_enzymes": enzymes}

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay the digestion and ligation on the input sequences."""
        from pydna.assembly2 import restriction_ligation_assembly

        fragments = self._get_input_sequences(handle_insertion)
        return restriction_ligation_assembly(fragments, self.restriction_enzymes)
class GibsonAssemblySource(AssemblySource):
    """Assembly source replayed with ``gibson_assembly``."""

    TARGET_MODEL: ClassVar[Type[_GibsonAssemblySource]] = _GibsonAssemblySource

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay using the smallest recorded overlap as ``limit``."""
        from pydna.assembly2 import gibson_assembly

        fragments = self._get_input_sequences(handle_insertion)
        min_overlap = self._minimal_assembly_overlap()
        return gibson_assembly(fragments, limit=min_overlap)
class InFusionSource(AssemblySource):
    """Assembly source replayed with ``in_fusion_assembly``."""

    TARGET_MODEL: ClassVar[Type[_InFusionSource]] = _InFusionSource

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay using the smallest recorded overlap as ``limit``."""
        from pydna.assembly2 import in_fusion_assembly

        fragments = self._get_input_sequences(handle_insertion)
        min_overlap = self._minimal_assembly_overlap()
        return in_fusion_assembly(fragments, limit=min_overlap)
class OverlapExtensionPCRLigationSource(AssemblySource):
    """Assembly source replayed with ``fusion_pcr_assembly``."""

    TARGET_MODEL: ClassVar[Type[_OverlapExtensionPCRLigationSource]] = (
        _OverlapExtensionPCRLigationSource
    )

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay using the smallest recorded overlap as ``limit``."""
        from pydna.assembly2 import fusion_pcr_assembly

        fragments = self._get_input_sequences(handle_insertion)
        min_overlap = self._minimal_assembly_overlap()
        return fusion_pcr_assembly(fragments, limit=min_overlap)
class InVivoAssemblySource(AssemblySource):
    """Assembly source replayed with ``in_vivo_assembly``."""

    TARGET_MODEL: ClassVar[Type[_InVivoAssemblySource]] = _InVivoAssemblySource

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay using the smallest recorded overlap as ``limit``."""
        from pydna.assembly2 import in_vivo_assembly

        fragments = self._get_input_sequences(handle_insertion)
        min_overlap = self._minimal_assembly_overlap()
        return in_vivo_assembly(fragments, limit=min_overlap)
class LigationSource(AssemblySource):
    """Assembly source replayed with ``ligation_assembly``."""

    TARGET_MODEL: ClassVar[Type[_LigationSource]] = _LigationSource

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay the ligation; blunt joins are allowed only when the smallest
        recorded overlap is 0."""
        from pydna.assembly2 import ligation_assembly

        is_blunt = self._minimal_assembly_overlap() == 0
        fragments = self._get_input_sequences(handle_insertion)
        return ligation_assembly(
            fragments,
            allow_blunt=is_blunt,
            allow_partial_overlap=True,
        )
class GatewaySource(AssemblySource):
    """Assembly source replayed with ``gateway_assembly``."""

    TARGET_MODEL: ClassVar[Type[_GatewaySource]] = _GatewaySource
    reaction_type: GatewayReactionType
    greedy: bool = Field(default=False)

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay with the stored ``reaction_type`` and ``greedy`` settings."""
        from pydna.assembly2 import gateway_assembly

        fragments = self._get_input_sequences(handle_insertion)
        return gateway_assembly(fragments, self.reaction_type, self.greedy)
class HomologousRecombinationSource(AssemblySource):
    """Assembly source replayed with the homologous recombination functions."""

    TARGET_MODEL: ClassVar[Type[_HomologousRecombinationSource]] = (
        _HomologousRecombinationSource
    )

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay as excision/inversion for a single input sequence, otherwise
        as integration of the remaining sequences into the first one."""
        from pydna.assembly2 import (
            homologous_recombination_integration,
            homologous_recombination_excision_or_inversion,
        )

        seqs = self._get_input_sequences(handle_insertion)
        limit = self._minimal_assembly_overlap()
        if len(seqs) != 1:
            return homologous_recombination_integration(seqs[0], seqs[1:], limit)
        return homologous_recombination_excision_or_inversion(seqs[0], limit)
class CRISPRSource(HomologousRecombinationSource):
    """Assembly source replayed with ``crispr_integration``."""

    TARGET_MODEL: ClassVar[Type[_CRISPRSource]] = _CRISPRSource

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay with the first sequence, the remaining sequences, the primer
        inputs as guides and the smallest recorded overlap as limit."""
        from pydna.assembly2 import crispr_integration

        seqs = self._get_input_sequences(handle_insertion)
        guides = self._get_input_primers()
        limit = self._minimal_assembly_overlap()
        first, rest = seqs[0], seqs[1:]
        return crispr_integration(first, rest, guides, limit)
class CreLoxRecombinationSource(AssemblySource):
    """Assembly source replayed with the Cre-lox functions."""

    TARGET_MODEL: ClassVar[Type[_CreLoxRecombinationSource]] = (
        _CreLoxRecombinationSource
    )

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay as excision/inversion for a single input sequence, otherwise
        as integration of the remaining sequences into the first one."""
        from pydna.assembly2 import cre_lox_integration, cre_lox_excision_or_inversion

        seqs = self._get_input_sequences(handle_insertion)
        if len(seqs) != 1:
            return cre_lox_integration(seqs[0], seqs[1:])
        return cre_lox_excision_or_inversion(seqs[0])
class PCRSource(AssemblySource):
    """Assembly source replayed with ``pcr_assembly``."""

    TARGET_MODEL: ClassVar[Type[_PCRSource]] = _PCRSource
    add_primer_features: bool = Field(default=False)

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay the PCR with the first input sequence and the first two primer inputs."""
        from pydna.assembly2 import pcr_assembly

        seqs = self._get_input_sequences(handle_insertion)
        primers = self._get_input_primers()
        limit = self._minimal_assembly_overlap()
        return pcr_assembly(
            seqs[0],
            primers[0],
            primers[1],
            limit=limit,
            add_primer_features=self.add_primer_features,
        )

    def figure(self, fig_type=None):
        """Return a text figure of both primers annealed to the template.

        Reads the inputs positionally: ``input[0]`` as forward primer,
        ``input[1]`` as template and ``input[2]`` as reverse primer.
        ``fig_type`` is not used.
        """
        fp = self.input[0].sequence
        rp = self.input[2].sequence
        tp = self.input[1].sequence
        # Lengths of the annealing parts of the forward and reverse primers
        fp_fp = len(self.input[0].right_location)
        rp_fp = len(self.input[2].left_location)
        # Template positions where the forward footprint ends / the reverse one starts
        fp_position = self.input[1].left_location.end
        rp_position = self.input[1].right_location.start
        ft = len(fp) - fp_fp  # forward tail length
        # rt = len(rp) - rp_fp  # reverse tail length
        faz = tp[fp_position - fp_fp : fp_position].seq
        raz = tp[rp_position : rp_position + rp_fp].seq
        sp3 = " " * (len(fp.seq) + 3)
        fzc = tp.seq.crick[::-1][fp_position - fp_fp : fp_position]
        rzc = tp.seq.crick[::-1][rp_position : rp_position + rp_fp]
        # NOTE(review): the line breaks and leading spaces inside this template
        # were reconstructed from a whitespace-collapsed listing (the two "|"
        # rows are given one extra leading space so that the bars line up under
        # the bases) -- confirm against the upstream file.
        f = f"""
            {" " * ft}5{faz}...{raz}3
             {sp3}{"|" * rp_fp}
            {sp3}3{rp.seq[::-1]}5
            5{fp.seq}3
             {"|" * fp_fp:>{len(fp)}}
            {" " * ft}3{fzc}...{rzc}5
            """
        return pretty_str(textwrap.dedent(f).strip("\n"))
class RecombinaseSource(AssemblySource):
    """Assembly source replayed with the recombinase functions."""

    TARGET_MODEL: ClassVar[Type[_RecombinaseSource]] = _RecombinaseSource
    recombinases: object

    @field_validator("recombinases")
    @classmethod
    def _validate_recombinases(cls, value: "RecombinaseCollection" | "Recombinase"):
        """Accept only a ``RecombinaseCollection`` or a ``Recombinase``."""
        from pydna.recombinase import RecombinaseCollection, Recombinase

        if not isinstance(value, (RecombinaseCollection, Recombinase)):
            raise ValueError(
                "recombinases must be a RecombinaseCollection or a Recombinase"
            )
        return value

    @field_serializer("recombinases")
    def serialize_recombinases(
        self, recombinases: "RecombinaseCollection" | "Recombinase"
    ) -> list[_Recombinase]:
        """Serialize through the object's own ``to_opencloning_model``."""
        return recombinases.to_opencloning_model()

    @classmethod
    def _get_deserialization_overrides(
        cls, model: "_RecombinaseSource", sequences, primers
    ):
        """Rebuild the recombinase objects from the model.

        A single entry yields a bare ``Recombinase``; any other number of
        entries is wrapped in a ``RecombinaseCollection``.
        """
        from pydna.recombinase import (
            RecombinaseCollection,
            Recombinase as _RecombinaseClass,
        )

        rebuilt = []
        for entry in model.recombinases:
            rebuilt.append(
                _RecombinaseClass(
                    site1=entry.site1,
                    site2=entry.site2,
                    site1_name=entry.site1_name,
                    site2_name=entry.site2_name,
                    name=entry.name,
                )
            )
        if len(rebuilt) == 1:
            return {"recombinases": rebuilt[0]}
        return {"recombinases": RecombinaseCollection(rebuilt)}

    def _replay_products(self, handle_insertion: bool = True) -> list["Dseqrecord"]:
        """Replay as an assembly for a single input sequence; otherwise return
        the integration products followed by the assembly products."""
        from pydna.assembly2 import (
            recombinase_integration,
            recombinase_assembly,
        )

        seqs = self._get_input_sequences(handle_insertion)
        if len(seqs) != 1:
            integrated = recombinase_integration(seqs[0], seqs[1:], self.recombinases)
            return integrated + recombinase_assembly(seqs, self.recombinases)
        return recombinase_assembly(seqs, self.recombinases)
class SequenceCutSource(Source):
    """Source for a fragment obtained by cutting a single parent sequence.

    Each edge is either ``None`` or a cut site that is read as
    ``((cut_watson, overhang), enzyme_or_none)``. When at least one edge
    carries an ``AbstractCut`` enzyme the source is treated as a restriction
    digestion; otherwise it is a plain sequence cut.
    """

    left_edge: CutSiteType | None
    right_edge: CutSiteType | None
    input: list[Union[SourceInput, AssemblyFragment]] = Field(
        default_factory=list, min_length=1, max_length=1
    )

    @property
    def TARGET_MODEL(self):
        """Target model class, chosen per instance from the edges.

        This is a property rather than a class attribute because the target
        depends on whether an enzyme is present on either edge.
        """
        if self._has_enzyme():
            return _RestrictionEnzymeDigestionSource
        return _SequenceCutSource

    @field_serializer("left_edge", "right_edge")
    def serialize_cut_site(
        self, cut_site: CutSiteType | None
    ) -> _RestrictionSequenceCut | _SequenceCut | None:
        """Serialize one edge via :meth:`_cutsite_to_model`."""
        return self._cutsite_to_model(cut_site)

    @staticmethod
    def _cutsite_to_model(cut_site: CutSiteType | None):
        """Convert a cut site into its model object.

        :return: ``None`` for a missing edge, a ``_RestrictionSequenceCut``
            (enzyme stored as ``str(enzyme)``) when the second element is an
            ``AbstractCut``, otherwise a ``_SequenceCut``.
        """
        if cut_site is None:
            return None

        watson, overhang = cut_site[0]
        cutter = cut_site[1]
        if not isinstance(cutter, AbstractCut):
            return _SequenceCut(cut_watson=watson, overhang=overhang)
        return _RestrictionSequenceCut(
            cut_watson=watson,
            overhang=overhang,
            restriction_enzyme=str(cutter),
        )

    @classmethod
    def from_parent(
        cls, parent: "Dseqrecord", left_edge: CutSiteType, right_edge: CutSiteType
    ):
        """Build a source whose only input is ``parent``, with the given edges."""
        return cls(
            input=[SourceInput(sequence=parent)],
            left_edge=left_edge,
            right_edge=right_edge,
        )

    def _has_enzyme(self) -> bool:
        """Return True when either edge carries an ``AbstractCut`` enzyme."""
        return any(
            edge is not None and isinstance(edge[1], AbstractCut)
            for edge in (self.left_edge, self.right_edge)
        )

    def _replay_products(self) -> list["Dseqrecord"]:
        """Recompute the candidate products from the parent sequence.

        With enzymes: every fragment returned by ``parent.cut`` for the
        distinct enzymes found on the edges. Without: the single fragment
        returned by ``parent.apply_cut`` for the stored edges.
        """
        parent = self.input[0].sequence
        if not self._has_enzyme():
            return [parent.apply_cut(self.left_edge, self.right_edge)]

        # A set, so an enzyme present on both edges is passed only once.
        enzymes = {
            edge[1]
            for edge in (self.left_edge, self.right_edge)
            if edge is not None and isinstance(edge[1], AbstractCut)
        }
        return list(parent.cut(*enzymes))

    def normalize(self, result: "Dseqrecord") -> "Dseqrecord":
        """Normalize ``result`` through the parent class when that is allowed.

        Delegation happens when an enzyme is present or when the parent and
        result sequences are equal.

        :raises ValueError: when there is no enzyme and the sequences differ.
        """
        parent = self.input[0].sequence
        if not (self._has_enzyme() or parent.seq == result.seq):
            raise ValueError(
                "SequenceCutSource must have an enzyme to be normalized if sequences differ"
            )
        return super().normalize(result)

    @classmethod
    def _get_deserialization_overrides(
        cls, model: "_RestrictionEnzymeDigestionSource", sequences, primers
    ):
        """Rebuild both edges from their serialized form.

        :param sequences: not used here.
        :param primers: not used here.
        :return: dict with the rebuilt ``left_edge`` and ``right_edge``.
        """

        def _model_to_cutsite(edge_model):
            if edge_model is None:
                return None
            # NOTE(review): the enzyme is looked up by name with a bare
            # getattr on ``_restr_module``; any attribute of that module would
            # be accepted. Consider validating if models can be untrusted.
            enzyme = (
                getattr(_restr_module, edge_model.restriction_enzyme)
                if isinstance(edge_model, _RestrictionSequenceCut)
                else None
            )
            return ((edge_model.cut_watson, edge_model.overhang), enzyme)

        left = _model_to_cutsite(model.left_edge)
        right = _model_to_cutsite(model.right_edge)
        return {"left_edge": left, "right_edge": right}
class OligoHybridizationSource(Source):
    """Source for a duplex obtained by annealing two oligonucleotides.

    ``input`` is declared with ``min_length=2`` and ``max_length=2``; item 0
    is used as the forward oligo and item 1 as the reverse oligo.
    """

    TARGET_MODEL: ClassVar[Type[_OligoHybridizationSource]] = _OligoHybridizationSource

    input: list[Union[SourceInput]] = Field(
        default_factory=list, min_length=2, max_length=2
    )
    overhang_crick_3prime: Optional[int] = None

    def _replay_products(self) -> list["Dseqrecord"]:
        """Re-run the hybridization of the two stored oligos.

        The annealing length handed to ``oligonucleotide_hybridization`` is
        the length of the shorter oligo minus the absolute value of
        ``overhang_crick_3prime`` (a missing overhang counts as 0).
        """
        from pydna.oligonucleotide_hybridization import oligonucleotide_hybridization

        forward_oligo = self.input[0].sequence
        reverse_oligo = self.input[1].sequence

        shorter_length = min(len(forward_oligo.seq), len(reverse_oligo.seq))
        overhang = self.overhang_crick_3prime or 0
        annealing_length = shorter_length - abs(overhang)
        return oligonucleotide_hybridization(
            forward_oligo, reverse_oligo, annealing_length
        )
class PolymeraseExtensionSource(Source):
    """Source for a sequence whose ``seq`` was processed with ``fill_in``.

    ``input`` is declared with exactly one item.
    """

    TARGET_MODEL: ClassVar[Type[_PolymeraseExtensionSource]] = (
        _PolymeraseExtensionSource
    )

    input: list[Union[SourceInput]] = Field(
        default_factory=list, min_length=1, max_length=1
    )

    def _replay_products(self) -> list["Dseqrecord"]:
        """Return a one-item list with the filled-in copy of the input.

        The input is deep-copied, its ``seq`` is replaced by
        ``seq.fill_in()`` and its ``source`` is set to this object.

        :raises ValueError: when the input is not a ``Dseqrecord``.
        """
        from pydna.dseqrecord import Dseqrecord

        template = self.input[0].sequence
        if not isinstance(template, Dseqrecord):
            raise ValueError(
                f"PolymeraseExtensionSource input must be a Dseqrecord, got {type(template)}"
            )

        # Work on a copy so the stored input is left untouched.
        extended = copy.deepcopy(template)
        extended.seq = extended.seq.fill_in()
        extended.source = self
        return [extended]
class AnnotationSource(Source):
    """Source for a sequence that is an annotated version of its single input."""

    TARGET_MODEL: ClassVar[Type[_AnnotationSource]] = _AnnotationSource

    input: list[Union[SourceInput]] = Field(
        default_factory=list, min_length=1, max_length=1
    )
    annotation_tool: AnnotationTool
    annotation_tool_version: Optional[str] = None
    annotation_report: Optional[
        list[_AnnotationReport | _PlannotateAnnotationReport]
    ] = None

    def validate(self, result: "Dseqrecord") -> None:
        """Check that the input is a ``Dseqrecord`` with the same ``seq`` as ``result``.

        :raises ValueError: when the input is not a ``Dseqrecord`` or when
            its sequence differs from the result's.
        """
        from pydna.dseqrecord import Dseqrecord

        parent = self.input[0].sequence
        if not isinstance(parent, Dseqrecord):
            raise ValueError(
                f"AnnotationSource input must be a Dseqrecord, got {type(self.input[0].sequence)}"
            )
        if parent.seq != result.seq:
            raise ValueError("AnnotationSource input sequence does not match result")

    def normalize(self, result: "Dseqrecord") -> "Dseqrecord":
        """Return ``result`` re-expressed in the frame of the input sequence.

        If both sequences are already equal a deep copy of ``result`` is
        returned. Otherwise a circular input leads to ``result.synced(...)``
        and a linear one to ``result.reverse_complement()``; the returned
        record gets a copy of this source with ``annotation_report`` emptied.

        :raises ValueError: when the input is not a ``Dseqrecord`` or the
            seguids of input and result differ.
        """
        from pydna.dseqrecord import Dseqrecord

        parent = self.input[0].sequence
        if not isinstance(parent, Dseqrecord):
            raise ValueError(
                f"AnnotationSource input must be a Dseqrecord, got {type(parent)}"
            )
        if parent.seq.seguid() != result.seq.seguid():
            raise ValueError("AnnotationSource input sequence does not match result")

        if parent.seq == result.seq:
            return copy.deepcopy(result)

        # synced already handles reverse complementing
        normalized = (
            result.synced(parent.seq)
            if parent.circular
            else result.reverse_complement()
        )

        # The report referenced the original coordinates, so it is cleared
        # on the copy attached to the normalized record.
        new_source = copy.deepcopy(self)
        new_source.annotation_report = []
        normalized.source = new_source
        return normalized
class ReverseComplementSource(Source):
    """Source for a sequence that is the reverse complement of its single input."""

    TARGET_MODEL: ClassVar[Type[_ReverseComplementSource]] = _ReverseComplementSource

    input: list[Union[SourceInput]] = Field(
        default_factory=list, min_length=1, max_length=1
    )

    def _replay_products(self) -> list["Dseqrecord"]:
        """Return a one-item list with the reverse complement of the input."""
        parent = self.input[0].sequence
        return [parent.reverse_complement()]
def read_dseqrecord_from_text_file_sequence(seq: TextFileSequence) -> Dseqrecord:
    """Parse a ``TextFileSequence`` into a ``Dseqrecord``.

    Only the first record parsed from ``seq.file_content`` is used. When
    either stored 3' overhang is non-zero, the record's ``seq`` is rebuilt
    with ``Dseq.from_full_sequence_and_overhangs`` so the overhangs are
    restored. The record ``id`` is set to ``str(seq.id)``.

    :param seq: the serialized sequence to read.
    :return: the parsed record.
    """
    from pydna.parsers import parse
    from pydna.dseq import Dseq

    record = parse(seq.file_content, ds=True)[0]

    blunt = seq.overhang_watson_3prime == 0 and seq.overhang_crick_3prime == 0
    if not blunt:
        # Arguments are passed as (full sequence, crick overhang, watson overhang).
        full_sequence = str(record.seq)
        record.seq = Dseq.from_full_sequence_and_overhangs(
            full_sequence,
            seq.overhang_crick_3prime,
            seq.overhang_watson_3prime,
        )

    record.id = str(seq.id)
    return record
class CloningStrategy(_BaseCloningStrategy):
    """Cloning strategy that can be built from, and converted back to,
    ``Dseqrecord`` objects.

    Sources and sequences are paired by id: a source's ``id`` equals the id
    of the sequence it produces (see :meth:`add_dseqrecord` and
    :meth:`to_dseqrecords`).
    """

    # For now, we don't add anything, but the classes will not have the new
    # methods if this is used
    # It will be used for validation for now
    primers: Optional[list[PrimerModel]] = Field(
        default_factory=list,
        description="""The primers that are used in the cloning strategy""",
        json_schema_extra={
            "linkml_meta": {"alias": "primers", "domain_of": ["CloningStrategy"]}
        },
    )

    def add_primer(self, primer: "Primer"):
        """Add ``primer`` unless a primer with the same id is already stored."""
        # ``primers`` is Optional, so it may be None; start a list on demand.
        if self.primers is None:
            self.primers = []
        existing_ids = {stored.id for stored in self.primers}
        if get_id(primer) in existing_ids:
            return
        self.primers.append(PrimerModel.from_primer(primer))

    def add_dseqrecord(self, dseqr: "Dseqrecord"):
        """Add ``dseqr`` and, recursively, everything in its provenance.

        A sequence whose id is already stored is skipped. A record without a
        source gets a ``_ManuallyTypedSource`` with no inputs. Otherwise its
        source is stored and every source input is added in turn:
        ``Dseqrecord`` inputs through this method, anything else through
        :meth:`add_primer`.
        """
        from pydna.dseqrecord import Dseqrecord

        existing_ids = {stored.id for stored in self.sequences}
        if get_id(dseqr) in existing_ids:
            return
        self.sequences.append(TextFileSequence.from_dseqrecord(dseqr))

        if dseqr.source is None:
            self.sources.append(_ManuallyTypedSource(id=get_id(dseqr), input=[]))
            return

        self.sources.append(dseqr.source.to_pydantic_model(get_id(dseqr)))
        this_source: Source = dseqr.source
        for source_input in this_source.input:
            if isinstance(source_input.sequence, Dseqrecord):
                self.add_dseqrecord(source_input.sequence)
            else:
                self.add_primer(source_input.sequence)

    def reassign_ids(self):
        """Renumber every id in place as 1..N.

        The ids of sequences, sources and primers are pooled, sorted, and
        mapped to consecutive integers starting at 1. The ``sequence``
        reference of every source input is rewritten with the same mapping.
        """
        primers = self.primers or []
        all_ids = (
            {seq.id for seq in self.sequences}
            | {source.id for source in self.sources}
            | {primer.id for primer in primers}
        )
        id_mappings = {
            old_id: new_id for new_id, old_id in enumerate(sorted(all_ids), start=1)
        }

        for seq in self.sequences:
            seq.id = id_mappings[seq.id]
        for primer in primers:
            primer.id = id_mappings[primer.id]
        for source in self.sources:
            source.id = id_mappings[source.id]
            for assembly_fragment in source.input or []:
                assembly_fragment.sequence = id_mappings[assembly_fragment.sequence]

    @classmethod
    def from_dseqrecords(cls, dseqrs: list["Dseqrecord"], description: str = ""):
        """Build a strategy containing ``dseqrs`` and their full provenance."""
        cloning_strategy = cls(sources=[], sequences=[], description=description)
        for dseqr in dseqrs:
            cloning_strategy.add_dseqrecord(dseqr)
        return cloning_strategy

    def _copy_for_export(self) -> "CloningStrategy | None":
        """Return a renumbered deep copy for dumping, or None to dump as is.

        A copy is made when the thread-local ``use_python_internal_id`` flag
        is truthy (it defaults to True when unset), so that ``self`` keeps
        its ids and only the exported copy is renumbered.
        """
        if not getattr(_thread_local, "use_python_internal_id", True):
            return None
        exported = self.__deepcopy__()
        exported.reassign_ids()
        return exported

    def model_dump_json(self, *args, **kwargs):
        """Dump to JSON, renumbering ids on a copy when required.

        Arguments are forwarded to the parent ``model_dump_json``.
        """
        cs = self._copy_for_export()
        if cs is None:
            return super().model_dump_json(*args, **kwargs)
        # Call the parent implementation on the copy to avoid recursing here.
        return super(CloningStrategy, cs).model_dump_json(*args, **kwargs)

    def model_dump(self, *args, **kwargs):
        """Dump to a dict, renumbering ids on a copy when required.

        Arguments are forwarded to the parent ``model_dump``.
        """
        cs = self._copy_for_export()
        if cs is None:
            return super().model_dump(*args, **kwargs)
        # Call the parent implementation on the copy to avoid recursing here.
        return super(CloningStrategy, cs).model_dump(*args, **kwargs)

    def _ids_used_as_inputs(self) -> set:
        """Collect, once, the ids referenced by any input of any source."""
        return {
            source_input.sequence
            for source in self.sources
            for source_input in source.input or []
        }

    def get_ids_of_sequences_that_are_not_inputs(self) -> list[int]:
        """
        Get the ids of the sequences that are not inputs to any source.
        """
        used_ids = self._ids_used_as_inputs()
        return [seq.id for seq in self.sequences if seq.id not in used_ids]

    def get_ids_of_sequences_that_are_inputs(self) -> list[int]:
        """
        Get the ids of the sequences that are inputs to any source.
        """
        used_ids = self._ids_used_as_inputs()
        return [seq.id for seq in self.sequences if seq.id in used_ids]

    def to_dseqrecords(self) -> list["Dseqrecord"]:
        """
        Convert the cloning strategy to a list of Dseqrecord objects.

        Walks the dependency graph from terminal sequences (those not used
        as inputs to any source) upward, reconstructing pydna Source objects
        and attaching them to the parsed Dseqrecord objects.

        :return: one Dseqrecord per terminal sequence.
        :raises NotImplementedError: if a stored sequence is not a
            ``_TextFileSequence``.
        :raises ValueError: if a sequence has no source, or a source model
            type is not in the registry.
        """
        from pydna.primer import Primer as _PrimerCls

        # Build primer lookup
        primer_by_id: dict[int, _PrimerCls] = {}
        for p in self.primers or []:
            primer_obj = _PrimerCls(p.sequence, name=p.name)
            primer_obj.id = str(p.id)
            primer_by_id[p.id] = primer_obj

        # Build sequence lookup by parsing TextFileSequence genbank content
        seq_by_id: dict[int, Dseqrecord] = {}
        for text_seq in self.sequences:
            if not isinstance(text_seq, _TextFileSequence):
                raise NotImplementedError(
                    f"Only works with TextFileSequence objects, got {type(text_seq)}"
                )
            seq_by_id[text_seq.id] = read_dseqrecord_from_text_file_sequence(text_seq)

        # Build source lookup (source.id == output sequence id)
        source_by_id: dict[int, _Source] = {s.id: s for s in self.sources}

        # Resolve sources recursively
        resolved: set[int] = set()

        def resolve(seq_id: int):
            if seq_id in resolved:
                return
            # Marked before descending so a sequence is processed only once.
            resolved.add(seq_id)

            source_model = source_by_id.get(seq_id)
            if source_model is None:
                raise ValueError(f"Missing source for sequence {seq_id}")

            # Recursively resolve input sequences first
            # (inputs whose id is not in seq_by_id are not resolved here)
            for inp in source_model.input or []:
                if inp.sequence in seq_by_id:
                    resolve(inp.sequence)

            dseqr = seq_by_id[seq_id]

            if isinstance(source_model, _ManuallyTypedSource):
                dseqr.source = None
                return

            pydna_cls = _TARGET_MODEL_REGISTRY.get(type(source_model))
            if pydna_cls is None:  # pragma: no cover
                raise ValueError(f"Unknown source model type: {type(source_model)}")

            dseqr.source = pydna_cls.from_pydantic_model(
                source_model, seq_by_id, primer_by_id
            )

        terminal_ids = self.get_ids_of_sequences_that_are_not_inputs()
        for sid in terminal_ids:
            resolve(sid)

        return [seq_by_id[sid] for sid in terminal_ids]

    def normalize(self) -> "CloningStrategy":
        """
        Normalize the cloning strategy by resolving the sources and sequences.

        Each record from :meth:`to_dseqrecords` is passed through
        ``normalize_history`` and a new strategy of the same class is built
        from the results, keeping the description.
        """
        newseqrs = [s.normalize_history() for s in self.to_dseqrecords()]
        return self.__class__.from_dseqrecords(newseqrs, self.description)

    def validate(self) -> None:
        """
        Validate the cloning strategy by resolving the sources and sequences.

        Calls ``validate_history`` on every record from :meth:`to_dseqrecords`.
        """
        for seq in self.to_dseqrecords():
            seq.validate_history()
def _build_target_model_registry() -> dict[type, type]:
    """Build a mapping from opencloning_linkml Source types to pydna Source types.

    Every class in the ``Source`` hierarchy that defines ``TARGET_MODEL``
    directly in its own namespace (not inherited, not a property) is
    registered under that target. Classes are visited depth-first, parents
    before children, so a later class overwrites an earlier one that shares
    the same target.
    """
    registry: dict[type, type] = {}

    # Explicit stack instead of recursion; pushing the subclasses reversed
    # keeps the same visiting order as a recursive pre-order walk.
    pending = [Source]
    while pending:
        current = pending.pop()
        target = current.__dict__.get("TARGET_MODEL")
        if target is not None and not isinstance(target, property):
            registry[target] = current
        pending.extend(reversed(current.__subclasses__()))

    # SequenceCutSource uses @property for TARGET_MODEL, register manually
    registry[_SequenceCutSource] = SequenceCutSource
    registry[_RestrictionEnzymeDigestionSource] = SequenceCutSource
    return registry


# Built once at import time; used to look up the pydna class for a source model.
_TARGET_MODEL_REGISTRY: dict[type, type] = _build_target_model_registry()