Commit fa4c734

merge conflict
2 parents 9ce0093 + 7ded39c commit fa4c734

File tree: 85 files changed (+2096, -870 lines)


R/pkg/R/mllib_regression.R

Lines changed: 47 additions & 8 deletions
@@ -53,12 +53,23 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #'               the result of a call to a family function. Refer R family at
 #'               \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
 #'               Currently these families are supported: \code{binomial}, \code{gaussian},
-#'               \code{Gamma}, and \code{poisson}.
+#'               \code{Gamma}, \code{poisson} and \code{tweedie}.
+#'
+#'               Note that there are two ways to specify the tweedie family.
+#'               \itemize{
+#'                \item Set \code{family = "tweedie"} and specify the var.power and link.power;
+#'                \item When package \code{statmod} is loaded, the tweedie family is specified using the
+#'                family definition therein, i.e., \code{tweedie(var.power, link.power)}.
+#'               }
 #' @param tol positive convergence tolerance of iterations.
 #' @param maxIter integer giving the maximal number of IRLS iterations.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
 #' @param regParam regularization parameter for L2 regularization.
+#' @param var.power the power in the variance function of the Tweedie distribution which provides
+#'                  the relationship between the variance and mean of the distribution. Only
+#'                  applicable to the Tweedie family.
+#' @param link.power the index in the power link function. Only applicable to the Tweedie family.
 #' @param ... additional arguments passed to the method.
 #' @aliases spark.glm,SparkDataFrame,formula-method
 #' @return \code{spark.glm} returns a fitted generalized linear model.
@@ -84,14 +95,30 @@ setClass("IsotonicRegressionModel", representation(jobj = "jobj"))
 #' # can also read back the saved model and print
 #' savedModel <- read.ml(path)
 #' summary(savedModel)
+#'
+#' # fit tweedie model
+#' model <- spark.glm(df, Freq ~ Sex + Age, family = "tweedie",
+#'                    var.power = 1.2, link.power = 0)
+#' summary(model)
+#'
+#' # use the tweedie family from statmod
+#' library(statmod)
+#' model <- spark.glm(df, Freq ~ Sex + Age, family = tweedie(1.2, 0))
+#' summary(model)
 #' }
 #' @note spark.glm since 2.0.0
 #' @seealso \link{glm}, \link{read.ml}
 setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
           function(data, formula, family = gaussian, tol = 1e-6, maxIter = 25, weightCol = NULL,
-                   regParam = 0.0) {
+                   regParam = 0.0, var.power = 0.0, link.power = 1.0 - var.power) {
+
             if (is.character(family)) {
-              family <- get(family, mode = "function", envir = parent.frame())
+              # Handle when family = "tweedie"
+              if (tolower(family) == "tweedie") {
+                family <- list(family = "tweedie", link = NULL)
+              } else {
+                family <- get(family, mode = "function", envir = parent.frame())
+              }
             }
             if (is.function(family)) {
               family <- family()
@@ -100,6 +127,12 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
               print(family)
               stop("'family' not recognized")
             }
+            # Handle when family = statmod::tweedie()
+            if (tolower(family$family) == "tweedie" && !is.null(family$variance)) {
+              var.power <- log(family$variance(exp(1)))
+              link.power <- log(family$linkfun(exp(1)))
+              family <- list(family = "tweedie", link = NULL)
+            }
 
             formula <- paste(deparse(formula), collapse = "")
             if (!is.null(weightCol) && weightCol == "") {
@@ -111,7 +144,8 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
             # For known families, Gamma is upper-cased
             jobj <- callJStatic("org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper",
                                 "fit", formula, data@sdf, tolower(family$family), family$link,
-                                tol, as.integer(maxIter), weightCol, regParam)
+                                tol, as.integer(maxIter), weightCol, regParam,
+                                as.double(var.power), as.double(link.power))
             new("GeneralizedLinearRegressionModel", jobj = jobj)
           })
 
@@ -126,11 +160,13 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #'               the result of a call to a family function. Refer R family at
 #'               \url{https://stat.ethz.ch/R-manual/R-devel/library/stats/html/family.html}.
 #'               Currently these families are supported: \code{binomial}, \code{gaussian},
-#'               \code{Gamma}, and \code{poisson}.
+#'               \code{poisson}, \code{Gamma}, and \code{tweedie}.
 #' @param weightCol the weight column name. If this is not set or \code{NULL}, we treat all instance
 #'                  weights as 1.0.
 #' @param epsilon positive convergence tolerance of iterations.
 #' @param maxit integer giving the maximal number of IRLS iterations.
+#' @param var.power the index of the power variance function in the Tweedie family.
+#' @param link.power the index of the power link function in the Tweedie family.
 #' @return \code{glm} returns a fitted generalized linear model.
 #' @rdname glm
 #' @export
@@ -145,8 +181,10 @@ setMethod("spark.glm", signature(data = "SparkDataFrame", formula = "formula"),
 #' @note glm since 1.5.0
 #' @seealso \link{spark.glm}
 setMethod("glm", signature(formula = "formula", family = "ANY", data = "SparkDataFrame"),
-          function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL) {
-            spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol)
+          function(formula, family = gaussian, data, epsilon = 1e-6, maxit = 25, weightCol = NULL,
+                   var.power = 0.0, link.power = 1.0 - var.power) {
+            spark.glm(data, formula, family, tol = epsilon, maxIter = maxit, weightCol = weightCol,
+                      var.power = var.power, link.power = link.power)
           })
 
 # Returns the summary of a model produced by glm() or spark.glm(), similarly to R's summary().
@@ -172,9 +210,10 @@ setMethod("summary", signature(object = "GeneralizedLinearRegressionModel"),
             deviance <- callJMethod(jobj, "rDeviance")
             df.null <- callJMethod(jobj, "rResidualDegreeOfFreedomNull")
             df.residual <- callJMethod(jobj, "rResidualDegreeOfFreedom")
-            aic <- callJMethod(jobj, "rAic")
             iter <- callJMethod(jobj, "rNumIterations")
             family <- callJMethod(jobj, "rFamily")
+            aic <- callJMethod(jobj, "rAic")
+            if (family == "tweedie" && aic == 0) aic <- NA
             deviance.resid <- if (is.loaded) {
               NULL
             } else {
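The `statmod::tweedie()` branch above relies on a small identity: the Tweedie variance function is V(mu) = mu^var.power, and the power link satisfies linkfun(mu) = mu^link.power (log(mu) when link.power is 0), so evaluating each function at mu = e and taking the log recovers the corresponding power. A minimal sketch of that extraction, assuming the statmod package is installed:

```r
library(statmod)

fam <- tweedie(var.power = 1.2, link.power = 0)

# V(mu) = mu^1.2, so log(V(e)) recovers the variance power
log(fam$variance(exp(1)))   # 1.2

# link.power = 0 means a log link: linkfun(e) = log(e) = 1, and log(1) = 0
log(fam$linkfun(exp(1)))    # 0
```

This mirrors what the new `spark.glm` code does before handing `var.power` and `link.power` to the JVM wrapper.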

R/pkg/R/mllib_tree.R

Lines changed: 7 additions & 4 deletions
@@ -52,12 +52,14 @@ summary.treeEnsemble <- function(model) {
   numFeatures <- callJMethod(jobj, "numFeatures")
   features <- callJMethod(jobj, "features")
   featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString")
+  maxDepth <- callJMethod(jobj, "maxDepth")
   numTrees <- callJMethod(jobj, "numTrees")
   treeWeights <- callJMethod(jobj, "treeWeights")
   list(formula = formula,
        numFeatures = numFeatures,
        features = features,
        featureImportances = featureImportances,
+       maxDepth = maxDepth,
        numTrees = numTrees,
        treeWeights = treeWeights,
        jobj = jobj)
@@ -70,6 +72,7 @@ print.summary.treeEnsemble <- function(x) {
   cat("\nNumber of features: ", x$numFeatures)
   cat("\nFeatures: ", unlist(x$features))
   cat("\nFeature importances: ", x$featureImportances)
+  cat("\nMax Depth: ", x$maxDepth)
   cat("\nNumber of trees: ", x$numTrees)
   cat("\nTree weights: ", unlist(x$treeWeights))
 
@@ -197,8 +200,8 @@ setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"),
 #' @return \code{summary} returns summary information of the fitted model, which is a list.
 #'         The list of components includes \code{formula} (formula),
 #'         \code{numFeatures} (number of features), \code{features} (list of features),
-#'         \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
-#'         and \code{treeWeights} (tree weights).
+#'         \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
+#'         \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
 #' @rdname spark.gbt
 #' @aliases summary,GBTRegressionModel-method
 #' @export
@@ -403,8 +406,8 @@ setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "fo
 #' @return \code{summary} returns summary information of the fitted model, which is a list.
 #'         The list of components includes \code{formula} (formula),
 #'         \code{numFeatures} (number of features), \code{features} (list of features),
-#'         \code{featureImportances} (feature importances), \code{numTrees} (number of trees),
-#'         and \code{treeWeights} (tree weights).
+#'         \code{featureImportances} (feature importances), \code{maxDepth} (max depth of trees),
+#'         \code{numTrees} (number of trees), and \code{treeWeights} (tree weights).
 #' @rdname spark.randomForest
 #' @aliases summary,RandomForestRegressionModel-method
 #' @export
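With the changes above, the tree ensemble summaries expose the trees' maximum depth alongside the existing fields, and the printed summary gains a "Max Depth" line. A quick usage sketch, assuming a running SparkR session and the `longley` data already used by the test suite:

```r
df <- suppressWarnings(createDataFrame(longley))
model <- spark.randomForest(df, Employed ~ ., "regression", numTrees = 20)

stats <- summary(model)
stats$maxDepth  # 5, the default depth, now reported by summary()
stats$numTrees  # 20
stats           # printed output includes the new "Max Depth" line
```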

R/pkg/inst/tests/testthat/test_mllib_regression.R

Lines changed: 37 additions & 1 deletion
@@ -77,6 +77,24 @@ test_that("spark.glm and predict", {
   out <- capture.output(print(summary(model)))
   expect_true(any(grepl("Dispersion parameter for gamma family", out)))
 
+  # tweedie family
+  model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species,
+                     family = "tweedie", var.power = 1.2, link.power = 0.0)
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+  vals <- collect(select(prediction, "prediction"))
+
+  # manual calculation of the R predicted values to avoid dependence on statmod
+  #' library(statmod)
+  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+  #'               family = tweedie(var.power = 1.2, link.power = 0.0))
+  #' print(coef(rModel))
+
+  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+  rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+                                       data = iris) %*% rCoef))
+  expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
   # Test stats::predict is working
   x <- rnorm(15)
   y <- x + rnorm(15)
@@ -233,7 +251,7 @@ test_that("glm and predict", {
   training <- suppressWarnings(createDataFrame(iris))
   # gaussian family
   model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training)
-  prediction <- predict(model, training)
+  prediction <- predict(model, training)
   expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
   vals <- collect(select(prediction, "prediction"))
   rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris)
@@ -249,6 +267,24 @@ test_that("glm and predict", {
                                data = iris, family = poisson(link = identity)), iris))
   expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals)
 
+  # tweedie family
+  model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training,
+               family = "tweedie", var.power = 1.2, link.power = 0.0)
+  prediction <- predict(model, training)
+  expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double")
+  vals <- collect(select(prediction, "prediction"))
+
+  # manual calculation of the R predicted values to avoid dependence on statmod
+  #' library(statmod)
+  #' rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
+  #'               family = tweedie(var.power = 1.2, link.power = 0.0))
+  #' print(coef(rModel))
+
+  rCoef <- c(0.6455409, 0.1169143, -0.3224752, -0.3282174)
+  rVals <- exp(as.numeric(model.matrix(Sepal.Width ~ Sepal.Length + Species,
+                                       data = iris) %*% rCoef))
+  expect_true(all(abs(rVals - vals) < 1e-5), rVals - vals)
+
   # Test stats::predict is working
   x <- rnorm(15)
   y <- x + rnorm(15)
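The hard-coded `rCoef` vector in the new tests is the coefficient set from fitting the same Tweedie GLM locally, as the commented-out statmod lines indicate; the expected predictions are then rebuilt by hand so the test suite does not depend on statmod. A sketch of how those reference coefficients could be regenerated, assuming statmod is available:

```r
library(statmod)

# Reference fit on R's built-in iris data, matching the commented-out code above
rModel <- glm(Sepal.Width ~ Sepal.Length + Species, data = iris,
              family = tweedie(var.power = 1.2, link.power = 0.0))
round(coef(rModel), 7)
# c(0.6455409, 0.1169143, -0.3224752, -0.3282174), the rCoef used in the tests
```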

R/pkg/inst/tests/testthat/test_mllib_tree.R

Lines changed: 10 additions & 0 deletions
@@ -39,6 +39,7 @@ test_that("spark.gbt", {
                tolerance = 1e-4)
   stats <- summary(model)
   expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
   expect_equal(stats$formula, "Employed ~ .")
   expect_equal(stats$numFeatures, 6)
   expect_equal(length(stats$treeWeights), 20)
@@ -53,6 +54,7 @@ test_that("spark.gbt", {
   expect_equal(stats$numFeatures, stats2$numFeatures)
   expect_equal(stats$features, stats2$features)
   expect_equal(stats$featureImportances, stats2$featureImportances)
+  expect_equal(stats$maxDepth, stats2$maxDepth)
   expect_equal(stats$numTrees, stats2$numTrees)
   expect_equal(stats$treeWeights, stats2$treeWeights)
 
@@ -66,6 +68,7 @@ test_that("spark.gbt", {
   stats <- summary(model)
   expect_equal(stats$numFeatures, 2)
   expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
   expect_error(capture.output(stats), NA)
   expect_true(length(capture.output(stats)) > 6)
   predictions <- collect(predict(model, data))$prediction
@@ -93,6 +96,7 @@ test_that("spark.gbt", {
   expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction))
   expect_equal(s$numFeatures, 5)
   expect_equal(s$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
 
   # spark.gbt classification can work on libsvm data
   data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"),
@@ -116,6 +120,7 @@ test_that("spark.randomForest", {
 
   stats <- summary(model)
   expect_equal(stats$numTrees, 1)
+  expect_equal(stats$maxDepth, 5)
   expect_error(capture.output(stats), NA)
   expect_true(length(capture.output(stats)) > 6)
 
@@ -129,6 +134,7 @@ test_that("spark.randomForest", {
                tolerance = 1e-4)
   stats <- summary(model)
   expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
 
   modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp")
   write.ml(model, modelPath)
@@ -141,6 +147,7 @@ test_that("spark.randomForest", {
   expect_equal(stats$features, stats2$features)
   expect_equal(stats$featureImportances, stats2$featureImportances)
   expect_equal(stats$numTrees, stats2$numTrees)
+  expect_equal(stats$maxDepth, stats2$maxDepth)
   expect_equal(stats$treeWeights, stats2$treeWeights)
 
   unlink(modelPath)
@@ -153,6 +160,7 @@ test_that("spark.randomForest", {
   stats <- summary(model)
   expect_equal(stats$numFeatures, 2)
   expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
   expect_error(capture.output(stats), NA)
   expect_true(length(capture.output(stats)) > 6)
   # Test string prediction values
@@ -187,6 +195,8 @@ test_that("spark.randomForest", {
   stats <- summary(model)
   expect_equal(stats$numFeatures, 2)
   expect_equal(stats$numTrees, 20)
+  expect_equal(stats$maxDepth, 5)
+
   # Test numeric prediction values
   predictions <- collect(predict(model, data))$prediction
   expect_equal(length(grep("1.0", predictions)), 50)

R/pkg/vignettes/sparkr-vignettes.Rmd

Lines changed: 18 additions & 1 deletion
@@ -672,14 +672,19 @@ gaussian | identity, log, inverse
 binomial | logit, probit, cloglog (complementary log-log)
 poisson | log, identity, sqrt
 gamma | inverse, identity, log
+tweedie | power link function
 
 There are three ways to specify the `family` argument.
 
 * Family name as a character string, e.g. `family = "gaussian"`.
 
 * Family function, e.g. `family = binomial`.
 
-* Result returned by a family function, e.g. `family = poisson(link = log)`
+* Result returned by a family function, e.g. `family = poisson(link = log)`.
+
+* Note that there are two ways to specify the tweedie family:
+  a) Set `family = "tweedie"` and specify the `var.power` and `link.power`
+  b) When package `statmod` is loaded, the tweedie family is specified using the family definition therein, i.e., `tweedie()`.
 
 For more information regarding the families and their link functions, see the Wikipedia page [Generalized Linear Model](https://en.wikipedia.org/wiki/Generalized_linear_model).
 
@@ -695,6 +700,18 @@ gaussianFitted <- predict(gaussianGLM, carsDF)
 head(select(gaussianFitted, "model", "prediction", "mpg", "wt", "hp"))
 ```
 
+The following is the same fit using the tweedie family:
+```{r}
+tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)
+summary(tweedieGLM1)
+```
+We can try other distributions in the tweedie family, for example, a compound Poisson distribution with a log link:
+```{r}
+tweedieGLM2 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie",
+                         var.power = 1.2, link.power = 0.0)
+summary(tweedieGLM2)
+```
+
 #### Isotonic Regression
 
 `spark.isoreg` fits an [Isotonic Regression](https://en.wikipedia.org/wiki/Isotonic_regression) model against a `SparkDataFrame`. It solves a weighted univariate a regression problem under a complete order constraint. Specifically, given a set of real observed responses $y_1, \ldots, y_n$, corresponding real features $x_1, \ldots, x_n$, and optionally positive weights $w_1, \ldots, w_n$, we want to find a monotone (piecewise linear) function $f$ to minimize
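Because `var.power = 0.0` with the default `link.power = 1 - var.power = 1` makes the Tweedie family coincide with a Gaussian with identity link, `tweedieGLM1` in the added vignette chunk should reproduce the earlier Gaussian fit. A quick sanity check one could run against the vignette's `carsDF` DataFrame:

```r
gaussianGLM <- spark.glm(carsDF, mpg ~ wt + hp, family = "gaussian")
tweedieGLM1 <- spark.glm(carsDF, mpg ~ wt + hp, family = "tweedie", var.power = 0.0)

# The coefficient tables should agree up to numerical tolerance
summary(gaussianGLM)$coefficients
summary(tweedieGLM1)$coefficients
```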

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 11 additions & 6 deletions
@@ -62,8 +62,8 @@ private[deploy] class Worker(
   private val forwordMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
 
-  // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
-  // methods.
+  // A separated thread to clean up the workDir and the directories of finished applications.
+  // Used to provide the implicit parameter of `Future` methods.
   private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
 
@@ -578,10 +578,15 @@
     if (shouldCleanup) {
       finishedApps -= id
       appDirectories.remove(id).foreach { dirList =>
-        logInfo(s"Cleaning up local directories for application $id")
-        dirList.foreach { dir =>
-          Utils.deleteRecursively(new File(dir))
-        }
+        concurrent.Future {
+          logInfo(s"Cleaning up local directories for application $id")
+          dirList.foreach { dir =>
+            Utils.deleteRecursively(new File(dir))
+          }
+        }(cleanupThreadExecutor).onFailure {
+          case e: Throwable =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+        }(cleanupThreadExecutor)
       }
       shuffleService.applicationRemoved(id)
     }
