Empezaremos viendo cómo usar parallel y las funciones mcparallel/mccollect. Ejecutamos las tareas usando mcparallel y recupermos los resultados usando mccollect.

library(ggplot2)
library(parallel)
library(microbenchmark) # Para evaluar cuanto tarda
epa <- readRDS("./assets/clean-epa.RDS")
epa$works <- epa$trarem == 1

parfits <- function() {
  pfit <- mcparallel(glm(works ~ education, data=epa, family=binomial))
  mccollect(list(pfit))
}
mbm <- microbenchmark("par.boot"=parfits(),
                     "serial.boot"=glm(works ~ education,
                                       data=epa,
                                       family=binomial),
                     times=10)
autoplot(mbm)

Clearly actually forking the processes and waiting for them to rejoin itself takes some time.

Semillas aleatorias y paralalelismo

Usaremos la versión paralelizada de lapply

microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e3))),
               "par2"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=2)),
               "par4"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=4)),
               times=100)
microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e5))),
               "par2"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=2)),
               "par4"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=4)),
               times=100)
cis <- readRDS('./assets/clean-data.RDS')
cis <- cis[, c("economy",
              "econrighttrack",
              "politics",
              "polrighttrack",
              "ideology",
              "goveval")]
cis <- cis[complete.cases(cis), ]
cis <- do.call(cbind, lapply(cis, as.numeric))
system.time(serial.res <- kmeans(cis, centers=3, nstart=20))
serial.res$tot.withinss
do_n_kmeans <- function(n) {
    return(kmeans(cis, centers=7, nstart=n))
}

system.time(list.res <- lapply(runif(4, 1, 100), do_n_kmeans))
res <- sapply(list.res, function(x) x$tot.withinss)
res
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4))
res <- sapply(list.res, function(x) x$tot.withinss)
res
RNGkind("L'Ecuyer-CMRG")

system.time(list.res <- mclapply(runif(4, 1, 1000),
                                do_n_kmeans,
                                mc.cores=4, mc.set.seed=FALSE))
res <- sapply(list.res, function(x) x$tot.withinss)
res

mcparallel works very well for task parallelism; the mclapply for data parallelism.

Things to watch for:

Multiple computers

library(parallel)
cl <- makeCluster(4)
clusterCall(cl, rnorm, 5)

clusterCall() runs the same function (here, rnorm, with argument 5) on all workers in the cluster. A related helper function is clusterEvalQ() which is handier to use for some setup tasks - eg,

clusterEvalQ(cl, {library(parallel); NULL})
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
stopCluster(cl)

El error es porque no hemos copiado los datos.

Recall that we aren’t forking here; we are creating processes from scratch. These processes, new to this world, are not familiar with our ways, customs, or datasets. We actually have to ship the data out to the workers:

system.time(clusterExport(cl, "cis"))
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
res <- sapply(list.res, function(x) x$tot.withinss)
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
res

Podemos generar un cluster usando otras máquinas

hosts <- c(rep("localhost", 8), rep("192.168.0.10", 8))
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
stopCluster(cl)

The cluster routines in parallel are good if you know you will eventually have to move to using multiple computers (nodes in a cluster, or desktops in a lab) for a single computation.

foreach and doparalel

The “master/worker” approach that parallel enables works extremely well for moderately sized problems, and isn’t that difficult to use. It is all based on one form of R iteration, apply, which is well understood.

However, going from serial to parallel requires some re-writing, and even going from one method of parallelism to another (eg, multicore-style to snow-style) requires some modification of code.

The foreach package is based on another style of iterating through data - a for loop - and is designed so that one can go from serial to several forms of parallel relatively easily. There are then a number of tools one can use in the library to improve performance.

for (i in 1:3) print(sqrt(i))
library(foreach)
foreach (i=1:3) %do% sqrt(i)
library(doParallel)
registerDoParallel(3)  # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
stopImplicitCluster()
cl <- makePSOCKcluster(3)
registerDoParallel(cl)  # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
foreach (i=1:3, .combine=c) %do% sqrt(i)
foreach (i=1:3, .combine="+") %do% sqrt(i)

%%%%%%%%%%

%%%%%%%%%% %%%%%%%%%% %%%%%%%%%% %%%%%%%%%% %%%%%%%%%% %%%%%%%%%% %%%%%%%%%% %%%%%%%%%%
--- 
title: "Computación en paralelo"
date: "`r format(Sys.time(), '%B %d, %Y')`"
---

```{r setup, include=FALSE, cache=FALSE}
knitr::opts_chunk$set(eval = FALSE)
knitr::opts_chunk$set(fig.path = './assets/')
```

Empezaremos viendo cómo usar `parallel` y las funciones
`mcparallel/mccollect`. Ejecutamos las tareas usando `mcparallel` y
recupermos los resultados usando `mccollect`.

```{r}
library(ggplot2)
library(parallel)
library(microbenchmark) # Para evaluar cuanto tarda
```

```{r} 
epa <- readRDS("./assets/clean-epa.RDS")
epa$works <- epa$trarem == 1

parfits <- function() {
  pfit <- mcparallel(glm(works ~ education, data=epa, family=binomial))
  mccollect(list(pfit))
}
```

```{r}
mbm <- microbenchmark("par.boot"=parfits(),
                     "serial.boot"=glm(works ~ education,
                                       data=epa,
                                       family=binomial),
                     times=10)
autoplot(mbm)
```

Clearly actually forking the processes and waiting for them to rejoin
itself takes some time.

# Semillas aleatorias y paralalelismo


Usaremos la versión paralelizada de `lapply`

```{r}
microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e3))),
               "par2"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=2)),
               "par4"=unlist(mclapply(1:10, function(x) rnorm(1e3), mc.cores=4)),
               times=100)
```

```{r}
microbenchmark("serial"=unlist(lapply(1:10, function(x) rnorm(1e5))),
               "par2"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=2)),
               "par4"=unlist(mclapply(1:10, function(x) rnorm(1e5), mc.cores=4)),
               times=100)
```

```{r}
cis <- readRDS('./assets/clean-data.RDS')
```

```{r}
cis <- cis[, c("economy",
              "econrighttrack",
              "politics",
              "polrighttrack",
              "ideology",
              "goveval")]
cis <- cis[complete.cases(cis), ]
cis <- do.call(cbind, lapply(cis, as.numeric))
```

```{r}
system.time(serial.res <- kmeans(cis, centers=3, nstart=20))
serial.res$tot.withinss
```

```{r} 
do_n_kmeans <- function(n) {
    return(kmeans(cis, centers=7, nstart=n))
}

system.time(list.res <- lapply(runif(4, 1, 100), do_n_kmeans))
res <- sapply(list.res, function(x) x$tot.withinss)
res
```

<!-- ```{r} -->
<!-- res -->
<!-- ``` -->

```{r} 
system.time(list.res <- mclapply(runif(4, 1, 1000), do_n_kmeans, mc.cores=4))
res <- sapply(list.res, function(x) x$tot.withinss)
res
```

```{r}
RNGkind("L'Ecuyer-CMRG")

system.time(list.res <- mclapply(runif(4, 1, 1000),
                                do_n_kmeans,
                                mc.cores=4, mc.set.seed=FALSE))
res <- sapply(list.res, function(x) x$tot.withinss)
res
```

mcparallel works very well for task parallelism; the mclapply for data
parallelism.

Things to watch for:

* Modifying the big common data structure:
  - Won't be seen by other processes,
  - But will blow up the memory requirements
* You can only use one machine's processors
* Won't work on Windows 
* mc.cores is a lie. It's the number of tasks, not cores. 

# Multiple computers

```{r}
library(parallel)
cl <- makeCluster(4)
```

```{r}
clusterCall(cl, rnorm, 5)
```

clusterCall() runs the same function (here, rnorm, with argument 5) on
all workers in the cluster. A related helper function is
clusterEvalQ() which is handier to use for some setup tasks - eg,

```{r}
clusterEvalQ(cl, {library(parallel); NULL})
```

```{r}
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
stopCluster(cl)
```

El error es porque no hemos copiado los datos. 

Recall that we aren't forking here; we are creating processes from
scratch. These processes, new to this world, are not familiar with our
ways, customs, or datasets. We actually have to ship the data out to
the workers:

```{r}
system.time(clusterExport(cl, "cis"))
res <- clusterApply(cl, rep(10, 4), do_n_kmeans)
res <- sapply(list.res, function(x) x$tot.withinss)
lapply.res <- list.res[[which.min(res)]]
lapply.res$withinss
res
```

Podemos generar un cluster usando otras máquinas

```{r}
hosts <- c(rep("localhost", 8), rep("192.168.0.10", 8))
cl <- makePSOCKcluster(names=hosts)
clusterCall(cl, rnorm, 5)
stopCluster(cl)
```

The cluster routines in parallel are good if you know you will
eventually have to move to using multiple computers (nodes in a
cluster, or desktops in a lab) for a single computation.

- Use clusterExport for functions and data that will be needed by everyone.
- Communicating data is slow, but much faster than having every worker read the same data from a file.
- Use clusterApplyLB if the tasks vary greatly in runtime.
- Use clusterApply if each task requires an enormous amount of data.

# foreach and doparalel

The “master/worker” approach that parallel enables works extremely
well for moderately sized problems, and isn't that difficult to use.
It is all based on one form of R iteration, apply, which is well
understood.

However, going from serial to parallel requires some re-writing, and
even going from one method of parallelism to another (eg,
multicore-style to snow-style) requires some modification of code.

The foreach package is based on another style of iterating through
data - a for loop - and is designed so that one can go from serial to
several forms of parallel relatively easily. There are then a number
of tools one can use in the library to improve performance.

```{r}
for (i in 1:3) print(sqrt(i))
```

```{r}
library(foreach)
foreach (i=1:3) %do% sqrt(i)
```


```{r}
library(doParallel)
registerDoParallel(3)  # use multicore-style forking
foreach (i=1:3) %dopar% sqrt(i)
stopImplicitCluster()
```

```{r}
cl <- makePSOCKcluster(3)
registerDoParallel(cl)  # use the just-made PSOCK cluster
foreach (i=1:3) %dopar% sqrt(i)
```

```{r}
foreach (i=1:3, .combine=c) %do% sqrt(i)
foreach (i=1:3, .combine="+") %do% sqrt(i)
```



%%%%%%%%%% 

\begin{frame}{Packages}
  R does {some} parallelism but there are packages that explicitly use
  parallelism

  For a complete list, see

  \url{http://cran.r-project.org/web/views/HighPerformanceComputing.html}


  Some package simplify interect with parallel:

  Caret

  Caret is a widely-used machine learning package, that uses foreach
  (which we'll learn about) to parallelize things like CV-folds, etc
\end{frame}


%%%%%%%%%%
\begin{frame}{The Parallel Package}
  Since R 2.14.0 (late 2011), the parallel package has been part of
  core R.

  Incorporates two other packages:

  multicore: for using all processors on a single processor. Not on
  windows.

  snow: for using any group of processors, possibly across a cluster.

  Many packages which use parallelism use one of these two, so worth
  understanding.

  Both create new processes (not threads) to run on different
  processors; but in importantly different ways.
\end{frame}


%%%%%%%%%%
\begin{frame}{mcparallel/mccollect}

  The simplest use of the multicore package is the pair of functions
  mcparallel() and mccollect().

  mcparallel() forks a task to run a given function; it then runs in
  the background. mccollect() waits for and gets the result.

  Let's pick an example: reading the airlines data set, we want ---
  for a particular month --- to know both the total number of planes
  in the data (by tail number) and the median elapsed flight time.
  These are two independant calculations, and so can be done
  independantly.
\end{frame}

%%%%%%%%%%
\begin{frame}{Seeds}
  Parallel RNG (problems with reproducibility because different
  processes)

  Depending on what you are doing, it may be very important to have
  different (or the same!) random numbers generated in each process.

  Here, we definitely want them different - the whole point is to
  generate different random realizations.

  parallel has a good RNG suitable for parallel work based on the work
  of Pierre L'Ecuyer in Montréal:
\end{frame}

%%%%%%%%%%
\begin{frame}{Summary: parallel/multicore}
The mc* routines in parallel work particularly well when:

You want to make full use of the processors on a single computer Each
task only reads from some big common data structure and produces
modest-sized results mcparallel works very well for task parallelism;
the mclapply for data parallelism.

Things to watch for:

Modifying the big common data structure: Won't be seen by other
processes, But will blow up the memory requirements You can only use one
machine's processors Won't work on Windows (but what does?)

\end{frame}

%%%%%%%%%%
\begin{frame}{Multiple computers with parallel/snow}
The other half of parallel, routines that were in the still-active snow
package, allow you to again launch new R processes --- by default, on
the current computer, but also on any computer you have access to. (SNOW
stands for ``Simple Network of Workstations'', which was the original
use case).

The recipe for doing computations with snow looks something like:

other than the makeCluster()/stopCluster(), it looks very much like
multicore and mclapply.

Recall that we aren't forking here; we are creating processes from
scratch. These processes, new to this world, are not familiar with our
ways, customs, or datasets. We actually have to ship the data out to the
workers


Note that the costs of shipping out data back and forth, and creating
the processes from scratch, is relatively costly - but this is the price
we pay for being able to spawn the processes anywhere.

\end{frame}


%%%%%%%%%%
\begin{frame}{foreach and doparallel}

  The ``master/worker'' approach that parallel enables works extremely
  well for moderately sized problems, and isn't that difficult to use.
  It is all based on one form of R iteration, apply, which is well
  understood.

  However, going from serial to parallel requires some re-writing, and
  even going from one method of parallelism to another (eg,
  multicore-style to snow-style) requires some modification of code.

  The foreach package is based on another style of iterating through
  data - a for loop - and is designed so that one can go from serial
  to several forms of parallel relatively easily. There are then a
  number of tools one can use in the library to improve performance.
\end{frame}

%%%%%%%%%%
\begin{frame}{Summary: foreach}
  
  Foreach is a wrapper for the other parallel methods we've seen, so
  it inherits some of the advantages and drawbacks of each.

  Use foreach if:

  Your code already relys on for-style iteration; transition is easy
  You don't know if you want multicore vs. snow style parallel use (or
  other kinds, like batch jobs): you can switch just by registering a
  different backend! You want to be able to incrementally improve the
  performance of your code.

  Note that you can have portions of your analysis code use foreach
  with parallel and portions using the backend with apply-style
  parallelism; it doesn't have to be all one or the other.
\end{frame}

%%%%%%%%%%
\begin{frame}{Summary}
R comes with an increasingly rich set of tools for taking advantage of
more compute power:

parallel foreach/doParallel

Keep in mind what we talked about in terms of overhead, and:

Don't reinvent wheels Big chunks are better than little chunks
Parallelism gives you more compute, not I/O One task per core Don't trip
over your own feet
\end{frame}

