RealtimeStreamingEngineering icon indicating copy to clipboard operation
RealtimeStreamingEngineering copied to clipboard

The system cannot find the path specified.

Open Aremstrom opened this issue 1 year ago • 1 comments

1.streaming-socket.py

import json
import socket
import time
import pandas as pd

def _send_records(conn,records,delay):
    """Send one chunk of records to the client as newline-delimited JSON."""
    chunk = pd.DataFrame(records)
    print(chunk)
    for record in chunk.to_dict(orient='records'):
        serialized = json.dumps(record).encode('utf-8')
        # sendall() keeps writing until the whole payload is out; a bare
        # send() may transmit only part of it and truncate a JSON line.
        conn.sendall(serialized + b'\n')
        if delay:
            time.sleep(delay)


def send_data_over_socket(file_path,host='127.0.0.1',port=9999,chunk_size=2,delay=5):
    """Serve a JSON-lines file to a single TCP client, one record per line.

    Binds to ``host:port``, waits for one client, then reads ``file_path``
    line by line. Records are grouped into chunks of ``chunk_size``; each
    chunk is printed as a DataFrame and its records are sent as UTF-8 JSON
    terminated by a newline, pausing ``delay`` seconds after every record.

    Args:
        file_path: Path to a file holding one JSON document per line.
        host: Interface to listen on.
        port: TCP port to listen on.
        chunk_size: Number of records grouped into each chunk (>= 1).
        delay: Seconds to sleep after each record; 0 disables the pause.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        OSError: If the socket cannot be bound or the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be >= 1, got {chunk_size!r}")

    # The listener is only needed until the single client has connected, so
    # it is closed as soon as accept() returns (or fails).
    with socket.socket(socket.AF_INET,socket.SOCK_STREAM) as s:
        s.bind((host,port))
        s.listen(1)
        print(f"Listing For connection {host}:{port}")

        conn,addr = s.accept()
    print(f"Connection From {addr}")

    try:
        # Explicit encoding: the platform default is not UTF-8 everywhere.
        with open(file_path,'r',encoding='utf-8') as file:
            records = []
            for line in file:
                if not line.strip():
                    # Blank lines carry no record and would break json.loads.
                    continue
                records.append(json.loads(line))
                if len(records) == chunk_size:
                    _send_records(conn,records,delay)
                    records = []

            # Flush the final, shorter chunk instead of dropping it.
            if records:
                _send_records(conn,records,delay)

    except (BrokenPipeError,ConnectionResetError,ConnectionAbortedError):
        print("Clinet Disconnected")
    finally:
        conn.close()
        print("Connection Closed")



if __name__ == "__main__":
    # The relative path is resolved against the current working directory,
    # so the script has to be launched from the directory containing
    # "datasets/".
    send_data_over_socket(
        file_path='datasets/yelp_academic_dataset_review.json',
    )

2.spark-streaming.py

import pyspark
from pyspark.sql import SparkSession

def start_streaming(spark):
    """Echo lines read from the socket source at localhost:9999 to the console.

    Builds a streaming DataFrame from the "socket" format, starts a console
    sink in append output mode, and blocks until the query terminates.

    Args:
        spark: Session object whose ``readStream`` is used to build the source.
    """
    socket_reader = spark.readStream.format("socket")
    socket_reader = socket_reader.option("host","localhost")
    socket_reader = socket_reader.option("port",9999)
    incoming = socket_reader.load()

    console_writer = incoming.writeStream.outputMode("append").format("console")
    running_query = console_writer.start()

    # Blocks the calling thread for as long as the streaming query runs.
    running_query.awaitTermination()
#
if __name__ == "__main__":
    # Reuse an active session if one exists, otherwise create one named
    # "SocketStreamConsumer", then hand it to the streaming consumer.
    session_builder = SparkSession.builder.appName("SocketStreamConsumer")
    spark_conn = session_builder.getOrCreate()
    start_streaming(spark_conn)

When I try to run the second file (`spark-streaming.py`), I get the following error:

The system cannot find the path specified.

Aremstrom avatar Jul 23 '24 11:07 Aremstrom

Add the full path to the dataset in the call: `send_data_over_socket('src/datasets/yelp_academic_dataset_review.json')`

elhadjaoui avatar Nov 19 '24 21:11 elhadjaoui