We will start by looking at how to use parallel and the mcparallel/mccollect functions. We launch tasks with mcparallel and collect their results with mccollect.

library(parallel)
library(microbenchmark) # To measure how long things take
epa <- readRDS("./assets/clean-epa.RDS")
epa$works <- epa$trarem == 1

parfits <- function() {
  pfit <- mcparallel(glm(works ~ education, data=epa, family=binomial))
  mccollect(list(pfit))
}
library(ggplot2)  # autoplot() for microbenchmark objects needs ggplot2
mbm <- microbenchmark("par.boot"=parfits(),
                      "serial.boot"=glm(works ~ education, data=epa, family=binomial))
autoplot(mbm)

Clearly, forking the processes and waiting for them to rejoin takes some time of its own.

Random seeds and parallelism

cis <- readRDS('./assets/clean-data.RDS')

We will use the parallelized version of lapply, mclapply

microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e3))),
               "par2"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=2)),
               "par4"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=4)))
cis <- cis[, c("economy", "econrighttrack", "politics", "polrighttrack", "ideology", "goveval")]
cis <- cis[complete.cases(cis), ]
cis <- do.call(cbind, lapply(cis, as.numeric))
system.time(serial.res <- kmeans(cis, centers=5, nstart=20))
serial.res$withinss
do_n_kmeans <- function(n) {
    return(kmeans(cis, centers=5, nstart=n))
}

system.time(list.res <- lapply(runif(4, 1, 100), do_n_kmeans))
res <- sapply(list.res, function(x) x$tot.withinss)
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
res
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4))
res <- sapply(list.res, function(x) x$tot.withinss)
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
res
RNGkind("L'Ecuyer-CMRG")  # RNG that provides independent streams for parallel workers
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4))
res <- sapply(list.res, function(x) x$tot.withinss)
res
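Why L'Ecuyer-CMRG matters: with it set, mclapply seeds each worker's stream deterministically from the current seed, so parallel draws become reproducible. A minimal sketch (assuming a Unix-alike where mclapply can actually fork):

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")  # independent, reproducible streams per worker
set.seed(42)
a <- mclapply(1:4, function(i) rnorm(2), mc.cores = 2)
set.seed(42)
b <- mclapply(1:4, function(i) rnorm(2), mc.cores = 2)
identical(a, b)  # the two runs draw identical numbers
```

With the default Mersenne-Twister instead, repeated parallel runs would not reproduce each other.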

mcparallel works very well for task parallelism; mclapply for data parallelism.
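Task parallelism means running different computations at the same time. A quick sketch of launching two unrelated jobs with mcparallel and waiting for both (Unix-alikes only, since this forks; the two tasks are made up for illustration):

```r
library(parallel)

# Two *different* tasks started in parallel
job1 <- mcparallel(mean(rnorm(1e6)))
job2 <- mcparallel(quantile(rnorm(1e6)))
res <- mccollect(list(job1, job2))  # blocks until both children finish
str(res)
```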

Things to watch for: the mc* functions rely on forking, so they do not work on Windows, and (as we just saw) random number streams need explicit handling.

Multiple computers

library(parallel)
cl <- makeCluster(4)
clusterCall(cl, rnorm, 5)

clusterCall() runs the same function (here, rnorm, with argument 5) on all workers in the cluster. A related helper is clusterEvalQ(), which is handier for setup tasks, e.g.:

clusterEvalQ(cl, {library(parallel); NULL})
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
stopCluster(cl)

The error occurs because we have not copied the data to the workers.

Recall that we aren’t forking here; we are creating processes from scratch. These processes, new to this world, are not familiar with our ways, customs, or datasets. We actually have to ship the data out to the workers:

system.time(clusterExport(cl, "cis"))
cluster.res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
res <- sapply(cluster.res, function(x) x$tot.withinss)
best.res <- cluster.res[[which.min(res)]]
best.res$withinss
res
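parLapply() is the cluster counterpart of mclapply(). A minimal self-contained sketch, using a synthetic matrix as a stand-in for cis:

```r
library(parallel)

X <- matrix(rnorm(500), ncol = 5)  # stand-in for the cis matrix above
cl <- makeCluster(2)
clusterExport(cl, "X")             # ship the data to the workers
wss <- parLapply(cl, rep(10, 2), function(n) {
  kmeans(X, centers = 5, nstart = n)$tot.withinss
})
stopCluster(cl)
unlist(wss)
```

Note that the anonymous function finds X in the workers' global environments, which is why the clusterExport() call is needed.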

We can also build a cluster using other machines:

hosts <- c(rep("localhost", 8), rep("192.168.0.10", 8))
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
stopCluster(cl)

The cluster routines in parallel are good if you know you will eventually have to move to using multiple computers (nodes in a cluster, or desktops in a lab) for a single computation.

foreach and doParallel

The “master/worker” approach that parallel enables works extremely well for moderately sized problems, and isn’t that difficult to use. It is all based on one form of R iteration, apply, which is well understood.

However, going from serial to parallel requires some re-writing, and even going from one method of parallelism to another (e.g., multicore-style to snow-style) requires some modification of code.

The foreach package is based on another style of iterating through data - a for loop - and is designed so that one can go from serial to several forms of parallel relatively easily. There are then a number of tools one can use in the library to improve performance.

for (i in 1:3) print(sqrt(i))
library(foreach)
foreach (i=1:3) %do% sqrt(i)
library(doParallel)
registerDoParallel(3)  # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
stopImplicitCluster()
cl <- makePSOCKcluster(3)
registerDoParallel(cl)  # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
foreach (i=1:3, .combine=c) %do% sqrt(i)
foreach (i=1:3, .combine="+") %do% sqrt(i)
stopCluster(cl)
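The .combine argument generalizes beyond c and "+": for instance, rbind stacks each iteration's result into a matrix. A small sketch (the simulation task here is made up for illustration):

```r
library(foreach)
library(doParallel)

registerDoParallel(2)
# Each iteration returns a named vector; rbind stacks them row by row
sims <- foreach(i = 1:4, .combine = rbind) %dopar% {
  x <- rnorm(100)
  c(rep = i, mean = mean(x), sd = sd(x))
}
stopImplicitCluster()
sims
```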
