
[SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics

Opened by thejdeep

What changes were proposed in this pull request?

This PR adds the following push-based shuffle metrics:

  • Merger count, and whether push-based shuffle (Magnet) is enabled or disabled for the stage
  • Time spent on result finalization
  • Count of the actual number of merged blocks fetched, in addition to chunks
  • Corrupt merged shuffle block chunks and merged-fetch fallback count

Why are the changes needed?

These metrics help users understand how push-based shuffle behaves in an application.

Does this PR introduce any user-facing change?

Yes. The API responses of the Spark History Server (SHS) change, e.g. /stages.

How was this patch tested?

Modified existing unit tests, and tested the API responses against event log files in SHS.

thejdeep, Apr 12 '22
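To make the read-side metrics in the PR description concrete, here is a minimal Scala sketch, assuming a simplified immutable holder. `PushShuffleReadMetrics` and `PushShuffleMetricsAggregation` are hypothetical names, not Spark's actual `ShuffleReadMetrics` API; the field names loosely follow the `shuffleMerged*` and `shuffleCorrupt*` parameters visible in the `TaskDataWrapper` constructor error quoted in this thread. It shows why blocks and chunks are separate counters (in this model a merged block is read as one or more chunks) and how per-task values could roll up into a stage-level total.

```scala
// Hypothetical, simplified holder for the push-based shuffle *read* metrics.
// Field names mirror the shuffleMerged* / shuffleCorrupt* parameters of
// TaskDataWrapper; this is NOT Spark's actual ShuffleReadMetrics class.
final case class PushShuffleReadMetrics(
    corruptMergedBlockChunks: Long = 0L,
    mergedFetchFallbackCount: Long = 0L,
    // A merged block is read as one or more chunks, so blocks and chunks
    // are tracked as separate counters.
    mergedRemoteBlocksFetched: Long = 0L,
    mergedLocalBlocksFetched: Long = 0L,
    mergedRemoteChunksFetched: Long = 0L,
    mergedLocalChunksFetched: Long = 0L,
    mergedRemoteBytesRead: Long = 0L,
    mergedLocalBytesRead: Long = 0L,
    mergedRemoteReqDuration: Long = 0L) {

  // Field-wise sum, used to roll task-level values up into a stage total.
  def +(o: PushShuffleReadMetrics): PushShuffleReadMetrics =
    PushShuffleReadMetrics(
      corruptMergedBlockChunks + o.corruptMergedBlockChunks,
      mergedFetchFallbackCount + o.mergedFetchFallbackCount,
      mergedRemoteBlocksFetched + o.mergedRemoteBlocksFetched,
      mergedLocalBlocksFetched + o.mergedLocalBlocksFetched,
      mergedRemoteChunksFetched + o.mergedRemoteChunksFetched,
      mergedLocalChunksFetched + o.mergedLocalChunksFetched,
      mergedRemoteBytesRead + o.mergedRemoteBytesRead,
      mergedLocalBytesRead + o.mergedLocalBytesRead,
      mergedRemoteReqDuration + o.mergedRemoteReqDuration)
}

// Hypothetical helper that aggregates the per-task metrics of one stage.
object PushShuffleMetricsAggregation {
  // Sum all task-level metrics; an empty stage yields all zeros.
  def stageTotal(tasks: Seq[PushShuffleReadMetrics]): PushShuffleReadMetrics =
    tasks.foldLeft(PushShuffleReadMetrics())(_ + _)
}
```

Keeping the holder immutable with a `+` operator makes the stage total a simple fold, and a default of zero for every field means a task that never touched merged data contributes nothing.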

Can one of the admins verify this patch?

AmplabJenkins, Apr 14 '22

@thejdeep This PR seems to have missed the changes required in ShuffleBlockFetchIterator, which update the ShuffleMetrics while shuffle fetching is ongoing.

zhouyejoe, May 27 '22
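The point about ShuffleBlockFetchIterator is that metrics should be updated as each fetch result is consumed, not only once fetching finishes. A minimal Scala sketch of that pattern, assuming a greatly simplified model: `FetchResult`, `RegularBlock`, `MergedChunk`, `CorruptMergedChunk`, `LiveShuffleMetrics` and `MetricsUpdatingIterator` are all hypothetical and do not reflect the iterator's real internals. Two further assumptions are labeled in the code: a merged block is counted when its last chunk arrives, and each corrupt chunk triggers exactly one fallback.

```scala
// Hypothetical, simplified fetch results; not Spark's real
// ShuffleBlockFetchIterator types.
sealed trait FetchResult
final case class RegularBlock(remote: Boolean) extends FetchResult
final case class MergedChunk(bytes: Long, remote: Boolean, lastChunkOfBlock: Boolean)
    extends FetchResult
case object CorruptMergedChunk extends FetchResult

// Mutable counters that are updated in place while fetching is in progress.
final class LiveShuffleMetrics {
  var remoteBlocksFetched = 0L
  var localBlocksFetched = 0L
  var mergedRemoteChunksFetched = 0L
  var mergedLocalChunksFetched = 0L
  var mergedRemoteBlocksFetched = 0L
  var mergedLocalBlocksFetched = 0L
  var mergedRemoteBytesRead = 0L
  var mergedLocalBytesRead = 0L
  var corruptMergedBlockChunks = 0L
  var mergedFetchFallbackCount = 0L
}

// Wraps the underlying results and updates the metrics on every next(),
// so the counters reflect partial progress rather than only a final total.
final class MetricsUpdatingIterator(
    underlying: Iterator[FetchResult],
    metrics: LiveShuffleMetrics) extends Iterator[FetchResult] {

  override def hasNext: Boolean = underlying.hasNext

  override def next(): FetchResult = {
    val result = underlying.next()
    result match {
      case RegularBlock(remote) =>
        if (remote) metrics.remoteBlocksFetched += 1
        else metrics.localBlocksFetched += 1
      case MergedChunk(bytes, remote, lastChunkOfBlock) =>
        if (remote) {
          metrics.mergedRemoteChunksFetched += 1
          metrics.mergedRemoteBytesRead += bytes
          // Assumption: a merged block is counted once, on its last chunk.
          if (lastChunkOfBlock) metrics.mergedRemoteBlocksFetched += 1
        } else {
          metrics.mergedLocalChunksFetched += 1
          metrics.mergedLocalBytesRead += bytes
          if (lastChunkOfBlock) metrics.mergedLocalBlocksFetched += 1
        }
      case CorruptMergedChunk =>
        // Assumption: each corrupt chunk triggers one fallback to fetching
        // the original, unmerged blocks.
        metrics.corruptMergedBlockChunks += 1
        metrics.mergedFetchFallbackCount += 1
    }
    result
  }
}
```

Because the update happens inside `next()`, a reader polling `LiveShuffleMetrics` mid-fetch sees counts for exactly the results consumed so far.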

@zhouyejoe @otterc Updated the PR with the latest changes, please take a look. Thank you!

thejdeep, Aug 08 '22

@thejdeep Thanks for updating the PR. I haven't reviewed it yet, but several UTs in HistoryServerSuite failed. Can you double check?

zhouyejoe, Aug 08 '22

@otterc @zhouyejoe Fixed the failing tests, should be good for another review now. Thanks!

thejdeep, Sep 06 '22

+CC @zhouyejoe

mridulm, Nov 02 '22

Left some minor comments last week; otherwise mostly LGTM. @thejdeep

zhouyejoe, Nov 28 '22

Thanks for reviewing @zhouyejoe @mridulm, addressed feedback.

thejdeep, Nov 28 '22

Please don't force push unless it is truly necessary (e.g. to resolve conflicts); it makes reviewing what changed much harder.

mridulm, Nov 29 '22

There are also UT failures related to this PR @thejdeep

zhouyejoe, Dec 14 '22

Addressed comments, please take another look @otterc @mridulm @zhouyejoe Thanks!

thejdeep, Dec 21 '22

@thejdeep The test failures are related to this change. Please fix them.

/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala:92:5: not enough arguments for constructor TaskDataWrapper: (taskId: Long, index: Int, attempt: Int, partitionId: Int, launchTime: Long, resultFetchStart: Long, duration: Long, executorId: String, host: String, status: String, taskLocality: String, speculative: Boolean, accumulatorUpdates: Seq[org.apache.spark.status.api.v1.AccumulableInfo], errorMessage: Option[String], hasMetrics: Boolean, executorDeserializeTime: Long, executorDeserializeCpuTime: Long, executorRunTime: Long, executorCpuTime: Long, resultSize: Long, jvmGcTime: Long, resultSerializationTime: Long, memoryBytesSpilled: Long, diskBytesSpilled: Long, peakExecutionMemory: Long, inputBytesRead: Long, inputRecordsRead: Long, outputBytesWritten: Long, outputRecordsWritten: Long, shuffleRemoteBlocksFetched: Long, shuffleLocalBlocksFetched: Long, shuffleFetchWaitTime: Long, shuffleRemoteBytesRead: Long, shuffleRemoteBytesReadToDisk: Long, shuffleLocalBytesRead: Long, shuffleRecordsRead: Long, shuffleCorruptMergedBlockChunks: Long, shuffleMergedFetchFallbackCount: Long, shuffleMergedRemoteBlocksFetched: Long, shuffleMergedLocalBlocksFetched: Long, shuffleMergedRemoteChunksFetched: Long, shuffleMergedLocalChunksFetched: Long, shuffleMergedRemoteBytesRead: Long, shuffleMergedLocalBytesRead: Long, shuffleRemoteReqsDuration: Long, shuffleMergedRemoteReqDuration: Long, shuffleBytesWritten: Long, shuffleWriteTime: Long, shuffleRecordsWritten: Long, stageId: Int, stageAttemptId: Int)org.apache.spark.status.TaskDataWrapper.
[error] Unspecified value parameters shuffleCorruptMergedBlockChunks, shuffleMergedFetchFallbackCount, shuffleMergedRemoteBlocksFetched...
[error]     new TaskDataWrapper(
[error]     ^

otterc, Dec 22 '22

@otterc @mridulm The test failure comes from a commit I picked up while rebasing, which added protobuf serialization changes. I had to rebase locally and force push, which rewrote the commit history.

thejdeep, Dec 23 '22

@mridulm StageDataWrapper was also added to protobuf serialization in #39192 (merged a few days ago), causing this build to fail. I have updated this PR with further changes to the protobuf store types and added serialization for the push-based shuffle metrics. The build should be passing now. Thanks.

thejdeep, Jan 03 '23
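The "not enough arguments for constructor TaskDataWrapper" error earlier in the thread happens because every call site of a Scala constructor must supply newly added parameters that have no defaults, including the protobuf deserializer. A minimal sketch of the decode side, assuming a plain `Map[String, Long]` in place of real protobuf messages; `TaskShuffleSummary` and `TaskShuffleSummaryCodec` are hypothetical and carry only a handful of the real fields. Missing keys decode as 0, mirroring how absent proto3 scalar fields read as zero, so data written before the new fields existed still decodes.

```scala
// Hypothetical stand-in for a store type such as TaskDataWrapper; the real
// class has many more fields. The last three fields play the role of the
// newly added push-based shuffle metrics.
final case class TaskShuffleSummary(
    taskId: Long,
    shuffleRemoteBlocksFetched: Long,
    shuffleCorruptMergedBlockChunks: Long,
    shuffleMergedFetchFallbackCount: Long,
    shuffleMergedRemoteBlocksFetched: Long)

// Encodes to a plain Map[String, Long], used here in place of protobuf.
object TaskShuffleSummaryCodec {
  def serialize(t: TaskShuffleSummary): Map[String, Long] = Map(
    "taskId" -> t.taskId,
    "shuffleRemoteBlocksFetched" -> t.shuffleRemoteBlocksFetched,
    "shuffleCorruptMergedBlockChunks" -> t.shuffleCorruptMergedBlockChunks,
    "shuffleMergedFetchFallbackCount" -> t.shuffleMergedFetchFallbackCount,
    "shuffleMergedRemoteBlocksFetched" -> t.shuffleMergedRemoteBlocksFetched)

  def deserialize(m: Map[String, Long]): TaskShuffleSummary = {
    // A missing key decodes as 0, similar to proto3 scalar defaults, so data
    // written before the new fields existed still decodes.
    def field(name: String): Long = m.getOrElse(name, 0L)
    // Every constructor parameter must be passed here; leaving out the new
    // ones would fail with a "not enough arguments for constructor" error.
    TaskShuffleSummary(
      taskId = field("taskId"),
      shuffleRemoteBlocksFetched = field("shuffleRemoteBlocksFetched"),
      shuffleCorruptMergedBlockChunks = field("shuffleCorruptMergedBlockChunks"),
      shuffleMergedFetchFallbackCount = field("shuffleMergedFetchFallbackCount"),
      shuffleMergedRemoteBlocksFetched = field("shuffleMergedRemoteBlocksFetched"))
  }
}
```

Using named arguments in the constructor call makes it obvious in review which serialized field feeds which parameter as the parameter list grows.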

@thejdeep Can you take a look at the test failures? They look relevant.

mridulm, Jan 04 '23

Fixed the failing tests and updated the commit messages to reflect the overall changes. PTAL. Thanks!

thejdeep, Jan 04 '23

I am not sure what has changed since the last review because of the force pushes. Please do not force push, particularly for large PRs.

mridulm, Jan 06 '23

Looks good to me, will wait for successful build before merging.

mridulm, Jan 06 '23

LGTM. Thanks for working on this PR. @thejdeep

zhouyejoe, Jan 06 '23

Merged to master. Thanks for working on this @thejdeep ! Thanks for the reviews @otterc, @zhouyejoe, @gengliangwang :-)

mridulm, Jan 06 '23

I think we should skip logging these metrics in JsonProtocol when push-based shuffle is disabled, so I've filed https://issues.apache.org/jira/browse/SPARK-42203

JoshRosen, Jan 26 '23
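The follow-up proposal is to omit the push-based shuffle fields from the event log when the feature is off. A minimal Scala sketch of that idea, assuming a hand-rolled JSON string and a plain Boolean flag; `ShuffleReadMetricsJson`, `toJson`, `pushBasedShuffleEnabled` and the camelCase keys are hypothetical and are neither JsonProtocol's real code nor its field names, and the sketch does not describe how SPARK-42203 was eventually resolved.

```scala
// Hypothetical sketch of the SPARK-42203 idea: write the push-based shuffle
// fields only when push-based shuffle is enabled. Key names are illustrative
// and are not JsonProtocol's actual field names.
object ShuffleReadMetricsJson {
  def toJson(
      remoteBlocksFetched: Long,
      localBlocksFetched: Long,
      pushMetrics: Seq[(String, Long)],
      pushBasedShuffleEnabled: Boolean): String = {
    val base = Seq(
      "remoteBlocksFetched" -> remoteBlocksFetched,
      "localBlocksFetched" -> localBlocksFetched)
    // Skip the push-based fields entirely when the feature is off.
    val fields = if (pushBasedShuffleEnabled) base ++ pushMetrics else base
    // Minimal hand-rolled JSON object; keys are assumed not to need escaping.
    fields.map { case (k, v) => "\"" + k + "\":" + v }.mkString("{", ",", "}")
  }
}
```

Omitting the fields rather than writing zeros keeps event logs smaller for applications that never use push-based shuffle; the trade-off is that any reader must treat a missing field as 0.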