Skip to content

Commit 821a270

Browse files
committed
fix: lint
1 parent 1e0d29e commit 821a270

File tree

7 files changed

+379
-285
lines changed

7 files changed

+379
-285
lines changed

deployment/migrations/versions/0033_1c06d0ade60c_calculate_costs_statically.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
"""
88

99
from decimal import Decimal
10-
from typing import Dict
1110
from alembic import op
1211
import sqlalchemy as sa
1312
import logging
@@ -18,7 +17,7 @@
1817
from aleph.db.accessors.messages import get_message_by_item_hash
1918
from aleph.services.cost import _is_confidential_vm, get_detailed_costs, CostComputableContent
2019
from aleph.services.pricing_utils import build_default_pricing_model
21-
from aleph.types.cost import ProductPriceType, ProductPricing
20+
from aleph.types.cost import ProductPriceType
2221
from aleph.types.db_session import DbSession
2322

2423
logger = logging.getLogger("alembic")

src/aleph/services/pricing_utils.py

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import datetime as dt
66
from typing import Dict, List, Union
77

8-
from aleph.db.accessors.aggregates import get_aggregate_elements, merge_aggregate_elements
8+
from aleph.db.accessors.aggregates import (
9+
get_aggregate_elements,
10+
merge_aggregate_elements,
11+
)
912
from aleph.db.models import AggregateElementDb
1013
from aleph.toolkit.constants import (
1114
DEFAULT_PRICE_AGGREGATE,
@@ -16,22 +19,24 @@
1619
from aleph.types.db_session import DbSession
1720

1821

19-
def build_pricing_model_from_aggregate(aggregate_content: Dict[Union[ProductPriceType, str], dict]) -> Dict[ProductPriceType, ProductPricing]:
22+
def build_pricing_model_from_aggregate(
23+
aggregate_content: Dict[Union[ProductPriceType, str], dict]
24+
) -> Dict[ProductPriceType, ProductPricing]:
2025
"""
2126
Build a complete pricing model from an aggregate content dictionary.
22-
23-
This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
24-
content into a dictionary of ProductPricing objects that can be used by the cost
27+
28+
This function converts the DEFAULT_PRICE_AGGREGATE format or any pricing aggregate
29+
content into a dictionary of ProductPricing objects that can be used by the cost
2530
calculation functions.
26-
31+
2732
Args:
2833
aggregate_content: Dictionary containing pricing information with ProductPriceType as keys
29-
34+
3035
Returns:
3136
Dictionary mapping ProductPriceType to ProductPricing objects
3237
"""
3338
pricing_model: Dict[ProductPriceType, ProductPricing] = {}
34-
39+
3540
for price_type, pricing_data in aggregate_content.items():
3641
try:
3742
price_type = ProductPriceType(price_type)
@@ -41,16 +46,17 @@ def build_pricing_model_from_aggregate(aggregate_content: Dict[Union[ProductPric
4146
except (KeyError, ValueError) as e:
4247
# Log the error but continue processing other price types
4348
import logging
49+
4450
logger = logging.getLogger(__name__)
4551
logger.warning(f"Failed to parse pricing for {price_type}: {e}")
46-
52+
4753
return pricing_model
4854

4955

5056
def build_default_pricing_model() -> Dict[ProductPriceType, ProductPricing]:
5157
"""
5258
Build the default pricing model from DEFAULT_PRICE_AGGREGATE constant.
53-
59+
5460
Returns:
5561
Dictionary mapping ProductPriceType to ProductPricing objects
5662
"""
@@ -60,55 +66,57 @@ def build_default_pricing_model() -> Dict[ProductPriceType, ProductPricing]:
6066
def get_pricing_aggregate_history(session: DbSession) -> List[AggregateElementDb]:
6167
"""
6268
Get all pricing aggregate updates in chronological order.
63-
69+
6470
Args:
6571
session: Database session
66-
72+
6773
Returns:
6874
List of AggregateElementDb objects ordered by creation_datetime
6975
"""
7076
aggregate_elements = get_aggregate_elements(
71-
session=session,
72-
owner=PRICE_AGGREGATE_OWNER,
73-
key=PRICE_AGGREGATE_KEY
77+
session=session, owner=PRICE_AGGREGATE_OWNER, key=PRICE_AGGREGATE_KEY
7478
)
7579
return list(aggregate_elements)
7680

7781

78-
def get_pricing_timeline(session: DbSession) -> List[tuple[dt.datetime, Dict[ProductPriceType, ProductPricing]]]:
82+
def get_pricing_timeline(
83+
session: DbSession,
84+
) -> List[tuple[dt.datetime, Dict[ProductPriceType, ProductPricing]]]:
7985
"""
8086
Get the complete pricing timeline with timestamps and pricing models.
81-
87+
8288
This function returns a chronologically ordered list of pricing changes,
83-
useful for processing messages in chronological order and applying the
89+
useful for processing messages in chronological order and applying the
8490
correct pricing at each point in time.
85-
91+
8692
This properly merges aggregate elements up to each point in time to create
8793
the cumulative pricing state, similar to how _update_aggregate works.
88-
94+
8995
Args:
9096
session: Database session
91-
97+
9298
Returns:
9399
List of tuples containing (timestamp, pricing_model)
94100
"""
95101
pricing_elements = get_pricing_aggregate_history(session)
96-
102+
97103
timeline = []
98-
104+
99105
# Add default pricing as the initial state
100-
timeline.append((dt.datetime.min.replace(tzinfo=dt.timezone.utc), build_default_pricing_model()))
101-
106+
timeline.append(
107+
(dt.datetime.min.replace(tzinfo=dt.timezone.utc), build_default_pricing_model())
108+
)
109+
102110
# Build cumulative pricing models by merging elements up to each timestamp
103111
elements_so_far = []
104112
for element in pricing_elements:
105113
elements_so_far.append(element)
106-
114+
107115
# Merge all elements up to this point to get the cumulative state
108116
merged_content = merge_aggregate_elements(elements_so_far)
109-
117+
110118
# Build pricing model from the merged content
111119
pricing_model = build_pricing_model_from_aggregate(merged_content)
112120
timeline.append((element.creation_datetime, pricing_model))
113-
114-
return timeline
121+
122+
return timeline

src/aleph/toolkit/constants.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1+
from typing import Dict, Union
2+
3+
from aleph.types.cost import ProductPriceType
4+
15
KiB = 1024
26
MiB = 1024 * 1024
37
GiB = 1024 * 1024 * 1024
48

59
MINUTE = 60
610
HOUR = 60 * MINUTE
711

8-
from aleph.types.cost import ProductPriceType
9-
1012
PRICE_AGGREGATE_OWNER = "0xFba561a84A537fCaa567bb7A2257e7142701ae2A"
1113
PRICE_AGGREGATE_KEY = "pricing"
1214
PRICE_PRECISION = 18
13-
DEFAULT_PRICE_AGGREGATE = {
15+
DEFAULT_PRICE_AGGREGATE: Dict[Union[ProductPriceType, str], dict] = {
1416
ProductPriceType.PROGRAM: {
1517
"price": {
1618
"storage": {"payg": "0.000000977", "holding": "0.05"},
@@ -50,7 +52,9 @@
5052
"memory_mib": 2048,
5153
},
5254
},
53-
ProductPriceType.WEB3_HOSTING: {"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}},
55+
ProductPriceType.WEB3_HOSTING: {
56+
"price": {"fixed": 50, "storage": {"holding": "0.333333333"}}
57+
},
5458
ProductPriceType.PROGRAM_PERSISTENT: {
5559
"price": {
5660
"storage": {"payg": "0.000000977", "holding": "0.05"},

src/aleph/web/controllers/prices.py

Lines changed: 51 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ async def message_price_estimate(request: web.Request):
174174

175175
async def recalculate_message_costs(request: web.Request):
176176
"""Force recalculation of message costs in chronological order with historical pricing.
177-
177+
178178
This endpoint will:
179179
1. Get all messages that need cost recalculation (if item_hash provided, just that message)
180180
2. Get the pricing timeline to track price changes over time
@@ -183,15 +183,15 @@ async def recalculate_message_costs(request: web.Request):
183183
5. Delete existing cost entries and recalculate with historical pricing
184184
6. Store the new cost calculations
185185
"""
186-
186+
187187
session_factory = get_session_factory_from_request(request)
188-
188+
189189
# Check if a specific message hash was provided
190190
item_hash_param = request.match_info.get("item_hash")
191-
191+
192192
with session_factory() as session:
193193
messages_to_recalculate: List[MessageDb] = []
194-
194+
195195
if item_hash_param:
196196
# Recalculate costs for a specific message
197197
try:
@@ -203,78 +203,97 @@ async def recalculate_message_costs(request: web.Request):
203203
# Recalculate costs for all executable messages, ordered by time (oldest first)
204204
select_stmt = (
205205
select(MessageDb)
206-
.where(MessageDb.type.in_([MessageType.instance, MessageType.program, MessageType.store]))
206+
.where(
207+
MessageDb.type.in_(
208+
[MessageType.instance, MessageType.program, MessageType.store]
209+
)
210+
)
207211
.order_by(MessageDb.time.asc())
208212
)
209213
result = session.execute(select_stmt)
210214
messages_to_recalculate = result.scalars().all()
211-
215+
212216
if not messages_to_recalculate:
213217
return web.json_response(
214-
{"message": "No messages found for cost recalculation", "recalculated_count": 0}
218+
{
219+
"message": "No messages found for cost recalculation",
220+
"recalculated_count": 0,
221+
}
215222
)
216-
223+
217224
# Get the pricing timeline to track price changes over time
218225
pricing_timeline = get_pricing_timeline(session)
219226
LOGGER.info(f"Found {len(pricing_timeline)} pricing changes in timeline")
220-
227+
221228
recalculated_count = 0
222229
errors = []
223230
current_pricing_model = None
224231
current_pricing_index = 0
225-
232+
233+
settings = _get_settings(session)
234+
226235
for message in messages_to_recalculate:
227236
try:
228237
# Find the applicable pricing model for this message's timestamp
229-
while (current_pricing_index < len(pricing_timeline) - 1 and
230-
pricing_timeline[current_pricing_index + 1][0] <= message.time):
238+
while (
239+
current_pricing_index < len(pricing_timeline) - 1
240+
and pricing_timeline[current_pricing_index + 1][0] <= message.time
241+
):
231242
current_pricing_index += 1
232-
243+
233244
current_pricing_model = pricing_timeline[current_pricing_index][1]
234245
pricing_timestamp = pricing_timeline[current_pricing_index][0]
235-
236-
LOGGER.debug(f"Message {message.item_hash} at {message.time} using pricing from {pricing_timestamp}")
237-
246+
247+
LOGGER.debug(
248+
f"Message {message.item_hash} at {message.time} using pricing from {pricing_timestamp}"
249+
)
250+
238251
# Delete existing cost entries for this message
239252
delete_costs_for_message(session, message.item_hash)
240-
253+
241254
# Get the message content and determine product type
242255
content: ExecutableContent = message.parsed_content
243-
product_type = _get_product_price_type(content, None, current_pricing_model)
244-
256+
product_type = _get_product_price_type(
257+
content, settings, current_pricing_model
258+
)
259+
245260
# Get the pricing for this specific product type
246261
if product_type not in current_pricing_model:
247-
LOGGER.warning(f"Product type {product_type} not found in pricing model for message {message.item_hash}")
262+
LOGGER.warning(
263+
f"Product type {product_type} not found in pricing model for message {message.item_hash}"
264+
)
248265
continue
249-
266+
250267
pricing = current_pricing_model[product_type]
251-
268+
252269
# Calculate new costs using the historical pricing model
253-
new_costs = get_detailed_costs(session, content, message.item_hash, pricing)
254-
270+
new_costs = get_detailed_costs(
271+
session, content, message.item_hash, pricing
272+
)
273+
255274
if new_costs:
256275
# Store the new cost calculations
257276
upsert_stmt = make_costs_upsert_query(new_costs)
258277
session.execute(upsert_stmt)
259-
278+
260279
recalculated_count += 1
261-
280+
262281
except Exception as e:
263282
error_msg = f"Failed to recalculate costs for message {message.item_hash}: {str(e)}"
264283
LOGGER.error(error_msg)
265284
errors.append({"item_hash": message.item_hash, "error": str(e)})
266-
285+
267286
# Commit all changes
268287
session.commit()
269-
288+
270289
response_data = {
271290
"message": "Cost recalculation completed with historical pricing",
272291
"recalculated_count": recalculated_count,
273292
"total_messages": len(messages_to_recalculate),
274-
"pricing_changes_found": len(pricing_timeline)
293+
"pricing_changes_found": len(pricing_timeline),
275294
}
276-
295+
277296
if errors:
278297
response_data["errors"] = errors
279-
298+
280299
return web.json_response(response_data)

src/aleph/web/controllers/routes.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,9 @@ def register_routes(app: web.Application):
6868
app.router.add_get("/api/v0/price/{item_hash}", prices.message_price)
6969
app.router.add_post("/api/v0/price/estimate", prices.message_price_estimate)
7070
app.router.add_post("/api/v0/price/recalculate", prices.recalculate_message_costs)
71-
app.router.add_post("/api/v0/price/{item_hash}/recalculate", prices.recalculate_message_costs)
71+
app.router.add_post(
72+
"/api/v0/price/{item_hash}/recalculate", prices.recalculate_message_costs
73+
)
7274

7375
app.router.add_get("/api/v0/addresses/stats.json", accounts.addresses_stats_view)
7476
app.router.add_get(

0 commit comments

Comments
 (0)