Parallel chains with MPI
mpi.Rmd
Motivation
The cost of Markov chain Monte Carlo sampling grows steeply with the problem size. For models with ~50 parameters it already becomes hard to initialize a Markov chain with good values for:
- the starting parameters
- the step size

The effect is a slow-moving chain and a sample with high auto-correlation.
To reduce auto-correlation, we suggest the parallel tempering technique, where several Markov chains run in parallel and each experiences a different likelihood: \(p(D|\theta)^\beta\), where \(\theta\) is the Markov chain variable (it maps to the model’s kinetic parameters) and \(D\) is the available data. In this context \(0 \le \beta \le 1\) plays the role of an inverse temperature. A value of \(\beta = 1.0\) is equivalent to the original problem; the lower \(\beta\) gets, the more the target distribution resembles the prior distribution.
The different chains are then allowed to exchange their positions if it is beneficial (according to the specific rules of parallel tempering). This exchange of information makes the chain with \(\beta = 1\) explore the sampling space much faster than the Markov chain algorithm by itself would allow.
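As a minimal sketch of the tempering idea (in R; logLikelihood and logPrior here are hypothetical placeholders for the model-specific functions):

## tempered log-posterior: beta scales only the likelihood term;
## beta = 1 recovers the original posterior, beta = 0 leaves only the prior
temperedLogPosterior <- function(theta, beta, logLikelihood, logPrior) {
  beta*logLikelihood(theta) + logPrior(theta)
}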
Even though parallel tempering can be implemented without MPI, we now have several experiments to simulate on many parallel chains: for AKAP79 this is 20 experiments and e.g. 32 chains. At this point it becomes beneficial to do the simulations on more than one machine (mine has 16 cores). The high performance computing cluster at KTH (PDC Center for High Performance Computing, Dardel cluster) has 128 cores per node. This means that we can simulate 6 chains per node and still process all experiments at the same time (using parLapply or mclapply), but we can also start 24 chains distributed across 4 nodes.
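The arithmetic behind these numbers, as a quick R sketch:

coresPerNode <- 128  # Dardel
nExperiments <- 20   # AKAP79
coresPerNode %/% nExperiments # 6 chains per node, each simulating all 20 experiments in parallel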
Distributing chains across nodes requires message passing between them; for this purpose we use pbdMPI.
MPI code is difficult to understand, write, test, and debug, so the instructions below may not work in every environment.
AKAP79
MPI has several mechanisms to launch worker processes, one of which is the spawn method (MPI_Comm_spawn). We don’t use it here. Instead we launch an Rscript via mpirun and use the send and recv functions on MPI_COMM_WORLD (which corresponds to comm=0 in pbdMPI).
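A minimal pbdMPI send/recv sketch of this pattern (illustrative only; launched with e.g. mpirun -N 2 on a hypothetical file sendrecv.R):

#!/usr/bin/env Rscript
library(pbdMPI)
init()
r <- comm.rank(comm=0) # rank on MPI_COMM_WORLD
if (r == 0) {
  send(c(3.14,2.71), rank.dest=1L, tag=0L, comm=0) # rank 0 sends a vector to rank 1
} else if (r == 1) {
  x <- recv(rank.source=0L, tag=0L, comm=0)        # rank 1 receives it
  print(x)
}
finalize()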
The only difference between the workers is the rank and possibly the random number seed. The rank determines the value of \(\beta\) and thus the kind of sample the worker obtains.
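With the ladder used in the script template below, \(\beta = (1 - r/cs)^2\), e.g. 8 workers receive these values:

cs <- 8; r <- 0:(cs-1)
round((1.0 - r/cs)^2, 4)
## [1] 1.0000 0.7656 0.5625 0.3906 0.2500 0.1406 0.0625 0.0156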
Locally (on your machine) you can test the MPI script using the mpirun command, if you have an MPI library installed:
# find number of cores on GNU/Linux
N=$(grep -c processor /proc/cpuinfo) # or just type the right number
mpirun -H localhost:$N -N $N ./AKAP79.R 5000
For this to work, AKAP79.R starts with the line:
#!/usr/bin/env Rscript
## ^^^^^^^^^^^^^^^^^^^
## The above has to be the first line.
## Nothing may be written before that.
[...]
The rest of the file is ordinary R code; this way, Rscript is found via env and used to process it. (Make the script executable, chmod +x AKAP79.R, so that mpirun can start it directly.)
If you want to call Rscript explicitly, then:
mpirun -H localhost:$N -N $N Rscript ./AKAP79.R 5000
On the cluster the -H option is not necessary, because a queue system like Slurm determines which machines to use.
The argument 5000 (the sample size) is passed to the R program AKAP79.R; it is not an mpirun option.
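Inside the script, the trailing argument can be picked up with base R’s commandArgs; a sketch (the default of 10000 matches the template below):

a <- commandArgs(trailingOnly=TRUE)                 # e.g. c("5000")
N <- if (length(a) > 0) as.integer(a[1]) else 10000 # sample size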
-N ... means that we launch this many MPI workers per node (node means the same as machine or host). We have one node (localhost); the -H localhost:$SLOTS option determines the maximum number of jobs per node.
If you have 16 cores:
mpirun -H localhost:16 -N 16 Rscript ./AKAP79.R 5000
In this example we use the MPI-distributed chains for parallel chains at different temperatures. But this is not mandatory: it is possible to run all $N chains at the same temperature, or to split the work in some other way.
MPI Script Template
This is an example that samples a model using the Metropolis algorithm, pbdMPI, and the parallel tempering technique.
#!/usr/bin/env Rscript
library(rgsl)
library(SBtabVFGEN)
library(uqsa)
library(parallel)
library(pbdMPI)
start_time <- Sys.time()
comm <- 0
pbdMPI::init()
r <- pbdMPI::comm.rank(comm=comm)
cs <- pbdMPI::comm.size(comm=comm)
attr(comm,"rank") <- r
attr(comm,"size") <- cs
N <- 10000 # default sample size
h <- 1e-2 # step size
beta <- (1.0 - (r/cs))^2 # PT: inverse temperature
## ----label="SBtab content"----------------------------------------------------
modelFiles <- dir("..",pattern="[.]tsv$",full.names=TRUE)
sb <- SBtabVFGEN::sbtab_from_tsv(modelFiles)
modelName <- checkModel(comment(sb),sprintf("./%s_gvc.c",comment(sb)))
## ----ConservationLaws----------------------------------------------------------
if (file.exists("../ConservationLaws.RDS")){
  ConLaw <- readRDS("../ConservationLaws.RDS") # same file as tested above
} else {
  ConLaw <- NULL
}
experiments <- sbtab.data(sb,ConLaw)
## ----default------------------------------------------------------------------
## ASSUMPTION: `model` (a list of functions, among them model$par) is defined
## by the R code generated for this model; adjust the path if needed:
source(sprintf("./%s.R",comment(sb)))
n <- length(experiments[[1]]$input)
if (n>0){
  parMCMC <- log10(head(model$par(),-n)) # drop the input parameters
} else {
  parMCMC <- log10(model$par())
}
## ----range--------------------------------------------------------------------
stdv <- head(sb$Parameter[["!Std"]],length(parMCMC))
if (is.null(stdv) || any(is.na(stdv))){ # fall back to a rough default
  stdv <- abs(parMCMC)*0.05 + abs(max(parMCMC))*0.05 + 1e-2 # abs() keeps the sd positive (parMCMC is in log10 space)
}
dprior <- dNormalPrior(mean=parMCMC,sd=stdv)
rprior <- rNormalPrior(mean=parMCMC,sd=stdv)
## ----simulate-----------------------------------------------------------------
sim <- simulator.c(experiments,modelName,log10ParMap)
y <- sim(parMCMC) ## little test
stopifnot(is.list(y) && length(y)==length(experiments))
stopifnot(all(c("state","func") %in% names(y[[1]])))
## ----likelihood---------------------------------------------------------------
## Gaussian log-likelihood; the quadratic term compares simulation and data,
## NA entries in stdv are skipped
logLH <- function(y,h,stdv,name){
  n <- sum(!is.na(stdv))
  llf_const <- sum(log(stdv),na.rm=TRUE) + 0.5*log(2*pi)*n # log normalization constant
  llf_sq <- 0.5*sum(((y - h)/stdv)^2,na.rm=TRUE)
  return(-llf_const-llf_sq)
}
llf <- logLikelihoodFunc(experiments,simpleUserLLF=logLH)
## ----update-------------------------------------------------------------------
metropolis <- mcmcUpdate(simulate=sim,
                         experiments=experiments,
                         model=model,
                         logLikelihood=llf,
                         dprior=dprior)
## ----init---------------------------------------------------------------------
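## the swap function exchanges positions between the temperature chains
## (the parallel tempering moves), communicated via pbdMPI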
ptMetropolis <- mcmc_mpi(metropolis,comm=comm,swapDelay=0,swapFunc=pbdMPI_bcast_reduce_temperatures)
## ----sample-------------------------------------------------------------------
x <- mcmcInit(
  beta,
  parMCMC,
  simulate=sim,
  logLikelihood=llf,
  dprior=dprior)
s <- ptMetropolis(x,N,h) # the main amount of work is done here
saveRDS(s,file=sprintf("sample-rank-%i.RDS",r))
# ---- when all are done, we load the sampled points from the files but only for the right temperature:
pbdMPI::barrier()
f <- dir(pattern='^sample-rank-.*RDS$')
stopifnot(length(f) == cs) # one sample file per MPI rank
X <- uqsa::gatherSample(f,beta,size=N/2)
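## the copula fitted to this sample can act as a refined prior for a follow-up run: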
Copula <- fitCopula(X)
dprior <- dCopulaPrior(Copula)
rprior <- rCopulaPrior(Copula)
parMCMC <- as.numeric(tail(X,1))
attr(Copula,"beta") <- beta
attr(X,"beta") <- beta
saveRDS(Copula,file=sprintf("Copula-for-beta-%i-l10b-%i.RDS",r,round(100*log10(beta))))
saveRDS(X,file=sprintf("parameter-sample-for-beta-%i-l10b-%i.RDS",r,round(100*log10(beta))))
time_ <- difftime(Sys.time(),start_time,units="min")
print(time_)
finalize()
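When all ranks are done, the per-rank files written above can be inspected in an ordinary R session, e.g.:

s <- readRDS("sample-rank-0.RDS") # rank 0 runs at beta == 1 (the untempered posterior)
str(s)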