Source code for fractal.loaders.thegraph.base_graph_loader

from typing import Dict

import requests

from fractal.loaders.base_loader import Loader, LoaderType


[docs] class GraphLoaderException(Exception): pass
[docs] class BaseGraphLoader(Loader): """ Base class for The Graph loaders. Each graph loader should inherit from this class. It should have the following attributes: - subgraph_id: The ID of the subgraph - root_url: The root URL of the graph - api_key: The API key to access Methods: _make_request: Make a request to the graph with a given query """ def __init__(self, root_url: str, api_key: str, subgraph_id: str, loader_type: LoaderType): super().__init__(loader_type=loader_type) self._url: str = f'{root_url}/{api_key}/subgraphs/id/{subgraph_id}'
[docs] def _make_request(self, query: str, *args, **kwargs) -> Dict: """ Make a request to The Graph Args: query (str): GraphQL query Returns: dict: Response data """ response = requests.post(self._url, json={'query': query}, timeout=60) if response.status_code != 200: raise GraphLoaderException(f'Status code: {response.status_code}') data = response.json() if 'errors' in data: raise GraphLoaderException(data['errors']) return data['data']
[docs] class ArbitrumGraphLoader(BaseGraphLoader): """ Graph Loader with arbitrum gateway """ ROOT_URL = "https://gateway-arbitrum.network.thegraph.com/api" def __init__(self, api_key: str, subgraph_id: str, loader_type: LoaderType): super().__init__(root_url=self.ROOT_URL, api_key=api_key, subgraph_id=subgraph_id, loader_type=loader_type)