Skip to content

[usage] Use attribution ID to reduce DB queries for usage report #10938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions components/usage/pkg/controller/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@

package controller

import "github.com/gitpod-io/gitpod/usage/pkg/stripe"
import (
"context"
"github.com/gitpod-io/gitpod/usage/pkg/stripe"
"time"
)

type BillingController interface {
Reconcile(report []TeamUsage)
Reconcile(ctx context.Context, now time.Time, report UsageReport)
}

type NoOpBillingController struct{}

func (b *NoOpBillingController) Reconcile(report []TeamUsage) {}
func (b *NoOpBillingController) Reconcile(_ context.Context, _ time.Time, _ UsageReport) {}

type StripeBillingController struct {
sc *stripe.Client
Expand All @@ -22,12 +26,7 @@ func NewStripeBillingController(sc *stripe.Client) *StripeBillingController {
return &StripeBillingController{sc: sc}
}

func (b *StripeBillingController) Reconcile(report []TeamUsage) {
// Convert the usage report to sum all entries for the same team.
var summedReport = make(map[string]int64)
for _, usageEntry := range report {
summedReport[usageEntry.TeamID] += usageEntry.WorkspaceSeconds
}

b.sc.UpdateUsage(summedReport)
func (b *StripeBillingController) Reconcile(ctx context.Context, now time.Time, report UsageReport) {
runtimeReport := report.RuntimeSummaryForTeams(now)
b.sc.UpdateUsage(runtimeReport)
}
171 changes: 32 additions & 139 deletions components/usage/pkg/controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,6 @@ type UsageReconcileStatus struct {

WorkspaceInstances int
InvalidWorkspaceInstances int

Workspaces int

Teams int

Report []TeamUsage
}

func (u *UsageReconciler) Reconcile() error {
Expand All @@ -60,7 +54,7 @@ func (u *UsageReconciler) Reconcile() error {
startOfCurrentMonth := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.UTC)
startOfNextMonth := startOfCurrentMonth.AddDate(0, 1, 0)

status, err := u.ReconcileTimeRange(ctx, startOfCurrentMonth, startOfNextMonth)
status, report, err := u.ReconcileTimeRange(ctx, startOfCurrentMonth, startOfNextMonth)
if err != nil {
return err
}
Expand All @@ -75,7 +69,7 @@ func (u *UsageReconciler) Reconcile() error {
defer f.Close()

enc := json.NewEncoder(f)
err = enc.Encode(status.Report)
err = enc.Encode(report)
if err != nil {
return fmt.Errorf("failed to marshal report to JSON: %w", err)
}
Expand All @@ -89,7 +83,7 @@ func (u *UsageReconciler) Reconcile() error {
return nil
}

func (u *UsageReconciler) ReconcileTimeRange(ctx context.Context, from, to time.Time) (*UsageReconcileStatus, error) {
func (u *UsageReconciler) ReconcileTimeRange(ctx context.Context, from, to time.Time) (*UsageReconcileStatus, UsageReport, error) {
now := u.nowFunc().UTC()
log.Infof("Gathering usage data from %s to %s", from, to)
status := &UsageReconcileStatus{
Expand All @@ -98,7 +92,7 @@ func (u *UsageReconciler) ReconcileTimeRange(ctx context.Context, from, to time.
}
instances, invalidInstances, err := u.loadWorkspaceInstances(ctx, from, to)
if err != nil {
return nil, fmt.Errorf("failed to load workspace instances: %w", err)
return nil, nil, fmt.Errorf("failed to load workspace instances: %w", err)
}
status.WorkspaceInstances = len(instances)
status.InvalidWorkspaceInstances = len(invalidInstances)
Expand All @@ -108,129 +102,38 @@ func (u *UsageReconciler) ReconcileTimeRange(ctx context.Context, from, to time.
}
log.WithField("workspace_instances", instances).Debug("Successfully loaded workspace instances.")

workspaces, err := u.loadWorkspaces(ctx, instances)
if err != nil {
return nil, fmt.Errorf("failed to load workspaces for workspace instances in time range: %w", err)
}
status.Workspaces = len(workspaces)

// match workspaces to teams
teams, err := u.loadTeamsForWorkspaces(ctx, workspaces)
if err != nil {
return nil, fmt.Errorf("failed to load teams for workspaces: %w", err)
}
status.Teams = len(teams)

report, err := generateUsageReport(teams, now)
if err != nil {
return nil, fmt.Errorf("failed to generate usage report: %w", err)
}
status.Report = report

u.billingController.Reconcile(status.Report)

return status, nil
}

func generateUsageReport(teams []teamWithWorkspaces, maxStopTime time.Time) ([]TeamUsage, error) {
var report []TeamUsage
for _, team := range teams {
var teamTotalRuntime int64
for _, workspace := range team.Workspaces {
for _, instance := range workspace.Instances {
teamTotalRuntime += instance.WorkspaceRuntimeSeconds(maxStopTime)
}
}
instancesByAttributionID := groupInstancesByAttributionID(instances)

report = append(report, TeamUsage{
TeamID: team.TeamID.String(),
WorkspaceSeconds: teamTotalRuntime,
})
}
return report, nil
}
u.billingController.Reconcile(ctx, now, instancesByAttributionID)

type teamWithWorkspaces struct {
TeamID uuid.UUID
Workspaces []workspaceWithInstances
return status, instancesByAttributionID, nil
}

func (u *UsageReconciler) loadTeamsForWorkspaces(ctx context.Context, workspaces []workspaceWithInstances) ([]teamWithWorkspaces, error) {
// find owner IDs of these workspaces
var ownerIDs []uuid.UUID
for _, workspace := range workspaces {
ownerIDs = append(ownerIDs, workspace.Workspace.OwnerID)
}
type UsageReport map[db.AttributionID][]db.WorkspaceInstance

// Retrieve memberships. This gives a link between an Owner and a Team they belong to.
memberships, err := db.ListTeamMembershipsForUserIDs(ctx, u.conn, ownerIDs)
if err != nil {
return nil, fmt.Errorf("failed to list team memberships: %w", err)
}
func (u UsageReport) RuntimeSummaryForTeams(maxStopTime time.Time) map[string]int64 {
attributedUsage := map[string]int64{}

membershipsByUserID := map[uuid.UUID]db.TeamMembership{}
for _, membership := range memberships {
// User can belong to multiple teams. For now, we're choosing the membership at random.
membershipsByUserID[membership.UserID] = membership
}
for attribution, instances := range u {
entity, id := attribution.Values()
if entity != db.AttributionEntity_Team {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: A comment that we handle this in the future would be golden 👍

continue
}

// Convert workspaces into a lookup so that we can index into them by Owner ID, needed for joining Teams with Workspaces
workspacesByOwnerID := map[uuid.UUID][]workspaceWithInstances{}
for _, workspace := range workspaces {
workspacesByOwnerID[workspace.Workspace.OwnerID] = append(workspacesByOwnerID[workspace.Workspace.OwnerID], workspace)
}
var runtime uint64
for _, instance := range instances {
runtime += instance.WorkspaceRuntimeSeconds(maxStopTime)
}

// Finally, join the datasets
// Because we iterate over memberships, and not workspaces, we're in effect ignoring Workspaces which are not in a team.
// This is intended as we focus on Team usage for now.
var teamsWithWorkspaces []teamWithWorkspaces
for userID, membership := range membershipsByUserID {
teamsWithWorkspaces = append(teamsWithWorkspaces, teamWithWorkspaces{
TeamID: membership.TeamID,
Workspaces: workspacesByOwnerID[userID],
})
attributedUsage[id] = int64(runtime)
Copy link
Member

@geropl geropl Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels "dangerous"/off given that we do use uint64 for usage in other places in the code base. Why not make attributedUsage accumulate in uint64 and stick to that datatype everywhere? 🤔

Copy link
Member Author

@easyCZ easyCZ Jun 28, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to update to unit64 everywhere (but Stripe only accepts int64, not uint64). But the reason for using it here is to limit changes to the existing Usage Report reconcile logic with stripe in this PR. That happens here https://github.com/gitpod-io/gitpod/pull/10938/files#diff-b2499b7086d5733f081dcce586e0ff0e77206dd5d8fd6ede638e3fc8f284c798L25-L32 and it currently has int64 as the interface.
I'll update these in a follow-up PR (minus the Stripe API call which will have to be int64) if that's acceptable.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, fine with this for now.

}

return teamsWithWorkspaces, nil
}

type workspaceWithInstances struct {
Workspace db.Workspace
Instances []db.WorkspaceInstance
return attributedUsage
}

func (u *UsageReconciler) loadWorkspaces(ctx context.Context, instances []db.WorkspaceInstance) ([]workspaceWithInstances, error) {
var workspaceIDs []string
for _, instance := range instances {
workspaceIDs = append(workspaceIDs, instance.WorkspaceID)
}

workspaces, err := db.ListWorkspacesByID(ctx, u.conn, toSet(workspaceIDs))
if err != nil {
return nil, fmt.Errorf("failed to find workspaces for provided workspace instances: %w", err)
}

workspacesByID := map[string]db.Workspace{}
for _, workspace := range workspaces {
workspacesByID[workspace.ID] = workspace
}

// We need to also add the instances to corresponding records, a single workspace can have multiple instances
instancesByWorkspaceID := map[string][]db.WorkspaceInstance{}
for _, instance := range instances {
instancesByWorkspaceID[instance.WorkspaceID] = append(instancesByWorkspaceID[instance.WorkspaceID], instance)
}

// Flatten results into a list
var workspacesWithInstances []workspaceWithInstances
for workspaceID, workspace := range workspacesByID {
workspacesWithInstances = append(workspacesWithInstances, workspaceWithInstances{
Workspace: workspace,
Instances: instancesByWorkspaceID[workspaceID],
})
}

return workspacesWithInstances, nil
type invalidWorkspaceInstance struct {
reason string
workspaceInstanceID uuid.UUID
}

func (u *UsageReconciler) loadWorkspaceInstances(ctx context.Context, from, to time.Time) ([]db.WorkspaceInstance, []invalidWorkspaceInstance, error) {
Expand All @@ -246,11 +149,6 @@ func (u *UsageReconciler) loadWorkspaceInstances(ctx context.Context, from, to t
return trimmed, invalid, nil
}

type invalidWorkspaceInstance struct {
reason string
workspaceInstanceID uuid.UUID
}

func validateInstances(instances []db.WorkspaceInstance) (valid []db.WorkspaceInstance, invalid []invalidWorkspaceInstance) {
for _, i := range instances {
// i is a pointer to the current element, we need to assign it to ensure we're copying the value, not the current pointer.
Expand Down Expand Up @@ -302,20 +200,15 @@ func trimStartStopTime(instances []db.WorkspaceInstance, maximumStart, minimumSt
return updated
}

func toSet(items []string) []string {
m := map[string]struct{}{}
for _, i := range items {
m[i] = struct{}{}
}
func groupInstancesByAttributionID(instances []db.WorkspaceInstance) map[db.AttributionID][]db.WorkspaceInstance {
result := map[db.AttributionID][]db.WorkspaceInstance{}
for _, instance := range instances {
if _, ok := result[instance.UsageAttributionID]; !ok {
result[instance.UsageAttributionID] = []db.WorkspaceInstance{}
}

var result []string
for s := range m {
result = append(result, s)
result[instance.UsageAttributionID] = append(result[instance.UsageAttributionID], instance)
}
return result
}

type TeamUsage struct {
TeamID string `json:"team_id"`
WorkspaceSeconds int64 `json:"workspace_seconds"`
return result
}
Loading