API Documentation
Node module
The node module contains the MetalNode class, which is the foundation for MetalPipe.
-
class
metalpipe.node.
AggregateValues
(values=False, tail_path=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Does that.
-
class
metalpipe.node.
BatchMessages
(batch_size=None, batch_list=None, counter=0, timeout=5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
CSVReader
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
CSVToDictionaryList
(**kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
ConstantEmitter
(thing=None, max_loops=5, delay=0.5, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Send a thing every n seconds
-
class
metalpipe.node.
CounterOfThings
(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
DynamicClassMediator
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
Filter
(test=None, test_keypath=None, value=True, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Applies tests to each message and filters out messages that don’t pass.
Built-in tests:
- key_exists
- value_is_true
- value_is_not_none
Example:
{'test': 'key_exists', 'key': mykey}
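As a rough illustration of what the three built-in test names suggest, the sketch below filters a list of dictionaries. The function bodies, the BUILT_IN_TESTS table, and filter_messages are hypothetical stand-ins written for this example; they are assumptions about the semantics, not MetalPipe's implementation.

```python
# Hypothetical stand-ins for the built-in tests; the real semantics may differ.
def key_exists(message, key):
    return key in message


def value_is_true(message, key):
    return bool(message.get(key))


def value_is_not_none(message, key):
    return message.get(key) is not None


BUILT_IN_TESTS = {
    "key_exists": key_exists,
    "value_is_true": value_is_true,
    "value_is_not_none": value_is_not_none,
}


def filter_messages(messages, test, key):
    """Keep only the messages that pass the named test for the given key."""
    check = BUILT_IN_TESTS[test]
    return [message for message in messages if check(message, key)]


messages = [{"a": 1}, {"a": 0}, {"a": None}, {"b": 2}]
kept_exists = filter_messages(messages, "key_exists", "a")
kept_true = filter_messages(messages, "value_is_true", "a")
kept_not_none = filter_messages(messages, "value_is_not_none", "a")
```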
-
class
metalpipe.node.
FunctionOfMessage
(function_name, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
GetEnvironmentVariables
(mappings=None, environment_variables=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
This node reads environment variables and stores them in the message.
The required keyword argument for this node is environment_variables, which is a list of – you guessed it! – environment variables. By default, they will be read and stored in the outgoing message under keys with the same names as the environment variables. E.g. FOO_VAR will be stored in the message as {"FOO_VAR": whatever}.
Optionally, you can provide a dictionary to the mappings keyword argument, which maps environment variable names to new names. E.g. if mappings = {"FOO_VAR": "bar_var"}, then the value of FOO_VAR will be stored in the message as {"bar_var": whatever}.
If the environment variable is not defined, then its value will be set to None.
Parameters:
- mappings (dict) – An optional dictionary mapping environment variable names to new names.
- environment_variables (list) – A list of environment variable names.
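The behavior described for this node can be sketched with the standard library alone. The helper read_environment_variables below is a hypothetical function written for illustration; it mirrors the documented rules (same-name keys by default, renaming through mappings, None for undefined variables) and is not the node's actual code.

```python
import os


def read_environment_variables(environment_variables, mappings=None):
    """Return a dict of variable values, renamed through `mappings` if given.

    Undefined variables are stored as None, as the description above states.
    """
    mappings = mappings or {}
    return {
        mappings.get(name, name): os.environ.get(name)
        for name in environment_variables
    }


os.environ["FOO_VAR"] = "hello"
os.environ.pop("MISSING_VAR", None)  # make sure this one is undefined

default_keys = read_environment_variables(["FOO_VAR", "MISSING_VAR"])
renamed = read_environment_variables(["FOO_VAR"], mappings={"FOO_VAR": "bar_var"})
```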
-
class
metalpipe.node.
InsertData
(overwrite=True, overwrite_if_null=True, value_dict=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
LocalDirectoryWatchdog
(directory='.', check_interval=3, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
LocalFileReader
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
MetalNode
(*args, batch=False, get_runtime_attrs=<function no_op>, get_runtime_attrs_args=None, get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None, input_mapping=None, retain_input=True, throttle=0, keep_alive=True, max_errors=0, max_messages_received=None, name=None, input_message_keypath=None, key=None, messages_received_counter=0, prefer_existing_value=False, messages_sent_counter=0, post_process_function=None, post_process_keypath=None, summary='', fixturize=False, post_process_function_kwargs=None, output_key=None, break_test=None, **kwargs)[source]¶ Bases:
object
The foundational class of MetalPipe. This class is inherited by all nodes in a computation graph.
Order of operations:
1. Child class __init__ function
2. MetalNode __init__ function
3. preflight_function (specified in initialization params)
4. setup
5. start
These methods have the following intended uses:
- __init__: Sets attribute values and calls the MetalNode __init__ method.
- get_runtime_attrs: Sets any attribute values that are to be determined at runtime, e.g. by checking environment variables or reading values from a database. The get_runtime_attrs function should return a dictionary of attributes -> values, or else None.
- setup: Sets the state of the MetalNode and/or creates any attributes that require information available only at runtime.
Parameters:
- send_batch_markers – If True, then a BatchStart marker will be sent when a new input is received, and a BatchEnd will be sent after the input has been processed. The intention is that a number of items will be emitted for each input received. For example, we might emit a table row-by-row for each input.
- get_runtime_attrs – A function that returns a dictionary-like object. The keys and values will be saved to this MetalNode object’s attributes. The function is executed one time, upon starting the node.
- get_runtime_attrs_args – A tuple of arguments to be passed to the get_runtime_attrs function upon starting the node.
- get_runtime_attrs_kwargs – A dictionary of kwargs passed to the get_runtime_attrs function.
- runtime_attrs_destinations – If set, this is a dictionary mapping the keys returned from the get_runtime_attrs function to the names of the attributes to which the values will be saved.
- throttle – For each input received, a delay of throttle seconds will be added.
- keep_alive – If True, keep the node’s thread alive after everything has been processed.
- name – The name of the node. Defaults to a randomly generated hash. Note that this hash is not consistent from one run to the next.
- input_mapping – When the node receives a dictionary-like object, this dictionary will cause the keys of the dictionary to be remapped to new keys.
- retain_input – If True, then combine the dictionary-like input with the output. If keys clash, the output value will be kept.
- input_message_keypath – Read the value in this keypath as the content of the incoming message.
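To make the interaction between get_runtime_attrs, its args and kwargs, and runtime_attrs_destinations concrete, here is a minimal sketch. SketchNode, apply_runtime_attrs, and lookup_credentials are hypothetical names invented for this example; only the parameter names come from the signature above, and the real MetalNode may apply the values differently.

```python
def no_op(*args, **kwargs):
    """Default hook: nothing to set at runtime."""
    return None


class SketchNode:
    """Hypothetical stand-in for a node; not the MetalNode implementation."""

    def __init__(self, get_runtime_attrs=no_op, get_runtime_attrs_args=None,
                 get_runtime_attrs_kwargs=None, runtime_attrs_destinations=None):
        self.get_runtime_attrs = get_runtime_attrs
        self.get_runtime_attrs_args = get_runtime_attrs_args or ()
        self.get_runtime_attrs_kwargs = get_runtime_attrs_kwargs or {}
        self.runtime_attrs_destinations = runtime_attrs_destinations or {}

    def apply_runtime_attrs(self):
        # The hook returns a dictionary of attributes -> values, or None.
        attrs = self.get_runtime_attrs(
            *self.get_runtime_attrs_args, **self.get_runtime_attrs_kwargs
        ) or {}
        for key, value in attrs.items():
            # Rename the attribute if a destination is configured for this key.
            setattr(self, self.runtime_attrs_destinations.get(key, key), value)


def lookup_credentials(prefix, suffix=""):
    """Hypothetical runtime lookup, e.g. from a database or the environment."""
    return {"user": prefix + "alice" + suffix, "port": 3306}


node = SketchNode(
    get_runtime_attrs=lookup_credentials,
    get_runtime_attrs_args=("db_",),
    get_runtime_attrs_kwargs={"suffix": "_ro"},
    runtime_attrs_destinations={"user": "db_user"},
)
node.apply_runtime_attrs()
```

In this sketch the "user" key is saved under the attribute name db_user, while "port" keeps its own name because no destination is configured for it.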
-
add_edge
(target, **kwargs)
Create an edge connecting self to target.
This method instantiates the MetalPipeQueue object that connects the nodes. Connecting the nodes together consists of (1) adding the queue to the other’s input_queue_list or output_queue_list and (2) setting the queue’s source_node and target_node attributes.
Parameters: target (MetalNode) – The node to which self will be connected.
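The two steps of connecting nodes can be sketched as follows. SketchQueue and SketchNode are hypothetical classes for illustration; the attribute names input_queue_list, output_queue_list, source_node, and target_node come from the description above, and everything else is an assumption.

```python
import queue


class SketchQueue:
    """Hypothetical stand-in for MetalPipeQueue."""

    def __init__(self):
        self.queue = queue.Queue()
        self.source_node = None
        self.target_node = None


class SketchNode:
    """Hypothetical stand-in for a node with queue lists."""

    def __init__(self, name):
        self.name = name
        self.input_queue_list = []
        self.output_queue_list = []

    def add_edge(self, target):
        edge = SketchQueue()
        # (1) register the queue on both ends of the edge
        self.output_queue_list.append(edge)
        target.input_queue_list.append(edge)
        # (2) record which nodes the queue connects
        edge.source_node = self
        edge.target_node = target
        return edge


reader = SketchNode("reader")
printer = SketchNode("printer")
edge = reader.add_edge(printer)
```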
-
all_connected
(seen=None)
Returns all the nodes connected (directly or indirectly) to self. This allows us to loop over all the nodes in a pipeline even if we have a handle on only one. This is used by global_start, for example.
Parameters: seen (set) – A set of all the nodes that have been identified as connected to self.
Returns: All the nodes connected to self. This includes self.
Return type: (set of MetalNode)
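A traversal of this kind is commonly written as a recursive walk that threads a seen set through the calls. The sketch below assumes each node keeps a plain set of neighbors; that attribute, the connect helper, and the SketchNode class are hypothetical, since the real method would follow the node's input and output queues.

```python
class SketchNode:
    """Hypothetical node that tracks its neighbors directly."""

    def __init__(self, name):
        self.name = name
        self.neighbors = set()

    def connect(self, other):
        # Record the connection in both directions so traversal works
        # from any node in the pipeline.
        self.neighbors.add(other)
        other.neighbors.add(self)

    def all_connected(self, seen=None):
        seen = set() if seen is None else seen
        if self in seen:
            return seen
        seen.add(self)
        for neighbor in self.neighbors:
            neighbor.all_connected(seen)
        return seen


a, b, c, d = (SketchNode(name) for name in "abcd")
a.connect(b)
b.connect(c)  # d is left unconnected

from_a = a.all_connected()
from_c = c.all_connected()
from_d = d.all_connected()
```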
-
broadcast
(broadcast_message)[source]¶ Puts the message into all the input queues for all connected nodes.
-
cleanup
()
If any cleanup (closing files, shutting down database connections) is necessary when the node is stopped, then the node’s class should provide a cleanup method. By default, the method is just a logging statement.
-
global_start
(prometheus=False, pipeline_name=None, max_time=None, fixturize=False)
Starts every node connected to self. Mainly, it:
1. calls start() on each node
2. sets some global variables
3. optionally starts some experimental code for monitoring
-
input_queue_size
Return the total number of items in all of the queues that are inputs to this node.
-
is_sink
Tests whether the node is a sink or not, i.e. whether there are no outputs from the node.
Returns: True if the node has no output nodes, False otherwise.
Return type: (bool)
-
is_source
Tests whether the node is a source or not, i.e. whether there are no inputs to the node.
Returns: True if the node has no inputs, False otherwise.
Return type: (bool)
-
logjam
Returns the logjam score, which measures the degree to which the node is holding up progress in downstream nodes.
We’re defining a logjam as a node whose input queue is full, but whose output queue(s) is not. More specifically, we poll each node in the monitor_thread, and increment a counter if the node is a logjam at that time. This property returns the percentage of samples in which the node is a logjam. Our intention is that if this score exceeds a threshold, the user is alerted, or the load is rebalanced somehow (not yet implemented).
Returns: Logjam score
Return type: (float)
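The sampling idea can be sketched with standard-library queues. LogjamSampler is a hypothetical class for illustration. Two points are assumptions rather than documented behavior: a node with several outputs counts as a logjam only when none of its output queues is full, and the score is reported on a 0 to 100 scale.

```python
import queue


class LogjamSampler:
    """Hypothetical poller that tracks how often a node is a logjam."""

    def __init__(self, input_queue, output_queues):
        self.input_queue = input_queue
        self.output_queues = output_queues
        self.samples = 0
        self.logjam_samples = 0

    def is_logjam_now(self):
        # Assumption: "output queue(s) is not full" means no output is full.
        outputs_full = any(q.full() for q in self.output_queues)
        return self.input_queue.full() and not outputs_full

    def sample(self):
        # Called once per polling interval by a monitoring loop.
        self.samples += 1
        if self.is_logjam_now():
            self.logjam_samples += 1

    @property
    def logjam(self):
        if self.samples == 0:
            return 0.0
        return 100.0 * self.logjam_samples / self.samples


inbox = queue.Queue(maxsize=1)
outbox = queue.Queue(maxsize=1)
sampler = LogjamSampler(inbox, [outbox])

inbox.put("work")   # input is now full, output is empty
sampler.sample()
inbox.get()         # input drained
sampler.sample()
score = sampler.logjam
```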
-
pipeline_finished
¶
-
setup
()
For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.
-
start
()
Starts the node. This is called by MetalNode.global_start().
The node’s main loop is contained in this method. The main loop does the following:
- records the timestamp to the node’s started_at attribute.
- calls get_runtime_attrs (TODO: check if we can deprecate this)
- calls the setup method for the class (which is a no-op by default)
- if the node is a source, then successively yield all the results of the node’s generator method, then exit.
- if the node is not a source, then loop over the input queues, getting the next message. Note that when the message is pulled from the queue, the MetalPipeQueue yields it as a dictionary.
- gets either the content of the entire message if the node has no key attribute, or the value of message[self.key].
- remaps the message content if a remapping dictionary has been given in the node’s configuration
- calls the node’s process_item method, yielding back the results. (Note that a single input message may cause the node to yield zero, one, or more than one output message.)
- places the results into each of the node’s output queues.
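The non-source branch of the main loop can be sketched as a single pass over the input queues. This is a simplified illustration under stated assumptions: SketchNode, SplitWords, and run_once are hypothetical names, plain queue.Queue objects stand in for MetalPipeQueue, and the real loop keeps running instead of draining the queues once.

```python
import queue
import time


class SketchNode:
    """Hypothetical stand-in for a non-source node."""

    def __init__(self, key=None, input_mapping=None):
        self.key = key
        self.input_mapping = input_mapping or {}
        self.input_queue_list = []
        self.output_queue_list = []
        self.started_at = None

    def process_item(self, item):
        # Default behavior for the sketch: pass the item through unchanged.
        yield item

    def run_once(self):
        self.started_at = time.time()
        for input_queue in self.input_queue_list:
            while not input_queue.empty():
                message = input_queue.get()
                # Use the whole message, or only the value under self.key.
                content = message if self.key is None else message[self.key]
                # Remap keys when a mapping was configured.
                if isinstance(content, dict):
                    content = {
                        self.input_mapping.get(k, k): v
                        for k, v in content.items()
                    }
                # One input may produce zero, one, or many outputs.
                for result in self.process_item(content):
                    for output_queue in self.output_queue_list:
                        output_queue.put(result)


class SplitWords(SketchNode):
    def process_item(self, item):
        for word in item["sentence"].split():
            yield word


node = SplitWords(key="payload", input_mapping={"text": "sentence"})
inbox, out_a, out_b = queue.Queue(), queue.Queue(), queue.Queue()
node.input_queue_list.append(inbox)
node.output_queue_list.extend([out_a, out_b])

inbox.put({"payload": {"text": "hello metal pipe"}})
node.run_once()

results_a = [out_a.get() for _ in range(out_a.qsize())]
results_b = [out_b.get() for _ in range(out_b.qsize())]
```

Here one input message yields three outputs, and each output is copied to both output queues.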
-
terminate_pipeline
(error=False)[source]¶ This method can be called on any node in a pipeline, and it will cause all of the nodes to terminate if they haven’t stopped already.
Parameters: error (bool) – Not yet implemented.
-
thread_monitor
(max_time=None)
This function loops over all of the threads in the pipeline, checking that they are either finished or running. If any have had an abnormal exit, terminate the entire pipeline.
-
time_running
Return the number of wall-clock seconds elapsed since the node was started.
-
class
metalpipe.node.
NothingToSeeHere
[source]¶ Bases:
object
Vacuous class used as a no-op message type.
-
class
metalpipe.node.
PrinterOfThings
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
RandomSample
(sample=0.1)[source]¶ Bases:
metalpipe.node.MetalNode
Lets through only a random sample of incoming messages. Might be useful for testing, or when only approximate results are necessary.
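A sampling filter of this kind is often a one-line probability check per message. The generator below is a hypothetical sketch, assuming sample is the probability that any single message passes; the rng argument is added here only to make the example testable.

```python
import random


def random_sample(messages, sample=0.1, rng=None):
    """Yield each message with probability `sample` (assumed semantics)."""
    rng = rng or random.Random()
    for message in messages:
        # random() returns a float in [0.0, 1.0).
        if rng.random() < sample:
            yield message


everything = list(random_sample(range(100), sample=1.0))
nothing = list(random_sample(range(100), sample=0.0))
roughly_a_tenth = list(random_sample(range(1000), sample=0.1))
```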
-
class
metalpipe.node.
Remapper
(mapping=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
SequenceEmitter
(sequence, *args, max_sequences=1, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Emits sequence max_sequences times, or forever if max_sequences is None.
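The emit-N-times-or-forever behavior can be sketched as a generator. emit_sequence is a hypothetical function written for illustration and is not the node's generator method.

```python
import itertools


def emit_sequence(sequence, max_sequences=1):
    """Yield the items of `sequence`, repeated `max_sequences` times.

    With max_sequences=None the sequence is repeated without end.
    """
    if max_sequences is None:
        repeats = itertools.repeat(None)  # endless iterator
    else:
        repeats = range(max_sequences)
    for _ in repeats:
        yield from sequence


twice = list(emit_sequence([1, 2], max_sequences=2))
# Take only the first five items of the endless stream.
forever_prefix = list(itertools.islice(emit_sequence("ab", max_sequences=None), 5))
```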
-
class
metalpipe.node.
Serializer
(values=False, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Takes an iterable thing as input, and successively yields its items.
-
class
metalpipe.node.
SimpleTransforms
(missing_keypath_action='ignore', starting_path=None, transform_mapping=None, target_value=None, keypath=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
StreamMySQLTable
(*args, host='localhost', user=None, table=None, password=None, database=None, port=3306, to_row_obj=False, send_batch_markers=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
setup
()
For classes that require initialization at runtime, which can’t be done when the class’s __init__ function is called. The MetalNode base class’s setup function is just a logging call.
It should be unusual to have to make use of setup because in practice, initialization can be done in the __init__ function.
-
-
class
metalpipe.node.
StreamingJoin
(window=30, streams=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Joins two streams on a key, using exact match only. MVP.
-
class
metalpipe.node.
SubstituteRegex
(match_regex=None, substitute_string=None, *args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node.
TimeWindowAccumulator
(*args, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Every N seconds, put the latest M seconds of data on the queue.
-
class
metalpipe.node.
bcolors
[source]¶ Bases:
object
This class holds the values for the various colors that are used in the tables that monitor the status of the nodes.
-
BOLD
= '\x1b[1m'¶
-
ENDC
= '\x1b[0m'¶
-
FAIL
= '\x1b[91m'¶
-
HEADER
= '\x1b[95m'¶
-
OKBLUE
= '\x1b[94m'¶
-
OKGREEN
= '\x1b[92m'¶
-
UNDERLINE
= '\x1b[4m'¶
-
WARNING
= '\x1b[93m'¶
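These values are standard ANSI escape sequences, so a string is colored by placing a code before it and the reset code (ENDC) after it. The colorize helper below is a hypothetical function for illustration; the constants are copied from the attributes listed above.

```python
# Values copied from the bcolors attributes listed above.
BOLD = "\x1b[1m"
ENDC = "\x1b[0m"
FAIL = "\x1b[91m"
OKGREEN = "\x1b[92m"


def colorize(text, *codes):
    """Wrap `text` in the given ANSI codes and reset the style afterwards."""
    return "".join(codes) + text + ENDC


ok_label = colorize("running", OKGREEN)
error_label = colorize("failed", BOLD, FAIL)
print(ok_label, error_label)
```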
-
Civis-specific node types
This is where any classes specific to the Civis API live.
-
class
metalpipe.node_classes.civis_nodes.
CivisSQLExecute
(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results.
-
class
metalpipe.node_classes.civis_nodes.
CivisToCSV
(*args, sql=None, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, dummy_run=False, query_dict=None, returned_columns=None, include_headers=True, delimiter=', ', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Execute a SQL statement and return the results via a CSV file.
-
class
metalpipe.node_classes.civis_nodes.
EnsureCivisRedshiftTableExists
(on_failure='exit', table=None, schema=None, database=None, columns=None, block=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.
FindValueInRedshiftColumn
(on_failure='exit', table=None, database=None, schema=None, column=None, choice='max', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
class
metalpipe.node_classes.civis_nodes.
SendToCivis
(*args, civis_api_key=None, civis_api_key_env_var='CIVIS_API_KEY', database=None, schema=None, existing_table_rows='append', include_columns=None, dummy_run=False, block=False, max_errors=0, table=None, via_staging_table=False, columns=None, staging_table=None, remap=None, recorded_tables={}, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
-
cleanup
()[source]¶ Check if we’re using staging tables. If so, copy the staging table into the production table. TODO: options for merge, upsert, append, drop
-
full_table_name
¶
-
Data structures module
Data types (e.g. Rows, Records) for ETL.
-
class
metalpipe.utils.data_structures.
BOOL
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.
DATETIME
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
()¶
-
-
class
metalpipe.utils.data_structures.
DataSourceTypeSystem
[source]¶ Bases:
object
Information about mapping one type system onto another is contained in the children of this class.
-
class
metalpipe.utils.data_structures.
DataType
(value, original_type=None, name=None)[source]¶ Bases:
object
Each DataType gets a python_cast_function, which is a function.
-
intermediate_type
= None¶
-
python_cast_function
= None¶
-
to_intermediate_type
()
Convert the DataType to an IntermediateDataType using its class’s intermediate_type attribute.
-
type_system
¶ Just for convenience to make the type system an attribute.
-
-
class
metalpipe.utils.data_structures.
FLOAT
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.float
-
-
class
metalpipe.utils.data_structures.
INTEGER
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.
IntermediateTypeSystem
[source]¶ Bases:
metalpipe.utils.data_structures.DataSourceTypeSystem
Never instantiate this by hand.
-
class
metalpipe.utils.data_structures.
MYSQL_BOOL
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.bool
-
-
class
metalpipe.utils.data_structures.
MYSQL_DATE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
()¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_ENUM
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.str
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER0
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 0¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER1
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 1¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER10
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 10¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER1024
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 1024¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER11
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 11¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER12
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 12¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER128
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 128¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER13
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 13¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER14
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 14¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER15
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 15¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER16
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 16¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER16384
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 16384¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER17
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 17¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER18
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 18¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER19
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 19¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER2
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 2¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER20
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 20¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER2048
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 2048¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER21
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 21¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER22
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 22¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER23
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 23¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER24
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 24¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER25
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 25¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER256
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 256¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER26
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 26¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER27
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 27¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER28
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 28¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER29
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 29¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER3
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 3¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER30
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 30¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER31
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 31¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER32
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 32¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER32768
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 32768¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER4
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 4¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER4096
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 4096¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER5
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 5¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER512
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 512¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER6
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 6¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER64
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 64¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER7
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 7¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER8
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 8¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER8192
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 8192¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER9
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_INTEGER_BASE
-
max_length
= 9¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_INTEGER_BASE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.int
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR0
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 0¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR1
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 1¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR10
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 10¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR1024
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 1024¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR11
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 11¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR12
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 12¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR128
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 128¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR13
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 13¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR14
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 14¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR15
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 15¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR16
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 16¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR16384
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 16384¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR17
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 17¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR18
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 18¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR19
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 19¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR2
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 2¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR20
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 20¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR2048
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 2048¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR21
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 21¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR22
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 22¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR23
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 23¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR24
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 24¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR25
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 25¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR256
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 256¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR26
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 26¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR27
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 27¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR28
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 28¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR29
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 29¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR3
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 3¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR30
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 30¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR31
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 31¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR32
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 32¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR32768
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 32768¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR4
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 4¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR4096
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 4096¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR5
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 5¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR512
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 512¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR6
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 6¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR64
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 64¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR7
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 7¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR8
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 8¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR8192
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 8192¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR9
(value, original_type=None, name=None)¶ Bases:
metalpipe.utils.data_structures.MYSQL_VARCHAR_BASE
-
max_length
= 9¶
-
-
class
metalpipe.utils.data_structures.
MYSQL_VARCHAR_BASE
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.MySQLTypeSystem
-
python_cast_function
¶ alias of
builtins.str
-
-
class
metalpipe.utils.data_structures.
MySQLTypeSystem
[source]¶ Bases:
metalpipe.utils.data_structures.DataSourceTypeSystem
Each TypeSystem gets a type_mapping static method that takes a string and returns the class in the type system named by that string. For example, int(8) in a MySQL schema should return the MYSQL_INTEGER8 class.
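As a rough illustration of the idea, the sketch below maps a schema string such as int(8) to a class by name. It is not MetalPipe's implementation: SketchMySQLTypeSystem, the prefix table, and the empty stand-in classes are all assumptions made for the example.

```python
import re


class MYSQL_INTEGER8:
    """Empty stand-in for a class in the type system (hypothetical body)."""


class MYSQL_VARCHAR32:
    """Empty stand-in for a class in the type system (hypothetical body)."""


class SketchMySQLTypeSystem:
    """Hypothetical type system used only to illustrate type_mapping."""

    # Assumed lookup from MySQL base type names to class-name prefixes.
    _prefixes = {"int": "MYSQL_INTEGER", "varchar": "MYSQL_VARCHAR"}
    # Classes this sketch knows about, indexed by class name.
    _classes = {cls.__name__: cls for cls in (MYSQL_INTEGER8, MYSQL_VARCHAR32)}

    @staticmethod
    def type_mapping(type_string):
        """Return the class named by a schema string such as 'int(8)'."""
        match = re.fullmatch(r"\s*(\w+)\s*\(\s*(\d+)\s*\)\s*", type_string)
        if match is None:
            raise ValueError(f"Unrecognized type string: {type_string!r}")
        base, size = match.group(1).lower(), match.group(2)
        prefix = SketchMySQLTypeSystem._prefixes.get(base)
        if prefix is None:
            raise ValueError(f"Unknown base type: {base!r}")
        class_name = f"{prefix}{size}"
        if class_name not in SketchMySQLTypeSystem._classes:
            raise ValueError(f"No class named {class_name}")
        return SketchMySQLTypeSystem._classes[class_name]


mapped = SketchMySQLTypeSystem.type_mapping("int(8)")
print(mapped.__name__)
```

Looking classes up by a constructed name keeps the mapping table small, at the cost of requiring one class per supported length.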
-
class
metalpipe.utils.data_structures.
Row
(*records, type_system=None)[source]¶ Bases:
object
A collection of DataType objects (typed values). They are dictionaries mapping the names of the values to the DataType objects.
-
class
metalpipe.utils.data_structures.
STRING
(value, original_type=None, name=None)[source]¶ Bases:
metalpipe.utils.data_structures.DataType
,metalpipe.utils.data_structures.IntermediateTypeSystem
-
python_cast_function
¶ alias of
builtins.str
-
Network nodes module¶
Classes that deal with sending and receiving data across the interwebs.
-
class
metalpipe.node_classes.network_nodes.
HttpGetRequest
(endpoint_template=None, endpoint_dict=None, protocol='http', retries=5, json=True, **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Node class for making simple GET requests.
-
process_item
()[source]¶ The input to this function will be a dictionary-like object with parameters to be substituted into the endpoint string and a dictionary with keys and values to be passed in the GET request.
Three use cases:
1. Endpoint and parameters set initially and never changed.
2. Endpoint and parameters set once at runtime.
3. Endpoint and parameters set by upstream messages.
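The substitution step can be sketched with plain str.format. This is a minimal illustration, not the node's actual code: build_url, the resource and item_id placeholders, and the rule that configured values win over message values are all assumptions.

```python
def build_url(endpoint_template, endpoint_dict=None, message=None):
    """Fill an endpoint template from configured values and a message.

    Assumption: values configured in endpoint_dict take precedence over
    values carried by the incoming message.
    """
    values = dict(message or {})
    values.update(endpoint_dict or {})
    # str.format ignores unused keyword arguments, so extra keys are harmless.
    return endpoint_template.format(**values)


# Hypothetical template: "resource" is configured up front, "item_id"
# arrives with each upstream message.
url = build_url(
    "http://www.someapi.com/{resource}?id={item_id}",
    endpoint_dict={"resource": "users"},
    message={"item_id": 7},
)
print(url)  # http://www.someapi.com/users?id=7
```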
-
-
class
metalpipe.node_classes.network_nodes.
HttpGetRequestPaginator
(endpoint_dict=None, json=True, pagination_get_request_key=None, endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_template_key=None, default_offset_value='', **kwargs)[source]¶ Bases:
metalpipe.node.MetalNode
Node class for HTTP API requests that require paging through sets of results.
This class handles making HTTP GET requests, determining whether there are additional results, and making additional calls if necessary. A typical case is to have an HTTP request something like this:
http://www.someapi.com/endpoint_name?resultpage=0
with a response like:
{"data": "something", "additional_pages": true, "next_page": 1}
The response contains some data, a flag (additional_pages) for determining whether there are additional results, and a parameter (next_page) that gets passed to the next request to retrieve the right page of results. So the next GET request would be:
http://www.someapi.com/endpoint_name?resultpage=1
This process will repeat until additional_pages is false.
In order to use this node class, you’ll need to provide arguments that tell the node where to look for the equivalent of additional_pages and next_page.
endpoint_template: The parameterized URL for the API.
additional_data_key: The keypath to the value in the API response that determines whether there are additional pages to request.
pagination_key: The keypath to the value in the API response that contains the value that would be passed to the API to retrieve the next set of values.
pagination_get_request_key: The key in the endpoint_template that will contain the value of the pagination_key.
For our simple example, the arguments would be
endpoint_template: http://www.someapi.com/endpoint_name?resultpage={result_page}
additional_data_key: ["additional_pages"]
pagination_key: ["next_page"]
pagination_get_request_key: result_page
In addition to those mandatory arguments, you can also optionally specify an endpoint_dict, which contains other values that will be substituted into the endpoint_template. For example, these APIs often have an option that controls the number of results to provide in each response, like so:
http://www.someapi.com/endpoint_name?results={num_results}&resultpage={result_page}
For cases like this, the value of endpoint_dict is a dictionary mapping keys from the endpoint_template to their values. So if you wanted to have ten results per page, you would specify:
endpoint_dict = {"num_results": 10}
There can be any number of other parameters specified in the endpoint_dict.
If there are other keys in the endpoint_template that are not provided in the endpoint_dict, then the node will try to find them in the current message that’s being processed. For example, it is common to have some kind of security token that might be given in an environment variable. If the value of that environment variable has been provided by some upstream node and placed in the key token, then it would be substituted into the URL, provided that the endpoint_template had a place for it, such as:
http://www.someapi.com/endpoint_name?auth_token={token}&resultpage={result_page}
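The paging loop described above can be sketched without any network access. This is an illustration under stated assumptions, not the node's implementation: paginate, get_by_keypath, the fetch callable, and the canned FAKE_API responses are hypothetical; only the argument names and the example URLs come from the description.

```python
def get_by_keypath(obj, keypath):
    """Follow a list of keys into a nested dictionary."""
    for key in keypath:
        obj = obj[key]
    return obj


def paginate(fetch, endpoint_template, additional_data_key, pagination_key,
             pagination_get_request_key, endpoint_dict=None, first_page=0):
    """Yield responses until the additional-data flag is false.

    fetch is any callable that takes a URL and returns a parsed response.
    """
    values = dict(endpoint_dict or {})
    values[pagination_get_request_key] = first_page
    while True:
        response = fetch(endpoint_template.format(**values))
        yield response
        if not get_by_keypath(response, additional_data_key):
            break
        # Feed the pagination value from this response into the next URL.
        values[pagination_get_request_key] = get_by_keypath(
            response, pagination_key)


# Canned responses standing in for a real API (assumed data).
FAKE_API = {
    "http://www.someapi.com/endpoint_name?resultpage=0":
        {"data": "something", "additional_pages": True, "next_page": 1},
    "http://www.someapi.com/endpoint_name?resultpage=1":
        {"data": "something else", "additional_pages": False,
         "next_page": None},
}

pages = list(paginate(
    FAKE_API.__getitem__,
    endpoint_template=(
        "http://www.someapi.com/endpoint_name?resultpage={result_page}"),
    additional_data_key=["additional_pages"],
    pagination_key=["next_page"],
    pagination_get_request_key="result_page",
))
print([page["data"] for page in pages])  # ['something', 'something else']
```

Passing the fetch function in as an argument keeps the loop testable; a real caller could supply a function that performs the GET request and decodes the JSON body.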
-
class
metalpipe.node_classes.network_nodes.
PaginatedHttpGetRequest
(endpoint_template=None, additional_data_key=None, pagination_key=None, pagination_get_request_key=None, protocol='http', retries=5, default_offset_value='', additional_data_test=<class 'bool'>, calling_node=None)[source]¶ Bases:
object
For handling requests in a semi-general way that require paging through lists of results and repeatedly making GET requests.
MetalPipeMessage module¶
The MetalPipeMessage
encapsulates the content of each piece of data,
along with some useful metadata.
Trigger module¶
A simple class containing no data, which is intended merely as a trigger, signaling that the downstream node should do something.
Batch module¶
We’ll use markers to delimit batches of items, such as serialized files.
MetalPipeQueue module¶
These are queues that form the directed edges between nodes.
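To make the "queue as directed edge" idea concrete, here is a small sketch that uses the standard library's queue.Queue as a stand-in. It does not use MetalPipeQueue itself; the upstream and downstream functions and the None sentinel that marks the end of the stream are assumptions for the example.

```python
import queue
import threading


def upstream(out_queue, items):
    """Producer node: push each item onto the outgoing edge."""
    for item in items:
        out_queue.put(item)  # blocks while the bounded queue is full
    out_queue.put(None)      # assumed sentinel marking the end of the stream


def downstream(in_queue, results):
    """Consumer node: pull items from the incoming edge and double them."""
    while True:
        item = in_queue.get()
        if item is None:
            break
        results.append(item * 2)


# The bounded queue is the directed edge from upstream to downstream.
edge = queue.Queue(maxsize=2)
results = []
threads = [
    threading.Thread(target=upstream, args=(edge, [1, 2, 3])),
    threading.Thread(target=downstream, args=(edge, results)),
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(results)  # [2, 4, 6]
```

A bounded queue gives back-pressure: a fast producer waits once the edge holds max size items instead of growing memory without limit.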
-
class
metalpipe.node_queue.queue.
MetalPipeQueue
(max_queue_size, name=None)[source]¶ Bases:
object
-
empty
¶
-