Spark

2016-11-15. Category & Tags: Spark, Hadoop

The following content was tested on Ubuntu 16.04 (before 2019) & 18.04 (since 2018).
This post covers how to install Spark in standalone, YARN, and Mesos modes.
OBS: the username hpc is assumed.

STANDALONE (ONE-SINGLE-NODE) #

Single-node standalone mode is likely the first way to try Spark, so we make the installation as easy as possible.
Tips:

  1. A pre-built Spark binary does NOT require installing Scala.
  2. Hadoop, ZooKeeper, etc. are for High Availability and are not mandatory (for now). We don’t install them here, though the pre-built Spark can work with Hadoop.

java installation #

via apt-get #

(traditional jdk8 before 2019, optional jdk11 in 2019)

sudo apt-get update && \
sudo apt-get install -y openjdk-8-jdk && \
ll /usr/lib/jvm/ && \
ll /usr/bin/java && \
ll /etc/alternatives/java && \
sudo update-alternatives --config java # may require manual input
# some guides also mention installing the jre in addition to the jdk.

sudo rm /opt/jdk
sudo ln -s /usr/lib/jvm/java-8-openjdk-amd64 /opt/jdk
echo "JAVA_HOME='/opt/jdk'" | sudo tee -a /etc/environment && \
source /etc/environment && \
echo $JAVA_HOME

(The Java Runtime Environment (JRE) is used to run Java applications; the Java Development Kit (JDK) is used to develop them.)

manual (if no sudo permission) #

cd ~
wget http://anduin.linuxfromscratch.org/BLFS/OpenJDK/OpenJDK-1.8.0.141/OpenJDK-1.8.0.141-x86_64-bin.tar.xz && \
tar xvf OpenJDK-1.8.0.141-x86_64-bin.tar.xz && \
rm OpenJDK-1.8.0.141-x86_64-bin.tar.xz && \
mv OpenJDK-1.8.0.141-x86_64-bin jdk && \
cd jdk && \
export JAVA_HOME=`pwd` && \
echo $JAVA_HOME && \
echo "export JAVA_HOME=`pwd`" >> ~/.bashrc

ref-openjdk

spark installation #

pip way #

(tested 2019.02):
Note: pipenv is suggested in practice instead of pip.

pip install pyspark && \
pyspark --version # 2.4.0

Combining Java with pyspark in pipenv (only for a fresh Ubuntu 18):

sudo apt-get update && \
sudo apt-get install -y openjdk-8-jdk && \
sudo ln -s /usr/lib/jvm/java-8-openjdk-amd64 /opt/jdk && \
echo "JAVA_HOME='/opt/jdk'" | sudo tee -a /etc/environment && \
source /etc/environment && \
echo $JAVA_HOME && \
sudo apt install -y python3-pip python3-setuptools && \
python3 -m pip install --user pipenv && \
if [ ! -f  /usr/bin/python ]; then sudo ln -s /usr/bin/python3 /usr/bin/python; fi && \
wget https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz -O /home/ubuntu/dist-spark220-hadoop27.tgz && \
tar -xvzf /home/ubuntu/dist-spark220-hadoop27.tgz -C /home/ubuntu/ && \
mv /home/ubuntu/spark-2.2.0-bin-hadoop2.7 /home/ubuntu/dist-spark220-hadoop27 && \
rm /home/ubuntu/dist-spark220-hadoop27.tgz && \
mkdir /home/ubuntu/test-spark220-py367 && \
cd /home/ubuntu/test-spark220-py367 && \
pipenv --python 3.6.7 && \
pipenv shell

pipenv install pyspark==2.2.post0 && \
cd /home/ubuntu/dist-spark220-hadoop27/sbin
# ./start-master.sh -h <ip>
# ./start-slave.sh spark://<ip>:7077

original way: #

wget http://www.apache.org/dist/spark/spark-2.0.2/spark-2.0.2-bin-hadoop2.7.tgz && \
tar xvzf spark-2.0.2-bin-hadoop2.7.tgz && \
mv spark-2.0.2-bin-hadoop2.7 spark && \
export SPARK_HOME=/home/hpc/spark && \
export PATH=$SPARK_HOME/bin:$PATH && \
echo $PATH
export PYSPARK_PYTHON=python3

script way: #

OBS: root permission is needed; use with caution!

wget https://raw.githubusercontent.com/Mister-Meeseeks/spark-installer/master/install-spark.sh && chmod a+x install-spark.sh && sudo ./install-spark.sh https://archive.apache.org/dist/spark/spark-2.2.0/spark-2.2.0-bin-hadoop2.7.tgz
# note: if a .tgz url is not provided, the latest version will be used.

try it #

Command line for Scala: spark-shell. Inside the shell, try: sc.version.
Command line for Python: pyspark. Inside the shell, try: sc.version.

Web UI: localhost:4040.
In pyspark:

t = sc.textFile("/lib/firmware/README")  # load a local text file as an RDD
t.collect()                              # bring all lines back to the driver

lines = sc.textFile("/lib/firmware/README")
lineLengths = lines.map(lambda s: len(s))             # length of each line
totalLength = lineLengths.reduce(lambda a, b: a + b)  # sum of the lengths
print(totalLength)

//TODO: move the example below to usage.

real world task #

download data: #

cd && rm -rf kdd* ; mkdir datasets; mkdir notebooks; cd notebooks/;
# !wget -q -O ../datasets/kddtrain.gz \
# http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data.gz
wget -q -O ../datasets/kddtrain.gz \
http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz && \
wget -q -O ../datasets/kddtest.gz \
http://kdd.ics.uci.edu/databases/kddcup99/corrected.gz && \
wget -q -O ../datasets/kddnames \
http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names && \
gunzip ../datasets/kdd*gz && \
head -3 ../datasets/kddtrain

For the following training, see Jupyter ….
Also: word-count.

ref #

  1. @luck
  2. markobigdata.com
  3. informit.com
  4. official doc

STANDALONE (MULTI-NODE) #

automatic install for each node (using root) #

(the code below installs Spark at /root, and Java under /usr with a symlink at /opt/jdk)

wget -q http://www.apache.org/dist/spark/spark-2.0.2/spark-2.0.2-bin-hadoop2.7.tgz && \
tar xzf spark-2.0.2-bin-hadoop2.7.tgz && \
mv spark-2.0.2-bin-hadoop2.7 spark && \
apt-get update && \
apt-get -qq install -y openjdk-8-jdk && \
ls -al /usr/lib/jvm/ && \
ls -al /usr/bin/java && \
ls -al /etc/alternatives/java && \
ln -s /usr/lib/jvm/java-8-openjdk-amd64 /opt/jdk && \
export JAVA_HOME=/opt/jdk && \
export SPARK_HOME=/root/spark && \
export SPARK_MASTER_HOST=sparkmaster.dmml.stream && \
export PYSPARK_PYTHON=python3 && \
export PYTHONHASHSEED=2017 && \
echo 'JAVA_HOME=/opt/jdk' >> /etc/environment && \
echo 'SPARK_HOME=/root/spark' >> /etc/environment && \
echo 'SPARK_MASTER_HOST=sparkmaster.dmml.stream' >> /etc/environment && \
echo 'PYSPARK_PYTHON=python3' >> /etc/environment && \
echo 'PYTHONHASHSEED=2017' >> /etc/environment && \
echo "PATH=$SPARK_HOME/bin:$PATH" >> /etc/environment && \
export PATH=$SPARK_HOME/bin:$PATH && \
source /etc/environment
#update-alternatives --config java

OBS: set $PATH last. Double quotes " " will expand $VAR, but single quotes ' ' will NOT.
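
A quick illustration (the expected output is shown in the comments, given the exports above):

echo "SPARK_HOME is $SPARK_HOME"   # double quotes: prints SPARK_HOME is /root/spark
echo 'SPARK_HOME is $SPARK_HOME'   # single quotes: prints SPARK_HOME is $SPARK_HOME
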
Tips for quieter output:

  • Add -q to wget.
  • Add -q or -qq to apt-get.
  • Drop the v (verbose) flag from tar.

run #

Now, let’s run Spark on multiple nodes: 1 master + n slaves.
On master:

./spark/sbin/start-master.sh

On each slave:

./spark/sbin/start-slave.sh spark://$SPARK_MASTER_HOST:7077

Check localhost:8080 on the master; it should show all the slaves (workers):

Tip: by default, SPARK_WORKER_CORES = all available cores and SPARK_WORKER_MEMORY = total memory minus 1 GB, because 1 GB is allocated to SPARK_DAEMON_MEMORY. For more environment variables, see the official doc.
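
These defaults can be overridden per node in conf/spark-env.sh; a sketch with purely illustrative values:

# illustrative values: limit each worker to 4 cores and 8 GB, then restart the worker
echo "export SPARK_WORKER_CORES=4"   >> $SPARK_HOME/conf/spark-env.sh
echo "export SPARK_WORKER_MEMORY=8g" >> $SPARK_HOME/conf/spark-env.sh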

terminal #

To try it: pyspark --version.
To use it, launch a shell (spark-shell or pyspark) on the master or a slave:

pyspark --master spark://$SPARK_MASTER_HOST:7077
# or
spark-shell --master spark://$SPARK_MASTER_HOST:7077


Test in spark-shell:

val file = sc.textFile("file:///lib/firmware/README")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.collect()

Test in pyspark (py2):
export PYSPARK_PYTHON=python2 (py3 is also OK; see the PYTHONHASHSEED bug in the FAQ below)

from operator import add
if __name__ == "__main__":
    lines = sc.textFile("file:///lib/firmware/README")
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))

submit #

# params follow directly after the .py file
/root/spark/bin/spark-submit impute-1.2-spark-real_impute_NOT_param_exp-avrSpeed.py 2 1 0

+ JUPYTER #

To use Jupyter while Spark (standalone) is running, save one of the following scripts and run it.

if (spark v2.x and py3) #

# ??? no need ???
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.3-src.zip:$PYTHONPATH
#!/bin/sh
SPARK_HOME="" HADOOP_HOME="" YARN_HOME="" SPARK_JAR="" HADOOP_COMMON_LIB_NATIVE_DIR="" HADOOP_HDFS_HOME="" HADOOP_COMMON_HOME="" HADOOP_OPTS="" YARN_CONF_DIR="" HADOOP_MAPRED_HOME="" PYSPARK_DRIVER_PYTHON="jupyter" PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root --no-browser --port 8888" /root/spark/bin/pyspark --master spark://$SPARK_MASTER_HOST:7077 --driver-memory 2g

Do NOT split the above command into multiple lines.
Some options in PYSPARK_DRIVER_PYTHON_OPTS:

  • --ip=* (default: localhost)
  • --notebook-dir=/home/hpc/Dropbox/scalePy (default: current folder)

We can also modify the ‘pyspark’ executable directly and add the PYSPARK_DRIVER_PYTHON and PYSPARK_DRIVER_PYTHON_OPTS variables there.
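
For example (a sketch; the options and port are only illustrative), add near the top of <spark_dir>/bin/pyspark:

export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root --no-browser --port 8888"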

if (spark v1.6 and py2) (deprecated) #

#!/bin/sh
SPARK_HOME="" HADOOP_HOME="" YARN_HOME="" SPARK_JAR="" HADOOP_COMMON_LIB_NATIVE_DIR="" HADOOP_HDFS_HOME="" HADOOP_COMMON_HOME="" HADOOP_OPTS="" YARN_CONF_DIR="" HADOOP_MAPRED_HOME="" IPYTHON=1 PYSPARK_DRIVER_PYTHON_OPTS="notebook --allow-root --no-browser --port 8888 --ip=* --notebook-dir=/home/hpc/notebooks" /root/spark/bin/pyspark --master spark://sparkmaster.dmml.nu:7077 --driver-memory 2g

The only difference is PYSPARK_DRIVER_PYTHON=jupyter (v2.x) versus IPYTHON=1 (v1.6).

results #

We should see one running application in the Web UI at localhost:8080.

try #

The code is the same as in single-node standalone mode, but make sure the file is accessible at the same path on all nodes. For simple shared storage via NFS, see nfs. For more complex setups, consider Hadoop (HDFS), Ceph, etc.

The topics below are related to high availability (HA) or DevOps.

HA - SPARK STANDALONE + LOCAL-RECOVERY #

Single-Node Recovery with Local File System.
see official doc.
//TODO

HA - SPARK STANDALONE + ZooKeeper #

Standby Masters with ZooKeeper.
see blog.
see also official doc.
//TODO

HA - SPARK ON YARN #

Tip: Spark (version >= 2.0) is built with Scala 2.11 by default. //TODO

HA - SPARK ON MESOS #

//TODO

UI & MONITORING #

Getting started with understanding the UI at http://<ip>:8080/:
[GitBook] Web UI
Getting Started with the Spark Web UI | MapR 2015
Yandex Video
ProTech detailed explanation
Spark UI meaning - common parts, 2017
cn: litaotao.github.io
Understanding Spark by Visualization, databricks 2015

Advanced:
Visualizations for spark streaming, databricks 2015
[Profiling, esp SparkSQL] Apache Spark Performance Troubleshooting at Scale, Challenges, Tools, and Methods with Luca Canali

To use the UI after the tasks have finished, we need to run a “history server”, which requires the Spark applications’ event logs [doc, stackoverflow].
To log events:

mkdir -p /tmp/spark-events
<run_spark_cmd> --conf spark.eventLog.enabled=true ...

Or, modify <spark_dir>/conf/spark-defaults.conf to enable eventLog history by default.
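
A minimal sketch of those defaults (assuming the /tmp/spark-events directory created above):

echo "spark.eventLog.enabled true"                            >> $SPARK_HOME/conf/spark-defaults.conf
echo "spark.eventLog.dir file:///tmp/spark-events"            >> $SPARK_HOME/conf/spark-defaults.conf
echo "spark.history.fs.logDirectory file:///tmp/spark-events" >> $SPARK_HOME/conf/spark-defaults.conf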

Then, after the applications have run [ref]:

<spark_dir>/sbin/start-history-server.sh

PERFORMANCE & BENCHMARK #

resource allocation #

perf tuning #

How to Tune Your Apache Spark Jobs - by Cloudera 2015: (Part 1), (Part 2).

hi-bench #

(HiBench)
pro: multiple types of tasks
con: Java; WARN: the config file does not accept in-line comments with #, comments must be placed on separate lines.
ref: [Intel types FC16 WISS10 thesis]

spark-bench #

pro: can use many different spark-submit param configs to compare performance under different situations (like my master thesis), but this can also be done with a simple script.
con: only contains a simple spark-pi as a starting point, and it is in Java.

NATURE (DIVING INTO) #

Stages:
spark stages [mapr]

Tasks:
spark tasks

pre-terms #

Task: a unit of execution that runs on a single machine.[mapr]

Stage: a group of tasks, based on partitions of the input data, which will perform the same computation in parallel.

Job: has one or more stages; a job is triggered by an action.

Pipelining: collapsing of RDDs into a single stage, when RDD transformations can be computed without data movement.

DAG: Logical graph of RDD operations.

RDD: Parallel dataset with partitions.

Shuffle: if one algorithm is not fully parallel then we must, in general, move data. For example, to sort a distributed set of numbers, it’s not enough to locally sort partitions of the data. We need to introduce a global ordering and that requires comparisons of data records from all over the cluster, necessitating a shuffle [proTech].

terms #

A Spark application consists of a driver process and a set of executor processes [databricks].
A driver is a program which contains the main method [stackoverflow].
A Spark application runs as independent processes, coordinated by the SparkContext object in the driver program [mapr].
An application can run multiple jobs, in sequence or in parallel, or act as a long-lived server [cloudera].

One job is usually considered to be triggered by one action (“action” API call), but some jobs are implicitly triggered — e.g., data schema inference requires Spark to physically inspect some data, hence it requires a job of its own.

A task is a set of operations that takes the input data all the way to an output or a shuffle; there is one task per data partition.

Jobs are decomposed into stages by separating where a shuffle is required [proTech].

PROBLEMS, FAQ, TIPS #

Problem: “Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED”, even with export PYTHONHASHSEED=123.
Reason: bug.
Solution: use py2, or echo 'PYTHONHASHSEED=2017' >> /etc/environment && reboot (for all nodes).

Problem: Spark uses all cores.
Reason: this is the default behavior.
Solution: set the max cores per node (a sketch follows below).
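
A sketch (the value 4 is only illustrative); capping a single application with --total-executor-cores is an alternative:

# per node: limit what each worker offers (conf/spark-env.sh, see the standalone tip above)
echo "export SPARK_WORKER_CORES=4" >> $SPARK_HOME/conf/spark-env.sh
# per application: cap the total cores one application may take across the cluster
pyspark --master spark://$SPARK_MASTER_HOST:7077 --total-executor-cores 4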

Problem: it is messy to start each node individually.
Reason: no automation across nodes.
Solution: Ansible (recommended) or Puppet (not recommended).

Problem: Exception: Python in worker has different version X.x than that in driver X.x, PySpark cannot run with different minor versions
Reason: as it says, check all nodes’ versions with python -V or python3 -V. If they do have exactly the same version installed, the cause is that the driver uses 3.x while the workers use 2.x (or vice versa). Spark checks the major version number and the first minor version number to match the driver with its workers. When using spark-on-k8s, the mismatch can only happen in client mode (see also k8s).
Solution:

ansibleall 'cp ./spark/conf/spark-env.sh.template ./spark/conf/spark-env.sh'
ansibleall 'echo "export PYSPARK_PYTHON=/usr/bin/python3" >> ./spark/conf/spark-env.sh'
ansibleall 'echo "export PYSPARK_DRIVER_PYTHON=python3" >> ./spark/conf/spark-env.sh'
ansibleslaves './spark/sbin/stop-slave.sh'
ansiblemaster './spark/sbin/stop-master.sh'
ansiblemaster './spark/sbin/start-master.sh'
ansibleslaves './spark/sbin/start-slave.sh spark://$SPARK_MASTER_HOST:7077'

This requires bash aliases/functions such as:

ansibleslaves () { ansible -m shell -a "$@" slaves; }

where slaves is a group in /etc/ansible/hosts.
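
The other helpers used above follow the same pattern (a sketch; the groups all and master are assumed to be defined in /etc/ansible/hosts as well):

ansibleall ()    { ansible -m shell -a "$@" all; }
ansiblemaster () { ansible -m shell -a "$@" master; }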

Problem: “python3: can’t open file ’notebook’: [Errno 2] No such file or directory”
Reason: trying to run Jupyter + Spark with PYSPARK_PYTHON=/usr/bin/python3.
Solution:
Do NOT set PYSPARK_PYTHON=/usr/bin/python3 on the machine running Jupyter. Spark can be started with the option, but remove it before starting Jupyter.

Problem: TypeError: ‘JavaPackage’ object is not callable.
Reason: …
Solution: use spark-submit (see the submit section above).

Problem: when running Jupyter + Spark, got the error “python3: can’t open file ’notebook’: [Errno 2] No such file or directory”.
Reason: ~/spark/conf/spark-env.sh contains export PYSPARK_DRIVER_PYTHON=python3, which overwrites the PYSPARK_DRIVER_PYTHON=jupyter option given on the command line.
Solution: edit spark-env.sh and comment that line out.
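
For example (a sketch, assuming the export line appears exactly as shown above):

# comment out the conflicting export; '&' in the replacement refers to the matched line
sed -i 's|^export PYSPARK_DRIVER_PYTHON=python3|# &|' ~/spark/conf/spark-env.sh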

REFs: