I'm writing a minimum viable product for derivative sync functions to understand them better and to share here for other people to look at. It works (as in, it fetches data and creates a properly formed child data pipe), but it still produces an error/warning that I'm not sure how to fix. Here is the code:

    # Minimum working example of derivative sync plugins
    # More samples of register, fetch, and sync can be found in the mrsm docs
    required = ['pandas', 'random', 'datetime']

    # Minimum register function
    def register(pipe, **kw):
        # Indicates which column acts as the datetime column
        return {
            'columns': {
                # Tells mrsm that the timestamp column is the datetime column
                'datetime': 'timestamp',
            },
        }

    # Get the base data that you will do operations on
    # This function gets random data.
    def fetch(pipe, **kw):
        # Import statements go in function body
        import pandas as pd
        import random
        import datetime
        now = datetime.datetime.now()
        # Initialize dataframe
        df = pd.DataFrame()
        # Generate random data
        for i in range(3):
            data = {
                'timestamp': now + datetime.timedelta(seconds=i),
                'random1': random.randint(1, 100),
                'random2': random.randint(101, 200)
            }
            # This uses just one method of populating the dataframe (concatenating successive dictionaries)
            # Another method is retrieving a whole DF from some API
            # Good examples are in the mrsm docs
            df_new = pd.DataFrame([data])
            df = pd.concat([df, df_new], ignore_index=True)
        # Return fetched data
        return df

    # The sync function is what creates derivative pipes.
    def sync(pipe, **kw):
        import meerschaum as mrsm
        # Get data using fetch function above
        pipe.sync(fetch(pipe, **kw), **kw)
        # Create child pipe
        child_pipe = mrsm.Pipe(
            # Carry over the original pipe's connector and metric keys
            pipe.connector_keys,
            pipe.metric_key,
            # Name the child pipe (required; child pipe will not be created if it does not have a unique name)
            'derived_pipe_1',
            # Add new derived columns
            # Initially set to empty lists; they are populated in the remainder of this function
            columns = pipe.columns.update({'deriv_random1': [], 'deriv_random2': []})
        )
        # Get the data from the pipe
        # alternatively: use pipe.get_backtrack_data(X) to only get data from the past X minutes
        # get_backtrack_data(X) is, of course, more efficient for large datasets.
        fetched_data = pipe.get_data()
        # Copy fetched data to child data and add 2 derived columns
        # df.assign copies data and adds new specified columns based on existing data
        child_data = fetched_data.assign(
            # These new column names need to match the ones added above
            # The parameter handed to these functions is the original dataframe (fetched_data)
            # Example derived column 1: delegating the data derivation to a function defined elsewhere
            deriv_random1=derive1,
            # Example derived column 2: lambda function
            deriv_random2=lambda row: row.random2 + 0.5
            # also works: deriv_random2=fetched_data['random2'] + 0.5
        )
        # Add the fetched and additional data to the child pipe
        return child_pipe.sync(child_data, **kw)

    # Example function for creating derivative data
    # Must be a function with 1 argument that represents the original dataframe, called "row" here
    def derive1(row):
        return row.random1 * 2
        # also works: return row['random1'] * 2

Edit: Also, any suggestions for improving conciseness/readability for other people to use as a resource?
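
A side note on the `columns=` argument in the plugin above: `dict.update()` modifies a dictionary in place and returns `None`, so `columns = pipe.columns.update({...})` passes `None` to the child pipe and also changes whatever dictionary `pipe.columns` returns. Here is a minimal sketch with plain dictionaries (no Meerschaum needed; `parent_columns` and `extra_columns` are stand-in names, assuming `pipe.columns` behaves like the dict returned by `register`), showing the difference and one way to build a merged copy instead:

```python
# Stand-in for pipe.columns (assumption: a plain dict like the one register() returns).
parent_columns = {'datetime': 'timestamp'}
extra_columns = {'deriv_random1': [], 'deriv_random2': []}

# dict.update() works in place and returns None.
# It is called on a copy here so the stand-in parent dict is left alone.
result = dict(parent_columns).update(extra_columns)
print(result)  # None

# Unpacking both dicts into a new one leaves the parent's mapping untouched.
child_columns = {**parent_columns, **extra_columns}
print(child_columns)
print(parent_columns)
```

Whether the child pipe needs these extra keys at all is a separate question; the sketch only illustrates the return value of `update()`.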
Replies: 1 comment 2 replies
Hey @p-zach, thanks for opening a question! I just played around with your plugin, and the issue stems from `sync pipes` selecting both the parent and child pipes, executing the plugin twice at the same time. The first sync works as expected because the child doesn't yet exist, but because the child has different columns from the parent (i.e. the warning about DataFrames' shapes before the error), syncing the child pipe directly fails on line 51.

Because syncing the parent updates the child as well, you need to exit the function if the child is synced directly. To avoid the exception, add a quick check at the top of `sync(pipe)` to ensure that `pipe` is actually the parent:

    import meerschaum as mrsm

    def register(pipe: mrsm.Pipe, **kw):
        return {'columns': {'datetime': 'timestamp'}}

    def fetch(pipe: mrsm.Pipe, **kw):
        ...

    def get_child_data(parent_df):
        ...

    def sync(pipe: mrsm.Pipe, **kw):
        """
        Sync the parent and child in the same process.
        """
        ### Only continue if we're dealing with the parent pipe.
        if pipe.location_key is not None:
            return True, "Success"

        parent_df = fetch(pipe, **kw)
        pipe.sync(parent_df, **kw)
        child_pipe = mrsm.Pipe(
            pipe.connector_keys, pipe.metric_key, 'child',
            columns=pipe.columns, ### Optional
        )
        child_data = get_child_data(parent_df)
        return child_pipe.sync(child_data, **kw)

Also, you only need to specify the columns for the child if the [...]

About the in-function imports: in most cases, it's the convention to import at the top of the module, but because all of the plugins are imported each time [...]

    >>> from meerschaum.utils.packages import lazy_import, attempt_import
    >>> ### One at a time
    >>> pd = lazy_import('pandas')
    >>> np = lazy_import('numpy')
    >>>
    >>> ### One or more at once
    >>> pd, np = attempt_import('pandas', 'numpy', venv='derivative_test', deactivate=False)

Finally, for code styling, I recommend reading some of the PEP8 guidelines, and for readability, I recommend getting into the habit of splitting blocks of code into as many small functions as possible. Over time, you'll get a feeling for which level of abstraction you're working in, like how the entry point function ([...]

I hope this cleared some things up! I'm cleaning up a lot of the documentation for the v0.6.0 release (soon to be v0.6.1), which should make things easier! For example, I'm exposing the [...]

Edit: The [...]
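
The guard at the top of `sync` can be tried out on its own, without a running Meerschaum instance. This is a minimal sketch, assuming a stand-in object that only carries the three keys. `FakePipe`, `is_parent`, and the message returned for the parent branch are hypothetical and not part of Meerschaum; a real plugin would receive an `mrsm.Pipe` and do the fetch and sync where the comment is.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class FakePipe:
    """Hypothetical stand-in exposing only the attributes the guard reads."""
    connector_keys: str
    metric_key: str
    location_key: Optional[str] = None


def is_parent(pipe) -> bool:
    """A pipe with no location key is treated as the parent."""
    return pipe.location_key is None


def sync(pipe, **kw) -> Tuple[bool, str]:
    # Exit early when the plugin is invoked for the child pipe directly.
    if not is_parent(pipe):
        return True, "Success"
    # A real plugin would fetch, sync the parent, then sync the child here.
    return True, "Parent synced"


print(sync(FakePipe('plugin:example', 'test')))
print(sync(FakePipe('plugin:example', 'test', 'child')))
```

Pulling the check into a small named function follows the same advice about splitting code into small pieces: the entry point reads as a list of steps, and the condition can be tested separately.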