Pinot 1.1 Multi-stage query OOMs and brings down all servers -- with all the default protections/params in place
We have a realtime table (6 partitions, 140 GB). When querying the table with a timeout of 3 minutes, all servers (6 servers, each with 24 cores and ~100 GB allocated to Pinot) OOM and hang.
Query:
-- For every (__global_counter, __message_id, __probe_id) key, compute the
-- earliest and latest __record_timestamp and the number of rows sharing the
-- key, then return the 10 keys with the highest row count.
-- NOTE(review): the CTE name suggests this is a duplicate-detection query;
-- the GROUP BY has no filter, so every distinct key in the table becomes a group.
WITH dups AS (
    SELECT
        __global_counter,
        __message_id,
        __probe_id,
        MIN(__record_timestamp) AS min_record_timestamp,
        MAX(__record_timestamp) AS max_record_timestamp,
        COUNT(*) AS cnt
    FROM org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f
    GROUP BY
        __global_counter,
        __message_id,
        __probe_id
)
SELECT *
FROM dups
ORDER BY cnt DESC
LIMIT 10
Explain plan:
Execution Plan
LogicalSort(sort0=[$5], dir0=[DESC], offset=[0], fetch=[10])
PinotLogicalSortExchange(distribution=[hash], collation=[[5 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])
LogicalSort(sort0=[$5], dir0=[DESC], fetch=[10])
LogicalAggregate(group=[{0, 1, 2}], agg#0=[MIN($3)], agg#1=[MAX($4)], agg#2=[COUNT($5)])
PinotLogicalExchange(distribution=[hash[0, 1, 2]])
LogicalAggregate(group=[{3, 4, 5}], agg#0=[MIN($6)], agg#1=[MAX($6)], agg#2=[COUNT()])
LogicalTableScan(table=[[org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f]])
We understand that such a query is perhaps not the best fit for Pinot, but crashing every queried server seems like a bug, especially as we haven't overridden any of the engine's built-in protections (besides the timeout). We reproduced it live for @mayankshriv, and he suggested we open this issue.
OOM Logs from one of the servers (not super informative):
{"time":"2024-06-17T22:48:22.991521636+02:00","stream":"stdout","logtag":"F","message":"Terminating due to java.lang.OutOfMemoryError: Java heap space"}
--
Running server args (pinot 1.1):
\"args\" : [ \"--add-opens=java.base/java.nio=ALL-UNNAMED\", \"--add-opens=java.base/sun.nio.ch=ALL-UNNAMED\", \"--add-opens=java.base/java.lang=ALL-UNNAMED\", \"--add-opens=java.base/java.util=ALL-UNNAMED\", \"--add-opens=java.base/java.lang.reflect=ALL-UNNAMED\", \"-Xms32G\", \"-Xmx32G\", \"-XX:+ExitOnOutOfMemoryError\", \"-javaagent:/opt/pinot/etc/jmx_prometheus_javaagent/jmx_prometheus_javaagent.jar=8008:/opt/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml\", \"-Dlog4j2.configurationFile=/opt/pinot/etc/conf/pinot-server-log4j2.xml\", \"-Dplugins.dir=/opt/pinot/plugins\", \"-Dplugins.dir=/opt/pinot/plugins\", \"-Dapp.name=pinot-admin\", \"-Dapp.pid=1\", \"-Dapp.repo=/opt/pinot/lib\", \"-Dapp.home=/opt/pinot\", \"-Dbasedir=/opt/pinot\" ],"
Table config
{
"REALTIME": {
"tableName": "org_2dYiMRMfas142XRKQ3bJIqmN3V6_ethereum_erc20_balance_changes_block_b91edb0804c04c2d9851eb975b689f0f_REALTIME",
"tableType": "REALTIME",
"segmentsConfig": {
"replication": "2",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "30",
"replicasPerPartition": "1",
"timeColumnName": "__record_timestamp",
"minimizeDataMovement": false
},
"tenants": {
"broker": "sim_community",
"server": "sim_community"
},
"tableIndexConfig": {
"columnMajorSegmentBuilderEnabled": false,
"rangeIndexVersion": 2,
"autoGeneratedInvertedIndex": false,
"createInvertedIndexDuringSegmentGeneration": false,
"sortedColumn": [
"__global_counter",
"__message_id"
],
"loadMode": "MMAP",
"enableDefaultStarTree": false,
"enableDynamicStarTreeCreation": false,
"aggregateMetrics": false,
"nullHandlingEnabled": false,
"optimizeDictionary": true,
"optimizeDictionaryForMetrics": false,
"noDictionarySizeRatioThreshold": 0.85
},
"metadata": {},
"quota": {
"maxQueriesPerSecond": "20.0"
},
"query": {
"timeoutMs": 5000
},
"fieldConfigList": [
{
"name": "__global_counter",
"encodingType": "RAW",
"indexTypes": [],
"indexes": null,
"tierOverwrites": null
}
],
"ingestionConfig": {
"streamIngestionConfig": {
"streamConfigMaps": [
{
"metadata.populate": "true",
"realtime.segment.flush.autotune.initialRows": "1000000",
"realtime.segment.flush.threshold.rows": "0",
"realtime.segment.flush.threshold.segment.size": "500M",
"realtime.segment.serverUploadToDeepStore": "true",
"sasl.jaas.config": "*****",
"sasl.mechanism": "****",
"security.protocol": "******",
"stream.kafka.broker.list": "*****",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.type": "lowlevel",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.topic.name": "sim_f8ac4a56_17e7_4704_97a6_ba84f94b4d46_ethereum_erc20_balance_changes_block",
"streamType": "kafka"
}
],
"columnMajorSegmentBuilderEnabled": false
},
"transformConfigs": [
{
"columnName": "__record_timestamp",
"transformFunction": "__metadata$recordTimestamp"
},
{
"columnName": "__global_counter",
"transformFunction": "jsonPath(__internal, '$.global_counter')"
},
{
"columnName": "__message_id",
"transformFunction": "jsonPath(__internal, '$.message_id')"
},
{
"columnName": "__probe_id",
"transformFunction": "jsonPath(__internal, '$.probe_id')"
},
{
"columnName": "txn_hash",
"transformFunction": "concat('0x', jsonPath(__user, '$.txn_hash'))"
},
{
"columnName": "block_number",
"transformFunction": "jsonPath(__user, '$.block_number')"
},
{
"columnName": "block_timestamp",
"transformFunction": "jsonPath(__user, '$.block_timestamp')"
},
{
"columnName": "token_address",
"transformFunction": "concat('0x', jsonPath(__user, '$.token_address'))"
},
{
"columnName": "token_name",
"transformFunction": "jsonPath(__user, '$.token_name')"
},
{
"columnName": "token_symbol",
"transformFunction": "jsonPath(__user, '$.token_symbol')"
},
{
"columnName": "token_decimals",
"transformFunction": "bytesToBigDecimal(hexToBytes(concat('000000', jsonPath(__user, '$.token_decimals'))))"
},
{
"columnName": "account_address",
"transformFunction": "concat('0x', jsonPath(__user, '$.account_address'))"
},
{
"columnName": "balance",
"transformFunction": "bytesToBigDecimal(hexToBytes(concat('000000', jsonPath(__user, '$.balance'))))"
}
],
"continueOnError": false,
"rowTimeValueCheck": true,
"segmentTimeValueCheck": true
},
"isDimTable": false
}
}
Thanks for your report.
Apache Pinot has some OOM protection mechanisms that are applied in the single-stage query engine, but they are not applied in the multi-stage query engine. I've created https://github.com/apache/pinot/issues/13436 to track them. We are actively working on some of them (especially automatic query killing).
This specific query can be executed in the single-stage engine. Have you tried it there? Does it also fail in the single-stage engine?
I need to find a proper time to test it because, obviously, killing our cluster is not something I can do at any time (we don't have a test cluster with tables this big).
Regardless, we care about the multi-stage engine, and we've seen this happen multiple times with different queries, some of which can only be executed on the multi-stage engine.
cc: @Jackie-Jiang