1
+ """
2
+ Utility functions for pricing model creation and management.
3
+ """
4
+
5
+ import datetime as dt
6
+ from typing import Dict , List , Union
7
+
8
+ from aleph .db .accessors .aggregates import get_aggregate_elements , merge_aggregate_elements
9
+ from aleph .db .models import AggregateElementDb
10
+ from aleph .toolkit .constants import (
11
+ DEFAULT_PRICE_AGGREGATE ,
12
+ PRICE_AGGREGATE_KEY ,
13
+ PRICE_AGGREGATE_OWNER ,
14
+ )
15
+ from aleph .types .cost import ProductPriceType , ProductPricing
16
+ from aleph .types .db_session import DbSession
17
+
18
+
19
+ def build_pricing_model_from_aggregate (aggregate_content : Dict [Union [ProductPriceType , str ], dict ]) -> Dict [ProductPriceType , ProductPricing ]:
20
+ """
21
+ Build a complete pricing model from an aggregate content dictionary.
22
+
23
+ This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
24
+ content into a dictionary of ProductPricing objects that can be used by the cost
25
+ calculation functions.
26
+
27
+ Args:
28
+ aggregate_content: Dictionary containing pricing information with ProductPriceType as keys
29
+
30
+ Returns:
31
+ Dictionary mapping ProductPriceType to ProductPricing objects
32
+ """
33
+ pricing_model : Dict [ProductPriceType , ProductPricing ] = {}
34
+
35
+ for price_type , pricing_data in aggregate_content .items ():
36
+ try :
37
+ price_type = ProductPriceType (price_type )
38
+ pricing_model [price_type ] = ProductPricing .from_aggregate (
39
+ price_type , aggregate_content
40
+ )
41
+ except (KeyError , ValueError ) as e :
42
+ # Log the error but continue processing other price types
43
+ import logging
44
+ logger = logging .getLogger (__name__ )
45
+ logger .warning (f"Failed to parse pricing for { price_type } : { e } " )
46
+
47
+ return pricing_model
48
+
49
+
50
+ def build_default_pricing_model () -> Dict [ProductPriceType , ProductPricing ]:
51
+ """
52
+ Build the default pricing model from DEFAULT_PRICE_AGGREGATE constant.
53
+
54
+ Returns:
55
+ Dictionary mapping ProductPriceType to ProductPricing objects
56
+ """
57
+ return build_pricing_model_from_aggregate (DEFAULT_PRICE_AGGREGATE )
58
+
59
+
60
+ def get_pricing_aggregate_history (session : DbSession ) -> List [AggregateElementDb ]:
61
+ """
62
+ Get all pricing aggregate updates in chronological order.
63
+
64
+ Args:
65
+ session: Database session
66
+
67
+ Returns:
68
+ List of AggregateElementDb objects ordered by creation_datetime
69
+ """
70
+ aggregate_elements = get_aggregate_elements (
71
+ session = session ,
72
+ owner = PRICE_AGGREGATE_OWNER ,
73
+ key = PRICE_AGGREGATE_KEY
74
+ )
75
+ return list (aggregate_elements )
76
+
77
+
78
+ def get_pricing_timeline (session : DbSession ) -> List [tuple [dt .datetime , Dict [ProductPriceType , ProductPricing ]]]:
79
+ """
80
+ Get the complete pricing timeline with timestamps and pricing models.
81
+
82
+ This function returns a chronologically ordered list of pricing changes,
83
+ useful for processing messages in chronological order and applying the
84
+ correct pricing at each point in time.
85
+
86
+ This properly merges aggregate elements up to each point in time to create
87
+ the cumulative pricing state, similar to how _update_aggregate works.
88
+
89
+ Args:
90
+ session: Database session
91
+
92
+ Returns:
93
+ List of tuples containing (timestamp, pricing_model)
94
+ """
95
+ pricing_elements = get_pricing_aggregate_history (session )
96
+
97
+ timeline = []
98
+
99
+ # Add default pricing as the initial state
100
+ timeline .append ((dt .datetime .min .replace (tzinfo = dt .timezone .utc ), build_default_pricing_model ()))
101
+
102
+ # Build cumulative pricing models by merging elements up to each timestamp
103
+ elements_so_far = []
104
+ for element in pricing_elements :
105
+ elements_so_far .append (element )
106
+
107
+ # Merge all elements up to this point to get the cumulative state
108
+ merged_content = merge_aggregate_elements (elements_so_far )
109
+
110
+ # Build pricing model from the merged content
111
+ pricing_model = build_pricing_model_from_aggregate (merged_content )
112
+ timeline .append ((element .creation_datetime , pricing_model ))
113
+
114
+ return timeline
0 commit comments