An empirical study of group-by strategies in Julia
Update 2017 Nov 12

- Detailed a `radixgroup` method which is faster than `data.table` for bits types
- A multi-threaded `radixsort` method is the fastest method known so far
About me
I am a Data Scientist with 10+ years' experience in Credit Risk Data Science/Analytics/Modelling, hoping to join Julia Computing full time and work on data-related projects such as JuliaDB.jl and DataFrames.jl.
The group-by data manipulation problem
Julia is meant to be fast, but upon initial investigation R's `data.table` is streets ahead of Julia in terms of group-by performance. So I wanted to investigate whether Julia can beat R's `data.table` with more optimized algorithms and data structures.
The TL;DR version is that

- Julia is faster than `data.table` if the group-by vector is of "bits type" (e.g. Int and Float) but not String
- The multi-threaded radixsort method is the fastest method known so far
- Julia using indexed columns is faster than `data.table`'s pre-indexed group-by
- Julia is much slower than `data.table` if the group-by vector is a string
- Group-by using CategoricalArrays is much faster than `data.table`
- Parallelized algorithms are slower, probably due to my inexperience in building them
The code
The code used to produce the timings is here: https://gist.github.com/xiaodaigh/fbdbbfd32062e33a20b1895af24e1542
`sumby` with a large number of groups

After an initial study of the performance of group-by algorithms, I found the biggest performance gap between Julia and `data.table` is the `sumby` case where the number of groups is large relative to the length of the group-by vector, e.g. 2_500_000 groups for a group-by vector of length 250_000_000.
I figured if I can solve this efficiently then it will teach me a lot about how to tackle other cases, hence my focus on this problem at this stage.
Let's illustrate the `sumby` algorithm with this example:
byvec = [100, 100, 200, 300, 400, 400]
valvec = [1, 1, 1, 1, 1, 1]
sumby(byvec, valvec)
# should return Dict(100 => 2, 200 => 1, 300 => 1, 400 => 2)
This version of `sumby` is actually already implemented by `StatsBase.countmap`:

using StatsBase
import StatsBase: countmap, weights
byvec = [100, 100, 200, 300, 400, 400]
valvec = [1, 1, 1, 1, 1, 1]
countmap(byvec, weights(valvec))
However, I found that `countmap` is not as performant. Initially, I also implemented my own version of `sumby` by studying the source code of `StatsBase.countmap` and `SplitApplyCombine.groupreduce`. But later on, I found that radixsort-based methods are the fastest, and hence based `sumby` on radixsort.
DataFrame overhead is insignificant
I found there is not much overhead in picking a column from a DataFrame and treating it as a vector, hence I have chosen to first create algorithms for `sumby` with the following function signature:
function sumby{T,S}(by::Vector{T}, val::Vector{S})
# some code...
end
Benchmark - data setup
I ran the following code to setup the test data in Julia
using SplitApplyCombine, FastGroupBy, BenchmarkTools
const N = Int64(2e9/8)
const K = 100
id6 = rand(1:Int64(round(N/K)), N) # the group-by vector
pool1 = [@sprintf "id%010d" k for k in 1:Int64(round(N/K))] # pool of N/K unique strings
id3 = rand(pool1, N) # the string group-by vector
v1 = rand(1:5, N)
and in R I ran
library(data.table)
N = 2e9/8
K = 100
DT <- data.table(
id6 = sample(N/K, N, TRUE), # small groups (int)
id3 = sample(sprintf("id%010d",1:(N/K)), N, TRUE),# small groups (string)
v1 = sample(5, N, TRUE) # int in range [1,5]
)
Radixsort-based algorithms
R's `data.table` uses radixsort as its default sorting algorithm, so I looked into whether radixsort can help speed up group-by operations. Sure enough, there is an implementation of radixsort in SortingAlgorithms.jl, which I have based my algorithms on.
Radixsort and Radixgroup
I took the source code of the Radixsort function and repurposed it so that it sorts the group-by vector and the value vector in the same function. This method is the radixsort method.
Technically `sumby` does not require the output to be sorted; hence it may be possible to make a more performant `sumby` algorithm by avoiding a full sort. To understand how this can be achieved, it's good to have a high-level understanding of radixsort.
Step | High-level explanation of radixsort in SortingAlgorithms.jl |
---|---|
1 | Radixsort works by first sorting the array using counting sort on the first 11 binary digits (defined by `SortingAlgorithms.RADIX_SIZE`, which equals 11). There are only 2048 (= 2^11) possible values in the first 11 binary digits. |
2 | The counting sort loops through the elements and counts how many fall into each of the 2048 buckets; then |
3 | it loops through the elements again and uses the counts to assign each element to its appropriate position in a newly created array of the same size. This is done by building a cumulative sum of the counts, placing each element into the new array at the position given by the cumulative sum of its bucket, decrementing that bucket's cumulative sum, and continuing until all elements are placed. |
4 | "Throw away" the original array and repeat for the next set of 11 binary digits on the new array. It continues in this way until all digits have been used in the sort; it can be proved that the resulting array is sorted at this point. |
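The counting-sort pass in steps 2 and 3 can be sketched in a few lines of Julia. This is my own illustrative version, not the SortingAlgorithms.jl source; it assumes non-negative integer keys, and `countingsort_pass` is a name I made up:

```julia
# Illustrative counting-sort pass over the 11 bits at `shift`;
# assumes non-negative integer keys. Not the SortingAlgorithms.jl source.
const RADIX_SIZE = 11
const RADIX_MASK = (1 << RADIX_SIZE) - 1   # 0x7ff: selects 11 bits

function countingsort_pass(v, shift)
    counts = zeros(Int, 1 << RADIX_SIZE)   # one bucket per 11-bit value
    for x in v                             # step 2: count bucket sizes
        counts[((x >> shift) & RADIX_MASK) + 1] += 1
    end
    cumsum!(counts, counts)                # last position of each bucket
    out = similar(v)
    for x in reverse(v)                    # step 3: stable placement
        b = ((x >> shift) & RADIX_MASK) + 1
        out[counts[b]] = x
        counts[b] -= 1
    end
    out
end

countingsort_pass([5, 3, 8, 3, 1], 0)  # [1, 3, 3, 5, 8]
```

Repeatedly applying this pass with `shift` set to 0, 11, 22, ... yields the full radixsort.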
The insight here is that once the first counting-sort pass is done (steps 2 and 3 above), we have already broken the array up into as many as 2048 buckets; we need not sort the whole array again, and can simply apply a `sumby` to each of the 2048 buckets (some of which may be empty), knowing that two elements in different buckets cannot end up in the same by-group. Using this idea and a couple of optimizations I have implemented a `sumby_radixgroup` method, which is faster than the `sumby_radixsort` method on large input sizes.
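A minimal sketch of the bucket idea (my own illustrative code, not the optimized `sumby_radixgroup` in FastGroupBy.jl; it assumes non-negative integer keys and, unlike the real method, does not recurse on the higher bits):

```julia
# Illustrative-only sketch: one bucketing pass on the low 11 bits,
# then an independent reduce per bucket. Assumes non-negative integer keys.
function sumby_radixgroup_sketch(by, val)
    mask = (1 << 11) - 1                      # low 11 bits pick the bucket
    buckets = [Int[] for _ in 1:(mask + 1)]   # indices, partitioned by bucket
    for i in eachindex(by)
        push!(buckets[(by[i] & mask) + 1], i)
    end
    # keys in different buckets can never be equal, so each bucket can be
    # summed independently (a real implementation would recurse on the
    # higher bits instead of using a Dict here)
    res = Dict{eltype(by), eltype(val)}()
    for bucket in buckets, i in bucket
        res[by[i]] = get(res, by[i], zero(eltype(val))) + val[i]
    end
    res
end

sumby_radixgroup_sketch([100, 100, 200, 300, 400, 400], [1, 1, 1, 1, 1, 1])
# Dict(100 => 2, 200 => 1, 300 => 1, 400 => 2)
```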
Benchmark - Radixsort-based algorithms
The radixsort and radixgroup `sumby` are implemented in FastGroupBy.jl:
using FastGroupBy
sumby_radixsort(id6, v1)
sumby_radixgroup(id6, v1)
The timing of the radixgroup `sumby` is about 14.9 seconds, which is faster than `data.table`! The timing of the radixsort `sumby` is about 18.9 seconds, which is on par with `data.table`.
In `data.table` one can index on a column, sacrificing some setup time for lightning-fast performance. This can be achieved as below:
setkey(DT, id6)
DT[,sum(v1), id6]
And the timing is ~2-3 seconds. But the radixsort method in Julia outputs the original vectors in sorted order, similar to `setkey` above. I can then apply a faster algorithm, one that assumes elements belonging to the same group are stored contiguously (being sorted is a special case of this condition), to do the group sum in ~1-2 seconds, beating `data.table`'s indexed method!
However, radix sort cannot deal with strings, as String is not a bits type. One way around this is to hash the strings using `hash(a_string)`, sort on the hashed results, and then check for collisions during the summation. However, `hash.(id3)`, where `id3` is a vector of length 250_000_000, took more than 1 minute; this is a lower bound on the speed of this algorithm, as you must hash every string.
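The hash-then-verify idea might look like the following sketch (my own illustrative code with made-up names, using `sortperm` on the hashes for brevity instead of a radix sort); within each run of equal hashes it falls back to comparing the actual strings, so a collision cannot merge two distinct groups:

```julia
# Illustrative sketch: sort on hashes, then verify actual key equality
# within each run of equal hashes, so a hash collision is harmless.
function sumby_hashed(by, val)
    hs = hash.(by)            # the expensive step: one hash per element
    p = sortperm(hs)          # equal hashes become adjacent
    res = Dict{eltype(by), eltype(val)}()
    i = 1
    while i <= length(p)
        j = i
        while j < length(p) && hs[p[j + 1]] == hs[p[i]]
            j += 1            # extend the run of equal hashes
        end
        for k in i:j          # fall back to the real keys within the run
            key = by[p[k]]
            res[key] = get(res, key, zero(eltype(val))) + val[p[k]]
        end
        i = j + 1
    end
    res
end

sumby_hashed(["a", "b", "a", "b", "c"], [1, 2, 3, 4, 5])
# Dict("a" => 4, "b" => 6, "c" => 5)
```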
In `data.table`, the sum by `id3` took only about 35 seconds (pre-indexed, about ~3 seconds). The same in Julia using hashed radix sort took 110 seconds, so the performance gap for string group-by is fairly significant unless you store the strings as `CategoricalArrays` (see the benchmarks further below).
Benchmark - Multi-threaded radixsort
One obvious way to speed up the code is to apply `sumby` to mutually exclusive subarrays and aggregate the results using multi-threaded code. I found that the multi-threaded version of the radixsort `sumby` is more performant than radixgroup.
On my machine, the default number of threads is 4, and the timings were 10.5 seconds which is a lot faster than R's data.table!
The function `sumby_multi_rs` is implemented in the FastGroupBy.jl package. The function `blockranges` and the overall structure were copied from this StackOverflow question.
function sumby_multi_rs{T, S<:Number}(by::AbstractVector{T}, val::AbstractVector{S})::Dict{T,S}
    (starts, ends) = blockranges(nthreads(), length(by))
    # each thread writes to its own slot; push!-ing to a shared vector
    # from inside @threads would be a data race
    res = Vector{Dict{T,S}}(nthreads())
    @threads for i in 1:nthreads()
        (ss, ee) = (starts[i], ends[i])
        @inbounds res[i] = sumby_radixsort(by[ss:ee], val[ss:ee])
    end
    # merge the per-thread dictionaries into one
    szero = zero(S)
    res_fnl = res[1]
    for res1 in res[2:end]
        for k in keys(res1)
            res_fnl[k] = get(res_fnl, k, szero) + res1[k]
        end
    end
    return res_fnl
end
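For reference, here is a possible implementation of the `blockranges` helper (a sketch under my own assumptions; the version copied from StackOverflow may differ): split `1:n` into `nb` near-equal contiguous chunks and return their start and end indices.

```julia
# Split 1:n into nb near-equal contiguous chunks; returns (starts, ends).
# Illustrative sketch, not necessarily the FastGroupBy.jl version.
function blockranges(nb, n)
    len, rem = divrem(n, nb)
    # the first `rem` chunks get one extra element
    sizes = [len + (i <= rem ? 1 : 0) for i in 1:nb]
    ends = cumsum(sizes)
    starts = [1; ends[1:end-1] .+ 1]
    (starts, ends)
end

blockranges(4, 10)  # ([1, 4, 7, 9], [3, 6, 8, 10])
```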
Benchmarks - dictionary-based algorithm
The first algorithm that I came up with (which is independently implemented as `SplitApplyCombine.groupreduce`) is to enumerate through `zip(by, val)` and build up a hash table whose keys are the unique values of `by` and whose values are the cumulative sums of `val`. This algorithm works for any data type.
One can also use DataStructures.jl's Accumulators to achieve this, but I found `SplitApplyCombine`'s implementation to be more performant.
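In sketch form, the dictionary-based approach looks like this (my own minimal version, not the `groupreduce` source):

```julia
# Minimal sketch of the dictionary-based approach: one pass, one hash table.
function sumby_dict(by, val)
    res = Dict{eltype(by), eltype(val)}()
    for (b, v) in zip(by, val)
        # accumulate the running sum for this key, starting from zero
        res[b] = get(res, b, zero(eltype(val))) + v
    end
    res
end

sumby_dict([100, 100, 200], [1, 2, 3])  # Dict(100 => 3, 200 => 3)
```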
The timings for Julia vs `data.table` are 46 seconds vs 20 seconds.
The timings in R might be a bit misleading as I find that R will hang for a few seconds before I can issue the next command, possibly due to some garbage collection in the background.
Benchmark - algorithms that require prior knowledge
I know that `id6` contains the integers from 1 to 2_500_000; therefore, I can create an array of size 2_500_000 to store the result. Accessing an element of an array and adding to it should be a lot faster than hashing (however fast the hashing may be); moreover, you need not resize the array (which I assume to be expensive) or manage the overheads of a `Dict`.
import Iterators.filter
function sumby_intrange{T<:Integer, S}(by::Vector{T}, val::Vector{S}, bylow::T, byhigh::T)
lby = byhigh - bylow + 1
res = zeros(S, lby) # accumulator has the value type S, not the key type T
for (byi,vali) in zip(by, val)
@inbounds res[(byi-bylow) + 1] += vali
end
filter(x -> x[2] != 0, zip(bylow:byhigh,res))
end
@time res = sumby_intrange(id6, v1, 1, 2_500_000);
The timing for Julia is now 7 seconds.
This approach is "cheating", but it also shows glimpses of hope. Of course, in real-life settings we may not know anything about the data until we first interrogate it; however, there is no reason why we shouldn't keep metadata about our dataset once we've gathered it.
Implications for persistent data manipulation framework design |
---|
Therefore I think JuliaDB should actively store metadata regarding the columns and provide functionality to profile the data. The more you know about the data, the easier it is to tailor the approach to achieve peak performance. |
The other point worth mentioning is that it's not obvious how to "cheat" like this in the R world. It's harder in R to define your own alternative vector type, and you will probably need to go to C/C++ to code such an algorithm and link it to R. The advantage of Julia is that a novice Julia coder like myself can roll a reasonably performant solution very quickly.
Benchmark - PooledArrays (and therefore CategoricalArrays)
Both PooledArrays.jl and CategoricalArrays.jl store a `pool` array representing the unique levels, and use integers pointing into the `pool` array to represent the vector. We can apply the trick above to perform `sumby` relatively quickly. But it requires creating the vector as a CategoricalArray/PooledArray in the first place, which requires prior knowledge. Also, converting an array to a PooledArray is quite slow at the moment.
function sumby{S}(by::Union{PooledArray, CategoricalArray}, val::AbstractArray{S,1})
l = length(by.pool)
res = zeros(S, l)
refs = Int64.(by.refs)
for (i, v) in zip(refs, val)
@inbounds res[i] += v
end
return Dict(by.pool[i] => res[i] for i in 1:l)
end
const N = Int64(2e9/8); const K = 100
using PooledArrays
import PooledArrays.PooledArray
pool = [1:Int64(round(N/K))...]
@time id6_pooled = PooledArray(PooledArrays.RefArray(rand(Int32(1):Int32(round(N/K)), N)), pool)
@time sumby(id6_pooled, v1)
The timing is similar at 9 seconds.
The above optimization can be performed in `data.table` as well, and I have initiated a feature request on its GitHub page.
Benchmarks - Parallelized sum
In Julia, one can use SharedArrays and write algorithms that work on chunks of the array. In this case, we can let `n` workers each work on a distinct range of the arrays. The workers will collectively return `n` `Dict`s, which we can reduce into one `Dict`. Although Chris' comment pointed out that this is not optimal, I am yet to fully understand why that might be the case, and this is the best approach I can come up with for now.
However, the time taken to "warm up" can be substantial, as it requires converting the arrays to SharedArrays, and it takes a bit of time to fire up the Julia workers using `addprocs()`; during the run it can also make other concurrent tasks (e.g. watching a JuliaCon video on YouTube) sluggish.
@time addprocs() # create Julia workers
@time using FastGroupBy
@everywhere using FastGroupBy
@everywhere using SplitApplyCombine
@time psumby(id6,v1) # 35 seconds
The timing is ~35 seconds. If you pre-convert the vectors into SharedArrays, then you can speed it up to around ~29 seconds, which is still slower than the radixsort method. I would say parallel processing is not as beginner-friendly.
@time as = SharedArray(id6);
@time bs = SharedArray(v1);
@time psumby(as,bs)
Conclusion
Julia's promise is clear, as a beginner like myself can roll a few novel solutions and test them out. This is not easily achievable with R, where you would need C/C++ knowledge to build your own solutions. (Update: I have managed to match `data.table`'s performance on bits-type group-by.)
As PCs get more and more powerful, we will start to see laptop/desktop computers with 1TB of RAM become commonplace for data manipulation within a few years. Therefore many tasks that would have been considered "big data" (or just "medium data") will gradually become just "small data" problems, and Julia's superior performance will truly shine in such use cases. As it stands, there are a few spots where Julia does not have fantastic performance, but I am confident that many of these problems can be solved in Julia without having to resort to C/C++.
One thing I found is that you need lots of time and patience to dig into Julia's ecosystem to find the gems that can help you optimize performance. This means the community hasn't coalesced around a couple of tried-and-tested packages (like `dplyr` and `data.table`), which is symptomatic of two things:
- Julia's data ecosystem is still relatively immature; and
- Julia is quite a bit more flexible than R in terms of allowing the average user to roll their own performant solutions; hence there are a lot of distinct solutions that try to solve the same problems in different ways. It will take time for clear winners to emerge and when they do they will be excellent solutions I am sure, but the current state is not that beginner friendly.
Overall I am very optimistic about the future of Julia, because I see many smart people working hard in the data domain, and I am hopeful that one day I can wholeheartedly recommend Julia as THE data manipulation platform. But for now, that recommendation still goes to R with `data.table`, because it is faster in many cases and has a richer data ecosystem around it.
This is a great post! JuliaDB tables do counting sort when the categories are integers within a small range. But would be nice to expand support for performant sortperm to be more comprehensive.
You can also cache the computed permutations in join operations by passing `cache=true`, so that repeated joins by the same keys are faster. Should be easy to extend this behavior to groupby as well. So in essence a groupby call will do both groupby and set_index (without actually returning a re-indexed table, though). Cheers!
So your conclusion is that it's faster to first sort the data using radix sort, and only compute the group sums in a second step, rather than accumulating the sums on the fly? That's what we need to do anyway for a generic `groupby` function for `DataFrame`. An additional difficulty is to support multiple grouping columns, which are possibly of different types. Ideally we would be able to choose the most efficient method for each column type. String columns will necessarily be relatively slow compared with integers, but `CategoricalArray`/`PooledArray` columns should be as fast as or faster than integers, since we know that the range of possible values is relatively limited. As an implementation detail, you might be able to get faster code by putting the `@inbounds` before `for` rather than inside the loop, since the iteration protocol currently does not apply `@inbounds` automatically (though it should). This could enable SIMD instructions and make a significant difference in some cases.

Yes, that is correct. Glad to hear that's where DataFrames's backend is headed.
I always wonder if there's an even more efficient data structure that can achieve an even faster group-by. Theoretically, sorting is not needed for group-by operations, as you can have groups that aren't sorted in any way. Sorting implies grouping will be easier, but grouping does not imply sorting; the relationship is not an iff.
And yes the algorithm for PooledArrays is very fast, much faster than RadixSort.
And thanks for the tips.
I would say really fast algorithms for hashing arbitrary zipped types are a promising approach for multi-column group-bys. Currently the hashing algorithm for strings is really slow, hence the bad performance. At worst you have to hash twice to ensure there's only an infinitesimal chance of two keys being hashed to the same value.
Is inlining the iterator via the @itr macro still required to get the full efficiency out?
https://github.com/JuliaCollections/IterTools.jl#the-itr-macro-for-automatic-inlining-in-for-loops
In general the only safe solution is what `Dict` does (and what `DataFrames` uses, inspired by `Dict`): once you've found the slot corresponding to the hash, check that the string is actually equal to that of the group. That implies more work, but we really can't afford being wrong even 0.1% of the time. Anyway, if you want performance you shouldn't use strings, but `CategoricalArray` or `PooledArray`.

`Dict` also relies on hash, hence really fast hashing algorithms for strings will be beneficial.
Also, once you hash and radix sort on the hashed values, you still have a chance to look for collisions in the resulting hashes during the reduce step and make corrections there, which will be quite efficient.
Overall nice writeup, but some issues.
Yes. There’s nothing in the architecture of Julia that prevents your algorithm from being 1x from C. It’s up to the package developers and you to implement an algorithm which is as good as the C algorithms that R’s data.table is calling. There really isn’t anything to prove here because it’s obvious from the way that Julia is designed (i.e. all functions are static, so it gets statically typed and statically compiled) and it’s been shown in benchmarks. But coming up with good data algorithms is another story. It’s pretty clear that R’s algorithms got refined over the years, and it’ll take matching their clever optimizations to get 1x. But its code is GPL, right? So it’s effectively “closed source”, so it’ll take a bit of poking at it to get it right without looking at the source.
Another point to note is that you didn't need to (and shouldn't have) use SharedArrays in that parallel example. The data can easily be split in that example, so splitting the array (into a DistributedArray, or just chunking it yourself) is the right solution. SharedArrays is not made for this, and of course it's slow to keep different portions in sync when you don't need to.
No you don’t. It auto-specializes. This:
function sumby_index{T,S}(index::Dict{T, Vector{Int64}}, val::Vector{S})
can be duck-typed
function sumby_index(index, val)
and get the same performance (and of course you don't need to go this far; you can use more abstract types instead).
I thought in my example SharedArray is good because each worker is accessing only one part of the array achieving parallelism.
No.
See https://docs.julialang.org/en/latest/manual/parallel-computing#man-shared-arrays-1
It’s exactly what you don’t want: performance hit which allows each process to access every part of the array.
Please avoid this kind of GPL bashing, that’s nonsense! Yes, if you want to release your code under the MIT or BSD licenses, you can’t effectively base your work on GPL code, and that has indeed been a problem for Julia packages in several occasions.
But the situation is totally different from closed source software. First, obviously, GPL code can be reused and modified freely by anyone (which is the whole point). Second, you can always ask somebody to read the GPL code and write a general summary of the algorithms it uses, and then write your MIT/BSD-licensed code based only on that description. That’s much easier to do than reverse-engineering closed-source software, which is often totally forbidden by EULAs.
It wasn’t GPL bashing, it’s just a neutral fact about the development process. You agreed to what I said: we can’t use the GPL source in order to build our algorithms. Of course it’s different than closed source, which is why I put “effectively” and then closed source in quotes, but in this case you agreed that it’s similar to closed source in that we cannot look directly at the source to understand the algorithm (other than a two-person loophole which you describe where someone writes up the algorithm and the other implements it…) which is exactly what I was describing. You can either look at it as a pro or con for GPL, that’s up to you.
actually, this won’t be the case much longer. see
https://github.com/Rdatatable/data.table/pull/2456