44 changes: 29 additions & 15 deletions Dockerfile.spark
@@ -1,37 +1,51 @@
FROM gettyimages/spark:2.1.0-hadoop-2.7
FROM debian:stretch

RUN apt-get update && apt-get install -yq --no-install-recommends --force-yes \
curl \
git \
openjdk-7-jdk \
openjdk-8-jdk \
maven \
python2.7 \
python3.4 \
python2.7 python2.7-setuptools \
python3 python3-setuptools \
r-base \
r-base-core && \
rm -rf /var/lib/apt/lists/*

RUN easy_install3 pip py4j
RUN pip install --upgrade setuptools

ENV LIVY_BUILD_VERSION livy-server-0.3.0
ENV LIVY_APP_PATH /apps/$LIVY_BUILD_VERSION
ENV LIVY_BUILD_PATH /apps/build/livy
ENV PYSPARK_PYTHON python2.7
ENV PYSPARK3_PYTHON python3.4
ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8
ENV PIP_DISABLE_PIP_VERSION_CHECK 1

ENV SPARK_BUILD_VERSION 2.3.3
ENV SPARK_HOME /apps/spark-$SPARK_BUILD_VERSION
ENV SPARK_BUILD_PATH /apps/build/spark
RUN mkdir -p /apps/build && \
cd /apps/build && \
git clone https://github.com/cloudera/livy.git && \
cd /apps/build && \
git clone https://github.com/apache/spark.git spark && \
cd $SPARK_BUILD_PATH && \
git checkout v$SPARK_BUILD_VERSION && \
dev/make-distribution.sh --name spark-$SPARK_BUILD_VERSION -Phive -Phive-thriftserver -Pyarn && \
cp -r /apps/build/spark/dist $SPARK_HOME && \
rm -rf $SPARK_BUILD_PATH

ENV LIVY_BUILD_VERSION 0.6.0-incubating
ENV LIVY_APP_PATH /apps/apache-livy-$LIVY_BUILD_VERSION-bin
ENV LIVY_BUILD_PATH /apps/build/livy
RUN cd /apps/build && \
git clone https://github.com/apache/incubator-livy.git livy && \
cd $LIVY_BUILD_PATH && \
git checkout v0.3.0 && \
mvn -DskipTests -Dspark.version=$SPARK_VERSION clean package && \
git checkout v$LIVY_BUILD_VERSION-rc2 && \
mvn -DskipTests -Dspark.version=$SPARK_BUILD_VERSION clean package && \
ls -al $LIVY_BUILD_PATH && ls -al $LIVY_BUILD_PATH/assembly && ls -al $LIVY_BUILD_PATH/assembly/target && \
unzip $LIVY_BUILD_PATH/assembly/target/$LIVY_BUILD_VERSION.zip -d /apps && \
unzip $LIVY_BUILD_PATH/assembly/target/apache-livy-${LIVY_BUILD_VERSION}-bin.zip -d /apps && \
rm -rf $LIVY_BUILD_PATH && \
mkdir -p $LIVY_APP_PATH/upload && \
mkdir -p $LIVY_APP_PATH/logs


EXPOSE 8998

CMD ["/apps/livy-server-0.3.0/bin/livy-server"]
CMD $LIVY_APP_PATH/bin/livy-server

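The rewritten Dockerfile builds Spark 2.3.3 and Livy 0.6.0-incubating from source on debian:stretch instead of starting from the prebuilt gettyimages/spark image. Note that the new `CMD $LIVY_APP_PATH/bin/livy-server` uses shell form deliberately: exec-form CMD does not substitute environment variables, so `$LIVY_APP_PATH` would not expand. A quick smoke test against the exposed Livy port, assuming the image is run with `-p 8998:8998` (the image name below is hypothetical):

```python
# Minimal smoke test for the freshly built image, assuming it was started with:
#   docker run -p 8998:8998 my-livy-spark   (image name is hypothetical)
import json
import urllib.request

# Livy's REST API lists active sessions at GET /sessions.
with urllib.request.urlopen("http://localhost:8998/sessions") as resp:
    body = json.loads(resp.read())
    print(body["sessions"])  # expect [] on a fresh server
```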
@@ -55,7 +55,7 @@ def run(self):
properties = conf.get_session_properties(language)

try:
self.spark_controller.add_session(alias, endpoint, skip, properties)
self.spark_controller.add_session(alias, endpoint, skip, language, properties)
except ValueError as e:
self.ipython_display.send_error("""Could not add session with
name:
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/kernels/kernelmagics.py
@@ -316,7 +316,7 @@ def _do_not_call_start_session(self, line, cell="", local_ns=None):
self.session_started = True

try:
self.spark_controller.add_session(self.session_name, self.endpoint, skip, properties)
self.spark_controller.add_session(self.session_name, self.endpoint, skip, self.language, properties)
except Exception as e:
self.fatal_error = True
self.fatal_error_message = conf.fatal_error_suggestion().format(e)
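Both call sites above now thread the kernel's language into `add_session` next to the Livy session properties, so the HTTP client can remember which language each session was created for. A sketch of the new call, with hypothetical argument values:

```python
# Hypothetical call mirroring the new signature:
#   add_session(name, endpoint, skip_if_exists, lang, properties)
spark_controller.add_session(
    "my-session",         # session alias
    endpoint,             # Endpoint object for the Livy server
    False,                # do not skip if the alias already exists
    "python",             # new: the kernel language
    {"kind": "pyspark"},  # Livy session properties
)
```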
20 changes: 15 additions & 5 deletions sparkmagic/sparkmagic/livyclientlib/livyreliablehttpclient.py
@@ -12,13 +12,15 @@
class LivyReliableHttpClient(object):
"""A Livy-specific Http client which wraps the normal ReliableHttpClient. Propagates
HttpClientExceptions up."""

def __init__(self, http_client, endpoint):
self.endpoint = endpoint
self._http_client = http_client
self.languages = {}

@staticmethod
def from_endpoint(endpoint):
headers = {"Content-Type": "application/json" }
headers = {"Content-Type": "application/json"}
headers.update(conf.custom_headers())
retry_policy = LivyReliableHttpClient._get_retry_policy()
return LivyReliableHttpClient(ReliableHttpClient(endpoint, headers, retry_policy), endpoint)
@@ -32,12 +34,19 @@ def get_statement(self, session_id, statement_id):
def get_sessions(self):
return self._http_client.get("/sessions", [200]).json()

def post_session(self, properties):
return self._http_client.post("/sessions", [201], properties).json()
def post_session(self, lang, properties):
result = self._http_client.post("/sessions", [201], properties).json()
if u'id' in result:
session_id = result[u'id']
self.languages[session_id] = lang
return result

def get_session(self, session_id):
return self._http_client.get(self._session_url(session_id), [200]).json()

def get_language(self, session_id):
return self.languages[session_id]

def delete_session(self, session_id):
self._http_client.delete(self._session_url(session_id), [200, 404])

@@ -62,10 +71,11 @@ def _statement_url(session_id, statement_id):
@staticmethod
def _get_retry_policy():
policy = conf.retry_policy()

if policy == LINEAR_RETRY:
return LinearRetryPolicy(seconds_to_sleep=5, max_retries=5)
elif policy == CONFIGURABLE_RETRY:
return ConfigurableRetryPolicy(retry_seconds_to_sleep_list=conf.retry_seconds_to_sleep_list(), max_retries=conf.configurable_retry_policy_max_retries())
return ConfigurableRetryPolicy(retry_seconds_to_sleep_list=conf.retry_seconds_to_sleep_list(),
max_retries=conf.configurable_retry_policy_max_retries())
else:
raise BadUserConfigurationException(u"Retry policy '{}' not supported".format(policy))
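`post_session` now records the requested language in an in-memory `languages` dict keyed by the session id Livy returns, and `get_language` reads it back. A sketch of the round trip, assuming a reachable Livy endpoint:

```python
# Sketch of the new language bookkeeping; constructing the endpoint is
# an assumption (sparkmagic wraps Livy URLs in an Endpoint object).
client = LivyReliableHttpClient.from_endpoint(endpoint)

result = client.post_session("python", {"kind": "pyspark"})
session_id = result["id"]

client.get_language(session_id)  # -> "python"
client.get_language(9999)        # KeyError: a session id this client never created
```

Because `get_language` indexes the dict directly, it raises `KeyError` for sessions this client did not create itself, which matters for the session-recovery paths in `sparkcontroller` below.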
7 changes: 5 additions & 2 deletions sparkmagic/sparkmagic/livyclientlib/livysession.py
@@ -54,12 +54,14 @@ def stop(self):


class LivySession(ObjectWithGuid):
def __init__(self, http_client, properties, ipython_display,
def __init__(self, http_client, lang, properties, ipython_display,
session_id=-1, spark_events=None,
heartbeat_timeout=0, heartbeat_thread=None):
super(LivySession, self).__init__()
assert constants.LIVY_KIND_PARAM in list(properties.keys())
kind = properties[constants.LIVY_KIND_PARAM]
# lang = properties[constants.LIVY_LANG_PARAM]
lang = lang

should_heartbeat = False
if heartbeat_timeout > 0:
@@ -96,6 +98,7 @@ def __init__(self, http_client, properties, ipython_display,
self._printed_resource_warning = False

self.kind = kind
self.lang = lang
self.id = session_id
self.session_info = u""

@@ -116,7 +119,7 @@ def start(self):
self._printed_resource_warning = False

try:
r = self._http_client.post_session(self.properties)
r = self._http_client.post_session(self.lang, self.properties)
self.id = r[u"id"]
self.status = str(r[u"state"])

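`LivySession` now takes the language as an explicit constructor argument alongside the Livy `kind` from the properties, stores it as `self.lang`, and hands it to `post_session` when the session starts. A minimal construction sketch with assumed values:

```python
# Minimal sketch; http_client and ipython_display come from the caller.
session = LivySession(
    http_client,
    "python",             # new lang argument
    {"kind": "pyspark"},  # must contain constants.LIVY_KIND_PARAM
    ipython_display,
)
session.start()  # now posts (lang, properties) to /sessions
```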
20 changes: 12 additions & 8 deletions sparkmagic/sparkmagic/livyclientlib/sparkcontroller.py
@@ -42,7 +42,9 @@ def run_sqlquery(self, sqlquery, client_name=None):
def get_all_sessions_endpoint(self, endpoint):
http_client = self._http_client(endpoint)
sessions = http_client.get_sessions()[u"sessions"]
session_list = [self._livy_session(http_client, {constants.LIVY_KIND_PARAM: s[constants.LIVY_KIND_PARAM]},
session_list = [self._livy_session(http_client,
http_client.get_language(s[u"id"]),
{constants.LIVY_KIND_PARAM: s[constants.LIVY_KIND_PARAM]},
self.ipython_display, s[u"id"])
for s in sessions]
for s in session_list:
@@ -65,23 +67,25 @@ def delete_session_by_name(self, name):

def delete_session_by_id(self, endpoint, session_id):
name = self.session_manager.get_session_name_by_id_endpoint(session_id, endpoint)

if name in self.session_manager.get_sessions_list():
self.delete_session_by_name(name)
else:
http_client = self._http_client(endpoint)
response = http_client.get_session(session_id)
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, {constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]},
self.ipython_display, session_id)
lang = http_client.get_language(session_id)
session = self._livy_session(http_client, lang,
{constants.LIVY_KIND_PARAM: response[constants.LIVY_KIND_PARAM]},
self.ipython_display, session_id)
session.delete()

def add_session(self, name, endpoint, skip_if_exists, properties):
def add_session(self, name, endpoint, skip_if_exists, lang, properties):
if skip_if_exists and (name in self.session_manager.get_sessions_list()):
self.logger.debug(u"Skipping {} because it already exists in list of sessions.".format(name))
return
http_client = self._http_client(endpoint)
session = self._livy_session(http_client, properties, self.ipython_display)
session = self._livy_session(http_client, lang, properties, self.ipython_display)
self.session_manager.add_session(name, session)
session.start()

@@ -105,9 +109,9 @@ def get_managed_clients(self):
return self.session_manager.sessions

@staticmethod
def _livy_session(http_client, properties, ipython_display,
def _livy_session(http_client, lang, properties, ipython_display,
session_id=-1):
return LivySession(http_client, properties, ipython_display,
return LivySession(http_client, lang, properties, ipython_display,
session_id, heartbeat_timeout=conf.livy_server_heartbeat_timeout_seconds())

@staticmethod
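The controller threads `lang` through `_livy_session` everywhere a session is constructed. One caveat worth flagging: `get_all_sessions_endpoint` and `delete_session_by_id` call `http_client.get_language(s[u"id"])` for sessions discovered from the server, which a freshly constructed client has never recorded, so the lookup can raise `KeyError`. A defensive variant (an illustration only, not what this diff does) could fall back to a default:

```python
# Hypothetical defensive lookup; the diff itself uses self.languages[session_id].
def get_language(self, session_id, default=u"python"):
    return self.languages.get(session_id, default)
```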
36 changes: 17 additions & 19 deletions sparkmagic/sparkmagic/livyclientlib/sparkstorecommand.py
@@ -7,8 +7,10 @@

import ast


class SparkStoreCommand(Command):
def __init__(self, output_var, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None, coerce=None):
def __init__(self, output_var, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None,
coerce=None):
super(SparkStoreCommand, self).__init__("", spark_events)

if samplemethod is None:
@@ -34,10 +36,9 @@ def __init__(self, output_var, samplemethod=None, maxrows=None, samplefraction=N
self._spark_events = spark_events
self._coerce = coerce


def execute(self, session):
try:
command = self.to_command(session.kind, self.output_var)
command = self.to_command(session.lang, session.kind, self.output_var)
(success, records_text) = command.execute(session)
if not success:
raise BadUserDataException(records_text)
@@ -47,20 +48,21 @@ def execute(self, session):
else:
return result


def to_command(self, kind, spark_context_variable_name):
def to_command(self, lang, kind, spark_context_variable_name):
if kind == constants.SESSION_KIND_PYSPARK:
return self._pyspark_command(spark_context_variable_name)
elif kind == constants.SESSION_KIND_PYSPARK3:
return self._pyspark_command(spark_context_variable_name, False)
if lang == constants.LANG_PYTHON:
return self._pyspark_command(spark_context_variable_name)
elif lang == constants.LANG_PYTHON3:
return self._pyspark_command(spark_context_variable_name, False)
else:
raise BadUserDataException(u"Kind '{}' with lang '{}' is not supported.".format(kind, lang))
elif kind == constants.SESSION_KIND_SPARK:
return self._scala_command(spark_context_variable_name)
elif kind == constants.SESSION_KIND_SPARKR:
return self._r_command(spark_context_variable_name)
else:
raise BadUserDataException(u"Kind '{}' is not supported.".format(kind))


def _pyspark_command(self, spark_context_variable_name, encode_result=True):
command = u'{}.toJSON()'.format(spark_context_variable_name)
if self.samplemethod == u'sample':
@@ -80,7 +82,6 @@ def _pyspark_command(self, spark_context_variable_name, encode_result=True):
print_command)
return Command(command)


def _scala_command(self, spark_context_variable_name):
command = u'{}.toJSON'.format(spark_context_variable_name)
if self.samplemethod == u'sample':
@@ -91,7 +92,6 @@ def _scala_command(self, spark_context_variable_name):
command = u'{}.collect'.format(command)
return Command(u'{}.foreach(println)'.format(command))


def _r_command(self, spark_context_variable_name):
command = spark_context_variable_name
if self.samplemethod == u'sample':
@@ -107,16 +107,14 @@ def _r_command(self, spark_context_variable_name):
constants.LONG_RANDOM_VARIABLE_NAME)
return Command(command)



# Used only for unit testing
def __eq__(self, other):
return self.code == other.code and \
self.samplemethod == other.samplemethod and \
self.maxrows == other.maxrows and \
self.samplefraction == other.samplefraction and \
self.output_var == other.output_var and \
self._coerce == other._coerce
self.samplemethod == other.samplemethod and \
self.maxrows == other.maxrows and \
self.samplefraction == other.samplefraction and \
self.output_var == other.output_var and \
self._coerce == other._coerce

def __ne__(self, other):
return not (self == other)
return not (self == other)
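`to_command` now dispatches on `lang` inside the `pyspark` kind: the language decides whether `_pyspark_command` encodes its printed output (the Python 2 path) or skips encoding (`encode_result=False` for Python 3). A hypothetical usage, with constant names taken from the diff:

```python
# Hypothetical usage: build and run the store command for a
# pyspark session whose language is Python 3.
store = SparkStoreCommand(u"df")
command = store.to_command(
    constants.LANG_PYTHON3,          # session.lang
    constants.SESSION_KIND_PYSPARK,  # session.kind
    u"df",                           # variable holding the Spark DataFrame
)
(success, records_text) = command.execute(session)  # session: a started LivySession
```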
30 changes: 18 additions & 12 deletions sparkmagic/sparkmagic/livyclientlib/sqlquery.py
@@ -11,7 +11,7 @@
class SQLQuery(ObjectWithGuid):
def __init__(self, query, samplemethod=None, maxrows=None, samplefraction=None, spark_events=None, coerce=None):
super(SQLQuery, self).__init__()

if samplemethod is None:
samplemethod = conf.default_samplemethod()
if maxrows is None:
@@ -35,11 +35,17 @@ def __init__(self, query, samplemethod=None, maxrows=None, samplefraction=None,
self._spark_events = spark_events
self._coerce = coerce

def to_command(self, kind, sql_context_variable_name):
def to_command(self, lang, kind, sql_context_variable_name):
if kind == constants.SESSION_KIND_PYSPARK:
return self._pyspark_command(sql_context_variable_name)
elif kind == constants.SESSION_KIND_PYSPARK3:
return self._pyspark_command(sql_context_variable_name, False)
if lang == constants.LANG_PYTHON:
return self._pyspark_command(sql_context_variable_name)
elif lang == constants.LANG_PYTHON3:
return self._pyspark_command(sql_context_variable_name, False)
else:
raise BadUserDataException(u"Kind '{}' with lang '{}' is not supported.".format(kind, lang))
# unreachable: since Livy 0.4+ the session kind is always pyspark regardless of the Python version
# elif kind == constants.SESSION_KIND_PYSPARK3:
# return self._pyspark_command(sql_context_variable_name, False)
elif kind == constants.SESSION_KIND_SPARK:
return self._scala_command(sql_context_variable_name)
elif kind == constants.SESSION_KIND_SPARKR:
@@ -52,7 +58,7 @@ def execute(self, session):
self.samplemethod, self.maxrows, self.samplefraction)
command_guid = ''
try:
command = self.to_command(session.kind, session.sql_context_variable_name)
command = self.to_command(session.lang, session.kind, session.sql_context_variable_name)
command_guid = command.guid
(success, records_text) = command.execute(session)
if not success:
Expand All @@ -68,7 +74,6 @@ def execute(self, session):
command_guid, True, "", "")
return result


def _pyspark_command(self, sql_context_variable_name, encode_result=True):
command = u'{}.sql(u"""{} """).toJSON()'.format(sql_context_variable_name, self.query)
if self.samplemethod == u'sample':
@@ -110,16 +115,17 @@ def _r_command(self, sql_context_variable_name):
else:
command = u'collect({})'.format(command)
command = u'jsonlite:::toJSON({})'.format(command)
command = u'for ({} in ({})) {{cat({})}}'.format(constants.LONG_RANDOM_VARIABLE_NAME, command, constants.LONG_RANDOM_VARIABLE_NAME)
command = u'for ({} in ({})) {{cat({})}}'.format(constants.LONG_RANDOM_VARIABLE_NAME, command,
constants.LONG_RANDOM_VARIABLE_NAME)
return Command(command)

# Used only for unit testing
def __eq__(self, other):
return self.query == other.query and \
self.samplemethod == other.samplemethod and \
self.maxrows == other.maxrows and \
self.samplefraction == other.samplefraction and \
self._coerce == other._coerce
self.samplemethod == other.samplemethod and \
self.maxrows == other.maxrows and \
self.samplefraction == other.samplefraction and \
self._coerce == other._coerce

def __ne__(self, other):
return not (self == other)
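`SQLQuery.to_command` gets the same `lang` dispatch as `SparkStoreCommand`, and the commented-out `SESSION_KIND_PYSPARK3` branch is left in place as unreachable because, as the diff's comment notes, Livy 0.4+ exposes a single `pyspark` kind regardless of Python version. End to end, a hedged usage sketch:

```python
# Hypothetical end-to-end use: run SQL through a started LivySession.
query = SQLQuery(u"SELECT 1 AS x")
df = query.execute(session)  # builds the command from (session.lang,
                             # session.kind) and returns the records as a DataFrame
```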
2 changes: 1 addition & 1 deletion sparkmagic/sparkmagic/magics/remotesparkmagics.py
@@ -141,7 +141,7 @@ def spark(self, line, cell="", local_ns=None):

properties = conf.get_session_properties(language)

self.spark_controller.add_session(name, endpoint, skip, properties)
self.spark_controller.add_session(name, endpoint, skip, language, properties)
# delete
elif subcommand == "delete":
if args.session is not None: