
SHA-256 issues

Zella0828 opened this issue 1 year ago • 0 comments

from Bio import SeqIO
from Bio import Entrez
import re
import io
import hashlib
import urllib.request
import urllib.error
import time
from itertools import chain

Custom exception classes for distinguishing server and download errors

class ServerException(Exception):
    pass

class DownloadError(Exception):
    """Base class for all download-related errors."""
    pass

class ProteinDownloadError(DownloadError):
    """Error specific to protein sequence downloads."""
    pass

class CDSDownloadError(DownloadError):
    """Error specific to CDS sequence downloads."""
    pass

class GenomicDownloadError(DownloadError):
    """Error specific to genomic sequence downloads."""
    pass

Retry helper for HTTP requests

def retry_request(request_func, backoff, timeout, retries=3):
    # Call request_func up to `retries` times, sleeping `backoff` seconds after an HTTP 500.
    # (`timeout` is accepted for symmetry with the callers but is not used here.)
    for _ in range(retries):
        try:
            handle = request_func()  # actually issue the request so each retry re-runs it
            return handle
        except urllib.error.HTTPError as e:
            if e.code == 500:
                time.sleep(backoff)
            else:
                raise ServerException(f"HTTP Error: {e}")
    raise ServerException("Max retries exceeded for HTTP request")
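As a quick sanity check, here is a minimal sketch of how the retry helper is meant to be called (the organism term and parameter values are only illustrative, and it assumes Entrez.email has been set as it is further down in the script):

handle = retry_request(
    lambda: Entrez.esearch(db="protein", term="Panax ginseng", usehistory="y"),
    backoff=2,
    timeout=10,
)
search_record = Entrez.read(handle)
handle.close()
print(search_record["Count"])

Because the request is wrapped in a lambda, every retry re-issues the esearch call rather than reusing a stale handle.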

Delay helper

def add_delay(seconds):
    time.sleep(seconds)

Define the Latin names of the target organisms

organisms = {
    "Panax ginseng": "txid4054",
    "Hevea brasiliensis": "txid3981",
    "Robinia pseudoacacia": "txid35938",
    "Pueraria lobata": "txid3893",
    "Salvinia natans": "txid42333",
    "Albizia julibrissin": "txid3813",
    "Cercis chinensis": "txid161750",
}

Define the output file names

protein_output_filename = "proteins.fasta"
cds_output_filename = "cds.fasta"
genomic_output_filename = "genomic.fasta"
Entrez.email = "[email protected]"

Deduplication set

seen_sequences = set()
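Since the issue title mentions SHA-256, for context this is the deduplication pattern used further down: hash each cleaned sequence with SHA-256 and skip it when the digest is already in the set. A self-contained sketch with made-up sequences:

import hashlib

seen = set()
for seq in ("MKTAYIAKQR", "MKTAYIAKQR", "GATTACA"):
    digest = hashlib.sha256(seq.encode("utf-8")).hexdigest()
    if digest in seen:
        print("duplicate:", seq)
    else:
        seen.add(digest)
        print("new:", seq)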

Search for protein sequences and save them to a FASTA file

def fetch_and_save_protein_sequences(organism, protein_output_filename):
    # Run esearch (with retries) and get the HTTP response object
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="protein", term=organism, usehistory="y"),
        backoff=2, timeout=10)

    # Parse the response content
    esearch_record = Entrez.read(esearch_response)  # Entrez.read parses the XML

    # Close the HTTP response object
    esearch_response.close()

    add_delay(1)

    # Get the IdList directly
    seq_ids = esearch_record["IdList"]  # value stored under the "IdList" key

    # Open the output file
    with open(protein_output_filename, "w") as protein_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response object
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="protein", id=seq_id, rettype="fasta", retmode="text"),
                    backoff=2, timeout=10)

                # Read the efetch response and write it to the file
                protein_output_file.write(efetch_response.read())

                # Close the efetch response object
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
                raise ProteinDownloadError(f"Failed to download protein sequence with ID {seq_id}")
            except Exception as e:
                print(f"An unexpected error occurred for sequence ID {seq_id}: {e}")
                raise DownloadError(f"An unexpected error occurred for sequence ID {seq_id}")

    print(f"All protein sequences for {organism} have been downloaded and saved to {protein_output_filename}.")

Search for CDS sequences and save them to a FASTA file

def fetch_and_save_cds_sequences(organism, cds_output_filename):
    # Use esearch to look up mRNA sequences for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[mRNA]", usehistory="y"),
        backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch result
    esearch_response.close()
    add_delay(1)

    # Get the IdList directly
    seq_ids = esearch_result["IdList"]

    # Open the output file for the CDS sequences
    with open(cds_output_filename, "w") as cds_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response object
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"),
                    backoff=2, timeout=10)
                gb_record = SeqIO.read(io.StringIO(efetch_response.read()), "genbank")  # parse the GenBank record

                # Walk the features and extract the CDS sequences
                for feature in gb_record.features:
                    if feature.type == "CDS":
                        cds_seq = feature.location.extract(gb_record.seq)
                        clean_sequence = re.sub(r"[^A-Za-z*]", "", str(cds_seq))
                        sequence_hash = hashlib.md5(clean_sequence.encode('utf-8')).hexdigest()

                        # Check whether the sequence has already been seen
                        if sequence_hash not in seen_sequences:
                            # The sequence is unique: write it out and record its hash
                            cds_output_file.write(
                                f">CDS_{feature.location.start}..{feature.location.end} {gb_record.id}\n")
                            cds_output_file.write(str(cds_seq) + "\n")
                            seen_sequences.add(sequence_hash)
                        else:
                            print(f"Skipping duplicate sequence with ID {seq_id}.")

                efetch_response.close()

            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
            except Exception as e:
                print(f"An error occurred for sequence ID {seq_id}: {e}")

    print(f"All CDS sequences for {organism} have been downloaded and saved to {cds_output_filename}.")

Function to fetch and save genomic sequences

def fetch_and_save_genomic_sequences(organism, genomic_output_filename):
    # Use esearch to look up genomic sequences for the given organism
    esearch_response = retry_request(
        lambda: Entrez.esearch(db="nucleotide", term=f"{organism}[Genomic]", usehistory="y"),
        backoff=2, timeout=10)
    esearch_result = Entrez.read(esearch_response)  # parse the esearch result
    esearch_response.close()
    add_delay(1)

    # Get the IdList directly
    seq_ids = esearch_result["IdList"]

    # Open the output file for the genomic sequences
    with open(genomic_output_filename, "w") as genomic_output_file:
        for seq_id in seq_ids:
            try:
                # Run efetch (with retries) and get the HTTP response object
                efetch_response = retry_request(
                    lambda: Entrez.efetch(db="nucleotide", id=seq_id, rettype="gb", retmode="text"),
                    backoff=2, timeout=10)
                genomic_record = SeqIO.read(io.StringIO(efetch_response.read()), "genbank")  # parse the GenBank record

                # Write the genomic sequence with a FASTA header so the file can be parsed downstream
                genomic_output_file.write(f">{genomic_record.id} {genomic_record.description}\n")
                genomic_output_file.write(str(genomic_record.seq) + "\n")
                efetch_response.close()
            except urllib.error.URLError as e:
                print(f"URL error for sequence ID {seq_id}: {e.reason}")
            except Exception as e:
                print(f"An error occurred for sequence ID {seq_id}: {e}")

    print(f"All genomic sequences for {organism} have been downloaded and saved to {genomic_output_filename}.")

Call the functions to fetch and save the protein, CDS, and genomic sequences

for organism, taxid in organisms.items():  # the taxid value is currently unused
    # Note: each fetch function opens its output file in "w" mode, so every organism
    # overwrites the previous one's output in proteins.fasta, cds.fasta and genomic.fasta.
    fetch_and_save_protein_sequences(organism, protein_output_filename)
    fetch_and_save_cds_sequences(organism, cds_output_filename)
    fetch_and_save_genomic_sequences(organism, genomic_output_filename)

Function to filter matching sequences

def find_matching_sequences(input_filename, btw, numc, seen_sequences):
    """Filter sequences in the given FASTA file that satisfy the cysteine-spacing
    criteria, deduplicating them by SHA-256 hash."""
    with open(input_filename, "r") as input_file:
        for record in SeqIO.parse(input_file, "fasta"):
            sequence = str(record.seq)  # convert the sequence to a string
            c_positions = [m.start() for m in re.finditer('C', sequence)]  # positions of every 'C'
            if len(c_positions) >= numc:
                valid_pairs = []
                for i in range(len(c_positions) - 1):
                    if c_positions[i + 1] - c_positions[i] <= btw:
                        valid_pairs.append((c_positions[i], c_positions[i + 1]))

                consecutive_valid_pairs = 0
                for i in range(len(valid_pairs) - 1):
                    if valid_pairs[i][1] == valid_pairs[i + 1][0] - 1:  # check whether the pairs are consecutive
                        consecutive_valid_pairs += 1
                        if consecutive_valid_pairs >= numc - 1:  # check the number of consecutive pairs
                            # Deduplicate using the SHA-256 hash of the full sequence
                            sequence_hash = hashlib.sha256(sequence.encode('utf-8')).hexdigest()
                            if sequence_hash not in seen_sequences:
                                yield record  # yield the matching sequence
                                seen_sequences.add(sequence_hash)  # remember its hash
                            break  # stop once a match has been found
                    else:
                        consecutive_valid_pairs = 0  # reset the counter
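A minimal way to exercise find_matching_sequences on a single file and parameter pair (the file name matches the protein output above; btw=30 and numc=3 are arbitrary illustrative values):

local_seen = set()
for rec in find_matching_sequences("proteins.fasta", btw=30, numc=3, seen_sequences=local_seen):
    print(rec.id, len(rec.seq))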

Function to write the matching sequences to a file

def write_matching_sequences_to_file(matching_sequences_generator, output_file):
    """Write the filtered matching sequences to the given FASTA file.

    Parameters:
        matching_sequences_generator (iterator): generator of matching sequence records.
        output_file (str): path of the output file.
    """
    with open(output_file, "w") as output_handle:
        SeqIO.write(matching_sequences_generator, output_handle, "fasta")

Set the ranges of the filtering parameters

btw_range = range(20, 100)  # range of values for btw
numc_values = [3, 4, 5]  # values for numc

Clear the deduplication set

seen_sequences.clear()

Define the final result file name

final_result_filename = "8.5_result.fasta"

Initialize an empty list of generators

all_matching_sequences = []

For every combination of btw and numc, find the matching sequences

for btw in btw_range:
    for numc in numc_values:
        # Find the matching sequences and keep their generators
        matching_sequences_proteins = find_matching_sequences(protein_output_filename, btw, numc, seen_sequences)
        matching_sequences_cds = find_matching_sequences(cds_output_filename, btw, numc, seen_sequences)
        matching_sequences_genomic = find_matching_sequences(genomic_output_filename, btw, numc, seen_sequences)

        # Merge all matching-sequence generators into a single list
        all_matching_sequences.extend([matching_sequences_proteins, matching_sequences_cds, matching_sequences_genomic])

Call the function, passing the chained list of generators rather than a single generator

write_matching_sequences_to_file(chain(*all_matching_sequences), final_result_filename)
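For clarity, chain(*all_matching_sequences) only flattens the list of generators into a single iterator; nothing is read from the FASTA files until SeqIO.write consumes it inside write_matching_sequences_to_file. A tiny standalone illustration of that flattening:

from itertools import chain

gens = [(c for c in "AB"), (c for c in "CD")]
print(list(chain(*gens)))  # ['A', 'B', 'C', 'D']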

Print a completion message

print(f"All matching sequences have been saved in {final_result_filename}")
