252 changes: 170 additions & 82 deletions bigtable/use-cases/fraudDetection/README.md

Large diffs are not rendered by default.

@@ -16,7 +16,7 @@
 package bigtable.fraud.beam;

 import bigtable.fraud.beam.utils.AggregatedData;
-import bigtable.fraud.beam.utils.CustomerDemographics;
+import bigtable.fraud.beam.utils.CustomerProfile;
 import bigtable.fraud.beam.utils.RowDetails;
 import bigtable.fraud.beam.utils.TransactionDetails;
 import bigtable.fraud.beam.utils.WriteCBTHelper;
@@ -127,12 +127,12 @@ public void processElement(
       Preconditions.checkArgument(new String(row.getRow()).equals(
           transactionDetails.getCustomerID()));

-      CustomerDemographics customerDemographics = new CustomerDemographics(
+      CustomerProfile customerProfile = new CustomerProfile(
           row);

       // Generate an AggregatedData object.
       AggregatedData aggregatedData =
-          new AggregatedData(customerDemographics, transactionDetails, row);
+          new AggregatedData(customerProfile, transactionDetails, row);

       c.output(aggregatedData);
     } catch (Exception e) {
@@ -15,7 +15,7 @@
  */
 package bigtable.fraud.beam;

-import bigtable.fraud.beam.utils.CustomerDemographics;
+import bigtable.fraud.beam.utils.CustomerProfile;
 import bigtable.fraud.beam.utils.TransactionDetails;
 import bigtable.fraud.beam.utils.RowDetails;
 import bigtable.fraud.beam.utils.WriteCBTHelper;
@@ -29,7 +29,7 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.TypeDescriptor;

-// Load customer demographics and history into Cloud Bigtable.
+// Load customer profiles and history into Cloud Bigtable.
 public final class LoadDataset {

   /**
@@ -46,7 +46,7 @@ public static void main(final String[] args) throws
     LoadDatasetOptions options =
         PipelineOptionsFactory.fromArgs(args).withValidation()
             .as(LoadDatasetOptions.class);
-    options.setJobName("load-customer-demographics-" + options.getRandomUUID());
+    options.setJobName("load-customer-profiles-" + options.getRandomUUID());

     CloudBigtableTableConfiguration config =
         new CloudBigtableTableConfiguration.Builder()
@@ -55,21 +55,21 @@ public static void main(final String[] args) throws
             .withTableId(options.getCBTTableId())
             .build();

-    // Create a pipeline that reads the GCS demographics csv file
+    // Create a pipeline that reads the GCS customer profile csv file
     // and writes it into CBT.
-    Pipeline pDemographics = Pipeline.create(options);
-    pDemographics
+    Pipeline pProfiles = Pipeline.create(options);
+    pProfiles
         .apply("ReadGCSFile",
-            TextIO.read().from(options.getDemographicsInputFile()))
+            TextIO.read().from(options.getCustomerProfileInputFile()))
         .apply(
             MapElements.into(TypeDescriptor.of(RowDetails.class))
-                .via(CustomerDemographics::new))
+                .via(CustomerProfile::new))
         .apply("TransformParsingsToBigtable",
             ParDo.of(WriteCBTHelper.MUTATION_TRANSFORM))
         .apply(
             "WriteToBigtable",
             CloudBigtableIO.writeToTable(config));
-    PipelineResult pDemographicsRun = pDemographics.run();
+    PipelineResult pProfilesRun = pProfiles.run();

     // Create a pipeline that reads the GCS history csv file and writes
     // it into CBT.
@@ -89,7 +89,7 @@ public static void main(final String[] args) throws
         CloudBigtableIO.writeToTable(config));
     PipelineResult pHistoryRun = pHistory.run();

-    pDemographicsRun.waitUntilFinish();
+    pProfilesRun.waitUntilFinish();
     pHistoryRun.waitUntilFinish();
   }
 }
@@ -58,15 +58,15 @@ public interface LoadDatasetOptions extends DataflowPipelineOptions {
   void setCBTTableId(String tableID);

   /**
-   * @return customer demographics input file.
+   * @return customer profile input file.
    */
-  @Description("The Cloud Storage path to the demographics CSV file.")
-  String getDemographicsInputFile();
+  @Description("The Cloud Storage path to the profile CSV file.")
+  String getCustomerProfileInputFile();

   /**
-   * @param location customer demographics file location.
+   * @param location customer profile file location.
    */
-  void setDemographicsInputFile(String location);
+  void setCustomerProfileInputFile(String location);

   /**
    * @return transactions history input file.
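Worth noting, though the diff does not spell it out: Beam's PipelineOptionsFactory derives command-line flag names from these getters, so renaming getDemographicsInputFile() to getCustomerProfileInputFile() also renames the flag from --demographicsInputFile to --customerProfileInputFile, and fromArgs().withValidation() should reject the old flag at parse time. Every caller therefore has to change in lockstep, as the run script at the bottom of this PR does. A hypothetical args fragment (the bucket name is a placeholder, not from this PR):

# Old flag, now rejected by the options parser:
#   --demographicsInputFile=gs://my-example-bucket/training_dataset/customers.csv
# New flag, derived from the renamed getter:
PIPELINE_ARGS="--customerProfileInputFile=gs://my-example-bucket/training_dataset/customers.csv"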
@@ -28,9 +28,9 @@ public final class AggregatedData {
    */
   private TransactionDetails transactionDetails;
   /**
-   * Stores the incoming transaction customer demographics.
+   * Stores the incoming transaction customer profile.
    */
-  private CustomerDemographics customerDemographics;
+  private CustomerProfile customerProfile;
   /**
    * Stores the time difference between this transaction and the last one in
    * minutes.
@@ -58,14 +58,14 @@ public final class AggregatedData {
   /**
    * Construct an AggregatedData object.
    *
-   * @param iCustomerDemographics the incoming customer demographic object.
+   * @param iCustomerProfile the incoming customer profile object.
    * @param iTransactionDetails the incoming transaction details object.
    * @param row a result row read from Cloud Bigtable.
    */
   public AggregatedData(
-      final CustomerDemographics iCustomerDemographics,
+      final CustomerProfile iCustomerProfile,
       final TransactionDetails iTransactionDetails, final Result row) {
-    this.customerDemographics = iCustomerDemographics;
+    this.customerProfile = iCustomerProfile;
     this.transactionDetails = iTransactionDetails;

     // Get last transaction.
@@ -162,8 +162,8 @@ public String getMLFeatures() {
     mlFeatures.add(String.valueOf(avgAmountSpentLastWeek));
     mlFeatures.add(String.valueOf(avgAmountSpentLastMonth));
     mlFeatures.add(String.valueOf(numOfTransactionLastDay));
-    mlFeatures.add(String.valueOf(customerDemographics.getId()));
-    mlFeatures.add(customerDemographics.getCcNumber());
+    mlFeatures.add(String.valueOf(customerProfile.getId()));
+    mlFeatures.add(customerProfile.getCcNumber());
     mlFeatures.add(String.valueOf(transactionDetails.getTransactionAmount()));
     mlFeatures.add(String.valueOf(transactionDetails.getMerchantID()));
@@ -20,7 +20,7 @@
 import org.apache.hadoop.hbase.client.Result;

 @DefaultCoder(AvroCoder.class)
-public final class CustomerDemographics extends RowDetails {
+public final class CustomerProfile extends RowDetails {

   /**
    * The incoming request's customer id.
@@ -48,20 +48,20 @@ public final class CustomerDemographics extends RowDetails {
   private String accountNumber;

   /**
-   * Constructs CustomerDemographics object.
+   * Constructs CustomerProfile object.
    *
-   * @param line a CustomerDemographics comma-separated line
+   * @param line a CustomerProfile comma-separated line
    */
-  public CustomerDemographics(final String line) {
+  public CustomerProfile(final String line) {
     super(line);
   }

   /**
-   * Constructs CustomerDemographics object.
+   * Constructs CustomerProfile object.
    *
    * @param row a row result read from Cloud Bigtable.
    */
-  public CustomerDemographics(final Result row) {
+  public CustomerProfile(final Result row) {
     super(row);
   }

@@ -81,6 +81,6 @@ public String getCcNumber() {

   @Override
   public String getColFamily() {
-    return "demographics";
+    return "customer_profile";
   }
 }
@@ -15,7 +15,7 @@
  */

 /**
- * Classes that will hold customer demographics, transactions history, and
+ * Classes that will hold customer profiles, transactions history, and
  * aggregated data to be sent to the machine learning model.
  */
 package bigtable.fraud.beam.utils;
6 changes: 3 additions & 3 deletions bigtable/use-cases/fraudDetection/terraform/main.tf
@@ -36,7 +36,7 @@ resource "google_bigtable_table" "tf-fd-table" {
   name          = "customer-information-${random_string.uuid.result}"
   instance_name = google_bigtable_instance.tf-fd-instance.name
   column_family {
-    family = "demographics"
+    family = "customer_profile"
   }
   column_family {
     family = "history"
@@ -116,7 +116,7 @@ resource "google_storage_bucket_object" "legit_transactions" {
   bucket = google_storage_bucket.tf-fd-bucket.name
 }

-# A CSV file that contains customers' demographics.
+# A CSV file that contains customers' profiles.
 resource "google_storage_bucket_object" "customers" {
   name   = "training_dataset/customers.csv"
   source = "./datasets/training_data/customers.csv"
@@ -170,7 +170,7 @@ module "dataflow_pipeline" {
   destroy_cmd_body = "${var.region} ${random_string.uuid.result}"
 }

-# Load both profiles and historical data into Cloud Bigtable so that
+# Load both profiles and historical data into Cloud Bigtable so that
 # the dataflow pipeline can aggregate data properly before querying
 # the ML model.
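One caution the PR itself does not state: changing the column_family name in google_bigtable_table is not an in-place rename. On an already-provisioned table the provider would drop the "demographics" family (and any data under it) and add an empty "customer_profile" family; that is harmless here only because this sample's table is recreated from scratch. A minimal spot-check after terraform apply, assuming the cbt CLI is installed and using placeholder names rather than anything from this PR:

# Hypothetical verification: list column families and confirm customer_profile
# replaced demographics. Substitute the real project, instance, and table.
cbt -project my-project -instance my-fd-instance ls customer-information-abcd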
@@ -1,6 +1,6 @@
 #!/bin/bash

-# Load demographics and historical transactions data from GCS into CBT.
+# Load customer profiles and historical transactions data from GCS into CBT.

 PROJECT_ID=$1
 REGION=$2
@@ -18,6 +18,6 @@ echo "GCS_BUCKET=$GCS_BUCKET"
 mvn -f ../pom.xml compile exec:java -Dexec.mainClass=bigtable.fraud.beam.LoadDataset -Dexec.cleanupDaemonThreads=false \
 "-Dexec.args= --runner=DataflowRunner --project=$PROJECT_ID --projectID=$PROJECT_ID --region=$REGION \
 --gcpTempLocation=gs://$GCS_BUCKET/temp --CBTInstanceId=$CBT_INSTANCE --CBTTableId=$CBT_TABLE \
---demographicsInputFile=gs://$GCS_BUCKET/training_dataset/customers.csv \
+--customerProfileInputFile=gs://$GCS_BUCKET/training_dataset/customers.csv \
 --historyInputFile=gs://$GCS_BUCKET/training_dataset/transactions.csv \
 --randomUUID=$RANDOM_UUID"
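After the load job finishes, a quick spot-check that rows landed under the renamed column family could look like the sketch below. This is not part of the PR; it assumes the cbt CLI is installed and configured, and reuses the script's variable names:

# Hypothetical verification: print a few rows and confirm cells appear under
# the customer_profile family rather than demographics.
cbt -project "$PROJECT_ID" -instance "$CBT_INSTANCE" read "$CBT_TABLE" count=3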