Source code for bioino.fasta

"""Input and output functions and classes for FASTA files."""

from typing import Iterable, Optional, Union

from dataclasses import dataclass, field
from io import TextIOWrapper
from itertools import chain
import textwrap

from carabiner.cast import cast
from pandas import DataFrame

from .tables import _sanitize_columns

@dataclass
class FastaSequence:

    """Object which gives a FASTA-formatted sequence when printed.

    Attributes
    ----------
    name : str
        Name of the sequence.
    description : str
        Sequence description string.
    sequence : str
        The actual sequence.

    Methods
    -------
    __str__
        Show the FASTA-formatted sequence.
    write
        Print the FASTA-formatted sequence to a file handle.

    Examples
    --------
    >>> s = FastaSequence("example", "This is a description", "ATCG")
    >>> print(s)
    >example This is a description
    ATCG

    """

    name: str
    description: str
    sequence: str

    # Characters per sequence line. Deliberately left un-annotated so that
    # ``@dataclass`` treats it as a plain class attribute rather than a field
    # (which would change the generated ``__init__`` signature).
    _LINE_WIDTH = 80

    def __str__(self) -> str:
        """Show the FASTA-formatted sequence.

        The header line is ``>{name} {description}``. The sequence follows,
        cut into consecutive lines of exactly 80 characters (the last line
        may be shorter).

        Returns
        -------
        str
            Header and wrapped sequence joined by newlines, without a
            trailing newline after the last sequence line.

        """
        width = self._LINE_WIDTH
        sequence = self.sequence
        # Slice at fixed offsets instead of using ``textwrap.wrap``: textwrap
        # applies prose rules (it breaks after hyphens and rewrites
        # whitespace), which yields ragged line lengths for gapped sequences,
        # and it re-slices the remaining string for every output line.
        lines = (
            sequence[start:start + width]
            for start in range(0, len(sequence), width)
        )
        body = '\n'.join(lines)
        return f">{self.name} {self.description}\n{body}"

    def write(self, file: Optional[TextIOWrapper] = None) -> None:
        """Print the FASTA-formatted sequence, followed by a newline.

        Parameters
        ----------
        file : TextIO, optional
            File handle passed to `print`. With the default, ``None``,
            `print` writes to standard output.

        Returns
        -------
        None

        """
        print(self, file=file)
@dataclass
class FastaCollection:

    """Collection of FASTA sequences for reading and writing.

    Attributes
    ----------
    sequences : Iterable[FastaSequence], optional
        Iterable of `FastaSequence`. Defaults to an empty list. The
        iterable is stored as given and iterated each time the collection
        is printed or written, so a one-shot iterator (as produced by
        `from_file` and `from_pandas`) can only be consumed once.

    Methods
    -------
    from_file
        Instantiate by reading a FASTA file.
    from_pandas
        Instantiate from the rows of a Pandas DataFrame.
    write
        Write sequences to FASTA file.

    Examples
    --------
    >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
    >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
    >>> fasta_stream = FastaCollection([seq1, seq2])
    >>> print(fasta_stream)
    >example This is a description
    ATCG
    >example2 This is another sequence
    GGGAAAA

    """

    sequences: Iterable[FastaSequence] = field(default_factory=list)

    def __str__(self) -> str:
        """Show the FASTA-formatted sequences, one record after another."""
        return '\n'.join(str(seq) for seq in self.sequences)

    @staticmethod
    def _from_file(file: Union[str, TextIOWrapper]) -> Iterable[FastaSequence]:
        """Lazily parse FASTA records from a filename or an open handle.

        Each line starting with ``>`` begins a new record: the first
        whitespace-separated token after the ``>`` is the name and the
        remaining tokens, re-joined with single spaces, are the
        description. Subsequent non-empty lines are concatenated (with
        trailing whitespace removed) to form the sequence.

        Lines that appear before the first header are discarded when a
        header follows; if the input contains no header at all, they are
        yielded as a single record whose name and description are ``None``.

        Yields
        ------
        FastaSequence
            One object per record, in file order.

        """
        if isinstance(file, str):
            # NOTE(review): presumably this opens the path for reading; the
            # resulting handle is never closed here -- confirm what
            # carabiner's `cast` returns before adding an explicit close.
            file = cast(file, to=TextIOWrapper)

        seq_name, seq_desc = None, None
        # Collect sequence lines in a list and join once per record rather
        # than growing a string with ``+=`` on every line.
        chunks = []
        for line in file:
            line = line.rstrip()
            if line.startswith('>'):
                if seq_name is not None:
                    yield FastaSequence(seq_name, seq_desc, ''.join(chunks))
                # Always start from an empty buffer so that stray lines
                # preceding a header cannot leak into the record it opens.
                chunks = []
                header = line.lstrip('>').split()
                # A bare ">" line has no tokens; use an empty name instead
                # of failing with IndexError.
                seq_name = header[0] if header else ''
                seq_desc = ' '.join(header[1:])
            elif line:
                chunks.append(line)

        # Emit the final record even when its sequence is empty, matching
        # the treatment of empty records earlier in the file.
        if seq_name is not None or chunks:
            yield FastaSequence(seq_name, seq_desc, ''.join(chunks))

    @classmethod
    def from_file(
        cls,
        file: Union[str, TextIOWrapper]
    ) -> 'FastaCollection':
        """Read sequences from a FASTA file.

        Takes a file handle or filename and creates a new `FastaCollection`
        of a `FastaSequence` for each sequence.

        The file is parsed lazily: records are read as the collection is
        iterated, so the returned collection can be consumed only once and
        a file handle passed in must stay open until then.

        Parameters
        ----------
        file: TextIO or str
            String or file handle such as one generated by
            `open(f, mode='r')`.

        Returns
        -------
        FastaCollection

        Examples
        --------
        >>> from io import StringIO
        >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
        >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
        >>> fasta_stream = FastaCollection([seq1, seq2])
        >>> fasta_file = StringIO()
        >>> fasta_stream.write(fasta_file)
        >>> fasta_file.seek(0)  # rewind file
        0
        >>> fasta_stream2 = FastaCollection.from_file(fasta_file)
        >>> print(fasta_stream2)
        >example This is a description
        ATCG
        >example2 This is another sequence
        GGGAAAA

        """
        return cls(cls._from_file(file=file))

    @staticmethod
    def _from_pandas(
        data: DataFrame,
        sequence: str,
        names: Union[str, Iterable[str]],
        descriptions: Optional[Union[str, Iterable[str]]] = None,
        name_sep: str = '_',
        desc_sep: str = ';'
    ) -> Iterable[FastaSequence]:
        """Lazily convert the rows of a DataFrame into FASTA records.

        See `from_pandas` for the meaning of the parameters.

        Yields
        ------
        FastaSequence
            One object per row of `data`.

        Raises
        ------
        KeyError
            If any requested column is not in `data`. Because this is a
            generator, the error surfaces on first iteration.

        """
        descriptions = descriptions or []
        names = cast(names, to=list)
        descriptions = cast(descriptions, to=list)
        columns = list(_sanitize_columns(chain(names,
                                               descriptions,
                                               cast(sequence, to=list))))

        missing = [column for column in columns if column not in data]
        if missing:
            raise KeyError('Some requested columns not in the table: '
                           '"{}"'.format('", "'.join(missing)))

        table = data[columns]
        # Look values up by position in plain tuples instead of by attribute
        # on named tuples: pandas renames fields whose labels are not valid
        # Python identifiers, which makes `getattr(row, label)` fail.
        position = {label: i for i, label in enumerate(table.columns)}

        for row in table.itertuples(index=False, name=None):
            name = name_sep.join(
                str(row[position[column]]).replace(' ', '-')
                for column in names
            )
            description = desc_sep.join(
                '{}={}'.format(column,
                               str(row[position[column]]).replace(' ', '_'))
                for column in descriptions
            )
            yield FastaSequence(name, description, row[position[sequence]])

    @classmethod
    def from_pandas(
        cls,
        data: DataFrame,
        sequence: str,
        names: Union[str, Iterable[str]],
        descriptions: Optional[Union[str, Iterable[str]]] = None,
        name_sep: str = '_',
        desc_sep: str = ';'
    ) -> 'FastaCollection':
        """Create a `FastaCollection` from a Pandas DataFrame.

        The FASTA sequence is taken from the `sequence` column, and the
        name is taken from the `names` columns, concatenated and separated
        by `name_sep` (spaces inside name values become '-'). If provided,
        description column values are added to the description field as
        'key=value' pairs, separated by `desc_sep` (spaces inside values
        become '_').

        Rows are converted lazily, so the returned collection can be
        consumed only once.

        Parameters
        ----------
        data : pd.DataFrame
            Input data. Must contain columns named as `sequence`, `names`,
            and (optionally) `descriptions`.
        sequence : str
            Name of column containing sequences.
        names : list
            Names of columns to use as sequence names in FASTA.
        descriptions : list, optional
            Names of columns to add as metadata to the description in FASTA.
        name_sep : str, optional
            Separator between name values. Default: '_'.
        desc_sep : str, optional
            Separator between description values. Default: ';'.

        Returns
        -------
        FastaCollection
            Collection holding one `FastaSequence` per row of `data`.

        Raises
        ------
        KeyError
            If a requested column is missing from `data`; raised when the
            collection is first iterated.

        Examples
        --------
        >>> import pandas as pd
        >>> df = pd.DataFrame(dict(seq=['atcg', 'aaaa'],
        ...                        title=['seq1', 'seq2'],
        ...                        info=['SeqA', 'SeqB'],
        ...                        score=[1, 2]))
        >>> df  # doctest: +NORMALIZE_WHITESPACE
            seq title  info  score
        0  atcg  seq1  SeqA      1
        1  aaaa  seq2  SeqB      2
        >>> FastaCollection.from_pandas(df, sequence='seq',
        ...                             names=['title'],
        ...                             descriptions=['info', 'score']).write()  # doctest: +NORMALIZE_WHITESPACE
        >seq1 info=SeqA;score=1
        atcg
        >seq2 info=SeqB;score=2
        aaaa
        >>> FastaCollection.from_pandas(df, sequence='seq',
        ...                             names=['title', 'info'],
        ...                             descriptions=['score']).write()  # doctest: +NORMALIZE_WHITESPACE
        >seq1_SeqA score=1
        atcg
        >seq2_SeqB score=2
        aaaa

        """
        sequences = cls._from_pandas(
            data=data,
            sequence=sequence,
            names=names,
            descriptions=descriptions,
            name_sep=name_sep,
            desc_sep=desc_sep,
        )
        return cls(sequences)

    def write(
        self,
        file: Optional[TextIOWrapper] = None
    ) -> None:
        """Stream sequences to a FASTA file.

        Iterates over `sequences` and writes each one to the given file.

        Parameters
        ----------
        file : TextIO, optional
            File handle such as one generated by `open(f, mode='w')`.
            With the default, ``None``, records are printed to standard
            output.

        Returns
        -------
        None

        Examples
        --------
        >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
        >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
        >>> fasta_stream = FastaCollection([seq1, seq2])
        >>> fasta_stream.write()
        >example This is a description
        ATCG
        >example2 This is another sequence
        GGGAAAA

        """
        for fasta_seq in self.sequences:
            fasta_seq.write(file=file)