使用示例 consumer 的代码得到乱码消息
你好,我的架构是使用 filebeat 采集日志传输进 kafka 中, 使用了示范中的 consumer 代码取出后出现乱码, 但是使用 bin/kafka-console-producer 来生成消息就没问题. 使用示例中 consumer 取出的乱码数据片段 : YK╦÷µÊÐ┤ë} ¢w▀{Ñk│ÁtZ\t)#\x10ëÂ\x13«ÒñªÄƒ▒ØÁ]Uë\n ìn\f\t·e\x1F&┴\x00MúÊé║ò¡@ÚÇ~(Ù$@b\x08P\x11\x12Ü║¯\v¹Rÿ\x00mBÒ^█Î)q\x12┐{│7ƒù$ë"█±ƒ°×▀9þ×\x7FÎÎ]º¥:▓ø¤«Ïy¢©t²Þ¬\x7FuØzK|ë[\x07Å$ÂVrÒ┘r%=^L°\té░█ç╝>┬ç\x10¸)¸\tOz.²bó7▒u<[IÅñ+Úä\x7F$▒/ø«ê╗ŵ‗┘Û┼ÌDÕp1+n\x19\t2Ô╩²┘R9\x17\x14─u'╔ÆH■9(µ2Ôz!{░v±ho²IÄ$÷\x07ÕJ!=.\x1F×\x0F2Ú╝╝×¼^\x1A\tãË╣┬îO8þ\x03─ô╦█õôÀ║_9ÿ(eõ=R\x07âÊüT>\x18+º\n c╣┬íT~b<[¿$ËÖLÂ\x16\x0F\e\x13»\x1AîÄû│Ôy1▒¢ÌäXÀrzL>\x18{$ë\x1D7Ú$1ßVƒ°ÌMX¬┐XJë\x15Õ>B>v|ýY}ê#┤Î┌ôÏ▒}╚Jݤµ¾üu¸ðð«\x14NÔ=\tï!f1nÝFIäÞ^K▄»O▄║'1\x10<É╦þË);ë¼§├╣┬Hp░l}f╚\x12/À╔\x1A\x1E\x1CvXÅı_,µ│├┘};sòöMyÆ:Í·Øw\x0F\r|║Î╩þ\x0Ed¡\x1D┘╠üáÃ║k\x7F)\x18¤ªl×DIÔ╣R╝ÍþÊúÚR«■¿Û\vèù\x15´6W(NT\x17▒.\╣\x06b╔èÑá\╠f*Ai·▀ÄÍi\x13£uîûo§_l\x02nW;ü3ö5s¿Â¼\x19ØIÍ®┘äÝ─PÏS\n ▄By\r¶;┤\x02+Ô:$q'^¾\x7F\vï8\x1Dj(‗\tÐóå─ÜÜ8Ü}Ãq\x15q7HÔ║w\x7F´╗å─▒hêïº\x1A\e\v─ÕÜÂYjÖ×mãÝÍ2E▄2I\x1C║|▀\x15X─-ıƒ[wfQÀ\x18½\x14A╝aÒÌ%ë¾╬^9\n ï©ã┐J▄໲ïâ?E▄ìÆ©o²Ó4âE\x1Cê§î\x16t\x00±ªí┴b«;ÅX§¦29▓ò¢·5├õ╚B&╬L \f\x13ªOÙóIÄtJÓ.]}e\x02\x16pæ╩\vÇ\x17]\ü\x11úñ▒ï{Å\x04¯O╗~°2,Ó"^\x00øÆêíÄw·%\x02¿\x15p╦%p\x7F\x1DX÷<,Ó@¿sâON\x1DM>ëOÝV|\x12ä,J%ƒÏÒÝþSOwì]Û{%p┐╗‗╩Ñ\x06pOC\x00NG║L³\x0E']ºj}©█~Ú┬▓>Q{\x14ø┌H\x01¸>\t▄‗×=eX└\x19¥1█Í▄»\vX1ÊÇı]éu×§¡¸KÓN\x16\x1F~╔\x108;\x1AÓÇ»×®F8£╗┌\x1AAÿåF8\x1EPìP└u╔D▄å'?§½&Ó┬%Ô"\x02.╝L░Å\x1CƒÔ02┴╠¡\n
使用 gzdecode 尝试后, 得到 仍然带有部分乱码的片段 : b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02bIè;ƒ\x01\x00\x00\x00\x01f\x19ö\x0E®ÿÿÿÿ\x00\x00\x02L{"@timestamp":"2018-09-27T07:37:27.982Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.4.0","topic":"newtopic"},"input":{"type":"log"},"prospector":{"type":"log"},"beat":{"name":"localhost.localdomain","hostname":"localhost.localdomain","version":"6.4.0"},"host":{"name":"localhost.localdomain"} 虽然借鉴使用了其他 issues 提到的 gzdecode, 但是还是会发生乱码情况.
使用的各部分版本号 : filebeat v6.4.0 kafka 2.0.0 php v7.0.11 kafka-php v0.2.0.8
kafka 1.1.1 filebeat 6.4.0 php 7.1.11 kafka-php v0.2.0.8
我定位问题出现在 filebeat传输给 kafka 的时候默认配置compression: gzip 所以数据是压缩的,但是 php 的 gzdecode 还原出来的数据仍有乱码,如果 filebeat 收集发给 kafka 时设置 compression: none 就没有问题。
kafka-php 中 nmred/kafka-php/src/Kafka/Protocol/Protocol.php 方法 decodeString($data, $bytes, $compression = self::COMPRESSION_NONE)
第三个参数可以根据压缩类型处理字符串,但是现在项目中都没有用到。
陷入困境。
nmred/kafka-php 更新到 dev-master 似乎解决了,继续研究中。
kafka 1.1.1 filebeat 6.4.0 php 7.1.11 kafka-php v0.2.0.8
我定位问题出现在 filebeat传输给 kafka 的时候默认配置compression: gzip 所以数据是压缩的,但是 php 的 gzdecode 还原出来的数据仍有乱码,如果 filebeat 收集发给 kafka 时设置 compression: none 就没有问题。
kafka-php 中 nmred/kafka-php/src/Kafka/Protocol/Protocol.php 方法 decodeString($data, $bytes, $compression = self::COMPRESSION_NONE)
第三个参数可以根据压缩类型处理字符串,但是现在项目中都没有用到。
陷入困境。
你好, 是的,我也发现该问题,如果使用 compression: none 就没有问题, 至于你说的 ecodeString($data, $bytes, $compression = self::COMPRESSION_NONE) 这个是可以手动修改指明压缩方式从而解决该问题吗?
我临时的解决方案是 filebeat 既然是 json
我就手动 gzdecode 之后用 { 截取后面正常的数据然后做处理
我后来改用 dev-master 版本似乎可以识别是否压缩并返回正常数据,但引入了新问题,取不到消费的偏移量,每次消费完以后又重复消费了