[SPARK-36620][SHUFFLE] Add Push Based Shuffle client side metrics
What changes were proposed in this pull request?
This adds the following push-based shuffle client-side metrics (a minimal sketch of the values they capture follows the list):
- Merger count and whether push-based shuffle (Magnet) is enabled or disabled for a stage
- Time spent on merge results finalization
- Count of the actual number of blocks fetched, as distinct from merged chunks
- Corrupt merged shuffle block chunks and fallback count
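For illustration, here is a minimal Scala sketch of the kind of per-task values these metrics capture. The case class below is not Spark's API; its field names simply mirror those that appear in the TaskDataWrapper constructor quoted later in this thread, and the derived ratio is only an example of how the values could be consumed.

```scala
// Illustrative stand-in (not Spark's API) for the per-task push-based shuffle
// read values added in this PR; field names mirror the new TaskDataWrapper
// constructor parameters quoted later in this thread.
final case class PushShuffleReadMetrics(
    corruptMergedBlockChunks: Long,
    mergedFetchFallbackCount: Long,
    mergedRemoteBlocksFetched: Long,
    mergedLocalBlocksFetched: Long,
    mergedRemoteChunksFetched: Long,
    mergedLocalChunksFetched: Long,
    mergedRemoteBytesRead: Long,
    mergedLocalBytesRead: Long,
    mergedRemoteReqDuration: Long) {

  // Example derived view: fraction of merged-block fetch attempts that had to
  // fall back to fetching the original (unmerged) shuffle blocks.
  def fallbackRatio: Double = {
    val mergedFetched = mergedRemoteBlocksFetched + mergedLocalBlocksFetched
    val attempts = mergedFetched + mergedFetchFallbackCount
    if (attempts == 0L) 0.0 else mergedFetchFallbackCount.toDouble / attempts
  }
}
```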
Why are the changes needed?
These changes help users understand the push-based shuffle behavior of an application.
Does this PR introduce any user-facing change?
Yes. API responses served by the Spark History Server (SHS) change (e.g. /stages).
How was this patch tested?
Modified existing unit tests and also verified the API responses against event log files in the SHS.
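For reviewers who want to spot-check the new fields themselves, here is a hedged sketch of hitting the SHS stages endpoint; the host, port, and application ID below are placeholders, and the response is simply printed rather than parsed.

```scala
import scala.io.Source

object CheckStagesApi {
  def main(args: Array[String]): Unit = {
    // Hypothetical SHS location and application ID -- adjust to your deployment.
    val shsUrl = "http://localhost:18080"
    val appId  = "app-20230101000000-0000"

    // /stages is one of the endpoints whose response shape changes in this PR;
    // the new push-based shuffle fields should appear in the returned JSON.
    val stagesJson =
      Source.fromURL(s"$shsUrl/api/v1/applications/$appId/stages").mkString
    println(stagesJson)
  }
}
```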
Can one of the admins verify this patch?
@thejdeep This PR seems to have missed the changes required in ShuffleBlockFetcherIterator, which updates the shuffle metrics while shuffle fetching is ongoing.
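For context, here is a rough, hypothetical sketch of what "updating the metrics while fetching is ongoing" means; this is not ShuffleBlockFetcherIterator's actual code, and the types and counter names below are illustrative only.

```scala
// Toy model of fetched results: Spark distinguishes original shuffle blocks
// from merged shuffle chunks, and corrupt chunks trigger a fallback fetch.
sealed trait FetchResult
case class BlockResult(bytes: Long) extends FetchResult
case class MergedChunkResult(bytes: Long, corrupt: Boolean) extends FetchResult

final class ToyPushShuffleMetrics {
  var blocksFetched = 0L
  var mergedChunksFetched = 0L
  var corruptMergedBlockChunks = 0L
  var mergedFetchFallbackCount = 0L

  // Called from the fetch loop as each result is consumed, so the counters
  // stay current while fetching is still in progress.
  def record(result: FetchResult): Unit = result match {
    case BlockResult(_) =>
      blocksFetched += 1
    case MergedChunkResult(_, corrupt) =>
      mergedChunksFetched += 1
      if (corrupt) {
        corruptMergedBlockChunks += 1
        mergedFetchFallbackCount += 1 // fall back to the original blocks
      }
  }
}
```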
@zhouyejoe @otterc Updated the PR with the latest changes, please take a look. Thank you!
@thejdeep Thanks for updating the PR. I have yet to review it, but several UTs within HistoryServerSuite failed; can you double-check?
@otterc @zhouyejoe Fixed the failing tests, should be good for another review now. Thanks!
+CC @zhouyejoe
I left some minor comments last week; the rest mostly LGTM. @thejdeep
Thanks for reviewing @zhouyejoe @mridulm, addressed feedback.
Please don't force push, unless it is to resolve conflicts or similar issues that cannot be handled otherwise - it makes reviewing what changed much harder.
There are also UT failures related to this PR @thejdeep
Addressed comments, please take another look @otterc @mridulm @zhouyejoe Thanks!
@thejdeep The test failures are related to this change. Please fix them.
/home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala:92:5: not enough arguments for constructor TaskDataWrapper: (taskId: Long, index: Int, attempt: Int, partitionId: Int, launchTime: Long, resultFetchStart: Long, duration: Long, executorId: String, host: String, status: String, taskLocality: String, speculative: Boolean, accumulatorUpdates: Seq[org.apache.spark.status.api.v1.AccumulableInfo], errorMessage: Option[String], hasMetrics: Boolean, executorDeserializeTime: Long, executorDeserializeCpuTime: Long, executorRunTime: Long, executorCpuTime: Long, resultSize: Long, jvmGcTime: Long, resultSerializationTime: Long, memoryBytesSpilled: Long, diskBytesSpilled: Long, peakExecutionMemory: Long, inputBytesRead: Long, inputRecordsRead: Long, outputBytesWritten: Long, outputRecordsWritten: Long, shuffleRemoteBlocksFetched: Long, shuffleLocalBlocksFetched: Long, shuffleFetchWaitTime: Long, shuffleRemoteBytesRead: Long, shuffleRemoteBytesReadToDisk: Long, shuffleLocalBytesRead: Long, shuffleRecordsRead: Long, shuffleCorruptMergedBlockChunks: Long, shuffleMergedFetchFallbackCount: Long, shuffleMergedRemoteBlocksFetched: Long, shuffleMergedLocalBlocksFetched: Long, shuffleMergedRemoteChunksFetched: Long, shuffleMergedLocalChunksFetched: Long, shuffleMergedRemoteBytesRead: Long, shuffleMergedLocalBytesRead: Long, shuffleRemoteReqsDuration: Long, shuffleMergedRemoteReqDuration: Long, shuffleBytesWritten: Long, shuffleWriteTime: Long, shuffleRecordsWritten: Long, stageId: Int, stageAttemptId: Int)org.apache.spark.status.TaskDataWrapper.
[error] Unspecified value parameters shuffleCorruptMergedBlockChunks, shuffleMergedFetchFallbackCount, shuffleMergedRemoteBlocksFetched...
[error] new TaskDataWrapper(
[error] ^
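The error above means the protobuf deserializer still calls the old constructor. A reduced, hypothetical sketch of the kind of fix required follows; the types are stand-ins for TaskDataWrapper and the decoded message, and only a few of the roughly 50 real parameters are shown.

```scala
// Stand-in for TaskDataWrapper with only a handful of the real fields.
final case class TaskDataLike(
    taskId: Long,
    shuffleCorruptMergedBlockChunks: Long,
    shuffleMergedFetchFallbackCount: Long,
    shuffleMergedRemoteBlocksFetched: Long)

// Stand-in for the values decoded from the protobuf message.
final case class DecodedTask(
    taskId: Long,
    corruptMergedBlockChunks: Long,
    mergedFetchFallbackCount: Long,
    mergedRemoteBlocksFetched: Long)

object TaskDataLikeSerializer {
  // Omitting the new arguments is exactly what produces the
  // "Unspecified value parameters ..." error quoted above.
  def deserialize(decoded: DecodedTask): TaskDataLike =
    TaskDataLike(
      taskId = decoded.taskId,
      shuffleCorruptMergedBlockChunks = decoded.corruptMergedBlockChunks,
      shuffleMergedFetchFallbackCount = decoded.mergedFetchFallbackCount,
      shuffleMergedRemoteBlocksFetched = decoded.mergedRemoteBlocksFetched)
}
```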
@otterc @mridulm The test failure comes from a commit picked up during the rebase that added protobuf serialization changes. I had to rebase locally and force push, which rewrote the commit history.
@mridulm StageDataWrapper was also added to protobuf serialization in #39192 (merged a few days ago), which caused this build to fail. I have updated this PR to make the corresponding changes to the protobuf store types and also added serialization for the push-based shuffle metrics. The build should be passing now. Thanks.
Can you take a look at the test failures @thejdeep ? They look relevant.
Fixed failing tests and updated commit messages to reflect the overall changes. PTAL. Thanks
I am not sure what has changed since the last review due to the force pushes - please do not force push, particularly for large PRs.
Looks good to me, will wait for successful build before merging.
LGTM. Thanks for working on this PR. @thejdeep
Merged to master. Thanks for working on this @thejdeep ! Thanks for the reviews @otterc, @zhouyejoe, @gengliangwang :-)
I think that we should skip logging of these metrics in JsonProtocol when push-based shuffle is disabled, so I've filed https://issues.apache.org/jira/browse/SPARK-42203
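A hedged sketch of what such skipping could look like; the helper and the JSON field names below are illustrative, not the actual SPARK-42203 change, though spark.shuffle.push.enabled is the real feature flag.

```scala
object PushAwareEventJson {
  // Illustrative only: include the push-based shuffle fields in the logged
  // event only when push-based shuffle is on, so event logs for ordinary jobs
  // don't carry a block of always-zero entries. Field names are hypothetical.
  def shuffleReadFields(
      pushEnabled: Boolean, // e.g. the value of spark.shuffle.push.enabled
      remoteBlocksFetched: Long,
      localBlocksFetched: Long,
      corruptMergedBlockChunks: Long,
      mergedFetchFallbackCount: Long): Map[String, Long] = {
    val base = Map(
      "Remote Blocks Fetched" -> remoteBlocksFetched,
      "Local Blocks Fetched" -> localBlocksFetched)
    if (!pushEnabled) base
    else base ++ Map(
      "Corrupt Merged Block Chunks" -> corruptMergedBlockChunks,
      "Merged Fetch Fallback Count" -> mergedFetchFallbackCount)
  }
}
```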