Wednesday, April 24, 2013

BigData: Terracotta BigMemory and ArcGIS Webmaps

I was asked to put together a Proof of Concept implementation of a very fast, interactive, dynamic density map over 11 million records for a webmap application: the user dynamically specifies a query definition (a where clause in SQL terms) and a color ramp, and the service returns a density representation of the matching records on the map.

This is typically done via a GeoProcessing task where the data is queried and stored into an intermediate FeatureClass, which is further processed by the Kernel Density tool to produce a raster layer that is finally visualized. As you can tell, this is neither interactive nor particularly fast.

Since the traditional means of retrieving the data from a relational database is not fast enough, and 11 million records is not such a big set after all, I decided to put the whole thing in memory. BTW, in-memory computing is a meme that has been trending for a while now, and the most vocal about it is SAP HANA.
I decided to use Terracotta's BigMemory to hold the data off-heap and to use its Ehcache query capability to aggregate and fetch the data. And despite the word "cache" in the name, I used the cache's "eternal" capability to hold the data elements forever.
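For reference, here is a minimal sketch of what that off-heap, eternal cache setup looks like programmatically; the cache name and sizes are assumptions for illustration, not the actual PoC values.

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.config.CacheConfiguration;
import net.sf.ehcache.config.MemoryUnit;

public class CacheFactory {
    // Creates an eternal cache whose elements overflow into BigMemory off-heap storage.
    public static Cache createRecordCache() {
        CacheConfiguration config = new CacheConfiguration("records", 10000) // name and heap entry count are assumptions
                .eternal(true)                                   // never expire the elements
                .overflowToOffHeap(true)                         // push elements into BigMemory off-heap
                .maxBytesLocalOffHeap(15, MemoryUnit.GIGABYTES);
        Cache cache = new Cache(config);
        CacheManager.getInstance().addCache(cache);
        return cache;
    }
}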

I was given the data in CSV format, so I wrote a simple CSV reader/tokenizer that bulk loads the data into BigMemory using multiple threads. The inter-thread communication is handled by the LMAX Disruptor, letting me focus on the loading, parsing, and putting into BigMemory.
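The loader's shape is roughly the sketch below; the event class, worker count, and keying on the first CSV column are hypothetical choices, but it shows the pattern of a single parsing producer publishing lines to a ring buffer that is drained by a pool of workers putting elements into the cache.

import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import net.sf.ehcache.Cache;
import net.sf.ehcache.Element;

import java.io.BufferedReader;
import java.io.FileReader;

public class CsvBulkLoader {
    // Mutable event instances are pre-allocated and reused by the ring buffer.
    static class LineEvent {
        String line;
    }

    public static void main(String[] args) throws Exception {
        final Cache cache = CacheFactory.createRecordCache(); // from the sketch above

        Disruptor<LineEvent> disruptor = new Disruptor<>(
                LineEvent::new, 1 << 16, DaemonThreadFactory.INSTANCE);

        // A pool of workers; each event is handled by exactly one worker:
        // tokenize the CSV line and put the record into BigMemory.
        @SuppressWarnings("unchecked")
        WorkHandler<LineEvent>[] workers = new WorkHandler[4];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = event -> {
                String[] tokens = event.line.split(",");
                cache.put(new Element(tokens[0], tokens)); // keying on the first column is an assumption
            };
        }
        disruptor.handleEventsWithWorkerPool(workers);
        disruptor.start();

        // Single producer: read the CSV file and publish each line to the ring buffer.
        try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = reader.readLine()) != null) {
                final String l = line;
                disruptor.publishEvent((event, sequence) -> event.line = l);
            }
        }
        disruptor.shutdown(); // waits for the remaining events to be processed
        cache.flush();
    }
}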

To access this BigMemory data from a webmap as a layer, I decided to write a middleware that implements the ArcGIS REST JSON interface for an ArcGISDynamicLayer. Now, I did not write the whole interface, just the layer metadata and the export image endpoints, such that you can access the layer as:

http://host/arcgis/rest/services/Service/MapServer

The implementation of the REST endpoints was easily achievable using the Spring MVC framework, which enables me to expose POJO methods as ArcGIS REST endpoints:
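The plumbing amounts to something like the sketch below; the metadata JSON is trimmed down and the DensityExporter collaborator (which queries BigMemory, aggregates, and renders the PNG) is a hypothetical stand-in, not the actual PoC code.

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.io.IOException;

@Controller
@RequestMapping("/arcgis/rest/services/Service/MapServer")
public class MapServerController {

    // Hypothetical collaborator that queries BigMemory, aggregates, and renders the PNG.
    public interface DensityExporter {
        byte[] export(String bbox, String size, String layerDefs) throws IOException;
    }

    private final DensityExporter exporter;

    public MapServerController(DensityExporter exporter) {
        this.exporter = exporter;
    }

    // Layer metadata endpoint: returns the minimal JSON an ArcGIS web client expects.
    @RequestMapping(produces = "application/json")
    @ResponseBody
    public String metadata() {
        return "{\"currentVersion\":10.1,\"mapName\":\"Density\",\"layers\":[{\"id\":0,\"name\":\"Density\"}]}";
    }

    // Export image endpoint: query and aggregate the cached records, then render a PNG.
    @RequestMapping(value = "/export", produces = "image/png")
    @ResponseBody
    public byte[] exportImage(
            @RequestParam("bbox") String bbox,
            @RequestParam(value = "size", defaultValue = "400,400") String size,
            @RequestParam(value = "layerDefs", required = false) String layerDefs) throws IOException {
        return exporter.export(bbox, size, layerDefs);
    }
}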
The final step is to convert the aggregated features from BigMemory into an image that is returned as a response to an export REST request. This reminded me of the days when I implemented the first version of ArcIMS in Java back in 1996/97, where I used AWT for exactly the same purpose. What is old is new again: I used the same technique to create an in-memory image buffer, used the Graphics2D API to draw and color code the aggregated cells, and used ImageIO to convert the image buffer into a PNG output stream. ImageIO is fine for a PoC, but if this becomes reality, I will use JAI with its GPU enhancements to do the same task.
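A stripped-down sketch of that rendering step; the cell map structure and the color ramp are made up for illustration.

import javax.imageio.ImageIO;
import java.awt.Color;
import java.awt.Graphics2D;
import java.awt.Point;
import java.awt.image.BufferedImage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

public class DensityRenderer {
    // Draws the aggregated cells (cell column/row -> count) into a PNG byte array.
    public byte[] render(Map<Point, Integer> cells, int width, int height, int cellSize, int maxCount)
            throws IOException {
        BufferedImage image = new BufferedImage(width, height, BufferedImage.TYPE_INT_ARGB);
        Graphics2D g = image.createGraphics();
        try {
            for (Map.Entry<Point, Integer> entry : cells.entrySet()) {
                float t = Math.min(1f, entry.getValue() / (float) maxCount);
                g.setColor(new Color(t, 0f, 1f - t, 0.75f)); // simple blue-to-red ramp, illustrative only
                g.fillRect(entry.getKey().x * cellSize, entry.getKey().y * cellSize, cellSize, cellSize);
            }
        } finally {
            g.dispose();
        }
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ImageIO.write(image, "PNG", baos); // encode the image buffer as PNG
        return baos.toByteArray();
    }
}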

To run this PoC, I started two extra-large Amazon EC2 instances to run BigMemory with 15GB of off-heap RAM each (just because I can :-), and a medium instance to run Tomcat with the Spring MVC REST interface that communicates with the two BigMemory instances.

The PoC was very well received, and I extended it to make the layer time aware, enabling the client to slide through the features over time.

As usual, you can download all the source code from here, here and here.

Wednesday, April 3, 2013

BigData: DataRush Workflow in ArcMap


At this year's DevSummit, we announced the GIS Tools for Hadoop project. Included in that project is a low-level geometry Java API which enables spatial operations in MapReduce jobs or the construction of higher-level functions such as Hive user-defined functions. However, this geometry library is not restricted to Hadoop MapReduce; it is used in GeoEvent Processor, and can be used in Storm bolts or other parallel workflows. One such parallel workflow processing engine is Pervasive DataRush, which I demoed at the DevSummit. Using the KNIME visual workflow, I was able to process 90 million records (BTW, small in the BigData world) stored in the Hadoop File System for heatmap visualization in ArcMap.


A DataRush workflow engine can run on a local machine or remotely on a cluster of machines and is fully extensible with custom operators.  An operator is a node in a workflow graph whose input and output can be linked to other operators. So, I wrote my own spatial operators that utilize the geometry API.

The first operator is a simple spatial filter that passes through records that contain LAT/LON fields and are within a user-defined distance from a given point. It uses the "GeometryEngine.geodesicDistanceOnWGS84" function and augments the input record fields with a distance "METERS" output field.
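Leaving out the DataRush operator plumbing, the core of the filter logic looks something like this; the class and method names are mine, for illustration.

import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;

public class DistanceFilter {
    private final Point center;
    private final double maxMeters;

    public DistanceFilter(double centerLon, double centerLat, double maxMeters) {
        this.center = new Point(centerLon, centerLat);
        this.maxMeters = maxMeters;
    }

    // Returns the geodesic distance in meters if the record is within range, or -1 to drop it.
    public double metersIfWithin(double lon, double lat) {
        double meters = GeometryEngine.geodesicDistanceOnWGS84(center, new Point(lon, lat));
        return meters <= maxMeters ? meters : -1.0;
    }
}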

In the above application, a graph workflow reads a tab-delimited file and sends its output to the new distance operator, which sends its output to a console log operator. Simple and easy, but not simplistic: notice that when I execute the application without any arguments, the graph runs on the local machine, taking full advantage of all the local processors. However, if I specify a dummy argument, the graph takes its input from HDFS and executes remotely on a DataRush cluster. This simple toggle is pretty impressive when switching from development to production mode.

Next is to add the operator to the KNIME visual workflow.
The Pervasive folks made that really easy using a plugin for the Eclipse environment.

Using the Node Extension wizard, you can select an operator and it generates template code for the operator xml descriptor, the dialog pane, the model factory and the node settings.

This gets you 80% to 90% of the way there. The rest is completing the XML node descriptor and laying out a nice dialog pane.


The final step is to export the compiled code into KNIME as a plugin - BTW, KNIME is based on Eclipse.  Within KNIME, you are in what I call PHD (Push Here Dummy) mode, where you can drag, drop, and link operators on the workbench.

A slightly more complicated operator is a composite operator that, as its name suggests, is composed of two or more operators, but the whole is exposed as one operator.
So, the next spatial operator extension to write is a composite one that performs a spatial join between a point input source and a polygon input source, where the output is a flow indicating which polygon contains each input point.
The Spatial Join composite operator contains a non-parallelizable operator that blocks on the first pass to read the polygons into an in-memory model.  The polygon input format is the Esri JSON text format, and we use the "GeometryEngine.jsonToGeometry" function to convert each one into a Polygon POJO. The second embedded operator is parallel-enabled; it walks the input points and uses the in-memory model of polygons built in the first pass to find the containing one using the "GeometryEngine.contains" function.  This "build and walk" pattern is used whenever a join or lookup needs to be performed.
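Here is a condensed sketch of the build-and-walk logic, stripped of the DataRush operator scaffolding; the class and method names are illustrative.

import com.esri.core.geometry.Geometry;
import com.esri.core.geometry.GeometryEngine;
import com.esri.core.geometry.Point;
import com.esri.core.geometry.SpatialReference;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;

import java.util.ArrayList;
import java.util.List;

public class PointInPolygonJoin {
    private final SpatialReference wgs84 = SpatialReference.create(4326);
    private final List<Geometry> polygons = new ArrayList<Geometry>();
    private final JsonFactory factory = new JsonFactory();

    // Build phase: parse each Esri JSON polygon into a geometry and keep it in memory.
    public void addPolygon(String esriJson) throws Exception {
        JsonParser parser = factory.createJsonParser(esriJson);
        polygons.add(GeometryEngine.jsonToGeometry(parser).getGeometry());
    }

    // Walk phase: return the index of the polygon containing the point, or -1 if none does.
    public int findContainer(double lon, double lat) {
        Point point = new Point(lon, lat);
        for (int i = 0; i < polygons.size(); i++) {
            if (GeometryEngine.contains(polygons.get(i), point, wgs84)) {
                return i;
            }
        }
        return -1;
    }
}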

The above graph adds one extra built-in operator (group by), where the output is the number of points that each input polygon contains.  The result of such a flow can be used to thematically color code the polygons in ArcMap.

Displaying thousands or millions of individual points on a map is, well... not very smart and does not make much sense. But if all these points are binned, and each bin is weighted proportionally to the number of points in it, then a map displaying the bins color coded by their weight is much more appealing and conveys much more information, such as the above map, where I rendered a heatmap of locations of businesses with more than 100,000 employees.
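Conceptually, the binning boils down to something like this sketch; the cell size and key packing are arbitrary choices for illustration.

import java.util.HashMap;
import java.util.Map;

public class PointBinner {
    private final double cellSize; // bin size in decimal degrees (an arbitrary choice here)
    private final Map<Long, Integer> counts = new HashMap<Long, Integer>();

    public PointBinner(double cellSize) {
        this.cellSize = cellSize;
    }

    // Map the point to a bin and increment that bin's weight.
    public void add(double lon, double lat) {
        long col = (long) Math.floor((lon + 180.0) / cellSize);
        long row = (long) Math.floor((lat + 90.0) / cellSize);
        long key = (row << 32) | col; // pack row/col into a single key
        Integer count = counts.get(key);
        counts.put(key, count == null ? 1 : count + 1);
    }

    public Map<Long, Integer> getCounts() {
        return counts;
    }
}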

So I wrote a terminal operator, the raster operator, that writes the bin information as an Esri raster file in either float or ASCII format. The interesting thing about this operator is that it is a "fan in" operator: despite being part of a graph that is executed on a remote cluster, the raster operator itself is executed on the client machine, the client being the machine that submitted the graph to the cluster.
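Writing the ASCII flavor is straightforward; a minimal sketch, assuming the bins have already been collected into a 2D array of counts and using the well-known Esri ASCII grid header.

import java.io.IOException;
import java.io.PrintWriter;

public class AsciiGridWriter {
    // Writes bin counts as an Esri ASCII grid that the ASCII To Raster GP tool can consume.
    public void write(String path, float[][] cells, double xllCorner, double yllCorner, double cellSize)
            throws IOException {
        int nrows = cells.length;
        int ncols = cells[0].length;
        PrintWriter out = new PrintWriter(path);
        try {
            out.println("ncols " + ncols);
            out.println("nrows " + nrows);
            out.println("xllcorner " + xllCorner);
            out.println("yllcorner " + yllCorner);
            out.println("cellsize " + cellSize);
            out.println("NODATA_value -9999");
            for (int r = 0; r < nrows; r++) { // rows are written from the top down
                StringBuilder row = new StringBuilder();
                for (int c = 0; c < ncols; c++) {
                    row.append(cells[r][c]).append(' ');
                }
                out.println(row.toString().trim());
            }
        } finally {
            out.close();
        }
    }
}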

One last thing. A KNIME workflow can be exported into an external representation that can be imported into a logical graph for local or remote execution.

This means I can turn that code into an ArcMap GeoProcessing extension, enabling me to invoke a DataRush workflow as a tool and combine it with other existing tools for further analysis and visualization.

In my workflow example, rather than manually calling the Float To Raster GP task after executing the DataRush tool, I can combine the two into one ArcPy Script Tool:

So to recap: you can store data in HDFS, or move it there with the new GeoProcessing tools for Hadoop; compose DataRush workflows with spatial operators; export those workflows from ArcMap for execution on a cluster; and consume the results back in ArcMap for further local GeoProcessing or visualization.

This is very cool!

As usual, all the source code can be downloaded from here and here.