2 changes: 2 additions & 0 deletions .github/workflows/build_and_test.yml
@@ -161,6 +161,8 @@ jobs:
pyspark-sql, pyspark-mllib, pyspark-resource
- >-
pyspark-core, pyspark-streaming, pyspark-ml
- >-
pyspark-pandas
Comment on lines +164 to +165 (Member, Author):
Maybe we can rebalance the pyspark tests after we finish porting unit tests.

env:
MODULES_TO_TEST: ${{ matrix.modules }}
HADOOP_PROFILE: hadoop3.2
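Each entry in the modules matrix above is a comma-separated list of test module names and becomes one GitHub Actions job, so the new pyspark-pandas entry runs in a job of its own (hence the note about rebalancing once more tests are ported). As a rough, self-contained sketch of that convention only, not the workflow's actual plumbing, a matrix entry exported as MODULES_TO_TEST can be split back into individual module names like this:

import os

def parse_modules_to_test(entry: str):
    # Split a matrix entry such as "pyspark-core, pyspark-streaming, pyspark-ml"
    # into individual test module names.
    return [name.strip() for name in entry.split(",") if name.strip()]

print(parse_modules_to_test(os.environ.get("MODULES_TO_TEST", "pyspark-pandas")))
# ['pyspark-pandas']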
6 changes: 3 additions & 3 deletions dev/run-tests.py
@@ -123,17 +123,17 @@ def determine_modules_to_test(changed_modules, deduplicated=True):
>>> [x.name for x in determine_modules_to_test([modules.sql])]
... # doctest: +NORMALIZE_WHITESPACE
['sql', 'avro', 'hive', 'mllib', 'sql-kafka-0-10', 'examples', 'hive-thriftserver',
'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-ml']
'pyspark-sql', 'repl', 'sparkr', 'pyspark-mllib', 'pyspark-pandas', 'pyspark-ml']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sparkr, modules.sql], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'examples', 'hive', 'hive-thriftserver', 'mllib', 'pyspark-ml',
'pyspark-mllib', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
'pyspark-mllib', 'pyspark-pandas', 'pyspark-sql', 'repl', 'sparkr', 'sql', 'sql-kafka-0-10']
>>> sorted([x.name for x in determine_modules_to_test(
... [modules.sql, modules.core], deduplicated=False)])
... # doctest: +NORMALIZE_WHITESPACE
['avro', 'catalyst', 'core', 'examples', 'graphx', 'hive', 'hive-thriftserver',
'mllib', 'mllib-local', 'pyspark-core', 'pyspark-ml', 'pyspark-mllib',
'mllib', 'mllib-local', 'pyspark-core', 'pyspark-ml', 'pyspark-mllib', 'pyspark-pandas',
'pyspark-resource', 'pyspark-sql', 'pyspark-streaming', 'repl', 'root',
'sparkr', 'sql', 'sql-kafka-0-10', 'streaming', 'streaming-kafka-0-10',
'streaming-kinesis-asl']
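These doctest updates follow from the dependency closure that determine_modules_to_test computes: a module is scheduled for testing when anything it depends on has changed, and pyspark_pandas declares pyspark_core and pyspark_sql as dependencies in dev/sparktestsupport/modules.py below, so a change to sql now also pulls in pyspark-pandas. A minimal, self-contained sketch of that fixed-point idea, not the actual implementation in dev/run-tests.py:

class Mod:
    def __init__(self, name, dependencies=()):
        self.name = name
        self.dependencies = list(dependencies)

sql = Mod("sql")
pyspark_sql = Mod("pyspark-sql", [sql])
pyspark_pandas = Mod("pyspark-pandas", [pyspark_sql])
all_mods = [sql, pyspark_sql, pyspark_pandas]

def modules_to_test(changed):
    selected = set(changed)
    size = -1
    while size != len(selected):          # repeat until no new module is pulled in
        size = len(selected)
        for m in all_mods:
            if any(dep in selected for dep in m.dependencies):
                selected.add(m)
    return sorted(m.name for m in selected)

print(modules_to_test({sql}))  # ['pyspark-pandas', 'pyspark-sql', 'sql']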
44 changes: 44 additions & 0 deletions dev/sparktestsupport/modules.py
@@ -573,6 +573,50 @@ def __hash__(self):
]
)

pyspark_pandas = Module(
name="pyspark-pandas",
dependencies=[pyspark_core, pyspark_sql],
source_file_regexes=[
"python/pyspark/pandas/"
],
python_test_goals=[
# doctests
"pyspark.pandas.accessors",
"pyspark.pandas.base",
"pyspark.pandas.categorical",
"pyspark.pandas.config",
"pyspark.pandas.datetimes",
"pyspark.pandas.exceptions",
"pyspark.pandas.extensions",
"pyspark.pandas.frame",
"pyspark.pandas.generic",
"pyspark.pandas.groupby",
"pyspark.pandas.indexing",
"pyspark.pandas.internal",
"pyspark.pandas.ml",
"pyspark.pandas.mlflow",
"pyspark.pandas.namespace",
"pyspark.pandas.numpy_compat",
"pyspark.pandas.series",
"pyspark.pandas.sql_processor",
"pyspark.pandas.strings",
"pyspark.pandas.utils",
"pyspark.pandas.window",
"pyspark.pandas.indexes.base",
"pyspark.pandas.indexes.category",
"pyspark.pandas.indexes.datetimes",
"pyspark.pandas.indexes.multi",
"pyspark.pandas.indexes.numeric",
"pyspark.pandas.spark.accessors",
"pyspark.pandas.spark.utils",
"pyspark.pandas.typedef.typehints",
],
excluded_python_implementations=[
"PyPy" # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
# they aren't available there
]
)

sparkr = Module(
name="sparkr",
dependencies=[hive, mllib],
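Each name in python_test_goals above is a doctest goal; the Python test runner runs these in a way that triggers the module's __main__ block, which is why every ported file below gains an if __name__ == "__main__": _test() guard. The excluded_python_implementations entry keeps these goals off PyPy, where numpy, pandas, and pyarrow are not available. A small illustrative sketch of that filtering, not the code in python/run-tests.py:

import platform

def runnable_goals(goals, excluded_implementations):
    impl = platform.python_implementation()   # "CPython", "PyPy", ...
    return [] if impl in excluded_implementations else list(goals)

print(runnable_goals(["pyspark.pandas.frame", "pyspark.pandas.series"], ["PyPy"]))
# ['pyspark.pandas.frame', 'pyspark.pandas.series'] on CPython, [] on PyPy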
17 changes: 14 additions & 3 deletions python/pyspark/pandas/__init__.py
@@ -17,13 +17,24 @@
import os
import sys
from distutils.version import LooseVersion
import warnings

from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version

try:
require_minimum_pandas_version()
require_minimum_pyarrow_version()
except ImportError as e:
if os.environ.get("SPARK_TESTING"):
warnings.warn(str(e))
sys.exit()
else:
raise

from pyspark.pandas.version import __version__ # noqa: F401


def assert_python_version():
import warnings

major = 3
minor = 5
deprecated_version = (major, minor)
@@ -206,4 +217,4 @@ def _auto_patch_pandas():
# Import after the usage logger is attached.
from pyspark.pandas.config import get_option, options, option_context, reset_option, set_option
from pyspark.pandas.namespace import * # F405
from pyspark.pandas.sql import sql
from pyspark.pandas.sql_processor import sql
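The new guard above downgrades a missing pandas or pyarrow to a warning plus a clean exit when SPARK_TESTING is set, so a doctest goal stops quietly instead of failing the job on machines without those packages, while outside of testing the ImportError still propagates to the user. A stripped-down, standalone illustration of that pattern (the error message here is invented):

import os
import sys
import warnings

def check_optional_deps(available: bool) -> None:
    try:
        if not available:
            raise ImportError("pandas/pyarrow are required")  # hypothetical message
    except ImportError as e:
        if os.environ.get("SPARK_TESTING"):
            warnings.warn(str(e))
            sys.exit()   # no argument, so the exit status is 0, not a failure
        else:
            raise

check_optional_deps(available=True)   # no-op when the dependencies are present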
30 changes: 30 additions & 0 deletions python/pyspark/pandas/accessors.py
@@ -928,3 +928,33 @@ def apply_func(pdf):
),
dtype=dtype,
)


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.accessors

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.accessors.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.accessors tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.accessors,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
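The same _test() boilerplate is appended to each ported module in this PR (base, categorical, config, datetimes, exceptions, extensions, and so on below): copy the module's globals, expose pyspark.pandas as pp for the doctests, run them against a local[4] SparkSession, and turn any doctest failure into a non-zero exit code for the test runner. A self-contained toy version of that convention, with a made-up add function standing in for the real module:

import doctest
import sys

def add(a, b):
    """
    >>> add(2, 2)
    4
    """
    return a + b

if __name__ == "__main__":
    failure_count, test_count = doctest.testmod(
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
    )
    if failure_count:
        sys.exit(-1)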
28 changes: 28 additions & 0 deletions python/pyspark/pandas/base.py
@@ -1993,3 +1993,31 @@ def factorize(
uniques = pd.Index(uniques_list)

return codes, uniques


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.base

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.base.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.base tests").getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.base,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
30 changes: 30 additions & 0 deletions python/pyspark/pandas/categorical.py
@@ -162,3 +162,33 @@ def set_categories(
self, new_categories, ordered: bool = None, rename: bool = False, inplace: bool = False
):
raise NotImplementedError()


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.categorical

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.categorical.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.categorical tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.categorical,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
28 changes: 28 additions & 0 deletions python/pyspark/pandas/config.py
@@ -440,3 +440,31 @@ def __dir__(self):


options = DictWrapper(_options_dict)


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.config

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.config.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.config tests").getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.config,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
30 changes: 30 additions & 0 deletions python/pyspark/pandas/datetimes.py
@@ -848,3 +848,33 @@ def pandas_day_name(s) -> "pp.Series[str]":
return s.dt.day_name(locale=locale)

return self._data.koalas.transform_batch(pandas_day_name)


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.datetimes

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.datetimes.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.datetimes tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.datetimes,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
30 changes: 30 additions & 0 deletions python/pyspark/pandas/exceptions.py
@@ -104,3 +104,33 @@ def __init__(
class_name, property_name, reason
)
super().__init__(msg)


def _test():
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.exceptions

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.exceptions.__dict__.copy()
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.exceptions tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.exceptions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()
32 changes: 32 additions & 0 deletions python/pyspark/pandas/extensions.py
@@ -340,3 +340,35 @@ def bar(self):
from pyspark.pandas import Index

return _register_accessor(name, Index)


def _test():
import os
import doctest
import sys
import numpy
from pyspark.sql import SparkSession
import pyspark.pandas.extensions

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.extensions.__dict__.copy()
globs["np"] = numpy
globs["pp"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.extensions tests")
.getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.extensions,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)


if __name__ == "__main__":
_test()