43 commits
7252605 Access Spark configs from native code (andygrove, Aug 22, 2025)
d084cfa code cleanup (andygrove, Aug 22, 2025)
4837935 revert (andygrove, Aug 22, 2025)
ad9c9b8 debug (andygrove, Oct 3, 2025)
f3bb412 use df release (andygrove, Oct 3, 2025)
13f14d3 cargo update (andygrove, Oct 3, 2025)
78f5b4f [skip ci] (andygrove, Oct 3, 2025)
5a39d3b merge other PR [skip-ci] (andygrove, Oct 3, 2025)
dc11515 save [skip-ci] (andygrove, Oct 3, 2025)
d2a1ab1 [skip ci] (andygrove, Oct 3, 2025)
31cdbc6 save [skip ci] (andygrove, Oct 3, 2025)
ffb1f71 Merge remote-tracking branch 'apache/main' into debug-mem (andygrove, Oct 3, 2025)
322b4c5 info logging (andygrove, Oct 3, 2025)
89e10ac log task id [skip ci] (andygrove, Oct 3, 2025)
3b191fd println (andygrove, Oct 3, 2025)
7c24836 revert lock file (andygrove, Oct 3, 2025)
405f5b7 prep for review (andygrove, Oct 3, 2025)
522238d save (andygrove, Oct 3, 2025)
36565ca Update spark/src/main/scala/org/apache/comet/CometExecIterator.scala (andygrove, Oct 3, 2025)
21189a6 info logging (andygrove, Oct 3, 2025)
dfa2c67 Merge branch 'debug-mem' of github.com:andygrove/datafusion-comet int… (andygrove, Oct 3, 2025)
d9817ce fix (andygrove, Oct 3, 2025)
acba7bc log error on try_grow fail (andygrove, Oct 3, 2025)
4051d29 log error on try_grow fail (andygrove, Oct 3, 2025)
df69875 revert (andygrove, Oct 3, 2025)
ad891a0 add Python script to convert log to csv (andygrove, Oct 3, 2025)
8756256 Python script to generate chart (andygrove, Oct 3, 2025)
7eb1bc1 scripts (andygrove, Oct 3, 2025)
21bd386 new script (andygrove, Oct 3, 2025)
ec823c2 show err (andygrove, Oct 3, 2025)
a66fa65 save (andygrove, Oct 3, 2025)
12db37f Merge branch 'debug-mem' of github.com:andygrove/datafusion-comet int… (andygrove, Oct 3, 2025)
2fb336e track errors (andygrove, Oct 3, 2025)
706f5e7 format (andygrove, Oct 3, 2025)
4faf881 ASF header (andygrove, Oct 3, 2025)
d91abda add brief docs (andygrove, Oct 3, 2025)
f6128b5 docs (andygrove, Oct 3, 2025)
7d40ac2 fix (andygrove, Oct 5, 2025)
c495897 cargo fmt (andygrove, Oct 6, 2025)
06814b7 upmerge (andygrove, Oct 6, 2025)
e51751f format (andygrove, Oct 6, 2025)
75e727f upmerge (andygrove, Oct 7, 2025)
e844287 fix regression (andygrove, Oct 7, 2025)
15 changes: 13 additions & 2 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -54,6 +54,9 @@ object CometConf extends ShimCometConf {
private val TRACING_GUIDE = "For more information, refer to the Comet Tracing " +
"Guide (https://datafusion.apache.org/comet/user-guide/tracing.html)"

private val DEBUGGING_GUIDE = "For more information, refer to the Comet Debugging " +
"Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html"

/** List of all configs that is used for generating documentation */
val allConfs = new ListBuffer[ConfigEntry[_]]

@@ -63,9 +66,11 @@ object CometConf extends ShimCometConf {

def conf(key: String): ConfigBuilder = ConfigBuilder(key)

val COMET_EXEC_CONFIG_PREFIX = "spark.comet.exec";
val COMET_PREFIX = "spark.comet";

val COMET_EXEC_CONFIG_PREFIX: String = s"$COMET_PREFIX.exec";

val COMET_EXPR_CONFIG_PREFIX = "spark.comet.expression";
val COMET_EXPR_CONFIG_PREFIX: String = s"$COMET_PREFIX.expression";

val COMET_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.enabled")
.doc(
@@ -454,6 +459,12 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_DEBUG_MEMORY_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_PREFIX.debug.memory")
.doc(s"When enabled, log all native memory pool interactions. $DEBUGGING_GUIDE.")
.booleanConf
.createWithDefault(false)

val COMET_EXPLAIN_VERBOSE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.explain.verbose.enabled")
.doc(
2 changes: 1 addition & 1 deletion dev/benchmarks/comet-tpcds.sh
@@ -22,7 +22,7 @@ $SPARK_HOME/sbin/stop-master.sh
$SPARK_HOME/sbin/stop-worker.sh

$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
RUST_BACKTRACE=1 $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER

$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
2 changes: 1 addition & 1 deletion dev/benchmarks/comet-tpch.sh
@@ -22,7 +22,7 @@ $SPARK_HOME/sbin/stop-master.sh
$SPARK_HOME/sbin/stop-worker.sh

$SPARK_HOME/sbin/start-master.sh
$SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER
RUST_BACKTRACE=1 $SPARK_HOME/sbin/start-worker.sh $SPARK_MASTER

$SPARK_HOME/bin/spark-submit \
--master $SPARK_MASTER \
73 changes: 73 additions & 0 deletions dev/scripts/mem_debug_to_csv.py
@@ -0,0 +1,73 @@
#!/usr/bin/python
##############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
##############################################################################

import argparse
import re
import sys

def main(file, task_filter):
    # keep track of running total allocation per consumer
    alloc = {}

    # open log file and emit one CSV row per memory pool interaction
    with open(file) as f:
        print("name,size,label")
        # iterate over lines in file
        for line in f:
            # example line: [Task 486] MemoryPool[HashJoinInput[6]].shrink(1000)
            # parse task id, consumer name, method, and size
            re_match = re.search(r'\[Task (.*)\] MemoryPool\[(.*)\]\.(try_grow|grow|shrink)\(([0-9]*)\)', line, re.IGNORECASE)
            if re_match:
                try:
                    task = int(re_match.group(1))
                    if task != task_filter:
                        continue

                    consumer = re_match.group(2)
                    method = re_match.group(3)
                    size = int(re_match.group(4))

                    if alloc.get(consumer) is None:
                        alloc[consumer] = size
                    else:
                        if method == "grow" or method == "try_grow":
                            if "Err" in line:
                                # do not update allocation if try_grow failed;
                                # annotate this entry so it can be shown in the chart
                                print(f"{consumer},{alloc[consumer]},ERR")
                            else:
                                alloc[consumer] = alloc[consumer] + size
                        elif method == "shrink":
                            alloc[consumer] = alloc[consumer] - size

                    print(f"{consumer},{alloc[consumer]}")

                except Exception as e:
                    print("error parsing", line, e, file=sys.stderr)


if __name__ == "__main__":
    ap = argparse.ArgumentParser(description="Generate CSV from memory debug output")
    ap.add_argument("--task", type=int, required=True, help="Task ID to filter on")
    ap.add_argument("--file", required=True, help="Spark log containing memory debug output")
    args = ap.parse_args()
    main(args.file, args.task)
69 changes: 69 additions & 0 deletions dev/scripts/plot_memory_usage.py
@@ -0,0 +1,69 @@
#!/usr/bin/python
##############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
##############################################################################

import pandas as pd
import matplotlib.pyplot as plt
import sys

def plot_memory_usage(csv_file):
# Read the CSV file
df = pd.read_csv(csv_file)

# Create time index based on row order (each row is a sequential time point)
df['time'] = range(len(df))

# Pivot the data to have consumers as columns
pivot_df = df.pivot(index='time', columns='name', values='size')
    pivot_df = pivot_df.ffill().fillna(0)

# Create stacked area chart
plt.figure(figsize=(8, 4))
plt.stackplot(pivot_df.index,
[pivot_df[col] for col in pivot_df.columns],
labels=pivot_df.columns,
alpha=0.8)

# Add annotations for ERR labels
if 'label' in df.columns:
        err_points = df[df['label'].astype(str).str.contains('ERR', na=False)]
for _, row in err_points.iterrows():
plt.axvline(x=row['time'], color='red', linestyle='--', alpha=0.7, linewidth=1.5)
plt.text(row['time'], plt.ylim()[1] * 0.95, 'ERR',
ha='center', va='top', color='red', fontweight='bold')

plt.xlabel('Time')
plt.ylabel('Memory Usage')
plt.title('Memory Usage Over Time by Consumer')
plt.legend(loc='upper left', bbox_to_anchor=(1.05, 1), borderaxespad=0, fontsize='small')
plt.grid(True, alpha=0.3)
plt.tight_layout()

# Save the plot
output_file = csv_file.replace('.csv', '_chart.png')
plt.savefig(output_file, dpi=300, bbox_inches='tight')
print(f"Chart saved to: {output_file}")
plt.show()

if __name__ == "__main__":
if len(sys.argv) != 2:
print("Usage: python plot_memory_usage.py <csv_file>")
sys.exit(1)

plot_memory_usage(sys.argv[1])
42 changes: 41 additions & 1 deletion docs/source/contributor-guide/debugging.md
@@ -128,7 +128,7 @@ To build Comet with this feature enabled:
make release COMET_FEATURES=backtrace
```

Start Comet with `RUST_BACKTRACE=1`
Set `RUST_BACKTRACE=1` for the Spark worker/executor process, or for `spark-submit` if running in local mode.

```console
RUST_BACKTRACE=1 $SPARK_HOME/spark-shell --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar --conf spark.plugins=org.apache.spark.CometPlugin --conf spark.comet.enabled=true --conf spark.comet.exec.enabled=true
@@ -189,3 +189,43 @@ This produces output like the following:

Additionally, you can place a `log4rs.yaml` configuration file inside the Comet configuration directory specified by the `COMET_CONF_DIR` environment variable to enable more advanced logging configurations. This file uses the [log4rs YAML configuration format](https://docs.rs/log4rs/latest/log4rs/#configuration-via-a-yaml-file).
For example, see: [log4rs.yaml](https://github.com/apache/datafusion-comet/blob/main/conf/log4rs.yaml).

### Debugging Memory Reservations

Set `spark.comet.debug.memory=true` to log all calls that grow or shrink memory reservations.
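
For example, when launching a local spark-shell session (a sketch based on the earlier backtrace example; adjust the jar path and Comet version for your environment):

```console
RUST_BACKTRACE=1 $SPARK_HOME/bin/spark-shell \
  --jars spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.comet.enabled=true \
  --conf spark.comet.exec.enabled=true \
  --conf spark.comet.debug.memory=true
```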

Example log output:

```
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256232960) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256375168) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(256899456) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257296128) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].try_grow(257820416) returning Err
[Task 486] MemoryPool[ExternalSorterMerge[6]].shrink(10485760)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(150464)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(146688)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(137856)
[Task 486] MemoryPool[ExternalSorter[6]].shrink(141952)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorter[6]].shrink(524288)
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(0) returning Ok
[Task 486] MemoryPool[ExternalSorterMerge[6]].try_grow(68928) returning Ok
```

When backtraces are enabled (see the earlier section), backtraces are included for failed allocations.

The Python scripts in `dev/scripts` can be used to chart memory usage for a particular Spark task.

First, extract the memory logging for a specific Spark task and write it to CSV:

```shell
python3 dev/scripts/mem_debug_to_csv.py --file /path/to/executor/log --task 1234 > /tmp/mem.csv
```

Next, generate a chart from the CSV file:

```shell
python3 dev/scripts/plot_memory_usage.py /tmp/mem.csv
```
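
The CSV produced by `mem_debug_to_csv.py` has one row per memory pool interaction, plus a third column that is only populated for failed allocation requests. A minimal sketch of the output (consumer names and sizes are illustrative):

```
name,size,label
ExternalSorter[6],256232960
ExternalSorter[6],256375168
ExternalSorter[6],257820416,ERR
ExternalSorterMerge[6],10485760
```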
1 change: 1 addition & 0 deletions docs/source/user-guide/latest/configs.md
@@ -35,6 +35,7 @@ Comet provides the following configuration settings.
| spark.comet.convert.json.enabled | When enabled, data from Spark (non-native) JSON v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.convert.parquet.enabled | When enabled, data from Spark (non-native) Parquet v1 and v2 scans will be converted to Arrow format. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. | false |
| spark.comet.debug.enabled | Whether to enable debug mode for Comet. When enabled, Comet will do additional checks for debugging purpose. For example, validating array when importing arrays from JVM at native side. Note that these checks may be expensive in performance and should only be enabled for debugging purpose. | false |
| spark.comet.debug.memory | When enabled, log all native memory pool interactions. For more information, refer to the Comet Debugging Guide (https://datafusion.apache.org/comet/contributor-guide/debugging.html). | false |
| spark.comet.dppFallback.enabled | Whether to fall back to Spark for queries that use DPP. | true |
| spark.comet.enabled | Whether to enable Comet extension for Spark. When this is turned on, Spark will use Comet to read Parquet data source. Note that to enable native vectorized execution, both this config and 'spark.comet.exec.enabled' need to be enabled. By default, this config is the value of the env var `ENABLE_COMET` if set, or true otherwise. | true |
| spark.comet.exceptionOnDatetimeRebase | Whether to throw exception when seeing dates/timestamps from the legacy hybrid (Julian + Gregorian) calendar. Since Spark 3, dates/timestamps were written according to the Proleptic Gregorian calendar. When this is true, Comet will throw exceptions when seeing these dates/timestamps that were written by Spark version before 3.0. If this is false, these dates/timestamps will be read as if they were written to the Proleptic Gregorian calendar and will not be rebased. | false |
48 changes: 30 additions & 18 deletions native/core/src/execution/jni_api.rs
@@ -78,6 +78,11 @@ use crate::execution::spark_plan::SparkPlan;

use crate::execution::tracing::{log_memory_usage, trace_begin, trace_end, with_trace};

use crate::execution::memory_pools::logging_pool::LoggingPool;
use crate::execution::spark_config::{
SparkConfig, COMET_DEBUG_ENABLED, COMET_DEBUG_MEMORY, COMET_EXPLAIN_NATIVE_ENABLED,
COMET_MAX_TEMP_DIRECTORY_SIZE, COMET_TRACING_ENABLED,
};
use crate::parquet::encryption_support::{CometEncryptionFactory, ENCRYPTION_FACTORY_ID};
use datafusion_comet_proto::spark_operator::operator::OpStruct;
use log::info;
@@ -168,14 +173,24 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
memory_limit: jlong,
memory_limit_per_task: jlong,
task_attempt_id: jlong,
debug_native: jboolean,
explain_native: jboolean,
tracing_enabled: jboolean,
Comment on lines -171 to -173

andygrove (Member, Author): Rather than adding yet another flag to this API call, I am now using the already available Spark config map in native code.

Contributor: +1. The config map should be the preferred method.

max_temp_directory_size: jlong,
key_unwrapper_obj: JObject,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
with_trace("createPlan", tracing_enabled != JNI_FALSE, || {
// Deserialize Spark configs
let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
let bytes = env.convert_byte_array(array)?;
let spark_configs = serde::deserialize_config(bytes.as_slice())?;
let spark_config: HashMap<String, String> = spark_configs.entries.into_iter().collect();

// Access Comet configs
let debug_native = spark_config.get_bool(COMET_DEBUG_ENABLED);
let explain_native = spark_config.get_bool(COMET_EXPLAIN_NATIVE_ENABLED);
let tracing_enabled = spark_config.get_bool(COMET_TRACING_ENABLED);
let max_temp_directory_size =
spark_config.get_u64(COMET_MAX_TEMP_DIRECTORY_SIZE, 100 * 1024 * 1024 * 1024);
let logging_memory_pool = spark_config.get_bool(COMET_DEBUG_MEMORY);

with_trace("createPlan", tracing_enabled, || {
// Init JVM classes
JVMClasses::init(&mut env);

@@ -186,15 +201,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let bytes = env.convert_byte_array(array)?;
let spark_plan = serde::deserialize_op(bytes.as_slice())?;

// Deserialize Spark configs
let array = unsafe { JPrimitiveArray::from_raw(serialized_spark_configs) };
let bytes = env.convert_byte_array(array)?;
let spark_configs = serde::deserialize_config(bytes.as_slice())?;

// Convert Spark configs to HashMap
let _spark_config_map: HashMap<String, String> =
spark_configs.entries.into_iter().collect();

let metrics = Arc::new(jni_new_global_ref!(env, metrics_node)?);

// Get the global references of input sources
@@ -221,6 +227,12 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
let memory_pool =
create_memory_pool(&memory_pool_config, task_memory_manager, task_attempt_id);

let memory_pool = if logging_memory_pool {
Arc::new(LoggingPool::new(task_attempt_id as u64, memory_pool))
} else {
memory_pool
};

// Get local directories for storing spill files
let local_dirs_array = JObjectArray::from_raw(local_dirs);
let num_local_dirs = env.get_array_length(&local_dirs_array)?;
@@ -238,7 +250,7 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
batch_size as usize,
memory_pool,
local_dirs,
max_temp_directory_size as u64,
max_temp_directory_size,
)?;

let plan_creation_time = start.elapsed();
@@ -274,10 +286,10 @@ pub unsafe extern "system" fn Java_org_apache_comet_Native_createPlan(
metrics_last_update_time: Instant::now(),
plan_creation_time,
session_ctx: Arc::new(session),
debug_native: debug_native == 1,
explain_native: explain_native == 1,
debug_native,
explain_native,
memory_pool_config,
tracing_enabled: tracing_enabled != JNI_FALSE,
tracing_enabled,
});

Ok(Box::into_raw(exec_context) as i64)