API Documentation¶
Node module¶
The node module contains the MetalNode class, which is the foundation
for MetalPipe.
-
class
metalpipe.node.AggregateValues(values=False, tail_path=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Does that.
-
class
metalpipe.node.BatchMessages(batch_size=None, batch_list=None, counter=0, timeout=5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.CSVReader(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.CSVToDictionaryList(**kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.ConstantEmitter(thing=None, max_loops=5, delay=0.5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Sends a thing every n seconds.
-
class
metalpipe.node.CounterOfThings(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.DynamicClassMediator(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.Filter(test=None, test_keypath=None, value=True, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Applies tests to each message and filters out messages that don’t pass.
Built-in tests: key_exists, value_is_true, value_is_not_none.
Example: {'test': 'key_exists', 'key': mykey}
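The three built-in test names suggest simple predicates on a message key. The following is a minimal sketch of how such tests could behave, written as plain functions; the function bodies, the `BUILT_IN_TESTS` table, and `filter_messages` are assumptions for illustration and are not taken from the MetalPipe source.

```python
# Hypothetical stand-ins for the built-in tests; not MetalPipe's own code.
def key_exists(message, key):
    return key in message


def value_is_true(message, key):
    return bool(message.get(key))


def value_is_not_none(message, key):
    return message.get(key) is not None


BUILT_IN_TESTS = {
    "key_exists": key_exists,
    "value_is_true": value_is_true,
    "value_is_not_none": value_is_not_none,
}


def filter_messages(messages, test, key):
    """Keep only the messages that pass the named test."""
    test_function = BUILT_IN_TESTS[test]
    return [message for message in messages if test_function(message, key)]


messages = [{"a": 1}, {"a": 0}, {"a": None}, {"b": 2}]
kept_exists = filter_messages(messages, "key_exists", "a")
kept_true = filter_messages(messages, "value_is_true", "a")
kept_not_none = filter_messages(messages, "value_is_not_none", "a")
```

Under these assumed semantics, a key holding `None` passes key_exists but fails value_is_not_none.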
-
class
metalpipe.node.FunctionOfMessage(function_name, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.GetEnvironmentVariables(mappings=None, environment_variables=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
This node reads environment variables and stores them in the message.
The required keyword argument for this node is environment_variables, which is a list of (you guessed it!) environment variables. By default, they will be read and stored in the outgoing message under keys with the same names as the environment variables. E.g. FOO_VAR will be stored in the message {"FOO_VAR": whatever}.
Optionally, you can provide a dictionary to the mappings keyword argument, which maps environment variable names to new names. E.g. if mappings = {"FOO_VAR": "bar_var"}, then the value of FOO_VAR will be stored in the message {"bar_var": whatever}.
If the environment variable is not defined, then its value will be set to None.
Parameters:
- mappings (dict) – An optional dictionary mapping environment variable names to new names.
- environment_variables (list) – A list of environment variable names.
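The behaviour described above can be sketched with the standard library alone. The helper name `read_environment_variables` is hypothetical; it only mirrors the documented rules (same-name keys by default, renaming through mappings, None for undefined variables) and is not the node's actual implementation.

```python
import os


def read_environment_variables(environment_variables, mappings=None):
    """Return a dict of environment values, renamed through `mappings`."""
    mappings = mappings or {}
    return {
        # Use the mapped name if one is given, else the variable's own name.
        mappings.get(name, name): os.environ.get(name)  # None if undefined
        for name in environment_variables
    }


os.environ["FOO_VAR"] = "hello"
os.environ.pop("MISSING_VAR", None)  # make sure this one is undefined

plain = read_environment_variables(["FOO_VAR", "MISSING_VAR"])
renamed = read_environment_variables(["FOO_VAR"], mappings={"FOO_VAR": "bar_var"})
```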
-
class
metalpipe.node.InsertData(overwrite=True, overwrite_if_null=True, value_dict=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.LocalDirectoryWatchdog(directory='.', check_interval=3, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.LocalFileReader(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.MetalNode(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
object
The foundational class of MetalPipe. This class is inherited by all nodes in a computation graph.
Order of operations:
1. Child class __init__ function
2. MetalNode __init__ function
3. preflight_function (specified in initialization params)
4. setup
5. start
These methods have the following intended uses:
- __init__: Sets attribute values and calls the MetalNode __init__ method.
- get_runtime_attrs: Sets any attribute values that are to be determined at runtime, e.g. by checking environment variables or reading values from a database. The get_runtime_attrs function should return a dictionary of attributes -> values, or else None.
- setup: Sets the state of the MetalNode and/or creates any attributes that require information available only at runtime.
Parameters:
- send_batch_markers – If True, then a BatchStart marker will be sent when a new input is received, and a BatchEnd will be sent after the input has been processed. The intention is that a number of items will be emitted for each input received. For example, we might emit a table row-by-row for each input.
- get_runtime_attrs – A function that returns a dictionary-like object. The keys and values will be saved to this MetalNode object’s attributes. The function is executed one time, upon starting the node.
- get_runtime_attrs_args – A tuple of arguments to be passed to the get_runtime_attrs function upon starting the node.
- get_runtime_attrs_kwargs – A dictionary of kwargs passed to the get_runtime_attrs function.
- runtime_attrs_destinations – If set, this is a dictionary mapping the keys returned from the get_runtime_attrs function to the names of the attributes to which the values will be saved.
- throttle – For each input received, a delay of throttle seconds will be added.
- keep_alive – If True, keep the node’s thread alive after everything has been processed.
- name – The name of the node. Defaults to a randomly generated hash. Note that this hash is not consistent from one run to the next.
- input_mapping – When the node receives a dictionary-like object, this dictionary will cause the keys of the dictionary to be remapped to new keys.
- retain_input – If True, then combine the dictionary-like input with the output. If keys clash, the output value will be kept.
- input_message_keypath – Read the value in this keypath as the content of the incoming message.
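To make a few of these parameters concrete, here is a rough sketch of what get_runtime_attrs with runtime_attrs_destinations, input_mapping, and retain_input describe. All three helper functions are hypothetical and operate on plain dictionaries and a stand-in object; they illustrate the documented semantics only and are not MetalNode's code.

```python
from types import SimpleNamespace


def apply_runtime_attrs(node, get_runtime_attrs, args=(), kwargs=None, destinations=None):
    """Call the function once and save its results as attributes on `node`."""
    values = get_runtime_attrs(*args, **(kwargs or {}))
    for key, value in (values or {}).items():  # the function may return None
        attribute_name = (destinations or {}).get(key, key)
        setattr(node, attribute_name, value)


def remap_keys(message, input_mapping=None):
    """Rename the keys of an incoming dictionary-like message."""
    input_mapping = input_mapping or {}
    return {input_mapping.get(key, key): value for key, value in message.items()}


def merge_output(message, output, retain_input=True):
    """Combine input and output; on a key clash the output value wins."""
    return {**message, **output} if retain_input else dict(output)


node = SimpleNamespace()  # stand-in for a node object
apply_runtime_attrs(
    node,
    lambda prefix: {"table": prefix + "_events"},
    args=("prod",),
    destinations={"table": "table_name"},
)
remapped = remap_keys({"id": 1, "nm": "Ada"}, input_mapping={"nm": "name"})
merged = merge_output({"id": 1, "status": "old"}, {"status": "new"})
output_only = merge_output({"id": 1}, {"x": 2}, retain_input=False)
```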
-
add_edge(target, **kwargs)[source]¶ Create an edge connecting self to target.
This method instantiates the MetalPipeQueue object that connects the nodes. Connecting the nodes together consists in (1) adding the queue to the other’s input_queue_list or output_queue_list and (2) setting the queue’s source_node and target_node attributes.
Parameters: target (MetalNode) – The node to which self will be connected.
-
all_connected(seen=None)[source]¶ Returns all the nodes connected (directly or indirectly) to self. This allows us to loop over all the nodes in a pipeline even if we have a handle on only one. This is used by global_start, for example.
Parameters: seen (set) – A set of all the nodes that have been identified as connected to self.
Returns: All the nodes connected to self. This includes self.
Return type: (set of MetalNode)
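The traversal described here is an ordinary graph walk that carries a seen set. A minimal sketch follows, using a hypothetical `SketchNode` class that stores neighbours directly instead of going through MetalPipeQueue objects; it is an illustration of the idea, not the library's implementation.

```python
class SketchNode:
    """Hypothetical stand-in for a node; edges are plain lists of nodes."""

    def __init__(self, name):
        self.name = name
        self.inputs = []
        self.outputs = []

    def add_edge(self, target):
        # Record the connection on both ends.
        self.outputs.append(target)
        target.inputs.append(self)

    def all_connected(self, seen=None):
        seen = set() if seen is None else seen
        if self in seen:
            return seen
        seen.add(self)
        # Follow edges in both directions so any node reaches the whole pipeline.
        for neighbor in self.inputs + self.outputs:
            neighbor.all_connected(seen=seen)
        return seen


a, b, c, d = (SketchNode(name) for name in "abcd")
a.add_edge(b)
b.add_edge(c)

connected_names = sorted(node.name for node in c.all_connected())
isolated = d.all_connected()
```

Starting from the last node `c` still reaches `a` and `b`, while the unconnected node `d` only finds itself.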
-
broadcast(broadcast_message)[source]¶ Puts the message into all the input queues for all connected nodes.
-
cleanup()[source]¶ If there is any cleanup (closing files, shutting down database connections) necessary when the node is stopped, then the node’s class should provide a cleanup method. By default, the method is just a logging statement.
-
global_start(prometheus=False, pipeline_name=None, max_time=None, fixturize=False)[source]¶ Starts every node connected to self. Mainly, it:
1. calls start() on each node
2. sets some global variables
3. optionally starts some experimental code for monitoring
-
input_queue_size¶ Return the total number of items in all of the queues that are inputs to this node.
-
is_sink¶ Tests whether the node is a sink or not, i.e. whether there are no outputs from the node.
Returns: True if the node has no output nodes, False otherwise.
Return type: (bool)
-
is_source¶ Tests whether the node is a source or not, i.e. whether there are no inputs to the node.
Returns: True if the node has no inputs, False otherwise.
Return type: (bool)
-
logjam¶ Returns the logjam score, which measures the degree to which the node is holding up progress in downstream nodes.
We’re defining a logjam as a node whose input queue is full, but whose output queue(s) is not. More specifically, we poll each node in the monitor_thread, and increment a counter if the node is a logjam at that time. This property returns the percentage of samples in which the node is a logjam. Our intention is that if this score exceeds a threshold, the user is alerted, or the load is rebalanced somehow (not yet implemented).
Returns: Logjam score
Return type: (float)
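The sampling scheme can be sketched as a counter that is updated on every poll. The names `is_logjam` and `LogjamCounter` are hypothetical, and two details are assumptions: a node counts as a logjam only when none of its output queues is full, and the score is reported on a 0 to 100 scale.

```python
import queue


def is_logjam(input_queue, output_queues):
    """Assumed rule: input queue full, and no output queue full."""
    return input_queue.full() and not any(q.full() for q in output_queues)


class LogjamCounter:
    """Hypothetical helper that tracks how often a node is a logjam."""

    def __init__(self):
        self.samples = 0
        self.logjam_samples = 0

    def poll(self, input_queue, output_queues):
        self.samples += 1
        if is_logjam(input_queue, output_queues):
            self.logjam_samples += 1

    @property
    def score(self):
        if self.samples == 0:
            return 0.0
        return 100.0 * self.logjam_samples / self.samples


input_queue = queue.Queue(maxsize=1)
output_queue = queue.Queue(maxsize=1)
input_queue.put("waiting message")  # the input queue is now full

counter = LogjamCounter()
counter.poll(input_queue, [output_queue])  # logjam: output still has room
output_queue.put("result")                 # the output queue fills up too
counter.poll(input_queue, [output_queue])  # no longer a logjam by this rule
```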
-
pipeline_finished¶
-
setup()[source]¶ For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.
-
start()[source]¶ Starts the node. This is called by MetalNode.global_start().
The node’s main loop is contained in this method. The main loop does the following:
- records the timestamp to the node’s started_at attribute.
- calls get_runtime_attrs (TODO: check if we can deprecate this)
- calls the setup method for the class (which is a no-op by default)
- if the node is a source, then successively yield all the results of the node’s generator method, then exit.
- if the node is not a source, then loop over the input queues, getting the next message. Note that when the message is pulled from the queue, the MetalPipeQueue yields it as a dictionary.
- gets either the content of the entire message if the node has no key attribute, or the value of message[self.key].
- remaps the message content if a remapping dictionary has been given in the node’s configuration
- calls the node’s process_item method, yielding back the results. (Note that a single input message may cause the node to yield zero, one, or more than one output message.)
- places the results into each of the node’s output queues.
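The non-source branch of the loop above can be sketched roughly as follows. This is a simplified illustration, not the real start() method: plain lists stand in for the queues, `run_non_source_node` and `split_words` are hypothetical names, and threading, timestamps, and error handling are left out.

```python
def run_non_source_node(input_messages, process_item, output_queues, key=None, remapping=None):
    """Simplified main loop for a node that has inputs."""
    for message in input_messages:
        # Use the whole message, or only message[key] if a key is set.
        content = message if key is None else message[key]
        # Rename keys if a remapping dictionary was configured.
        if remapping and isinstance(content, dict):
            content = {remapping.get(k, k): v for k, v in content.items()}
        # One input may produce zero, one, or several outputs.
        for result in process_item(content):
            for output_queue in output_queues:
                output_queue.append(result)


def split_words(content):
    """Example process_item: yield one message per word."""
    for word in content["text"].split():
        yield {"word": word}


out_a, out_b = [], []
run_non_source_node(
    [{"payload": {"body": "hello big world"}}, {"payload": {"body": ""}}],
    split_words,
    [out_a, out_b],
    key="payload",
    remapping={"body": "text"},
)
```

The second input has an empty body, so it yields nothing; every result from the first input is placed into both output lists.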
-
terminate_pipeline(error=False)[source]¶ This method can be called on any node in a pipeline, and it will cause all of the nodes to terminate if they haven’t stopped already.
Parameters: error (bool) – Not yet implemented.
-
thread_monitor(max_time=None)[source]¶ This function loops over all of the threads in the pipeline, checking that they are either finished or running. If any have had an abnormal exit, terminate the entire pipeline.
-
time_running¶ Return the number of wall-clock seconds elapsed since the node was started.
-
class
metalpipe.node.NothingToSeeHere[source]¶ Bases:
object
Vacuous class used as a no-op message type.
-
class
metalpipe.node.PrinterOfThings(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.RandomSample(sample=0.1)[source]¶ Bases:
metalpipe.node.MetalNode
Lets through only a random sample of incoming messages. Might be useful for testing, or when only approximate results are necessary.
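A minimal sketch of this kind of sampling, assuming that sample is the probability with which each message is let through (the documentation does not spell this out). The generator `random_sample` and its `rng` parameter are hypothetical.

```python
import random


def random_sample(messages, sample=0.1, rng=None):
    """Yield each message independently with probability `sample`."""
    rng = rng or random.Random()
    for message in messages:
        if rng.random() < sample:
            yield message


messages = list(range(100))
all_kept = list(random_sample(messages, sample=1.0))
none_kept = list(random_sample(messages, sample=0.0))
some_kept = list(random_sample(messages, sample=0.5, rng=random.Random(0)))
```

Passing a seeded `random.Random` makes a run repeatable, which may be convenient in tests.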
-
class
metalpipe.node.Remapper(mapping=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.SequenceEmitter(sequence, *args, max_sequences=1, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Emits sequence max_sequences times, or forever if max_sequences is None.
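The emission rule can be sketched as a generator. `emit_sequence` is a hypothetical helper, and it assumes the sequence can be iterated more than once (a list or string rather than a one-shot generator).

```python
import itertools


def emit_sequence(sequence, max_sequences=1):
    """Yield the items of `sequence`, repeated max_sequences times or forever."""
    repeats = itertools.count() if max_sequences is None else range(max_sequences)
    for _ in repeats:
        yield from sequence


twice = list(emit_sequence([1, 2], max_sequences=2))
# With max_sequences=None the generator never ends, so take only a few items.
first_five = list(itertools.islice(emit_sequence("ab", max_sequences=None), 5))
```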
-
class
metalpipe.node.Serializer(values=False, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Takes an iterable thing as input, and successively yields its items.
-
class
metalpipe.node.SimpleTransforms(missing_keypath_action='ignore', starting_path=None, transform_mapping=None, target_value=None, keypath=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.StreamMySQLTable(*args, host='localhost', user=None, table=None, password=None, database=None, port=3306, to_row_obj=False, send_batch_markers=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
setup()[source]¶ For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.
-
-
class
metalpipe.node.StreamingJoin(window=30, streams=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Joins two streams on a key, using exact match only. MVP.
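An exact-match join over a time window might look like the sketch below. Everything here is an assumption made for illustration: the window is taken to be in seconds, events arrive as (timestamp, stream name, message) tuples in time order, and the hypothetical `streaming_join` merges the two matching messages into one dictionary.

```python
def streaming_join(events, key, window=30):
    """Join messages from two streams whose `key` values match exactly."""
    pending = {}  # key value -> (timestamp, stream_name, message)
    for timestamp, stream_name, message in events:
        # Forget buffered messages that have fallen out of the window.
        expired = [k for k, (seen_at, _, _) in pending.items() if timestamp - seen_at > window]
        for k in expired:
            del pending[k]
        match = pending.get(message[key])
        if match is not None and match[1] != stream_name:
            # A message with the same key is waiting on the other stream.
            del pending[message[key]]
            yield {**match[2], **message}
        else:
            pending[message[key]] = (timestamp, stream_name, message)


events = [
    (0, "left", {"id": 1, "a": "x"}),
    (5, "right", {"id": 1, "b": "y"}),   # within the window: joined
    (10, "left", {"id": 2, "a": "z"}),
    (50, "right", {"id": 2, "b": "w"}),  # 40 seconds later: too late
]
joined = list(streaming_join(events, key="id", window=30))
```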
-
class
metalpipe.node.SubstituteRegex(match_regex=None, substitute_string=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.TimeWindowAccumulator(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Every N seconds, puts the latest M seconds of data on the queue.
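The "latest M seconds" part can be sketched with a deque of timestamped items. `WindowBuffer` is a hypothetical class with explicit timestamps so that it runs without sleeping; the periodic "every N seconds" trigger is left to the caller, and none of this is taken from the node's source.

```python
from collections import deque


class WindowBuffer:
    """Keeps items and returns those from the last `window_seconds`."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        self.items = deque()  # (timestamp, item) pairs, oldest first

    def add(self, timestamp, item):
        self.items.append((timestamp, item))

    def snapshot(self, now):
        # Drop everything older than the window, then return what is left.
        while self.items and now - self.items[0][0] > self.window_seconds:
            self.items.popleft()
        return [item for _, item in self.items]


window_buffer = WindowBuffer(window_seconds=10)
window_buffer.add(0, "a")
window_buffer.add(6, "b")
window_buffer.add(12, "c")

at_13 = window_buffer.snapshot(now=13)
at_30 = window_buffer.snapshot(now=30)
```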
-
class
metalpipe.node.bcolors[source]¶ Bases:
object
This class holds the values for the various colors that are used in the tables that monitor the status of the nodes.
-
BOLD= '\x1b[1m'¶
-
ENDC= '\x1b[0m'¶
-
FAIL= '\x1b[91m'¶
-
HEADER= '\x1b[95m'¶
-
OKBLUE= '\x1b[94m'¶
-
OKGREEN= '\x1b[92m'¶
-
UNDERLINE= '\x1b[4m'¶
-
WARNING= '\x1b[93m'¶
-
Civis-specific node types¶
This is where any classes specific to the Civis API live.
-
class
metalpipe.node_classes.civis_nodes.CivisSQLExecute(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results.
-
class
metalpipe.node_classes.civis_nodes.CivisToCSV(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, include_headers=True, delimiter=', ', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results via a CSV file.
-
class
metalpipe.node_classes.civis_nodes.EnsureCivisRedshiftTableExists(on_failure='exit', table=None, schema=None, database=None, columns=None, block=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.FindValueInRedshiftColumn(on_failure='exit', table=None, database=None, schema=None, column=None, choice='max', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.SendToCivis(*args, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, schema=None, existing_table_rows='append', include_columns=None, dummy_run=False, block=False, max_errors=0, table=None, via_staging_table=False, columns=None, staging_table=None, remap=None, recorded_tables={}, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
cleanup()[source]¶ Check if we’re using staging tables. If so, copy the staging table into the production table. TODO: options for merge, upsert, append, drop
-
full_table_name¶
-
Data structures module¶
Data types (e.g. Rows, Records) for ETL.
-
class
metalpipe.utils.data_structures.BOOL(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.IntermediateTypeSystem-
python_cast_function¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.DATETIME(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.IntermediateTypeSystem-
python_cast_function()¶
-
-
class
metalpipe.utils.data_structures.DataSourceTypeSystem[source]¶ Bases:
object
Information about mapping one type system onto another is contained in the children of this class.
-
class
metalpipe.utils.data_structures.DataType(value, original_type=None, name=None)[source]¶ Bases:
object
Each DataType gets a python_cast_function, which is a function.
-
intermediate_type= None¶
-
python_cast_function= None¶
-
to_intermediate_type()[source]¶ Convert the DataType to an IntermediateDataType using its class’s intermediate_type attribute.
-
type_system¶ Just for convenience to make the type system an attribute.
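The relationship between python_cast_function, intermediate_type, and to_intermediate_type can be sketched with a pair of toy classes. The `Sketch...` classes and the `to_python` helper are hypothetical and only mirror the attribute names documented above; the real conversion logic may differ.

```python
class SketchDataType:
    """Hypothetical typed value with the documented class attributes."""

    python_cast_function = None
    intermediate_type = None

    def __init__(self, value, original_type=None, name=None):
        self.value = value
        self.original_type = original_type
        self.name = name

    def to_python(self):
        # Look the cast function up on the class so it is not bound as a method.
        return type(self).python_cast_function(self.value)

    def to_intermediate_type(self):
        # Re-wrap the value in the class named by intermediate_type.
        return type(self).intermediate_type(
            self.value, original_type=type(self), name=self.name
        )


class SketchInteger(SketchDataType):
    python_cast_function = int


class SketchMySQLInteger8(SketchDataType):
    python_cast_function = int
    intermediate_type = SketchInteger


converted = SketchMySQLInteger8("7", name="age").to_intermediate_type()
python_value = converted.to_python()
```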
-
-
class
metalpipe.utils.data_structures.FLOAT(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.IntermediateTypeSystem-
python_cast_function¶ alias of
builtins.float
-
-
class
metalpipe.utils.data_structures.INTEGER(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.IntermediateTypeSystem-
python_cast_function¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.IntermediateTypeSystem[source]¶ Bases:
metalpipe.utils.data_structures.DataSourceTypeSystem
Never instantiate this by hand.
-
class
metalpipe.utils.data_structures.MYSQL_BOOL(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.MySQLTypeSystem-
python_cast_function¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.MYSQL_DATE(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.MySQLTypeSystem-
python_cast_function()¶
-
-
class
metalpipe.utils.data_structures.MYSQL_ENUM(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.MySQLTypeSystem-
python_cast_function¶ alias of
builtins.str
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER0(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 0¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER1(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 1¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER10(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 10¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER1024(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 1024¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER11(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 11¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER12(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 12¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER128(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 128¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER13(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 13¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER14(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 14¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER15(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 15¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER16(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 16¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER16384(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 16384¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER17(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 17¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER18(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 18¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER19(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 19¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER2(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 2¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER20(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 20¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER2048(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 2048¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER21(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 21¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER22(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 22¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER23(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 23¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER24(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 24¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER25(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 25¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER256(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 256¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER26(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 26¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER27(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 27¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER28(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 28¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER29(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 29¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER3(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 3¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER30(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 30¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER31(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 31¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER32(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 32¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER32768(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 32768¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER4(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 4¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER4096(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 4096¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER5(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 5¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER512(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 512¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER6(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 6¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER64(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 64¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER7(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 7¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER8(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 8¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER8192(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 8192¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER9(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE-
max_length= 9¶
-
-
class
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.MySQLTypeSystem-
python_cast_function¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR0(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 0¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR1(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 1¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR10(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 10¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR1024(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 1024¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR11(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 11¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR12(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 12¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR128(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 128¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR13(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 13¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR14(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 14¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR15(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 15¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR16(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 16¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR16384(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 16384¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR17(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 17¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR18(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 18¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR19(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 19¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR2(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 2¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR20(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 20¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR2048(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 2048¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR21(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 21¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR22(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 22¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR23(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 23¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR24(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 24¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR25(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 25¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR256(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 256¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR26(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 26¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR27(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 27¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR28(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 28¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR29(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 29¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR3(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 3¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR30(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 30¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR31(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 31¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR32(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 32¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR32768(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 32768¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR4(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 4¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR4096(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 4096¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR5(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 5¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR512(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 512¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR6(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 6¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR64(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 64¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR7(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 7¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR8(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 8¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR8192(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 8192¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR9(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE-
max_length= 9¶
-
-
class
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.MySQLTypeSystem-
python_cast_function¶ alias of
builtins.str
-
-
class
metalpipe.utils.data_structures.MySQLTypeSystem[source]¶ Bases:
metalpipe.utils.data_structures.DataSourceTypeSystem
Each TypeSystem gets a type_mapping static method that takes a string and returns the class in the type system named by that string. For example, int(8) in a MySQL schema should return the MYSQL_INTEGER8 class.
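The lookup described above can be pictured with a small sketch. This is not MetalPipe's implementation: the names SketchTypeSystem, SketchInteger8 and SketchVarchar30 are hypothetical stand-ins, and the parsing rule (a base type followed by a size in parentheses) is an assumption made only for illustration.

```python
import re


class SketchInteger8:
    """Hypothetical stand-in for a type class such as MYSQL_INTEGER8."""


class SketchVarchar30:
    """Hypothetical stand-in for a type class such as MYSQL_VARCHAR30."""


class SketchTypeSystem:
    # Assumed registry: (base type, declared size) -> class in the type system.
    _registry = {
        ("int", 8): SketchInteger8,
        ("varchar", 30): SketchVarchar30,
    }

    @staticmethod
    def type_mapping(type_string):
        """Return the class named by a schema string such as 'int(8)'."""
        match = re.fullmatch(
            r"\s*([a-z]+)\s*\(\s*(\d+)\s*\)\s*", type_string.lower()
        )
        if match is None:
            raise ValueError(f"unrecognized type string: {type_string!r}")
        key = (match.group(1), int(match.group(2)))
        if key not in SketchTypeSystem._registry:
            raise ValueError(f"no class registered for: {type_string!r}")
        return SketchTypeSystem._registry[key]
```

With this sketch, SketchTypeSystem.type_mapping("int(8)") returns the SketchInteger8 class itself rather than an instance, which mirrors the "returns the class" wording above.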
-
class
metalpipe.utils.data_structures.Row(*records, type_system=None)[source]¶ Bases:
object
A collection of DataType objects (typed values). They are dictionaries mapping the names of the values to the DataType objects.
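As a rough picture of "a dictionary mapping names to typed values", here is a minimal sketch. SketchTypedValue and SketchRow are hypothetical names; only the constructor arguments (value, original_type, name) and the python_cast_function attribute are taken from the signatures listed in this module, and the real Row and DataType classes may behave differently.

```python
class SketchTypedValue:
    """Hypothetical typed value that casts its input on construction."""

    # Mirrors the documented 'python_cast_function' attribute (alias of str).
    python_cast_function = str

    def __init__(self, value, original_type=None, name=None):
        self.value = type(self).python_cast_function(value)
        self.original_type = original_type
        self.name = name


class SketchRow(dict):
    """Hypothetical row: maps each value's name to its typed value."""

    def __init__(self, *records):
        super().__init__((record.name, record) for record in records)


row = SketchRow(
    SketchTypedValue(42, original_type=int, name="id"),
    SketchTypedValue("abc", name="code"),
)
```

Here row["id"] is the typed value named id, and its value attribute holds the result of the cast.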
-
class
metalpipe.utils.data_structures.STRING(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType,metalpipe.utils.data_structures.IntermediateTypeSystem-
python_cast_function¶ alias of
builtins.str
-
Network nodes module¶
Classes that deal with sending and receiving data across the interwebs.
-
class
metalpipe.node_classes.network_nodes.HttpGetRequest(endpoint_template=None, endpoint_dict=None, protocol='http', retries=5, json=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Node class for making simple GET requests.
-
process_item()[source]¶ The input to this function will be a dictionary-like object with parameters to be substituted into the endpoint string and a dictionary with keys and values to be passed in the GET request.
Three use-cases:
1. Endpoint and parameters set initially and never changed.
2. Endpoint and parameters set once at runtime.
3. Endpoint and parameters set by upstream messages.
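The substitution step described for process_item can be sketched as follows. This is an illustration only: build_get_url is a hypothetical helper, not part of MetalPipe, and it assumes the endpoint template is a full URL using str.format placeholders.

```python
from urllib.parse import urlencode


def build_get_url(endpoint_template, template_values, query_params=None):
    """Fill the endpoint template, then append the GET parameters.

    endpoint_template: URL containing {placeholders} (assumed format).
    template_values:   dictionary-like object with the placeholder values.
    query_params:      keys and values to pass in the GET request.
    """
    url = endpoint_template.format(**template_values)
    if query_params:
        url = f"{url}?{urlencode(query_params)}"
    return url


url = build_get_url(
    "http://www.someapi.com/{endpoint}",
    {"endpoint": "endpoint_name"},
    {"resultpage": 0},
)
```

In the third use-case, both dictionaries would come from the upstream message rather than from values fixed when the node is configured.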
-
-
class
metalpipe.node_classes.network_nodes.HttpGetRequestPaginator(endpoint_dict=None, json=True, pagination_get_request_key=None, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_template_key=None, default_offset_value='', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Node class for HTTP API requests that require paging through sets of results.
This class handles making HTTP GET requests, determining whether there are additional results, and making additional calls if necessary. A typical case is to have an HTTP request something like this:
http://www.someapi.com/endpoint_name?resultpage=0
with a response like:
{"data": "something", "additional_pages": true, "next_page": 1}
The response contains some data, a flag additional_pages for determining whether there are additional results, and a parameter that gets passed to the next request for retrieving the right page of results (next_page). So the next GET request would be:
http://www.someapi.com/endpoint_name?resultpage=1
This process will repeat until additional_pages is false.
In order to use this node class, you’ll need to provide arguments that tell the node where to look for the equivalent of additional_pages and next_page.
endpoint_template: The parameterized URL for the API.
additional_data_key: The keypath to the value in the API response that determines whether there are additional pages to request.
pagination_key: The keypath to the value in the API response that contains the value that would be passed to the API to retrieve the next set of values.
pagination_get_request_key: The key in the endpoint_template that will contain the value of the pagination_key.
For our simple example, the arguments would be:
endpoint_template: http://www.someapi.com/endpoint_name?resultpage={result_page}
additional_data_key: ["additional_pages"]
pagination_key: ["next_page"]
pagination_get_request_key: result_page
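The request loop described above can be sketched in a few lines. This is a simplified illustration, not the node's actual code: paginate, get_by_keypath, the injected fetch callable, and the max_pages safety limit are all assumptions introduced here. The HTTP call is abstracted as fetch(url), which returns the decoded response, so the sketch runs without a network.

```python
def get_by_keypath(obj, keypath):
    """Follow a list of keys into a nested structure."""
    for key in keypath:
        obj = obj[key]
    return obj


def paginate(fetch, endpoint_template, additional_data_key, pagination_key,
             pagination_get_request_key, first_page=0, max_pages=1000):
    """Yield each response until the 'additional pages' flag is false."""
    page_value = first_page
    for _ in range(max_pages):  # assumed guard against endless paging
        url = endpoint_template.format(
            **{pagination_get_request_key: page_value}
        )
        response = fetch(url)
        yield response
        if not get_by_keypath(response, additional_data_key):
            return
        page_value = get_by_keypath(response, pagination_key)


# A fake two-page API standing in for real HTTP calls.
fake_api = {
    "http://www.someapi.com/endpoint_name?resultpage=0":
        {"data": "first", "additional_pages": True, "next_page": 1},
    "http://www.someapi.com/endpoint_name?resultpage=1":
        {"data": "second", "additional_pages": False, "next_page": None},
}

responses = list(paginate(
    fake_api.__getitem__,
    "http://www.someapi.com/endpoint_name?resultpage={result_page}",
    additional_data_key=["additional_pages"],
    pagination_key=["next_page"],
    pagination_get_request_key="result_page",
))
```

The four keyword arguments correspond to the example values listed above; a real fetch would perform the GET request and decode the JSON body.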
In addition to those mandatory arguments, you can also optionally specify an endpoint_dict, which contains other values that will be substituted into the endpoint_template. For example, these APIs often have an option that controls the number of results to provide in each response, like so:
http://www.someapi.com/endpoint_name?results={num_results}&resultpage={result_page}
For cases like this, the value of endpoint_dict is a dictionary mapping keys from the endpoint_template to their values. So if you wanted to have ten results per page, you would specify:
endpoint_dict = {"num_results": 10}
There can be any number of other parameters specified in the endpoint_dict.
If there are other keys in the endpoint_template that are not provided in the endpoint_dict, then the node will try to find them in the current message that’s being processed. For example, it is common to have some kind of security token that might be given in an environment variable. If the value of that environment variable has been provided by some upstream node and placed in the key token, then it would be substituted into the URL, provided that the endpoint_template had a place for it, such as:
http://www.someapi.com/endpoint_name?auth_token={token}&resultpage={result_page}
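The lookup order described above (endpoint_dict first, then the current message) can be sketched as below. fill_endpoint_template is a hypothetical helper, not MetalPipe's code; it assumes simple placeholder names and raises an error when a key is found in neither place, which may not match the node's real behavior.

```python
from string import Formatter


def fill_endpoint_template(endpoint_template, endpoint_dict=None, message=None):
    """Fill {placeholders} from endpoint_dict, falling back to the message."""
    endpoint_dict = endpoint_dict or {}
    message = message or {}
    values = {}
    # Formatter().parse yields (literal, field_name, format_spec, conversion).
    for _, field_name, _, _ in Formatter().parse(endpoint_template):
        if not field_name:
            continue  # literal text with no placeholder
        if field_name in endpoint_dict:
            values[field_name] = endpoint_dict[field_name]
        elif field_name in message:
            values[field_name] = message[field_name]
        else:
            raise KeyError(f"no value available for {field_name!r}")
    return endpoint_template.format(**values)


filled = fill_endpoint_template(
    "http://www.someapi.com/endpoint_name?auth_token={token}&results={num_results}",
    endpoint_dict={"num_results": 10},
    message={"token": "abc123"},
)
```

In this sketch the token value comes from the message because endpoint_dict does not define it, while num_results comes from endpoint_dict.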
-
class
metalpipe.node_classes.network_nodes.PaginatedHttpGetRequest(endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_get_request_key=None, protocol='http', retries=5, default_offset_value='', additional_data_test=<class 'bool'>, calling_node=None)[source]¶ Bases:
object
For handling requests in a semi-general way that require paging through lists of results and repeatedly making GET requests.
MetalPipeMessage module¶
The MetalPipeMessage encapsulates the content of each piece of data,
along with some useful metadata.
Trigger module¶
A simple class containing no data, which is intended merely as a trigger, signaling that the downstream node should do something.
Batch module¶
We’ll use markers to delimit batches of things, such as serialized files.
MetalPipeQueue module¶
These are queues that form the directed edges between nodes.
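To illustrate the idea of a queue as a directed edge, here is a sketch built on the standard library's queue.Queue. It does not use MetalPipeQueue itself: the producer and consumer functions, the sentinel object, and run_two_node_pipeline are hypothetical, and the bounded size only echoes the max_queue_size argument.

```python
import queue
import threading

SENTINEL = object()  # assumed end-of-stream marker for this sketch


def producer(edge, items):
    """Upstream node: put each item on the edge, then signal completion."""
    for item in items:
        edge.put(item)  # blocks while the bounded queue is full
    edge.put(SENTINEL)


def consumer(edge, results):
    """Downstream node: read from the edge until the sentinel arrives."""
    while True:
        item = edge.get()
        if item is SENTINEL:
            break
        results.append(item * 2)


def run_two_node_pipeline(items, max_queue_size=2):
    edge = queue.Queue(maxsize=max_queue_size)
    results = []
    threads = [
        threading.Thread(target=producer, args=(edge, items)),
        threading.Thread(target=consumer, args=(edge, results)),
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results
```

Because the queue is bounded, a fast upstream node waits for the downstream node instead of accumulating unbounded work.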
-
class
metalpipe.node_queue.queue.MetalPipeQueue(max_queue_size, name=None)[source]¶ Bases:
object-
empty¶
-