Lately I have been considering potential designs for encapsulating time series operations in Apache Spark. The closest thing currently available is the IndexedRDD class, but my needs go beyond that and are more time seriesspecific  although extending that class would be a viable option.
My first architectural question was: is there a need for an RDD version of univariate time series? In practice, the need/advantage to distributing a single time series is a tough sell. Extremely rarely does a single time series not fit in local memory, and more importantly: I'm not convinced that many common operations on single time series (lagging, interpolating, differencing, etc.) would really benefit that much from distributed computing. Instead, the biggest bang for one's buck would occur when distributing a whole set of time series for running more expensive calculations on these. At least, it appears to be this way from the business cases I have been personally dealing with so far. Nonetheless, I came to the conclusion that it wouldn't hurt to have an RDD version, for the simple reason that the inputs/outputs to/from time series will generally be distributed. By that I mean: within a Spark environment, most data sources will already be providing RDDs (e.g. Spark Streaming dstreams, or loading Parquet files from a HDFS). The outputs (e.g. passing these time series to MLlib functions) will also tend to require RDDs. Bottom line: when in Rome, do as the Romans do. Then the second consideration was: since we will often need to combine individual time series into a coherent, timebased data set (e.g. for clustering or regression purposes), what internal representation would be optimal? Columnbased (where the individual time series are stored "separately" from each other and for the time dimension), such as: RDD[TimeSeriesRDD], where TimeSeriesRDD would be a single time series? or rowbased, such as: RDD[(Datetime, Vector)] (where all data points for a given time instant are stored together)? To answer that question I looked at the different kinds of operations that will be performed, to determine which data representation would be optimal for each one. I came up with two fundamentally distinct categories: 1. "Per time series" operations: lagging, interpolating, differencing, etc. These are operations that can be applied to a single time series in and of itself, regardless of its relation in time to the other variables in the dataset. These kinds of operations are more efficiently done in columnar representation. For example: lagging a time series is quick when all you have is a time vector, and a data vector, separate from each other. You drop a few elements at the end of the time vector, and drop the same amount of elements at the other end of the data vector, and you're done. But if instead you are storing the data as RDD[(Datetime, Vector)] (or some equivalent format), you will have to map every single pair's value to its previous value in time (which you will have to lookup using the time index). The latter is clearly more computationally expensive. 2. "Per time instant" operations: regression and prediction, clustering, synchronizing, etc. These are operations that can only be applied on a given timegrouped data set as a whole, because they relate to the value of all variables at each given instant in time. These operations are more efficiently done in rowbased representation. If your data is in columnar representation, it is literally impossible to do these things unless you first join the time series on the time dimension (thereby transposing them to rowbased representation). So it became clear to me that 2 distinct data structures are required. Given that transposing from columnar to rowbased representation is relatively expensive, it is important to clarify the recommended usage pattern/philosophy for these 2 data structures. The idea is as follows: first acquire the individual time series separately from your data source(s), and apply the necessary preprocessing (lagging, getting aggregated information such as min/max/average, etc.). Then, when you're ready to start running algorithms on the multiple time series as a whole set (each variable being "grouped" with the other ones for its given instant in time), construct the TimeSeriesEquationRDD (or however you would call it) from the individual TimeSeriesRDD through some factory method, constructor, or helper class of some kind. This will transpose the individual time series into a rowbased representation and, simultaneously (almost by accident in fact, given that this will be an inner join operation), it will synchronize these, such that any superfluous data points on any of the individual time series will be dropped. Such would be the recommended usage and, I believe, fits most business cases. The details of implementation I have yet to figure out  such as whether the time key should be a Joda time object, or a Long (to leverage the IndexedRDD) with a separate LongtoDatetime object mapping for "human representation" of time instants, etc. Also the question of custom optimal partitioning and custom algorithms for time series manipulation remains to be settled. This is essentially a work in progress, and I may yet discover fundamental flaws in this architecture as I gradually put it to the test in my realworld application.
0 Comments
I just finished implementing the Augmented DickeyFuller test using Scala and Apache Spark. The objective of the ADF test is, given some time series X(t), to determine whether this time series is stationary (i.e. revolves around a constant or slowmoving mean) or nonstationary (i.e. the expected value of the next innovation in the time series is equal to the last previous one, rather than some fixed mean). This is generally referred to as checking for a unit root. The intuition and derivation follows. Let's start by describing our time series as a function of a lag of itself (+ a constant alpha to allow for an intercept in the regression). If X(t) is nonstationary, then b will be equal to 1 (which is what we call a unit root). If X(t) is meanreverting, b will be less than 1. If X(t) is perfectly stationary, b will be equal to 0. So immediately we see that the DickeyFuller test will be about testing for the value of b. Our null hypothesis will be that b = 1 (nonstationary), and our alternative hypothesis will be that b < 1 (stationary). We can't test for b as is, because X(t) and X(t  1) are integrated (a.k.a I(1), a.k.a. nonstationary) under the null hypothesis, and when a variable is integrated, the Central Limit Theorem does not apply, so the traditional coefficient estimation techniques can't be used. Instead, we differentiate the equation, and obtain the following: This formulation is more appropriate for regression analysis: the dependent variable is now stationary. We must transform our hypothesis accordingly, because now it is about p = b  1, not b itself: Null hypothesis: p = 0 (because b  1 = 1  1) Alternative hypothesis: p < 0 X(t  1) itself is still integrated under the null hypothesis (thus the estimator for p would not follow a tdistribution), but Dickey & Fuller have gone through the trouble of building the distribution table appropriate for this test. So we are simply going to calculate the tstatistic for this regression, and compare against their table to see if we reject the null hypothesis or not. So we run a linear regression to obtain the estimated p coefficient, and plug it in to get the corresponding tstatistic. In the SE(p) equation, y hat represents the fitted values of delta X in the main regression, and y represents the observed values thereof. X bar is the average of x values, being the righthand side X variable. The 95% confidence DF statistic is 2.87 for a dataset of 500 or more values. If the tstatistic < 2.87, you can reject the null hypothesis and conclude at the 5% significance level that the time series is indeed stationary. But that's just the ordinary DickeyFuller test. What if your time series is indeed mean reverting, but the mechanics of it is more complicated (higher order lags are autocorrelated) than correlation with the previous lag? This is where the Augmented version comes in: In other words, you simply add up to n lags of the delta of X to your regression equation.
Note that it is possible to automatically find the optimal maximum lag order n using statistical significance tests, but this may be material for another post... 
AuthorSimon Ouellette Categories
All
Archives
March 2018
