diff --git a/.github/workflows/regression.yml b/.github/workflows/regression.yml index 53c65c30..26871507 100644 --- a/.github/workflows/regression.yml +++ b/.github/workflows/regression.yml @@ -55,6 +55,10 @@ jobs: shell: bash run: python3 _utils/prep_solutions.py --task=join && source path.env && TEST_RUN=true ./run.sh + - name: Run mini Rollfun benchmark + shell: bash + run: python3 _utils/prep_solutions.py --task=rollfun && source path.env && TEST_RUN=true ./run.sh + - name: Validate benchmark results shell: bash run: ./_utils/validate_no_errors.sh diff --git a/README.md b/README.md index 47abcde9..c21b52ef 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,7 @@ Contribution and feedback are very welcome! - [x] groupby - [x] join - [x] groupby2014 + - [x] rollfun # Solutions diff --git a/_benchplot/benchplot-dict.R b/_benchplot/benchplot-dict.R index 14cb6964..e0f46faa 100644 --- a/_benchplot/benchplot-dict.R +++ b/_benchplot/benchplot-dict.R @@ -249,7 +249,7 @@ groupby.query.exceptions = {list( "arrow" = list("Expression row_number() <= 2L not supported in Arrow; pulling data into R" = "max v1 - min v2 by id3", "Expression cor(v1, v2, ... is not supported in arrow; pulling data into R" = "regression v1 v2 by id2 id4"), "duckdb" = list(), "duckdb-latest" = list(), - "datafusion" = list(), + "datafusion" = list() )} groupby.data.exceptions = {list( # exceptions as of run 1575727624 "data.table" = {list( @@ -468,7 +468,7 @@ join.data.exceptions = {list( "J1_1e9_NA_5_0","J1_1e9_NA_0_1") # q1 r1 )}, "polars" = {list( - "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1"), + "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1") )}, "arrow" = {list( "out of memory" = c("J1_1e9_NA_0_0","J1_1e9_NA_5_0","J1_1e9_NA_0_1", "J1_1e8_NA_0_0", "J1_1e8_NA_5_0", "J1_1e8_NA_0_1" )#, @@ -529,3 +529,97 @@ groupby2014.data.exceptions = {list( )} )} groupby2014.exceptions = task.exceptions(groupby2014.query.exceptions, groupby2014.data.exceptions) + +# rollfun ---- + +rollfun_q_title_fun = function(x) { + stopifnot(c("question","iquestion","out_rows","out_cols","in_rows") %in% names(x), + uniqueN(x, by="iquestion")==nrow(x)) + x[, sprintf("Query %s: \"%s\"", + iquestion, as.character(question)), + by = "iquestion"]$V1 +} +rollfun.syntax.dict = {list( + "data.table" = {c( + "mean" = "frollmean(x$v1, w)", + "window small" = "frollmean(x$v1, wsmall)", + "window big" = "frollmean(x$v1, wbig)", + "min" = "frollmin(x$v1, w)", + "median" = "frollmedian(x$v1, w)", + "multiroll" = "frollmean(list(x$v1, x$v2), c(w-50L, w+50L))", + "weighted" = "", + "uneven dense" = "frollmean(x$v1, frolladapt(x$id2, w), adaptive=TRUE)", + "uneven sparse" = "frollmean(x$v1, frolladapt(x$id3, w), adaptive=TRUE)", + "regression" = "" + )}, + "dplyr" = {c( + "mean" = "slide_mean(x$v1, before=w-1L, complete=TRUE)", + "window small" = "slide_mean(x$v1, before=wsmall-1L, complete=TRUE)", + "window big" = "slide_mean(x$v1, before=wbig-1L, complete=TRUE)", + "min" = "slide_min(x$v1, before=w-1L, complete=TRUE)", + "median" = "", + "multiroll" = "list(slide_mean(x$v1, before=w-51L, complete=TRUE), slide_mean(x$v1, before=w+49L, complete=TRUE), slide_mean(x$v2, before=w-51L, complete=TRUE), slide_mean(x$v2, before=w+49L, complete=TRUE))", + "weighted" = "", + "uneven dense" = "slide_index_mean(x$v1, i=x$id2, before=w-1L, complete=TRUE)", + "uneven sparse" = "slide_index_mean(x$v1, i=x$id3, before=w-1L, complete=TRUE)", + "regression" = "" + )}, + "pandas" = {c( + "mean" = "x['v1'].rolling(w).mean()", + "window 
small" = "x['v1'].rolling(wsmall).mean()", + "window big" = "x['v1'].rolling(wbig).mean()", + "min" = "x['v1'].rolling(w).min()", + "median" = "x['v1'].rolling(w).median()", + "multiroll" = "pd.concat([x[['v1','v2']].rolling(w-50).mean().reset_index(drop=True), x[['v1','v2']].rolling(w+50).mean().reset_index(drop=True)], axis=1)", + "weighted" = "", + "uneven dense" = "{y}.rolling('{w}s').mean()", + "uneven sparse" = "{y}.rolling('{w}s').mean()", + "regression" = "" + )}, + "spark" = {c( + "mean" = "select avg(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x", + "window small" = "select avg(v1) over (order by id1 rows between {wsmall-1} preceding and current row) as v1 from x", + "window big" = "select avg(v1) over (order by id1 rows between {wbig-1} preceding and current row) as v1 from x", + "min" = "select min(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x", + "median" = "select median(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x", + "multiroll" = "select avg(v1) over small as v1_small, avg(v1) over big as v1_big, avg(v2) over small as v2_small, avg(v2) over big as v2_big from x window small as (order by id1 rows between {w-51} preceding and current row), big as (order by id1 rows between {w+49} preceding and current row)", + "weighted" = "", + "uneven dense" = "select avg(v1) over (order by id2 range between {w-1} preceding and current row) as v1 from x", + "uneven sparse" = "select avg(v1) over (order by id3 range between {w-1} preceding and current row) as v1 from x", + "regression" = "" + )}, + "duckdb-latest" = {c( + "mean" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "window small" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {wsmall-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "window big" = "SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN {wbig-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "min" = "SELECT min(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "median" = "SELECT median(v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "multiroll" = "SELECT avg(v1) OVER small AS v1_small, avg(v1) OVER big AS v1_big, avg(v2) OVER small AS v2_small, avg(v2) OVER big AS v2_big FROM x WINDOW small AS (ORDER BY id1 ROWS BETWEEN w-51 PRECEDING AND CURRENT ROW), big AS (ORDER BY id1 ROWS BETWEEN w+49 PRECEDING AND CURRENT ROW)", + "weighted" = "", + "uneven dense" = "SELECT avg(v1) OVER (ORDER BY id2 RANGE BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "uneven sparse" = "SELECT avg(v1) OVER (ORDER BY id3 RANGE BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS v1 FROM x", + "regression" = "SELECT regr_r2(v2, v1) OVER (ORDER BY id1 ROWS BETWEEN {w-1} PRECEDING AND CURRENT ROW) AS r2 FROM x" + )} +)} +rollfun.query.exceptions = {list( + "data.table" = list("not yet implemented" = "weighted", "not yet implemented" = "regression"), + "dplyr" = list("not yet implemented" = "median", "not yet implemented" = "weighted", "not yet implemented" = "regression"), + "pandas" = list("not yet implemented" = "weighted", "not yet implemented" = "regression"), + "spark" = list("not yet implemented" = "median", "not yet implemented" = "weighted", "not yet implemented" = "regression"), + "duckdb-latest" = list("not yet implemented" = "weighted") +)} +rollfun.data.exceptions = {list( + "data.table" = {list( + )}, + "dplyr" = {list( + )}, + "pandas" = {list( + )}, + 
"spark" = {list( + "timeout" = c("R1_1e7_NA_0_1", "R1_1e8_NA_0_1") + )}, + "duckdb-latest" = {list( + "timeout" = c("R1_1e8_NA_0_1") + )} +)} +rollfun.exceptions = task.exceptions(rollfun.query.exceptions, rollfun.data.exceptions) diff --git a/_benchplot/benchplot.R b/_benchplot/benchplot.R index 040f1d6e..2f7725c1 100644 --- a/_benchplot/benchplot.R +++ b/_benchplot/benchplot.R @@ -365,7 +365,8 @@ benchplot = function( } margins(nsolutions, pending=pending) x[na_time_sec==FALSE, "max_time" := max(c(time1, time2)), by=c("solution","question")] - lim_x = tail(xlab_labels(max(c(0, x$max_time), na.rm=TRUE)), n=1L) + trunc5 = function(x) trunc(x*1e5)/1e5 + lim_x = tail(xlab_labels(trunc5(max(c(0, x$max_time), na.rm=TRUE))), n=1L) if (lim_x == 0) stop("internal error: lim x is c(0,0), this should be already escaped at the beginning with 'sum(x$na_time_sec)==nrow(x)'") # get bars Y coordinates, positions only, plot later in bar1 all_y_bars = barplot(rep(NA_real_, length(pad)), horiz=TRUE, xlim=c(0, lim_x), axes=FALSE, xpd=FALSE) diff --git a/_control/data.csv b/_control/data.csv index d68b271f..b63c952e 100644 --- a/_control/data.csv +++ b/_control/data.csv @@ -20,4 +20,7 @@ join,J1_1e7_NA_0_1,1e7,NA,0,1,1 join,J1_1e8_NA_0_0,1e8,NA,0,0,1 join,J1_1e8_NA_5_0,1e8,NA,5,0,1 join,J1_1e8_NA_0_1,1e8,NA,0,1,1 -join,J1_1e9_NA_0_0,1e9,NA,0,0,1 \ No newline at end of file +join,J1_1e9_NA_0_0,1e9,NA,0,0,1 +rollfun,R1_1e6_NA_0_1,1e6,NA,0,1,1 +rollfun,R1_1e7_NA_0_1,1e7,NA,0,1,1 +rollfun,R1_1e8_NA_0_1,1e8,NA,0,1,1 diff --git a/_control/questions.csv b/_control/questions.csv index f732c3bb..92f7f8ac 100644 --- a/_control/questions.csv +++ b/_control/questions.csv @@ -19,3 +19,13 @@ groupby2014,sum v1 by id1:id2,basic groupby2014,sum v1 mean v3 by id3,basic groupby2014,mean v1:v3 by id4,basic groupby2014,sum v1:v3 by id6,basic +rollfun,mean,basic +rollfun,window small,basic +rollfun,window big,basic +rollfun,min,basic +rollfun,median,basic +rollfun,multiroll,advanced +rollfun,weighted,advanced +rollfun,uneven dense,advanced +rollfun,uneven sparse,advanced +rollfun,regression,advanced diff --git a/_control/solutions.csv b/_control/solutions.csv index ac996de0..45872785 100644 --- a/_control/solutions.csv +++ b/_control/solutions.csv @@ -2,16 +2,20 @@ solution,task data.table,groupby data.table,join data.table,groupby2014 +data.table,rollfun dplyr,groupby dplyr,join dplyr,groupby2014 +dplyr,rollfun pandas,groupby pandas,join pandas,groupby2014 +pandas,rollfun pydatatable,groupby pydatatable,join spark,groupby spark,join +spark,rollfun dask,groupby dask,join juliadf,groupby @@ -28,5 +32,6 @@ duckdb,groupby duckdb,join duckdb-latest,groupby duckdb-latest,join +duckdb-latest,rollfun datafusion,groupby datafusion,join diff --git a/_control/timeout.csv b/_control/timeout.csv index b66414d7..b43b08f2 100644 --- a/_control/timeout.csv +++ b/_control/timeout.csv @@ -8,3 +8,6 @@ join,1e9,360 groupby2014,1e7,60 groupby2014,1e8,120 groupby2014,1e9,180 +rollfun,1e6,60 +rollfun,1e7,120 +rollfun,1e8,180 diff --git a/_data/rollfun-datagen.R b/_data/rollfun-datagen.R new file mode 100644 index 00000000..51c404e8 --- /dev/null +++ b/_data/rollfun-datagen.R @@ -0,0 +1,35 @@ +# Rscript _data/rollfun-datagen.R 1e6 0 0 1 +# Rscript _data/rollfun-datagen.R 1e7 0 0 1 +# Rscript _data/rollfun-datagen.R 1e8 0 0 1 + +args = commandArgs(TRUE) + +pretty_sci = function(x) { + tmp<-strsplit(as.character(x), "+", fixed=TRUE)[[1L]] + if(length(tmp)==1L) { + paste0(substr(tmp, 1L, 1L), "e", nchar(tmp)-1L) + } else if(length(tmp)==2L){ + 
paste0(tmp[1L], as.character(as.integer(tmp[2L]))) + } +} + +library(data.table) +N=as.integer(args[1L]); K=as.integer(args[2L]); nas=as.integer(args[3L]); sort=as.integer(args[4L]) +stopifnot(nas==0L, sort==1L) ## timeseries data always sorted +set.seed(108) +cat(sprintf("Producing data of %s rows, %s NAs ratio, %s sort flag\n", pretty_sci(N), nas, sort)) +DT = list() +DT[["id1"]] = seq.int(N) ## index, do we need it as POSIXct/IDate? +## uneven idx +DT[["id2"]] = sort(sample(N*1.1, N)) ## index dense +DT[["id3"]] = sort(sample(N*2, N)) ## index sparse +DT[["v1"]] = cumprod(rnorm(N, 1, 0.005)) ## more risky asset +DT[["v2"]] = cumprod(rnorm(N, 1, 0.001)) ## less risky asset +DT[["weights"]] = rnorm(n=N, m=1, sd=0.1) + +setDT(DT) +file = sprintf("R1_%s_NA_%s_%s.csv", pretty_sci(N), nas, sort) +cat(sprintf("Writing data to %s\n", file)) +fwrite(DT, file) +cat(sprintf("Data written to %s, quitting\n", file)) +if (!interactive()) quit("no", status=0) diff --git a/_launcher/solution.R b/_launcher/solution.R index c419a2c5..d8faa0a5 100755 --- a/_launcher/solution.R +++ b/_launcher/solution.R @@ -133,6 +133,8 @@ data.desc = function(task, nrow, k, na, sort) { prefix = "J1" } else if (task=="groupby2014") { prefix = "G0" + } else if (task=="rollfun") { + prefix = "R1" } else { stop("undefined task in solution.R data.desc function") } diff --git a/_report/history.Rmd b/_report/history.Rmd index 62e5074d..e3ac5182 100644 --- a/_report/history.Rmd +++ b/_report/history.Rmd @@ -108,6 +108,26 @@ plot(d, "data.table", 1e8, "join") plot(d, "data.table", 1e9, "join") ``` +#### rollfun {.tabset .tabset-fade .tabset-pills} + +##### 0.05 GB + +```{r datatable.rollfun.1e6} +plot(d, "data.table", 1e6, "rollfun") +``` + +##### 0.5 GB {.active} + +```{r datatable.rollfun.1e7} +plot(d, "data.table", 1e7, "rollfun") +``` + +##### 5 GB + +```{r datatable.rollfun.1e8} +plot(d, "data.table", 1e8, "rollfun") +``` + #### groupby2014 {.tabset .tabset-fade .tabset-pills} ##### 0.5 GB @@ -212,6 +232,26 @@ plot(d, "pandas", 1e8, "join") plot(d, "pandas", 1e9, "join") ``` +#### rollfun {.tabset .tabset-fade .tabset-pills} + +##### 0.05 GB + +```{r pandas.rollfun.1e6} +plot(d, "pandas", 1e6, "rollfun") +``` + +##### 0.5 GB {.active} + +```{r pandas.rollfun.1e7} +plot(d, "pandas", 1e7, "rollfun") +``` + +##### 5 GB + +```{r pandas.rollfun.1e8} +plot(d, "pandas", 1e8, "rollfun") +``` + #### groupby2014 {.tabset .tabset-fade .tabset-pills} ##### 0.5 GB @@ -274,6 +314,27 @@ plot(d, "dplyr", 1e8, "join") plot(d, "dplyr", 1e9, "join") ``` + +#### rollfun {.tabset .tabset-fade .tabset-pills} + +##### 0.05 GB + +```{r dplyr.rollfun.1e6} +plot(d, "dplyr", 1e6, "rollfun") +``` + +##### 0.5 GB {.active} + +```{r dplyr.rollfun.1e7} +plot(d, "dplyr", 1e7, "rollfun") +``` + +##### 5 GB + +```{r dplyr.rollfun.1e8} +plot(d, "dplyr", 1e8, "rollfun") +``` + #### groupby2014 {.tabset .tabset-fade .tabset-pills} ##### 0.5 GB @@ -378,6 +439,27 @@ plot(d, "spark", 1e8, "join") plot(d, "spark", 1e9, "join") ``` + +#### rollfun {.tabset .tabset-fade .tabset-pills} + +##### 0.05 GB + +```{r spark.rollfun.1e6} +plot(d, "spark", 1e6, "rollfun") +``` + +##### 0.5 GB {.active} + +```{r spark.rollfun.1e7} +plot(d, "spark", 1e7, "rollfun") +``` + +##### 5 GB + +```{r spark.rollfun.1e8} +plot(d, "spark", 1e8, "rollfun") +``` + ### juliadf {.tabset .tabset-fade .tabset-pills} #### groupby {.tabset .tabset-fade .tabset-pills} @@ -715,18 +797,24 @@ plot(d, "duckdb-latest", 1e8, "join") plot(d, "duckdb-latest", 1e9, "join") ``` -## Details +#### rollfun 
{.tabset .tabset-fade .tabset-pills} -### Environment +##### 0.05 GB -Nodename: `r .nodename` +```{r duckdb-latest.rollfun.1e6} +plot(d, "duckdb-latest", 1e6, "rollfun") +``` ------- +##### 0.5 GB {.active} -Report was generated on: `r format(Sys.time(), usetz=TRUE)`. +```{r duckdb-latest.rollfun.1e7} +plot(d, "duckdb-latest", 1e7, "rollfun") +``` -```{r status_set_success} -cat("history\n", file=get_report_status_file(), append=TRUE) +##### 5 GB + +```{r duckdb-latest.rollfun.1e8} +plot(d, "duckdb-latest", 1e8, "rollfun") ``` ### datafusion {.tabset .tabset-fade .tabset-pills} @@ -769,4 +857,18 @@ plot(d, "datafusion", 1e8, "join") ```{r datafusion.join.1e9} plot(d, "datafusion", 1e9, "join") -``` \ No newline at end of file +``` + +## Details + +### Environment + +Nodename: `r .nodename` + +------ + +Report was generated on: `r format(Sys.time(), usetz=TRUE)`. + +```{r status_set_success} +cat("history\n", file=get_report_status_file(), append=TRUE) +``` diff --git a/_report/index.Rmd b/_report/index.Rmd index e6526548..6553aad1 100644 --- a/_report/index.Rmd +++ b/_report/index.Rmd @@ -52,6 +52,7 @@ if (nrow(lld_unfinished)) { dt_groupby = lld[task=="groupby"][substr(data,1,2)=="G1"] dt_join = lld[task=="join"] dt_groupby2014 = lld[task=="groupby2014"] +dt_rollfun = lld[task=="rollfun"] ``` ```{r helpers} @@ -97,6 +98,11 @@ data_name = get_data_levels()[["join"]] loop_benchplot(dt_join, report_name="join", syntax.dict=join.syntax.dict, exceptions=join.exceptions, solution.dict=solution.dict, data_namev=data_name, q_groupv=c("basic"), title.txt.fun = header_title_fun, question.txt.fun = join_q_title_fun, cutoff = "spark", pending = "Modin") ``` +```{r report_rollfun, message=FALSE} +data_name = get_data_levels()[["rollfun"]] +loop_benchplot(dt_rollfun, report_name="rollfun", syntax.dict=rollfun.syntax.dict, exceptions=rollfun.exceptions, solution.dict=solution.dict, data_namev=data_name, q_groupv=c("basic","advanced"), title.txt.fun = header_title_fun, question.txt.fun = rollfun_q_title_fun, cutoff = "dplyr", pending = character()) +``` + ```{r report_groupby2014, message=FALSE} data_name = get_data_levels()[["groupby2014"]] loop_benchplot(dt_groupby2014, report_name="groupby2014", syntax.dict=groupby2014.syntax.dict, exceptions=groupby2014.exceptions, solution.dict=solution.dict, data_namev=data_name, q_groupv="basic", title.txt.fun = header_title_fun, question.txt.fun = groupby_q_title_fun, cutoff = "spark", pending = character()) @@ -175,6 +181,38 @@ loop_benchplot(dt_groupby2014, report_name="groupby2014", syntax.dict=groupby201 ![](./join/J1_1e9_NA_0_0_advanced.png) --> +### rollfun {.tabset .tabset-fade .tabset-pills} + +#### 0.05 GB + +##### **basic questions** + +![](./rollfun/R1_1e6_NA_0_1_basic.png) + +##### **advanced questions** + +![](./rollfun/R1_1e6_NA_0_1_advanced.png) + +#### 0.5 GB {.active} + +##### **basic questions** + +![](./rollfun/R1_1e7_NA_0_1_basic.png) + +##### **advanced questions** + +![](./rollfun/R1_1e7_NA_0_1_advanced.png) + +#### 5 GB + +##### **basic questions** + +![](./rollfun/R1_1e8_NA_0_1_basic.png) + +##### **advanced questions** + +![](./rollfun/R1_1e8_NA_0_1_advanced.png) + ### groupby2014 {.tabset .tabset-fade .tabset-pills} #### 0.5 GB @@ -240,6 +278,27 @@ rpivotTable::rpivotTable( ) ``` +### rollfun + +Timings are presented for a single dataset, no NAs (missing values). 
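+Rolling window sizes scale with the input: most questions use a window of n/1e3 rows, while the "window small" and "window big" questions use n/1e4 and n/1e2 respectively (see the per-solution rollfun scripts).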
+
+```{r pivot_rollfun}
+sdcols = c("solution","question_group","question","data","in_rows","k","na","sorted","time_sec_1","time_sec_2","version","git","chk_time_sec_1","na_time_sec","out_rows","out_cols")
+data = dt_rollfun[, .SD, .SDcols=sdcols]
+rpivotTable::rpivotTable(
+  data,
+  rows = c("in_rows","k","sorted","question"),
+  cols = "solution",
+  aggregatorName = "Average",
+  vals = "time_sec_1",
+  height = "100%",
+  sorters = make_sorters(data),
+  unusedAttrsVertical = TRUE
+)
+```
+
 ### groupby2014
 
 This task reflects precisely grouping benchmark made by Matt Dowle in 2014 [here](https://github.com/Rdatatable/data.table/wiki/Benchmarks-%3A-Grouping). Differences are well summarized in the following [post on Data Science stackexchange](https://datascience.stackexchange.com/a/40532/10588).
diff --git a/_report/report.R b/_report/report.R
index 0770a8f5..27a95528 100644
--- a/_report/report.R
+++ b/_report/report.R
@@ -21,7 +21,11 @@ get_data_levels = function() {
   in_rows = c("1e7","1e8","1e9")
   k_na_sort = "1e2_0_0"
   groupby2014 = paste("G0", paste(rep(in_rows, each=length(k_na_sort)), k_na_sort, sep="_"), sep="_")
-  list(groupby=groupby, join=join, groupby2014=groupby2014)
+  ## rollfun
+  in_rows = c("1e6","1e7","1e8")
+  k_na_sort = "NA_0_1"
+  rollfun = paste("R1", paste(rep(in_rows, each=length(k_na_sort)), k_na_sort, sep="_"), sep="_")
+  list(groupby=groupby, join=join, groupby2014=groupby2014, rollfun=rollfun)
 }
 get_excluded_batch = function() {
   c(
@@ -40,7 +44,7 @@ load_time = function(path=getwd()) {
   time.csv = Sys.getenv("CSV_TIME_FILE","time.csv")
   fread(file.path(path,time.csv))[
     !is.na(batch) &
-      in_rows %in% c(1e7, 1e8, 1e9) &
+      in_rows %in% c(1e6, 1e7, 1e8, 1e9) &
       solution %in% get_report_solutions() &
       !batch %in% get_excluded_batch() &
       !(task=="groupby" & substr(data, 1L, 2L)=="G2") &
@@ -186,7 +190,7 @@ ftdata = function(x, task) {
     ans[as.logical(as.integer(x))] = "pre-sorted data"
     ans
   }
-  if (all(task %in% c("groupby","join","groupby2014"))) {
+  if (all(task %in% c("groupby","join","groupby2014","rollfun"))) {
     y = strsplit(as.character(x), "_", fixed = TRUE)
     y = lapply(y, function(yy) {yy[yy=="NA"] = NA_character_; yy})
     in_rows=ft(sapply(y, `[`, 2L))
@@ -243,7 +247,7 @@ transform = function(ld) {
 
 # all ----
 
 time_logs = function(path=getwd()) {
-  ct = clean_time(load_time(path=getwd()))
+  ct = clean_time(load_time(path=path))
   d = model_time(ct)
   l = model_logs(clean_logs(load_logs(path=path)))
   q = model_questions(clean_questions(load_questions(path=path)))
diff --git a/_utils/answers-validation.R b/_utils/answers-validation.R
index 0d48d127..fec89597 100644
--- a/_utils/answers-validation.R
+++ b/_utils/answers-validation.R
@@ -1,4 +1,4 @@
-source("report.R")
+source("_report/report.R")
 d = time_logs()
 
 # this script meant to detect some inconsistencies within a solution results and between solutions results
diff --git a/_utils/generate-data-small.sh b/_utils/generate-data-small.sh
index c67e60d5..d9c2eee4 100755
--- a/_utils/generate-data-small.sh
+++ b/_utils/generate-data-small.sh
@@ -1,9 +1,10 @@
-# Data generation data for groupby 0.5GB
+# Data generation for groupby and join 0.5GB, rollfun 50MB
 mkdir data
 cd data/
 Rscript ../_data/groupby-datagen.R 1e7 1e2 0 0
-Rscript ../_data/join-datagen.R 1e7 0 0 0
+Rscript ../_data/join-datagen.R 1e7 NA 0 0
+Rscript ../_data/rollfun-datagen.R 1e6 NA 0 1
 cd ..
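+# the mini rollfun benchmark can then be exercised the same way CI does:
+#   python3 _utils/prep_solutions.py --task=rollfun && source path.env && TEST_RUN=true ./run.sh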
# don't publish, we dont even have the keys @@ -15,3 +16,4 @@ mv _control/data.csv _control/data.csv.original echo "task,data,nrow,k,na,sort,active" > _control/data.csv echo "groupby,G1_1e7_1e2_0_0,1e7,1e2,0,0,1" >> _control/data.csv echo "join,J1_1e7_NA_0_0,1e7,NA,0,0,1" >> _control/data.csv +echo "rollfun,R1_1e6_NA_0_1,1e6,NA,0,1,1" >> _control/data.csv diff --git a/_utils/prep_solutions.py b/_utils/prep_solutions.py index 868c3c23..40438e12 100755 --- a/_utils/prep_solutions.py +++ b/_utils/prep_solutions.py @@ -9,7 +9,7 @@ def print_usage(): - print("Usage: python3 _utils/prep_solutions.py --task=[groupby|join]") + print("Usage: python3 _utils/prep_solutions.py --task=[groupby|join|rollfun]") exit(1) def parse_args(): @@ -19,7 +19,7 @@ def parse_args(): task = arg.replace("--task=", "") else: print_usage() - if task == None or (task != "groupby" and task != "join"): + if task == None or (task != "groupby" and task != "join" and task != "rollfun"): print_usage() return task @@ -44,4 +44,4 @@ def get_solutions(task): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/_utils/rollfun-ans-validation.txt b/_utils/rollfun-ans-validation.txt new file mode 100644 index 00000000..d9f4b27f --- /dev/null +++ b/_utils/rollfun-ans-validation.txt @@ -0,0 +1,60 @@ +# data.table dplyr +ans[(w-2):(w+2)] +tail(ans, 3) + +# duckdb +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID BETWEEN 1000-2-1 AND 1000+2-1") +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4") + +# pandas +ans.head(w+2).tail(5) +ans.tail(3) + +# spark +ans.head(w+2)[-5:] +ans.tail(3) + +# q6 +data.table::as.data.table(ans)[(w-52):(w-48)] +data.table::as.data.table(ans)[(w+48):(w+52)] +tail(data.table::as.data.table(ans), 3) + +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID BETWEEN 1000-50-2-1 AND 1000-50+2-1") +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID BETWEEN 1000+50-2-1 AND 1000+50+2-1") +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4") + +ans.head(w-50+2).tail(5) +ans.head(w+50+2).tail(5) +ans.tail(3) + +ans.head(w-50+2)[-5:] +ans.head(w+50+2)[-5:] +ans.tail(3) + +# q8 +ans[900:904] +tail(ans, 3) + +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID BETWEEN 900-1 AND 904-1") +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4") + +ans.head(904).tail(5) +ans.tail(3) + +ans.head(904)[-5:] +ans.tail(3) + +# q9 +ans[493:497] +tail(ans, 3) + +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID BETWEEN 493-1 AND 497-1") +dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4") + +ans.head(497).tail(5) +ans.tail(3) + +# spark +ans.head(497)[-5:] +ans.tail(3) + diff --git a/datatable/rollfun-datatable.R b/datatable/rollfun-datatable.R new file mode 100755 index 00000000..937072e5 --- /dev/null +++ b/datatable/rollfun-datatable.R @@ -0,0 +1,216 @@ +#!/usr/bin/env Rscript + +if (!dir.exists("./datatable/r-datatable-rollmedian/data.table")) { + cat("# data.table adapt branch library does not exist, installing\n") + #stopifnot(requireNamespace("remotes", quietly=TRUE)) + dir.create("./datatable/r-datatable-rollmedian", showWarnings=FALSE) + #remotes::install_github("Rdatatable/data.table@rollmedian", force=TRUE, lib="./datatable/r-datatable-rollmedian") + ## https://github.com/duckdblabs/db-benchmark/actions/runs/5585092483/job/15159770868 + ## install_github fails from GH Actions with HTTP error 401, therefore use standard R repo instead + install.packages("data.table", 
repos="https://jangorecki.github.io/data.table-rollmedian", lib="./datatable/r-datatable-rollmedian") +} + +cat("# rollfun-datatable.R\n") + +source("./_helpers/helpers.R") + +suppressPackageStartupMessages(library("data.table", lib.loc="./datatable/r-datatable-rollmedian")) +setDTthreads(0L) +## till rollmedian branch is not yet merged to master we need extra trickery so DT version/git between logs.csv and time.csv matches +if (FALSE) { + # use this when rollmedian branch will be merged to master + ver = packageVersion("data.table") + git = data.table:::.git(quiet=TRUE) +} else { + f = Sys.getenv("CSV_LOGS_FILE", "logs.csv") + if (file.exists(f)) { + l = fread(f)[.N] + if (nrow(l)==1L && identical(l$solution, "data.table") && identical(l$action, "start")) { + ver = l$version + git = l$git + } else { + # possibly run interactively + ver = NA_character_ + git = "" + } + rm(f, l) + } else { + ver = NA_character_ + git = "" + } +} +task = "rollfun" +solution = "data.table" +cache = TRUE +on_disk = FALSE + +data_name = Sys.getenv("SRC_DATANAME") +src_grp = file.path("data", paste(data_name, "csv", sep=".")) +cat(sprintf("loading dataset %s\n", data_name)) + +x = fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.strings="") +print(nrow(x)) + +# window size +w = nrow(x)/1e3 +wsmall = nrow(x)/1e4 +wbig = nrow(x)/1e2 + +task_init = proc.time()[["elapsed"]] +cat("rolling...\n") + +fun = "frollmean" + +question = "mean" # q1 +t = system.time(print(length(ans<-frollmean(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmean(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + +question = "window small" # q2 +t = system.time(print(length(ans<-frollmean(x$v1, wsmall))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmean(x$v1, wsmall))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + +question = "window big" # q3 +t = system.time(print(length(ans<-frollmean(x$v1, wbig))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, 
solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmean(x$v1, wbig))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + +fun = "frollmin" + +question = "min" # q4 +t = system.time(print(length(ans<-frollmin(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmin(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + +fun = "frollmedian" + +question = "median" # q5 +t = system.time(print(length(ans<-frollmedian(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun="frollmedian", time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmedian(x$v1, w))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun="frollmedian", time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(head(ans, 3)) +print(tail(ans, 3)) +rm(ans) + +fun = "frollmean" + +question = "multiroll" # q6 +t = system.time(print(length(ans<-frollmean(list(x$v1, x$v2), c(w-50L, w+50L)))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-lapply(ans, sum, na.rm=TRUE))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans[[1L]]), out_cols=length(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +rm(ans) +t = system.time(print(length(ans<-frollmean(list(x$v1, x$v2), c(w-50L, w+50L)))))[["elapsed"]] +m = memory_usage() +chkt = system.time(chk<-lapply(ans, sum, na.rm=TRUE))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans[[1L]]), out_cols=length(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(lapply(ans, head, 3)) +print(lapply(ans, tail, 3)) +rm(ans) + +#question = "weighted" # q7 ## not 
yet implemented
+#t = system.time(print(length(ans<-frollmean(x$v1, w, w=x$weights))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#rm(ans)
+#t = system.time(print(length(ans<-frollmean(x$v1, w, w=x$weights))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#print(head(ans, 3))
+#print(tail(ans, 3))
+#rm(ans)
+
+fun = "frollmean"
+
+question = "uneven dense" # q8
+t = system.time(print(length(ans<-frollmean(x$v1, frolladapt(x$id2, w), adaptive=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-frollmean(x$v1, frolladapt(x$id2, w), adaptive=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+question = "uneven sparse" # q9
+t = system.time(print(length(ans<-frollmean(x$v1, frolladapt(x$id3, w), adaptive=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-frollmean(x$v1, frolladapt(x$id3, w), adaptive=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+#fun = "frollreg"
+
+#question = "regression" # q10 ## not yet implemented
+#t = system.time(print(length(ans<-frollreg(list(x$v1, x$v2), w))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#rm(ans)
+#t = system.time(print(length(ans<-frollreg(list(x$v1, x$v2), w))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#print(head(ans, 3))
+#print(tail(ans, 3))
+#rm(ans)
+
+cat(sprintf("rolling finished, took %.0fs\n", proc.time()[["elapsed"]]-task_init))
+
+if( !interactive() ) q("no", status=0)
diff --git a/dplyr/rollfun-dplyr.R b/dplyr/rollfun-dplyr.R
new file mode 100755
index 00000000..8a7caf32
--- /dev/null
+++ b/dplyr/rollfun-dplyr.R
@@ -0,0 +1,191 @@
+#!/usr/bin/env Rscript
+
+cat("# rollfun-dplyr (slider).R\n")
+
+source("./_helpers/helpers.R")
+
+stopifnot(requireNamespace(c("bit64","data.table"), quietly=TRUE)) # used in chk to sum numeric columns and data loading
+.libPaths("./dplyr/r-dplyr") # tidyverse/dplyr#4641
+suppressPackageStartupMessages(library("dplyr", lib.loc="./dplyr/r-dplyr", warn.conflicts=FALSE))
+suppressPackageStartupMessages(library("slider", lib.loc="./dplyr/r-dplyr", warn.conflicts=FALSE))
+ver = packageVersion("dplyr")
+git = "" # uses stable version now #124
+task = "rollfun"
+solution = "dplyr" ## could use tidyverse instead if we rename dplyr to tidyverse: duckdblabs/db-benchmark/pull/9#issuecomment-1610065100
+cache = TRUE
+on_disk = FALSE
+
+data_name = Sys.getenv("SRC_DATANAME")
+src_grp = file.path("data", paste(data_name, "csv", sep="."))
+cat(sprintf("loading dataset %s\n", data_name))
+
+x = as_tibble(data.table::fread(src_grp, showProgress=FALSE, stringsAsFactors=TRUE, na.strings="", data.table=FALSE))
+print(nrow(x))
+
+# window size
+w = nrow(x)/1e3L
+wsmall = nrow(x)/1e4L
+wbig = nrow(x)/1e2L
+
+task_init = proc.time()[["elapsed"]]
+cat("rolling...\n")
+
+question = "mean" # q1
+fun = "slide_mean"
+t = system.time(print(length(ans<-slide_mean(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_mean(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+question = "window small" # q2
+fun = "slide_mean"
+t = system.time(print(length(ans<-slide_mean(x$v1, before=wsmall-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_mean(x$v1, before=wsmall-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+question = "window big" # q3
+fun = "slide_mean"
+t = system.time(print(length(ans<-slide_mean(x$v1, before=wbig-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_mean(x$v1, before=wbig-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+question = "min" # q4
+fun = "slide_min"
+t = system.time(print(length(ans<-slide_min(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_min(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+#question = "median" # q5 ## not yet implemented
+#fun = "slide_median"
+#t = system.time(print(length(ans<-slide_median(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#rm(ans)
+#t = system.time(print(length(ans<-slide_median(x$v1, before=w-1L, complete=TRUE))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#print(head(ans, 3))
+#print(tail(ans, 3))
+#rm(ans)
+
+question = "multiroll" # q6
+fun = "slide_mean"
+t = system.time(print(length(ans<-list(
+  slide_mean(x$v1, before=w-51L, complete=TRUE), slide_mean(x$v1, before=w+49L, complete=TRUE),
+  slide_mean(x$v2, before=w-51L, complete=TRUE), slide_mean(x$v2, before=w+49L, complete=TRUE)
+))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-lapply(ans, sum, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans[[1L]]), out_cols=length(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-list(
+  slide_mean(x$v1, before=w-51L, complete=TRUE), slide_mean(x$v1, before=w+49L, complete=TRUE),
+  slide_mean(x$v2, before=w-51L, complete=TRUE), slide_mean(x$v2, before=w+49L, complete=TRUE)
+))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-lapply(ans, sum, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans[[1L]]), out_cols=length(ans), solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(lapply(ans, head, 3))
+print(lapply(ans, tail, 3))
+rm(ans)
+
+#question = "weighted" # q7 ## not yet implemented
+#fun = "slide_mean"
+#t = system.time(print(length(ans<-slide_mean(x$v1, before=w-1L, complete=TRUE, w=x$weights))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#rm(ans)
+#t = system.time(print(length(ans<-slide_mean(x$v1, before=w-1L, complete=TRUE, w=x$weights))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#print(head(ans, 3))
+#print(tail(ans, 3))
+#rm(ans)
+
+question = "uneven dense" # q8
+fun = "slide_index_mean"
+t = system.time(print(length(ans<-slide_index_mean(x$v1, i=x$id2, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_index_mean(x$v1, i=x$id2, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+question = "uneven sparse" # q9
+fun = "slide_index_mean"
+t = system.time(print(length(ans<-slide_index_mean(x$v1, i=x$id3, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+rm(ans)
+t = system.time(print(length(ans<-slide_index_mean(x$v1, i=x$id3, before=w-1L, complete=TRUE))))[["elapsed"]]
+m = memory_usage()
+chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+print(head(ans, 3))
+print(tail(ans, 3))
+rm(ans)
+
+#question = "regression" # q10 ## Killed; a generic UDF does not scale, needs a specialized function
+#fun = "slide"
+#t = system.time(print(length(ans<-slide(select(x, v1, v2), ~lm(v2 ~ v1, data=.x), .before=w-1L, .complete=TRUE))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=1L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#rm(ans)
+#t = system.time(print(length(ans<-slide(select(x, v1, v2), ~lm(v2 ~ v1, data=.x), .before=w-1L, .complete=TRUE))))[["elapsed"]]
+#m = memory_usage()
+#chkt = system.time(chk<-sum(ans, na.rm=TRUE))[["elapsed"]]
+#write.log(run=2L, task=task, data=data_name, in_rows=nrow(x), question=question, out_rows=length(ans), out_cols=1L, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk)
+#print(head(ans, 3))
+#print(tail(ans, 3))
+#rm(ans)
+
+cat(sprintf("rolling finished, took %.0fs\n", proc.time()[["elapsed"]]-task_init))
+
+if( !interactive() ) q("no", status=0)
diff --git a/dplyr/setup-dplyr.sh b/dplyr/setup-dplyr.sh
index 076fafb7..474a7581 100755
--- a/dplyr/setup-dplyr.sh
+++ b/dplyr/setup-dplyr.sh
@@ -4,3 +4,15 @@ set -e
 # install stable dplyr
 mkdir -p ./dplyr/r-dplyr
 Rscript -e 'install.packages("dplyr", lib="./dplyr/r-dplyr")'
+Rscript -e 'install.packages("slider", lib="./dplyr/r-dplyr")' ## rollfun
+Rscript -e 'install.packages("data.table", lib="./dplyr/r-dplyr")' ## data load
+Rscript -e 'install.packages("rlang", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("R6", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("glue", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("lifecycle", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("magrittr", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("cli", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("fansi", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("utf8", lib="./dplyr/r-dplyr")' ## apparently also needed
+Rscript -e 'install.packages("pkgconfig", lib="./dplyr/r-dplyr")' ## apparently also needed
+
diff --git a/duckdb-latest/rollfun-duckdb-latest.R b/duckdb-latest/rollfun-duckdb-latest.R
new file mode 100755
index 00000000..fcaa80c6
--- /dev/null
+++ b/duckdb-latest/rollfun-duckdb-latest.R
@@ -0,0 +1,301 @@
+#!/usr/bin/env Rscript
+
+cat("# rollfun-duckdb-latest.R\n")
+
+source("./_helpers/helpers.R")
+
+suppressPackageStartupMessages({
+  library("DBI", lib.loc="./duckdb-latest/r-duckdb-latest", warn.conflicts=FALSE)
+  library("duckdb", lib.loc="./duckdb-latest/r-duckdb-latest", warn.conflicts=FALSE)
+})
+ver = packageVersion("duckdb")
+#git = "" # set up later on 
after connecting to db +task = "rollfun" +solution = "duckdb-latest" +fun = "over" +cache = TRUE + +data_name = Sys.getenv("SRC_DATANAME") +src_grp = file.path("data", paste(data_name, "csv", sep=".")) +cat(sprintf("loading dataset %s\n", data_name)) + +on_disk = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][2L])>=1e9 +uses_NAs = as.numeric(strsplit(data_name, "_", fixed=TRUE)[[1L]][4L])>0 +if (on_disk) { + print("using disk memory-mapped data storage") + con = dbConnect(duckdb::duckdb(), dbdir=tempfile()) +} else { + print("using in-memory data storage") + con = dbConnect(duckdb::duckdb()) +} + +ncores = parallel::detectCores() +invisible(dbExecute(con, sprintf("PRAGMA THREADS=%d", ncores))) +invisible(dbExecute(con, "SET experimental_parallel_csv=true;")) +git = dbGetQuery(con, "SELECT source_id FROM pragma_version()")[[1L]] + +# first create and ingest the table. +invisible(dbExecute(con, "CREATE TABLE y(id1 INT, id2 INT, id3 INT, v1 FLOAT, v2 FLOAT, weights FLOAT)")) +invisible(dbExecute(con, sprintf("COPY y FROM '%s' (AUTO_DETECT TRUE)", src_grp))) + +# no enums in table, also as of now no NULLs in data, so uses_NAs handling not needed +invisible(dbExecute(con, "ALTER TABLE y RENAME TO x")) + +print(in_nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM x")$cnt) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) + +# window size +w = in_nr/1e3 +wsmall = in_nr/1e4 +wbig = in_nr/1e2 + +task_init = proc.time()[["elapsed"]] +cat("rolling...\n") + +question = "mean" # q1 +sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) ## due to https://github.com/duckdb/duckdb/discussions/8340 note that rowid is 0-based thus w-1 +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "window small" # q2 +sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", wsmall-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM 
ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", wsmall-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", wsmall-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "window big" # q3 +sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", wbig-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", wbig-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", wbig-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "min" # q4 +sql = sprintf("CREATE TABLE ans AS SELECT MIN(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, 
sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "median" # q5 +sql = sprintf("CREATE TABLE ans AS SELECT MEDIAN(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "multiroll" # q6 +sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER small AS v1_small, avg(v1) OVER big AS v1_big, avg(v2) OVER small AS v2_small, avg(v2) OVER big AS v2_big FROM x WINDOW small AS (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW), big AS (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW)", w-51L, w+49L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() 
+invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1_small = NULL, v2_small = NULL WHERE ROWID < %d", w-51L))) +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1_big = NULL, v2_big = NULL WHERE ROWID < %d", w+49L))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1_small) AS v1_small, sum(v1_big) AS v1_big, sum(v2_small) AS v2_small, sum(v2_big) AS v2_big FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1_small = NULL, v2_small = NULL WHERE ROWID < %d", w-51L))) +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1_big = NULL, v2_big = NULL WHERE ROWID < %d", w+49L))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1_small) AS v1_small, sum(v1_big) AS v1_big, sum(v2_small) AS v2_small, sum(v2_big) AS v2_big FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +#question = "weighted" # q7 ## not yet implemented +#sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +#t = system.time({ +# dbExecute(con, sql) +# print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +#})[["elapsed"]] +#m = memory_usage() +#invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +#chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +#write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +#t = system.time({ +# dbExecute(con, sql) +# print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +#})[["elapsed"]] +#m = memory_usage() +#invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID < %d", w-1))) +#chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +#write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +#print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +#invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +#rm(sql) + +question = "uneven dense" # q8 +sql = 
sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id2 RANGE BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID <= (SELECT max(ROWID) FROM x WHERE id2 < %d)", w))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID <= (SELECT max(ROWID) FROM x WHERE id2 < %d)", w))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "uneven sparse" # q9 +sql = sprintf("CREATE TABLE ans AS SELECT avg(v1) OVER (ORDER BY id3 RANGE BETWEEN %d PRECEDING AND CURRENT ROW) AS v1 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID <= (SELECT max(ROWID) FROM x WHERE id3 < %d)", w))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET v1 = NULL WHERE ROWID <= (SELECT max(ROWID) FROM x WHERE id3 < %d)", w))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(v1) AS v1 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +question = "regression" # q10 +sql = 
sprintf("CREATE TABLE ans AS SELECT regr_r2(v2, v1) OVER (ORDER BY id1 ROWS BETWEEN %d PRECEDING AND CURRENT ROW) AS r2 FROM x", w-1L) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET r2 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(r2) AS r2 FROM ans"))[["elapsed"]] +write.log(run=1L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +t = system.time({ + dbExecute(con, sql) + print(c(nr<-dbGetQuery(con, "SELECT count(*) AS cnt FROM ans")$cnt, nc<-ncol(dbGetQuery(con, "SELECT * FROM ans LIMIT 0")))) +})[["elapsed"]] +m = memory_usage() +invisible(dbSendQuery(con, sprintf("UPDATE ans SET r2 = NULL WHERE ROWID < %d", w-1))) +chkt = system.time(chk<-dbGetQuery(con, "SELECT sum(r2) AS r2 FROM ans"))[["elapsed"]] +write.log(run=2L, task=task, data=data_name, in_rows=in_nr, question=question, out_rows=nr, out_cols=nc, solution=solution, version=ver, git=git, fun=fun, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(dbGetQuery(con, "SELECT * FROM ans LIMIT 3")) ## head +print(dbGetQuery(con, "SELECT * FROM ans WHERE ROWID > (SELECT count(*) FROM ans) - 4")) ## tail +invisible(dbExecute(con, "DROP TABLE IF EXISTS ans")) +rm(sql) + +invisible(dbDisconnect(con, shutdown=TRUE)) + +cat(sprintf("rolling finished, took %.0fs\n", proc.time()[["elapsed"]]-task_init)) + +if( !interactive() ) q("no", status=0) diff --git a/pandas/rollfun-pandas.py b/pandas/rollfun-pandas.py new file mode 100755 index 00000000..2ffa23d0 --- /dev/null +++ b/pandas/rollfun-pandas.py @@ -0,0 +1,328 @@ +#!/usr/bin/env python3 + +print("# rollfun-pandas.py", flush=True) + +import os +import gc +import sys +import timeit +import pandas as pd +import numpy as np ## q8 q9 + +exec(open("./_helpers/helpers.py").read()) + +ver = pd.__version__ +git = pd.__git_version__ +task = "rollfun" +solution = "pandas" +fun = ".rolling" +cache = "TRUE" +on_disk = "FALSE" + +data_name = os.environ['SRC_DATANAME'] +src_grp = os.path.join("data", data_name+".csv") +print("loading dataset %s" % data_name, flush=True) + +na_flag = int(data_name.split("_")[3]) +if na_flag > 0: + print("skip due to na_flag>0: #171", flush=True, file=sys.stderr) + exit(0) # not yet implemented #171 + +x = pd.read_csv(src_grp) + +print(len(x.index), flush=True) + +# window size +w = int(len(x.index)/1e3) +wsmall = int(len(x.index)/1e4) +wbig = int(len(x.index)/1e2) + +task_init = timeit.default_timer() +print("rolling...", flush=True) + +question = "mean" # q1 +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(w).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = 
x['v1'].rolling(w).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "window small" # q2 +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(wsmall).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(wsmall).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "window big" # q3 +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(wbig).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(wbig).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "min" # q4 +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(w).min() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(w).min() +print(ans.shape, flush=True) +t = 
timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "median" # q5 +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(w).median() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = x['v1'].rolling(w).median() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +question = "multiroll" # q6 +gc.collect() +t_start = timeit.default_timer() +ans = pd.concat([x[['v1','v2']].rolling(w-50).mean().reset_index(drop=True), x[['v1','v2']].rolling(w+50).mean().reset_index(drop=True)], axis=1).set_axis(['v1_small', 'v2_small', 'v1_big', 'v2_big'], axis=1) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans = ans[['v1_small','v1_big','v2_small','v2_big']] +t_start = timeit.default_timer() +chk = [ans['v1_small'].sum(), ans['v1_big'].sum(), ans['v2_small'].sum(), ans['v2_big'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = pd.concat([x[['v1','v2']].rolling(w-50).mean().reset_index(drop=True), x[['v1','v2']].rolling(w+50).mean().reset_index(drop=True)], axis=1).set_axis(['v1_small', 'v2_small', 'v1_big', 'v2_big'], axis=1) +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans = ans[['v1_small','v1_big','v2_small','v2_big']] +t_start = timeit.default_timer() +chk = [ans['v1_small'].sum(), ans['v1_big'].sum(), ans['v2_small'].sum(), ans['v2_big'].sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans + +#question = "weighted" # q7 ## not yet implemented; tried with apply, but it seems to require passing the whole df to a lambda, which would be very slow
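+## a possible route for q7 (untested sketch, not part of the benchmark): a linearly-weighted rolling
+## mean via numpy convolution, which avoids calling a Python lambda per window; the 1..w weights
+## (most recent observation weighted highest) are an illustrative assumption only
+#wts = np.arange(1, w+1, dtype=float)
+#ans = pd.Series(np.convolve(x['v1'].to_numpy(), wts[::-1]/wts.sum(), mode='full')[:len(x)])
+#ans.iloc[:w-1] = np.nan  # windows shorter than w are incomplete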
#gc.collect() +#t_start = timeit.default_timer() +#ans = x['v1'].rolling(w).mean() +#print(ans.shape, flush=True) +#t = timeit.default_timer() - t_start +#m = memory_usage() +#t_start = timeit.default_timer() +#chk = [ans.sum()] +#chkt = timeit.default_timer() - t_start +#write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#del ans +#gc.collect() +#t_start = timeit.default_timer() +#ans = x['v1'].rolling(w).mean() +#print(ans.shape, flush=True) +#t = timeit.default_timer() - t_start +#m = memory_usage() +#t_start = timeit.default_timer() +#chk = [ans.sum()] +#chkt = timeit.default_timer() - t_start +#write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#print(ans.head(3), flush=True) +#print(ans.tail(3), flush=True) +#del ans + +question = "uneven dense" # q8 +## the pre/post-processing below is excluded from query timing because pandas has rich feature support for unevenly spaced time series; +## it just does not seem to support the most basic and most generic integer-based index, and we want to stick to the most generic approach for portability +y = x.set_axis(pd.to_timedelta(x['id2'], unit='s'))[['v1']] +ws = f'{w}s' +gc.collect() +t_start = timeit.default_timer() +ans = y.rolling(ws).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans.reset_index(drop=True, inplace=True) +ans.iloc[:x[x.id2 >= w].index[0]] = np.nan +ans = ans['v1'] +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = y.rolling(ws).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans.reset_index(drop=True, inplace=True) +ans.iloc[:x[x.id2 >= w].index[0]] = np.nan +ans = ans['v1'] +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans, y + +question = "uneven sparse" # q9 +## the pre/post-processing below is excluded from query timing because pandas has rich feature support for unevenly spaced time series; +## it just does not seem to support the most basic and most generic integer-based index, and we want to stick to the most generic approach for portability
+y = x.set_axis(pd.to_timedelta(x['id3'], unit='s'))[['v1']] +ws = f'{w}s' +gc.collect() +t_start = timeit.default_timer() +ans = y.rolling(ws).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans.reset_index(drop=True, inplace=True) +ans.iloc[:x[x.id3 >= w].index[0]] = np.nan +ans = ans['v1'] +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +del ans +gc.collect() +t_start = timeit.default_timer() +ans = y.rolling(ws).mean() +print(ans.shape, flush=True) +t = timeit.default_timer() - t_start +m = memory_usage() +ans.reset_index(drop=True, inplace=True) +ans.iloc[:x[x.id3 >= w].index[0]] = np.nan +ans = ans['v1'] +t_start = timeit.default_timer() +chk = [ans.sum()] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +del ans, y + +#question = "regression" # q10 ## not yet implemented +#gc.collect() +#t_start = timeit.default_timer() +#ans = x['v1'].rolling(w).mean() +#print(ans.shape, flush=True) +#t = timeit.default_timer() - t_start +#m = memory_usage() +#t_start = timeit.default_timer() +#chk = [ans.sum()] +#chkt = timeit.default_timer() - t_start +#write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#del ans +#gc.collect() +#t_start = timeit.default_timer() +#ans = x['v1'].rolling(w).mean() +#print(ans.shape, flush=True) +#t = timeit.default_timer() - t_start +#m = memory_usage() +#t_start = timeit.default_timer() +#chk = [ans.sum()] +#chkt = timeit.default_timer() - t_start +#write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=1, solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +#print(ans.head(3), flush=True) +#print(ans.tail(3), flush=True) +#del ans + +print("rolling finished, took %.0fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0) diff --git a/spark/rollfun-spark.py b/spark/rollfun-spark.py new file mode 100755 index 00000000..b6208591 --- /dev/null +++ b/spark/rollfun-spark.py @@ -0,0 +1,361 @@ +#!/usr/bin/env python3 + +print("# rollfun-spark.py", flush=True) + +import os +import gc +import timeit +import pyspark +from pyspark.sql import SparkSession + +exec(open("./_helpers/helpers.py").read()) + +ver = pyspark.__version__ +git = "" # won't fix: https://issues.apache.org/jira/browse/SPARK-16864 +task = "rollfun" +solution = "spark" +fun = "over" +cache = "TRUE" +on_disk = "FALSE" + +data_name = os.environ['SRC_DATANAME'] +src_grp = os.path.join("data", data_name+".csv") +print("loading dataset %s" % data_name, flush=True) + +mem_usage = "100g" +if "TEST_RUN" in os.environ: 
mem_usage = "2g" + +from pyspark.conf import SparkConf +spark = SparkSession.builder \ + .master("local[*]") \ + .appName("rollfun-spark") \ + .config("spark.executor.memory", mem_usage) \ + .config("spark.driver.memory", mem_usage) \ + .config("spark.python.worker.memory", mem_usage) \ + .config("spark.driver.maxResultSize", mem_usage) \ + .config("spark.network.timeout", "2400") \ + .config("spark.executor.heartbeatInterval", "1200") \ + .config("spark.ui.showConsoleProgress", "false") \ + .getOrCreate() +#print(spark.sparkContext._conf.getAll(), flush=True) + +x = spark.read.csv(src_grp, header=True, inferSchema='true').persist(pyspark.StorageLevel.MEMORY_ONLY) + +print(x.count(), flush=True) + +x.createOrReplaceTempView("x") + +# window size +w = int(x.count()/1e3) +wsmall = int(x.count()/1e4) +wbig = int(x.count()/1e2) + +task_init = timeit.default_timer() +print("rolling...", flush=True) + +question = "mean" # q1 +sql0 = f'select id1, avg(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x' +sql = f'select case when id1<{w} then null else v1 end as v1 from ans' ## handling partial window? https://stackoverflow.com/q/76799677/2490497 +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +question = "window small" # q2 +sql0 = f'select id1, avg(v1) over (order by id1 rows between {wsmall-1} preceding and current row) as v1 from x' +sql = f'select case when id1<{wsmall} then null else v1 end as v1 from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - 
t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +question = "window big" # q3 +sql0 = f'select id1, avg(v1) over (order by id1 rows between {wbig-1} preceding and current row) as v1 from x' +sql = f'select case when id1<{wbig} then null else v1 end as v1 from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), 
out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +question = "min" # q4 +sql0 = f'select id1, min(v1) over (order by id1 rows between {w-1} preceding and current row) as v1 from x' +sql = f'select case when id1<{w} then null else v1 end as v1 from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +#question = "median" # q5 ## https://stackoverflow.com/q/76760672/2490497 + +question = "multiroll" # q6 +sql0 = f'select id1, avg(v1) over small as v1_small, avg(v1) over big as v1_big, avg(v2) over small as v2_small, avg(v2) over big as v2_big from x window small as (order by id1 rows between {w-51} preceding and current row), big as (order by id1 rows between {w+49} preceding and current row)' +sql = f'select case when id1<{w-50} then null else v1_small end as v1_small, case when id1<{w+50} then null else v1_big end as v1_big, case when id1<{w-50} then null else v2_small end as v2_small, case when id1<{w+50} then null else v2_big end as v2_big from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] 
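+## counting rows/columns into 'ansdo' forces full materialization of the lazily evaluated, persisted answer before the check query is timed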
+ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = spark.sql("select sum(v1_small) as v1_small, sum(v1_big) as v1_big, sum(v2_small) as v2_small, sum(v2_big) as v2_big from ans").collect()[0].asDict().values() +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = spark.sql("select sum(v1_small) as v1_small, sum(v1_big) as v1_big, sum(v2_small) as v2_small, sum(v2_big) as v2_big from ans").collect()[0].asDict().values() +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +#question = "weighted" # q7 ## not yet implemented + +question = "uneven dense" # q8 +sql0 = f'select id2, avg(v1) over (order by id2 range between {w-1} preceding and current row) as v1 from x' +sql = f'select case when id2<{w} then null else v1 end as v1 from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, 
in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +question = "uneven sparse" # q9 +sql0 = f'select id3, avg(v1) over (order by id3 range between {w-1} preceding and current row) as v1 from x' +sql = f'select case when id3<{w} then null else v1 end as v1 from ans' +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo +gc.collect() +t_start = timeit.default_timer() +ans0 = spark.sql(sql0).persist(pyspark.StorageLevel.MEMORY_ONLY) +print((ans0.count(), len(ans0.columns)), flush=True) # shape +t = timeit.default_timer() - t_start +m = memory_usage() +ans0.createOrReplaceTempView("ans") +ans = spark.sql(sql).persist(pyspark.StorageLevel.MEMORY_ONLY) +ansdo = [ans.count(), len(ans.columns)] +ans.createOrReplaceTempView("ans") +t_start = timeit.default_timer() +chk = [spark.sql("select sum(v1) as v1 from ans").collect()[0].asDict()['v1']] +chkt = timeit.default_timer() - t_start +write_log(task=task, data=data_name, in_rows=x.count(), question=question, out_rows=ans.count(), out_cols=len(ans.columns), solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(chk), chk_time_sec=chkt, on_disk=on_disk) +print(ans.head(3), flush=True) +print(ans.tail(3), flush=True) +ans0.unpersist() +ans.unpersist() +spark.catalog.uncacheTable("ans") +del ans0, ans, ansdo, sql0, sql + +#question = "regression" # q10 ## not yet implemented + +spark.stop() + +print("rolling finished, took %.0fs" % (timeit.default_timer()-task_init), flush=True) + +exit(0)