RealtimeStreamingEngineering
RealtimeStreamingEngineering copied to clipboard
Question: running the Spark streaming script fails with "The system cannot find the path specified."
1.streaming-socket.py
import json
import socket
import time
import pandas as pd
def send_data_over_socket(file_path, host='127.0.0.1', port=9999, chunk_size=2,
                          start_index=0, delay=5):
    """Serve records from a JSON-lines file to a single TCP client.

    Binds a listening socket on ``host:port`` and waits for one client. It then
    reads ``file_path`` line by line and parses each non-blank line as JSON.
    Records are grouped into chunks of ``chunk_size``; each chunk is printed as
    a pandas DataFrame for visibility. Every record is sent as one
    newline-terminated JSON document, so a line-oriented reader such as
    Spark's ``socket`` source sees one record per line.

    Args:
        file_path: Path to a UTF-8 JSON-lines file (one JSON object per line).
            Relative paths are resolved against the current working directory.
        host: Interface to bind.
        port: TCP port to listen on.
        chunk_size: Number of records per printed chunk; must be >= 1.
        start_index: Number of leading file lines to skip. Pass the value
            returned by a previous call to resume after a disconnect.
        delay: Seconds to sleep after each record is sent.

    Returns:
        The index of the line following the last record successfully sent.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
        OSError: If the file cannot be opened (e.g. ``FileNotFoundError``) or
            the socket cannot be bound.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size}")

    # Open the dataset before binding so a wrong path fails immediately,
    # instead of only after a client has connected. The explicit UTF-8
    # encoding avoids decoding with the platform default (e.g. a Windows
    # code page) when the file contains non-ASCII text.
    with open(file_path, 'r', encoding='utf-8') as file, \
            socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow an immediate restart without "Address already in use" while
        # the previous socket is still in TIME_WAIT.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        print(f"Listening for connection on {host}:{port}")
        conn, addr = server.accept()
        print(f"Connection From {addr}")

        last_sent_index = start_index

        def send_chunk(chunk):
            """Print *chunk* as a DataFrame, then send each record as a JSON line."""
            nonlocal last_sent_index
            # The DataFrame is for display only. The original dicts are sent
            # so that a pandas round-trip cannot inject NaN (invalid JSON) for
            # keys missing from some records, or turn int columns into floats.
            print(pd.DataFrame([record for _, record in chunk]))
            for line_index, record in chunk:
                # sendall keeps writing until the whole buffer is sent;
                # send() may write only part of it.
                conn.sendall(json.dumps(record).encode('utf-8') + b'\n')
                last_sent_index = line_index + 1
                time.sleep(delay)

        try:
            records = []  # (line_index, record) pairs for the pending chunk
            for line_index, line in enumerate(file):
                # Skip lines already delivered by a previous run, and blank
                # lines (e.g. a trailing newline) that json.loads would reject.
                if line_index < start_index or not line.strip():
                    continue
                records.append((line_index, json.loads(line)))
                if len(records) == chunk_size:
                    send_chunk(records)
                    records = []
            if records:
                # Flush the final partial chunk instead of silently dropping it.
                send_chunk(records)
        except ConnectionError:
            # Covers BrokenPipeError and ConnectionResetError, plus
            # ConnectionAbortedError, which Windows raises when the peer
            # goes away.
            print("Client Disconnected")
        finally:
            conn.close()
            print("Connection Closed")

    return last_sent_index
if __name__ == "__main__":
    import sys
    from pathlib import Path

    # Pick the dataset path:
    # 1. A path given on the command line takes precedence.
    # 2. Otherwise try the original path relative to the current working
    #    directory, which preserves the previous behaviour.
    # 3. Otherwise fall back to the same relative path next to this script,
    #    so launching from another directory does not fail with a
    #    "path not found" error.
    # NOTE(review): step 3 assumes datasets/ sits beside this file; confirm
    # against the project layout.
    relative_dataset = Path('datasets') / 'yelp_academic_dataset_review.json'
    if len(sys.argv) > 1:
        dataset_path = Path(sys.argv[1])
    elif relative_dataset.exists():
        dataset_path = relative_dataset
    else:
        dataset_path = Path(__file__).resolve().parent / relative_dataset
    send_data_over_socket(str(dataset_path))
2.spark-streaming.py
import pyspark
from pyspark.sql import SparkSession
def start_streaming(spark, host="localhost", port=9999):
    """Echo lines received on a TCP socket to the console as a Spark stream.

    Reads from Spark's ``socket`` source and writes each micro-batch to the
    console sink in append mode. Blocks until the streaming query terminates.

    Args:
        spark: Active SparkSession.
        host: Host of the socket server to read from. It must reach the
            address the producer script listens on.
        port: Port of the socket server. It must match the producer's port.
    """
    stream_df = (
        spark.readStream.format("socket")
        .option("host", host)
        .option("port", port)
        .load()
    )
    query = stream_df.writeStream.outputMode("append").format("console").start()
    query.awaitTermination()
if __name__ == "__main__":
    spark_conn = SparkSession.builder.appName("SocketStreamConsumer").getOrCreate()
    try:
        start_streaming(spark_conn)
    finally:
        # Stop the session (and its underlying SparkContext) even when the
        # query fails or is interrupted with Ctrl+C.
        spark_conn.stop()
When I try to run the second file (spark-streaming.py), I get this error:
The system cannot find the path specified.
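Suggested fix: pass the correct path to the dataset, e.g. `send_data_over_socket('src/datasets/yelp_academic_dataset_review.json')`. This is still a relative path: it is resolved against the directory you launch the script from, not the script's own folder. So either run it from the directory that contains `src/`, or use an absolute path. Note also that spark-streaming.py itself does not open any file. If the error persists when launching it, the cause is likely the Spark/Java setup rather than the dataset path (on Windows, check that JAVA_HOME, SPARK_HOME and HADOOP_HOME point to existing directories).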