1. This is truly impressive work from AWS. Patrick Ames began speaking about this a couple years ago, though at this point the blog post is probably the best reference. https://www.youtube.com/watch?v=h7svj_oAY14
2. This is not a "typical" Ray use case. I'm not aware of any other exabyte scale data processing workloads. Our bread and butter is ML workloads: training, inference, and unstructured data processing.
3. We have a data processing library called Ray Data for ingesting and processing data, often done in conjunction with training and inference. However, I believe in this particular use case, the heavy lifting is largely done with Ray's core APIs (tasks & actors), which are lower level and more flexible, making them a good fit for highly custom use cases (see the sketch after this list). Most Ray users use the Ray libraries (Train, Data, Serve), but power users often use the Ray core APIs.
4. Since people often ask about data processing with Ray and Spark, Spark use cases tend to be more geared toward structured data and CPU processing. If you are joining a bunch of tables together or running SQL queries, Spark is going to be way better. If you're working with unstructured data (images, text, video, audio, etc), need mixed CPU & GPU compute, are doing deep learning and running inference, etc, then Ray is going to be much better.
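For anyone unfamiliar with the core APIs mentioned in point 3, here's a minimal sketch. The function/class names are made up for illustration, but ray.init, @ray.remote, and ray.get are the real entry points:

    import ray

    ray.init()  # connect to (or start) a local Ray cluster

    # A task: any Python function can be scheduled somewhere in the cluster.
    @ray.remote
    def transform(batch):
        return [x * 2 for x in batch]

    # An actor: a Python class instantiated as a long-lived process.
    @ray.remote
    class Accumulator:
        def __init__(self):
            self.total = 0

        def add(self, value):
            self.total += value
            return self.total

    # Launch ten tasks in parallel, then fold results into the actor.
    futures = [transform.remote(list(range(i, i + 10))) for i in range(0, 100, 10)]
    acc = Accumulator.remote()
    for batch in ray.get(futures):           # gather task results
        ray.get(acc.add.remote(sum(batch)))  # update actor state

    print(ray.get(acc.add.remote(0)))  # final total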
The paper mentions support for zero-copy intranode object sharing which links to serialization in the Ray docs - https://docs.ray.io/en/latest/ray-core/objects/serialization...
I'm really curious how this stays performant - I recently tried building a pipeline that leveraged substantial multiprocessing in Python, and found that my process was bottlenecked by the serialization/deserialization that occurs during Python multiprocessing. Would love any reading or explanation you can provide as to how this doesn't also bottleneck a process in Ray, since it seems that data transferred between workers and nodes will need to be serialized and deserialized.
Thanks in advance! Really cool tool, hopefully I'll be able to use it sooner rather than later.
One of the key things is to make sure the serialized object is stored in a data format where the serialized object does not need to be "transformed" in order to access it. For example, a numpy array can be created in O(1) time from a serialized blob by initializing a Python object with the right shape and dtype and a pointer to the right offset in the serialized blob. We also use projects like Apache Arrow that put a lot of care into this.
Example in more detail:
Imagine the object you are passing from process A to process B is a 1GB numpy array of floats. In the serialization step, process A produces a serialized blob of bytes that is basically just the 1GB numpy array plus a little bit of metadata. Process A writes that serialized blob into shared memory. This step of "writing into shared memory" still involves O(N) work, where N is the size of the array (though you can have multiple threads do the memcpy in parallel and be limited just by memory bandwidth).
In the deserialization step, process B accesses the same shared memory blob (process A and B are on the same machine). It reads a tiny bit of metadata to figure out the type of the serialized object and shape and so on. Then it constructs a numpy array with the correct shape and type and with a pointer to the actual data in shared memory at the right offset. Therefore it doesn't need to touch all of the bytes of data, it just does O(1) work instead of O(N).
That's the basic idea. You can imagine generalizing this beyond numpy arrays, but it's most effective for objects that include large numerical data (e.g., objects that include numpy arrays).
There are a bunch of little details to get right, e.g., serializing directly into shared memory instead of creating a serialized copy in process A and then copying it into shared memory. Doing the write into shared memory in parallel with a bunch of threads. Getting the deserialization right. You also have to make sure that the starting addresses of the numpy arrays are 64-byte aligned (if memory serves) so that you don't accidentally trigger a copy later on.
EDIT: I edited the above to add more detail.
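As a rough illustration of the O(1) deserialization idea described above - this uses Python's stdlib shared_memory rather than Ray's actual object store, so it's just a sketch of the principle, with both "processes" shown in one script:

    import numpy as np
    from multiprocessing import shared_memory

    # --- "Serialization" side (process A) ---
    arr = np.random.rand(1_000_000)  # stand-in for the 1GB array
    shm = shared_memory.SharedMemory(create=True, size=arr.nbytes)
    # O(N) memcpy into shared memory; shape/dtype metadata travels separately
    shared_view = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm.buf)
    shared_view[:] = arr[:]

    # --- "Deserialization" side (process B) ---
    # Reattach by name and rebuild the array header around the same bytes:
    # O(1) work, no copy of the underlying data.
    shm_b = shared_memory.SharedMemory(name=shm.name)
    arr_b = np.ndarray(arr.shape, dtype=arr.dtype, buffer=shm_b.buf)

    assert arr_b[0] == arr[0]
    shm_b.close()
    shm.close()
    shm.unlink()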
Fun fact, very early on, we used to create one mmapped file per serialized object, but that very quickly broke down.
Then we switched to mmapping one large file at the start and storing all of the serialized objects in that file. But then as objects get allocated and deallocated, you need to manage the memory inside of that mmapped file, and we just repurposed a malloc implementation to handle that.
[1] https://github.com/ray-project/ray/blob/21202f6ddc3ceaf74fbc...
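A toy sketch of that "one big mmapped region plus an allocator" idea - nothing like the real code linked above, and the BumpAllocator class is made up for illustration (it never frees, unlike a real malloc):

    import mmap

    class BumpAllocator:
        """Toy stand-in for the repurposed malloc: hands out offsets
        inside one large mmapped region (no free/coalescing here)."""

        def __init__(self, size):
            self.mm = mmap.mmap(-1, size)  # anonymous mapping for the demo
            self.offset = 0

        def put(self, blob: bytes) -> tuple:
            # 64-byte alignment so array payloads stay copy-free downstream
            start = (self.offset + 63) & ~63
            self.mm[start:start + len(blob)] = blob
            self.offset = start + len(blob)
            return start, len(blob)

        def get(self, start: int, length: int) -> memoryview:
            return memoryview(self.mm)[start:start + length]

    store = BumpAllocator(1 << 20)
    off, n = store.put(b"serialized object bytes")
    print(bytes(store.get(off, n)))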
I've also looked at ray for running data pipelines before (at much much smaller scales) for the reasons you suggest (unstructured data, mixed CPU/GPU compute).
One thing I've wanted is an incremental computation framework (i.e., salsa [1]) built on Ray so that I can write jobs that transparently reuse intermediate results from an object store if their dependencies haven't changed.
Do you know if anyone has thought of building something like this?
I agree this is a very interesting area to consider Ray for. There are lots of projects/products that provide core components that could be used but there’s no widely used library. It feels like one is overdue.
But I'm not aware of anything exactly like what you're referring to!
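To make the idea concrete, here's a rough sketch of what salsa-style reuse could look like on top of Ray. The incremental decorator and the driver-side _cache are hypothetical, not an existing library; a real version would need content-addressed persistence rather than an in-memory dict:

    import hashlib
    import pickle

    import ray

    ray.init()

    _cache = {}  # hypothetical driver-side cache: input-hash -> ObjectRef

    def incremental(func):
        """Rerun a Ray task only when its inputs' content hash changes;
        otherwise return the cached ObjectRef from the object store."""
        remote_fn = ray.remote(func)

        def wrapper(*args):
            key = (func.__name__, hashlib.sha256(pickle.dumps(args)).hexdigest())
            if key not in _cache:
                _cache[key] = remote_fn.remote(*args)
            return _cache[key]

        return wrapper

    @incremental
    def featurize(records):
        return [len(r) for r in records]

    ref1 = featurize(["a", "bb"])  # computed
    ref2 = featurize(["a", "bb"])  # reused: same ObjectRef, no recompute
    assert ref1 == ref2
    print(ray.get(ref1))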
The scenario where a Ray task forks is probably not very well supported. You can certainly start a subprocess from within a Ray task, but I think forking could easily cause issues.
You can definitely use Ray + JAX, but you probably need to avoid forking a process within a Ray worker.
Must be good enough if you're willing to dogfood it though?
Data processing workloads are quite common on Ray, especially with unstructured data.
Also, I work on Ray, which is the underlying framework used here, but all the work in the post was done by the Amazon team.
This is not badmouthing either project, just an observation: if you've architected a system for one problem, you'll be better at attacking the same problem the next time around.
Mesos was a derivative idea, growing out of sporadic thinking about two-level scheduling inside Google in the MapReduce era.
Mesos was wrong from day one. Its authors thought they had the right idea, but they were really overtaken by a small group of Google engineers who happened to come from academia and were very good at producing theoretically elegant designs.
Around the time Mesos was invented, Google had its own Mesos-like project, built by learning the wrong lessons from Borg: Omega.
Eventually everyone decided that Borg was right all along, hence Kubernetes.
Quoting from the original Spark paper:
> Spark is built on top of Mesos [16, 15], a “cluster operating system” that lets multiple parallel applications share a cluster in a fine-grained manner and provides an API for applications to launch tasks on a cluster
https://people.csail.mit.edu/matei/papers/2010/hotcloud_spar...
Note how Matei Zaharia, the inventor of Spark, is also on the Mesos paper.
The need for this pops up at nearly every large-scale data processing enterprise, with k8s replacing Mesos, YARN, and other systems as the cluster scheduler du jour.
One of the big advantages of a service scheduler versus a batch queue is that you can implement a batch queue on top of a service scheduler much more easily than you can implement a service scheduler on top of a batch queue.
As a layman, I imagine lots of it loses relevance very quickly; e.g., Amazon sales data from 5 years ago is only marginally useful for determining future trends and analyzing new consumer behavior regimes?
I do tend to agree that data from five years ago is rarely relevant, BUT our business is still using data from the fiscal year before COVID as a comparison baseline for certain BI analytics/business processes that have been slow to return to pre-COVID levels of performance. So we are now using six-year-old data, comparing this year to that pre-COVID year for certain analytics!
Related, I rather enjoyed reading this other thread from June: "Ask HN: Is KDB a sane choice for a datalake in 2024?" https://news.ycombinator.com/item?id=40625800
The trick with doing compaction in Python was to ensure that the most performance-sensitive code was delegated to more optimal C++ (e.g., Ray and Arrow) and Rust (e.g., Daft) code paths. If we did all of our per-record processing ops in pure Python, compaction would indeed be much slower.
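For a feel of the difference, here's a small illustrative comparison - not our actual compaction code - of per-record Python versus delegating to Arrow's vectorized C++ kernels:

    import pyarrow as pa
    import pyarrow.compute as pc

    col = pa.array(range(1_000_000))

    # Pure-Python per-record loop: every element crosses the interpreter.
    slow = [x * 2 + 1 for x in col.to_pylist()]

    # Delegated to Arrow's vectorized C++ kernels: no per-record Python.
    fast = pc.add(pc.multiply(col, 2), 1)

    assert slow[1000] == fast[1000].as_py()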
Edit: Nvm, I kept reading! Thanks for the interesting post!
I had no idea anything at AWS had that long of an attention span.
It's funny and telling that in the end, it's all backed by CSVs in S3. Long live CSV!
Also, we mostly have Parquet data cataloged in S3 today, but delimited text is indeed ubiquitous and surprisingly sticky, so we continue to maintain some very large datasets natively in this format. However, while a table's data producer may prefer to write delimited text, those files are almost always converted to Parquet during the compaction process to produce a read-optimized table variant downstream.
CSV is only good for append-only workloads.
But so is Parquet, and if you can write Parquet from the get-go, you save on storage as well as have a directly queryable column store from the start.
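For example, a minimal PyArrow sketch of writing Parquet from the start (the file name and columns are made up):

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({
        "order_id": [1, 2, 3],
        "amount": [9.99, 24.50, 5.00],
    })

    # Columnar + compressed on disk: smaller than CSV and directly
    # queryable by engines like DuckDB, Spark, Daft, etc.
    pq.write_table(table, "orders.parquet", compression="zstd")

    print(pq.read_table("orders.parquet", columns=["amount"]))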
CSV still exists because of legacy data-generating processes and a dearth of Parquet familiarity among many software engineers. CSV is simple to generate and easy to troubleshoot without specialized tools (compared to Parquet, which requires tools like VisiData). But you pay for it elsewhere.
1. Dynamically typed (with type affinity) [1]. This causes problems when there are multiple data-generating processes. Newer SQLite has a STRICT table option that enforces types, but only for the few basic types that it has (see the sketch below).
2. Doesn't have a date/time type [1]. This is problematic because you can store dates as TEXT, REAL, or INTEGER (it's up to the developer), so if you have SQLite files from more than one source, date fields could be any of those types, and you have to convert between them.
3. Isn't columnar, so complex analytics at scale is not performant.
I guess one can use SQLite as a data interchange format, but it's not ideal.
One area SQLite does excel in is as an application file format [2], and that's where it is mostly used [3].
[1] https://www.sqlite.org/datatype3.html
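For point 1, a quick sketch of what STRICT buys you. This assumes your Python build bundles SQLite >= 3.37, and the events table is made up:

    import sqlite3

    con = sqlite3.connect(":memory:")
    print(sqlite3.sqlite_version)  # STRICT needs SQLite >= 3.37 (2021)

    con.execute("CREATE TABLE events (id INTEGER, ts TEXT) STRICT")
    con.execute("INSERT INTO events VALUES (1, '2024-01-01T00:00:00')")

    try:
        # TEXT that can't be coerced to INTEGER is rejected, unlike a
        # non-STRICT table where anything goes in any column.
        con.execute("INSERT INTO events VALUES ('not-an-int', 'x')")
    except sqlite3.Error as e:
        print(e)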
We love Ray, and are excited about the awesome ecosystem of useful + scalable tools that run on it for model training and serving. We hope that Daft can complement the rest of the Ray ecosystem to enable large scale ETL/analytics to also run on your existing Ray clusters. If you have an existing Ray cluster setup, you absolutely should have access to best-in-class ETL/analytics without having to run a separate Spark cluster.
Also, on the nerdier side of things - the primitives that Ray provides give us a real opportunity to build a solid non-JVM-based, vectorized distributed query engine. We're already seeing extremely good performance improvements here vs Spark, and are really excited about some of the upcoming work to get even better performance and memory stability.
This collaboration with Amazon really battle-tested our framework :) happy to answer any questions if folks have them.
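For example, pointing Daft at an existing Ray cluster is roughly this sketch - the S3 path and column name are placeholders, and you should check the Daft docs for your version's exact API:

    import daft

    # Run Daft queries on an existing Ray cluster instead of a
    # separate Spark cluster ("auto" resolves the local cluster;
    # pass your cluster address explicitly in production).
    daft.context.set_runner_ray(address="auto")

    df = daft.read_parquet("s3://my-bucket/events/*.parquet")  # hypothetical path
    df = df.where(df["status"] == "ok")
    df.show()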
E.g., Ray, Databricks, notebooks, etc.
In short, after evaluating our options, Ray seemed to strike the best balance between the one efficiency extreme of, say, building out custom "compaction-optimized" hardware/clusters, and the other maintainability extreme of just letting the latest managed cloud service run a 1-liner SQL statement for us without ever looking under the hood.
Regardless, I expect both our existing solution and the distributed compute frameworks leveraged to deliver it to continue to evolve over time.
- ~$1691/hour on demand
- ~$1065/hour reserved
- ~$521/hour spot
Not including any related data transfer costs.
I did not even realize it was that viable a substitute for straight data processing tasks.
Today, our immediate next goal for DeltaCAT is to ensure that the compactor, and similar procedures for Ray, can run on Apache Iceberg. So, if you're an Iceberg user relying on procedures like Spark's "rewrite_data_files" and/or "rewrite_positional_delete_files" to compact your datasets today, then DeltaCAT will let you easily run similar compaction procedures on Ray to realize similar efficiency/scale improvements (even if it winds up delegating some of the work to other projects like PyIceberg, Daft, etc. along the way).
Going forward, we'd like DeltaCAT to also provide better general-purpose abstractions (e.g., reading/writing/altering large datasets) to simplify writing Ray apps in Python that work across (1) different catalog formats like Iceberg, Hudi, and Delta and (2) different distributed data processing frameworks like Ray Data, Daft, Modin, etc.
From the perspective of an internal DeltaCAT developer, another goal is to just reduce the maintainability burden and dev hours required to write something like a compactor that works across multiple catalogs (i.e., by ensuring that all interfaces used by such a procedure can be readily implemented for multiple catalog formats like Iceberg, Hudi, Delta, etc.).
Ray allowed them to optimize elements that Spark didn't, and that was what improved performance; it's not that Spark itself was slow.
Does the sales team know about this? /jk
I have seen dozens of big ticket “we saved this customer $xxxK/year” posts on slack and other internal venues, the customer obsession is real.
For some organisations, it will make sense to internalise and DIY everything. For many others, AWS/equivalent enables them to break down internal silos (instead of having to file a ticket to get storage team to provision storage for your new fancy RabbitMQ cluster, multiplied by 10 for networking, security AV bullshit, etc.) and just consume everything as a service. It's much faster, and in some cases even maybe cheaper.
Cf. Dropbox, which started on S3 and, a decade later, migrated to self-managed datacenters.
Unclear if it's in Anyscale's best interests to promote Ray as a general-purpose cluster productivity tool, even if it's good at that more general use case.
Our intention from the start was for Ray to be general purpose. And the core Ray APIs are quite general (basically just scheduling a Python function somewhere in a cluster or instantiating a Python class as a process somewhere in the cluster).
We had AI use cases in mind from the start, since we were grad students in AI. But the generality has really been important since AI workloads encompass a huge variety of computational patterns (allreduce style communication patterns on GPUs for training, embarrassingly parallel data processing workloads on spot instances, and so on).
If you're predicting for each user, you need all of the data.
And generally you probably wouldn't want to sample too much for BI as it could lead to people making wrong decisions.
But yeah, in general sampling rocks and is super effective.
Like, if the current latency is ~60 minutes for 90% of updates, will it ever be better than that? Won’t it just slowly degrade until the next multi-year migration?
PS: this article was infuriating to read on iPad - it kept jumping back to the top of the page and I couldn't figure out why.