Parallel chains with MPI
mpi.Rmd
This article explains how to combine SMMALA with the technique of parallel tempering used in the article: Sample AKAP79.
AKAP79 is a fairly big model. This makes it worthwhile to use SMMALA and also parallel tempering which both speed up convergence to the target distribution. Here we explain which model properties have influenced our decisions and which modifications we made to the model to make the job easier.
UQSA includes a special ODE solver, which calculates a crude estimate of the Fisher information \(G\) and the gradient of the log-likelihood \(\nabla_\theta l(\theta;D)\). Both are required in SMMALA. We cannot calculate the exact \(G\) an dgradient. This lowers the algorithms performance compared to the ideal case. But the algorithm remains exact.
This special solver is called uqsa::simfi()
(same
interface as the solvers in rgsl
). In the R script shown at
the end of this article we have included functions that extract the
Fisher information and gradient from the simulation result. These
simulation results are always attached to the current position of the
Markov chain as attributes.
The Fisher information is also linearly transformed by the Jacobian of the parameter mapping (the map between the Markov chain variable \(\theta\) and the model’s kinetic parameters \(\rho\)). We use the mapping: \(\rho = 10^\theta\).
In addition, it is easier to work with matrices in C, rather than
data-frames, so we transform all data-sets into equivalent matrices, and
replace all NA
values with values that have the same effect
as na.rm=TRUE
. missing data values can be replaced by any
number, missing standard errors are replaced with infinities (a missing
value has infinite error). The infinite values are disregarded when the
normalising factors a re computed (i.e.:
1/sqrt(2*pi*sigma^2)
).
If the liklihood function is not derived from a Gaussian, then the user has to provide a different set of functions to approximate gradient and Fisher information, or use a different solver.
This particular model has a very dense data-set, which we will down-sample to fewer points. We do this to reduce memory consumption and speed up the process (and to show how one can do such a thing). Not much information is lost that way.
The standard errors of the (real) data-sets are very uneven. To make the job easier for the sampler, we replace the real standard errors with even 5% errors (bigger than the real ones). If we were to try for a very big sample (millions of sampled points), we coudl use the real errors instead. For a small sample, even errors work better.
Motivation
Markov chain Monte Carlo scales exponentially with the problem size. For models with ~50 parameters it already becomes hard to intialize a Markov chain with good values:
- starting parameters
- step size
The effect is a slow moving chain, and a sample with high auto-correlation.
To reduce auto-correlation, we suggest the parallel tempering technique, where several Markov chains run in parallel and each experiences a different likelihood: \(p(D|\theta)^\beta\) where \(\theta\) is the Markov chain variable (maps to the model’s kinetic parameters) and \(D\) is the available data. In this context \(0 \le \beta \le 1\) plays the role of an inverse temperature. A value of \(\beta = 1.0\) is equivalent to the original problem; the lower \(\beta\) gets, the more does the target distribution resemble the prior distribution.
The different chains are then allowed to exchange their positions if it is benefitial (according to the specific rules of parallel tempering). This exchange of information makes the chain with \(\beta = 1\) explore the sampling space much faster than the Markov chain algorithm by itself would allow.
Even though the implementation of Parallel Tempering is possible
without MPI, we now have several experiments to simulate on many
parallel chains, for AKAP79 this is 20 experiments and e.g. 32 chains.
At this point, it becomes benefitial to do the simulations on more than
one machine (mine has 16 cores, which isn’t much for a problem of this
size). The high performance computing cluster at KTH (Parallel
Dator Center, Dardel cluster) has 128 cores per node. This means that we
can simulate 6 chains per node and still process all experiments at the
same time (using parLapply
or mclapply
), but
also start 24 chains distributed accross 3 nodes.
The distribution requires message passing between nodes, for this prupose we use pbdMPI
MPI code is difficult to understand, write, test, and debug. So, the below instructions may not work for everyone.
AKAP79
MPI has several mechanisms to launch worker processes, one of which
is the spawn
method (MPI_Comm_spawn
). We don’t
use this here.
Instead we launch an Rscript via mpirun
(and the
send
and recv
functions on
MPI_COMM_WORLD
, which corresponds to comm=0
in
pbdMPI).
The only difference between the workers is the rank
and
possibly the random number seed. The rank will determine the value of
\(\beta\) and thus the kind of sample
the worker obtains.
Locally (on your machine) you can test the MPI script using the
mpirun
command, if you have some mpi library installed:
# find number of cores on GNU/linux
N = $(( `grep -c processor /proc/cpuinfo` )) # or just type the right number
mpirun -H localhost:$N -N $N ./pt-smmala-akap79.R 5000
For this to work, AKAP79.R
starts with the line:
#!/usr/bin/env Rscript
## ^^^^^^^^^^^^^^^^^^^
## The above has to be the first line.
## Nothing may be written before that.
[...]
So, Rscript will be found and used to process the rest.
If you want to call Rscript
explicitly, then:
On the cluster the -H
option is not necessary, because a
queue system like slurm will determine which machines to use. The
program mpirun
may have a different name on a cluster
(e.g. srun
).
The argument 5000
(the sample size) is passed to the R
program AKAP79.R
, it’s not an mpirun
option.
-N ...
means that we launch this many MPI workers per
node (node means the same as machine or host). We have one node
(localhost), the -H localhost:$SLOTS
option determines the
maximum number of jobs per node.
If you have 16 cores:
mpirun -H localhost:$16 -N 16 Rscript ./pt-smmala-akap79.R 5000
In this example, we use the MPI chains for parallel chains on
different temperatures. But, this is not mandatory. It is possible to
run $N
chains on the same temperature, or split the work in
some other way.
MPI Script Template
This is an example that will sample a model using the SMMALA
algorithm, using pbdMPI
and the parallel tempering technique.
#!/usr/bin/env Rscript
library(rgsl)
library(uqsa)
library(pbdMPI)
start_time <- Sys.time()
comm <- 0
pbdMPI::init()
r <- pbdMPI::comm.rank(comm=comm)
cs <- pbdMPI::comm.size(comm=comm)
attr(comm,"rank") <- r
attr(comm,"size") <- cs
beta <- (1.0 - (r/cs))^2 # PT: inverse temperature
a <- commandArgs(trailingOnly=TRUE)
if (length(a)>0){
N <- as.integer(a[1])
} else {
N <- 50000 # default sample size
}
h <- 0.01 # step size
modelName <- "AKAP79"
comment(modelName) <- "./AKAP79.so"
sb <- readRDS(file="AKAP79-sb.RDS")
ex <- readRDS(file="AKAP79-ex.RDS")
## In the C backend of simfi, we calculate the Fisher information
## and log-likelihood gradient for Gaussian liklihoods.
## For this to work at all, we convert the data-frames to simple matrices.
## The names 'time', 'data', and 'stdv' have a highest precedence in the solver code.
for (i in seq_along(ex)){
t_ <- ex[[i]]$outputTimes
nt <- length(t_)
BY <- 10 # take every BYth point
D <- t(ex[[i]]$outputValues)
D[is.na(D)] <- 0.0
SD <- D*0.05+apply(D,1,FUN=max,na.rm=TRUE)*0.05 #t(ex[[i]]$errorValues)
SD[is.na(SD)] <- Inf
ex[[i]]$time <- t_[seq(1,nt,by=BY)] # The solvers behind
ex[[i]]$data <- D[,seq(1,nt,by=BY),drop=FALSE] # simfi() will use these
ex[[i]]$stdv <- SD[,seq(1,nt,by=BY),drop=FALSE] # instead of outputValues
}
parMCMC <- log10(sb$Parameter[["!DefaultValue"]])
stopifnot("!Max" %in% names(sb$Parameter))
stopifnot("!Min" %in% names(sb$Parameter))
mu <- 0.5*(log10(sb$Parameter[["!Max"]])+log10(sb$Parameter[["!Min"]]))
stdv <- 0.5*(log10(sb$Parameter[["!Max"]])-log10(sb$Parameter[["!Min"]]))
stopifnot(length(mu)==length(stdv))
stopifnot(length(parMCMC)==length(mu))
dprior <- dNormalPrior(mean=mu,sd=stdv)
rprior <- rNormalPrior(mean=mu,sd=stdv)
gprior <- \(p) {return(-1.0*(p-parMCMC)/stdv^2)}
## ----simulate-----------------------------------------------------------------
sim <- simfi(ex,modelName,log10ParMap) # or simulator.c
## log-likelihood
llf <- function(parMCMC){
i <- seq_along(parMCMC)
J <- log10ParMapJac(parMCMC)
return(Reduce(\(a,b) a + b$logLikelihood[1], attr(parMCMC,"simulations"), init = 0.0))
}
## gradient of the log-likelihood
gllf <- function(parMCMC){
i <- seq_along(parMCMC)
J <- log10ParMapJac(parMCMC)
return(as.numeric(Reduce(\(a,b) a + b$gradLogLikelihood[i,1], attr(parMCMC,"simulations"), init = 0.0) %*% J))
}
## Fisher Information
fi <- function(parMCMC){
i <- seq_along(parMCMC)
J <- log10ParMapJac(parMCMC)
return(t(J) %*% Reduce(\(a,b) a + b$FisherInformation[i,i,1], attr(parMCMC,"simulations"), init = 0.0) %*% J)
}
## ----update-------------------------------------------------------------------
smmala <- mcmcUpdate(
simulate=sim,
experiments=ex,
logLikelihood=llf,
dprior=dprior,
gradLogLikelihood=gllf,
gprior,
fisherInformation=fi,
fisherInformationPrior=diag(1.0/stdv^2)
)
## ----mcmc-method--------------------------------------------------------------
MC <- mcmc(smmala)
ptSMMALA <- mcmc_mpi(smmala,comm=comm,swapDelay=0,swapFunc=pbdMPI_bcast_reduce_temperatures)
## ----init---------------------------------------------------------------------
x <- mcmcInit(
beta,
parMCMC,
simulate=sim,
logLikelihood=llf,
dprior,
gllf,
gprior,
fi
)
## beta,parMCMC,simulate,logLikelihood,dprior,gradLogLikelihood=NULL,gprior=NULL,fisherInformation=NULL
A <- function(a) { # step-size adjuster
return(0.5 + a^4/(0.25^4 + a^4))
}
## ----converge-and-adapt-------------------------------------------------------
if (r==0){
cat(sprintf("%10s %12s %16s %16s\n","rank","iteration","acceptance","step size"))
cat(sprintf("%10s %12s %16s %16s\n","----","---------","----------","---------"))
}
for (j in seq(2)){
for (i in seq(6)){
s <- MC(x,100,h) # evaluate rate of acceptance
a <- attr(s,"acceptanceRate")
h <- h*A(a) # adjust h up or down
x <- attr(s,"lastPoint") # start next iteration from last point
cat(sprintf("%10i %12i %16.4f %16.4g\n",r,i,a,h)) # cat(sprintf("rank: %02i; iteration: %02i; a: %f; h: %g\n",r,i,a,h))
}
pbdMPI::barrier()
## ---- here, the sampling happens
s <- ptSMMALA(x,N,h) # the main amount of work is done here
saveRDS(s,file=sprintf("AKAP79-pt-smmala-sample-%i-rank-%i.RDS",j,r))
## ---- when all are done, we load the sampled points from the files but only for the right temperature:
pbdMPI::barrier()
f <- dir(pattern=sprintf("^AKAP79-pt-smmala-sample-%i-rank-.*RDS$",j))
X <- uqsa::gatherSample(f,beta)
saveRDS(X,file=sprintf("AKAP79-temperature-ordered-pt-smmala-sample-%i-for-rank-%i.RDS",j,r))
x <- mcmcInit(
beta,
as.numeric(tail(X,1)), # last row
simulate=sim,
logLikelihood=llf,
dprior,
gllf,
gprior,
fi
)
}
time_ <- difftime(Sys.time(),start_time,units="min")
print(time_)
cat(sprintf("rank %02i of %02i finished with an acceptance rate of %f and swap rate of %f.\n",round(r),round(cs),attr(s,"acceptanceRate"),attr(s,"swapRate")))
finalize()