groupByInterval not working [Spark: 2.4.4]
Two joint dfs should be grouped by a third (clock) df.
But Error is thrown:
java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
724 with traceback_utils.SCCallSiteSync(self._sc) as css:
725 tsrdd = self.timeSeriesRDD.groupByInterval(clock.timeSeriesRDD, scala_key,
--> 726 inclusion, rounding)
727 return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
728
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o3361.groupByInterval.
: java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
at com.twosigma.flint.rdd.OrderedRDD.intervalize(OrderedRDD.scala:560)
at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.summarizeIntervals(TimeSeriesRDD.scala:1605)
at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.groupByInterval(TimeSeriesRDD.scala:1493)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Steps to reproduce
from datetime import datetime
import numpy as np
import ts.flint
from ts.flint import FlintContext
flintContext = FlintContext(sqlContext)
values = {'time': [1543672800000, 1543672800100, 1543672800260],
'F_1': [22, 38, 26],
'F.2': [7.3, 71.3, 7.9]}
states = {'time': [1543672800100, 1543672800200, 1543672800300],
'types': ["0", "24", "42"],
'state': ["False", "True", "True"]}
stops = {'time': [1543672800150, 1543672800200, 1543672800300, 43672800360]}
states_pd = pd.DataFrame(states, columns=states.keys())
values_pd = pd.DataFrame(values, columns=values.keys())
stops_pd = pd.DataFrame(stops, columns=stops.keys())
states_pd['time'] = pd.to_datetime(states_pd['time'], unit='ms', origin='unix')
values_pd['time'] = pd.to_datetime(values_pd['time'], unit='ms', origin='unix')
stops_pd['time'] = pd.to_datetime(stops_pd['time'], unit='ms', origin='unix')
states_df = spark.createDataFrame(states_pd)
values_df = spark.createDataFrame(values_pd)
stops_df = spark.createDataFrame(stops_pd)
# Convert to Flint DataFrame
flint_df1, flint_states, flint_stops = [flintContext.read \
.option("isSorted", False) \
.option("timeColumn", 'time') \
.option("timeUnit", 'ms') \
.dataframe(
# https://github.com/twosigma/flint:
# 'To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType'
df.withColumn("time", (df.time.cast('double')*1000).cast("long"))
) for df in [values_df, states_df, stops_df]]
### combine data
tolerance = '100ms' #exact or '100ms'
data_joined = flint_df1.futureLeftJoin(flint_states, tolerance=tolerance)
data_joined.show()
+--------------------+---+----+-----+-----+
| time|F_1| F.2|types|state|
+--------------------+---+----+-----+-----+
| 2018-12-01 14:00:00.000| 22| 7.3| 0|False|
|2018-12-01 14:00:00.100| 38|71.3| 0|False|
|2018-12-01 14:00:00.260| 26| 7.9| 42| True|
+--------------------+---+----+-----+-----+
### cut into bins
data_binned = data_joined.groupByInterval(flint_stops.select('time'),
inclusion='begin') # automatically uses col 'time'
Troubleshooting suggests, that this has to do with the non-supported spark version and wrong build in which the Intervalize class is missing. After compiling a version for spark 2.4 the groupByInterval works without problems on a local cluster.
However, could you please document the necessary code changes of ts-flint to build a databricks version. Thanks.
I managed to build it for the current databricks runtimes (5.3 and above; tested with 6.1) Steps to reproduce: 0. Requirements: install sbt
-
git clone official ts-flint repository with latest commit 40fd887.
-
cd into cloned repo top-level-dir and check specified versions in build.sbt e.g. changes made: line 34:
scalaVersion := "2.11.12",line 50 (windows specific):"Local Maven Repository" at "file:///" + Path.userHome.absolutePath + "/.m2/repository",line 61:val spark = "2.4.4"line 65:val arrow = "0.12.0" -
add env variable (based on #71):
export TERM=xterm-color
- build from source (scala):
sbt assemblyNoTest
PR: #6