Spark

From campisano.org

Requirements

  • Java >= 7 (OpenJDK is fine)
  • Spark binary package, version 1.5.1, built for Hadoop 2.6 and later
  • Hadoop 2.7.1 already installed in '/srv/hadoop'
  • SSH server (e.g. openssh-server)
  • Scala? (which version?)

Spark cluster installation in standalone mode

Define the hostname on every host where it is not already set correctly (Master and Workers)

  • edit /etc/hostname
  • run the following command to apply the new hostname
hostname `cat /etc/hostname`

Define Spark node IPs and names on all hosts (Master and Workers)

  • You may run into several communication problems. A typical one is caused by an /etc/hosts file like this:
127.0.0.1 localhost
127.0.1.1 myhostname

10.1.1.2 myhostname


If you use myhostname with a configuration like the one above, the name can resolve to 127.0.1.1 and the master server appears to listen only on that loopback address!

So we will add Spark-specific hostnames.

  • Additional warning: do not use the '_' character in hostnames!


  • Add something like the following example at the end of /etc/hosts (if not already configured), where MASTER_HOSTNAME and NODE_HOSTNAME_* are the real names used in /etc/hostname:
# Spark needs the following specific config
10.1.1.120      MASTER_HOSTNAME     sparkmaster
10.1.1.121      NODE_HOSTNAME_1     sparknode1
10.1.1.122      NODE_HOSTNAME_2     sparknode2
...
10.1.1.12N      NODE_HOSTNAME_N     sparknodeN
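
The two pitfalls described above (a hostname that resolves to a loopback address, and underscores in hostnames) can be checked mechanically. The following is only a minimal sketch and not part of the original procedure: the function name check_hosts, the LOOPBACK_NAMES set and the sample text are hypothetical, and the code only inspects text written in /etc/hosts format.

```python
import ipaddress

# Names that are expected to point at a loopback address (assumption).
LOOPBACK_NAMES = {"localhost", "ip6-localhost", "ip6-loopback"}


def check_hosts(text):
    """Return warnings for text written in /etc/hosts format."""
    warnings = []
    for raw_line in text.splitlines():
        # Drop comments and surrounding whitespace.
        line = raw_line.split("#", 1)[0].strip()
        if not line:
            continue
        fields = line.split()
        address, names = fields[0], fields[1:]
        try:
            loopback = ipaddress.ip_address(address).is_loopback
        except ValueError:
            # The first field is not an IP address: skip the line.
            continue
        for name in names:
            if "_" in name:
                warnings.append("'%s' contains an underscore" % name)
            if loopback and name not in LOOPBACK_NAMES:
                warnings.append(
                    "'%s' resolves to loopback address %s" % (name, address))
    return warnings


# Hypothetical sample reproducing the problem shown above.
SAMPLE = """\
127.0.0.1 localhost
127.0.1.1 myhostname
10.1.1.2 myhostname
"""

for warning in check_hosts(SAMPLE):
    print(warning)
```

On a real host the same function could be fed the content of /etc/hosts, for example with check_hosts(open('/etc/hosts').read()).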

Commands to run on Master node

Creating Spark user

sudo su -
groupadd --gid 11002 spark
mkdir -p /srv/spark/ -m 755
useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
chown spark:spark /srv/spark
exit

Creating common ssh authentication for spark user

sudo su - spark
mkdir .ssh
ssh-keygen -t rsa -P "" -f .ssh/id_rsa
cat .ssh/id_rsa.pub >> .ssh/authorized_keys
tar -czf ssh.tgz .ssh
exit
  • copy the file to all nodes
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_1:/tmp                                # copy to node 1
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_2:/tmp                                # copy to node 2
...                                                                             # ...
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_N:/tmp                                # copy to node N
sudo rm /srv/spark/home/ssh.tgz                                                 # remove the file
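
When the cluster has many nodes, the repeated scp lines above can be generated instead of typed. This is a small sketch under assumptions: the helper name scp_commands and the NODES list are hypothetical, and the code only builds the command lines without executing anything.

```python
def scp_commands(source, nodes, destination=""):
    """Build one scp argument list per node (nothing is executed here)."""
    return [["scp", source, "%s:%s" % (node, destination)] for node in nodes]


# Hypothetical node names; use the real names defined in /etc/hosts.
NODES = ["NODE_HOSTNAME_1", "NODE_HOSTNAME_2"]

for command in scp_commands("/srv/spark/home/ssh.tgz", NODES, "/tmp"):
    print(" ".join(command))
```

Each returned list could be passed to subprocess.check_call to actually run the copy; an empty destination copies into the remote home directory.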

Commands to run on every Slave host

Creating spark user

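sudo groupadd --gid 11002 spark
sudo mkdir -p /srv/spark -m 755
sudo useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
# the spark user must own /srv/spark to unpack the package there later
sudo chown spark:spark /srv/spark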

Setting ssh and environment configs

sudo chown -R spark:spark /tmp/ssh.tgz
sudo mv /tmp/ssh.tgz /srv/spark/home
sudo su - spark
tar -xzf ssh.tgz
rm -f ssh.tgz
exit

Commands to run on the Master host

Installing Spark binary package

  • You need to know the Java install path; the script check_java.sh can help you find it.
sudo su - spark
cd /srv/spark
wget -c http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
# the quoted 'EOF' keeps $SPARK_HOME and $PATH from being expanded while the file is written
cat >> ~/.profile << 'EOF'

# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre  # or /usr/lib/jvm/java-7-oracle/jre

# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit

Testing local mode

sudo su - spark
cd /srv/spark/spark-current
./bin/spark-shell
# run the following
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
# run the Pi example
./bin/run-example SparkPi
exit

Copy the binary package to all Worker nodes

sudo su - spark
cd /srv/spark
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_1:                              # copy to node 1
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_2:                              # copy to node 2
...                                                                             # ...
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_N:                              # copy to node N
exit

Configuring spark

sudo su - spark
cd /srv/spark/spark-current
cp -a ./conf/spark-env.sh.template ./conf/spark-env.sh
# configuring master
echo "export SPARK_MASTER_IP=sparkmaster" >> ./conf/spark-env.sh
# configure several remote worker machines
echo sparknode1 > ./conf/slaves
echo sparknode2 >> ./conf/slaves
...
echo sparknodeN >> ./conf/slaves
exit
  • Create the Log4j configuration file '/srv/spark/spark-current/conf/log4j.properties' with the following content:
# Log to the console (WARN and above) and to a rolling file (INFO and above)
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# from hadoop configs
# use 'spark.log.dir=.' for non-cluster installations
spark.log.dir=/srv/spark/spark-current/logs
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
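
The file appender above is built from ${...} references to the spark.log.* properties. To see which path the RFA appender ends up writing to, the substitution can be reproduced with a minimal sketch. The function names parse_properties and resolve are hypothetical, and the sketch is a simplification: it only looks up names defined in the same file, whereas Log4j also consults Java system properties.

```python
import re

_REFERENCE = re.compile(r"\$\{([^}]+)\}")


def parse_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comments."""
    properties = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        properties[key.strip()] = value.strip()
    return properties


def resolve(properties, key):
    """Expand ${name} references in properties[key] using the same dict."""
    def substitute(match):
        name = match.group(1)
        # Unknown names are left untouched (no system property lookup here).
        if name in properties:
            return resolve(properties, name)
        return match.group(0)
    return _REFERENCE.sub(substitute, properties[key])


# Excerpt of the configuration shown above.
SAMPLE = """\
spark.log.dir=/srv/spark/spark-current/logs
spark.log.file=spark.log
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
"""

print(resolve(parse_properties(SAMPLE), "log4j.appender.RFA.File"))
```

The sketch does not guard against properties that reference each other in a cycle.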

Share configs with all Slave hosts

sudo su - spark
scp -r ../spark-current/conf NODE_HOSTNAME_1:conf_spark                         # copy to node 1
scp -r ../spark-current/conf NODE_HOSTNAME_2:conf_spark                         # copy to node 2
...                                                                             # ...
scp -r ../spark-current/conf NODE_HOSTNAME_N:conf_spark                         # copy to node N
exit

Commands to run on every Worker host

Install Spark and apply configs

  • You need to know the Java install path; the script check_java.sh can help you find it.
sudo su - spark
cd /srv/spark
mv /srv/spark/home/spark-1.5.1-bin-hadoop2.6.tgz .
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
rm -rf spark-current/conf
mv home/conf_spark spark-current/conf
# the quoted 'EOF' keeps $SPARK_HOME and $PATH from being expanded while the file is written
cat >> ~/.profile << 'EOF'

# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre  # or /usr/lib/jvm/java-7-oracle/jre

# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit

Configuring automatic spark daemons startup at boot

On Master host

  • Create the script /etc/init.d/spark_master.sh on the Master host
#!/bin/bash
#
# spark_master.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh



### BEGIN INIT INFO
# Provides:          spark_master.sh
# Required-Start:    $remote_fs $syslog $time
# Required-Stop:     $remote_fs $syslog $time
# Should-Start:      $network
# Should-Stop:       $network
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Spark Master Node
# Description:       Spark Master Node
### END INIT INFO



PATH=/sbin:/bin:/usr/sbin:/usr/bin;

OWNER[1]="spark";
MSG[1]="Spark Master daemon";
START_CMD[1]="\$SPARK_HOME/sbin/start-master.sh";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-master.sh";



function start()
{
    N=1;
    while test -n "${OWNER[$N]}"; do
        echo -ne "Starting ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
        echo "Done.";
        N=$(($N+1));
    done;
}

function stop()
{
    N=${#OWNER[@]};
    while test -n "${OWNER[$N]}"; do
        echo -ne "Stopping ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
        echo "Done.";
        N=$(($N-1));
    done;
}

function usage()
{
    echo "Usage: $0 {start|stop}"
    exit 0
}



case "$1" in
    start)
        start
        ;;

    stop)
        stop
        ;;

    *)
        usage
esac;



# End

  • Configure it as a startup script (Debian example)
sudo chmod 755 /etc/init.d/spark_master.sh
sudo update-rc.d spark_master.sh defaults
sudo service spark_master.sh start

On all worker hosts

  • Create the script /etc/init.d/spark_worker.sh on all worker hosts
#!/bin/bash
#
# spark_worker.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh



### BEGIN INIT INFO
# Provides:          spark_worker.sh
# Required-Start:    $remote_fs $syslog $time
# Required-Stop:     $remote_fs $syslog $time
# Should-Start:      $network
# Should-Stop:       $network
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Spark Worker Node
# Description:       Spark Worker Node
### END INIT INFO



PATH=/sbin:/bin:/usr/sbin:/usr/bin;

OWNER[1]="spark";
MSG[1]="Spark Worker daemon";
START_CMD[1]="\$SPARK_HOME/sbin/start-slave.sh spark://sparkmaster:7077";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-slave.sh";



function start()
{
    N=1;
    while test -n "${OWNER[$N]}"; do
        echo -ne "Starting ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
        echo "Done.";
        N=$(($N+1));
    done;
}

function stop()
{
    N=${#OWNER[@]};
    while test -n "${OWNER[$N]}"; do
        echo -ne "Stopping ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
        echo "Done.";
        N=$(($N-1));
    done;
}

function usage()
{
    echo "Usage: $0 {start|stop}"
    exit 0
}



case "$1" in
    start)
        start
        ;;

    stop)
        stop
        ;;

    *)
        usage
esac;



# End

  • Configure it as a startup script
sudo chmod 755 /etc/init.d/spark_worker.sh
sudo update-rc.d spark_worker.sh defaults
sudo service spark_worker.sh start

Spark web interface

By default the master web interface listens on port 8080 of the Master host. However, if another service already uses port 8080 (e.g. Tomcat or Hadoop), it will start on the next port, for example 8081.

  • If you have any problem accessing this port (e.g. it is blocked by a firewall), you can tunnel it to your local computer with the following command:
ssh -C -N -L 8080:localhost:8080 -p MASTER_SSH_PORT root@MASTER_HOST
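
Because the web interface may end up on 8080 or on a following port, it can help to probe a few candidate ports. This is a minimal sketch under assumptions: the function name first_listening_port is hypothetical, and a port that accepts a TCP connection is not proof that the Spark interface (rather than another service) answers on it.

```python
import socket


def first_listening_port(host, ports, timeout=1.0):
    """Return the first port in 'ports' that accepts a TCP connection."""
    for port in ports:
        try:
            connection = socket.create_connection((host, port), timeout)
        except OSError:
            # Refused, timed out or unresolvable: try the next candidate.
            continue
        connection.close()
        return port
    return None


# Example call (hostname taken from the /etc/hosts setup of this page):
# first_listening_port("sparkmaster", range(8080, 8084))
```

The function returns None when no candidate port accepts a connection, which also covers the firewall case that the SSH tunnel works around.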

Run

First ready-to-run example

sudo su - spark
MASTER=spark://sparkmaster:7077 ../spark-current/bin/spark-shell
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
exit

Create a client user to have access to the cluster

Creating a user

sudo groupadd --gid 10102 <USER_NAME>
sudo useradd --uid 10102 --gid <USER_NAME> --no-user-group --shell /bin/bash --create-home --home-dir /home/<USER_NAME> <USER_NAME>
sudo passwd <USER_NAME>
sudo passwd -e <USER_NAME>

sudo su - <USER_NAME>
passwd
mkdir -p ~/spark/log ~/spark/conf ~/spark/example
exit

Configure Spark environment

  • Add the following at the end of the ~/.profile file

- You need to know the Java install path; the script check_java.sh can help you find it.

# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre  # or /usr/lib/jvm/java-7-oracle/jre

# Hadoop vars
export HADOOP_PREFIX=/srv/hadoop
export HADOOP_HOME=$HADOOP_PREFIX
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
export HADOOP_CONF_DIR=${HADOOP_PREFIX}"/etc/hadoop"
export HADOOP_YARN_HOME=$HADOOP_PREFIX
export PATH=$PATH:$HADOOP_PREFIX/bin
export PATH=$PATH:$HADOOP_PREFIX/sbin
mkdir -p $HOME/hadoop/log
export HADOOP_LOG_DIR=$HOME/hadoop/log
export YARN_LOG_DIR=$HOME/hadoop/log

# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
mkdir -p $HOME/spark/log
mkdir -p $HOME/spark/conf
export SPARK_CONF_DIR=$HOME/spark/conf
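
After logging in again, the variables defined above should be set and should point to existing directories. A quick way to verify this is sketched below; the helper name missing_settings and its default list of names are assumptions chosen for this page, not something Spark provides.

```python
import os


def missing_settings(environ,
                     names=("JAVA_HOME", "SPARK_HOME", "SPARK_CONF_DIR")):
    """Return the names that are unset or do not point to a directory."""
    problems = []
    for name in names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            problems.append(name)
    return problems


# An empty list means every checked variable points to a directory.
print(missing_settings(os.environ))
```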

Configure the logger so Spark will be less verbose

  • Create the following file at ~/spark/conf/log4j.properties
# Log to the console (WARN and above) and to a rolling file (INFO and above)
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# from hadoop configs
spark.log.dir=${user.home}/spark/log
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

Create your first application

First Python application

Create the Python Program

  • Create a source code file 'wordcount.py' with the following content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from pyspark import SparkContext, SparkConf


class WordCount:

    def __init__(self, _path):

        sc = SparkContext(conf=SparkConf().setAppName(self.__class__.__name__))

        # Get all lines
        lines = sc.textFile(_path)

        # Filter empty ones
        lines_nonempty = lines.filter(self.nonEmptyLines)

        # Get all words
        words = lines_nonempty.flatMap(self.linesToWords)

        # Map each word to its occurrence count (=1 for each word)
        pairs = words.map(self.mapWordsToPairs)

        # Group all words by key and sum their occurrences
        counts = pairs.reduceByKey(self.reduceToCount)

        # Sort by number of occurrences
        # Spark doesn't (yet) implement a sortByValue
        swappedPair = counts.map(self.swapPairs)
        swappedPair = swappedPair.sortByKey(False)
        sorted_counts = swappedPair.map(self.swapPairs)

        # Collect the output
        output = sorted_counts.collect()

        # Print the output
        for (word, count) in output:
            print("%s: %i" % (word.encode('ascii', 'ignore'), count))

        sc.stop()

    def nonEmptyLines(self, _line):
        return (_line and len(_line) > 0)

    def linesToWords(self, _line):
        return _line.split(' ')

    def mapWordsToPairs(self, _word):
        return (_word, 1)

    def reduceToCount(self, _i1, _i2):
        return (_i1 + _i2)

    def swapPairs(self, _tuple):
        return tuple(reversed(_tuple))


if __name__ == "__main__":

    if len(sys.argv) != 2:
        sys.stderr.write(
            "Usage: WordCount <file>" + "\n" +
            " where file can be a local 'file:///LOCAL_PATH' path" + "\n" +
            " or a remote 'hdfs://REMOTE_PATH' path")
        sys.exit(1)

    WordCount(_path=sys.argv[1])
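
To check what the Spark job above is expected to print, the same pipeline can be reproduced on a small in-memory list without Spark. This is only an illustrative sketch: the function name word_count is hypothetical, and the order of words with equal counts may differ from what Spark returns.

```python
def word_count(lines):
    """Return (word, count) pairs sorted by descending count."""
    counts = {}
    for line in lines:
        if not line:                      # same test as nonEmptyLines
            continue
        for word in line.split(' '):      # same split as linesToWords
            # Equivalent of mapping to (word, 1) and reducing by key.
            counts[word] = counts.get(word, 0) + 1
    # Sort on the count, largest first, like the swap/sortByKey/swap steps.
    return sorted(counts.items(), key=lambda pair: pair[1], reverse=True)


for word, count in word_count(["a b a", "", "b a c"]):
    print("%s: %i" % (word, count))
```

Like the Spark version, splitting on a single space yields empty "words" when a line contains consecutive spaces.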

Run the package in a local copy of Spark

  • Run locally

- NOTE: local[4] means run locally with 4 threads

spark-submit --master local[4] wordcount.py file:///etc/profile
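
The --master value takes several forms on this page: local[4] here and spark://sparkmaster:7077 for the cluster. The sketch below spells out how these forms can be read; the function name describe_master is hypothetical, and it deliberately covers only the local and standalone forms, not every master URL that Spark accepts.

```python
import re


def describe_master(url):
    """Describe a Spark master URL of the forms used in this page."""
    if url == "local":
        # Plain 'local' runs with a single worker thread.
        return {"mode": "local", "threads": 1}
    match = re.match(r"^local\[(\d+|\*)\]$", url)
    if match:
        threads = match.group(1)
        # '*' asks for as many threads as there are logical cores.
        return {"mode": "local",
                "threads": threads if threads == "*" else int(threads)}
    match = re.match(r"^spark://([^:/]+):(\d+)$", url)
    if match:
        return {"mode": "standalone", "host": match.group(1),
                "port": int(match.group(2))}
    raise ValueError("unrecognised master URL: %r" % url)


print(describe_master("local[4]"))
print(describe_master("spark://sparkmaster:7077"))
```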

Run the package in the Spark cluster

  • Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data


  • Run in the Spark cluster
spark-submit --master spark://sparkmaster:7077 wordcount.py hdfs://hadoopmaster:9000/my_data


  • Cleanup data from HDFS
hadoop fs -rm /my_data

First Java Spark application using Eclipse

Eclipse installation

sudo apt-get install eclipse-jdt

Maven installation

sudo apt-get install maven

Create a new Spark project

mkdir spark_app
cd spark_app
mkdir -p src/main/java
  • Create a new 'pom.xml' Maven file with the following content:
<project>
    <groupId>org.campisano.wordcount</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>

</project>

Create the Java Program

  • Create a source code file 'src/main/java/WordCount.java' with the following content:

- The following code is a classic WordCount refactored from the official Spark JavaWordCount example

import java.util.Arrays;
import java.util.List;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

// from
// https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java

public class WordCount {

    public WordCount(String _file_path) {

        SparkConf conf = new SparkConf().setAppName(this.getClass().getName());
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Get all lines
        JavaRDD<String> lines = sc.textFile(_file_path).cache();

        // Filter empty ones
        JavaRDD<String> lines_nonempty = lines.filter(new NonEmptyLines());

        // Get all words
        JavaRDD<String> words = lines_nonempty.flatMap(new LinesToWords());

        // Map each word to its occurrence count (=1 for each word)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new MapWordsToPairs());

        // Group all words by key and sum their occurrences
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            new ReduceToCount());

        // Sort by number of occurrences
        // Spark doesn't (yet) implement a sortByValue
        JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(
            new SwapPairs());
        swappedPair = swappedPair.sortByKey(false);
        JavaPairRDD<String, Integer> sorted_counts = swappedPair.mapToPair(
            new SwapPairs());

        // Collect the output (the sorted pairs, not the unsorted counts)
        List<Tuple2<String, Integer>> output = sorted_counts.collect();

        // Print the output
        for(Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        sc.stop();
    }

    public static void main(String[] args) {
        if(args.length != 1) {
              System.err.println("Usage: WordCount <file>" + "\n" +
                " where file can be a local 'file:///LOCAL_PATH' path" + "\n" +
                " or a remote 'hdfs://REMOTE_PATH' path");
              System.exit(1);
        }

        new WordCount(args[0]);
    }
}

class NonEmptyLines implements Function<String, Boolean> {
    @Override
    public Boolean call(String _line) throws Exception {
        if(_line == null || _line.trim().length() < 1) {
            return false;
        }

        return true;
    }
}

class LinesToWords implements FlatMapFunction<String, String> {
    @Override
    public java.lang.Iterable<String> call(String _line) throws Exception {
        return Arrays.asList(_line.split(" "));
    }
}

class MapWordsToPairs implements PairFunction<
    String, String, Integer> {
    @Override
    public scala.Tuple2<String, Integer> call(String _word) {
        return new Tuple2<String, Integer>(_word, 1);
    }
}

class ReduceToCount implements Function2<
    Integer, Integer, Integer> {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}

class SwapPairs<KEY, VAL> implements PairFunction <
    Tuple2<KEY, VAL>, VAL, KEY> {
    @Override
    public Tuple2<VAL, KEY> call(Tuple2<KEY, VAL> item)
        throws Exception {
        return item.swap();
    }
}

Use Eclipse

  • Open Eclipse, click on 'File->Import->Existing Maven Projects'
  • Specify the full path for the Maven project created above

Create an executable JAR package

mvn package

Run the package in a local copy of Spark

  • Run locally

- NOTE: local[4] means run locally with 4 threads

spark-submit --class "WordCount" --master local[4] target/wordcount-1.0.jar file:///etc/profile

Run the package in the Spark cluster

  • Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data


  • Run in the Spark cluster
spark-submit --class "WordCount" --master spark://sparkmaster:7077 target/wordcount-1.0.jar hdfs://hadoopmaster:9000/my_data


  • Cleanup data from HDFS
hadoop fs -rm /my_data
