Spark
Prerequisites
- Java >= 7 (OpenJDK is fine)
- Spark binary package == 1.5.1, built for Hadoop 2.6 and later
- Hadoop 2.7.1 already installed in '/srv/hadoop'
- an SSH server (e.g. openssh-server)
- Scala? (which version?)
Spark cluster installation in standalone mode
Define the hostname on all hosts (Master and Workers) if it is not already set correctly
- edit /etc/hostname
- run the following command to apply the new hostname
hostname `cat /etc/hostname`
Define the Spark node IPs and names on all hosts (Master and Workers)
- You may encounter several communication problems. A typical cause is an /etc/hosts file like
127.0.0.1 localhost
127.0.1.1 myhostname
10.1.1.2 myhostname
if you use myhostname in the configuration above, the master appears to listen only on 127.0.1.1!
so we will add Spark-specific hostnames
- additional warning: do not use the '_' character in hostnames!
- Add something like the following example at the end of the /etc/hosts file (if not already configured), where MASTER_HOSTNAME and NODE_HOSTNAME_* are the real names used in /etc/hostname (a quick verification sketch follows the example):
# Spark needs the following specific config
10.1.1.120 MASTER_HOSTNAME sparkmaster
10.1.1.121 NODE_HOSTNAME_1 sparknode1
10.1.1.122 NODE_HOSTNAME_2 sparknode2
...
10.1.1.12N NODE_HOSTNAME_N sparknodeN
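- Before continuing, it is worth checking that the names resolve to the intended addresses on every host. A minimal sketch (sparkmaster and sparknode1 are the aliases from the example above):
getent hosts sparkmaster     # should print 10.1.1.120, not 127.0.1.1
getent hosts sparknode1      # repeat for each worker alias
# once the master is running you can also confirm which address it listens on
# (use netstat -ltn instead of ss on older systems)
ss -ltn | grep 7077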
Commands to run on the Master node
Creating the Spark user
sudo su -
groupadd --gid 11002 spark
mkdir -p /srv/spark/ -m 755
useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
chown spark:spark /srv/spark
exit
Creating common SSH authentication for the spark user
sudo su - spark
mkdir .ssh
ssh-keygen -t rsa -P "" -f .ssh/id_rsa
cat .ssh/id_rsa.pub >> .ssh/authorized_keys
tar -czf ssh.tgz .ssh
exit
- copy the file to all nodes (a loop sketch follows the commands below)
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_1:/tmp    # copy to node 1
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_2:/tmp    # copy to node 2
...                                                 # ...
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_N:/tmp    # copy to node N
sudo rm /srv/spark/home/ssh.tgz                     # remove the file
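- With many nodes a small loop can replace the per-node commands above; a sketch, assuming the workers are reachable as NODE_HOSTNAME_1 .. NODE_HOSTNAME_N (replace the name list with your real hostnames):
for NODE in NODE_HOSTNAME_1 NODE_HOSTNAME_2 NODE_HOSTNAME_N; do
    scp /srv/spark/home/ssh.tgz "$NODE":/tmp
done
sudo rm /srv/spark/home/ssh.tgz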
Commands executed on every Worker (slave) host
Creating spark user
sudo groupadd --gid 11002 spark
sudo mkdir -p /srv/spark -m 755
sudo useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
Setting ssh and environment configs
sudo chown -R spark:spark /tmp/ssh.tgz
sudo mv /tmp/ssh.tgz /srv/spark/home
sudo su - spark
tar -xzf ssh.tgz
rm -f ssh.tgz
exit
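- Optionally, from the Master host, verify that the spark user can now reach this node without a password; a minimal check (sparknode1 is the alias defined in /etc/hosts):
sudo su - spark
ssh -o BatchMode=yes sparknode1 hostname    # should print the worker hostname without asking for a password
exit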
Commands executed on the Master host
Installing Spark binary package
- You need to know the Java install path; the check_java.sh script can help to find it (see also the sketch after the block below).
sudo su - spark
cd /srv/spark
wget -c http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
cat >> ~/.profile << 'EOF'
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre # or /usr/lib/jvm/java-7-oracle/jre
# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit
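- If the check_java.sh script is not at hand, one way to discover the JRE path is to resolve the java binary; a sketch (the exact path differs between OpenJDK and Oracle installs):
readlink -f "$(which java)"                          # e.g. /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java
readlink -f "$(which java)" | sed 's|/bin/java$||'   # strip /bin/java to get a JAVA_HOME candidate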
Testing local mode
sudo su - spark
cd /srv/spark/spark-current
./bin/spark-shell
# run the following in the shell
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
# run the Pi example
./bin/run-example SparkPi
exit
Copy the binary package to all Worker nodes
sudo su - spark
cd /srv/spark
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_1:    # copy to node 1
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_2:    # copy to node 2
...                                                   # ...
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_N:    # copy to node N
exit
Configuring Spark
sudo su - spark
cd /srv/spark/spark-current
cp -a ./conf/spark-env.sh.template ./conf/spark-env.sh
# configure the master
echo "export SPARK_MASTER_IP=sparkmaster" >> ./conf/spark-env.sh
# configure the remote worker machines
echo sparknode1 > ./conf/slaves
echo sparknode2 >> ./conf/slaves
...
echo sparknodeN >> ./conf/slaves
exit
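- With many workers the conf/slaves file can also be generated in a loop; a sketch, assuming the aliases follow the sparknode1..sparknodeN pattern used above (here with 3 workers):
sudo su - spark
cd /srv/spark/spark-current
: > ./conf/slaves                 # truncate the file
for N in 1 2 3; do                # adjust the range to the number of workers
    echo "sparknode$N" >> ./conf/slaves
done
exit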
- Create the Log4j configuration file '/srv/spark/spark-current/conf/log4j.properties' with the following content:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# from hadoop configs
# use 'spark.log.dir=.' for non-cluster installations
spark.log.dir=/srv/spark/spark-current/logs
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
- Copy the configuration directory to all Worker nodes
sudo su - spark
scp -r ../spark-current/conf NODE_HOSTNAME_1:conf_spark    # copy to node 1
scp -r ../spark-current/conf NODE_HOSTNAME_2:conf_spark    # copy to node 2
...                                                        # ...
scp -r ../spark-current/conf NODE_HOSTNAME_N:conf_spark    # copy to node N
exit
Commands executed on every Worker host
Install Spark and apply configs
- You need to know the Java install path; the check_java.sh script can help to find it (see also the sketch in the Master section above).
sudo su - spark
cd /srv/spark
mv /srv/spark/home/spark-1.5.1-bin-hadoop2.6.tgz .
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
rm -rf spark-current/conf
mv home/conf_spark spark-current/conf
cat >> ~/.profile << 'EOF'
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre # or /usr/lib/jvm/java-7-oracle/jre
# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit
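- A quick sanity check that the environment from ~/.profile is picked up (spark-submit --version only prints the Spark banner and exits):
sudo su - spark
echo "$SPARK_HOME"        # should print /srv/spark/spark-current
spark-submit --version    # should report version 1.5.1
exit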
Configuring automatic startup of the Spark daemons at boot
On Master host
- Create the script /etc/init.d/spark_master.sh on the Master host
#!/bin/bash
#
# spark_master.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh
### BEGIN INIT INFO
# Provides: spark_master.sh
# Required-Start: $remote_fs $syslog $time
# Required-Stop: $remote_fs $syslog $time
# Should-Start: $network
# Should-Stop: $network
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Spark Master Node
# Description: Spark Master Node
### END INIT INFO
PATH=/sbin:/bin:/usr/sbin:/usr/bin;
OWNER[1]="spark";
MSG[1]="Spark (Master) namenode daemon ...";
START_CMD[1]="\$SPARK_HOME/sbin/start-master.sh";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-master.sh";
function start()
{
N=1;
while test -n "${OWNER[$N]}"; do
echo -ne "Starting ${MSG[$N]} ...";
su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
echo "Done.";
N=$(($N+1));
done;
}
function stop()
{
N=${#OWNER[@]};
while test -n "${OWNER[$N]}"; do
echo -ne "Stopping ${MSG[$N]} ...";
su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
echo "Done.";
N=$(($N-1));
done;
}
function usage()
{
echo "Usage: $0 {start|stop}"
exit 0
}
case "$1" in
start)
start
;;
stop)
stop
;;
*)
usage
esac;
# End
- Configure it as a startup script (Debian example):
sudo chmod 755 /etc/init.d/spark_master.sh
sudo update-rc.d spark_master.sh defaults
sudo service spark_master.sh start
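- To confirm the master daemon is actually up, look for its JVM and its default ports (7077 for the cluster endpoint, 8080 for the web UI); use netstat -ltn instead of ss on older systems:
ps -u spark -o pid,cmd | grep '[M]aster'    # org.apache.spark.deploy.master.Master should appear
ss -ltn | grep -E '7077|8080'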
On all worker hosts
- Create the script /etc/init.d/spark_worker.sh on all Worker hosts
#!/bin/bash
#
# spark_worker.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh
### BEGIN INIT INFO
# Provides: spark_worker.sh
# Required-Start: $remote_fs $syslog $time
# Required-Stop: $remote_fs $syslog $time
# Should-Start: $network
# Should-Stop: $network
# Default-Start: 2 3 4 5
# Default-Stop: 0 1 6
# Short-Description: Spark Worker Node
# Description: Spark Worker Node
### END INIT INFO
PATH=/sbin:/bin:/usr/sbin:/usr/bin;
OWNER[1]="spark";
MSG[1]="Spark (Worker) datanode daemon ...";
START_CMD[1]="\$SPARK_HOME/sbin/start-slave.sh spark://sparkmaster:7077";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-slave.sh";
function start()
{
N=1;
while test -n "${OWNER[$N]}"; do
echo -ne "Starting ${MSG[$N]} ...";
su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
echo "Done.";
N=$(($N+1));
done;
}
function stop()
{
N=${#OWNER[@]};
while test -n "${OWNER[$N]}"; do
echo -ne "Stopping ${MSG[$N]} ...";
su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
echo "Done.";
N=$(($N-1));
done;
}
function usage()
{
echo "Usage: $0 {start|stop}"
exit 0
}
case "$1" in
start)
start
;;
stop)
stop
;;
*)
usage
esac;
# End
- Configure it as a startup script (Debian example):
sudo chmod 755 /etc/init.d/spark_worker.sh
sudo update-rc.d spark_worker.sh defaults
sudo service spark_worker.sh start
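- To confirm the worker registered with the master, look for its JVM on this host and check that the node is listed in the master web interface (see the next section):
ps -u spark -o pid,cmd | grep '[W]orker'    # org.apache.spark.deploy.worker.Worker should appear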
Spark web interface
- The web interface usually starts on the Master node, on port 8080: http://MASTER_HOST:8080
However, if another service already uses port 8080 (e.g. Tomcat or Hadoop), it will start on the next free port, for example 8081
- If you have trouble reaching this port (e.g. it is blocked by a firewall), you can tunnel it to your local computer with the following command:
ssh -C -N -L 8080:localhost:8080 -p MASTER_SSH_PORT root@MASTER_HOST
- now you can access the Spark interface by opening the URL http://localhost:8080
Run
First ready-made example
sudo su - spark
MASTER=spark://sparkmaster:7077 ../spark-current/bin/spark-shell
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
exit
Create a client user with access to the cluster
Creating a user
sudo groupadd --gid 10102 <USER_NAME>
sudo useradd --uid 10102 --gid <USER_NAME> --no-user-group --shell /bin/bash --create-home --home-dir /home/<USER_NAME> <USER_NAME>
sudo passwd <USER_NAME>
sudo passwd -e <USER_NAME>
sudo su - <USER_NAME>
passwd
mkdir -p ~/spark/log ~/spark/conf ~/spark/example
exit
Configure Spark environment
- Add the following at the end of the ~/.profile file
- You need to know the Java install path; the check_java.sh script can help to find it (see also the sketch in the Master section above).
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre # or /usr/lib/jvm/java-7-oracle/jre
# Hadoop vars
export HADOOP_PREFIX=/srv/hadoop
export HADOOP_HOME=$HADOOP_PREFIX
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
export HADOOP_CONF_DIR=${HADOOP_PREFIX}"/etc/hadoop"
export HADOOP_YARN_HOME=$HADOOP_PREFIX
export PATH=$PATH:$HADOOP_PREFIX/bin
export PATH=$PATH:$HADOOP_PREFIX/sbin
mkdir -p $HOME/hadoop/log
export HADOOP_LOG_DIR=$HOME/hadoop/log
export YARN_LOG_DIR=$HOME/hadoop/log
# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
mkdir -p $HOME/spark/log
mkdir -p $HOME/spark/conf
export SPARK_CONF_DIR=$HOME/spark/conf
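- After logging in again (so ~/.profile is re-read), a minimal check that the client environment works; the piped spark-shell line assumes the cluster master is reachable as spark://sparkmaster:7077 as configured above:
spark-submit --version                                                           # confirms SPARK_HOME and PATH are set
echo 'sc.parallelize(1 to 1000).count()' | spark-shell --master spark://sparkmaster:7077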
Configure the logger so Spark will be less verbose
- Create the following file at ~/spark/conf/log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
# from hadoop configs
spark.log.dir=${user.home}/spark/log
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
Create your first application
First Python application
Create the Python Program
- Create a source code file 'wordcount.py' with the following content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from pyspark import SparkContext, SparkConf
class WordCount:

    def __init__(self, _path):
        sc = SparkContext(conf=SparkConf().setAppName(self.__class__.__name__))
        # Get all lines
        lines = sc.textFile(_path)
        # Filter out empty ones
        lines_nonempty = lines.filter(self.nonEmptyLines)
        # Get all words
        words = lines_nonempty.flatMap(self.linesToWords)
        # Map each word to its occurrence count (=1 for each word)
        pairs = words.map(self.mapWordsToPairs)
        # Group all words by key and sum their occurrences
        counts = pairs.reduceByKey(self.reduceToCount)
        # Sort by number of occurrences
        # Spark doesn't (yet) implement a sortByValue
        swappedPair = counts.map(self.swapPairs)
        swappedPair = swappedPair.sortByKey(False)
        sorted_counts = swappedPair.map(self.swapPairs)
        # Collect the output
        output = sorted_counts.collect()
        # Print the output
        for (word, count) in output:
            print("%s: %i" % (word.encode('ascii', 'ignore'), count))
        sc.stop()

    def nonEmptyLines(self, _line):
        return (_line and len(_line) > 0)

    def linesToWords(self, _line):
        return _line.split(' ')

    def mapWordsToPairs(self, _word):
        return (_word, 1)

    def reduceToCount(self, _i1, _i2):
        return (_i1 + _i2)

    def swapPairs(self, _tuple):
        return tuple(reversed(_tuple))


if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.stderr.write(
            "Usage: WordCount <file>" + "\n" +
            " where file can be a local 'file:///LOCAL_PATH' path" + "\n" +
            " or a remote 'hdfs://REMOTE_PATH' path")
        sys.exit(1)
    WordCount(_path=sys.argv[1])
Run the package in a local copy of Spark
- Run locally
- NOTE: local[4] means run locally with 4 threads
spark-submit --master local[4] wordcount.py file:///etc/profile
Run the package in the Spark cluster
- Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data
- Run in the Spark cluster
spark-submit --master spark://sparkmaster:7077 wordcount.py hdfs://hadoopmaster:9000/my_data
- Cleanup data from HDFS
hadoop fs -rm /my_data
First Java Spark application using Eclipse
Eclipse installation
sudo apt-get install eclipse-jdt
Maven installation
sudo apt-get install maven
Create a new Spark project
mkdir spark_app
cd spark_app
mkdir -p src/main/java
- Create a new 'pom.xml' Maven file with the following content:
<project>
    <groupId>org.campisano.wordcount</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>
</project>
Create the Java Program
- Create a source code file 'src/main/java/WordCount.java' with the following content:
- The following code is a classic WordCount refactored from the official Spark JavaWordCount example
import java.util.Arrays;
import java.util.List;
import scala.Tuple2;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
// from
// https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
public class WordCount {

    public WordCount(String _file_path) {
        SparkConf conf = new SparkConf().setAppName(this.getClass().getName());
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Get all lines
        JavaRDD<String> lines = sc.textFile(_file_path).cache();
        // Filter out empty ones
        JavaRDD<String> lines_nonempty = lines.filter(new NonEmptyLines());
        // Get all words
        JavaRDD<String> words = lines_nonempty.flatMap(new LinesToWords());
        // Map each word to its occurrence count (=1 for each word)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new MapWordsToPairs());
        // Group all words by key and sum their occurrences
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            new ReduceToCount());
        // Sort by number of occurrences
        // Spark doesn't (yet) implement a sortByValue
        JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(
            new SwapPairs());
        swappedPair = swappedPair.sortByKey(false);
        JavaPairRDD<String, Integer> sorted_counts = swappedPair.mapToPair(
            new SwapPairs());
        // Collect the output
        List<Tuple2<String, Integer>> output = sorted_counts.collect();
        // Print the output
        for(Tuple2<String, Integer> tuple : output) {
            System.out.println(tuple._1() + ": " + tuple._2());
        }
        sc.stop();
    }

    public static void main(String[] args) {
        if(args.length != 1) {
            System.err.println("Usage: WordCount <file>" + "\n" +
                " where file can be a local 'file:///LOCAL_PATH' path" + "\n" +
                " or a remote 'hdfs://REMOTE_PATH' path");
            System.exit(1);
        }
        new WordCount(args[0]);
    }
}
class NonEmptyLines implements Function<String, Boolean> {
    @Override
    public Boolean call(String _line) throws Exception {
        if(_line == null || _line.trim().length() < 1) {
            return false;
        }
        return true;
    }
}

class LinesToWords implements FlatMapFunction<String, String> {
    @Override
    public java.lang.Iterable<String> call(String _line) throws Exception {
        return Arrays.asList(_line.split(" "));
    }
}

class MapWordsToPairs implements PairFunction<
        String, String, Integer> {
    @Override
    public scala.Tuple2<String, Integer> call(String _word) {
        return new Tuple2<String, Integer>(_word, 1);
    }
}

class ReduceToCount implements Function2<
        Integer, Integer, Integer> {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
}

class SwapPairs<KEY, VAL> implements PairFunction<
        Tuple2<KEY, VAL>, VAL, KEY> {
    @Override
    public Tuple2<VAL, KEY> call(Tuple2<KEY, VAL> item)
            throws Exception {
        return item.swap();
    }
}
Use Eclipse
- Open Eclipse, click on 'File->Import->Existing Maven Projects'
- Specify the full path for the Maven project created above
Create an executable JAR package
mvn package
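- Optionally, check that the class actually ended up in the JAR before submitting it; a quick sketch (the jar tool ships with the JDK; unzip -l works as well if it is installed):
jar tf target/wordcount-1.0.jar | grep WordCount.class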
Run the package in a local copy of Spark
- Run locally
- NOTE: local[4] means run locally with 4 threads
spark-submit --class "WordCount" --master local[4] target/wordcount-1.0.jar file:///etc/profile
Run the package in the Spark cluster
- Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data
- Run in the Spark cluster
spark-submit --class "WordCount" --master spark://sparkmaster:7077 target/wordcount-1.0.jar hdfs://hadoopmaster:9000/my_data
- Cleanup data from HDFS
hadoop fs -rm /my_data