EMR compatibility issue with OnnxWrapper #14057

@anqini

Description

Is there an existing issue for this?

  • I have searched the existing issues and did not find a match.

Who can help?

I can contribute a fix for this issue in the short term and collaborate in the long term.

What are you working on?

Testing Spark NLP functionality with ONNX and TensorFlow components in an ML pipeline on EMR with multiple task nodes. We hope to onboard a few models we trained ourselves into a Spark NLP pipeline via the ONNX and TensorFlow integrations.

Current Behavior

I ran into the "SparkSession should only be created on the driver" issue caused by OnnxWrapper when testing spark-nlp on EMR, and I suspect there is a bug in the code. Essentially, a Spark worker node tries to read Hadoop configuration in order to initialize the OnnxWrapper session, but it cannot obtain a SparkSession because it is not the driver.

Caused by: java.lang.IllegalStateException: SparkSession should only be created and accessed on the driver.
	at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$assertOnDriver(SparkSession.scala:1158)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:927)
	at com.johnsnowlabs.nlp.util.io.ResourceHelper$.$anonfun$getActiveSparkSession$1(ResourceHelper.scala:56)
	at scala.Option.getOrElse(Option.scala:189)
	at com.johnsnowlabs.nlp.util.io.ResourceHelper$.getActiveSparkSession(ResourceHelper.scala:57)
	at com.johnsnowlabs.nlp.util.io.ResourceHelper$.spark$lzycompute(ResourceHelper.scala:104)
	at com.johnsnowlabs.nlp.util.io.ResourceHelper$.spark(ResourceHelper.scala:104)
	at com.johnsnowlabs.util.ConfigHelper$.sparkSession$lzycompute(ConfigHelper.scala:23)
	at com.johnsnowlabs.util.ConfigHelper$.sparkSession(ConfigHelper.scala:23)
	at com.johnsnowlabs.util.ConfigHelper$.getConfigValueOrElse(ConfigHelper.scala:79)
	at com.johnsnowlabs.util.ConfigLoader$.getConfigInfo(ConfigLoader.scala:70)
	at com.johnsnowlabs.util.ConfigLoader$.configData$lzycompute(ConfigLoader.scala:36)
	at com.johnsnowlabs.util.ConfigLoader$.configData(ConfigLoader.scala:60)
	at com.johnsnowlabs.util.ConfigLoader$.getConfigIntValue(ConfigLoader.scala:79)
	at com.johnsnowlabs.ml.onnx.OnnxWrapper$.getCPUSessionConfig(OnnxWrapper.scala:195)
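
The stack trace above suggests the failure mode below, sketched here as a hypothetical, simplified Python model (not the actual spark-nlp source): configuration is resolved lazily, so the first read can happen inside a task on an executor, where Spark forbids creating a SparkSession. The class and key names are illustrative only.

```python
class DriverOnlySession:
    """Stand-in for SparkSession: may only be created on the driver."""

    def __init__(self, on_driver):
        if not on_driver:
            raise RuntimeError(
                "SparkSession should only be created and accessed on the driver.")
        # Hypothetical config key, for illustration only.
        self.conf = {"onnx.intraOpNumThreads": "4"}


def get_config_lazily(on_driver):
    # Anti-pattern: build the session at the point of first access,
    # wherever that happens to be.
    return DriverOnlySession(on_driver).conf["onnx.intraOpNumThreads"]


# On the driver this works...
print(get_config_lazily(on_driver=True))
# ...but the same lazy read inside an executor task raises, which matches
# the ConfigLoader -> ConfigHelper -> ResourceHelper chain above.
```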

Expected Behavior

The pipeline should run without exceptions on both worker and driver nodes.
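
One possible fix direction, sketched below with hypothetical helper names: only look up an already-active session (mimicking what SparkSession.getActiveSession does on an executor) and fall back to a hard-coded default when none exists, instead of trying to create one off-driver.

```python
DEFAULT_INTRA_OP_THREADS = 6  # hypothetical default value


def get_active_conf(on_driver):
    """Return the active session's conf on the driver, else None
    (mimicking SparkSession.getActiveSession on an executor)."""
    return {"onnx.intraOpNumThreads": "4"} if on_driver else None


def get_config_or_default(on_driver):
    conf = get_active_conf(on_driver)
    if conf is None:
        # No session available on an executor: use the default
        # rather than trying to create a SparkSession.
        return DEFAULT_INTRA_OP_THREADS
    return int(conf["onnx.intraOpNumThreads"])


print(get_config_or_default(on_driver=True))   # driver path
print(get_config_or_default(on_driver=False))  # executor path
```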

Steps To Reproduce

Step 1 create EMR cluster

We recommend creating the cluster with an EC2 RSA key pair; if you don't have one, create it first.

aws emr create-cluster \
    --release-label emr-6.8.0 \
    --use-default-roles \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.large InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.large InstanceGroupType=TASK,InstanceCount=5,InstanceType=m4.large \
    --applications Name=Spark Name=Zeppelin \
    --ec2-attributes KeyName=<yourKeyName>

Step 2 Find DNS in AWS Console

Find the primary node public DNS of your cluster. It looks like hadoop@*.compute-1.amazonaws.com.

Step 3 SSH into the cluster

SSH into the EMR cluster:

ssh -i <key_path> <nodepublicDNS>

Step 4

Install spark-nlp and start the PySpark shell:

pip install spark-nlp==5.1.4

pyspark --packages com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4

Step 5

Run the following code:

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence")

tokenizer = Tokenizer() \
    .setInputCols(["sentence"]) \
    .setOutputCol("token")

# Use the transformer embeddings
embeddings = BertEmbeddings.pretrained(name='bert_base_cased', lang='en') \
    .setInputCols(['document', 'token']) \
    .setOutputCol('embeddings')

# This pretrained model requires those specific transformer embeddings
ner_model = NerDLModel.pretrained("ner_dl_bert", "en") \
    .setInputCols(["document", "token", "embeddings"]) \
    .setOutputCol("ner")

pipeline = Pipeline().setStages([
    documentAssembler,
    sentence,
    tokenizer,
    embeddings,
    ner_model
])

data = spark.createDataFrame([["U.N. official Ekeus heads for Baghdad."]]).toDF("text")
result = pipeline.fit(data).transform(data)

result.select("ner.result").show(truncate=False)

The exception is thrown when executing the last line of code, since the show action is what triggers pipeline execution on the workers.

Spark NLP version and Apache Spark

'5.1.4'
'3.3.0-amzn-0'

Type of Spark Application

No response

Java Version

No response

Java Home Directory

No response

Setup and installation

No response

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response
