
[FUTURE][SYSTEMML-1160] [WIP] Enable prefetching of batches via for loop iterator


This PR extends the current for construct to support iterating over batches of a matrix. The current version only supports iterating over batches of rows.

The syntax

for(X_batch in X, nrow=batch_size) {
  ...
}

is equivalent to

iters = ceil(N / batch_size)
for(i in 1:iters) {
	# Get next batch
	beg = ((i-1) * batch_size) %% N + 1
	end = min(N, beg + batch_size - 1)
	X_batch = X[beg:end,]
	...
}

Note: this PR does not introduce rectangular blocks in SystemML; instead, the batch iteration is handled in the ForProgramBlock. To enable prefetching, the user just has to set a config flag:

ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 mb prefetch memory
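For intuition, here is a minimal sketch of the single-slot prefetch pattern this flag enables: while the loop body computes on the current batch, a background thread already collects the next one. This is not the PR's actual implementation; BatchPrefetcher, BatchSource, fetch, and nextBatch are hypothetical stand-ins for SystemML's internal Spark-backed right indexing.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: overlap the fetch of batch i+1 with compute on batch i.
public class BatchPrefetcher {

    /** Stand-in for the Spark-backed right indexing X[beg:end,]. */
    public interface BatchSource {
        double[][] fetch(long beg, long end);
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final BatchSource src;
    private final long n, batchSize;
    private Future<double[][]> inflight; // the single prefetch slot

    public BatchPrefetcher(BatchSource src, long n, long batchSize) {
        this.src = src;
        this.n = n;
        this.batchSize = batchSize;
    }

    /** Must be called with i = 1, 2, 3, ... in order. */
    public double[][] nextBatch(int i) throws Exception {
        if (inflight == null)
            inflight = submit(i);        // first batch: pay the full fetch latency
        double[][] cur = inflight.get(); // near-zero wait once the pipeline is warm
        inflight = submit(i + 1);        // fetch the next batch in the background
        return cur;
    }

    private Future<double[][]> submit(int i) {
        final long beg = ((long) (i - 1) * batchSize) % n + 1;
        final long end = Math.min(n, beg + batchSize - 1);
        // A real implementation would first check the estimated batch size
        // against prefetch.budget.mb and fall back to a lazy, synchronous
        // fetch if the budget would be exceeded.
        return exec.submit(() -> src.fetch(beg, end));
    }

    public void shutdown() {
        exec.shutdownNow();
    }
}

This mirrors the behavior reported below: only the first batch incurs a fetch wait, and all subsequent fetches overlap with compute.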

@dusenberrymw @mboehm7 @bertholdreinwald @frreiss @asurve

niketanpansare commented Feb 13 '17 20:02

I will run a few more experiments and update this PR soon :)

niketanpansare commented Feb 14 '17 01:02

Approach 1: With Prefetching

The statistics below show that for a convnet on a 200K-column dataset with a batch size of 50, we only wait for the fetch of the first batch (~41 seconds); for the remaining batches, there is almost zero wait time. I also did sanity testing using System.out.println statements and double-checked that the Spark job executed in parallel while conv2d was executing.

ml.setConfigProperty("prefetch.budget.mb", "300"); // 300 mb prefetch memory

for (e in 1:epochs) {
	i = 0
	for(X_batch in X, nrow=batch_size) {
		i = i + 1
		if(i == 100) {
			stop("Early stopping for benchmarks")
		}
		# Get next batch of labels
		beg = ((i-1) * batch_size) %% N + 1
		end = min(N, beg + batch_size - 1)
		y_batch = Y[beg:end,]
		....
	}
}

time: 1384233.345149ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  76.
Number of executed Spark inst:  3.
Batch-Fetch (next,indx,wait,first):     41.586/355.839/41.585/41.583 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS):    4767/139/0.
Cache times (ACQr/m, RLS, EXP): 0.090/0.018/30.515/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time:        1.699 sec.
Functions recompiled:           1.
Functions recompile time:       0.164 sec.
Spark ctx create time (lazy):   0.017 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/355.286 secs.
Total JIT compile time:         172.599 sec.
Total JVM GC count:             773.
Total JVM GC time:              123.818 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         817.763 sec     297
-- 2)   sel+    196.118 sec     396
-- 3)   -       45.240 sec      2178
-- 4)   conv2d_backward_filter  44.979 sec      297
-- 5)   relu_maxpooling         44.417 sec      297
-- 6)   +*      41.579 sec      1485
-- 7)   r'      28.868 sec      495
-- 8)   conv2d_backward_data    28.579 sec      198
-- 9)   maxpooling_backward     21.912 sec      297
-- 10)  *       19.874 sec      2880

Approach 2: Maxi-batch

In this approach, each outer-loop iteration collects a larger maxi-batch (20 mini-batches worth of rows) into CP memory; the inner loop then slices mini-batches out of it locally:

for (e in 1:epochs) {
	maxi_batch_size = 20 * batch_size
	maxi_iters = ceil(N / maxi_batch_size)
	stopping_i = 0
	for(maxi in 1:maxi_iters) {
		# Get next maxi-batch
		maxi_beg = ((maxi-1) * maxi_batch_size) %% N + 1
		maxi_end = min(N, maxi_beg + maxi_batch_size - 1)
		X_maxibatch = X[maxi_beg:maxi_end,]
		y_maxibatch = Y[maxi_beg:maxi_end,]
		maxi_N = nrow(y_maxibatch)
		iters = ceil(maxi_N / batch_size)
		for(i in 1:iters) {
			stopping_i = stopping_i + 1
			if(stopping_i == 100) {
				stop("Early stopping for benchmarks")
			}
			# Get next mini-batch
			beg = ((i-1) * batch_size) %% maxi_N + 1
			end = min(maxi_N, beg + batch_size - 1)
			X_batch = X_maxibatch[beg:end,]
			y_batch = y_maxibatch[beg:end,]
			....
		}
	}
}

time: 2334624.686333ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  81.
Number of executed Spark inst:  8.
Cache hits (Mem, WB, FS, HDFS): 19895/0/0/6.
Cache writes (WB, FS, HDFS):    7403/64/0.
Cache times (ACQr/m, RLS, EXP): 80.049/0.025/33.767/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time:        1.907 sec.
Functions recompiled:           1.
Functions recompile time:       0.303 sec.
Spark ctx create time (lazy):   0.026 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/79.953 secs.
Total JIT compile time:         83.437 sec.
Total JVM GC count:             1441.
Total JVM GC time:              254.973 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         780.249 sec     297
-- 2)   conv2d_backward_filter  442.185 sec     297
-- 3)   conv2d_backward_data    304.401 sec     198
-- 4)   sel+    200.346 sec     396
-- 5)   uack+   115.980 sec     495
-- 6)   maxpooling_backward     107.120 sec     297
-- 7)   rangeReIndex    85.561 sec      203
-- 8)   relu_backward   61.133 sec      495
-- 9)   ba+*    43.830 sec      594
-- 10)  relu_maxpooling         42.892 sec      297

I used the same cluster and same configuration as our VLDB experiments.

@bertholdreinwald @mboehm7 @dusenberrymw

niketanpansare commented Feb 17 '17 22:02

@niketanpansare Thanks for running this experiment. A 1.7x speedup certainly shows that our current methods are leaving a large amount of performance on the table.

dusenberrymw commented Feb 18 '17 01:02

These numbers are a bit puzzling. If there is a 1000s difference, how come rangeReIndex (which also covers the lazy Spark indexing) only requires 85s (of which the 80s for ACQ give us an upper bound on the time for collecting the indexed data)?

mboehm7 commented Feb 18 '17 03:02

I noticed the backward instructions are also taking more time.


niketanpansare commented Feb 18 '17 03:02

Here are the statistics on a different cluster with 70 GB of driver memory:

Approach 1: With Prefetching

time: 834467.702908ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  75.
Number of executed Spark inst:  3.
Batch-Fetch (next,indx,wait,first):     61.151/168.933/61.151/61.149 sec.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/1.
Cache writes (WB, FS, HDFS):    7506/3/0.
Cache times (ACQr/m, RLS, EXP): 0.110/0.010/3.239/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/595.
HOP DAGs recompile time:        0.985 sec.
Functions recompiled:           1.
Functions recompile time:       0.086 sec.
Spark ctx create time (lazy):   0.013 sec.
Spark trans counts (par,bc,col):0/0/102.
Spark trans times (par,bc,col): 0.000/0.000/168.039 secs.
Total JIT compile time:         54.711 sec.
Total JVM GC count:             508.
Total JVM GC time:              96.755 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         206.028 sec     297
-- 2)   conv2d_backward_filter  157.850 sec     297
-- 3)   sel+    110.459 sec     396
-- 4)   conv2d_backward_data    70.346 sec      198
-- 5)   uack+   50.411 sec      495
-- 6)   maxpooling_backward     50.044 sec      297
-- 7)   relu_backward   28.905 sec      495
-- 8)   relu_maxpooling         21.635 sec      297
-- 9)   ba+*    20.543 sec      594
-- 10)  +*      15.317 sec      1485

Approach 2: Maxi-batch

time: 915534.711838ms
SystemML Statistics:
Total elapsed time:             0.000 sec.
Total compilation time:         0.000 sec.
Total execution time:           0.000 sec.
Number of compiled Spark inst:  80.
Number of executed Spark inst:  8.
Cache hits (Mem, WB, FS, HDFS): 19796/0/0/6.
Cache writes (WB, FS, HDFS):    7438/1/0.
Cache times (ACQr/m, RLS, EXP): 138.341/0.009/3.571/0.000 sec.
HOP DAGs recompiled (PRED, SB): 0/605.
HOP DAGs recompile time:        0.900 sec.
Functions recompiled:           1.
Functions recompile time:       0.146 sec.
Spark ctx create time (lazy):   0.013 sec.
Spark trans counts (par,bc,col):0/0/5.
Spark trans times (par,bc,col): 0.000/0.000/138.253 secs.
Total JIT compile time:         41.224 sec.
Total JVM GC count:             574.
Total JVM GC time:              102.319 sec.
Heavy hitter instructions (name, time, count):
-- 1)   conv2d_bias_add         196.187 sec     297
-- 2)   conv2d_backward_filter  160.273 sec     297
-- 3)   rangeReIndex    142.017 sec     203
-- 4)   sel+    108.674 sec     396
-- 5)   conv2d_backward_data    70.360 sec      198
-- 6)   maxpooling_backward     51.672 sec      297
-- 7)   uack+   49.394 sec      396
-- 8)   relu_backward   36.753 sec      396
-- 9)   relu_maxpooling         20.152 sec      297
-- 10)  +*      18.896 sec      1485

niketanpansare commented Feb 20 '17 20:02

Does this second cluster use 10Gb or 1Gb Ethernet?

mboehm7 commented Feb 21 '17 04:02

10 Gb Ethernet:

$ lspci | grep -iE --color 'network|ethernet'
01:00.0 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
01:00.1 Ethernet controller: Broadcom Limited NetXtreme II BCM57810 10 Gigabit Ethernet (rev 10)
16:00.0 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.1 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.2 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)
16:00.3 Ethernet controller: Broadcom Limited NetXtreme BCM5719 Gigabit Ethernet PCIe (rev 01)

niketanpansare commented Feb 21 '17 18:02

@bertholdreinwald @mboehm7 @dusenberrymw Any review updates for this PR?

Since it's been a while, here is a summary of the discussion related to this PR:

  1. This PR adds new syntax to the DML language, which is useful for implementing SGD-style algorithms:
for(X_batch in X, nrow=batch_size) {
  ...
}
  2. Additionally, it allows for prefetching of X_batch with the help of the configuration property prefetch.budget.mb. The memory specified by this property is subtracted from the local memory budget, which could be a drawback in low-memory settings.

  3. An alternative is to compute the aggregate memory budget of all the child blocks and subtract it from the local memory budget to get the prefetch budget. In many cases, the aggregate memory budget will likely be a gross over-estimate of the actual memory required, as it does not take into account the effect of rmvar and our caching. We can extend ProgramBlock to compute this:

/**
 * Computes the aggregate memory estimate of this block's HOPs if known, else -1.
 * If not OptimizerUtils.isMemoryBasedOptLevel(), it returns OptimizerUtils.INVALID_SIZE.
 * 
 * @return the memory estimate for this program block 
 * @throws HopsException if error occurs
 */
public double getMemEstimate() throws HopsException {
	if(_sb == null) 
		return 0;
	if(!OptimizerUtils.isMemoryBasedOptLevel())
		return OptimizerUtils.INVALID_SIZE;
	
	double memBudget = 0;
	for(Hop h : _sb.get_hops()) {
		double memEstimateForH = h.getMemEstimate();
		if(memEstimateForH < 0)
			return -1;
		else
			memBudget += memEstimateForH;
	}
	return memBudget;
}

Alternatively, we can replace memBudget += memEstimateForH with memBudget = Math.max(memBudget, memEstimateForH) as an optimistic estimate, assuming immediate rmvar and/or caching; see the sketch below.
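Concretely, the max-based variant keeps the method identical except for the accumulation (a sketch of the proposed change, not committed code):

public double getMemEstimate() throws HopsException {
	if(_sb == null) 
		return 0;
	if(!OptimizerUtils.isMemoryBasedOptLevel())
		return OptimizerUtils.INVALID_SIZE;
	
	// Optimistic estimate: assume each intermediate is released (rmvar)
	// or cached as soon as it is produced, so the peak requirement is
	// the largest single HOP estimate rather than the sum.
	double memBudget = 0;
	for(Hop h : _sb.get_hops()) {
		double memEstimateForH = h.getMemEstimate();
		if(memEstimateForH < 0)
			return -1;
		memBudget = Math.max(memBudget, memEstimateForH);
	}
	return memBudget;
}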

niketanpansare commented May 04 '17 19:05

I'm still not convinced by this PR because the experimental results were inconclusive (especially in comparison to simple rewrites), it does not fit nicely into our architecture of memory management (without unnecessary over-provisioning), it unnecessarily differs from R syntax, and the CP batch iterator might actually create more evictions in the presence of memory pressure. So I would recommend not rushing this in.

mboehm7 commented May 04 '17 20:05

Refer to this link for build results (access rights to CI server needed): https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1439/

akchinSTC commented May 04 '17 22:05

Additional things to think about: (1) handling of updates in terms of assignments or left indexing (we would need to handle this during validate), and (2) the impact on codegen, which is already able to fuse right indexing of column vectors into the subsequent fused operator.

mboehm7 commented May 05 '17 03:05

Refer to this link for build results (access rights to CI server needed): https://sparktc.ibmcloud.com/jenkins/job/SystemML-PullRequestBuilder/1725/

akchinSTC commented Jul 07 '17 22:07