"""Input and output functions and classes for FASTA files."""
from typing import Iterable, Optional, Union
from dataclasses import dataclass, field
from io import TextIOWrapper
from itertools import chain
import textwrap
from carabiner.cast import cast
from pandas import DataFrame
from .tables import _sanitize_columns
@dataclass
class FastaSequence:
    """Object which gives a FASTA-formatted sequence when printed.

    Attributes
    ----------
    name : str
        Name of the sequence.
    description : str
        Sequence description string.
    sequence : str
        The actual sequence.

    Methods
    -------
    __str__
        Show the FASTA-formatted sequence.
    write
        Print the FASTA-formatted sequence to a file handle.

    Examples
    --------
    >>> s = FastaSequence("example", "This is a description", "ATCG")
    >>> print(s)
    >example This is a description
    ATCG

    """

    name: str
    description: str
    sequence: str

    def __str__(self) -> str:
        """Show the FASTA-formatted sequence.

        The header line is ``>name description``; the separating space is
        left out when the description is empty so that no trailing
        whitespace is emitted. The sequence follows, split into lines of at
        most 80 characters.

        Returns
        -------
        str
            The FASTA record, without a trailing newline.

        """
        width = 80
        header = f">{self.name}"
        if self.description:
            header = f"{header} {self.description}"
        # Slice at fixed offsets instead of using textwrap: textwrap treats
        # hyphens and whitespace as preferred break points, which gives
        # uneven line lengths for gapped (aligned) sequences.
        body = '\n'.join(
            self.sequence[start:start + width]
            for start in range(0, len(self.sequence), width)
        )
        return f"{header}\n{body}"

    def write(self,
              file: Optional[TextIOWrapper] = None) -> None:
        """Print the FASTA-formatted sequence followed by a newline.

        Parameters
        ----------
        file : TextIO, optional
            Writable text file handle, passed straight to `print`. If
            omitted, `print` falls back to its default (standard output).

        Returns
        -------
        None

        """
        print(self, file=file)
@dataclass
class FastaCollection:
    """Collection of FASTA sequences for reading and writing.

    Attributes
    ----------
    sequences : iterable of FastaSequence, optional
        The records in the collection. Defaults to an empty list. The
        `from_file` and `from_pandas` constructors store a lazy generator
        here, so a collection built that way can only be iterated (printed
        or written) once.

    Methods
    -------
    from_file
        Instantiate by reading a FASTA file.
    from_pandas
        Instantiate from columns of a Pandas DataFrame.
    write
        Write sequences to FASTA file.

    Examples
    --------
    >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
    >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
    >>> fasta_stream = FastaCollection([seq1, seq2])
    >>> print(fasta_stream)
    >example This is a description
    ATCG
    >example2 This is another sequence
    GGGAAAA

    """

    sequences: Iterable["FastaSequence"] = field(default_factory=list)

    def __str__(self) -> str:
        """Show all sequences in FASTA format, one record after another."""
        return '\n'.join(str(seq) for seq in self.sequences)

    @staticmethod
    def _from_file(file: Union[str, TextIOWrapper]) -> Iterable["FastaSequence"]:
        """Lazily parse FASTA records from a filename or open text handle.

        Lines starting with ``>`` open a new record: the first
        whitespace-delimited token is the name and the remaining tokens,
        joined by single spaces, are the description. All following
        non-empty lines are concatenated into the sequence. Lines that
        appear before the first header are skipped.

        """
        if isinstance(file, str):
            # NOTE(review): presumably `cast` opens the path for reading;
            # the resulting handle is not closed here - confirm who owns it.
            file = cast(file, to=TextIOWrapper)

        seq_name, seq_desc, chunks = None, None, []
        for line in file:
            line = line.rstrip()
            if line.startswith('>'):
                if seq_name is not None:
                    yield FastaSequence(seq_name, seq_desc, ''.join(chunks))
                header = line.lstrip('>').split()
                # A bare ">" header has no tokens; use an empty name rather
                # than failing on header[0].
                seq_name = header[0] if header else ''
                seq_desc = ' '.join(header[1:])
                chunks = []
            elif line and seq_name is not None:
                chunks.append(line)

        # Emit the final record whenever a header was seen, even if its
        # sequence is empty, so it is treated like any earlier record.
        if seq_name is not None:
            yield FastaSequence(seq_name, seq_desc, ''.join(chunks))

    @classmethod
    def from_file(
        cls,
        file: Union[str, TextIOWrapper]
    ) -> "FastaCollection":
        """Read sequences from a FASTA file.

        Takes a file handle or filename and creates a new `FastaCollection` of
        a `FastaSequence` for each sequence. Records are parsed lazily as the
        collection is iterated, so the resulting collection is single-pass.

        Parameters
        ----------
        file : TextIO or str
            String or file handle such as one generated by `open(f, mode='r')`.

        Returns
        -------
        FastaCollection

        Examples
        --------
        >>> from io import StringIO
        >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
        >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
        >>> fasta_stream = FastaCollection([seq1, seq2])
        >>> fasta_file = StringIO()
        >>> fasta_stream.write(fasta_file)
        >>> fasta_file.seek(0)  # rewind file
        0
        >>> fasta_stream2 = FastaCollection.from_file(fasta_file)
        >>> print(fasta_stream2)
        >example This is a description
        ATCG
        >example2 This is another sequence
        GGGAAAA

        """
        return cls(cls._from_file(file=file))

    @staticmethod
    def _from_pandas(
        data: DataFrame,
        sequence: str,
        names: Union[str, Iterable[str]],
        descriptions: Optional[Union[str, Iterable[str]]] = None,
        name_sep: str = '_',
        desc_sep: str = ';'
    ) -> Iterable["FastaSequence"]:
        """Lazily build one `FastaSequence` per row of `data`.

        Raises
        ------
        KeyError
            If any requested column is not present in `data`.

        """
        names = cast(names, to=list)
        descriptions = cast(descriptions or [], to=list)
        # NOTE(review): `_sanitize_columns` is defined in `.tables`; it is
        # assumed here to return the requested column labels - confirm.
        columns = list(_sanitize_columns(chain(names, descriptions, cast(sequence, to=list))))

        missing = [column for column in columns if column not in data]
        if missing:
            raise KeyError('Some requested columns not in the table: '
                           '"{}"'.format('", "'.join(map(str, missing))))

        table = data[columns]
        # Look values up by position in plain tuples. Named tuples from
        # `itertuples()` rename columns that are not valid identifiers (or
        # start with an underscore), which breaks attribute access by label.
        position = {label: i for i, label in enumerate(table.columns)}
        for values in table.itertuples(index=False, name=None):
            name = name_sep.join(
                str(values[position[column]]).replace(' ', '-')
                for column in names
            )
            description = desc_sep.join(
                '{}={}'.format(column, str(values[position[column]]).replace(' ', '_'))
                for column in descriptions
            )
            yield FastaSequence(name, description, values[position[sequence]])

    @classmethod
    def from_pandas(
        cls,
        data: DataFrame,
        sequence: str,
        names: Union[str, Iterable[str]],
        descriptions: Optional[Union[str, Iterable[str]]] = None,
        name_sep: str = '_',
        desc_sep: str = ';'
    ) -> "FastaCollection":
        """Create a `FastaCollection` from a Pandas DataFrame.

        The FASTA sequence is taken from the `sequence` column, and the name is
        taken from the `names` columns, concatenated and separated by
        `name_sep`. If provided, description column values are added to the
        description field as 'key=value' pairs, separated by `desc_sep`.
        Spaces in name values are replaced by '-' and spaces in description
        values by '_'.

        Parameters
        ----------
        data : pd.DataFrame
            Input data. Must contain columns named as `sequence`, `names`, and
            (optionally) `descriptions`.
        sequence : str
            Name of column containing sequences.
        names : str or list
            Names of columns to use as sequence names in FASTA.
        descriptions : str or list, optional
            Names of columns to add as metadata to the description in FASTA.
        name_sep : str, optional
            Separator between name values. Default: '_'.
        desc_sep : str, optional
            Separator between description values. Default: ';'.

        Returns
        -------
        FastaCollection
            Collection whose records are generated lazily, one per row.

        Raises
        ------
        KeyError
            If a requested column is not in `data`. Because rows are
            processed lazily, this is raised when the collection is first
            iterated.

        Examples
        --------
        >>> import pandas as pd
        >>> df = pd.DataFrame(dict(seq=['atcg', 'aaaa'],
        ...                        title=['seq1', 'seq2'],
        ...                        info=['SeqA', 'SeqB'],
        ...                        score=[1, 2]))
        >>> df  # doctest: +NORMALIZE_WHITESPACE
            seq title  info  score
        0  atcg  seq1  SeqA      1
        1  aaaa  seq2  SeqB      2
        >>> FastaCollection.from_pandas(df, sequence='seq',
        ...                             names=['title'],
        ...                             descriptions=['info', 'score']).write()  # doctest: +NORMALIZE_WHITESPACE
        >seq1 info=SeqA;score=1
        atcg
        >seq2 info=SeqB;score=2
        aaaa
        >>> FastaCollection.from_pandas(df, sequence='seq',
        ...                             names=['title', 'info'],
        ...                             descriptions=['score']).write()  # doctest: +NORMALIZE_WHITESPACE
        >seq1_SeqA score=1
        atcg
        >seq2_SeqB score=2
        aaaa

        """
        sequences = cls._from_pandas(
            data=data,
            sequence=sequence,
            names=names,
            descriptions=descriptions,
            name_sep=name_sep,
            desc_sep=desc_sep,
        )
        return cls(sequences)

    def write(
        self,
        file: Optional[TextIOWrapper] = None
    ) -> None:
        """Stream sequences to a FASTA file.

        Writes every `FastaSequence` in the collection to the given file,
        one record after another.

        Parameters
        ----------
        file : TextIO, optional
            File handle such as one generated by `open(f, mode='w')`. If
            omitted, records are printed to standard output.

        Returns
        -------
        None

        Examples
        --------
        >>> seq1 = FastaSequence("example", "This is a description", "ATCG")
        >>> seq2 = FastaSequence("example2", "This is another sequence", "GGGAAAA")
        >>> fasta_stream = FastaCollection([seq1, seq2])
        >>> fasta_stream.write()
        >example This is a description
        ATCG
        >example2 This is another sequence
        GGGAAAA

        """
        for fasta_seq in self.sequences:
            fasta_seq.write(file=file)
        return None