We will start by looking at how to use parallel and its mcparallel/mccollect functions. We launch the tasks with mcparallel and retrieve the results with mccollect.

library(ggplot2)
library(parallel)
library(microbenchmark) # To measure how long each version takes
epa <- readRDS("./assets/clean-epa.RDS")
epa$works <- epa$trarem == 1

parfits <- function() {
  pfit <- mcparallel(glm(works ~ education, data=epa, family=binomial))
  mccollect(list(pfit))
}
mbm <- microbenchmark("par.boot"=parfits(),
                      "serial.boot"=glm(works ~ education,
                                        data=epa,
                                        family=binomial),
                      times=10)
autoplot(mbm)

Clearly, forking the processes and waiting for them to rejoin takes some time in itself.
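The fork overhead pays off once several independent tasks run at the same time. The following is a minimal sketch, assuming a Unix-like system (forking is not available on Windows). The toy functions `slow_square` and `slow_sum` are hypothetical stand-ins for real model fits:

```r
library(parallel)

# Hypothetical toy tasks standing in for two independent model fits.
slow_square <- function(x) { Sys.sleep(0.2); x^2 }
slow_sum    <- function(x) { Sys.sleep(0.2); sum(x) }

# Launch both tasks in forked child processes; each call returns immediately.
job_a <- mcparallel(slow_square(4))
job_b <- mcparallel(slow_sum(1:10))

# Block until both children finish and gather their results in a list,
# in the same order as the jobs that were passed in.
results <- mccollect(list(job_a, job_b))
values <- unname(unlist(results))
print(values)
```

Because the two children sleep concurrently, the whole script should take roughly as long as one task rather than two.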

# Random seeds and parallelism

We will use mclapply, the parallelized version of lapply.

microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e3))),
"par2"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=2)),
"par4"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=4)),
times=100)
microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e5))),
"par2"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=2)),
"par4"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=4)),
times=100)
cis <- readRDS('./assets/clean-data.RDS')
cis <- cis[, c("economy",
"econrighttrack",
"politics",
"polrighttrack",
"ideology",
"goveval")]
cis <- cis[complete.cases(cis), ]
cis <- do.call(cbind, lapply(cis, as.numeric))
system.time(serial.res <- kmeans(cis, centers=3, nstart=20))
serial.res$tot.withinss
do_n_kmeans <- function(n) {
  return(kmeans(cis, centers=7, nstart=n))
}
system.time(list.res <- lapply(runif(4, 1, 100), do_n_kmeans))
res <- sapply(list.res, function(x) x$tot.withinss)
res
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4))
res <- sapply(list.res, function(x) x$tot.withinss)
res
RNGkind("L'Ecuyer-CMRG")
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4, mc.set.seed=FALSE))
res <- sapply(list.res, function(x) x$tot.withinss)
res
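The effect of the RNG settings used above is easier to see on a small example. The following is a sketch, assuming a Unix-like system; the helper `draw` is hypothetical. With the "L'Ecuyer-CMRG" generator and the default `mc.set.seed=TRUE`, each child gets its own random stream and a fixed seed makes the run repeatable. With `mc.set.seed=FALSE`, the children start from the parent's RNG state, so they can produce identical draws:

```r
library(parallel)

RNGkind("L'Ecuyer-CMRG")
draw <- function(i) rnorm(1)   # hypothetical task: one random draw

# Same seed, same number of cores: the two runs should match.
set.seed(42)
run1 <- unlist(mclapply(1:4, draw, mc.cores = 2))
set.seed(42)
run2 <- unlist(mclapply(1:4, draw, mc.cores = 2))
print(identical(run1, run2))

# Without per-child seeding, both children inherit the parent's RNG state.
set.seed(42)
shared <- unlist(mclapply(1:2, draw, mc.cores = 2, mc.set.seed = FALSE))
print(shared)
```

Identical draws across workers are rarely what a simulation wants, which is why `mc.set.seed=FALSE` should be a deliberate choice.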

mcparallel works very well for task parallelism; mclapply works well for data parallelism.

Things to watch for:

• Modifying the big common data structure:
  • Won’t be seen by other processes,
  • But will blow up the memory requirements
• You can only use one machine’s processors
• Won’t work on Windows
• mc.cores is a lie. It’s the number of child processes run at once, not the number of cores.
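The first point can be checked directly. The following is a small sketch, assuming a Unix-like system; the variable `counter` is hypothetical. Each forked child changes only its own copy of the variable, and the parent never sees the change. Note that `mc.cores = 3` forks three children no matter how many physical cores the machine has:

```r
library(parallel)

counter <- 0   # hypothetical shared variable living in the parent

# Each forked child increments its own private copy of `counter`.
child_values <- unlist(mclapply(1:3, function(i) {
  counter <<- counter + i
  counter
}, mc.cores = 3))

print(child_values)  # what each child saw after its own update
print(counter)       # the parent's copy is untouched
```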

# Multiple computers

library(parallel)
cl <- makeCluster(4)
clusterCall(cl, rnorm, 5)

clusterCall() runs the same function (here rnorm, with argument 5) on every worker in the cluster. A related helper, clusterEvalQ(), is handier for some setup tasks, e.g.:

clusterEvalQ(cl, {library(parallel); NULL})
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
stopCluster(cl)

The error occurs because we have not copied the data to the workers.

Recall that we aren’t forking here; we are creating processes from scratch. These processes, new to this world, are not familiar with our ways, customs, or datasets. We actually have to ship the data out to the workers:

cl <- makeCluster(4)  # the previous cluster was stopped, so start a new one
system.time(clusterExport(cl, "cis"))
list.res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
res <- sapply(list.res, function(x) x$tot.withinss)
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
res
stopCluster(cl)
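The export step can also be tried on a toy example that does not need the survey data. This is a minimal sketch; `scale_factor` and `scale_it` are hypothetical names. The function refers to a variable that exists only in the master session until it is exported:

```r
library(parallel)

cl <- makeCluster(2)   # two fresh R worker processes on this machine

scale_factor <- 10                          # hypothetical data, known only to the master
scale_it <- function(x) x * scale_factor    # relies on a variable defined outside itself

# Ship the variable to every worker; envir tells clusterExport where to find it.
clusterExport(cl, "scale_factor", envir = environment())
scaled <- unlist(clusterApply(cl, 1:4, scale_it))

stopCluster(cl)
print(scaled)
```

When this is run as a top-level script, skipping the `clusterExport()` call should reproduce the kind of "object not found" error seen earlier.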

We can also build a cluster that uses other machines.

hosts <- c(rep("localhost", 8), rep("192.168.0.10", 8))
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
stopCluster(cl)

The cluster routines in parallel are good if you know you will eventually have to move to using multiple computers (nodes in a cluster, or desktops in a lab) for a single computation.

• Use clusterExport for functions and data that will be needed by everyone.
• Communicating data is slow, but much faster than having every worker read the same data from a file.
• Use clusterApplyLB if the tasks vary greatly in runtime.
• Use clusterApply if each task requires an enormous amount of data.
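The difference between the two scheduling styles can be sketched with tasks of uneven length. The workload below is hypothetical (one long sleep followed by several short ones), and actual timings will vary from machine to machine. clusterApply hands out tasks in fixed rounds, while clusterApplyLB gives the next task to whichever worker becomes free first:

```r
library(parallel)

cl <- makeCluster(2)

# Hypothetical workload: one long task followed by several short ones.
durations <- c(0.5, 0.05, 0.05, 0.05, 0.05, 0.05)
nap <- function(s) { Sys.sleep(s); Sys.getpid() }

# Static scheduling: tasks are dispatched to the workers in fixed rounds.
t_static <- system.time(pid_static <- clusterApply(cl, durations, nap))["elapsed"]

# Load-balanced scheduling: a free worker immediately picks up the next task.
t_lb <- system.time(pid_lb <- clusterApplyLB(cl, durations, nap))["elapsed"]

stopCluster(cl)
print(c(static = unname(t_static), balanced = unname(t_lb)))
```

The returned process IDs show which worker handled each task, which is a simple way to inspect how the work was distributed.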

# foreach and doParallel

The “master/worker” approach that parallel enables works extremely well for moderately sized problems, and isn’t that difficult to use. It is all based on one form of R iteration, apply, which is well understood.

However, going from serial to parallel requires some rewriting, and even going from one method of parallelism to another (e.g., multicore-style to snow-style) requires some modification of the code.

The foreach package is based on another style of iterating through data - a for loop - and is designed so that one can go from serial to several forms of parallel relatively easily. There are then a number of tools one can use in the library to improve performance.

for (i in 1:3) print(sqrt(i))
library(foreach)
foreach (i=1:3) %do% sqrt(i)
library(doParallel)
registerDoParallel(3)  # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
stopImplicitCluster()
cl <- makePSOCKcluster(3)
registerDoParallel(cl)  # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
stopCluster(cl)  # release the PSOCK workers once the parallel loop is done
foreach (i=1:3, .combine=c) %do% sqrt(i)
foreach (i=1:3, .combine="+") %do% sqrt(i)

