-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathpython-ray.qmd
More file actions
166 lines (122 loc) · 5.77 KB
/
python-ray.qmd
File metadata and controls
166 lines (122 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
---
title: Brief notes on parallel processing using the Ray package in Python
date: 2025-04-22
format:
html:
theme: cosmo
css: assets/styles.css
toc: true
code-copy: true
code-block-bg: true
code-block-border-left: "#31BAE9"
engine: knitr
ipynb-shell-interactivity: all
code-overflow: wrap
---
---
::: {.callout-warning}
Note that we haven't updated this material in a few years.
:::
## 1. Overview of Ray
The Ray package provides a variety of tools for managing parallel computations.
In particular, some of the key ideas/features of Ray are:
- Allowing users to parallelize independent computations across multiple cores on one or more machines.
- Different users can run the same code on different computational resources (without touching the actual code that does the computation).
- One nice feature relative to Dask is that Ray allows one to share data across all worker processes on a node, without multiple copies of the data, using the *object store*.
- Ray provides tools to build distributed (across multiple cores or nodes) applications where different processes interact with each other (using the notion of 'actors').
These brief notes just scratch the surface of Ray and just recapitulate basic information available in the [Ray documentation](https://docs.ray.io/en/master/).
## 2. Ray on one machine
On one machine, we can initialize Ray from within Python.
```{python}
#| eval: false
import ray
ray.init()
## alternatively, to specify a specific number of cores:
ray.init(num_cpus = 4)
```
To run a computation in parallel, we decorate the function of interest with the `remote` tag:
```{python}
#| eval: false
@ray.remote
def f(x):
return x * x
futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]
```
## 3. Ray on multiple machines (nodes)
Here we'll follow the [Ray instructions to start up Ray processes across multiple nodes within a Slurm job](https://docs.ray.io/en/latest/cluster/vms/user-guides/community/slurm.html).
Make sure to request multiple cores per node via --cpus-per-task (on Savio you'd generally set this equal to the number of cores per node).
We need to start the main Ray process (the Ray 'head node') on the head (first) node of Slurm allocation. Then we need to start one worker process for the remaining nodes (do not start a worker on the head node).
```bash
# Getting the node names
nodes=$(scontrol show hostnames "$SLURM_JOB_NODELIST")
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
port=6379
ip_head=$head_node_ip:$port
export ip_head
echo "IP Head: $ip_head"
echo "Starting HEAD at $head_node"
srun --nodes=1 --ntasks=1 -w "$head_node" \
ray start --head --node-ip-address="$head_node_ip" --port=$port \
--num-cpus "${SLURM_CPUS_PER_TASK}" --block &
# number of nodes other than the head node
worker_num=$((SLURM_JOB_NUM_NODES - 1))
for ((i = 1; i <= worker_num; i++)); do
node_i=${nodes_array[$i]}
echo "Starting WORKER $i at $node_i"
srun --nodes=1 --ntasks=1 -w "$node_i" \
ray start --address "$ip_head" \
--num-cpus "${SLURM_CPUS_PER_TASK}" --block &
sleep 5
done
```
Then in Python, we need to connect to the Ray head node process:
```{python}
#| eval: false
import ray, os
ray.init(address = os.getenv('ip_head'))
```
You should see something like this:
```
2021-03-16 14:39:48,520 INFO worker.py:654 -- Connecting to existing Ray cluster at address: 128.32.135.190:6379
{'node_ip_address': '128.32.135.190', 'raylet_ip_address': '128.32.135.190', 'redis_address': '128.32.135.190:6379', 'object_store_address': '/tmp/ray/session_2021-03-16_14-39-26_045290_3521776/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2021-03-16_14-39-26_045290_3521776/sockets/raylet', 'webui_url': 'localhost:8265', 'session_dir': '/tmp/ray/session_2021-03-16_14-39-26_045290_3521776', 'metrics_export_port': 63983, 'node_id': '2a3f113e2093d8a8abe3e0ddc9730f8cf6b4478372afe489208b2dcf'}
```
## 4. Using the Ray object store
The object store allows one to avoid making copies of data for each worker process on a node. All workers on a node can share the same data (this also avoids extra copying of data to the workers). And on top of this the object store allows one to use data in the form of numpy arrays directly using the memory allocated for the array in the object store, without any copying into a data structuer specific to the worker process.
Let's try this out.
```{python}
#| eval: false
ray.init(num_cpus = 4) # four worker processes on the local machine
@ray.remote
def calc(i, data):
import numpy as np
return([np.mean(data), np.std(data)])
import numpy as np
rng = np.random.default_rng()
## 'y' is an 800 MB object
n = 100000000
y = rng.normal(0, 1, size=(n))
y_ref = ray.put(y) # put the data in the object store
p = 50
## One can pass the reference to the object in the object store as an argument
futures = [calc.remote(i, y_ref) for i in range(p)]
ray.get(futures)
```
We'll watch memory use via `free -h` while running the test above.
Unfortunately when I test this on a single machine, memory use seems to be equivalent to four copies of the 'y' object, so something seems to be wrong. And trying it on a multi-node Ray cluster doesn't seem to clarify what is going on.
One can also run `ray memory` from the command line (not from within Python) to examine memory use in the object store. (In the case above, it simply reports the 800 MB usage.)
One can also create the object via a remote call and then use it.
```{python}
#| eval: false
@ray.remote
def create(n):
import numpy as np
rng = np.random.default_rng()
y = rng.normal(0, 1, size=(100000000))
return(y)
y_ref = create.remote(n)
futures = [calc.remote(i, y_ref) for i in range(p)]
ray.get(futures)
```