Source code for cfoldseeker.communication

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import logging
import requests
import time
from pathlib import Path
from tqdm.contrib.concurrent import thread_map
from tqdm.contrib.logging import logging_redirect_tqdm


# Module-level logger named after this module; shared by all functions in this file.
LOG = logging.getLogger(__name__)


[docs] def submit_foldseek_query(query_path: Path, dbs: list, taxfilters: list, max_attempts: int = 3) -> dict: """ Submits a structure file to the FoldSeek API for processing. Sends a protein structure query to the FoldSeek webserver with specified databases and taxonomic filters. Returns the submission ticket on success or raises an error on failure with maximum attempt logic. Args: query_path (Path): Path object pointing to the structure file to submit. dbs (list): List of database names to search against. taxfilters (list): List of taxonomic filters to apply to the search. max_attempts (int): Maximum number of submission attempts Returns: A dictionary containing the submission ticket and metadata from the FoldSeek API response. Raises: RuntimeError: If submission fails too many times. """ FOLDSEEK_SUBMISSION_URL = "https://search.foldseek.com/api/ticket" trials = 0 with open(query_path, "rb") as f: # Compose ticket files = {"q": f} data = [("mode", "3diaa")] for db in dbs: data.append(("database[]", db)) for taxfilt in taxfilters: data.append(('taxfilter', taxfilt)) # Submit ticket LOG.debug(f"Posting request on FoldSeek webserver {FOLDSEEK_SUBMISSION_URL}") LOG.debug(f"using the following parameters: {data}") while trials < max_attempts: try: response = requests.post(FOLDSEEK_SUBMISSION_URL, files=files, data=data) match response.status_code: case 200: LOG.debug(f'Query {query_path} successfully submitted!') return response.json() case 429: LOG.warning('Too many requests error returned. Waiting 5 seconds before retrying.') trials += 1 time.sleep(5) case _: msg = f"Error submitting query {query_path}! Status code: {response.status_code}" LOG.warning(msg) except requests.exceptions.ConnectionError: LOG.warning('Error submitting request. Connection was aborted.') trials += 1 time.sleep(5) msg = 'Too many attempts submitting. Aborting.' LOG.critical(msg) raise RuntimeError(msg)
def check_query_status(job_id: str) -> str:
    """
    Looks up the current status of a FoldSeek job.

    Issues a GET request to the FoldSeek ticket endpoint for the given job
    and returns the ``status`` field of the JSON response.

    Args:
        job_id: The unique identifier for the FoldSeek job.

    Returns:
        The value stored under the ``status`` key of the response
        (e.g., "COMPLETE", "RUNNING", etc.).
    """
    ticket_endpoint = "https://search.foldseek.com/api/ticket"
    LOG.debug(f'Checking out status of job {job_id}')
    response = requests.get(f"{ticket_endpoint}/{job_id}")
    return response.json()['status']
def retrieve_foldseek_results(job_id: str) -> dict:
    """
    Waits for a FoldSeek job to complete and retrieves its results.

    Polls the job status every 10 seconds until the job reports
    "COMPLETE", then downloads and returns the parsed results of the first
    result entry (index 0) from the FoldSeek API. If the server reports the
    job as "ERROR", polling stops and an exception is raised instead of
    waiting forever.

    Args:
        job_id: The unique identifier for the FoldSeek job.

    Returns:
        A dictionary containing the parsed results from the completed
        FoldSeek job.

    Raises:
        RuntimeError: If the FoldSeek server reports the job status "ERROR".
    """
    FOLDSEEK_RESULTS_URL = "https://search.foldseek.com/api/result"

    while True:
        status = check_query_status(job_id)
        if status == "COMPLETE":
            break
        if status == "ERROR":
            # A failed job never becomes COMPLETE; bail out rather than
            # polling indefinitely.
            msg = f"Job {job_id} failed on the FoldSeek webserver (status: {status})."
            LOG.critical(msg)
            raise RuntimeError(msg)
        LOG.debug(f'Job status: {status}')
        LOG.debug(f"Job {job_id} has not completed yet. Waiting another 10 seconds...")
        time.sleep(10)

    LOG.debug(f"Job {job_id} has completed! Downloading results...")
    entry = 0  # index of the result entry to download
    url = f"{FOLDSEEK_RESULTS_URL}/{job_id}/{entry}"
    return requests.get(url, timeout=300).json()
[docs] def pull_from_ena(entry: str, max_retries: int = 3) -> None | str: """ Retrieves a GenPept record from the ENA Browser API. Attempts to fetch a GenPept sequence record from the European Nucleotide Archive (ENA) with retry logic for rate-limited responses. Args: entry: The accession number or identifier of the GenPept record to retrieve. max_retries: Maximum number of retry attempts for rate-limited requests (error code 429). Defaults to 3. Waiting time between trials is 5 seconds. Returns: A string containing the GenPept record in text format, or None if the retrieval fails after max retries or an unexpected error occurs. """ ENA_BROWSER_URL = "https://www.ebi.ac.uk/ena/browser/api/embl" trials = 0 url = f"{ENA_BROWSER_URL}/{entry}" LOG.debug(f'Going to pull GenPept record from {url}') while trials < max_retries: try: response = requests.get(url) match response.status_code: case 200: return response.text case 429: LOG.warning('Too many requests error returned. Waiting 5 seconds before retrying.') trials += 1 time.sleep(5) case 404: LOG.warning(f'Entry {entry} not found. Aborting.') return None case _: LOG.warning(f'Error pulling GenPept entry {entry}. Code returned: {response.status_code}. Retrying.') except requests.exceptions.ConnectionError: LOG.warning(f'Error pulling GenPept entry {entry}. Connection was aborted.') trials += 1 time.sleep(5) LOG.warning('Too many attempts to download {entry}. Aborting.') return None
[docs] def pull_from_unisave(entry: str, max_retries: int = 3) -> None | str: """ Retrieves a UniSave record from the UniProt REST API. Fetches a protein sequence record from UniSave (UniProt archive) with retry logic for rate-limited responses. Args: entry: The UniProt accession number of the record to retrieve. max_retries: Maximum number of retry attempts for rate-limited requests (429). Defaults to 3. Waiting time between trials is 5 seconds. Returns: A string containing the UniSave record in text format, or None if the retrieval fails after max retries or an unexpected error occurs. """ UNISAVE_REST_URL = "https://rest.uniprot.org/unisave" trials = 0 url = f"{UNISAVE_REST_URL}/{entry}?format=txt" LOG.debug(f'Going to pull UniSave entry from {url}') while trials < max_retries: try: response = requests.get(url) match response.status_code: case 200: return response.text case 429: LOG.warning('Too many requests error returned. Waiting 5 seconds before retrying.') trials += 1 time.sleep(5) case 404: LOG.warning(f'Entry {entry} not found. Aborting.') return None case _: LOG.warning(f'Error pulling UniSave record {entry}. Code returned: {response.status_code}. Retrying.') except requests.exceptions.ConnectionError: LOG.warning(f'Error pulling UniSave record {entry}. Connection was aborted.') trials += 1 time.sleep(5) LOG.warning(f'Too many attempts to download record {entry}. Aborting.') return None
def pull_dict_from_unisave(entries: list, max_workers: int = 1, no_progress: bool = False) -> dict:
    """
    Retrieves multiple UniSave records and returns them as a dictionary.

    Fetches a list of UniSave entries through a thread pool (one call to
    ``pull_from_unisave`` per entry) and returns them mapped to their
    original accession numbers. Failed retrievals (``None`` results) are
    filtered out.

    Args:
        entries: List of UniProt accession numbers to retrieve.
        max_workers: Number of worker threads for parallel retrieval.
            Defaults to 1.
        no_progress: If True, suppresses the progress bar during retrieval.
            Defaults to False.

    Returns:
        A dictionary mapping each successfully retrieved accession number to
        its corresponding UniSave record as a string. Failed retrievals are
        excluded from the dictionary.
    """
    LOG.info(f'Going to pull {len(entries)} UniSave records')

    # Route log records through tqdm so they do not garble the progress bar.
    with logging_redirect_tqdm(loggers=[LOG]):
        unisave_entries = thread_map(
            pull_from_unisave,
            entries,
            max_workers=max_workers,
            leave=False,
            disable=no_progress,
        )

    # Results come back in input order, so they can be zipped with the
    # accessions; entries whose download failed are dropped.
    return {
        accession: record
        for accession, record in zip(entries, unisave_entries)
        if record is not None
    }