API Documentation

Node module

The node module contains the MetalNode class, which is the foundation for MetalPipe.

class metalpipe.node.AggregateValues(values=False, tail_path=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Does that.

process_item()[source]

Default no-op for nodes.

class metalpipe.node.BatchMessages(batch_size=None, batch_list=None, counter=0, timeout=5, **kwargs)[source]

Bases: metalpipe.node.MetalNode

cleanup()[source]

If any cleanup (closing files, shutting down database connections) is necessary when the node is stopped, then the node’s class should provide a cleanup method. By default, the method is just a logging statement.

process_item()[source]

Default no-op for nodes.

class metalpipe.node.CSVReader(*args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.CSVToDictionaryList(**kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.ConstantEmitter(thing=None, max_loops=5, delay=0.5, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Send a thing every n seconds
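
As a rough illustration of the behavior described above, the following generator yields the same object repeatedly with a pause between emissions. It is a standalone sketch, not MetalPipe's code: the function name constant_emitter is hypothetical, and it assumes that max_loops caps the number of emissions and that delay is the pause in seconds.

```python
import time


def constant_emitter(thing, max_loops=5, delay=0.5):
    """Yield `thing` up to max_loops times, sleeping `delay` seconds between items.

    Assumption: max_loops limits the number of emissions; the real node may differ.
    """
    for _ in range(max_loops):
        yield thing
        time.sleep(delay)


# A zero delay keeps the example instantaneous.
emitted = list(constant_emitter({"status": "ping"}, max_loops=3, delay=0))
```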

generator()[source]
class metalpipe.node.CounterOfThings(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

bar__init__(*args, start=0, end=None, **kwargs)[source]
generator()[source]

Just start counting integers

class metalpipe.node.DynamicClassMediator(*args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

get_sink()[source]
get_source()[source]
hi()[source]
sink_list()[source]
source_list()[source]
class metalpipe.node.Filter(test=None, test_keypath=None, value=True, *args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Applies tests to each message and filters out messages that don’t pass

Built-in tests:
key_exists value_is_true value_is_not_none

Example

{'test': 'key_exists',
'key': mykey}
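
The three built-in test names suggest simple predicates on a message. The sketch below shows one plausible reading using plain functions; it is not MetalPipe's implementation. The TESTS table, the passes helper, and the flat-dictionary message shape are assumptions made for illustration.

```python
# Hypothetical stand-ins for the built-in tests; not MetalPipe's own code.
def key_exists(message, key):
    return key in message


def value_is_true(message, key):
    return bool(message.get(key))


def value_is_not_none(message, key):
    return message.get(key) is not None


TESTS = {
    "key_exists": key_exists,
    "value_is_true": value_is_true,
    "value_is_not_none": value_is_not_none,
}


def passes(message, config):
    """Apply the test named in config['test'] to a single message."""
    return TESTS[config["test"]](message, config["key"])


messages = [{"mykey": 1}, {"other": 2}, {"mykey": None}]
config = {"test": "key_exists", "key": "mykey"}
kept = [m for m in messages if passes(m, config)]
```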
process_item()[source]

Default no-op for nodes.

class metalpipe.node.FunctionOfMessage(function_name, *args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.GetEnvironmentVariables(mappings=None, environment_variables=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

This node reads environment variables and stores them in the message.

The required keyword argument for this node is environment_variables, which is a list of – you guessed it! – environment variables. By default, they will be read and stored in the outgoing message under keys with the same names as the environment variables. E.g. FOO_VAR will be stored in the message {"FOO_VAR": whatever}.

Optionally, you can provide a dictionary to the mappings keyword argument, which maps environment variable names to new names. E.g. if mappings = {"FOO_VAR": "bar_var"}, then the value of FOO_VAR will be stored in the message {"bar_var": whatever}.

If the environment variable is not defined, then its value will be set to None.

Parameters:
  • mappings (dict) – An optional dictionary mapping environment variable names to new names.
  • environment_variables (list) – A list of environment variable names.
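
The lookup and renaming rules described above can be sketched with the standard library alone. The helper name read_environment is hypothetical and the function is not part of MetalPipe; it only mirrors the documented behavior (same-name keys by default, renamed keys when mappings is given, None for undefined variables).

```python
import os


def read_environment(environment_variables, mappings=None):
    """Return a dict of environment values, renamed via `mappings` when given."""
    mappings = mappings or {}
    return {
        # os.environ.get returns None when the variable is not defined.
        mappings.get(name, name): os.environ.get(name)
        for name in environment_variables
    }


os.environ["FOO_VAR"] = "whatever"
os.environ.pop("MISSING_VAR", None)  # make sure this one is undefined

plain = read_environment(["FOO_VAR", "MISSING_VAR"])
renamed = read_environment(["FOO_VAR"], mappings={"FOO_VAR": "bar_var"})
```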
generator()[source]
process_item()[source]

Default no-op for nodes.

class metalpipe.node.InsertData(overwrite=True, overwrite_if_null=True, value_dict=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.LocalDirectoryWatchdog(directory='.', check_interval=3, **kwargs)[source]

Bases: metalpipe.node.MetalNode

generator()[source]
class metalpipe.node.LocalFileReader(*args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.MetalNode(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]

Bases: object

The foundational class of MetalPipe. This class is inherited by all nodes in a computation graph.

Order of operations:

  1. Child class __init__ function
  2. MetalNode __init__ function
  3. preflight_function (Specified in initialization params)
  4. setup
  5. start

These methods have the following intended uses:

  1. __init__ Sets attribute values and calls the MetalNode __init__ method.
  2. get_runtime_attrs Sets any attribute values that are to be determined at runtime, e.g. by checking environment variables or reading values from a database. The get_runtime_attrs function should return a dictionary mapping attribute names to values, or else None.
  3. setup Sets the state of the MetalNode and/or creates any attributes that require information available only at runtime.
Parameters:
  • send_batch_markers – If True, then a BatchStart marker will be sent when a new input is received, and a BatchEnd will be sent after the input has been processed. The intention is that a number of items will be emitted for each input received. For example, we might emit a table row-by-row for each input.
  • get_runtime_attrs – A function that returns a dictionary-like object. The keys and values will be saved to this MetalNode object’s attributes. The function is executed one time, upon starting the node.
  • get_runtime_attrs_args – A tuple of arguments to be passed to the get_runtime_attrs function upon starting the node.
  • get_runtime_attrs_kwargs – A dictionary of kwargs passed to the get_runtime_attrs function.
  • runtime_attrs_destinations – If set, this is a dictionary mapping the keys returned from the get_runtime_attrs function to the names of the attributes to which the values will be saved.
  • throttle – For each input received, a delay of throttle seconds will be added.
  • keep_alive – If True, keep the node’s thread alive after everything has been processed.
  • name – The name of the node. Defaults to a randomly generated hash. Note that this hash is not consistent from one run to the next.
  • input_mapping – When the node receives a dictionary-like object, this dictionary will cause the keys of the dictionary to be remapped to new keys.
  • retain_input – If True, then combine the dictionary-like input with the output. If keys clash, the output value will be kept.
  • input_message_keypath – Read the value in this keypath as the content of the incoming message.
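
The interaction between get_runtime_attrs, its args and kwargs, and runtime_attrs_destinations can be pictured with a small standalone sketch. Everything here is hypothetical (apply_runtime_attrs, FakeNode, lookup_settings); it only illustrates the documented idea that the returned keys become attributes, optionally renamed through the destinations dictionary.

```python
def apply_runtime_attrs(obj, get_runtime_attrs, args=(), kwargs=None, destinations=None):
    """Call get_runtime_attrs once and copy the returned items onto obj."""
    attrs = get_runtime_attrs(*args, **(kwargs or {})) or {}  # None is treated as empty
    destinations = destinations or {}
    for key, value in attrs.items():
        # Use the renamed attribute when a destination is configured for this key.
        setattr(obj, destinations.get(key, key), value)


class FakeNode:
    """Stand-in object for illustration; not a MetalNode."""


def lookup_settings(prefix, suffix=""):
    # In practice this might read a database or the environment.
    return {"table": prefix + "events" + suffix, "limit": 100}


node = FakeNode()
apply_runtime_attrs(
    node,
    lookup_settings,
    args=("raw_",),
    kwargs={"suffix": "_v2"},
    destinations={"table": "table_name"},
)
```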
add_edge(target, **kwargs)[source]

Create an edge connecting self to target.

This method instantiates the MetalPipeQueue object that connects the nodes. Connecting the nodes together consists in (1) adding the queue to the other’s input_queue_list or output_queue_list and (2) setting the queue’s source_node and target_node attributes.

Parameters:target (MetalNode) – The node to which self will be connected.
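
The wiring described above can be sketched without MetalPipe itself. SketchQueue and SketchNode are hypothetical stand-ins for MetalPipeQueue and MetalNode; the sketch assumes the queue is appended to the source's output_queue_list and the target's input_queue_list.

```python
import queue


class SketchQueue:
    """Hypothetical stand-in for MetalPipeQueue."""

    def __init__(self):
        self.queue = queue.Queue()
        self.source_node = None
        self.target_node = None


class SketchNode:
    """Hypothetical stand-in for MetalNode, holding only the queue lists."""

    def __init__(self, name):
        self.name = name
        self.input_queue_list = []
        self.output_queue_list = []

    def add_edge(self, target):
        edge = SketchQueue()
        # (1) register the queue on both endpoints
        self.output_queue_list.append(edge)
        target.input_queue_list.append(edge)
        # (2) record which nodes the queue connects
        edge.source_node = self
        edge.target_node = target
        return edge


a, b = SketchNode("a"), SketchNode("b")
edge = a.add_edge(b)
```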
all_connected(seen=None)[source]

Returns all the nodes connected (directly or indirectly) to self. This allows us to loop over all the nodes in a pipeline even if we have a handle on only one. This is used by global_start, for example.

Parameters:seen (set) – A set of all the nodes that have been identified as connected to self.
Returns:All the nodes connected to self. This includes self.
Return type:(set of MetalNode)
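
One way to collect every node reachable from a single handle is a recursive traversal that threads the seen set through each call. The GraphNode class below is a hypothetical illustration of that idea, not the MetalNode implementation; it assumes edges can be followed in both directions.

```python
class GraphNode:
    """Hypothetical node that tracks its neighbors in both directions."""

    def __init__(self, name):
        self.name = name
        self.neighbors = []

    def connect(self, other):
        self.neighbors.append(other)
        other.neighbors.append(self)

    def all_connected(self, seen=None):
        seen = set() if seen is None else seen
        if self in seen:
            return seen
        seen.add(self)
        for neighbor in self.neighbors:
            neighbor.all_connected(seen)  # shares the same `seen` set
        return seen


a, b, c, d = (GraphNode(n) for n in "abcd")
a.connect(b)
b.connect(c)  # d stays isolated
reachable = {node.name for node in c.all_connected()}
```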
broadcast(broadcast_message)[source]

Puts the message into all the input queues for all connected nodes.

cleanup()[source]

If any cleanup (closing files, shutting down database connections) is necessary when the node is stopped, then the node’s class should provide a cleanup method. By default, the method is just a logging statement.

draw_pipeline()[source]

Draw the pipeline structure using graphviz.

global_start(prometheus=False, pipeline_name=None, max_time=None, fixturize=False)[source]

Starts every node connected to self. Mainly, it:

  1. calls start() on each node
  2. sets some global variables
  3. optionally starts some experimental code for monitoring

input_queue_size

Return the total number of items in all of the queues that are inputs to this node.

is_sink

Tests whether the node is a sink or not, i.e. whether there are no outputs from the node.

Returns:True if the node has no output nodes, False otherwise.
Return type:(bool)
is_source

Tests whether the node is a source or not, i.e. whether there are no inputs to the node.

Returns:True if the node has no inputs, False otherwise.
Return type:(bool)
kill_pipeline()[source]
log_info(message='')[source]
logjam

Returns the logjam score, which measures the degree to which the node is holding up progress in downstream nodes.

We’re defining a logjam as a node whose input queue is full, but whose output queue(s) is not. More specifically, we poll each node in the monitor_thread, and increment a counter if the node is a logjam at that time. This property returns the percentage of samples in which the node is a logjam. Our intention is that if this score exceeds a threshold, the user is alerted, or the load is rebalanced somehow (not yet implemented).

Returns:Logjam score
Return type:(float)
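
The sampling scheme described above amounts to counting how often a node is observed as a logjam. The sketch below is a hypothetical illustration, not the monitor thread's code. It assumes a node counts as a logjam when its input is full and none of its outputs are, and it reports the score as a fraction between 0 and 1; the actual property may scale it differently.

```python
class LogjamCounter:
    """Hypothetical tally of logjam observations for one node."""

    def __init__(self):
        self.samples = 0
        self.logjam_samples = 0

    def record(self, input_full, outputs_full):
        """Record one poll: input_full is a bool, outputs_full a list of bools."""
        self.samples += 1
        if input_full and not any(outputs_full):
            self.logjam_samples += 1

    @property
    def score(self):
        # Fraction of polls in which the node was a logjam (0.0 before any poll).
        return self.logjam_samples / self.samples if self.samples else 0.0


counter = LogjamCounter()
counter.record(True, [False, False])   # logjam
counter.record(True, [True])           # downstream is also full
counter.record(False, [False])         # input not full
counter.record(False, [True])          # input not full
```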
pipeline_finished
process_item(*args, **kwargs)[source]

Default no-op for nodes.

setup()[source]

For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.

It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.

start()[source]

Starts the node. This is called by MetalNode.global_start().

The node’s main loop is contained in this method. The main loop does the following:

  1. records the timestamp to the node’s started_at attribute.
  2. calls get_runtime_attrs (TODO: check if we can deprecate this)
  3. calls the setup method for the class (which is a no-op by default)
  4. if the node is a source, successively yields all the results of the node’s generator method, then exits.
  5. if the node is not a source, loops over the input queues, getting the next message. Note that when the message is pulled from the queue, the MetalPipeQueue yields it as a dictionary.
  6. gets either the content of the entire message if the node has no key attribute, or the value of message[self.key].
  7. remaps the message content if a remapping dictionary has been given in the node’s configuration
  8. calls the node’s process_item method, yielding back the results. (Note that a single input message may cause the node to yield zero, one, or more than one output message.)
  9. places the results into each of the node’s output queues.
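
Steps 6 through 8 for a single incoming message can be sketched as a plain function. This is a simplified illustration under stated assumptions, not the body of start(): the name run_once and the split_words processor are hypothetical, messages are assumed to be dictionaries, and retain_input is assumed to merge the input into each output with output values winning on key clashes.

```python
def run_once(message, process_item, key=None, input_mapping=None, retain_input=True):
    """Process one message and return the list of outputs it produces."""
    # Step 6: take the whole message, or only message[key] when a key is set.
    content = message if key is None else message[key]
    # Step 7: rename keys according to the remapping dictionary, if any.
    if input_mapping:
        content = {input_mapping.get(k, k): v for k, v in content.items()}
    # Step 8: a single input may yield zero, one, or many outputs.
    outputs = []
    for result in process_item(content):
        if retain_input:
            result = {**content, **result}  # output values win on clashes
        outputs.append(result)
    return outputs


def split_words(content):
    for word in content["text"].split():
        yield {"word": word}


outputs = run_once(
    {"payload": {"body": "a b"}},
    split_words,
    key="payload",
    input_mapping={"body": "text"},
)
```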
stream()[source]

Called in each MetalNode thread.

terminate_pipeline(error=False)[source]

This method can be called on any node in a pipeline, and it will cause all of the nodes to terminate if they haven’t stopped already.

Parameters:error (bool) – Not yet implemented.
thread_monitor(max_time=None)[source]

This function loops over all of the threads in the pipeline, checking that they are either finished or running. If any have had an abnormal exit, terminate the entire pipeline.

time_running

Return the number of wall-clock seconds elapsed since the node was started.

wait_for_pipeline_finish()[source]
class metalpipe.node.NothingToSeeHere[source]

Bases: object

Vacuous class used as a no-op message type.

class metalpipe.node.PrinterOfThings(*args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.RandomSample(sample=0.1)[source]

Bases: metalpipe.node.MetalNode

Lets through only a random sample of incoming messages. Might be useful for testing, or when only approximate results are necessary.
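
The sampling behavior can be sketched as a generator that keeps each message with probability sample. The function name random_sample and the optional rng argument are hypothetical additions for illustration and testing; they are not part of the documented class.

```python
import random


def random_sample(messages, sample=0.1, rng=None):
    """Yield each message with probability `sample`."""
    rng = rng or random.Random()
    for message in messages:
        # rng.random() is in [0.0, 1.0), so sample=1.0 keeps everything
        # and sample=0.0 keeps nothing.
        if rng.random() < sample:
            yield message


everything = list(random_sample(range(100), sample=1.0))
nothing = list(random_sample(range(100), sample=0.0))
some = list(random_sample(range(1000), sample=0.1, rng=random.Random(0)))
```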

process_item()[source]

Default no-op for nodes.

class metalpipe.node.Remapper(mapping=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.SequenceEmitter(sequence, *args, max_sequences=1, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Emits sequence max_sequences times, or forever if max_sequences is None.
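
The repeat-or-forever rule can be expressed compactly with itertools. The function emit_sequence below is a hypothetical standalone sketch of that rule, not the node's generator method.

```python
import itertools


def emit_sequence(sequence, max_sequences=1):
    """Yield the items of `sequence` max_sequences times, or forever if None."""
    repeats = itertools.count() if max_sequences is None else range(max_sequences)
    for _ in repeats:
        yield from sequence


twice = list(emit_sequence([1, 2], max_sequences=2))
# Take only the first five items of the endless stream.
endless_prefix = list(itertools.islice(emit_sequence("ab", max_sequences=None), 5))
```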

generator()[source]

Emit the sequence max_sequences times.

process_item()[source]

Emit the sequence max_sequences times.

class metalpipe.node.Serializer(values=False, *args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Takes an iterable thing as input, and successively yields its items.

process_item()[source]

Default no-op for nodes.

class metalpipe.node.SimpleTransforms(missing_keypath_action='ignore', starting_path=None, transform_mapping=None, target_value=None, keypath=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.StreamMySQLTable(*args, host='localhost', user=None, table=None, password=None, database=None, port=3306, to_row_obj=False, send_batch_markers=True, **kwargs)[source]

Bases: metalpipe.node.MetalNode

generator()[source]
get_schema()[source]
setup()[source]

For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.

It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.

class metalpipe.node.StreamingJoin(window=30, streams=None, *args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Joins two streams on a key, using exact match only. MVP.
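
A windowed exact-match join can be sketched with a small buffer of unmatched items. The JoinBuffer class below is a hypothetical illustration and makes several assumptions not stated in the source: window is measured in seconds, each key matches at most once, and the caller supplies timestamps explicitly.

```python
class JoinBuffer:
    """Hypothetical buffer that pairs items from two streams by exact key."""

    def __init__(self, window=30):
        self.window = window
        self.pending = {}  # key -> (stream name, value, timestamp)

    def add(self, stream, key, value, now):
        """Return a joined dict on a match, otherwise buffer the item and return None."""
        # Drop buffered items that have fallen outside the window.
        self.pending = {
            k: item for k, item in self.pending.items() if now - item[2] <= self.window
        }
        match = self.pending.get(key)
        if match is not None and match[0] != stream:
            del self.pending[key]
            return {match[0]: match[1], stream: value}
        self.pending[key] = (stream, value, now)
        return None


buffer = JoinBuffer(window=30)
first = buffer.add("left", 1, "a", now=0)
joined = buffer.add("right", 1, "b", now=10)
buffer.add("left", 2, "c", now=20)
expired = buffer.add("right", 2, "d", now=60)  # 40 seconds later: outside the window
```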

process_item()[source]
class metalpipe.node.SubstituteRegex(match_regex=None, substitute_string=None, *args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

process_item()[source]

Default no-op for nodes.

class metalpipe.node.TimeWindowAccumulator(*args, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Every N seconds, put the latest M seconds of data on the queue.

class metalpipe.node.bcolors[source]

Bases: object

This class holds the values for the various colors that are used in the tables that monitor the status of the nodes.

BOLD = '\x1b[1m'
ENDC = '\x1b[0m'
FAIL = '\x1b[91m'
HEADER = '\x1b[95m'
OKBLUE = '\x1b[94m'
OKGREEN = '\x1b[92m'
UNDERLINE = '\x1b[4m'
WARNING = '\x1b[93m'
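
These values are standard ANSI escape sequences: each one switches on a style or color, and ENDC resets the terminal. The sketch below copies three of the constants listed above into a standalone snippet; the colorize helper is hypothetical and not part of the module.

```python
# Values copied from the bcolors listing above.
OKGREEN = "\x1b[92m"
FAIL = "\x1b[91m"
ENDC = "\x1b[0m"


def colorize(text, color):
    """Wrap text in a color code and reset the terminal afterwards."""
    return f"{color}{text}{ENDC}"


status_line = colorize("running", OKGREEN) + " / " + colorize("failed", FAIL)
print(status_line)
```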
metalpipe.node.class_factory(raw_config)[source]
metalpipe.node.get_node_dict(node_config)[source]
metalpipe.node.kwarg_remapper(f, **kwarg_mapping)[source]
metalpipe.node.no_op(*args, **kwargs)[source]

No-op function to serve as default get_runtime_attrs.

metalpipe.node.template_class(class_name, parent_class, kwargs_remapping, frozen_arguments_mapping)[source]

Civis-specific node types

This is where any classes specific to the Civis API live.

class metalpipe.node_classes.civis_nodes.CivisSQLExecute(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Execute a SQL statement and return the results.

process_item()[source]

Execute a SQL statement and return the result.

class metalpipe.node_classes.civis_nodes.CivisToCSV(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, include_headers=True, delimiter=', ', **kwargs)[source]

Bases: metalpipe.node.MetalNode

Execute a SQL statement and return the results via a CSV file.

process_item()[source]

Execute a SQL statement and return the result.

class metalpipe.node_classes.civis_nodes.EnsureCivisRedshiftTableExists(on_failure='exit', table=None, schema=None, database=None, columns=None, block=True, **kwargs)[source]

Bases: metalpipe.node.MetalNode

generator()[source]
process_item()[source]

Default no-op for nodes.

class metalpipe.node_classes.civis_nodes.FindValueInRedshiftColumn(on_failure='exit', table=None, database=None, schema=None, column=None, choice='max', **kwargs)[source]

Bases: metalpipe.node.MetalNode

generator()[source]
process_item()[source]

Default no-op for nodes.

class metalpipe.node_classes.civis_nodes.SendToCivis(*args, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, schema=None, existing_table_rows='append', include_columns=None, dummy_run=False, block=False, max_errors=0, table=None, via_staging_table=False, columns=None, staging_table=None, remap=None, recorded_tables={}, **kwargs)[source]

Bases: metalpipe.node.MetalNode

cleanup()[source]

Check if we’re using staging tables. If so, copy the staging table into the production table. TODO: options for merge, upsert, append, drop

full_table_name
monitor_futures()[source]
process_item()[source]

Accept a bunch of dictionaries mapping column names to values.

setup()[source]

Check if we’re using staging tables and create the table if necessary.

Data structures module

Data types (e.g. Rows, Records) for ETL.

class metalpipe.utils.data_structures.BOOL(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem

python_cast_function

alias of builtins.bool

class metalpipe.utils.data_structures.DATETIME(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem

python_cast_function()
class metalpipe.utils.data_structures.DataSourceTypeSystem[source]

Bases: object

Information about mapping one type system onto another is contained in the children of this class.

static convert(obj)[source]

Override this method if something more complicated is necessary.

static type_mapping(*args, **kwargs)[source]
class metalpipe.utils.data_structures.DataType(value, original_type=None, name=None)[source]

Bases: object

Each DataType gets a python_cast_function, which is a function.

intermediate_type = None
python_cast_function = None
to_intermediate_type()[source]

Convert the DataType to an IntermediateDataType using its class’s intermediate_type attribute.

to_python()[source]
type_system

Just for convenience to make the type system an attribute.
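
The relationship between a source-specific type, its intermediate_type, and python_cast_function can be pictured with a reduced model. All class names below are hypothetical, and the constructor arguments passed during conversion are assumptions; the sketch only mirrors the documented attributes and method names.

```python
class SketchType:
    """Hypothetical reduction of DataType to its documented attributes."""

    intermediate_type = None
    python_cast_function = None

    def __init__(self, value, original_type=None, name=None):
        self.value = value
        self.original_type = original_type
        self.name = name

    def to_intermediate_type(self):
        # Assumption: the intermediate object remembers the class it came from.
        return self.intermediate_type(
            self.value, original_type=type(self), name=self.name
        )

    def to_python(self):
        # Look the cast up on the class so it is called as a plain function.
        return type(self).python_cast_function(self.value)


class SketchInteger(SketchType):
    python_cast_function = int


class SketchMySQLInteger(SketchType):
    intermediate_type = SketchInteger
    python_cast_function = int


column_value = SketchMySQLInteger("42", name="user_id")
intermediate = column_value.to_intermediate_type()
```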

class metalpipe.utils.data_structures.FLOAT(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem

python_cast_function

alias of builtins.float

class metalpipe.utils.data_structures.INTEGER(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem

python_cast_function

alias of builtins.int

exception metalpipe.utils.data_structures.IncompatibleTypesException[source]

Bases: Exception

class metalpipe.utils.data_structures.IntermediateTypeSystem[source]

Bases: metalpipe.utils.data_structures.DataSourceTypeSystem

Never instantiate this by hand.

class metalpipe.utils.data_structures.MYSQL_BOOL(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.MySQLTypeSystem

intermediate_type

alias of BOOL

python_cast_function

alias of builtins.bool

class metalpipe.utils.data_structures.MYSQL_DATE(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.MySQLTypeSystem

intermediate_type

alias of DATETIME

python_cast_function()
class metalpipe.utils.data_structures.MYSQL_ENUM(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.MySQLTypeSystem

intermediate_type

alias of STRING

python_cast_function

alias of builtins.str

class metalpipe.utils.data_structures.MYSQL_INTEGER[source]

Bases: type
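
The length-specific classes listed below (MYSQL_INTEGER0 through MYSQL_INTEGER32, then powers of two from 64 to 32768) differ only in max_length, which suggests they are generated rather than written by hand. The sketch below shows one way to generate such a family using the built-in type() in a factory function. It is a swapped-in technique for illustration: the documented MYSQL_INTEGER derives from type, so the actual mechanism may be a metaclass, and all names here are hypothetical.

```python
class SketchIntegerBase:
    """Hypothetical base class; subclasses differ only in max_length."""

    max_length = None

    def __init__(self, value, original_type=None, name=None):
        self.value = value
        self.original_type = original_type
        self.name = name


def make_sized_class(prefix, base, max_length):
    """Create a subclass of `base` named prefix + max_length."""
    return type(f"{prefix}{max_length}", (base,), {"max_length": max_length})


# Lengths matching the listing: 0..32, then 64, 128, ..., 32768.
LENGTHS = list(range(33)) + [2 ** n for n in range(6, 16)]
classes = {
    length: make_sized_class("SKETCH_INTEGER", SketchIntegerBase, length)
    for length in LENGTHS
}
```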

class metalpipe.utils.data_structures.MYSQL_INTEGER0(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 0
class metalpipe.utils.data_structures.MYSQL_INTEGER1(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 1
class metalpipe.utils.data_structures.MYSQL_INTEGER10(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 10
class metalpipe.utils.data_structures.MYSQL_INTEGER1024(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 1024
class metalpipe.utils.data_structures.MYSQL_INTEGER11(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 11
class metalpipe.utils.data_structures.MYSQL_INTEGER12(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 12
class metalpipe.utils.data_structures.MYSQL_INTEGER128(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 128
class metalpipe.utils.data_structures.MYSQL_INTEGER13(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 13
class metalpipe.utils.data_structures.MYSQL_INTEGER14(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 14
class metalpipe.utils.data_structures.MYSQL_INTEGER15(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 15
class metalpipe.utils.data_structures.MYSQL_INTEGER16(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 16
class metalpipe.utils.data_structures.MYSQL_INTEGER16384(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 16384
class metalpipe.utils.data_structures.MYSQL_INTEGER17(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 17
class metalpipe.utils.data_structures.MYSQL_INTEGER18(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 18
class metalpipe.utils.data_structures.MYSQL_INTEGER19(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 19
class metalpipe.utils.data_structures.MYSQL_INTEGER2(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 2
class metalpipe.utils.data_structures.MYSQL_INTEGER20(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 20
class metalpipe.utils.data_structures.MYSQL_INTEGER2048(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 2048
class metalpipe.utils.data_structures.MYSQL_INTEGER21(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 21
class metalpipe.utils.data_structures.MYSQL_INTEGER22(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 22
class metalpipe.utils.data_structures.MYSQL_INTEGER23(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 23
class metalpipe.utils.data_structures.MYSQL_INTEGER24(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 24
class metalpipe.utils.data_structures.MYSQL_INTEGER25(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 25
class metalpipe.utils.data_structures.MYSQL_INTEGER256(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 256
class metalpipe.utils.data_structures.MYSQL_INTEGER26(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 26
class metalpipe.utils.data_structures.MYSQL_INTEGER27(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 27
class metalpipe.utils.data_structures.MYSQL_INTEGER28(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 28
class metalpipe.utils.data_structures.MYSQL_INTEGER29(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 29
class metalpipe.utils.data_structures.MYSQL_INTEGER3(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 3
class metalpipe.utils.data_structures.MYSQL_INTEGER30(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 30
class metalpipe.utils.data_structures.MYSQL_INTEGER31(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 31
class metalpipe.utils.data_structures.MYSQL_INTEGER32(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 32
class metalpipe.utils.data_structures.MYSQL_INTEGER32768(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 32768
class metalpipe.utils.data_structures.MYSQL_INTEGER4(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 4
class metalpipe.utils.data_structures.MYSQL_INTEGER4096(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 4096
class metalpipe.utils.data_structures.MYSQL_INTEGER5(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 5
class metalpipe.utils.data_structures.MYSQL_INTEGER512(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 512
class metalpipe.utils.data_structures.MYSQL_INTEGER6(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 6
class metalpipe.utils.data_structures.MYSQL_INTEGER64(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 64
class metalpipe.utils.data_structures.MYSQL_INTEGER7(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 7
class metalpipe.utils.data_structures.MYSQL_INTEGER8(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 8
class metalpipe.utils.data_structures.MYSQL_INTEGER8192(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 8192
class metalpipe.utils.data_structures.MYSQL_INTEGER9(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_INTEGER_BASE

max_length = 9
class metalpipe.utils.data_structures.MYSQL_INTEGER_BASE(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.MySQLTypeSystem

intermediate_type

alias of INTEGER

python_cast_function

alias of builtins.int

class metalpipe.utils.data_structures.MYSQL_VARCHAR[source]

Bases: type

class metalpipe.utils.data_structures.MYSQL_VARCHAR0(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 0
class metalpipe.utils.data_structures.MYSQL_VARCHAR1(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 1
class metalpipe.utils.data_structures.MYSQL_VARCHAR10(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 10
class metalpipe.utils.data_structures.MYSQL_VARCHAR1024(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 1024
class metalpipe.utils.data_structures.MYSQL_VARCHAR11(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 11
class metalpipe.utils.data_structures.MYSQL_VARCHAR12(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 12
class metalpipe.utils.data_structures.MYSQL_VARCHAR128(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 128
class metalpipe.utils.data_structures.MYSQL_VARCHAR13(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 13
class metalpipe.utils.data_structures.MYSQL_VARCHAR14(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 14
class metalpipe.utils.data_structures.MYSQL_VARCHAR15(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 15
class metalpipe.utils.data_structures.MYSQL_VARCHAR16(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 16
class metalpipe.utils.data_structures.MYSQL_VARCHAR16384(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 16384
class metalpipe.utils.data_structures.MYSQL_VARCHAR17(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 17
class metalpipe.utils.data_structures.MYSQL_VARCHAR18(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 18
class metalpipe.utils.data_structures.MYSQL_VARCHAR19(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 19
class metalpipe.utils.data_structures.MYSQL_VARCHAR2(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 2
class metalpipe.utils.data_structures.MYSQL_VARCHAR20(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 20
class metalpipe.utils.data_structures.MYSQL_VARCHAR2048(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 2048
class metalpipe.utils.data_structures.MYSQL_VARCHAR21(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 21
class metalpipe.utils.data_structures.MYSQL_VARCHAR22(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 22
class metalpipe.utils.data_structures.MYSQL_VARCHAR23(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 23
class metalpipe.utils.data_structures.MYSQL_VARCHAR24(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 24
class metalpipe.utils.data_structures.MYSQL_VARCHAR25(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 25
class metalpipe.utils.data_structures.MYSQL_VARCHAR256(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 256
class metalpipe.utils.data_structures.MYSQL_VARCHAR26(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 26
class metalpipe.utils.data_structures.MYSQL_VARCHAR27(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 27
class metalpipe.utils.data_structures.MYSQL_VARCHAR28(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 28
class metalpipe.utils.data_structures.MYSQL_VARCHAR29(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 29
class metalpipe.utils.data_structures.MYSQL_VARCHAR3(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 3
class metalpipe.utils.data_structures.MYSQL_VARCHAR30(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 30
class metalpipe.utils.data_structures.MYSQL_VARCHAR31(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 31
class metalpipe.utils.data_structures.MYSQL_VARCHAR32(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 32
class metalpipe.utils.data_structures.MYSQL_VARCHAR32768(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 32768
class metalpipe.utils.data_structures.MYSQL_VARCHAR4(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 4
class metalpipe.utils.data_structures.MYSQL_VARCHAR4096(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 4096
class metalpipe.utils.data_structures.MYSQL_VARCHAR5(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 5
class metalpipe.utils.data_structures.MYSQL_VARCHAR512(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 512
class metalpipe.utils.data_structures.MYSQL_VARCHAR6(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 6
class metalpipe.utils.data_structures.MYSQL_VARCHAR64(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 64
class metalpipe.utils.data_structures.MYSQL_VARCHAR7(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 7
class metalpipe.utils.data_structures.MYSQL_VARCHAR8(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 8
class metalpipe.utils.data_structures.MYSQL_VARCHAR8192(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 8192
class metalpipe.utils.data_structures.MYSQL_VARCHAR9(value, original_type=None, name=None)

Bases: metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE

max_length = 9
class metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.MySQLTypeSystem

intermediate_type

alias of STRING

python_cast_function

alias of builtins.str

class metalpipe.utils.data_structures.MySQLTypeSystem[source]

Bases: metalpipe.utils.data_structures.DataSourceTypeSystem

Each TypeSystem gets a type_mapping static method that takes a string and returns the class in the type system named by that string. For example, int(8) in a MySQL schema should return the MYSQL_INTEGER8 class.

static type_mapping(string)[source]

Parses the schema strings from MySQL and returns the appropriate class.
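The naming convention described above (int(8) maps to MYSQL_INTEGER8, and the varchar classes are named MYSQL_VARCHAR followed by their length) can be illustrated with a small sketch. This is not the library's implementation: class_name_for, the regular expression, and the keyword table are hypothetical, and the sketch returns the class name as a string, whereas type_mapping returns the class itself.

```python
import re

# Hypothetical pattern for schema strings such as "int(8)" or "varchar(32)".
_SCHEMA_PATTERN = re.compile(r"^\s*([A-Za-z]+)\s*\(\s*(\d+)\s*\)\s*$")

# Assumed mapping from MySQL keywords to the stems used in class names.
_STEMS = {"int": "INTEGER", "integer": "INTEGER", "varchar": "VARCHAR"}


def class_name_for(schema_string):
    """Return the name of the class that a schema string would map to."""
    match = _SCHEMA_PATTERN.match(schema_string)
    if match is None:
        raise ValueError("Unrecognized schema string: {!r}".format(schema_string))
    keyword = match.group(1).lower()
    size = int(match.group(2))
    if keyword not in _STEMS:
        raise ValueError("Unsupported MySQL type: {!r}".format(keyword))
    return "MYSQL_{}{}".format(_STEMS[keyword], size)


integer_name = class_name_for("int(8)")
varchar_name = class_name_for("VARCHAR(32)")
```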

class metalpipe.utils.data_structures.PrimitiveTypeSystem[source]

Bases: metalpipe.utils.data_structures.DataSourceTypeSystem

class metalpipe.utils.data_structures.PythonTypeSystem[source]

Bases: metalpipe.utils.data_structures.DataSourceTypeSystem

class metalpipe.utils.data_structures.Row(*records, type_system=None)[source]

Bases: object

A collection of DataType objects (typed values). A Row is a dictionary-like mapping from the names of the values to the DataType objects.

concat(other, fail_on_duplicate=True)[source]
static from_dict(row_dictionary, **kwargs)[source]

Creates a Row object from a dictionary mapping names to values.

is_empty()[source]
keys()[source]

For implementing the mapping protocol.

class metalpipe.utils.data_structures.STRING(value, original_type=None, name=None)[source]

Bases: metalpipe.utils.data_structures.DataType, metalpipe.utils.data_structures.IntermediateTypeSystem

python_cast_function

alias of builtins.str

metalpipe.utils.data_structures.all_bases(obj)[source]

Return all the classes to which obj belongs.

metalpipe.utils.data_structures.convert_to_type_system(obj, cls)[source]
metalpipe.utils.data_structures.get_type_system(obj)[source]
metalpipe.utils.data_structures.make_types()[source]
metalpipe.utils.data_structures.mysql_type(string)[source]

Parses the schema strings from MySQL and returns the appropriate class.

metalpipe.utils.data_structures.primitive_to_intermediate_type(thing, name=None)[source]

Network nodes module

Classes that deal with sending and receiving data across the interwebs.

class metalpipe.node_classes.network_nodes.HttpGetRequest(endpoint_template=None, endpoint_dict=None, protocol='http', retries=5, json=True, **kwargs)[source]

Bases: metalpipe.node.MetalNode

Node class for making simple GET requests.

process_item()[source]

The input to this function will be a dictionary-like object with parameters to be substituted into the endpoint string and a dictionary with keys and values to be passed in the GET request.

Three use cases:

  1. Endpoint and parameters set initially and never changed.
  2. Endpoint and parameters set once at runtime.
  3. Endpoint and parameters set by upstream messages.

class metalpipe.node_classes.network_nodes.HttpGetRequestPaginator(endpoint_dict=None, json=True, pagination_get_request_key=None, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_template_key=None, default_offset_value='', **kwargs)[source]

Bases: metalpipe.node.MetalNode

Node class for HTTP API requests that require paging through sets of results.

This class handles making HTTP GET requests, determining whether there are additional results, and making additional calls if necessary. A typical case is to have an HTTP request something like this:

http://www.someapi.com/endpoint_name?resultpage=0

with a response like:

{"data": "something", "additional_pages": true, "next_page": 1}

The response contains some data, a flag additional_pages for determining whether there are additional results, and a parameter that gets passed to the next request for retrieving the right page of results (next_page). So the next GET request would be:

http://www.someapi.com/endpoint_name?resultpage=1

This process will repeat until additional_pages is false.
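The request loop just described can be sketched as follows. This is a minimal illustration rather than MetalPipe's implementation: fetch_all_pages, the fetch callable, and FAKE_API are hypothetical names, and the canned responses stand in for a remote API so that the example runs without network access.

```python
def fetch_all_pages(fetch, first_page=0):
    """Yield the data from each response until no further pages are reported.

    fetch is a hypothetical callable that takes a page value and returns
    the decoded JSON response as a dictionary.
    """
    page = first_page
    while True:
        response = fetch(page)
        yield response["data"]
        # Stop as soon as the API says there is nothing left to request.
        if not response["additional_pages"]:
            break
        # Otherwise the response tells us which page to ask for next.
        page = response["next_page"]


# Stand-in for the remote API, keyed by the value of resultpage.
FAKE_API = {
    0: {"data": "first", "additional_pages": True, "next_page": 1},
    1: {"data": "second", "additional_pages": True, "next_page": 2},
    2: {"data": "third", "additional_pages": False, "next_page": None},
}

results = list(fetch_all_pages(FAKE_API.__getitem__))
```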

In order to use this node class, you’ll need to provide arguments that tell the node where to look for the equivalent of additional_pages and next_page.

  1. endpoint_template: The parameterized URL for the API.
  2. additional_data_key: The keypath to the value in the API response that determines whether there are additional pages to request.
  3. pagination_key: The keypath to the value in the API response that contains the value that would be passed to the API to retrieve the next set of values.
  4. pagination_get_request_key: The key in the endpoint_template that will contain the value of the pagination_key.

For our simple example, the arguments would be

  1. endpoint_template: http://www.someapi.com/endpoint_name?resultpage={result_page}
  2. additional_data_key: ["additional_pages"]
  3. pagination_key: ["next_page"]
  4. pagination_get_request_key: result_page
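To make the role of each argument concrete, here is a rough sketch of how the four values could be applied to the example response. The helper get_by_keypath and the variable names are hypothetical, and the node's internal keypath handling may differ; the sketch only assumes that a keypath is a list of keys followed in order.

```python
from functools import reduce


def get_by_keypath(obj, keypath):
    """Follow a list of keys into a nested structure and return the value."""
    return reduce(lambda current, key: current[key], keypath, obj)


# The four arguments from the simple example above.
paginator_arguments = {
    "endpoint_template": "http://www.someapi.com/endpoint_name?resultpage={result_page}",
    "additional_data_key": ["additional_pages"],
    "pagination_key": ["next_page"],
    "pagination_get_request_key": "result_page",
}

# The example response from the API.
response = {"data": "something", "additional_pages": True, "next_page": 1}

# additional_data_key locates the flag that says whether to keep going.
has_more = get_by_keypath(response, paginator_arguments["additional_data_key"])

# pagination_key locates the value to send with the next request.
next_value = get_by_keypath(response, paginator_arguments["pagination_key"])

# pagination_get_request_key names the placeholder that receives that value.
next_url = paginator_arguments["endpoint_template"].format(
    **{paginator_arguments["pagination_get_request_key"]: next_value}
)
```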

In addition to those mandatory arguments, you can also optionally specify an endpoint_dict, which contains other values that will be substituted into the endpoint_template. For example, these APIs often have an option that controls the number of results to provide in each response, like so:

http://www.someapi.com/endpoint_name?results={num_results}&resultpage={result_page}

For cases like this, the value of endpoint_dict is a dictionary mapping keys from the endpoint_template to their values. So if you wanted to have ten results per page, you would specify:

endpoint_dict = {"num_results": 10}

There can be any number of other parameters specified in the endpoint_dict.

If there are other keys in the endpoint_template that are not provided in the endpoint_dict, then the node will try to find them in the current message that’s being processed. For example, it is common to have some kind of security token that might be given in an environment variable. If the value of that environment variable has been provided by some upstream node and placed in the key token, then it would be substituted into the URL, provided that the endpoint_template had a place for it, such as:

http://www.someapi.com/endpoint_name?auth_token={token}&resultpage={result_page}
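The lookup order described above (endpoint_dict first, then the current message) can be sketched with the standard library alone. The function fill_endpoint and its parameters are hypothetical and are not part of MetalPipe; the sketch assumes the message behaves like a dictionary and that the pagination value is supplied separately.

```python
import string


def fill_endpoint(endpoint_template, endpoint_dict, message, **pagination_values):
    """Fill each placeholder, preferring endpoint_dict over the message."""
    formatter = string.Formatter()
    # parse() yields (literal_text, field_name, format_spec, conversion).
    field_names = [
        field_name
        for _, field_name, _, _ in formatter.parse(endpoint_template)
        if field_name
    ]
    values = {}
    for name in field_names:
        if name in pagination_values:
            values[name] = pagination_values[name]
        elif name in endpoint_dict:
            values[name] = endpoint_dict[name]
        elif name in message:
            # Fall back to whatever an upstream node placed in the message.
            values[name] = message[name]
        else:
            raise KeyError("No value available for placeholder {!r}".format(name))
    return endpoint_template.format(**values)


template = (
    "http://www.someapi.com/endpoint_name"
    "?auth_token={token}&results={num_results}&resultpage={result_page}"
)
url = fill_endpoint(
    template,
    endpoint_dict={"num_results": 10},
    message={"token": "abc123"},  # illustrative token value
    result_page=0,
)
```

Raising an error for an unresolved placeholder is a design choice made for this sketch; it surfaces a missing token early instead of sending a malformed request.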
process_item()[source]

Default no-op for nodes.

class metalpipe.node_classes.network_nodes.PaginatedHttpGetRequest(endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_get_request_key=None, protocol='http', retries=5, default_offset_value='', additional_data_test=<class 'bool'>, calling_node=None)[source]

Bases: object

Handles, in a semi-general way, requests that require paging through lists of results by repeatedly making GET requests.

get_with_retry(url, error_on_none=True, **kwargs)[source]

Simple method for making requests from flaky endpoints.

responses()[source]

Generator. Yields each response until empty.

MetalPipeMessage module

The MetalPipeMessage encapsulates the content of each piece of data, along with some useful metadata.

class metalpipe.message.message.MetalPipeMessage(message_content)[source]

Bases: object

A class that contains the message payloads that are queued for each MetalPipeProcessor. It holds the messages and lots of metadata used for logging, monitoring, etc.

Trigger module

A simple class containing no data, which is intended merely as a trigger, signaling that the downstream node should do something.

class metalpipe.message.trigger.Trigger(previous_trigger_time=None, trigger_name=None)[source]

Bases: object

metalpipe.message.trigger.hello_world()[source]

Batch module

We’ll use markers to delimit batches of things, such as serialized files.

class metalpipe.message.batch.BatchEnd(*args, **kwargs)[source]

Bases: object

class metalpipe.message.batch.BatchStart(*args, **kwargs)[source]

Bases: object

class metalpipe.message.canary.Canary[source]

Bases: object

MetalPipeQueue module

These are queues that form the directed edges between nodes.

class metalpipe.node_queue.queue.MetalPipeQueue(max_queue_size, name=None)[source]

Bases: object

approximately_full(error=0.95)[source]
empty
get()[source]
put(message, *args, previous_message=None, **kwargs)[source]

Places a message on the output queues. If the message is None, then the queue is skipped.

Messages are MetalPipeMessage objects; the payload of the message is message.message_content.

size()[source]