Skip to content

Commit a55ecfd

Browse files
committed
add insight aggregations tools
1 parent c6c2bb1 commit a55ecfd

File tree

2 files changed

+219
-1
lines changed

2 files changed

+219
-1
lines changed

python/thirdweb-ai/src/thirdweb_ai/common/utils.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,52 @@ def filter_response_keys(items: list[dict[str, Any]], keys_to_keep: list[str] |
7777
for key in keys_to_remove:
7878
item.pop(key, None)
7979
return items
80+
81+
82+
# Aggregation function validation
83+
VALID_EVENT_AGGREGATIONS = [
84+
"count()",
85+
"countDistinct(address)",
86+
"countDistinct(contract_address)",
87+
"countDistinct(transaction_hash)",
88+
"min(block_number)",
89+
"max(block_number)",
90+
"min(block_timestamp)",
91+
"max(block_timestamp)",
92+
]
93+
94+
VALID_TRANSACTION_AGGREGATIONS = [
95+
"count()",
96+
"countDistinct(from_address)",
97+
"countDistinct(to_address)",
98+
"countDistinct(contract_address)",
99+
"sum(value)",
100+
"avg(value)",
101+
"min(value)",
102+
"max(value)",
103+
"min(block_number)",
104+
"max(block_number)",
105+
"min(block_timestamp)",
106+
"max(block_timestamp)",
107+
]
108+
109+
110+
def validate_aggregation(agg: str, valid_aggregations: list[str]) -> str:
111+
"""Validate an aggregation function string."""
112+
# Handle aliases like "count() as event_count"
113+
base_agg = agg.split(" as ")[0].strip()
114+
115+
if base_agg not in valid_aggregations:
116+
raise ValueError(f"Invalid aggregation function: {base_agg}. Valid options: {valid_aggregations}")
117+
118+
return agg
119+
120+
121+
def validate_event_aggregation(agg: str) -> str:
122+
"""Validate an event aggregation function."""
123+
return validate_aggregation(agg, VALID_EVENT_AGGREGATIONS)
124+
125+
126+
def validate_transaction_aggregation(agg: str) -> str:
127+
"""Validate a transaction aggregation function."""
128+
return validate_aggregation(agg, VALID_TRANSACTION_AGGREGATIONS)

python/thirdweb-ai/src/thirdweb_ai/services/insight.py

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@
66
validate_signature,
77
validate_transaction_hash,
88
)
9-
from thirdweb_ai.common.utils import EVENT_KEYS_TO_KEEP, TRANSACTION_KEYS_TO_KEEP, clean_resolve, filter_response_keys
9+
from thirdweb_ai.common.utils import (
10+
EVENT_KEYS_TO_KEEP,
11+
TRANSACTION_KEYS_TO_KEEP,
12+
VALID_EVENT_AGGREGATIONS,
13+
VALID_TRANSACTION_AGGREGATIONS,
14+
clean_resolve,
15+
filter_response_keys,
16+
validate_event_aggregation,
17+
validate_transaction_aggregation,
18+
)
1019
from thirdweb_ai.services.service import Service
1120
from thirdweb_ai.tools.tool import tool
1221

@@ -409,3 +418,163 @@ def decode_signature(
409418
signature = validate_signature(signature)
410419
out = self._get(f"resolve/{signature}", params)
411420
return clean_resolve(out)
421+
422+
@tool(
423+
description="Aggregate blockchain events with powerful grouping and aggregation options. Use this to get event counts, sums, or other aggregations grouped by fields like address, block, or time period."
424+
)
425+
def aggregate_events(
426+
self,
427+
aggregate: Annotated[
428+
list[str],
429+
f"Aggregation functions to apply. Valid options: {', '.join(VALID_EVENT_AGGREGATIONS)}. Can include aliases like 'count() as event_count'.",
430+
],
431+
group_by: Annotated[
432+
Literal["address", "contract_address", "from_address", "block_number", "event_signature"] | None,
433+
"Field to group events by for aggregation. Required when using aggregate functions.",
434+
] = None,
435+
chain_id: Annotated[
436+
list[int] | int | None,
437+
"Chain ID(s) to query (e.g., 1 for Ethereum Mainnet, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries (max 5).",
438+
] = None,
439+
contract_address: Annotated[
440+
str | None,
441+
"Contract address to filter events by (e.g., '0x1234...'). Only return events emitted by this contract.",
442+
] = None,
443+
transaction_hash: Annotated[
444+
str | None,
445+
"Specific transaction hash to filter events by (e.g., '0xabc123...'). Useful for examining events in a particular transaction.",
446+
] = None,
447+
event_signature: Annotated[
448+
str | None,
449+
"Event signature to filter by (human-readable, e.g., 'Transfer(address,address,uint256)').",
450+
] = None,
451+
limit: Annotated[
452+
int | None,
453+
"Number of aggregated results to return (default 20, max 100).",
454+
] = 20,
455+
page: Annotated[
456+
int | None,
457+
"Page number for paginated results, starting from 0. 20 results are returned per page.",
458+
] = None,
459+
) -> dict[str, Any]:
460+
# Validate aggregation functions
461+
validated_aggregate = [validate_event_aggregation(agg) for agg in aggregate]
462+
463+
params: dict[str, Any] = {
464+
"sort_by": "block_number",
465+
"sort_order": "desc",
466+
"decode": True,
467+
"aggregate": validated_aggregate,
468+
}
469+
470+
if group_by:
471+
params["group_by"] = "address" if group_by == "contract_address" else group_by
472+
473+
chain_ids = chain_id if chain_id is not None else self.chain_ids
474+
if chain_ids:
475+
params["chain"] = chain_ids
476+
if contract_address:
477+
params["filter_address"] = validate_address(contract_address)
478+
if transaction_hash:
479+
params["filter_transaction_hash"] = validate_transaction_hash(transaction_hash)
480+
if event_signature:
481+
params["filter_event_signature"] = event_signature
482+
if limit:
483+
params["limit"] = limit
484+
if page:
485+
params["page"] = page
486+
487+
out = self._get("events", params)
488+
489+
# Clean up response by removing chain_id from aggregations if present
490+
if out.get("aggregations"):
491+
for agg in out["aggregations"]:
492+
if isinstance(agg, dict):
493+
for value in agg.values():
494+
if isinstance(value, dict) and "chain_id" in value:
495+
value.pop("chain_id")
496+
497+
return out
498+
499+
@tool(
500+
description="Aggregate blockchain transactions with powerful grouping and aggregation options. Use this for transaction counts, volumes, sums, and other analytics grouped by address, block, or time period."
501+
)
502+
def aggregate_transactions(
503+
self,
504+
aggregate: Annotated[
505+
list[str],
506+
f"Aggregation functions to apply. Valid options: {', '.join(VALID_TRANSACTION_AGGREGATIONS)}. Can include aliases like 'count() as tx_count' or 'sum(value) as total_value'.",
507+
],
508+
group_by: Annotated[
509+
Literal["from_address", "to_address", "block_number"] | None,
510+
"Field to group transactions by for aggregation. Required when using aggregate functions.",
511+
] = None,
512+
chain_id: Annotated[
513+
list[int] | int | None,
514+
"Chain ID(s) to query (e.g., 1 for Ethereum, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries.",
515+
] = None,
516+
from_address: Annotated[
517+
str | None,
518+
"Filter transactions sent from this address (e.g., '0x1234...'). Useful for tracking outgoing transactions from a wallet.",
519+
] = None,
520+
to_address: Annotated[
521+
str | None,
522+
"Filter transactions sent to this address (e.g., '0x1234...'). Useful for tracking incoming transactions to a contract or wallet.",
523+
] = None,
524+
function_signature: Annotated[
525+
str | None,
526+
"Function signature to filter by (e.g., 'approve(address,uint256)').",
527+
] = None,
528+
value_above: Annotated[
529+
int | None,
530+
"Filter for transactions with value above this amount (in wei - base blockchain units).",
531+
] = None,
532+
limit: Annotated[
533+
int | None,
534+
"Number of aggregated results to return (default 20, max 100).",
535+
] = 20,
536+
page: Annotated[
537+
int | None,
538+
"Page number for paginated results, starting from 0. 20 results are returned per page.",
539+
] = None,
540+
) -> dict[str, Any]:
541+
# Validate aggregation functions
542+
validated_aggregate = [validate_transaction_aggregation(agg) for agg in aggregate]
543+
544+
params: dict[str, Any] = {
545+
"sort_by": "block_number",
546+
"sort_order": "desc",
547+
"decode": True,
548+
"aggregate": validated_aggregate,
549+
}
550+
551+
if group_by:
552+
params["group_by"] = group_by
553+
554+
chain_ids = chain_id if chain_id is not None else self.chain_ids
555+
if chain_ids:
556+
params["chain"] = chain_ids
557+
if from_address:
558+
params["filter_from_address"] = validate_address(from_address)
559+
if to_address:
560+
params["filter_to_address"] = validate_address(to_address)
561+
if function_signature:
562+
params["filter_function_signature"] = function_signature
563+
if value_above:
564+
params["value_gte"] = value_above
565+
if limit:
566+
params["limit"] = limit
567+
if page:
568+
params["page"] = page
569+
570+
out = self._get("transactions", params)
571+
572+
# Clean up response by removing chain_id from aggregations if present
573+
if out.get("aggregations"):
574+
for agg in out["aggregations"]:
575+
if isinstance(agg, dict):
576+
for value in agg.values():
577+
if isinstance(value, dict) and "chain_id" in value:
578+
value.pop("chain_id")
579+
580+
return out

0 commit comments

Comments
 (0)