From 2a37c34b0f6399142f8bc093439e983313884eeb Mon Sep 17 00:00:00 2001 From: Jiacheng Guo Date: Fri, 7 Mar 2014 23:24:05 +0800 Subject: [PATCH 1/2] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set. --- core/src/main/scala/org/apache/spark/util/Utils.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0eb2f78b730f..0bc5b2028f0f 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -278,6 +278,10 @@ private[spark] object Utils extends Logging { uc = new URL(url).openConnection() } + val timeout = conf.getInt("spark.files.fetchTimeout",60) * 1000 + uc.setConnectTimeout(timeout) + uc.setReadTimeout(timeout) + uc.connect() val in = uc.getInputStream(); val out = new FileOutputStream(tempFile) Utils.copyStream(in, out, true) From abfe698c4041ae18f42b034aec714f8620520812 Mon Sep 17 00:00:00 2001 From: Jiacheng Guo Date: Sat, 8 Mar 2014 00:18:38 +0800 Subject: [PATCH 2/2] add space according request --- core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 0bc5b2028f0f..d02a8f92b8a6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -278,7 +278,7 @@ private[spark] object Utils extends Logging { uc = new URL(url).openConnection() } - val timeout = conf.getInt("spark.files.fetchTimeout",60) * 1000 + val timeout = conf.getInt("spark.files.fetchTimeout", 60) * 1000 uc.setConnectTimeout(timeout) uc.setReadTimeout(timeout) uc.connect()