Spark
Requirements
- Java >= 7 (OpenJDK is fine)
- Spark binary package == 1.5.1, built for Hadoop 2.6 and later
- Hadoop 2.7.1 already installed in '/srv/hadoop'
- SSH server (e.g. openssh-server)
- Scala? (which version?)
Spark cluster installation in standalone mode
Define hostnames on all hosts (Master and Workers) if they are not already set
- change /etc/hostname
- run the following command to apply the new hostname
hostname `cat /etc/hostname`
Define Spark node IPs and names on all hosts (Master and Workers)
- You may encounter several communication problems. A typical one is an /etc/hosts file like
127.0.0.1 localhost
127.0.1.1 myhostname
10.1.1.2  myhostname
With a file like the one above, if you use myhostname in the Spark configuration the master appears to listen only on 127.0.1.1!
So we will add dedicated Spark hostnames instead.
- additional warning: do not use the '_' character in hostnames!
- Add something like the following example at the end of /etc/hosts (if not already configured), where MASTER_HOSTNAME and NODE_HOSTNAME_* are the real names used in /etc/hostname:
# Spark needs the following specific config
10.1.1.120 MASTER_HOSTNAME sparkmaster
10.1.1.121 NODE_HOSTNAME_1 sparknode1
10.1.1.122 NODE_HOSTNAME_2 sparknode2
...
10.1.1.12N NODE_HOSTNAME_N sparknodeN
Commands to run on Master node
Creating Spark user
sudo su -
groupadd --gid 11002 spark
mkdir -p /srv/spark/ -m 755
useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
chown spark:spark /srv/spark
exit
Creating common ssh authentication for spark user
sudo su - spark
mkdir .ssh
ssh-keygen -t rsa -P "" -f .ssh/id_rsa
cat .ssh/id_rsa.pub >> .ssh/authorized_keys
tar -czf ssh.tgz .ssh
exit
- copy the file to all nodes
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_1:/tmp    # copy to node 1
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_2:/tmp    # copy to node 2
...                                                 # ...
scp /srv/spark/home/ssh.tgz NODE_HOSTNAME_N:/tmp    # copy to node N
sudo rm /srv/spark/home/ssh.tgz                     # remove the file
Commands executed at any Slave host
Creating spark user
sudo groupadd --gid 11002 spark
sudo mkdir -p /srv/spark -m 755
sudo useradd --uid 11002 --gid spark --no-user-group --shell /bin/bash --create-home --home-dir /srv/spark/home spark
Setting ssh and environment configs
sudo chown -R spark:spark /tmp/ssh.tgz
sudo mv /tmp/ssh.tgz /srv/spark/home
sudo su - spark
tar -xzf ssh.tgz
rm -f ssh.tgz
exit
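- Optionally, verify from the Master that passwordless login for the spark user now works towards this node (sparknode1 below is one of the Spark hostnames added to /etc/hosts earlier; the very first connection may still ask to confirm the host key):
sudo su - spark
ssh sparknode1 hostname    # should print the node hostname without asking for a password
exit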
Commands executed at Master host
Installing Spark binary package
- You need to know the Java install path; the script check_java.sh can help to find it.
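- If check_java.sh is not at hand, the following quick sketch locates the JVM directory (the path in the comment is only an example; it varies by distribution):
readlink -f "$(which java)"    # e.g. prints /usr/lib/jvm/java-7-openjdk-amd64/jre/bin/java
# drop the trailing '/bin/java' from that output to obtain the JAVA_HOME value used below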
sudo su - spark
cd /srv/spark
wget -c http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
cat >> ~/.profile << 'EOF'
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre
# or /usr/lib/jvm/java-7-oracle/jre
# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit
Testing local mode
sudo su - spark
cd /srv/spark/spark-current
./bin/spark-shell
# run the following
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
# run the Pi example
./bin/run-example SparkPi
exit
Copy the binary package to all Worker nodes
sudo su - spark
cd /srv/spark
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_1:    # copy to node 1
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_2:    # copy to node 2
...                                                   # ...
scp spark-1.5.1-bin-hadoop2.6.tgz NODE_HOSTNAME_N:    # copy to node N
exit
Configuring spark
sudo su - spark
cd /srv/spark/spark-current
cp -a ./conf/spark-env.sh.template ./conf/spark-env.sh
# configure the master
echo "export SPARK_MASTER_IP=sparkmaster" >> ./conf/spark-env.sh
# configure the several remote worker machines
echo sparknode1 > ./conf/slaves
echo sparknode2 >> ./conf/slaves
...
echo sparknodeN >> ./conf/slaves
exit
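- Optionally, the resources each worker offers can be capped in the same spark-env.sh. SPARK_WORKER_CORES and SPARK_WORKER_MEMORY are standard Spark standalone settings; the values below are only illustrative:
sudo su - spark
echo "export SPARK_WORKER_CORES=2" >> /srv/spark/spark-current/conf/spark-env.sh     # cores a worker offers to applications
echo "export SPARK_WORKER_MEMORY=2g" >> /srv/spark/spark-current/conf/spark-env.sh   # memory a worker offers to applications
exit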
- Create the Log4j configuration file '/srv/spark/spark-current/conf/log4j.properties' with the following content:
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# from hadoop configs
# use 'spark.log.dir=.' for non-cluster installations
spark.log.dir=/srv/spark/spark-current/logs
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
- Copy the Spark configuration directory to all Worker nodes:
sudo su - spark
scp -r ../spark-current/conf NODE_HOSTNAME_1:conf_spark    # copy to node 1
scp -r ../spark-current/conf NODE_HOSTNAME_2:conf_spark    # copy to node 2
...                                                        # ...
scp -r ../spark-current/conf NODE_HOSTNAME_N:conf_spark    # copy to node N
exit
Commands executed at any Worker host
Install Spark and apply configs
- You need to know the Java install path; the script check_java.sh can help to find it (see the note in the Master install section).
sudo su - spark
cd /srv/spark
mv /srv/spark/home/spark-1.5.1-bin-hadoop2.6.tgz .
tar -xzf spark-1.5.1-bin-hadoop2.6.tgz
ln -s spark-1.5.1-bin-hadoop2.6 spark-current
rm -rf spark-current/conf
mv home/conf_spark spark-current/conf
cat >> ~/.profile << 'EOF'
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre
# or /usr/lib/jvm/java-7-oracle/jre
# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
EOF
exit
Configuring automatic Spark daemon startup at boot
On Master host
- Create the script /etc/init.d/spark_master.sh on the Master host
#!/bin/bash
#
# spark_master.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh

### BEGIN INIT INFO
# Provides:          spark_master.sh
# Required-Start:    $remote_fs $syslog $time
# Required-Stop:     $remote_fs $syslog $time
# Should-Start:      $network
# Should-Stop:       $network
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Spark Master Node
# Description:       Spark Master Node
### END INIT INFO

PATH=/sbin:/bin:/usr/sbin:/usr/bin;

OWNER[1]="spark";
MSG[1]="Spark (Master) daemon";
START_CMD[1]="\$SPARK_HOME/sbin/start-master.sh";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-master.sh";

function start()
{
    N=1;
    while test -n "${OWNER[$N]}";
    do
        echo -ne "Starting ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
        echo "Done.";
        N=$(($N+1));
    done;
}

function stop()
{
    N=${#OWNER[@]};
    while test -n "${OWNER[$N]}";
    do
        echo -ne "Stopping ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
        echo "Done.";
        N=$(($N-1));
    done;
}

function usage()
{
    echo "Usage: $0 {start|stop}"
    exit 0
}

case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    *)
        usage
esac;

# End
- Configure it as a startup script (Debian example)
sudo chmod 755 /etc/init.d/spark_master.sh
sudo update-rc.d spark_master.sh defaults
sudo service spark_master.sh start
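- A quick sanity check that the Master daemon actually started (the log directory below is Spark's default, $SPARK_HOME/logs):
ps -u spark -f                        # a java process running org.apache.spark.deploy.master.Master should appear
ls /srv/spark/spark-current/logs/     # the master daemon writes its log file here by default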
On all worker hosts
- Create the script /etc/init.d/spark_worker.sh on all Worker hosts
#!/bin/bash
#
# spark_worker.sh
# version 1.0
# from http://www.campisano.org/wiki/en/Script_chroot.sh

### BEGIN INIT INFO
# Provides:          spark_worker.sh
# Required-Start:    $remote_fs $syslog $time
# Required-Stop:     $remote_fs $syslog $time
# Should-Start:      $network
# Should-Stop:       $network
# Default-Start:     2 3 4 5
# Default-Stop:      0 1 6
# Short-Description: Spark Worker Node
# Description:       Spark Worker Node
### END INIT INFO

PATH=/sbin:/bin:/usr/sbin:/usr/bin;

OWNER[1]="spark";
MSG[1]="Spark (Worker) daemon";
START_CMD[1]="\$SPARK_HOME/sbin/start-slave.sh spark://sparkmaster:7077";
STOP_CMD[1]="\$SPARK_HOME/sbin/stop-slave.sh";

function start()
{
    N=1;
    while test -n "${OWNER[$N]}";
    do
        echo -ne "Starting ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${START_CMD[$N]}";
        echo "Done.";
        N=$(($N+1));
    done;
}

function stop()
{
    N=${#OWNER[@]};
    while test -n "${OWNER[$N]}";
    do
        echo -ne "Stopping ${MSG[$N]} ...";
        su "${OWNER[$N]}" -l -c "${STOP_CMD[$N]}" || break;
        echo "Done.";
        N=$(($N-1));
    done;
}

function usage()
{
    echo "Usage: $0 {start|stop}"
    exit 0
}

case "$1" in
    start)
        start
        ;;
    stop)
        stop
        ;;
    *)
        usage
esac;

# End
- Configure as startup script
sudo chmod 755 /etc/init.d/spark_worker.sh
sudo update-rc.d spark_worker.sh defaults
sudo service spark_worker.sh start
Spark WEB interface
- The web interface usually starts on the Master node, on port 8080: http://MASTER_HOST:8080
However, if another service already uses port 8080 (e.g. Tomcat or Hadoop), it will start on the next free port, for example 8081.
- If you cannot reach this port directly (e.g. a firewall blocks it), you can tunnel it to your local computer with the following command:
ssh -C -N -L 8080:localhost:8080 -p MASTER_SSH_PORT root@MASTER_HOST
- now you can access the Spark interface by opening this URL: http://localhost:8080
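- The same page can also be probed from the command line, for example as a scripted health check (assuming the tunnel above, or direct access to the Master host):
curl -s -o /dev/null -w "%{http_code}\n" http://localhost:8080/    # expect 200 while the master UI is up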
Run
First ready-to-run example
sudo su - spark
MASTER=spark://sparkmaster:7077 ../spark-current/bin/spark-shell
scala> sc.parallelize(1 to 1000000000).count()
scala> exit
exit
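- The examples bundled with the binary package can also be submitted to the cluster non-interactively; a sketch, assuming the examples jar sits in the lib/ directory of the 1.5.1 binary distribution (adjust the path if your layout differs):
sudo su - spark
spark-submit --class org.apache.spark.examples.SparkPi \
    --master spark://sparkmaster:7077 \
    /srv/spark/spark-current/lib/spark-examples-*.jar 100
exit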
Create a client user to have access to the cluster
Creating a user
sudo groupadd --gid 10102 <USER_NAME>
sudo useradd --uid 10102 --gid <USER_NAME> --no-user-group --shell /bin/bash --create-home --home-dir /home/<USER_NAME> <USER_NAME>
sudo passwd <USER_NAME>
sudo passwd -e <USER_NAME>
sudo su - <USER_NAME>
passwd
mkdir -p ~/spark/log ~/spark/conf ~/spark/example
exit
Configure Spark environment
- Add the following at the end of the ~/.profile file
- You need to know the Java install path; the script check_java.sh can help to find it.
# Set the JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/jre
# or /usr/lib/jvm/java-7-oracle/jre

# Hadoop vars
export HADOOP_PREFIX=/srv/hadoop
export HADOOP_HOME=$HADOOP_PREFIX
export HADOOP_MAPRED_HOME=$HADOOP_PREFIX
export HADOOP_COMMON_HOME=$HADOOP_PREFIX
export HADOOP_HDFS_HOME=$HADOOP_PREFIX
export HADOOP_CONF_DIR=${HADOOP_PREFIX}"/etc/hadoop"
export HADOOP_YARN_HOME=$HADOOP_PREFIX
export PATH=$PATH:$HADOOP_PREFIX/bin
export PATH=$PATH:$HADOOP_PREFIX/sbin
mkdir -p $HOME/hadoop/log
export HADOOP_LOG_DIR=$HOME/hadoop/log
export YARN_LOG_DIR=$HOME/hadoop/log

# Spark vars
export SPARK_HOME=/srv/spark/spark-current
export PATH=$SPARK_HOME/bin:$PATH
mkdir -p $HOME/spark/log
mkdir -p $HOME/spark/conf
export SPARK_CONF_DIR=$HOME/spark/conf
Configure the logger so Spark will be less verbose
- Create the following file at ~/spark/conf/log4j.properties
# Set everything to be logged to the console
log4j.rootCategory=INFO, console, RFA
log4j.appender.console.Threshold = WARN
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# from hadoop configs
spark.log.dir=${user.home}/spark/log
spark.log.file=spark.log
spark.log.maxfilesize=10MB
spark.log.maxbackupindex=100
log4j.appender.RFA.Threshold = INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${spark.log.dir}/${spark.log.file}
log4j.appender.RFA.MaxFileSize=${spark.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${spark.log.maxbackupindex}
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
Create your first application
First Python application
Create the Python Program
- Create a source code file 'wordcount.py' with the following content:
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
from pyspark import SparkContext, SparkConf


class WordCount:
    def __init__(self, _path):
        sc = SparkContext(conf=SparkConf().setAppName(self.__class__.__name__))
        # Get all lines
        lines = sc.textFile(_path)
        # Filter empty ones
        lines_nonempty = lines.filter(self.nonEmptyLines)
        # Get all words
        words = lines_nonempty.flatMap(self.linesToWords)
        # Map each word with its occurrence count (=1 for each word)
        pairs = words.map(self.mapWordsToPairs)
        # Group all words by key and sum their occurrences
        counts = pairs.reduceByKey(self.reduceToCount)
        # Sort by number of occurrences
        # Spark doesn't (yet) implement a sortByValue
        swappedPair = counts.map(self.swapPairs)
        swappedPair = swappedPair.sortByKey(False)
        sorted_counts = swappedPair.map(self.swapPairs)
        # Collect the output
        output = sorted_counts.collect()
        # Print the output
        for (word, count) in output:
            print("%s: %i" % (word.encode('ascii', 'ignore'), count))
        sc.stop()

    def nonEmptyLines(self, _line):
        return (_line and len(_line) > 0)

    def linesToWords(self, _line):
        return _line.split(' ')

    def mapWordsToPairs(self, _word):
        return (_word, 1)

    def reduceToCount(self, _i1, _i2):
        return (_i1 + _i2)

    def swapPairs(self, _tuple):
        return tuple(reversed(_tuple))


if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.stderr.write(
            "Usage: WordCount <file>" + "\n" +
            "       where file can be a local 'file:///LOCAL_PATH' path" + "\n" +
            "       or a remote 'hdfs://REMOTE_PATH' path")
        sys.exit(1)
    WordCount(_path=sys.argv[1])
Run the program in a local copy of Spark
- Run locally
- NOTE: local[4] means run locally with 4 threads
spark-submit --master local[4] wordcount.py file:///etc/profile
Run the package in the Spark cluster
- Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data
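- Optionally confirm that the file is now in HDFS:
hadoop fs -ls /my_data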
- Run in the Spark cluster
spark-submit --master spark://sparkmaster:7077 wordcount.py hdfs://hadoopmaster:9000/my_data
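- spark-submit also accepts flags that bound what the job may take from the standalone cluster; a sketch with illustrative values (--total-executor-cores and --executor-memory are standard spark-submit options):
spark-submit --master spark://sparkmaster:7077 \
    --total-executor-cores 4 --executor-memory 1g \
    wordcount.py hdfs://hadoopmaster:9000/my_data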
- Cleanup data from HDFS
hadoop fs -rm /my_data
First Java Spark application using Eclipse
Eclipse installation
sudo apt-get install eclipse-jdt
Maven installation
sudo apt-get install maven
Create a new Spark project
mkdir spark_app
cd spark_app
mkdir -p src/main/java
- Create a new 'pom.xml' Maven file with the following content:
<project>
    <groupId>org.campisano.wordcount</groupId>
    <artifactId>wordcount</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>WordCount Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.6</source>
                        <target>1.6</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.1</version>
        </dependency>
    </dependencies>
</project>
Create the Java Program
- Create a source code file 'src/main/java/WordCount.java' with the following content:
- The following code is a classic WordCount, refactored from the official Spark JavaWordCount example
import java.util.Arrays;
import java.util.List;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

// from
// https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
public class WordCount
{
    public WordCount(String _file_path)
    {
        SparkConf conf = new SparkConf().setAppName(this.getClass().getName());
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Get all lines
        JavaRDD<String> lines = sc.textFile(_file_path).cache();
        // Filter empty ones
        JavaRDD<String> lines_nonempty = lines.filter(new NonEmptyLines());
        // Get all words
        JavaRDD<String> words = lines_nonempty.flatMap(new LinesToWords());
        // Map each word with its occurrence count (=1 for each word)
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new MapWordsToPairs());
        // Group all words by key and sum their occurrences
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(
            new ReduceToCount());
        // Sort by number of occurrences
        // Spark doesn't (yet) implement a sortByValue
        JavaPairRDD<Integer, String> swappedPair = counts.mapToPair(
            new SwapPairs());
        swappedPair = swappedPair.sortByKey(false);
        JavaPairRDD<String, Integer> sorted_counts = swappedPair.mapToPair(
            new SwapPairs());

        // Collect the output (of the sorted RDD)
        List<Tuple2<String, Integer>> output = sorted_counts.collect();

        // Print the output
        for(Tuple2<String, Integer> tuple : output)
        {
            System.out.println(tuple._1() + ": " + tuple._2());
        }

        sc.stop();
    }

    public static void main(String[] args)
    {
        if(args.length != 1)
        {
            System.err.println("Usage: WordCount <file>" + "\n"
                + "       where file can be a local 'file:///LOCAL_PATH' path" + "\n"
                + "       or a remote 'hdfs://REMOTE_PATH' path");
            System.exit(1);
        }

        new WordCount(args[0]);
    }
}

class NonEmptyLines implements Function<String, Boolean>
{
    @Override
    public Boolean call(String _line) throws Exception
    {
        if(_line == null || _line.trim().length() < 1)
        {
            return false;
        }

        return true;
    }
}

class LinesToWords implements FlatMapFunction<String, String>
{
    @Override
    public java.lang.Iterable<String> call(String _line) throws Exception
    {
        return Arrays.asList(_line.split(" "));
    }
}

class MapWordsToPairs implements PairFunction<String, String, Integer>
{
    @Override
    public scala.Tuple2<String, Integer> call(String _word)
    {
        return new Tuple2<String, Integer>(_word, 1);
    }
}

class ReduceToCount implements Function2<Integer, Integer, Integer>
{
    @Override
    public Integer call(Integer i1, Integer i2)
    {
        return i1 + i2;
    }
}

class SwapPairs<KEY, VAL> implements PairFunction<Tuple2<KEY, VAL>, VAL, KEY>
{
    @Override
    public Tuple2<VAL, KEY> call(Tuple2<KEY, VAL> item) throws Exception
    {
        return item.swap();
    }
}
Use Eclipse
- Open Eclipse, click on 'File->Import->Existing Maven Projects'
- Specify the full path for the Maven project created above
Create an executable JAR package
mvn package
Run the package in a local copy of Spark
- Run locally
- NOTE: local[4] means run locally with 4 threads
spark-submit --class "WordCount" --master local[4] target/wordcount-1.0.jar file:///etc/profile
Run the package in the Spark cluster
- Put some data into HDFS (destination: /my_data)
hadoop fs -copyFromLocal /etc/profile /my_data
- Run in the Spark cluster
spark-submit --class "WordCount" --master spark://sparkmaster:7077 target/wordcount-1.0.jar hdfs://hadoopmaster:9000/my_data
- Cleanup data from HDFS
hadoop fs -rm /my_data