
Friday, December 6, 2013

Hadoop Processing Model

Note: These are preliminary results.

I've begun work on an application to perform various automatic statistical analyses of a large dataset (millions of records or more). It is implemented as a group of one-pass (or online) algorithms operating in a map-reduce topology. Technologically, the back-end uses Hadoop and the mrjob library. I haven't made technology choices for the front-end, but I'm leaning towards Django, Bootstrap, and D3.

One of the system engineering challenges for distributed processing is scaling the number of nodes to the workload. Eventually I will have to run actual performance tests, but for now I can approximate the resource requirements with an abstract model.

Abstract Model of Hadoop I/O Processing
The abstract model is a closer approximation of the map-reduce conceptual model than of Hadoop's actual architecture, but I hypothesize that it is sufficiently accurate for my purposes (and I will later use test data to validate the model).

Input to the overall process is provided as a series of N records, each containing M columns or fields. (M represents the number of columns processed/generated by the application; for simplicity, we assume the input's columns map one-to-one onto the columns the application processes.) Normally, N will be much greater than M. The input is divided among P map nodes and becomes the A dataset within each map node. The map process converts A into B, and the combine process converts B into C. At this point, the individual C records are merged, sorted by key, and transferred to Q reduce nodes, where they become D; the data in transit is denoted S. The reduce process consumes the D datasets and converts them into E. Finally, E is merged into the final results, F.
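To make the data flow concrete, the per-stage record counts implied by this description can be tallied as below. Only B = MN/P and F ∝ M are stated explicitly in this post; the counts for C, D, and E assume one partial aggregate per column per node, which is an illustrative assumption rather than the exact behaviour of the real algorithms.

```python
# Illustrative tally of record counts at each stage of the abstract model.
# The counts for C, D, and E assume one partial aggregate per column per
# node; only B = M*N/P and F ~ M are given explicitly in the text.

def stage_record_counts(N, M, P, Q):
    """Approximate record counts per stage (per node where noted)."""
    return {
        "A (input, per map node)":            N / P,
        "B (map output, per map node)":       M * N / P,
        "C (combine output, per map node)":   M,
        "S (shuffle/sort, total)":            M * P,
        "D (reduce input, per reduce node)":  M * P / Q,
        "E (reduce output, per reduce node)": M / Q,
        "F (final results, total)":           M,
    }

for stage, count in stage_record_counts(N=10**9, M=10, P=16, Q=4).items():
    print(f"{stage}: {count:,.0f}")
```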

Based on the algorithms, I can estimate the number of records in A through F, and based on the current implementation I can estimate the size of the records at each stage. Thus, I can calculate the total number of bytes that pass through the job from the input to F. Setting M = 10, I plotted the bytes (in terabytes) as a contour plot with N and P as the axes; a rough sketch of the calculation appears below, followed by the plot.
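The sketch below is only a hedged version of that calculation: the per-record byte sizes are placeholder assumptions rather than the values taken from the current implementation, and the N-dependent stages are counted per node, which is my reading of the model rather than something stated above.

```python
# Sketch of the I/O estimate behind the contour plot. The per-record sizes
# are placeholder assumptions; the real values come from the implementation.
import numpy as np
import matplotlib.pyplot as plt

BYTES_A, BYTES_B, BYTES_C = 100, 50, 200            # assumed record sizes
BYTES_D, BYTES_E, BYTES_F = 200, 500_000, 500_000   # assumed record sizes

def io_bytes(N, M, P, Q=1):
    """Approximate bytes along one map/reduce path through the job."""
    a = (N / P) * BYTES_A        # input split read by one map node
    b = (M * N / P) * BYTES_B    # map output: M key/value pairs per record
    c = M * BYTES_C              # combiner output on one map node
    s = M * P * BYTES_C          # shuffle/sort of all combiner outputs
    d = (M * P / Q) * BYTES_D    # reduce input on one reduce node
    e = (M / Q) * BYTES_E        # reduce output on one reduce node
    f = M * BYTES_F              # merged final results
    return a + b + c + s + d + e + f

# Contour plot in log2-log2 form with M = 10, echoing the figure below.
log2_N = np.linspace(30, 40, 50)   # ~1 billion to ~1 trillion records
log2_P = np.linspace(0, 7, 50)     # 1 to 128 map nodes
NN, PP = np.meshgrid(2.0 ** log2_N, 2.0 ** log2_P)
terabytes = io_bytes(NN, 10, PP) / 2.0 ** 40

contours = plt.contour(log2_N, log2_P, terabytes)
plt.clabel(contours, fmt="%.1f TB")
plt.xlabel("log2(N records)")
plt.ylabel("log2(P map nodes)")
plt.show()
```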

[Contour plot of total I/O in terabytes as a function of N and P (Log2-Log2 axes), with M = 10.]
The plot is in Log2-Log2 form. The x-axis spans the number of records from roughly a billion (2^30) to roughly a trillion (2^40). The y-axis varies from a single processor (2^0) to 128 processors (2^7). The ridges show that I/O remains constant when the input size and the number of processors are doubled together, so we are making effective use of the map-reduce framework. However, since the contour scale is measured in terabytes, the linear complexity carries a large constant factor.
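A quick arithmetic check of that ridge behaviour: the dominant per-node term scales as MN/P, which is unchanged when N and P are doubled together (the numbers below are just illustrative).

```python
# Doubling both N and P leaves the dominant per-node volume M*N/P unchanged.
M, N, P = 10, 2 ** 34, 2 ** 4
print(M * N // P, M * (2 * N) // (2 * P))   # prints the same value twice
```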

In the model, the I/O costs are concentrated in the B stage. Although each individual record output by the Map process is small, the number of records in B is MN/P. The Combine process is necessary to aggregate these results immediately and eliminate the N term from the later I/O stages. For comparison, F is proportional to M and, although each of its records will be in the hundreds-of-kilobytes to megabyte range, its total size can be a thousandth of B's.
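To make the combiner's role concrete, here is a minimal mrjob-style sketch (not the application's actual code) that computes running per-column statistics. On each map node, the combiner collapses the MN/P mapper outputs into roughly M partial aggregates before anything crosses the network, which is how the N term drops out of the later stages.

```python
# Minimal mrjob sketch: per-column count/mean/variance with a combiner.
# This is an illustrative stand-in, not the application's actual algorithms.
from mrjob.job import MRJob


class MRColumnStats(MRJob):

    def mapper(self, _, line):
        # Emit one (column index, partial) pair per field of the record.
        for col, field in enumerate(line.split(',')):
            x = float(field)
            yield col, (1, x, x * x)

    def combiner(self, col, partials):
        # Collapse this node's M*N/P mapper outputs to one record per column.
        n = s = s2 = 0.0
        for count, total, total_sq in partials:
            n += count
            s += total
            s2 += total_sq
        yield col, (n, s, s2)

    def reducer(self, col, partials):
        # Merge the per-node partial aggregates into final statistics.
        n = s = s2 = 0.0
        for count, total, total_sq in partials:
            n += count
            s += total
            s2 += total_sq
        mean = s / n
        yield col, {'count': n, 'mean': mean, 'variance': s2 / n - mean * mean}


if __name__ == '__main__':
    MRColumnStats.run()
```

Such a job can be run locally (e.g. `python column_stats.py data.csv`, with a hypothetical file name) or against the cluster with mrjob's `-r hadoop` runner.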