MapR Data Platform offers significant advantages over any other tool on the big data space. MapR-DB is one of the core components of the platform and it offers state of the art capabilities that blow away most of the NoSQL databases out there.
An important add-on to MapR-DB is the ability to use, for writing and querying, Apache Spark through the Connector for Apache Spark. Using this connector comes very handy since it can read and write from spark to MapR-DB using the different Spark APIs such as RDDs, DataFrames, and Streams.
Using the connector we can issue queries like the following one.
The resulting type is a
Dataframethat we can use as any other Dataframe from any other source, as we normally do in Spark.
If we then filter our data set out, problems start to emerge. For instance, let’s look at the following query.
The filter is being pushed down, so MapR-DB does the filtering and only sends back the data that complies with the filter reducing the amount of data transferred between MapR-DB and Spark. However, if there is an index created on the field first-name, the index is ignored and the table is fully scanned, trying to find the rows that comply with the filter.
By having an index on a field, we expect to use it so queries on that fields are optimized, ultimately speeding up the computation. The provided connector is simply not using this capability.
Our team, MapR Professional Services, knows that filtering using MapR-DB secondary indexes is huge for performance and since many of our customers do actually try to take advantages of this feature (secondary indexes) we have taken different approaches in order to force the use of the indexes when using Spark.
The following post was written by a fellow coworker, How to use secondary indexes in Spark with OJAI, where he explains some ways to overcome the issue on hand.
Even when we can take some shortcuts, we have to give up some of the of nice constructs the default connector has such as
An Independent Connector
In the past, I have extended Apache Spark in too many ways. I have written my own Custom Data Sources and most recently a Custom Streaming Source for Spark Structured Streams.
Once again, I have sailed in the adventure to write my own Spark Data Source, but this time for MapR-DB so we leverage the full advantages of secondary indexes while keeping the same API the Current MapR-DB Connector for Apache Spark has.
At the end of this post, we will be able to write a query in the following way while fully using secondary indexes.
Spark Data Sources Version 2
The following data source implementation uses spark 2.3.1 and uses the data source API V2.
Let’s start by looking at the things we need.
- ReadSupportWithSchema, allows us to create a DataSourceReader.
- DataSourceReader, allows us to get the schema for our data while we need to specify how to create a DataReaderFactory.
- SupportsPushDownFilters, allows us to intercept the query filters so we can push them down to MapR-DB.
- SupportsPushDownRequiredColumns, allows us to intercept the query projections so we can push them down to MapR-DB.
Let’s start by implementing ReadSupportWithSchema.
As we can see, we simply get the table path and the schema we want to use when reading from MapR-DB. Then we pass them to MapRDBDataSourceReader.
DataSourceReader and we are also mixing in
SupportsPushDownRequiredColumns to indicate that we want to push filters and projections down to MapR-DB.
projections variable will hold the schema we want to project if any. In case we don’t explicitly project fields by doing
.select we will project all the fields on the
readSchema works in conjunction with
pruneColumns. If in our Spark query we specify a
select then the selected fields are passed to
pruneColumns and those are the only fields we will bring from MapR-DB.
pushFilters indicates what filters we have specified in the
filter clause in our Spark query. Basically, we have to decide which of those we want to push down to MapR-DB, the other ones will be applied by Spark after the data is in memory.
In the snippet above, we are indicating we are going to push down only two types of filters,
GreaterThan. Any other filter besides these two will not be pushed down and the filtering will happen in memory (spark memory) after the data is loaded from MapR-DB.
We are working on adding more filters to match the current MapR-DB Connector.
createDataReaderFactories creates a list of data readers that actually do the heavy work of reading from our source, MapR-DB. In our case, we are only creating one data reader, but ideally, we have one reader for each MapR-DB region/partition so we can take advantage of parallelism offered by MapR-DB.
We are almost done, yet, the most important parts is about to come.
MapRDBDataReaderFactory is where we actually build the MapR-DB query and execute it again our MapR-DB table. Notice we are passing the table we are going to read from, the filters and projections we want to push down.
Now we need to connect to MapR-DB by opening a connection and creating a document store object.
createFilterCondition builds the query condition we want to execute against MapR-DB. This is the most important part of our entire implementation.
In here we are combining all the filters. As we can see, we are implementing our two supported filters only for two data types, but we are working on extending this implementation to match the current MapR-DB Connector.
query creates the final command to be sent to MapR-DB. This task is a matter of applying the query condition and the projections to our
It is very important to notice that since we are using OJAI, it will automatically use any secondary indexes for fields that are part of the filters we are applying. Make sure you check the output at the end of this post.
documents is a stream of data coming from MapR-DB based on
createDataReader uses the stream we have created (
documents) to do the actual reading and returning the data back to Spark.
Using our Connector
At this point, we are ready to plug in our custom data source into spark in the following way.
This allows us to use our own way to read from MapR-DB so that any filter being applied that is part of a secondary index on the physical table will be used to optimize the reading.
In order to maintain a similar API to the one offered by the default MapR-DB Connector, we added some syntax to our library in the following way.
We can now use our connector in the same way we used to use the default connector.
Using MapR-DB Secondary Indexes
When we run the code above, the TRACE output from OJAI looks similar to the following.
Notice that it automatically uses the index called uid_idx which is an index for the field
uid that at the same time is the field being used in the spark filter.
MapR-DB is a powerful tool that runs as part of the MapR Data Platform. The Spark Connector offers an interesting way to interact with MapR-DB since it allows us to use all Spark constructs at scale when working with this NoSQL system. However, some times the default connector falls short because it does not use the secondary index capabilities of MapR-DB when we need them the most.
On the other hand, our implementation mimics the Connector API and ensures that the implemented Spark data source uses MapR-DB secondary indexes since it relies on pure OJAI queries that are able to support secondary indexes out of the box.