ClickHouse.Client
ClickHouse.Client copied to clipboard
Support Watch Live View .
Hi.
I implement code that open http port and monitor watch live view changes. can you add this code to your library ,and return result in object ?
public class LiveViewWatcher : IDisposable
{
private readonly string _downloadUrl;
private readonly string _watchName;
private HttpClient? _httpClient = null;
public delegate void EvChangeLiveViewResult(string watchName, string data);
public event EvChangeLiveViewResult OnChangeLiveViewResult = delegate { };
public LiveViewWatcher(string ip_address, string user, string pwd, string watchName, int port = 8123, string? sessionId = null)
{
sessionId ??= Guid.NewGuid().ToString();
_downloadUrl = $"http://{ip_address}:8123/?user={user}&password={pwd}&session_id={sessionId}&allow_experimental_live_view=1&query=WATCH%20{watchName}%20FORMAT%20JSONEachRow";
_watchName = watchName;
}
public async Task StartWatch()
{
_httpClient = new HttpClient { Timeout = TimeSpan.FromDays(1) };
using (var response = await _httpClient.GetAsync(_downloadUrl, HttpCompletionOption.ResponseHeadersRead))
await DownloadFileFromHttpResponseMessage(response);
}
private async Task DownloadFileFromHttpResponseMessage(HttpResponseMessage response)
{
using (var contentStream = await response.Content.ReadAsStreamAsync())
await ProcessContentStream(contentStream);
}
private async Task ProcessContentStream(Stream contentStream)
{
var totalBytesRead = 0L;
var buffer = new byte[1024];
var memResponse = new MemoryStream();
do
{
var bytesRead = await contentStream.ReadAsync(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
System.Threading.Thread.Sleep(2000);
continue;
}
await memResponse.WriteAsync(buffer, 0, bytesRead);
totalBytesRead += bytesRead;
if (bytesRead < buffer.Length && buffer[bytesRead - 1] == 10)
{
OnChangeLiveViewResult(_watchName, System.Text.Encoding.UTF8.GetString(memResponse.ToArray()));
memResponse = new MemoryStream();
totalBytesRead = 0;
buffer = new byte[1024];
continue;
}
}
while (true);
}
public void Dispose()
{
_httpClient?.Dispose();
}
}
var downloadMem = new LiveViewWatcher("192.168.5.10", "default", "alireza", "test.lv");
downloadMem.OnChangeLiveViewResult += TestFunc;
await downloadMem.StartWatch();
Console.ReadKey();
void TestFunc(string watchName, string stringResults)
{
Console.WriteLine(stringResults);
}
output result is :
{"sum(a)":"1008","_version":"1"}
{"sum(a)":"1014","_version":"2"}
{"sum(a)":"1020","_version":"3"}
{"sum(a)":"1026","_version":"4"}
{"sum(a)":"1032","_version":"5"}
SET allow_experimental_live_view = 1;
DROP TABLE IF EXISTS test.lv;
DROP TABLE IF EXISTS test.mt;
CREATE TABLE test.mt (a Int32) Engine=MergeTree order by tuple();
CREATE LIVE VIEW test.lv AS SELECT sum(a) FROM test.mt;
WATCH test.lv LIMIT 0;
INSERT INTO test.mt VALUES (1),(2),(3);
WATCH test.lv LIMIT 0;
Hi,
Thanks for contribution, will definitely look into this
I have been using this code for some time and it works for me without any problems. One of the features of this code is that it continues to work if the connection is disconnected and connected, and one of its advantages is that it is written with a socket. Another feature is keeping the connection alive. Because of the interval.
@DarkWanderer
using System.IO;
using System.Net.Sockets;
using System.Text;
using SpanJson;
public class LiveViewWatcher : IDisposable
{
public static LiveViewWatcher Watch(string watchName, string ip_address = null, string user = null, string pwd = null, int port = 8123, string sessionId = null, bool only_event = false) =>
new LiveViewWatcher(watchName, ip_address, user, pwd, 8123, sessionId, only_event);
public LiveViewWatcher(string watchName, string ip_address = null, string user = null, string pwd = null, int? port = null, string sessionId = null, bool only_event = false)
{
ip_address ??= MachineConstants.CLICKHOUSE_SERVER.IP;
user ??= MachineConstants.CLICKHOUSE_SERVER.USER;
pwd ??= MachineConstants.CLICKHOUSE_SERVER.PASSWORD;
port ??= Convert.ToInt32(MachineConstants.CLICKHOUSE_SERVER.PORT);
SessionId = sessionId ??= Guid.NewGuid().ToString();
ServerIpAddres = ip_address;
ServerPort = port.Value;
Username = user;
Password = pwd;
WatchName = watchName;
OnlyEvents = only_event;
}
private readonly string WatchName;
private readonly bool OnlyEvents;
public int KeepAliveInterval { get; private set; } = 240;
public delegate void EvLiveViewResult(LiveViewWatcher watchObj, List<Dictionary<string, string>> data);
public delegate void EvLiveViewJsonResult(LiveViewWatcher watchObj, string data);
public event EvLiveViewResult OnLiveViewResult = null;
public event EvLiveViewJsonResult OnLiveViewJsonResult = null;
private void ProcessDataBuffer(byte[] bytes)
{
var json_data = "";
var all_content = "";
StringBuilder str_builder = new StringBuilder();
try
{
all_content = bytes.ToUTF8();
var splited_chunk = all_content.Split("\r\n", StringSplitOptions.RemoveEmptyEntries).Where(c => c.Length > 6).ToArray();
foreach (var ch_line in splited_chunk)
str_builder.Append(ch_line);
all_content = str_builder.ToString();
str_builder = new StringBuilder();
var lines = all_content.Split("\n", StringSplitOptions.RemoveEmptyEntries).Where(c => c.Length > 6).ToArray();
str_builder.Append("[");
foreach (var line in lines)
{
if (line[0] != '{' || line.StartsWith("{\"progress\":"))
continue;
str_builder.Append(line.Substring(7, line.Length - 8));
str_builder.Append(",");
}
str_builder = str_builder.Remove(str_builder.Length - 1, 1);
str_builder.Append("]");
json_data = str_builder.ToString();
if (json_data.Length < 2)
return;
Task.Run(() =>
{
OnLiveViewJsonResult?.Invoke(this, json_data);
OnLiveViewResult?.Invoke(this, JsonSerializer.Generic.Utf16.Deserialize<List<Dictionary<string, string>>>(json_data));
});
}
catch (Exception expMsg)
{
Console.WriteLine(expMsg.StackTrace);
// System.IO.File.WriteAllText($"live_data_j_{DateTime.Now.ToFileTime()}.txt", json_data);
// System.IO.File.WriteAllText($"live_data_c_{DateTime.Now.ToFileTime()}.txt", all_content);
}
}
#region Scoket Methods
public string ServerIpAddres { get; }
public int ServerPort { get; } = 8123;
public string Password { get; }
public string Username { get; }
public string SessionId { get; set; }
public bool IsDisposed { get; private set; } = false;
public TcpClient socket;
public async Task StartWatch()
{
try
{
SessionId = Guid.NewGuid().ToString();
string url = $"http://{ServerIpAddres}:{ServerPort}/?user={Username}&password={Password}&session_id={SessionId}&allow_experimental_live_view=1&live_view_heartbeat_interval=120&query=WATCH%20{WatchName}{(OnlyEvents ? "%20EVENTS" : "")}%20FORMAT%20JSONEachRowWithProgress";
var request = $"GET {url} HTTP/1.1\r\nHost: {ServerIpAddres}\r\nContent-Length: 0\r\nConnection: Keep-Alive\r\nKeep-Alive: timeout=3600, max=100\r\n\r\n";
socket = new TcpClient(ServerIpAddres, ServerPort);
socket.ReceiveBufferSize = 4096 * 2;
socket.SendBufferSize = 4096 * 2;
socket.ReceiveTimeout = 30000;
socket.SendTimeout = 30000;
using var stream = socket.GetStream();
await stream.WriteAsync(request.ToBytes());
await stream.FlushAsync();
int byte_read = 0;
MemoryStream LiveViewBuffer = new MemoryStream();
DateTime lastMessageTime = DateTime.Now;
do
{
byte[] bytes = new byte[1024 * 32];
byte_read = await stream.ReadAsync(bytes, 0, bytes.Length);
if (byte_read <= 0)
{
if ((DateTime.Now - lastMessageTime).TotalSeconds > KeepAliveInterval)
throw new Exception($"[{WatchName}] [{DateTime.Now.ToMiniStr()}] [{lastMessageTime.ToMiniStr()}] http session is killed!");
await stream.WriteAsync(new byte[0]);
System.Threading.Thread.Sleep(2000);
continue;
}
lastMessageTime = DateTime.Now;
LiveViewBuffer.Write(bytes, 0, byte_read);
if (byte_read >= 3 && bytes[byte_read - 3] == 10 && bytes[byte_read - 2] == 13 && bytes[byte_read - 1] == 10)
{
LiveViewBuffer.Flush();
ProcessDataBuffer(LiveViewBuffer.ToArray());
LiveViewBuffer = new MemoryStream();
continue;
}
} while (true);
}
catch (Exception expMsg)
{
Console.WriteLine(expMsg.Message);
if (!IsDisposed)
{
ClearSocket();
System.Threading.Thread.Sleep(20000);
_ = StartWatch();
}
}
}
private void ClearSocket()
{
if (socket != null)
{
try
{
if (socket.Connected)
socket?.Close();
socket?.Dispose();
}
catch
{
socket = null;
}
}
}
#endregion
public void Dispose() => IsDisposed = true;
}