
groupByInterval not working [Spark: 2.4.4]

Open drahnreb opened this issue 6 years ago • 3 comments

Two joined DataFrames should be grouped by the intervals of a third (clock) DataFrame, but an error is thrown: java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$

    724         with traceback_utils.SCCallSiteSync(self._sc) as css:
    725             tsrdd = self.timeSeriesRDD.groupByInterval(clock.timeSeriesRDD, scala_key,
--> 726                                                        inclusion, rounding)
    727         return TimeSeriesDataFrame._from_tsrdd(tsrdd, self.sql_ctx)
    728 

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o3361.groupByInterval.
: java.lang.NoClassDefFoundError: Could not initialize class com.twosigma.flint.rdd.function.group.Intervalize$
	at com.twosigma.flint.rdd.OrderedRDD.intervalize(OrderedRDD.scala:560)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.summarizeIntervals(TimeSeriesRDD.scala:1605)
	at com.twosigma.flint.timeseries.TimeSeriesRDDImpl.groupByInterval(TimeSeriesRDD.scala:1493)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:295)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:251)
	at java.lang.Thread.run(Thread.java:748)

Steps to reproduce

from datetime import datetime
import numpy as np
import pandas as pd

import ts.flint
from ts.flint import FlintContext
flintContext = FlintContext(sqlContext)

values = {'time': [1543672800000, 1543672800100, 1543672800260],
         'F_1': [22, 38, 26],
         'F.2': [7.3, 71.3, 7.9]}

states = {'time': [1543672800100, 1543672800200, 1543672800300],
         'types': ["0", "24", "42"],
         'state': ["False", "True", "True"]}

stops = {'time': [1543672800150, 1543672800200, 1543672800300, 1543672800360]}

states_pd = pd.DataFrame(states, columns=states.keys())
values_pd = pd.DataFrame(values, columns=values.keys())
stops_pd = pd.DataFrame(stops, columns=stops.keys())
states_pd['time'] = pd.to_datetime(states_pd['time'], unit='ms', origin='unix')
values_pd['time'] = pd.to_datetime(values_pd['time'], unit='ms', origin='unix')
stops_pd['time'] = pd.to_datetime(stops_pd['time'], unit='ms', origin='unix')

states_df = spark.createDataFrame(states_pd)
values_df = spark.createDataFrame(values_pd)
stops_df = spark.createDataFrame(stops_pd)

# Convert to Flint DataFrame
flint_df1, flint_states, flint_stops = [flintContext.read \
              .option("isSorted", False) \
              .option("timeColumn", 'time') \
              .option("timeUnit", 'ms') \
              .dataframe(
                # https://github.com/twosigma/flint:
                # 'To create a TimeSeriesRDD from a DataFrame, you have to make sure the DataFrame contains a column named "time" of type LongType'
                df.withColumn("time", (df.time.cast('double')*1000).cast("long"))
              ) for df in [values_df, states_df, stops_df]]
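
Aside: the pandas-datetime round-trip above is not strictly necessary. A minimal sketch of the direct route, assuming spark.createDataFrame maps the int64 'time' column to LongType (values_raw_df / flint_values are illustrative names, not part of the original repro):

values_raw_df = spark.createDataFrame(pd.DataFrame(values))  # 'time' stays integer epoch-ms

flint_values = flintContext.read \
              .option("isSorted", False) \
              .option("timeColumn", 'time') \
              .option("timeUnit", 'ms') \
              .dataframe(values_raw_df)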

### combine data
tolerance = '100ms'  # exact match or a window such as '100ms'
data_joined = flint_df1.futureLeftJoin(flint_states, tolerance=tolerance)

data_joined.show()

+-----------------------+---+----+-----+-----+
|                   time|F_1| F.2|types|state|
+-----------------------+---+----+-----+-----+
|2018-12-01 14:00:00.000| 22| 7.3|    0|False|
|2018-12-01 14:00:00.100| 38|71.3|    0|False|
|2018-12-01 14:00:00.260| 26| 7.9|   42| True|
+-----------------------+---+----+-----+-----+

### cut into bins
data_binned = data_joined.groupByInterval(flint_stops.select('time'),
                                          inclusion='begin')  # automatically uses col 'time'
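
Until a fixed jar is available, the binning can be approximated in plain PySpark. This is only a sketch, assuming groupByInterval with inclusion='begin' assigns each row to the half-open interval [begin, end) spanned by consecutive clock timestamps (intervals_df and binned_df are made-up names):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Build [begin, end) intervals from consecutive clock timestamps.
# The clock df is tiny, so an unpartitioned window is acceptable here.
w = Window.orderBy("time")
intervals_df = stops_df \
    .withColumnRenamed("time", "begin") \
    .withColumn("end", F.lead("begin").over(w)) \
    .dropna(subset=["end"])

# Assign each row to its interval (begin inclusive, end exclusive, mirroring
# inclusion='begin'), then aggregate per bin.
binned_df = values_df.alias("d") \
    .join(intervals_df.alias("i"),
          (F.col("d.time") >= F.col("i.begin")) &
          (F.col("d.time") < F.col("i.end"))) \
    .groupBy("i.begin") \
    .agg(F.count("*").alias("rows_in_bin"))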

drahnreb commented on Nov 30 '19

Troubleshooting suggests that this is caused by the unsupported Spark version and a mismatched build, in which the Intervalize class cannot be initialized. After compiling a version for Spark 2.4, groupByInterval works without problems on a local cluster.

However, could you please document the necessary code changes to ts-flint for building a Databricks-compatible version? Thanks.

drahnreb commented on Dec 03 '19

I managed to build it for the current Databricks runtimes (5.3 and above; tested with 6.1). Steps to reproduce:

0. Requirements: install sbt.

  1. git clone the official ts-flint repository at the latest commit 40fd887.

  2. cd into the cloned repo's top-level directory and check the versions specified in build.sbt. Changes made:

     line 34: scalaVersion := "2.11.12"
     line 50 (Windows-specific): "Local Maven Repository" at "file:///" + Path.userHome.absolutePath + "/.m2/repository"
     line 61: val spark = "2.4.4"
     line 65: val arrow = "0.12.0"

  3. Add an environment variable (based on #71):

export TERM=xterm-color

  4. Build from source (Scala):

sbt assemblyNoTest

The compiled .jar runs under the current Databricks runtimes.
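
A minimal smoke test, assuming the rebuilt jar and the matching ts-flint Python package are attached to the cluster, is to re-run the call that failed above:

# Previously raised NoClassDefFoundError; should now return a grouped df.
data_binned = data_joined.groupByInterval(flint_stops.select('time'),
                                          inclusion='begin')
data_binned.show()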

drahnreb commented on Dec 04 '19

PR: #6

drahnreb commented on Dec 04 '19