
chore: refactor filtering Flink SQL tutorial #101


Draft: wants to merge 1 commit into base `master`.
4 changes: 2 additions & 2 deletions aggregating-count/ksql/README.md
@@ -165,9 +165,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions aggregating-minmax/ksql/README.md
@@ -166,9 +166,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions aggregating-sum/ksql/README.md
@@ -165,9 +165,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions anomaly-detection/ksql/README.md
@@ -232,9 +232,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

6 changes: 3 additions & 3 deletions change-topic-partitions/ksql/README.md
@@ -156,9 +156,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

@@ -194,7 +194,7 @@ You can run the example backing this tutorial in one of two ways: locally with t
```

Observe the expected number of partitions for the `topic` and `topic2` topics when you navigate
to `Topics` in the lefthand navigation of the Confluent Cloud Console.
to `Topics` in the left-hand navigation of the Confluent Cloud Console.

### Clean up

4 changes: 2 additions & 2 deletions column-difference/ksql/README.md
@@ -177,9 +177,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions concatenation/ksql/README.md
@@ -186,9 +186,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

@@ -57,7 +57,7 @@ confluent api-key create --resource <CLUSTER_ID>
```noformat
confluent connect cluster create --config-file /tmp/datagen-connector.json
```
* After a minute or so, validate in the Confluent Cloud Console that the connector is running. In the lefhand navigation, select `Environments`, click into the environment, then click the PrivateLink cluster. In the lefthand navigation, select `Connectors` and verify that the connector state is `Running` and generating messages:
* After a minute or so, validate in the Confluent Cloud Console that the connector is running. In the left-hand navigation, select `Environments`, click into the environment, then click the PrivateLink cluster. In the left-hand navigation, select `Connectors` and verify that the connector state is `Running` and generating messages:

![Datagen](https://raw.githubusercontent.com/confluentinc/tutorials/master/confluent-cloud-connector-aws-privatelink/kafka/img/cc-datagen.png)

4 changes: 2 additions & 2 deletions convert-timestamp-timezone/ksql/README.md
@@ -161,9 +161,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions count-messages/ksql/README.md
@@ -160,9 +160,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

4 changes: 2 additions & 2 deletions deduplication-windowed/ksql/README.md
@@ -219,9 +219,9 @@ You can run the example backing this tutorial in one of two ways: locally with t

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then
select `ksqlDB` in the lefthand navigation.
select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

6 changes: 3 additions & 3 deletions deserialization-errors/ksql/README.md
@@ -69,9 +69,9 @@ confluent ksql cluster create ksqldb-tutorial \

### Run the commands

Login to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the lefthand navigation,
Log in to the [Confluent Cloud Console](https://confluent.cloud/). Select `Environments` in the left-hand navigation,
and then click the `ksqldb-tutorial` environment tile. Click the `ksqldb-tutorial` Kafka cluster tile, and then select
`Topics` in the lefthand navigation. Create a topic called `sensors-raw` with 1 partition, and in the `Messages` tab,
`Topics` in the left-hand navigation. Create a topic called `sensors-raw` with 1 partition, and in the `Messages` tab,
produce the following two events, one at a time.

```noformat
@@ -82,7 +82,7 @@ produce the following two events, one at a time.
{"id": "1a076a64-4a84-40cb-a2e8-2190f3b37465", "timestamp": "2020-01-15 02:30:30", "enabled": "true"}
```

Next, select `ksqlDB` in the lefthand navigation.
Next, select `ksqlDB` in the left-hand navigation.

The cluster may take a few minutes to be provisioned. Once its status is `Up`, click the cluster name and scroll down to the editor.

131 changes: 41 additions & 90 deletions filtering/flinksql/README.md
@@ -3,65 +3,64 @@

# How to filter messages in a Kafka topic with Flink SQL

Consider a topic with events that represent book publications. In this tutorial, we'll use Flink SQL to find only the
publications written by a particular author.
### Overview

## Setup
In this tutorial, we'll use Flink SQL to filter messages in a Kafka topic.

Let's assume the following DDL for our base `publication_events` table:
### Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* A Flink compute pool created in Confluent Cloud. Follow [this](https://docs.confluent.io/cloud/current/flink/get-started/quick-start-cloud-console.html) quick start to create one.

### Tutorial steps

The following steps use Confluent Cloud. See the section at the bottom to run the tutorial locally with Docker.

In the [Confluent Cloud Console](https://confluent.cloud/), navigate to your environment and select `Flink` in the left-hand navigation. Then click the `Open SQL Workspace` button for the compute pool that you have created.

Select the default catalog (Confluent Cloud environment) and database (Kafka cluster) to use with the dropdowns at the top right.
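
Alternatively, the catalog and database can be selected from SQL itself with `USE` statements. This is a minimal sketch (not part of the original tutorial): the backticked identifiers are placeholders for your own environment and cluster names.

```sql
-- Placeholder identifiers: substitute your environment (catalog)
-- and Kafka cluster (database) names
USE CATALOG `my-environment`;
USE `my-cluster`;
```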

Run the following SQL statement to create a table named `publication_events` that represents book publications:

```sql
CREATE TABLE publication_events (
book_id INT,
author STRING,
title STRING
);
```

## Filter events
Populate the table with test data:

Given the `publication_events` table definition above, we can filter to the publications by a particular author using a `WHERE` clause:
```sql
INSERT INTO publication_events VALUES
(0, 'C.S. Lewis', 'The Silver Chair'),
(1, 'George R. R. Martin', 'A Song of Ice and Fire'),
(2, 'C.S. Lewis', 'Perelandra'),
(3, 'George R. R. Martin', 'Fire & Blood'),
(4, 'J. R. R. Tolkien', 'The Hobbit'),
(5, 'J. R. R. Tolkien', 'The Lord of the Rings'),
(6, 'George R. R. Martin', 'A Dream of Spring'),
(7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'),
(8, 'George R. R. Martin', 'The Ice Dragon'),
(9, 'Mario Puzo', 'The Godfather');
```

Use the [`WHERE` clause](https://docs.confluent.io/cloud/current/flink/reference/queries/select.html#where-clause) to filter the rows down to the publications written by George R. R. Martin:

```sql
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
```

## Running the example
The query output should look like this:

You can run the example backing this tutorial in one of three ways: a Flink Table API-based JUnit test, locally with the Flink SQL Client
against Flink and Kafka running in Docker, or with Confluent Cloud.
![Query output](https://raw.githubusercontent.com/confluentinc/tutorials/master/filtering/flinksql/img/query-output.png)
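
Filtering is not limited to a single equality predicate. As a variation (a sketch that is not part of the original tutorial), multiple conditions can be combined in the `WHERE` clause:

```sql
-- Hypothetical variation: combine predicates with AND
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin'
  AND book_id < 5;
```

With the test data above, this would keep only the rows with `book_id` 1 and 3.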

<details>
<summary>Flink Table API-based test</summary>

### Prerequisites

* Java 17, e.g., follow the OpenJDK installation instructions [here](https://openjdk.org/install/) if you don't have Java.
* Docker running via [Docker Desktop](https://docs.docker.com/desktop/) or [Docker Engine](https://docs.docker.com/engine/install/)

### Run the test

Clone the `confluentinc/tutorials` GitHub repository (if you haven't already) and navigate to the `tutorials` directory:

```shell
git clone [email protected]:confluentinc/tutorials.git
cd tutorials
```

Run the following command to execute [FlinkSqlFilteringTest#testFilter](https://github.com/confluentinc/tutorials/blob/master/filtering/flinksql/src/test/java/io/confluent/developer/FlinkSqlFilteringTest.java):

```plaintext
./gradlew clean :filtering:flinksql:test
```

The test starts Kafka and Schema Registry with [Testcontainers](https://testcontainers.com/), runs the Flink SQL commands
above against a local Flink `StreamExecutionEnvironment`, and ensures that the filter results are what we expect.
</details>

<details>
<summary>Flink SQL Client CLI</summary>
<summary>Docker instructions</summary>

### Prerequisites

@@ -89,8 +88,7 @@ against Flink and Kafka running in Docker, or with Confluent Cloud.
docker exec -it flink-sql-client sql-client.sh
```

Finally, run following SQL statements to create the `publication_events` table backed by Kafka running in Docker, populate it with
test data, and run the filter query.
Run the following SQL statement to create a table named `publication_events` that represents book publications:

```sql
CREATE TABLE publication_events (
  book_id INT,
  author STRING,
  title STRING
  -- @@ -110,6 +108,8 @@ Kafka connector options collapsed in the diff view
);
```

Populate the table with test data:

```sql
INSERT INTO publication_events VALUES
(0, 'C.S. Lewis', 'The Silver Chair'),
(1, 'George R. R. Martin', 'A Song of Ice and Fire'),
(2, 'C.S. Lewis', 'Perelandra'),
(3, 'George R. R. Martin', 'Fire & Blood'),
(4, 'J. R. R. Tolkien', 'The Hobbit'),
(5, 'J. R. R. Tolkien', 'The Lord of the Rings'),
(6, 'George R. R. Martin', 'A Dream of Spring'),
(7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'),
(8, 'George R. R. Martin', 'The Ice Dragon'),
(9, 'Mario Puzo', 'The Godfather');
```

Use the [`WHERE` clause](https://docs.confluent.io/cloud/current/flink/reference/queries/select.html#where-clause) to filter the rows down to the publications written by George R. R. Martin:

```sql
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
```

</details>

<details>
<summary>Confluent Cloud</summary>

### Prerequisites

* A [Confluent Cloud](https://confluent.cloud/signup) account
* A Flink compute pool created in Confluent Cloud. Follow [this](https://docs.confluent.io/cloud/current/flink/get-started/quick-start-cloud-console.html) quick start to create one.

### Run the commands

In the Confluent Cloud Console, navigate to your environment and then click the `Open SQL Workspace` button for the compute
pool that you have created.

Select the default catalog (Confluent Cloud environment) and database (Kafka cluster) to use with the dropdowns at the top right.

Finally, run following SQL statements to create the `publication_events` table, populate it with test data, and run the filter query.

```sql
CREATE TABLE publication_events (
book_id INT,
author STRING,
title STRING
);
```

```sql
INSERT INTO publication_events VALUES
(0, 'C.S. Lewis', 'The Silver Chair'),
(1, 'George R. R. Martin', 'A Song of Ice and Fire'),
(2, 'C.S. Lewis', 'Perelandra'),
(3, 'George R. R. Martin', 'Fire & Blood'),
(4, 'J. R. R. Tolkien', 'The Hobbit'),
(5, 'J. R. R. Tolkien', 'The Lord of the Rings'),
(6, 'George R. R. Martin', 'A Dream of Spring'),
(7, 'J. R. R. Tolkien', 'The Fellowship of the Ring'),
(8, 'George R. R. Martin', 'The Ice Dragon'),
(9, 'Mario Puzo', 'The Godfather');
```

```sql
SELECT *
FROM publication_events
WHERE author = 'George R. R. Martin';
```

The query output should look like this:

![Query output](https://raw.githubusercontent.com/confluentinc/tutorials/master/filtering/flinksql/img/query-output.png)

<details>
37 changes: 0 additions & 37 deletions filtering/flinksql/build.gradle

This file was deleted.

Binary file modified filtering/flinksql/img/query-output.png
12 changes: 0 additions & 12 deletions filtering/flinksql/settings.gradle

This file was deleted.
