18
18
__all__ = ['SparkConf' ]
19
19
20
20
import sys
21
+ from typing import Dict , List , Optional , Tuple , cast , overload
21
22
23
+ from py4j .java_gateway import JVMView , JavaObject # type: ignore[import]
22
24
23
- class SparkConf (object ):
24
25
26
+ class SparkConf (object ):
25
27
"""
26
28
Configuration for a Spark application. Used to set various Spark
27
29
parameters as key-value pairs.
@@ -105,15 +107,19 @@ class SparkConf(object):
105
107
spark.home=/path
106
108
"""
107
109
108
- def __init__ (self , loadDefaults = True , _jvm = None , _jconf = None ):
110
+ _jconf : Optional [JavaObject ]
111
+ _conf : Optional [Dict [str , str ]]
112
+
113
+ def __init__ (self , loadDefaults : bool = True , _jvm : Optional [JVMView ] = None ,
114
+ _jconf : Optional [JavaObject ] = None ):
109
115
"""
110
116
Create a new Spark configuration.
111
117
"""
112
118
if _jconf :
113
119
self ._jconf = _jconf
114
120
else :
115
121
from pyspark .context import SparkContext
116
- _jvm = _jvm or SparkContext ._jvm
122
+ _jvm = _jvm or SparkContext ._jvm # type: ignore[attr-defined]
117
123
118
124
if _jvm is not None :
119
125
# JVM is created, so create self._jconf directly through JVM
@@ -124,48 +130,58 @@ def __init__(self, loadDefaults=True, _jvm=None, _jconf=None):
124
130
self ._jconf = None
125
131
self ._conf = {}
126
132
127
def set(self, key: str, value: str) -> "SparkConf":
    """Set a configuration property.

    The value is converted with ``str()`` before it is stored.
    Returns this object so that calls can be chained.
    """
    # Write to the JVM-side conf when it exists; otherwise fall back to the
    # local dict that is used while the JVM is not created yet.
    if self._jconf is not None:
        self._jconf.set(key, str(value))
    else:
        assert self._conf is not None
        self._conf[key] = str(value)
    return self
135
142
136
def setIfMissing(self, key: str, value: str) -> "SparkConf":
    """Set a configuration property, if not already set.

    A key counts as missing when ``self.get(key)`` returns ``None``.
    Returns this object so that calls can be chained.
    """
    if self.get(key) is None:
        self.set(key, value)
    return self
141
148
142
def setMaster(self, value: str) -> "SparkConf":
    """Set master URL to connect to.

    Stores ``value`` under the ``spark.master`` key and returns this
    object so that calls can be chained.
    """
    self.set("spark.master", value)
    return self
146
153
147
def setAppName(self, value: str) -> "SparkConf":
    """Set application name.

    Stores ``value`` under the ``spark.app.name`` key and returns this
    object so that calls can be chained.
    """
    self.set("spark.app.name", value)
    return self
151
158
152
def setSparkHome(self, value: str) -> "SparkConf":
    """Set path where Spark is installed on worker nodes.

    Stores ``value`` under the ``spark.home`` key and returns this
    object so that calls can be chained.
    """
    self.set("spark.home", value)
    return self
156
163
157
@overload
def setExecutorEnv(self, key: str, value: str) -> "SparkConf":
    ...


@overload
def setExecutorEnv(self, *, pairs: List[Tuple[str, str]]) -> "SparkConf":
    ...


def setExecutorEnv(self, key: Optional[str] = None, value: Optional[str] = None,
                   pairs: Optional[List[Tuple[str, str]]] = None) -> "SparkConf":
    """Set an environment variable to be passed to executors.

    Exactly one of ``key`` (together with ``value``) or ``pairs`` must be
    given. Each variable is stored under ``spark.executorEnv.<name>``.
    Returns this object so that calls can be chained.

    Raises
    ------
    RuntimeError
        If both ``key`` and ``pairs`` are given, or neither is.
    """
    has_key = key is not None
    has_pairs = pairs is not None
    # The two calling styles are mutually exclusive and one is required.
    if has_key == has_pairs:
        raise RuntimeError("Either pass one key-value pair or a list of pairs")
    if key is not None:
        # The typed overloads promise a str value here; cast only informs
        # the type checker and does nothing at runtime.
        self.set("spark.executorEnv.{}".format(key), cast(str, value))
    elif pairs is not None:
        for (k, v) in pairs:
            self.set("spark.executorEnv.{}".format(k), v)
    return self
167
183
168
- def setAll (self , pairs ) :
184
+ def setAll (self , pairs : List [ Tuple [ str , str ]]) -> "SparkConf" :
169
185
"""
170
186
Set multiple parameters, passed as a list of key-value pairs.
171
187
@@ -178,49 +194,52 @@ def setAll(self, pairs):
178
194
self .set (k , v )
179
195
return self
180
196
181
def get(self, key: str, defaultValue: Optional[str] = None) -> Optional[str]:
    """Get the configured value for some key, or return a default otherwise.

    Reads from the JVM-side conf when it exists, otherwise from the local
    dict. With no ``defaultValue``, a missing key yields ``None``.
    """
    if self._jconf is None:
        # No JVM-side conf: the plain dict handles both the default and
        # the no-default case the same way.
        assert self._conf is not None
        return self._conf.get(key, defaultValue)
    if defaultValue is None:
        # Py4J doesn't call the right get() if we pass None, so check for
        # the key explicitly and use the one-argument form.
        if not self._jconf.contains(key):
            return None
        return self._jconf.get(key)
    return self._jconf.get(key, defaultValue)
197
213
198
def getAll(self) -> List[Tuple[str, str]]:
    """Get all values as a list of key-value pairs."""
    # Bind to a local so the None check also holds inside the comprehension.
    jconf = self._jconf
    if jconf is not None:
        # Each element exposes its key and value through _1() and _2().
        return [(elem._1(), elem._2()) for elem in jconf.getAll()]
    assert self._conf is not None
    # Materialise the dict view so callers always receive a real list.
    return list(self._conf.items())
204
221
205
def contains(self, key: str) -> bool:
    """Does this configuration contain a given key?

    Asks the JVM-side conf when it exists, otherwise checks the local dict.
    """
    if self._jconf is not None:
        return self._jconf.contains(key)
    assert self._conf is not None
    return key in self._conf
211
229
212
def toDebugString(self) -> str:
    """
    Returns a printable version of the configuration, as a list of
    key=value pairs, one per line.
    """
    if self._jconf is not None:
        return self._jconf.toDebugString()
    assert self._conf is not None
    # Join on a bare newline so each pair starts at the beginning of its line.
    return '\n'.join('%s=%s' % (k, v) for k, v in self._conf.items())
221
240
222
241
223
- def _test ():
242
+ def _test () -> None :
224
243
import doctest
225
244
(failure_count , test_count ) = doctest .testmod (optionflags = doctest .ELLIPSIS )
226
245
if failure_count :
0 commit comments