Source code for metalpipe.node_classes.network_nodes

"""
Network nodes module
====================

Classes that deal with sending and receiving data across the interwebs.
"""

import requests
import json
import time
import random
import logging
from requests.packages.urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter

from metalpipe.node import MetalNode
from metalpipe.utils.helpers import SafeMap

additional_data_test = bool


[docs]class PaginatedHttpGetRequest: """ For handling requests in a semi-general way that require paging through lists of results and repeatedly making GET requests. """ def __init__( self, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_get_request_key=None, protocol="http", retries=5, default_offset_value="", additional_data_test=bool, calling_node=None, ): """ :ivar endpoint_template: (str) Template for endpoint URL, suitable for calling ``endpoint_template.format(**kwargs)``. :ivar additional_data_key: Key in JSON payload whose value indicates whether there are additional pages to request. :ivar pagination_key: Key in JSON payload where the current page (or next page) is indicated. :ivar pagination_get_request_key: Variable in URL GET request where we pass the offset for the next page. :ivar default_offset_value: Offset value for the first request. Usually this will be an empty string. :ivar additional_data_test: Function that is passed the value of ``additional_data_key``. It should return ``True`` if there are additional pages to request, ``False`` otherwise. """ self.endpoint_template = endpoint_template self.additional_data_key = additional_data_key self.pagination_key = pagination_key self.pagination_get_request_key = pagination_get_request_key self.protocol = protocol self.retries = retries self.default_offset_value = default_offset_value self.additional_data_test = additional_data_test
[docs] def get_with_retry(self, url, error_on_none=True, **kwargs): ''' Simple method for making requests from flaky endpoints. ''' error_counter = 0 success = False hibernate = 1.0 while error_counter < self.retries and not success: try: output = requests.get(url) if output is None: self.log_info( "Request to {url} returned None".format(url=url) ) elif output.status_code >= 300: self.log_info( "Request to {url} returned {code} status code".format( url=url, code=str(output.status_code) ) ) else: success = True except: error_counter += 1 time.sleep(hibernate) hibernate *= 2 if success: return output else: raise Exception("Failure for URL: {url}".format(url=url))
[docs] def responses(self): """ Generator. Yields each response until empty. """ GET_ONLY = True offset_set = set() session = requests.Session() retries = Retry( total=self.retries, read=self.retries, connect=self.retries, backoff_factor=0.3, status_forcelist=[500, 502, 503, 504], ) session.mount( "{protocol}://".format(protocol=self.protocol), HTTPAdapter(max_retries=retries), ) get_request_parameters = { self.pagination_get_request_key: self.default_offset_value } endpoint_url = self.endpoint_template.format(**get_request_parameters) self.log_info("paginator url: " + endpoint_url) successful = False retry_counter = 0 sleep_time = 1.0 while not successful and retry_counter <= self.retries: try: out = requests.get(endpoint_url) successful = True except: self.log_info( "sleeping randomly... retry: {retry}".format( retry=str(retry_counter) ) ) retry_counter += 1 time.sleep(sleep_time + (random.random() * 2)) # Check if successful if not successful: self.log_info( "Unsuccessful request to {url}".format(url=endpoint_url) ) raise Exception("Unsuccessful GET request") # out = out.json() out = json.loads(out.text) offset = out.get(self.pagination_key, None) offset_set.add(offset) page_counter = 0 while self.additional_data_key in out and additional_data_test( out[self.additional_data_key] ): try: offset = out[self.pagination_key] offset_set.add(offset) except KeyError: logging.debug("No offset key. Assuming this is normal.") break get_request_parameters = {self.pagination_get_request_key: offset} endpoint_url = self.endpoint_template.format( **get_request_parameters ) logging.debug("paginator url: " + endpoint_url) try: # response = session.get(endpoint_url) if GET_ONLY: response = self.get_with_retry(endpoint_url) else: response = session.get(endpoint_url) out = response.json() except: logging.warning( "Error parsing. Assuming this is the " "end of the responses." ) break yield out
[docs]class HttpGetRequest(MetalNode): """ Node class for making simple GET requests. """ def __init__( self, endpoint_template=None, endpoint_dict=None, protocol="http", retries=5, json=True, **kwargs ): self.endpoint_template = endpoint_template self.endpoint_dict = endpoint_dict or {} self.protocol = protocol self.json = json self.retries = retries self.endpoint_dict.update(self.endpoint_dict) super(HttpGetRequest, self).__init__(**kwargs)
[docs] def process_item(self): """ The input to this function will be a dictionary-like object with parameters to be substituted into the endpoint string and a dictionary with keys and values to be passed in the GET request. Three use-cases: 1. Endpoint and parameters set initially and never changed. 2. Endpoint and parameters set once at runtime 3. Endpoint and parameters set by upstream messages """ # Hit the parameterized endpoint and yield back the results formatted_endpoint = self.endpoint_template.format_map( SafeMap(**(self.message or {})) ) try: formatted_endpoint = formatted_endpoint.format_map( SafeMap(**(self.endpoint_dict or {})) ) except Exception: logging.error("formatted endpoint: " + formatted_endpoint) raise Exception() self.log_info( "Http GET request: {endpoint}".format(endpoint=formatted_endpoint) ) session = requests.Session() retries = Retry( total=self.retries, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504], ) session.mount( "{protocol}://".format(protocol=self.protocol), HTTPAdapter(max_retries=retries), ) get_response = session.get(formatted_endpoint) try: output = get_response.json() except json.JSONDecodeError: output = get_response.text logging.debug( formatted_endpoint + " GET RESPONSE: " + str(output) + str(type(output)) ) yield output
[docs]class HttpGetRequestPaginator(MetalNode): """ Node class for HTTP API requests that require paging through sets of results. This class handles making HTTP GET requests, determining whether there are additional results, and making additional calls if necessary. A typical case is to have an HTTP request something like this: .. code-block:: none http://www.someapi.com/endpoint_name?resultpage=0 with a response like: .. code-block:: none {"data": "something", "additional_pages": true, "next_page": 1} The response contains some data, a flag ``additional_pages`` for determining whether there are additional results, and a parameter that gets passed to the next request for retrieving the right page of results (``next_page``). So the next GET request would be: .. code-block:: none http://www.someapi.com/endpoint_name?resultpage=1 This process will repeat until ``additional_pages`` is false. In order to use this node class, you'll need to provide arguments that tell the node where to look for the equivalent of ``additional_pages`` and ``next_page``. 1. ``endpoint_template``: The parameteried URL for the API. 2. ``additional_data_key``: The keypath to the value in the API response that determines whether there are additional pages to request. 3. ``pagination_key``: The keypath to the value in the API response that contains the value that would be passed to the API to retrieve the next set of values. 4. ``pagination_get_request_key``: The key in the ``endpoint_template`` that will contain the value of the ``pagination_key``. For our simple example, the arguments would be 1. ``endpoint_template: http://www.someapi.com/endpoint_name?resultpage={result_page}`` 2. ``additional_data_key: ["additional_pages"]`` 3. ``pagination_key: ["next_page"]`` 4. ``pagination_get_request_key: result_page`` In addition to those mandatory arguments, you can also optionally specify an ``endpoint_dict``, which contains other values that will be substituted into the ``endpoint_template``. For example, these APIs often have an option that controls the number of results to provide in each response, like so: .. code-block:: none http://www.someapi.com/endpoint_name?results={num_results}?resultpage={result_page} For cases like this, the value of ``endpoint_dict`` is a dictionary mapping keys from the ``endpoint_template`` to their values. So if you wanted to have ten results per page, you would specify: .. code-block:: none endpoint_dict = {"num_results": 10} There can be any number of other parameters specified in the ``endpoint_dict``. If there are other keys in the ``endpoint_template`` that are not provided in the ``endpoint_dict``, then the node will try to find them in the current message that's being processed. For example, it is common to have some kind of security token that might be given in an environment variable. If the value of that environment variable has been provided by some upstream node and placed in the key ``token``, then it would be substituted into the URL, provded that the ``endpoint_template`` had a place for it, such as: .. code-block:: none http://www.someapi.com/endpoint_name?auth_token={token}?resultpage={result_page} """ def __init__( self, endpoint_dict=None, json=True, pagination_get_request_key=None, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_template_key=None, default_offset_value="", **kwargs ): self.pagination_get_request_key = pagination_get_request_key self.additional_data_key = additional_data_key self.pagination_key = pagination_key self.endpoint_dict = endpoint_dict or {} self.endpoint_template = endpoint_template or "" self.default_offset_value = default_offset_value self.endpoint_template = self.endpoint_template.format_map( SafeMap(**self.endpoint_dict) ) super(HttpGetRequestPaginator, self).__init__(**kwargs)
[docs] def process_item(self): self.requestor = PaginatedHttpGetRequest( pagination_get_request_key=self.pagination_get_request_key, calling_node=self, endpoint_template=self.endpoint_template.format_map( SafeMap(**(self.message or {})) ), additional_data_key=self.additional_data_key, pagination_key=self.pagination_key, default_offset_value=self.default_offset_value, ) for i in self.requestor.responses(): logging.debug( "paginator GET request:" + str(self.name) + " " + str(self.requestor.endpoint_template) ) logging.debug( "::".join([self.name, "events", str(len(i.get("events", "")))]) ) yield i if self.finished: break