Snowflake and Saturn Cloud Partner to Bring 100x Faster Data Science to Millions of Python Users

January 7, 2021 Mike Klaczynski

Snowflake and Saturn Cloud are thrilled to announce our partnership to provide the fastest data science and machine learning (ML) platform. Snowflake’s Data Cloud comprises a global network where thousands of organizations mobilize data with near-unlimited scale, concurrency, and performance. Saturn Cloud’s platform provides lightning-fast data science. Combined, our solutions enable customers to maximize their ML and data science initiatives.

This is especially great news for Python users who want to augment SQL for advanced analytics and ML. It allows users of Snowflake’s Data Cloud to take advantage of Dask, a Python-native parallel computing framework, and RAPIDS, a suite of GPU-accelerated data science libraries that can scale across clusters of GPUs with Dask.

Saturn Cloud is currently around 100x faster than traditional tooling such as serial Python and Apache Spark. AI leader Senseye recently co-published an article stating that, using Saturn Cloud, they reduced model runtime from 60 days to just 11 hours (more than a 130x speed improvement).

An Example with NYC Taxi Data

To start, let’s use an example that is representative of ML problems everywhere. In this example, we want to understand how to optimize ride sharing in New York City. As you would expect, there are millions of rides to analyze, and processing the data would usually take several days. By using Dask with Saturn Cloud we can process three years of NYC taxi data stored in Snowflake in just minutes. We do the feature engineering in Snowflake, then load the data into a distributed GPU DataFrame with dask_cudf, and train a Random Forest model on GPUs using cuML. For all the details and to try for yourself, check out the full notebook on GitHub.

Connecting to Dask and Snowflake

Let’s look at how it works. We’re running a 20-node Dask cluster of g4dn.xlarge instances on AWS. A Saturn Cloud cluster is provisioned with the following code:

Note: If your Dask clusters are hosted with a different platform, you may need to adapt this code.

from dask.distributed import Client
from dask_saturn import SaturnCluster

n_workers = 20
cluster = SaturnCluster(n_workers=n_workers, scheduler_size='g4dnxlarge', worker_size='g4dnxlarge')
client = Client(cluster)
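If your Dask cluster runs elsewhere, one way to try the workflow on a single machine is Dask's own LocalCluster. The following is a minimal sketch, assuming the dask.distributed package is installed; the worker counts are illustrative, not what this benchmark used. For local GPUs, the separate dask_cuda package provides LocalCUDACluster as a counterpart.

```python
from dask.distributed import Client, LocalCluster


def square(x):
    return x * x


# In-process, thread-based cluster with two workers, for local experimentation only
with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster, Client(cluster) as client:
    client.wait_for_workers(2)               # block until both workers have registered
    worker_count = len(client.scheduler_info()["workers"])
    futures = client.map(square, range(5))   # one task per input, scheduled across workers
    results = client.gather(futures)
```

The rest of the code only needs a connected Client, so swapping the cluster object is usually the main change.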

Next, we’ll connect to Snowflake. In our notebook, we’re pulling in the relevant connection information from environment variables. You will need to replace these values with your own Snowflake connection information.

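import os
import snowflake.connector

# Connection settings are read from environment variables; replace them with your own
SNOWFLAKE_ACCOUNT = os.environ['SNOWFLAKE_ACCOUNT']
SNOWFLAKE_USER = os.environ['SNOWFLAKE_USER']
SNOWFLAKE_PASSWORD = os.environ['SNOWFLAKE_PASSWORD']

SNOWFLAKE_WAREHOUSE = os.environ['SNOWFLAKE_WAREHOUSE']
TAXI_DATABASE = os.environ['TAXI_DATABASE']
TAXI_SCHEMA = os.environ['TAXI_SCHEMA']

conn_info = {
    'account': SNOWFLAKE_ACCOUNT,
    'user': SNOWFLAKE_USER,
    'password': SNOWFLAKE_PASSWORD,
    'warehouse': SNOWFLAKE_WAREHOUSE,
    'database': TAXI_DATABASE,
    'schema': TAXI_SCHEMA,
}

# One connection on the client, used for lightweight queries such as get_dates()
conn = snowflake.connector.connect(**conn_info)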
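Looking up each variable with os.environ[...] fails on the first missing one with a bare KeyError. A hypothetical helper such as read_conn_info (not part of the original notebook) can check all of them up front and report every missing setting at once; the mapping below simply mirrors the variable names used in this post.

```python
import os

# Keyword expected by snowflake.connector.connect() -> environment variable used in this post
ENV_KEYS = {
    "account": "SNOWFLAKE_ACCOUNT",
    "user": "SNOWFLAKE_USER",
    "password": "SNOWFLAKE_PASSWORD",
    "warehouse": "SNOWFLAKE_WAREHOUSE",
    "database": "TAXI_DATABASE",
    "schema": "TAXI_SCHEMA",
}


def read_conn_info(environ=os.environ):
    """Build conn_info from environment variables, listing every missing or empty one."""
    missing = [var for var in ENV_KEYS.values() if not environ.get(var)]
    if missing:
        raise KeyError(f"missing Snowflake settings: {', '.join(missing)}")
    return {key: environ[var] for key, var in ENV_KEYS.items()}
```

Passing a plain dict instead of os.environ makes the helper easy to test without touching the real environment.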

Loading Data and Performing Feature Engineering

We’re going to use Snowflake to compute some predictive features, and then we’ll have each of our 20 Dask distributed workers load a piece of the resulting training set. The NYC taxi data includes the pickup time of each ride, and the data is fairly evenly distributed across days, weeks, and months. Our approach here is to equally divide the entire three-year time period into 20 chunks based on the day of the ride, and then load each chunk into a Dask worker using concurrent SQL queries against Snowflake. First, we write a function to calculate our date chunks.

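import datetime
import pandas as pd

def get_dates(start, end):
    """Return the distinct pickup dates between start and end, inclusive."""
    # Compare on DATE() so that rides later in the day on the end date are included
    date_query = """
    SELECT
        DISTINCT(DATE(pickup_datetime)) AS date
    FROM taxi_yellow
    WHERE
        DATE(pickup_datetime) BETWEEN %s AND %s
    ORDER BY date
    """
    cursor = conn.cursor().execute(date_query, (start, end))
    columns = [x[0] for x in cursor.description]
    dates_df = pd.DataFrame(cursor.fetchall(), columns=columns)
    return dates_df['DATE'].tolist()

dates = sorted(get_dates('2017-01-01', '2019-12-31'))
# Days per chunk; the +1 guarantees at most n_workers chunks
chunk_size = (len(dates) // n_workers) + 1
date_starts = dates[::chunk_size]
# Each chunk ends where the next starts; the last ends the day after the final date
date_ends = date_starts[1:] + [dates[-1] + datetime.timedelta(days=1)]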
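The chunking arithmetic can be checked without Snowflake. This minimal sketch applies the same sizing rule to a hypothetical list of every calendar day from 2017 through 2019, and shows that the windows are contiguous, half-open [start, end) ranges that cover every day exactly once. split_dates is an illustrative name, not a function from the notebook.

```python
import datetime


def split_dates(dates, n_chunks):
    """Split sorted dates into at most n_chunks contiguous [start, end) windows."""
    chunk_size = len(dates) // n_chunks + 1              # same sizing rule as above
    starts = dates[::chunk_size]
    ends = starts[1:] + [dates[-1] + datetime.timedelta(days=1)]
    return list(zip(starts, ends))


# Hypothetical input: every day of 2017-2019 (no leap years, so 1,095 days)
first = datetime.date(2017, 1, 1)
all_days = [first + datetime.timedelta(days=i) for i in range(1095)]
windows = split_dates(all_days, 20)
```

Because of the +1, the rule can produce fewer chunks than workers: 100 days split 20 ways gives chunks of 6 days and only 17 windows. That is harmless here, but it means some workers may sit idle during loading.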

After calculating the date chunks, we set up a Snowflake query that performs the feature engineering. It contains bind-variable placeholders (%s) that are filled in later with each chunk's start and end dates.

from dask import delayed

# Feature-engineering query; the two %s placeholders are bound to each
# chunk's [date_start, date_end) window when the query runs
query = """
SELECT
    pickup_taxizone_id,
    dropoff_taxizone_id,
    passenger_count,
    DIV0(tip_amount, fare_amount) > 0.2 AS high_tip,
    DAYOFWEEKISO(pickup_datetime) - 1 AS pickup_weekday,
    WEEKOFYEAR(pickup_datetime) AS pickup_weekofyear,
    HOUR(pickup_datetime) AS pickup_hour,
    (pickup_weekday * 24) + pickup_hour AS pickup_week_hour,
    MINUTE(pickup_datetime) AS pickup_minute
FROM taxi_yellow
WHERE
    DATE(pickup_datetime) >= %s AND DATE(pickup_datetime) < %s
"""

We’re using a subset of mathematical and date functions to highlight yearly, weekly, and daily seasonality (WEEKOFYEAR, DAYOFWEEKISO, and HOUR).
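To make these expressions concrete, here is a rough pandas equivalent applied to two hand-made rides. It is an illustration only, not how the pipeline computes features. It assumes the raw columns are named as in the query, and that Snowflake's default week settings give ISO week numbers for WEEKOFYEAR (an assumption worth checking against your account's WEEK_OF_YEAR_POLICY).

```python
import pandas as pd


def engineer_features(df):
    """Approximate pandas version of the feature columns computed in Snowflake."""
    ts = pd.to_datetime(df["pickup_datetime"])
    fare = df["fare_amount"]
    # DIV0 returns 0 instead of failing when the divisor is 0
    tip_ratio = (df["tip_amount"] / fare.where(fare != 0)).fillna(0.0)
    weekday = ts.dt.dayofweek  # Monday=0 ... Sunday=6, like DAYOFWEEKISO() - 1
    hour = ts.dt.hour
    return pd.DataFrame({
        "high_tip": tip_ratio > 0.2,
        "pickup_weekday": weekday,
        "pickup_weekofyear": ts.dt.isocalendar().week,  # ISO week number (assumed match)
        "pickup_hour": hour,
        "pickup_week_hour": weekday * 24 + hour,  # 0..167 across the week
        "pickup_minute": ts.dt.minute,
    })


# Two hypothetical rides: a Monday afternoon and a late Sunday evening with a zero fare
rides = pd.DataFrame({
    "pickup_datetime": ["2019-01-07 13:45:00", "2019-01-06 23:10:00"],
    "tip_amount": [3.0, 1.0],
    "fare_amount": [10.0, 0.0],
})
example_features = engineer_features(rides)
```

The pickup_week_hour column combines weekday and hour into a single "hour of the week" index, which lets a tree model pick out patterns such as Friday evenings directly.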

Now comes the fun part! Let’s load the data into a distributed GPU DataFrame with Dask and RAPIDS.

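import cudf
import dask_cudf
import snowflake.connector
from numpy import dtype

@delayed
def load(conn_info, query, date_start, date_end, meta=None):
    # Each task opens its own connection, so Snowflake runs the chunk queries concurrently
    with snowflake.connector.connect(**conn_info) as conn:
        # fetch_pandas_all() transfers the result set via Arrow into a pandas DataFrame
        taxi = conn.cursor().execute(query, (date_start, date_end)).fetch_pandas_all()
        # Snowflake returns unquoted column names in uppercase
        taxi.columns = [x.lower() for x in taxi.columns]
        if meta:
            # Coerce dtypes so every chunk has the same schema
            taxi = taxi.astype(meta)
        # Move the chunk into GPU memory
        return cudf.from_pandas(taxi)

meta = {
    'pickup_taxizone_id': dtype('float32'),
    'dropoff_taxizone_id': dtype('float32'),
    'passenger_count': dtype('float32'),
    'high_tip': dtype('int32'),
    'pickup_weekday': dtype('float32'),
    'pickup_weekofyear': dtype('float32'),
    'pickup_hour': dtype('float32'),
    'pickup_week_hour': dtype('float32'),
    'pickup_minute': dtype('float32')
}

# One lazy task per date chunk; nothing is queried yet
taxi_delayed = [
    load(conn_info, query, date_start, date_end, meta=meta)
    for date_start, date_end in zip(date_starts, date_ends)
]
# Empty cudf DataFrame describing the schema shared by every partition
meta_df = cudf.DataFrame({name: cudf.Series([], dtype=typ) for name, typ in meta.items()})
taxi = dask_cudf.from_delayed(taxi_delayed, meta=meta_df)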

There’s a lot to unpack here, so let’s walk through each step. The load function is wrapped with Dask’s delayed decorator, which makes it lazy: rather than executing immediately, each call places the function and its arguments into a task graph. When that graph is executed, each Dask worker runs a parameterized query to retrieve its chunk of data, and Snowflake runs these queries concurrently. The results are pulled into Python using fetch_pandas_all(), an optimized method that transfers data from Snowflake using Apache Arrow so it can be loaded efficiently into pandas. Finally, each chunk is converted into a GPU DataFrame with cudf.from_pandas().

The load function defines what needs to happen for one chunk of the larger DataFrame. We’re creating several of these calls, which return a chunk of the training data set, and then we’re passing them to dask_cudf.from_delayed, which creates a Dask GPU DataFrame out of many small GPU DataFrames.

You may also notice the meta parameter that we’re passing around. This isn’t strictly necessary, but many pandas routines infer data types from the data itself. A chunk in which an integer field has no missing values may return that field as an integer type, while another chunk that is missing some values for the same field may return it as a float. Passing the metadata through and coercing each chunk’s types keeps the schema consistent across every chunk.
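A minimal pandas illustration of the problem, using two hypothetical chunks of the passenger_count column: the chunk with a missing value is inferred as float64, the other as int64, and applying a shared meta mapping with astype brings them back into line.

```python
import pandas as pd

# Two hypothetical chunks of the same column; the second has a missing value
chunk_a = pd.DataFrame({"passenger_count": [1, 2]})
chunk_b = pd.DataFrame({"passenger_count": [1, None]})
inferred = (chunk_a["passenger_count"].dtype, chunk_b["passenger_count"].dtype)

# A shared meta mapping forces every chunk to the same dtype
meta = {"passenger_count": "float32"}
coerced = [chunk.astype(meta) for chunk in (chunk_a, chunk_b)]
```

Float32 is a convenient common type here because it can hold both whole numbers and NaN for missing values.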

Snowflake Caching and Configuration

Snowflake has two primary caching mechanisms that are great for ML workloads. The first is the local disk cache, which stores the underlying data on the local storage of the machines that make up a Snowflake virtual warehouse. The second is the result cache, which stores the results of previous queries so they can be re-accessed quickly.

For ML workloads, both caches are useful. The local disk cache can speed up every query, especially as data sizes grow. The result cache helps when you re-execute the same query, such as when iterating on the ML model without changing the feature engineering.

Snowflake tables can also be configured to use data clustering. In data clustering, one or more columns or expressions on the table can be designated as the clustering key. Using a clustering key to co-locate similar rows in the same micro-partitions enables several benefits, including improving scan efficiency in queries by skipping data that does not match filtering predicates.

In this blog post, we are disabling the result cache to demonstrate the performance improvement due to Dask (the local disk cache cannot be disabled), and we are clustering on PICKUP_DATETIME since that’s how we’ve chosen to partition the result set.
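The post doesn’t show how these settings were applied, so here is one plausible way to do it. It assumes Snowflake’s USE_CACHED_RESULT session parameter, the Python connector’s session_parameters argument to connect(), and ALTER TABLE ... CLUSTER BY syntax. Because each load() task opens its own connection, setting the parameter on a single client session would not reach the workers. Adding it to conn_info carries it into every connection. Both helpers are hypothetical.

```python
def benchmark_conn_info(conn_info):
    """Return a copy of conn_info whose connections skip Snowflake's result cache."""
    params = dict(conn_info.get("session_parameters", {}))
    params["USE_CACHED_RESULT"] = False  # applied to every session opened with this dict
    return {**conn_info, "session_parameters": params}


def clustering_statement(table="taxi_yellow", cluster_key="pickup_datetime"):
    """DDL that designates cluster_key as the table's clustering key (a persistent change)."""
    return f"ALTER TABLE {table} CLUSTER BY ({cluster_key})"
```

In the pipeline, this would mean calling conn_info = benchmark_conn_info(conn_info) before building the delayed loads, plus a one-time conn.cursor().execute(clustering_statement()). Clustering changes the table itself, and Snowflake’s Automatic Clustering consumes credits to maintain it, so it is worth applying deliberately rather than per run.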

Training the ML Model

Now that we have our training data set loaded, we can train our model.

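from cuml.dask.ensemble import RandomForestClassifier

# Assumed feature list and label: every column from the feature-engineering
# query, with high_tip as the target
features = [
    'pickup_weekday', 'pickup_weekofyear', 'pickup_hour', 'pickup_week_hour',
    'pickup_minute', 'passenger_count', 'pickup_taxizone_id', 'dropoff_taxizone_id',
]
y_col = 'high_tip'

taxi_train = taxi[features + [y_col]]
# cuML works with float32 features; missing values are filled with -1
taxi_train[features] = taxi_train[features].astype("float32").fillna(-1)
taxi_train[y_col] = taxi_train[y_col].astype("int32").fillna(-1)

# Run the Snowflake queries once and keep the training set in GPU memory
taxi_train = taxi_train.persist()

rfc = RandomForestClassifier(n_estimators=100, max_depth=10, seed=42)
_ = rfc.fit(taxi_train[features], taxi_train[y_col])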

Here, taxi_train is a dask_cudf DataFrame, and RandomForestClassifier comes from the RAPIDS cuML package, so training happens entirely on the GPU. Note the call to .persist(): it tells Dask to run the Snowflake queries and load the data across the cluster, then keep the results in GPU memory. Without it, the Snowflake queries would be re-executed each time the Random Forest model needed to pull data.
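The effect of persist can be seen with plain dask.delayed, no GPU or Snowflake required. In this minimal sketch, load is a hypothetical stand-in for the queries plus data loading. Without persist, each downstream computation reruns it. After persist, the result is computed once and reused.

```python
import dask
from dask import delayed

runs = {"load": 0}  # counts how many times the expensive step actually executes


@delayed
def load():
    """Hypothetical stand-in for the Snowflake queries and data loading."""
    runs["load"] += 1
    return list(range(10))


@delayed
def summarize(rows):
    return sum(rows)


with dask.config.set(scheduler="synchronous"):
    data = load()
    summarize(data).compute()
    summarize(data).compute()        # no persist: load() runs again
    without_persist = runs["load"]

    runs["load"] = 0
    cached = data.persist()          # run load() once and keep the result in memory
    summarize(cached).compute()
    total = summarize(cached).compute()
    with_persist = runs["load"]
```

Model training makes many passes over the data, so the difference between these two counts is what keeps the Random Forest from triggering repeated Snowflake queries.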

Performance Results

The Saturn Cloud team previously did the same exercise with Apache Spark: loading data, performing feature engineering, and training a RandomForestClassifier. Spark took 38.5 minutes. We then loaded the data and did the feature engineering in Snowflake, and trained the RandomForestClassifier in Saturn Cloud with Dask and RAPIDS. Using Snowflake and Saturn Cloud took 35 seconds. That’s a more than 60x speedup with minimal work, and it has the tremendous benefit of using Python end to end.

In terms of computational cost, the Snowflake and Saturn Cloud solution is inexpensive because it runs for such a short time. Snowflake charges for a minimum of one minute of usage, so the X-Large warehouse we ran cost $0.53, and the 20 g4dn.xlarge machines used for ML cost $0.12 for that period. The equivalent Spark cluster cost $7.19 for the 38.5 minutes it took to complete the workload.
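The speedup and the Snowflake cost can be reproduced with a little arithmetic. This is a minimal sketch, assuming an X-Large warehouse consumes 16 credits per hour, billed per second with a 60-second minimum, and assuming a price of $2.00 per credit (actual prices depend on edition, region, and contract). The runtimes and the $0.12 GPU cost are taken from the text.

```python
XL_CREDITS_PER_HOUR = 16   # Snowflake X-Large warehouse rate
PRICE_PER_CREDIT = 2.00    # assumption: depends on edition, region, and contract


def billed_seconds(runtime_s, minimum_s=60):
    """Per-second billing with a 60-second minimum each time a warehouse resumes."""
    return max(runtime_s, minimum_s)


def warehouse_cost(runtime_s, credits_per_hour=XL_CREDITS_PER_HOUR, price_per_credit=PRICE_PER_CREDIT):
    """Dollar cost of running a warehouse for runtime_s seconds."""
    return credits_per_hour * billed_seconds(runtime_s) / 3600 * price_per_credit


spark_seconds = 38.5 * 60            # Spark run from the text: 2,310 seconds
saturn_seconds = 35                  # Snowflake + Saturn Cloud run from the text
speedup = spark_seconds / saturn_seconds

snowflake_cost = round(warehouse_cost(saturn_seconds), 2)  # 35 s is billed as 60 s
saturn_ml_cost = 0.12                # GPU cluster cost reported in the text
total_cost = snowflake_cost + saturn_ml_cost
```

Under these assumptions, the speedup comes out to 66x, and the combined cost is about $0.65, compared with $7.19 for Spark. Note that the one-minute minimum, not the 35-second runtime, determines the Snowflake charge.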

Get Started Today

If you work in data science and ML, you can sign up for free Saturn Cloud and Snowflake accounts and start building models in minutes.

Join our webinar on Thursday, January 13 to see firsthand how Saturn Cloud and Snowflake make ML 100x faster, and take a look at our other upcoming events.

The post Snowflake and Saturn Cloud Partner to Bring 100x Faster Data Science to Millions of Python Users appeared first on Snowflake.
