[FUTURE][SYSTEMML-1160] [WIP] Enable prefetching of batches via for loop iterator
This PR extends the current for construct to support iterating over batches. The current version only supports iterating over batches of rows.
The syntax
for(X_batch in X, nrow=batch_size) {
  ...
}
is equivalent to
N = nrow(X)
iters = ceil(N / batch_size)
for(i in 1:iters) {
  # Get next batch
  beg = ((i-1) * batch_size) %% N + 1
  end = min(N, beg + batch_size - 1)
  X_batch = X[beg:end,]
}
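To make the index arithmetic concrete, here is a tiny self-contained Java sketch of the same computation (the class name and the values of N and batch size are made up purely for illustration), including the clamped last batch:

public class BatchRanges {
    public static void main(String[] args) {
        int N = 230;         // hypothetical number of rows
        int batchSize = 50;  // hypothetical batch size
        int iters = (int) Math.ceil((double) N / batchSize);
        for (int i = 1; i <= iters; i++) {
            int beg = ((i - 1) * batchSize) % N + 1;     // 1-based start row
            int end = Math.min(N, beg + batchSize - 1);  // clamp the last batch at N
            System.out.println("batch " + i + ": rows " + beg + ".." + end);
        }
        // prints: 1..50, 51..100, 101..150, 151..200, 201..230
    }
}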
Note: This PR does not introduce rectangular blocks in SystemML; instead, batching is handled in the ForProgramBlock. To enable prefetching, the user just has to set a config property:
ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 MB prefetch memory
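For context, end-to-end usage through the MLContext API might look like the following Java sketch. The script file train.dml and its contents are hypothetical; only setConfigProperty and the prefetch.budget.mb key come from this PR:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.sysml.api.mlcontext.MLContext;
import org.apache.sysml.api.mlcontext.Script;
import org.apache.sysml.api.mlcontext.ScriptFactory;

public class PrefetchExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("PrefetchExample");
        JavaSparkContext sc = new JavaSparkContext(conf);
        MLContext ml = new MLContext(sc);
        // Reserve 300 MB of the local memory budget for batch prefetching (this PR).
        ml.setConfigProperty("prefetch.budget.mb", "300");
        // train.dml is a hypothetical script that uses the new batch iterator:
        //   for(X_batch in X, nrow=batch_size) { ... }
        Script script = ScriptFactory.dmlFromFile("train.dml");
        ml.execute(script);
    }
}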
@dusenberrymw @mboehm7 @bertholdreinwald @frreiss @asurve
I will run a few more experiments and update this PR soon :)
Approach 1: With Prefetching
The statistics below show that for a convnet on a 200K-column dataset with a batch size of 50, we only wait on fetching the first batch (~41 seconds); for the remaining batches, the wait time is almost zero. I also did sanity testing using System.out.println and double-checked that the Spark job executed in parallel while conv2d was executing.
ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 MB prefetch memory

for (e in 1:epochs) {
  i = 0
  for(X_batch in X, nrow=batch_size) {
    i = i + 1
    if(i == 100) {
      stop("Early stopping for benchmarks")
    }
    # Get next batch
    beg = ((i-1) * batch_size) %% N + 1
    end = min(N, beg + batch_size - 1)
    y_batch = Y[beg:end,]
    ...
  }
}
time: 1384233.345149ms
SystemML Statistics:
Total elapsed time: 0.000 sec.
Total compilation time: 0.000 sec.
Total execution time: 0.000 sec.
Number of compiled Spark inst: 76.
Number of executed Spark inst: 3.
Batch-Fetch (next,indx,wait,first): 41.586/355.839/41.585/41.583 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS): 4767/139/0.
Cache times (ACQr/m, RLS, EXP): 0.090/0.018/30.515/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time: 1.699 sec.
Functions recompiled: 1.
Functions recompile time: 0.164 sec.
Spark ctx create time (lazy): 0.017 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/355.286 secs.
Total JIT compile time: 172.599 sec.
Total JVM GC count: 773.
Total JVM GC time: 123.818 sec.
Heavy hitter instructions (name, time, count):
-- 1) conv2d_bias_add 817.763 sec 297
-- 2) sel+ 196.118 sec 396
-- 3) - 45.240 sec 2178
-- 4) conv2d_backward_filter 44.979 sec 297
-- 5) relu_maxpooling 44.417 sec 297
-- 6) +* 41.579 sec 1485
-- 7) r' 28.868 sec 495
-- 8) conv2d_backward_data 28.579 sec 198
-- 9) maxpooling_backward 21.912 sec 297
-- 10) * 19.874 sec 2880
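For intuition about the near-zero wait time reported above, here is a generic double-buffering sketch in plain Java. This is not the PR's actual ForProgramBlock implementation, just a minimal illustration of the prefetching pattern where batch i+1 is fetched on a background thread while batch i is being consumed; class and method names are made up:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PrefetchSketch {
    // Hypothetical stand-in for fetching batch i (e.g., a Spark-side collect).
    static double[] fetchBatch(int i) throws InterruptedException {
        Thread.sleep(100);      // simulate I/O latency
        return new double[50];  // dummy 50-row batch
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        int iters = 10;
        Future<double[]> next = pool.submit(() -> fetchBatch(1)); // prefetch first batch
        for (int i = 1; i <= iters; i++) {
            double[] batch = next.get(); // blocks only if fetch is slower than compute
            if (i < iters) {
                final int j = i + 1;
                next = pool.submit(() -> fetchBatch(j)); // start fetching the next batch
            }
            // ... consume 'batch' (e.g., forward/backward pass) while the fetch runs ...
        }
        pool.shutdown();
    }
}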
Approach 2: Maxi-batch

Here, instead of prefetching, each epoch first slices a larger maxi-batch (20x the batch size) out of the Spark-backed dataset and then slices mini-batches locally from it, amortizing the cost of the distributed indexing:
for (e in 1:epochs) {
  maxi_batch_size = 20 * batch_size
  maxi_iters = ceil(N / maxi_batch_size)
  stopping_i = 0
  for(maxi in 1:maxi_iters) {
    # Get next maxi-batch
    maxi_beg = ((maxi-1) * maxi_batch_size) %% N + 1
    maxi_end = min(N, maxi_beg + maxi_batch_size - 1)
    X_maxibatch = X[maxi_beg:maxi_end,]
    y_maxibatch = Y[maxi_beg:maxi_end,]
    maxi_N = nrow(y_maxibatch)
    iters = ceil(maxi_N / batch_size)
    for(i in 1:iters) {
      stopping_i = stopping_i + 1
      if(stopping_i == 100) {
        stop("Early stopping for benchmarks")
      }
      # Get next batch
      beg = ((i-1) * batch_size) %% maxi_N + 1
      end = min(maxi_N, beg + batch_size - 1)
      X_batch = X_maxibatch[beg:end,]
      y_batch = y_maxibatch[beg:end,]
      ...
    }
  }
}
time: 2334624.686333ms
SystemML Statistics:
Total elapsed time: 0.000 sec.
Total compilation time: 0.000 sec.
Total execution time: 0.000 sec.
Number of compiled Spark inst: 81.
Number of executed Spark inst: 8.
Cache hits (Mem, WB, FS, HDFS): 19895/0/0/6.
Cache writes (WB, FS, HDFS): 7403/64/0.
Cache times (ACQr/m, RLS, EXP): 80.049/0.025/33.767/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time: 1.907 sec.
Functions recompiled: 1.
Functions recompile time: 0.303 sec.
Spark ctx create time (lazy): 0.026 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/79.953 secs.
Total JIT compile time: 83.437 sec.
Total JVM GC count: 1441.
Total JVM GC time: 254.973 sec.
Heavy hitter instructions (name, time, count):
-- 1) conv2d_bias_add 780.249 sec 297
-- 2) conv2d_backward_filter 442.185 sec 297
-- 3) conv2d_backward_data 304.401 sec 198
-- 4) sel+ 200.346 sec 396
-- 5) uack+ 115.980 sec 495
-- 6) maxpooling_backward 107.120 sec 297
-- 7) rangeReIndex 85.561 sec 203
-- 8) relu_backward 61.133 sec 495
-- 9) ba+* 43.830 sec 594
-- 10) relu_maxpooling 42.892 sec 297
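For a rough sense of why the maxi-batch rewrite reduces distributed indexing compared with slicing every mini-batch directly out of X, here is a back-of-the-envelope Java sketch. It assumes X lives on Spark so that each slice of X is a distributed rangeReIndex; the class name and row count N are made up for illustration:

public class SliceCount {
    public static void main(String[] args) {
        int N = 200_000;                    // hypothetical number of rows
        int batchSize = 50;
        int maxiBatchSize = 20 * batchSize; // 1000 rows per maxi-batch, as in the script
        // Slicing every mini-batch directly out of the Spark-backed X:
        int slicesDirect = (int) Math.ceil((double) N / batchSize);   // 4000 per epoch
        // Slicing only maxi-batches out of X; mini-batches are cut locally:
        int slicesMaxi = (int) Math.ceil((double) N / maxiBatchSize); // 200 per epoch
        System.out.println("distributed slices, direct:     " + slicesDirect);
        System.out.println("distributed slices, maxi-batch: " + slicesMaxi);
    }
}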
I used the same cluster and same configuration as our VLDB experiments.
@bertholdreinwald @mboehm7 @dusenberrymw
@niketanpansare Thanks for running this experiment. A 1.7x speedup certainly shows that our current methods are leaving a large amount of performance on the table.
These numbers are a bit puzzling. If there is a 1000s difference, how is it that rangeReIndex (which also covers the lazy Spark indexing) only requires 85s (of which the 80s for ACQ give us an upper bound on the time for collecting the indexed data)?
I noticed that the backward instructions are also taking more time.
Here are the statistics on a different cluster with 70g driver memory:
Approach 1: With Prefetching
time: 834467.702908ms
SystemML Statistics:
Total elapsed time: 0.000 sec.
Total compilation time: 0.000 sec.
Total execution time: 0.000 sec.
Number of compiled Spark inst: 75.
Number of executed Spark inst: 3.
Batch-Fetch (next,indx,wait,first): 61.151/168.933/61.151/61.149 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS): 7506/3/0.
Cache times (ACQr/m, RLS, EXP): 0.110/0.010/3.239/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time: 0.985 sec.
Functions recompiled: 1.
Functions recompile time: 0.086 sec.
Spark ctx create time (lazy): 0.013 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/168.039 secs.
Total JIT compile time: 54.711 sec.
Total JVM GC count: 508.
Total JVM GC time: 96.755 sec.
Heavy hitter instructions (name, time, count):
-- 1) conv2d_bias_add 206.028 sec 297
-- 2) conv2d_backward_filter 157.850 sec 297
-- 3) sel+ 110.459 sec 396
-- 4) conv2d_backward_data 70.346 sec 198
-- 5) uack+ 50.411 sec 495
-- 6) maxpooling_backward 50.044 sec 297
-- 7) relu_backward 28.905 sec 495
-- 8) relu_maxpooling 21.635 sec 297
-- 9) ba+* 20.543 sec 594
-- 10) +* 15.317 sec 1485
Approach 2: Maxi-batch
time: 915534.711838ms
SystemML Statistics:
Total elapsed time: 0.000 sec.
Total compilation time: 0.000 sec.
Total execution time: 0.000 sec.
Number of compiled Spark inst: 80.
Number of executed Spark inst: 8.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/6.
Cache writes (WB, FS, HDFS): 7438/1/0.
Cache times (ACQr/m, RLS, EXP): 138.341/0.009/3.571/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time: 0.900 sec.
Functions recompiled: 1.
Functions recompile time: 0.146 sec.
Spark ctx create time (lazy): 0.013 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/138.253 secs.
Total JIT compile time: 41.224 sec.
Total JVM GC count: 574.
Total JVM GC time: 102.319 sec.
Heavy hitter instructions (name, time, count):
-- 1) conv2d_bias_add 196.187 sec 297
-- 2) conv2d_backward_filter 160.273 sec 297
-- 3) rangeReIndex 142.017 sec 203
-- 4) sel+ 108.674 sec 396
-- 5) conv2d_backward_data 70.360 sec 198
-- 6) maxpooling_backward 51.672 sec 297
-- 7) uack+ 49.394 sec 396
-- 8) relu_backward 36.753 sec 396
-- 9) relu_maxpooling 20.152 sec 297
-- 10) +* 18.896 sec 1485
Does this second cluster use 10Gb or 1Gb Ethernet?
10 Gb Ethernet:
$ lspci | grep -iE --color 'network|ethernet'
01:00.0 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
01:00.1 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
16:00.0 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.1 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.2 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.3 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
@bertholdreinwald @mboehm7 @dusenberrymw Any review updates for this PR?
Since it has been a while, here is a summary of the discussion related to this PR:
- This PR adds new syntax to the DML language, which is useful for implementing SGD-style algorithms:
  for(X_batch in X, nrow=batch_size) {
    ...
  }
- Additionally, it allows for prefetching of X_batch with the help of the configuration property prefetch.budget.mb. The memory specified by this property is subtracted from the local memory budget. This could be a drawback in low-memory settings.
- An alternative is to compute the aggregate memory budget of all the child blocks and subtract it from the local memory budget to get the prefetch budget. In many cases, the aggregate memory budget will likely be a gross over-estimate of the actual memory required, as it does not take into account the effect of rmvar and our caching. We can extend ProgramBlock to compute this:
/**
 * Computes the memory estimate if known, else -1.
 * If not OptimizerUtils.isMemoryBasedOptLevel(), it returns
 * OptimizerUtils.INVALID_SIZE.
 *
 * @return the memory estimate for this program block
 * @throws HopsException if an error occurs
 */
public double getMemEstimate() throws HopsException {
    if(_sb == null)
        return 0;
    if(!OptimizerUtils.isMemoryBasedOptLevel())
        return OptimizerUtils.INVALID_SIZE;
    double memBudget = 0;
    for(Hop h : _sb.get_hops()) {
        double memEstimateForH = h.getMemEstimate();
        if(memEstimateForH < 0)
            return -1; // unknown estimate for at least one HOP
        else
            memBudget += memEstimateForH;
    }
    return memBudget;
}
Alternatively, we can replace memBudget += memEstimateForH; with memBudget = Math.max(memBudget, memEstimateForH); as an optimistic estimate that assumes immediate rmvar and/or caching.
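A minimal sketch of that optimistic variant, mirroring the method above (the method name getMemEstimateOptimistic is hypothetical):

/**
 * Optimistic variant of getMemEstimate() above: assumes immediate rmvar
 * and/or caching, so the budget is the largest single HOP estimate
 * rather than the sum over all HOPs.
 */
public double getMemEstimateOptimistic() throws HopsException {
    if(_sb == null)
        return 0;
    if(!OptimizerUtils.isMemoryBasedOptLevel())
        return OptimizerUtils.INVALID_SIZE;
    double memBudget = 0;
    for(Hop h : _sb.get_hops()) {
        double memEstimateForH = h.getMemEstimate();
        if(memEstimateForH < 0)
            return -1; // unknown estimate for at least one HOP
        memBudget = Math.max(memBudget, memEstimateForH); // max instead of sum
    }
    return memBudget;
}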
I'm still not convinced by this PR because the experimental results were inconclusive (especially in comparison to simple rewrites), it does not fit nicely into the architecture of our memory management (without unnecessary over-provisioning), it unnecessarily differs from R syntax, and the CP batch iterator might actually create more evictions in the presence of memory pressure. So I would recommend not rushing this in.
Refer to this link for build results (access rights to CI server needed): https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1439/
Additional things to think about: (1) handling of updates to the batch variable in terms of assignments or left indexing, e.g. X_batch[1,1] = 0 inside the loop body (we would need to handle this during validate), and (2) the impact on codegen, which is already able to fuse right indexing of column vectors into the subsequent fused operator.
Refer to this link for build results (access rights to CI server needed): https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1725/