Skip to content

add insight aggregations tools #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open

add insight aggregations tools #41

wants to merge 1 commit into from

Conversation

cjber
Copy link
Contributor

@cjber cjber commented Jun 25, 2025

Summary by CodeRabbit

  • New Features
    • Introduced advanced aggregation capabilities for blockchain events and transactions, allowing users to specify aggregation functions, grouping fields, and filters for more flexible data analysis.
  • Bug Fixes
    • Improved input validation for aggregation functions to ensure only supported operations are accepted.

Copy link

coderabbitai bot commented Jun 25, 2025

Walkthrough

The changes introduce aggregation validation utilities and two new aggregation methods in the Insight service. The utilities define valid aggregation functions and provide validation logic. The new methods, aggregate_events and aggregate_transactions, enable advanced querying with aggregation, grouping, and filtering for events and transactions, incorporating the new validation logic.

Changes

File(s) Change Summary
python/thirdweb-ai/src/thirdweb_ai/common/utils.py Added constants for valid aggregation functions and functions for validating event and transaction aggregations.
python/thirdweb-ai/src/thirdweb_ai/services/insight.py Added aggregate_events and aggregate_transactions methods with aggregation, grouping, and filtering logic.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Insight
    participant Utils
    participant API

    User->>Insight: aggregate_events(...) or aggregate_transactions(...)
    Insight->>Utils: validate_*_aggregation(aggregate)
    Utils-->>Insight: Validated aggregation(s)
    Insight->>API: _get("events"/"transactions", params)
    API-->>Insight: Aggregated results
    Insight->>Insight: Clean up response (remove chain_id)
    Insight-->>User: Aggregated data
Loading
✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Explain this complex logic.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
python/thirdweb-ai/src/thirdweb_ai/common/utils.py (1)

110-118: Improve validation logic for edge cases.

The validation function handles basic alias parsing well, but could be more robust.

Consider these improvements:

 def validate_aggregation(agg: str, valid_aggregations: list[str]) -> str:
     """Validate an aggregation function string."""
+    if not agg or not agg.strip():
+        raise ValueError("Aggregation function cannot be empty")
+    
     # Handle aliases like "count() as event_count"
-    base_agg = agg.split(" as ")[0].strip()
+    base_agg = agg.split(" as ")[0].strip().lower()
 
-    if base_agg not in valid_aggregations:
+    # Case-insensitive comparison for better usability
+    valid_lower = [v.lower() for v in valid_aggregations]
+    if base_agg not in valid_lower:
         raise ValueError(f"Invalid aggregation function: {base_agg}. Valid options: {valid_aggregations}")
 
     return agg
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c6c2bb1 and a55ecfd.

📒 Files selected for processing (2)
  • python/thirdweb-ai/src/thirdweb_ai/common/utils.py (1 hunks)
  • python/thirdweb-ai/src/thirdweb_ai/services/insight.py (2 hunks)
🧰 Additional context used
🪛 Pylint (3.3.7)
python/thirdweb-ai/src/thirdweb_ai/services/insight.py

[refactor] 425-425: Too many arguments (9/5)

(R0913)


[refactor] 425-425: Too many positional arguments (9/5)

(R0917)


[refactor] 502-502: Too many arguments (10/5)

(R0913)


[refactor] 502-502: Too many positional arguments (10/5)

(R0917)


[refactor] 502-502: Too many local variables (16/15)

(R0914)


[refactor] 502-502: Too many branches (13/12)

(R0912)

🔇 Additional comments (4)
python/thirdweb-ai/src/thirdweb_ai/common/utils.py (3)

83-92: LGTM! Comprehensive event aggregation functions defined.

The list covers essential aggregation functions for blockchain events including counts, distinct counts, and min/max for key fields.


94-107: LGTM! Well-structured transaction aggregation functions.

The transaction aggregations appropriately include value-based calculations (sum, avg, min, max) which are meaningful for transaction data.


121-128: LGTM! Clean wrapper functions.

The wrapper functions provide a clean interface for specific validation types.

python/thirdweb-ai/src/thirdweb_ai/services/insight.py (1)

9-18: LGTM! Comprehensive imports for new aggregation functionality.

All necessary constants and validation functions are properly imported from the utils module.

Comment on lines +499 to +580
@tool(
description="Aggregate blockchain transactions with powerful grouping and aggregation options. Use this for transaction counts, volumes, sums, and other analytics grouped by address, block, or time period."
)
def aggregate_transactions(
self,
aggregate: Annotated[
list[str],
f"Aggregation functions to apply. Valid options: {', '.join(VALID_TRANSACTION_AGGREGATIONS)}. Can include aliases like 'count() as tx_count' or 'sum(value) as total_value'.",
],
group_by: Annotated[
Literal["from_address", "to_address", "block_number"] | None,
"Field to group transactions by for aggregation. Required when using aggregate functions.",
] = None,
chain_id: Annotated[
list[int] | int | None,
"Chain ID(s) to query (e.g., 1 for Ethereum, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries.",
] = None,
from_address: Annotated[
str | None,
"Filter transactions sent from this address (e.g., '0x1234...'). Useful for tracking outgoing transactions from a wallet.",
] = None,
to_address: Annotated[
str | None,
"Filter transactions sent to this address (e.g., '0x1234...'). Useful for tracking incoming transactions to a contract or wallet.",
] = None,
function_signature: Annotated[
str | None,
"Function signature to filter by (e.g., 'approve(address,uint256)').",
] = None,
value_above: Annotated[
int | None,
"Filter for transactions with value above this amount (in wei - base blockchain units).",
] = None,
limit: Annotated[
int | None,
"Number of aggregated results to return (default 20, max 100).",
] = 20,
page: Annotated[
int | None,
"Page number for paginated results, starting from 0. 20 results are returned per page.",
] = None,
) -> dict[str, Any]:
# Validate aggregation functions
validated_aggregate = [validate_transaction_aggregation(agg) for agg in aggregate]

params: dict[str, Any] = {
"sort_by": "block_number",
"sort_order": "desc",
"decode": True,
"aggregate": validated_aggregate,
}

if group_by:
params["group_by"] = group_by

chain_ids = chain_id if chain_id is not None else self.chain_ids
if chain_ids:
params["chain"] = chain_ids
if from_address:
params["filter_from_address"] = validate_address(from_address)
if to_address:
params["filter_to_address"] = validate_address(to_address)
if function_signature:
params["filter_function_signature"] = function_signature
if value_above:
params["value_gte"] = value_above
if limit:
params["limit"] = limit
if page:
params["page"] = page

out = self._get("transactions", params)

# Clean up response by removing chain_id from aggregations if present
if out.get("aggregations"):
for agg in out["aggregations"]:
if isinstance(agg, dict):
for value in agg.values():
if isinstance(value, dict) and "chain_id" in value:
value.pop("chain_id")

return out
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Extract shared logic and address complexity warnings.

This method duplicates response cleanup logic and has high complexity (flagged by static analysis for too many arguments, variables, and branches).

Apply these changes:

  1. Use the shared cleanup method:
         out = self._get("transactions", params)
 
-        # Clean up response by removing chain_id from aggregations if present
-        if out.get("aggregations"):
-            for agg in out["aggregations"]:
-                if isinstance(agg, dict):
-                    for value in agg.values():
-                        if isinstance(value, dict) and "chain_id" in value:
-                            value.pop("chain_id")
+        return self._cleanup_aggregation_response(out)
-
-        return out
  1. Consider splitting parameter building into a helper method to reduce local variables:
def _build_transaction_params(self, validated_aggregate, group_by, chain_ids, 
                            from_address, to_address, function_signature, 
                            value_above, limit, page):
    """Build query parameters for transaction aggregation."""
    # Move parameter building logic here

This addresses the static analysis warnings about complexity while improving maintainability.

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 502-502: Too many arguments (10/5)

(R0913)


[refactor] 502-502: Too many positional arguments (10/5)

(R0917)


[refactor] 502-502: Too many local variables (16/15)

(R0914)


[refactor] 502-502: Too many branches (13/12)

(R0912)

🤖 Prompt for AI Agents
In python/thirdweb-ai/src/thirdweb_ai/services/insight.py between lines 499 and
580, the aggregate_transactions method has duplicated response cleanup logic and
high complexity due to many arguments and branches. To fix this, extract the
parameter building logic into a new helper method named
_build_transaction_params that takes the validated_aggregate, group_by,
chain_ids, from_address, to_address, function_signature, value_above, limit, and
page as arguments and returns the params dictionary. Also, replace the manual
cleanup of the response's aggregations with a call to the existing shared
cleanup method to avoid duplication and reduce complexity.

Comment on lines +422 to +498
@tool(
description="Aggregate blockchain events with powerful grouping and aggregation options. Use this to get event counts, sums, or other aggregations grouped by fields like address, block, or time period."
)
def aggregate_events(
self,
aggregate: Annotated[
list[str],
f"Aggregation functions to apply. Valid options: {', '.join(VALID_EVENT_AGGREGATIONS)}. Can include aliases like 'count() as event_count'.",
],
group_by: Annotated[
Literal["address", "contract_address", "from_address", "block_number", "event_signature"] | None,
"Field to group events by for aggregation. Required when using aggregate functions.",
] = None,
chain_id: Annotated[
list[int] | int | None,
"Chain ID(s) to query (e.g., 1 for Ethereum Mainnet, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries (max 5).",
] = None,
contract_address: Annotated[
str | None,
"Contract address to filter events by (e.g., '0x1234...'). Only return events emitted by this contract.",
] = None,
transaction_hash: Annotated[
str | None,
"Specific transaction hash to filter events by (e.g., '0xabc123...'). Useful for examining events in a particular transaction.",
] = None,
event_signature: Annotated[
str | None,
"Event signature to filter by (human-readable, e.g., 'Transfer(address,address,uint256)').",
] = None,
limit: Annotated[
int | None,
"Number of aggregated results to return (default 20, max 100).",
] = 20,
page: Annotated[
int | None,
"Page number for paginated results, starting from 0. 20 results are returned per page.",
] = None,
) -> dict[str, Any]:
# Validate aggregation functions
validated_aggregate = [validate_event_aggregation(agg) for agg in aggregate]

params: dict[str, Any] = {
"sort_by": "block_number",
"sort_order": "desc",
"decode": True,
"aggregate": validated_aggregate,
}

if group_by:
params["group_by"] = "address" if group_by == "contract_address" else group_by

chain_ids = chain_id if chain_id is not None else self.chain_ids
if chain_ids:
params["chain"] = chain_ids
if contract_address:
params["filter_address"] = validate_address(contract_address)
if transaction_hash:
params["filter_transaction_hash"] = validate_transaction_hash(transaction_hash)
if event_signature:
params["filter_event_signature"] = event_signature
if limit:
params["limit"] = limit
if page:
params["page"] = page

out = self._get("events", params)

# Clean up response by removing chain_id from aggregations if present
if out.get("aggregations"):
for agg in out["aggregations"]:
if isinstance(agg, dict):
for value in agg.values():
if isinstance(value, dict) and "chain_id" in value:
value.pop("chain_id")

return out

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Refactor to reduce complexity and improve maintainability.

While the functionality is correct, this method has high complexity (many parameters, branches, and nested logic).

Consider these improvements:

  1. Extract response cleanup logic to a shared method:
+    def _cleanup_aggregation_response(self, out: dict[str, Any]) -> dict[str, Any]:
+        """Remove chain_id from aggregation results."""
+        if out.get("aggregations"):
+            for agg in out["aggregations"]:
+                if isinstance(agg, dict):
+                    for value in agg.values():
+                        if isinstance(value, dict) and "chain_id" in value:
+                            value.pop("chain_id")
+        return out
  1. Clarify the group_by mapping logic:
         if group_by:
-            params["group_by"] = "address" if group_by == "contract_address" else group_by
+            # Map contract_address to address for API compatibility
+            mapped_group_by = "address" if group_by == "contract_address" else group_by
+            params["group_by"] = mapped_group_by
  1. Consider using a parameter builder pattern for the complex parameter construction.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@tool(
description="Aggregate blockchain events with powerful grouping and aggregation options. Use this to get event counts, sums, or other aggregations grouped by fields like address, block, or time period."
)
def aggregate_events(
self,
aggregate: Annotated[
list[str],
f"Aggregation functions to apply. Valid options: {', '.join(VALID_EVENT_AGGREGATIONS)}. Can include aliases like 'count() as event_count'.",
],
group_by: Annotated[
Literal["address", "contract_address", "from_address", "block_number", "event_signature"] | None,
"Field to group events by for aggregation. Required when using aggregate functions.",
] = None,
chain_id: Annotated[
list[int] | int | None,
"Chain ID(s) to query (e.g., 1 for Ethereum Mainnet, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries (max 5).",
] = None,
contract_address: Annotated[
str | None,
"Contract address to filter events by (e.g., '0x1234...'). Only return events emitted by this contract.",
] = None,
transaction_hash: Annotated[
str | None,
"Specific transaction hash to filter events by (e.g., '0xabc123...'). Useful for examining events in a particular transaction.",
] = None,
event_signature: Annotated[
str | None,
"Event signature to filter by (human-readable, e.g., 'Transfer(address,address,uint256)').",
] = None,
limit: Annotated[
int | None,
"Number of aggregated results to return (default 20, max 100).",
] = 20,
page: Annotated[
int | None,
"Page number for paginated results, starting from 0. 20 results are returned per page.",
] = None,
) -> dict[str, Any]:
# Validate aggregation functions
validated_aggregate = [validate_event_aggregation(agg) for agg in aggregate]
params: dict[str, Any] = {
"sort_by": "block_number",
"sort_order": "desc",
"decode": True,
"aggregate": validated_aggregate,
}
if group_by:
params["group_by"] = "address" if group_by == "contract_address" else group_by
chain_ids = chain_id if chain_id is not None else self.chain_ids
if chain_ids:
params["chain"] = chain_ids
if contract_address:
params["filter_address"] = validate_address(contract_address)
if transaction_hash:
params["filter_transaction_hash"] = validate_transaction_hash(transaction_hash)
if event_signature:
params["filter_event_signature"] = event_signature
if limit:
params["limit"] = limit
if page:
params["page"] = page
out = self._get("events", params)
# Clean up response by removing chain_id from aggregations if present
if out.get("aggregations"):
for agg in out["aggregations"]:
if isinstance(agg, dict):
for value in agg.values():
if isinstance(value, dict) and "chain_id" in value:
value.pop("chain_id")
return out
@tool(
description="Aggregate blockchain events with powerful grouping and aggregation options. Use this to get event counts, sums, or other aggregations grouped by fields like address, block, or time period."
)
def aggregate_events(
self,
aggregate: Annotated[
list[str],
f"Aggregation functions to apply. Valid options: {', '.join(VALID_EVENT_AGGREGATIONS)}. Can include aliases like 'count() as event_count'.",
],
group_by: Annotated[
Literal["address", "contract_address", "from_address", "block_number", "event_signature"] | None,
"Field to group events by for aggregation. Required when using aggregate functions.",
] = None,
chain_id: Annotated[
list[int] | int | None,
"Chain ID(s) to query (e.g., 1 for Ethereum Mainnet, 137 for Polygon). Specify multiple IDs as a list for cross-chain queries (max 5).",
] = None,
contract_address: Annotated[
str | None,
"Contract address to filter events by (e.g., '0x1234...'). Only return events emitted by this contract.",
] = None,
transaction_hash: Annotated[
str | None,
"Specific transaction hash to filter events by (e.g., '0xabc123...'). Useful for examining events in a particular transaction.",
] = None,
event_signature: Annotated[
str | None,
"Event signature to filter by (human-readable, e.g., 'Transfer(address,address,uint256)').",
] = None,
limit: Annotated[
int | None,
"Number of aggregated results to return (default 20, max 100).",
] = 20,
page: Annotated[
int | None,
"Page number for paginated results, starting from 0. 20 results are returned per page.",
] = None,
) -> dict[str, Any]:
# Validate aggregation functions
validated_aggregate = [validate_event_aggregation(agg) for agg in aggregate]
params: dict[str, Any] = {
"sort_by": "block_number",
"sort_order": "desc",
"decode": True,
"aggregate": validated_aggregate,
}
if group_by:
+ # Map contract_address to address for API compatibility
+ mapped_group_by = "address" if group_by == "contract_address" else group_by
+ params["group_by"] = mapped_group_by
chain_ids = chain_id if chain_id is not None else self.chain_ids
if chain_ids:
params["chain"] = chain_ids
if contract_address:
params["filter_address"] = validate_address(contract_address)
if transaction_hash:
params["filter_transaction_hash"] = validate_transaction_hash(transaction_hash)
if event_signature:
params["filter_event_signature"] = event_signature
if limit:
params["limit"] = limit
if page:
params["page"] = page
out = self._get("events", params)
# Clean up response by removing chain_id from aggregations if present
if out.get("aggregations"):
for agg in out["aggregations"]:
if isinstance(agg, dict):
for value in agg.values():
if isinstance(value, dict) and "chain_id" in value:
value.pop("chain_id")
return out
+ def _cleanup_aggregation_response(self, out: dict[str, Any]) -> dict[str, Any]:
+ """Remove chain_id from aggregation results."""
+ if out.get("aggregations"):
+ for agg in out["aggregations"]:
+ if isinstance(agg, dict):
+ for value in agg.values():
+ if isinstance(value, dict) and "chain_id" in value:
+ value.pop("chain_id")
+ return out
🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 425-425: Too many arguments (9/5)

(R0913)


[refactor] 425-425: Too many positional arguments (9/5)

(R0917)

🤖 Prompt for AI Agents
In python/thirdweb-ai/src/thirdweb_ai/services/insight.py around lines 422 to
498, the aggregate_events method is complex due to many parameters and nested
logic. Refactor by extracting the response cleanup logic that removes "chain_id"
from aggregations into a separate helper method to improve readability and
reuse. Simplify the group_by assignment by clarifying or encapsulating the
mapping logic, for example by using a dictionary mapping instead of inline
conditional. Finally, implement a parameter builder pattern or helper function
to construct the params dictionary step-by-step, which will reduce branching and
improve maintainability.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant