There are a few approaches for maximizing your throughput at OSC if you have a multithreaded workflow. In this demonstration, I assume you have a Python script using some form of multithreading based on a shared-memory model, such as OpenMP or the pprocess package in Python. If you do not have such a code, the code below is available for you to test the strategies.
To present these job submission approaches, I provide an example using Python with LAPACK (driven by Intel MKL; this is done automatically for you when you load Python on OSC systems). This approach was tested on the Ruby cluster, but should easily adapt to the Oakley and Owens clusters. Approach 1 is based on Bash, Approach 2 uses job task arrays in the Moab/Torque scheduler, and Approach 3 uses a combination of mpiexec (Hydra) and parallel-command-processor without depending on the scheduler's array feature. All approaches use the same concept of Bash arrays, so you are not bound to simple sequences: you can have arbitrary lists, or even read a file containing all your fancy variations for parameter studies.
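To make that last point concrete, here is a minimal sketch of the three ways a Bash case list can be built: a sequence, an arbitrary list, or a file of parameters (the file name cases.txt here is a hypothetical example, not part of the scripts below):

```shell
#!/bin/bash
# Three equivalent ways to build the case list for a parameter study.

# 1. A simple sequence: 1000 2000 ... 10000
array=($(seq 1000 1000 10000))

# 2. An arbitrary, hand-picked list
array=(1 3 4 50000)

# 3. One case per line, read from a file (cases.txt is a hypothetical name)
printf '%s\n' 1000 2500 9000 > cases.txt
array=($(cat cases.txt))

echo "number of cases: ${#array[@]}"   # size of the array
echo "first case: ${array[0]}"         # Bash arrays are zero-indexed
```

Whichever form you use, the job scripts below only ever see the resulting array.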
First, here is the Python example to solve A*x = b:
- A random square matrix A and a random known solution vector y are generated, and b is computed as b = A*y
- Solve for x and compare it against the known y
The code, roughly 100 lines, is presented below in example_lapack.py.
example_lapack.py
""" global imports for example_lapack.py Copyright 2016 Ohio Supercomputer Center Attributions: This example was inspired by "Numerical Computing with Modern Fortran" (SIAM, 2013) by Hanson & Hopkins """ from __future__ import print_function import argparse import sys import time import logging import numpy as np try: import mkl except ImportError: raise ImportError('This version of Python does not have "mkl", load with ' + '"module load python/2.7.latest"') try: from scipy.linalg.lapack import dgetrf, dgetrs from scipy.linalg.blas import dnrm2, dgemv except ImportError: raise ImportError('This version of Python does not have access to a' + 'lower-level lapack/blas routine.') def main(): """ This example computes the following; 1. Random number generation to fill a matrix 'a' of dimension nxn and also for a matrix 'y' of dimension n 2. Pre-solve a*y = b so that we have 'b'. This uses dgemv. 3. Perform LU factorization (dgetrf) on dense matrix 'a' and store matrix and pivots in 'lu' and 'piv' 4. Solve for x given 'lu' and 'piv' arrays (dgetrs) 5. Compute L2 norm of the difference between solution and known vectors divided by L2 normed to the known y. This is to provide a single point measure of the relative error. Inputs: dimension of n Error checks: NONE currently """ # log and send it to stderr. 
logging.basicConfig(level=logging.INFO) parser = argparse.ArgumentParser() parser.add_argument("dimension", type=int, default=5, nargs='?', help="The dimension of square matrix A") parser.add_argument("threads", type=int, default=20, nargs='?', help="The number of threads") # grab the options here from the command line args = parser.parse_args() n = args.dimension mkl.set_num_threads(args.threads) # begin timing random number matrix generation time_1 = time.time() logging.debug('Dimension of square n by n matrix is:' + str(n) + '\n') a = np.random.rand(n, n) y = np.random.rand(n) logging.debug('a:' + np.array_str(a) + '\n') logging.debug('y:' + np.array_str(y) + '\n') # begin timing LAPACK time_2 = time.time() try: b = dgemv(1, a, y) except AttributeError: # catch when python scipy blas does not have dgemv print('This version of Python does not have access to lower-level dgemv' + 'routine.') sys.exit(1) logging.debug('b:' + np.array_str(b) + '\n') lu, piv, _ = dgetrf(a) # lu factorization x, _ = dgetrs(lu, piv, b) # solve for x logging.debug('x:' + np.array_str(x) + '\n') relerr = dnrm2(x-y) / dnrm2(y) # end timing LAPACK time_3 = time.time() print("Solved a matrix of size:", n, "using", mkl.get_max_threads(), "threads.") print('Relative Error:', relerr) print("--- Random Matrix Generation Time: %s seconds ---" % (time_2 - time_1)) print("--- Solution Time: %s seconds ---" % (time_3 - time_2)) # main script begin if __name__ == "__main__": main()
Now, we test it:
python example_lapack.py --help
python example_lapack.py    # solve using 5x5 matrix and max threads on node
If everything went well, you should see output like:
Solved a matrix of size: 5 using 16 threads.
Relative Error: 9.34065631647e-17
--- Random Matrix Generation Time: 0.0035982131958 seconds ---
--- Solution Time: 0.00995397567749 seconds ---
Now that this example is functioning, we can grow it into a case study with varying dimension: 1K, 2K, all the way up to 10K. Let's consider three approaches:
Approach 1: Using a Serial Loop
This one is easy; examine the script bash_loop.sh below. Its clear weakness is that we stay on a single node and do not parallelize across tasks.
Submit the job with qsub bash_loop.sh
The Python script should reside in your present working directory. You will see an output file of the form bash_loop.sh.o${PBS_JOBID}, where $PBS_JOBID is a positive integer assigned by the scheduler.
bash_loop.sh
#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=1:ppn=16
# Append stderr to stdout
#PBS -j oe
#PBS -q debug
cd $PBS_O_WORKDIR
module load python/2.7.latest
array=$(seq 1000 1000 10000)
for case in $array; do
    echo
    echo python example_lapack.py $case
    python example_lapack.py $case
done
Approach 2: Using Job Task Arrays
This approach takes more effort, but the benefit is that the scheduler submits each case as a separate job in a task array. If you are okay with asynchronous results, the scheduler will run whichever cases it can fit as resources become available. Note: there are advanced directives that can hold jobs until others complete, but we will not discuss that option here. Approach 1 may be more suitable if you want cleanly serialized output.
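As a sketch of that hold-until-complete mechanism (check man qsub on your cluster for the dependency types it supports), a follow-up job can be chained after the task array; post_process.sh is a hypothetical script name, and the qsub lines are echoed here so the sketch runs without a scheduler:

```shell
#!/bin/bash
# Sketch: chain a post-processing job after a task array completes.
# qsub prints the new job's ID on stdout, so capture it:
jobid=$(echo "123456.oak-batch")   # stand-in for: jobid=$(qsub -t 0-9 task_array.sh)

# 'afterany' releases the held job once the dependency finishes,
# regardless of its exit status (see 'man qsub' for other types)
echo qsub -W depend=afterany:${jobid} post_process.sh
```

Remove the echoes (and the stand-in job ID) to submit for real.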
Submit the job with qsub -t 0-9 task_array.sh. We start at 0 because array indices in Bash start at 0.
The Python script should reside in your present working directory. You will see output files of the form task_array.sh.o${PBS_JOBID}-${PBS_ARRAYID}, where $PBS_JOBID is a positive integer assigned by the scheduler and $PBS_ARRAYID comes from the task array values you set with the -t option.
I dislike how the output is split across files in this example, so we can write another script to produce output comparable to Approach 1 as a post-processing step. If you want to combine the task array outputs into one result task_array.sh.o${PBS_JOBID}, run bash task_array_combine_results.sh. Because this involves deleting some output files, you should first answer d for a dry-run to see what the operation would do.
task_array.sh
#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=1:ppn=16
# Append stderr to stdout
#PBS -j oe
module load python/2.7.latest
cd $PBS_O_WORKDIR
#ALTERNATIVELY: array=(1 3 4 50000)
array=($(seq 1000 1000 10000))
if [ -z ${PBS_ARRAYID+x} ]; then
    # read http://stackoverflow.com/a/13864829/992834
    echo PBS_ARRAYID is not set
    echo submit this script using "qsub -t 0-9 $0"
else
    echo PBS_JOBID: $PBS_JOBID
    echo python example_lapack.py ${array[$PBS_ARRAYID]}
    python example_lapack.py ${array[$PBS_ARRAYID]}
fi
task_array_combine_results.sh
#!/bin/bash jobscript="task_array.sh" echo "Name of job script" $jobscript jobids=$(ls ${jobscript}.o* -1 | grep -o -P '(?<=.o).*(?=-)' | sort | uniq) function enter_choice { read -p "Combine (y/n/d) d=dry-run? " choice case "$choice" in y|Y ) echo "yes";; n|N ) echo "no";; d|D ) echo "dry";; * ) echo "invalid";; esac } for jobid in $jobids; do echo echo "Files to combine:" # sort by the task array ID after the '-' files=$(ls ${jobscript}.o${jobid}-* | sort -t'-' -k2 -n) echo $files choice=$(enter_choice) combined=${jobscript}.o${jobid} if [[ $choice == "yes" ]] ; then echo cat $files ">" $combined echo rm $files # almost the same as 'cat' above but we put an extra row in between for f in $files; do (cat "${f}" | sed '/^Resources requested:$/q' | \ head -n -2; echo) >> $combined; done rm $files elif [[ $choice == "dry" ]] ; then echo cat $files ">" ${jobscript}.o$jobid echo rm $files fi done
Approach 3: Using parallel-command-processor and mpiexec
This approach may appear difficult, but take another deep dive and see how you can leverage mpiexec with parallel-command-processor. The idea is that each core runs an independent task, one per line of input, supplied either through a Bash here document or, in our case, through a pipe. To confine a task to a single process per node, we test each task number's remainder modulo the cores per node, so only one command per node does real work.

For example, Ruby debug nodes have either 16 or 20 cores, and if we request two nodes (the maximum you can request on the debug queue), we are designing the job to solve 2 cases at a time on 2 nodes simultaneously. So in our example we solve 5 batches of 2 cases each, and parallel-command-processor takes care of distributing the commands. You can change the node count to 10 to maximize parallelization, but be sure to remove the line requesting the debug queue (#PBS -q debug), as your job will be rejected for requesting too many nodes in the debug queue.
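To see the remainder logic in action before submitting anything, you can run the command-generator loop from pcp_bash_loop.sh on its own, with the mpiexec stage replaced by a grep that keeps only the lines whose guard is true (one per node, two per batch):

```shell
#!/bin/bash
# Dry run of the generator loop from pcp_bash_loop.sh: print only the
# commands whose 'task % ppn == 1' guard holds, i.e. one task per node.
procs=32
ppn=16
max_nodes=$((procs/ppn))
base=0
array=($(seq 1000 1000 10000))
while [ $base -lt ${#array[@]} ]; do
    for task in $(seq 1 $procs); do
        case=${array[$((task / ppn + base))]}
        command="python example_lapack.py $case"
        echo "if [ $((task%$ppn)) -eq 1 ] ; then $command; fi"
    done | grep '^if \[ 1 -eq'   # stand-in for: | mpiexec parallel-command-processor
    base=$((base+max_nodes))
done
```

You should see ten guarded commands, two per batch, covering the cases 1000 through 10000 in order.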
Furthermore, on OSC systems, you optimize serial throughput by creating a true parallel workload via mpiexec. You can read more about parallel-command-processor on OSC systems by typing man parallel-command-processor.
Submit the job with qsub pcp_bash_loop.sh.
The Python script should reside in your present working directory. You will see an output file of the form pcp_bash_loop.sh.o${PBS_JOBID}, where $PBS_JOBID is a positive integer assigned by the scheduler.
pcp_bash_loop.sh
#!/bin/bash
#PBS -l walltime=10:00
#PBS -l nodes=2:ppn=16
#PBS -q debug
# Append stderr to stdout
#PBS -j oe
cd $PBS_O_WORKDIR
source /usr/local/lmod/lmod/init/sh
module load python/2.7.latest
procs=32
ppn=16
max_nodes=$((procs/ppn))
base=0
array=($(seq 1000 1000 10000))
while [ $base -lt ${#array[@]} ]; do
    for task in $(seq 1 $procs); do
        case=${array[$((task / ppn + base))]}
        command="python example_lapack.py $case"
        echo "if [ $((task%$ppn)) -eq 1 ] ; then $command; fi"
    done | mpiexec parallel-command-processor
    base=$((base+max_nodes))
done