Automated Machine Learning (AutoML) and Big Data Platforms

Although machine learning has existed for decades, the typical data scientist – as we would call this role today – still has to go through a manual, labor-intensive process of extracting data, cleaning it, extracting features, regularizing, training, finding the right model, testing, selecting and deploying it. Furthermore, for most machine learning scenarios you do not use a single model/algorithm but evaluate a plethora of different algorithms to find one suitable for the given data and use case. Nowadays a lot of data is available under the so-called Big Data paradigm, which introduces the additional challenge of mastering machine learning on distributed computing platforms.

This blog post investigates how to ease the burden of manual, labor-intensive model evaluation on data scientists by presenting insights into the recent concept of automated machine learning (AutoML) and whether it can be adapted to Big Data platforms.

What is AutoML

Machine learning is about learning from data and making predictions from it. This can be useful in many contexts, such as autonomous robots, smart homes, agriculture or financial markets. Machine learning – despite the word "machine" in its name – is mostly a manual process that requires a highly skilled person to execute. Although the learning part itself is rather automated, this person needs to extract data, transform it in potentially different alternative ways, feed it into many alternative machine learning algorithms using different parameters for the same algorithm, evaluate the quality of the generated prediction models and finally deploy the chosen model so that others can make their own predictions without going through this labor-intensive process themselves. This is illustrated in the following figure:
[Figure: AutoML flow diagram]

Given this background, it comes as no surprise that large marketplaces have been created where people sell and buy trained machine learning models: for example, the Azure Machine Learning Marketplace, the Amazon AWS Artificial Intelligence marketplace, the Algorithmia Marketplace, the Caffe Model Zoo, deeplearning4j models, the MXNet model zoo, TensorFlow models or the Acumos Marketplace (based on the open source software).

Nevertheless, most organizations that want to keep up with the competition in these marketplaces, or that cannot rely on marketplaces because their problems are unique to their organization, still face the issue of finding skilled people to create prediction models. Furthermore, machine learning models have become so complex that they are no longer the outcome of a single person but of a team that needs to ensure consistent quality.

Here AutoML comes into play. It can support highly skilled machine learning practitioners, but also people without machine learning skills, in creating their own prediction models in a transparent manner. Ideally this is done in an automated fashion for the whole machine learning process, but contemporary technology focuses mostly on evaluating different models, a set of suitable parameters for these models ("hyperparameter optimization") and automatically selecting the "winning" (or best) model(s) (highlighted in green in the previous figure). If several models are evaluated, a ranking is created on a so-called "leaderboard". All of this has to happen within a given time budget, i.e. one has only limited time and resources. Potentially several models could be combined (ensemble learning) in more advanced solutions, but this is currently in its infancy.
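
To make the time budget and leaderboard ideas concrete, here is a minimal sketch using auto-sklearn (one of the tools listed further below). The budget values are arbitrary, and the leaderboard() call is only available in recent auto-sklearn versions – treat this as an illustration, not a reference implementation.

```python
# Minimal sketch: AutoML with a time budget and a leaderboard (auto-sklearn).
import autosklearn.classification
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=1)

automl = autosklearn.classification.AutoSklearnClassifier(
    time_left_for_this_task=300,   # overall time budget in seconds (arbitrary)
    per_run_time_limit=30,         # budget for a single model evaluation
)
automl.fit(X_train, y_train)

print(automl.leaderboard())          # ranking of the evaluated models
print(automl.score(X_test, y_test))  # quality of the final (ensembled) model
```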

Some observations on this specific focus:

  • AutoML does not invent new models. It relies on an existing repository of algorithms, which it iterates over and tries out with different parameters. In itself AutoML can be seen as a "meta-algorithm" over these models.
  • Similarly, it relies on an existing repository of tests that determine the suitability of a model for a given problem.
  • It requires clever sampling (sample size, randomness) of the underlying training data, otherwise it may take very long to evaluate the models, or simply the wrong data is used. A preselection of the data by a person is still needed, although that person does not require as many machine-learning-specific skills.
  • A person still needs to determine whether what the winning model predicts makes sense. For this the person does not need machine learning skills but domain-specific skills. For instance, a financial analyst can judge whether a dataset of financial transaction attributes is suitable for predicting fraud.

Big Data platforms

The emergence of the open source Hadoop platform in 2006 introduced Big Data platforms on commodity hardware and network clusters to the world. A few years later Hadoop was adopted for data analytics in many organizations. The main focus of Hadoop was to enable analytic tasks on large datasets in a reliable manner that was not possible before. Meanwhile, further improved platforms have been created, such as Apache Flink or Apache Spark, that focus not only on processing large data volumes but also on processing them faster by employing various optimization techniques.

Those platforms employ several nodes that communicate over a network to execute a task in a distributed manner. This imposes some challenges for machine learning, such as:

  • Standard machine learning algorithms are not designed to be executed over a distributed network of nodes.
  • Not all machine learning algorithms can be converted into distributed versions. For instance, if you need to estimate the parameters of a model, then gradient descent might require a lot of memory on a single node. Hence, other estimation methods need to be used that can work in parallel on different nodes.

This led to specific machine learning libraries for those platforms, such as Flink ML or Spark MLlib, which support only a dedicated subset of algorithms that can be executed efficiently and effectively over a given set of nodes communicating via the network.
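
As an illustration of such a library, the following minimal PySpark sketch trains a model with Spark MLlib so that the iterative parameter estimation is distributed over the cluster. The input path and column names are hypothetical placeholders.

```python
# Minimal sketch: distributed model training with Spark MLlib (PySpark).
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.appName("distributed-ml").getOrCreate()
df = spark.read.parquet("hdfs:///data/transactions.parquet")  # hypothetical path

# Assemble the feature columns into the vector column MLlib expects.
train = VectorAssembler(inputCols=["amount", "merchant_risk"],
                        outputCol="features").transform(df)

# The estimator distributes its iterative optimization over the executors
# instead of assuming that all data fits on a single node.
model = LogisticRegression(labelCol="is_fraud", featuresCol="features").fit(train)
model.write().overwrite().save("hdfs:///models/fraud_lr")
```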

Tool support for AutoML

AutoML can be very useful for your organization. Amongst others, the following tools exist:

  • Auto-SKLearn: Automated machine-learning toolkit to be used in lieu of the non-automated scikit-learn. The budget can be defined by time and memory as well as search space; automated preprocessing can be defined; multicore processing is supported for some of the algorithms. Supported models: multiple classifiers and regressors as well as combinations (ensemble construction). Hyperparameter optimization: Bayesian optimization.
  • TPOT: Automated machine-learning toolkit offering various automated preprocessors (e.g. Standard Scaler, Principal Component Analysis). Supported models: multiple classifiers and regressors as well as combinations. Hyperparameter optimization: genetic programming.
  • Auto-Weka: Automated machine-learning toolkit. The budget can be defined by time and memory as well as search space. Supported models: multiple classifiers and regressors as well as combinations. Hyperparameter optimization: Bayesian optimization.
  • Devol: Automated machine-learning toolkit for deep neural network architectures. Expects the data to be prepared and encoded; sees itself more as support for experienced data scientists. Supported models: neural networks and combinations. Hyperparameter optimization: genetic programming.
  • Machine.js/Auto-ML: Automated machine learning kit based on auto-ml. Expects the data to be prepared and encoded. Supported models: various classifiers and regressors as well as neural networks based on the Keras library; supports combinations. Hyperparameter optimization: genetic programming / grid search.

Most of these tools support only one method for hyperparameter optimization, although several methods exist. Some models do not require hyperparameter optimization at all, because they can derive optimal hyperparameters from the training data. Unfortunately, this is currently not integrated in any of the tools.

Those tools might not always be very end-user friendly, and you still need to deploy them across the whole organization as fat clients instead of lightweight HTML5 browser applications. As an alternative, popular cloud providers are integrating more and more assistants into the cloud that help you with your machine learning process.

AutoML on Big Data Platforms

The aforementioned tools have not been primarily designed for Big Data platforms. They are usually based on Python or Java, so one could use them with the Python or Java bindings of those platforms (cf. Flink or Spark). One could use the available data sources (and efficient data formats such as ORC/Parquet) and the sampling functionality of those platforms (e.g. sampling in Flink or sampling in Spark) and feed the result into the tools, which could even run on the cluster. However, they would only use one node and the rest of the cluster would not be utilized. Furthermore, the generated models are not necessarily compatible with the models provided by the Big Data platforms, so one has to write a manual wrapper around them to distribute them over the cluster. Even then, these models would still only use one node.
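
The following minimal sketch illustrates this pattern: Spark does the heavy lifting of reading and sampling the distributed data, and only the single-node-sized sample is handed to an AutoML tool. Path, sampling fraction and column names are hypothetical.

```python
# Minimal sketch: sample a large dataset with Spark and feed it to a
# single-node AutoML tool.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("automl-sampling").getOrCreate()
df = spark.read.parquet("hdfs:///data/clickstream.parquet")  # hypothetical path

# Draw a 1% random sample and pull it to the driver as a pandas DataFrame.
sample_pdf = df.sample(fraction=0.01, seed=42).toPandas()

X = sample_pdf.drop(columns=["label"]).values
y = sample_pdf["label"].values
# X, y can now be fed into auto-sklearn, TPOT etc. running on one node.
```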

This is not necessarily bad, because data scientists usually do not evaluate a single dataset with AutoML but multiple ones, so you can utilize the whole cluster by running several AutoML processes in parallel. However, it also means that data size as well as the budget for time and memory are limited to one node, which might not be sufficient for all machine learning problems. Another interesting aspect could be to run one or more winning models over a much larger dataset to evaluate them in more detail or to optimize them even further. This again would require a deeper integration of the AutoML tools and Big Data platforms.

H2O AutoML is a recent (March 2018) example of how to provide AutoML on Big Data platforms, but it currently has similar limitations to the ones described before with respect to the Big Data platform. The only advantage currently is that the generated models are compatible with the machine learning APIs of the Big Data platforms.
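
For illustration, here is a minimal sketch of the H2O AutoML Python API with a time budget. The file path and target column are hypothetical, and the snippet assumes a reachable H2O environment; it is a sketch, not a production setup.

```python
# Minimal sketch: H2O AutoML with a time budget and a leaderboard.
import h2o
from h2o.automl import H2OAutoML

h2o.init()                                            # connects to/starts an H2O instance
train = h2o.import_file("hdfs:///data/churn.csv")     # hypothetical path
train["churned"] = train["churned"].asfactor()        # mark the target as categorical

aml = H2OAutoML(max_runtime_secs=600, max_models=20, seed=1)
aml.train(y="churned", training_frame=train)

print(aml.leaderboard.head())   # ranked models; the winners are exportable
```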

Apache Spark has some methods for hyperparameter tuning, but they are limited to a single pipeline or model and do not cover aspects of other AutoML solutions, such as comparing different models. Furthermore, Spark only evaluates a given list of parameter sets, and no time or cost budget can be defined; this would have to be implemented by the user.
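
The following minimal sketch shows what Spark offers out of the box: a fixed parameter grid evaluated with cross-validation, with no time or cost budget and no comparison across model families. The data preparation mirrors the hypothetical fraud example above.

```python
# Minimal sketch: Spark's built-in tuning with a fixed parameter grid.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

spark = SparkSession.builder.appName("spark-tuning").getOrCreate()
df = spark.read.parquet("hdfs:///data/transactions.parquet")   # hypothetical path
train = VectorAssembler(inputCols=["amount", "merchant_risk"],
                        outputCol="features").transform(df)

lr = LogisticRegression(labelCol="is_fraud", featuresCol="features")
grid = (ParamGridBuilder()
        .addGrid(lr.regParam, [0.01, 0.1, 1.0])
        .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
        .build())

# Every one of the 9 grid combinations is evaluated; there is no notion of a
# time budget and only this one model family is considered.
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol="is_fraud"),
                    numFolds=3)
best_model = cv.fit(train).bestModel
```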

One can see that AutoML and Big Data platforms could benefit from a tighter integration in the future. It would then also be easier to leverage all the data in your data lake without extracting it and processing it locally. At the moment, although some machine learning models are supported by Big Data platforms (with some caveats related to distributed processing), not all AutoML functionality is supported (e.g. hyperparameter tuning or genetic programming).

Google AutoML provides an automated machine learning solution in the cloud. It augments this with pre-trained models (e.g. on image data), so not everyone has to spend time training models from scratch.

Conclusion

AutoML is a promising tool to facilitate the work of both less skilled and highly skilled machine learning practitioners, giving them more time to focus on their actual work instead of the manual, error-prone and labor-intensive machine learning process. You can onboard your organization to machine learning even if it is still in its early machine learning stages and facilitate learning for many people on this topic.

However, this does not mean that people without any knowledge can use it out of the box. They should have at least a basic understanding of how machine learning works in general. Furthermore, they should be experts in your organization's domain, e.g. a financial analyst in a bank or an engineer in the car industry. They still need to decide which data to feed into AutoML and whether the model learned by AutoML is useful for your business purposes. For example, it does not make sense to put all the data in the world related to problematic situations for autonomous cars into AutoML and expect it to solve every problematic situation as well as possible. Such situations are more likely to be country/region-specific, so it may make more sense to have several AutoML runs for different countries/regions to develop region-specific models. Other datasets, such as blockchains/crypto-ledgers, are more complex and currently require manual preprocessing and selection.

Another example is known from spurious correlations, i.e. correlations that exist but do not imply causality. In all these cases you still need a domain expert who can judge whether the model is a useful contribution for the organization.

All these things are related to the no free lunch theorem.

Even for highly skilled machine learning practitioners AutoML can make sense. No one can know all the particularities of every machine learning model, and it simply takes a lot of time to evaluate them all manually. Some may even have favorite models, which can mean that other models are not evaluated although they might fit better. Or they simply do not have time for manual evaluation, so an automated preselection of candidate models can be very useful.

One open issue is still how much time and how many resources you should let AutoML spend on a specific dataset. This is not easy to answer and you may still need to experiment: if the results are bad, you probably need to spend more.

Nevertheless, AutoML as a recent field still has a lot of room for improvement, such as full integration into Big Data machine learning platforms, support for more hyperparameter tuning algorithms and more user-friendly interfaces, as pioneered by some cloud services.

Finally, those models should be part of a continuous delivery pipeline. This requires unit and integration testing to avoid models that have obvious errors (e.g. always returning 0) or that do not fit into the environment in which they are used for prediction (e.g. the web service cannot be called or the model cannot be opened in R). Integrating machine learning models into continuous delivery pipelines for productive use has not drawn much attention yet, because data scientists usually push them directly into the production environment, with all the drawbacks this approach may have, such as the lack of proper unit and integration testing.


Lambda, Kappa, Microservice and Enterprise Architecture for Big Data

A few years after the emergence of the Lambda Architecture, several new architectures for Big Data have emerged. I will present them and illustrate their use case scenarios. These are IT architectures, but towards the end of this blog post I will describe the corresponding Enterprise Architecture artefacts, which are sometimes referred to as the Zeta architecture.

Lambda Architecture

I have blogged before about the Lambda-Architecture. Basically this architecture consists of three layers:

  • Batch-Layer: This layer executes long-running batch processes to do analyses on larger amounts of historical data. The scope is data from several hours to weeks up to years. Here, usually Hadoop MapReduce, Hive, Pig, Spark or Flink are used together with orchestration tools, such as Oozie or Falcon.

  • Speed-Layer/Stream Processing Layer: This layer executes (small/”mini”) batch-processes on data according to a time window (e.g. 1 minute) to do analyses on larger amounts of current data. The scope is data from several seconds up to several hours. Here one may use, for example, Flink, Spark or Storm.

  • Serving Layer: This layer combines the results from the batch and stream processing layers to enable fast interactive analyses by users. It usually leverages relational databases, but also NoSQL databases, such as graph databases (e.g. TitanDB or Neo4J), document databases (e.g. MongoDB, CouchDB), column databases (e.g. HBase), key-value stores (e.g. Redis) or search technologies (e.g. Solr). For certain use cases NoSQL databases provide more adequate and better performing data structures, such as graphs/trees, hash maps or inverted indexes.

In addition, I proposed the long-term storage layer to have an even cheaper storage for data that is hardly accessed, but may be accessed eventually. All layers are supported by a distributed file system, such as HDFS, to store and retrieve data. A core concept is that computation is brought to data (cf. here). On the analysis side, usually standard machine learning algorithms, but also on-line machine learning algorithms, are used.

As you can see, the Lambda-Architecture can be realized using many different software components and combinations thereof.

While the Lambda architecture is a viable approach to tackle Big Data challenges, various other architectures have emerged that focus only on certain aspects, such as data stream processing, or on integration with cloud concepts.

Kappa Architecture

The Kappa Architecture focuses solely on data stream processing, or "real-time" processing of "live" discrete events. Examples are events emitted by devices from the Internet of Things (IoT), social networks, log files or transaction processing systems. The original motivation was that the Lambda Architecture is too complex if you only need to do event processing.

The following assumptions exist for this architecture:

  • You have a distributed ordered event log persisted to a distributed file system, where stream processing platforms can pick up the events

  • Stream processing platforms can (re-)request events from the event log at any position. This is needed in case of failures or upgrades to the stream processing platform.

  • The event log is potentially large (several Terabytes of data / hour)

  • Mostly online machine learning algorithms are applied due to the constant delivery of new data, which is more relevant than the old already processed data

Technically, the Kappa architecture can be realized using Apache Kafka for managing the data-streams, i.e. providing the distributed ordered event log. Apache Samza enables Kafka to store the event log on HDFS for fault-tolerance and scalability. Examples for stream processing platforms are Apache Flink, Apache Spark Streaming or Apache Storm. The serving layer can in principle use the same technologies as I described for the serving layer in the Lambda Architecture.

There are some pitfalls with the Kappa architecture that you need to be aware of:

  • End-to-end ordering of events: While technologies such as Kafka can provide the events in an ordered fashion, it is up to the source system to actually deliver them in order. For instance, I had a case where a system in normal operation sent the events in order, but in case of communication errors it did not, because it stored the events it could not send and retransmitted them later, while already sending new events once communication was re-established. The source system had to be adapted to handle these situations correctly. Alternatively, you can ensure only a partial ordering using vector clocks or similar mechanisms implemented at the event log or stream processing level.

  • Delivery paradigms for how events are delivered to (or fetched by) the stream processing platform:

    • At least once: Each event is guaranteed to be delivered, but the same event might be delivered twice or more due to processing errors or communication/operation errors within Kafka. For instance, the stream processing platform might crash before it can mark events as processed although it has already processed them. This might have undesired side effects, e.g. the same event that "user A liked website W" is counted several times (see the consumer sketch after this list).

    • At most once: The event will be delivered at most once (this is the default Kafka behavior). However, it might also get lost and not be delivered. This could have undesired side effects, e.g. the event “user A liked website W” is not taken into account.

    • Once and only once: The event is guaranteed to be delivered exactly once, i.e. it will neither get lost nor be delivered twice or more. However, this is not simply a combination of the above scenarios. Technically you need to make sure in a multi-threaded, distributed environment that an event is processed exactly once. This means that (1) the same event is only processed by one sequential process in the stream processing platform and (2) all other processes related to that event are made aware that one of them is already processing it. Both features can be implemented using distributed system techniques, such as semaphores or monitors, which can be realized using distributed cache systems, such as Ignite, Redis or, to a limited extent, ZooKeeper. Another simple possibility would be a relational database, but this would quickly fail to scale with large volumes.
      • Needless to say, the source system must also make sure that it delivers the same event once and only once to the ordered event log.

  • Online machine learning algorithms constantly change the underlying model to adapt it to new data. This model is used by other applications to make predictions (e.g. predicting when a machine has to go into maintenance). This also means that in case of failure we may temporarily have an outdated or simply wrong model (e.g. in case of at-least-once or at-most-once delivery). Hence, the applications need to incorporate some business logic to handle this (e.g. do not register a machine twice for maintenance, or avoid permanently registering/unregistering a machine for maintenance).
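
To illustrate the delivery paradigms above, here is a minimal sketch of an at-least-once consumer using the kafka-python client: offsets are committed only after processing, so a crash between processing and commit leads to re-delivery (duplicates) rather than lost events. Topic, group and the processing function are hypothetical.

```python
# Minimal sketch: at-least-once consumption with manual offset commits.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "website-likes",                   # hypothetical topic
    bootstrap_servers="localhost:9092",
    group_id="like-counter",           # hypothetical consumer group
    enable_auto_commit=False,          # we commit manually after processing
    auto_offset_reset="earliest",
)

for message in consumer:
    process_like_event(message.value)  # hypothetical; should be idempotent,
                                       # because duplicates can occur
    consumer.commit()                  # commit only after successful processing
```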

Although technologies such as Kafka can help you with this, it requires a lot of thought and experienced developers as well as architects to implement such a solution. The batch-processing layer of the Lambda architecture can mitigate the aforementioned pitfalls to some extent, but it can also be affected by them.

Last but not least, although the Kappa Architecture seems intriguing due to its simplicity in comparison to the Lambda architecture, not everything can be conceptualized as events. For example, company balance sheets, end-of-month reports, quarterly publications etc. should not be forced into a representation as events.

Microservice Architecture for Big Data

The Microservice Architecture did not originate in the Big Data movement, but is slowly being picked up by it. It is not a precisely defined style, but several common aspects exist. Basically it is driven by the following technology advancements:

  • The implementation of applications as services instead of big monoliths
  • The emergence of software containers to deploy each of those services in isolation from each other. Isolation means that they are put into virtual environments sharing the same operating system (i.e. they are NOT in different virtual machines) and are connected to each other via virtualized networks and virtualized storage. These containers leverage the available resources much better than virtual machines.
    • Additionally the definition of repositories for software containers, such as the Docker registry, to quickly version, deploy, upgrade dependent containers and test upgraded containers.
  • The deployment of container operating systems and orchestration platforms, such as CoreOS, Kubernetes or Apache Mesos, to efficiently manage software containers, manage their resources, schedule them onto physical hosts and dynamically scale applications according to needs.
  • The development of object stores, such as OpenStack Swift, Amazon S3 or Google Cloud Storage. These object stores are needed to store data beyond the lifecycle of a software container in a highly dynamic cloud or scaling on-premise environment.
  • The DevOps paradigm – especially the implementation of continuous integration and delivery processes with automated testing and static code analysis to improve software quality. This also includes quick deliveries of individual services at any time independently of each other into production.

An example of the Microservice architecture is the Amazon Lambda platform (not to be confused with Lambda architecture) and related services provided by Amazon AWS.

Nevertheless, the Microservice Architecture poses some challenges for Big Data architectures:

  • What should be a service: For instance, you have Apache Spark or Apache Flink forming a cluster to run your application. Should you have a dedicated cluster of software containers for each application, or should you provide a shared cluster of software containers? It can make sense to choose the first solution, i.e. a dedicated cluster per application, due to the different scaling and performance needs of each application.
  • The usage of object stores: Object stores are needed as large-scale, dynamically scalable storage that is shared among containers. However, there are currently some issues, such as performance and consistency models ("eventually consistent"). Here, the paradigm of "bring computation to data" (cf. here) is violated. Nevertheless, this can be mitigated either by using HDFS as a temporary file system on the containers and fetching the data beforehand from the object store, or by using an in-memory caching solution, such as the one provided by Apache Ignite or, to some extent, Apache Spark or Apache Flink.

I expect that in these environments the role of software-defined networking (SDN) will become crucial, not only in cloud data centers but also in on-premise data centers. SDN (which should NOT be confused with virtualized networks) enables centrally controlled, intelligent routing of network flows, as needed in the dynamically scaling platforms required by the Microservice architecture. The old decentralized definition of the network, e.g. in the form of decentralized routing, simply does not scale here to enable optimal performance.

Conclusion

I presented here several architectures for Big Data that emerged recently. Although they are based on technologies that are already several years old, I observe that many organizations are overwhelmed by these new technologies and have issues adapting to and fully leveraging them. This has several reasons.

One tool to manage this could be proper Enterprise Architecture Management. While Enterprise Architecture Management has many benefits, I want to highlight the benefit of managed evolution. This paradigm enables aligning business and IT, although both change constantly and independently (and dependently), with not necessarily aligned goals, as illustrated in the following picture.

[Figure: Enterprise Architecture managed evolution]

As you can see from the picture both are constantly diverging and Enterprise Architecture Management is needed to unite them again.

However, reaching a managed evolution of Enterprise Architecture usually requires many years and commitment to it from business as well as IT. Enterprise Architecture for Big Data is a relatively new concept, which is still subject to change. Nevertheless, some common concepts can be identified. Some people also refer to Enterprise Architecture for Big Data as the Zeta Architecture; it does not only encompass Big Data processing but, in the context of the Microservice architecture, also web servers providing the user interface for analytics (e.g. Apache Zeppelin) and further management workflows, such as backup or configuration, deployed in the form of containers.

This enterprise architecture for Big Data describes some integrated patterns for Big Data and Microservices so that you can consistently document and implement your Lambda, Kappa, Microservice architecture or a mixture of them. Examples for artefacts of such an enterprise architecture are (cf. also here):

  • Global Resource Management to manage the physical and virtualized resources as well as scaling them (e.g. Apache Mesos and Software Defined Networking)

  • Container Management to deploy and isolate containers (e.g. Apache Mesos)

  • Execution engines to manage different processing engines, such as Hadoop MapReduce, Apache Spark, Apache Flink or Oozie

  • Storage Management to provide Object Storage (e.g. Openstack Swift), Cache Storage (e.g. Ignite HDFS Cache), Distributed Filesystem (e.g. HDFS) and Distributed Ordered Event Log (e.g. Kafka)

  • Solution architecture for one or more services that address one or more business problems. It should be separated from the enterprise architecture, which is focusing more on the strategic global picture. It can articulate a Lambda, Kappa, Microservice architecture or mixture of them.

  • Enterprise applications describe a set of services (including user interfaces) and containers to solve a business problem, including appropriate patterns for Polyglot Persistence to provide the right data structure (such as graph, columnar or hash map) for enabling fast, interactive analytics for users based on SQL and NoSQL databases (see above)

  • Continuous Delivery, which describes how enterprise applications are delivered to production while ensuring their quality (e.g. Jenkins, SonarQube, Gradle, Puppet etc.).

Batch-processing & Interactive Analytics for Big Data – the Role of in-Memory

In this blog post I will discuss various aspects of in-memory technologies and describe how various Big Data technologies fit into this context.

In particular, I will focus on the difference between in-memory batch analytics and interactive in-memory analytics. Additionally, I will illustrate when in-memory technology is really beneficial. In-memory technology leverages the fast main memory and processor caches to deliver superior performance.

While the deployment of in-memory technology initially seems attractive, companies have to carefully design how they use the scarce resource of memory for big data sets, because the amount of data tends to grow once a company successfully masters Big Data technologies. For instance, they need to think about memory fragmentation, scheduling, capacity management, how the data should be structured in-memory and what data should be represented in-memory at all.

I will explain that some paradigms introduced for non-in-memory analytics, such as the paradigm that it is better not to read data at all than to read all of it, are still very valid for in-memory technologies.

Finally, I will give an outlook on current Big Data technologies and their strengths and weaknesses with respect to in-memory batch analytics and interactive in-memory analytics.

The Concept of In-Memory

The concept of in-memory became more and more popular around 2007/2008, although the fundamental ideas behind it have existed for decades. At that time it was marketed quite heavily by SAP and its HANA in-memory database.

Around the same time, a different paradigm appeared, the concept of distributed Big Data platforms.

In the beginning, both were rather disconnected: in-memory technologies relied on one "big" machine, while distributed data platforms consisted of a huge set of commodity-like machines. In-memory was at this time often associated with interactive queries and fast responses for comparably small datasets fitting into the memory of one machine, and Big Data platforms with long-running analytics queries crunching large volumes of data scattered over several nodes.

This changed recently. Particularly, in-memory techniques have been brought to long-running analytics queries and distributed Big Data platforms to interactive analytics. The assumed benefit for both cases is that more data can be handled in more complex ways in a shorter time.

However, you need to carefully look at what kind of business benefit you can gain from doing faster or more analytics.

Public sector organizations across various domains can benefit significantly, because their "ROI" is usually measured in non-monetary terms as benefits for society. A faster, fairer and more transparent or scientifically correct analysis can be one example of such a benefit. Additionally, supervision of the private sector needs to operate on the same level as the private sector itself.

Traditional private sector organizations, on the other hand, will have to invent new business models and convince the customer. Here, new machine learning algorithms on large data volumes are more beneficial in comparison to traditional data warehouse reports. Internet industries, including the Internet of Things and autonomous robots, obviously benefit as well, be it through processing large data volumes and/or the need to react quickly to events in the real world.

The Difference between in-memory batch processing and interactive analytics

Often people wonder why there is still a difference between batch processing and interactive analytics when using in-memory. In order to answer this question let us quickly recap the difference between the two:

  • Distributed big data processes: They are long-running because they need to query data residing on several nodes and/or the calculations are very complex and require a lot of computing power. Usually they make the calculated/processed data available in a format suitable for interactive analytics, and they are planned and scheduled in advance.
  • Interactive analytics: These are often ad-hoc queries of low to very high complexity. Usually it is expected that they return results within seconds or minutes. However, they can also take much longer and then become candidates for distributed big data processes, so that the results are precomputed and stored for interactive analytics. Interactive analytics go beyond standard tables to return results faster.

Their results can be used either by humans or by other applications, e.g. applications that require predictions to provide an automated service to human beings. Basically both approaches fit the Lambda architecture paradigm.

In-memory technologies can basically speed up both approaches. However, you need to carefully evaluate your business case for this. For example, it can make sense to speed up your Big Data batch processes so that they finish before your people start working, or to gain time to perform additional processes on the same resources – this is particularly interesting if you have a plethora of different large datasets where different analytics can make sense. With respect to interactive analytics, you benefit most if you have specific analytics algorithms that benefit from memory locality, e.g. iterative machine learning algorithms.

If you have people working on large tables using aggregations, then you should make them aware that it makes more sense to work with samples, in-memory indexes and suitable data structures as well as high parallelism. Aggregating the data of a large table in-memory is very costly, and the speed difference compared to tables on disk is most likely not large. The core paradigm here should be: do not read what is not needed.

To make it short: Be aware of your priorities to leverage speed-ups by using in-memory technology. Not everything has to be in-memory.

Nevertheless, you should first leverage all possible optimizations without using in-memory technology. An inefficient data structure on disk does not become a better structure just because it is in-memory. Additionally, you should think about how much data you need and how precise your results need to be. As I wrote in a previous blog post, this can save you a lot of time that you can use to perform further analytic tasks.

In the following, I will describe some challenges with in-memory that you need to tackle to be successful with in-memory technologies.

Challenges with in-memory

Memory fragmentation

Problem

Memory fragmentation does not only occur with in-memory technologies, but on any storage. You can have internal fragmentation, where more memory is allocated to an application than needed, or external fragmentation, where memory is deallocated but new data does not fit into the deallocated space and additional memory has to be used.

However, it can be rather problematic with in-memory technologies because main memory is usually the smallest storage available. In the context of Big Data, where there can be a huge diversity of data sets that grow over time as well as different algorithms that use memory in different ways, this becomes apparent much more quickly than if there were just one static data set that never changes and is always processed the same way.

The issue with memory fragmentation is that you effectively have less memory than is physically available – potentially a lot less. This leads to unexpected performance degradation and the need to spill over to slower disk space to continue the computation, which may lead to thrashing.

You cannot avoid memory fragmentation entirely, because one cannot foresee when which data set will be loaded and what computation will be needed.

Solution

A first step to handle memory fragmentation is to have a list of processes and interactive queries that are executed regularly and to check them for potential memory fragmentation issues. This can be used during monitoring to stay aware of memory fragmentation. One indicator can be that the available memory does not match the memory that should be consumed; another can be a lot of spills to disk.

There are several strategies to handle identified memory fragmentation. In the case of in-memory batch processes, one should release all the memory after the batch process has been executed. Furthermore, one should use distributed Big Data technologies, which usually work with fixed block sizes from the distributed file system layer (e.g. HDFS). In this case you can partially avoid external fragmentation – only partially, because many algorithms have temporary data or temporarily relevant data that needs to be taken into account as well.

For interactive analytics, a very common recommendation, even by vendors of popular memcache solutions, is to restart the cache from time to time, thereby forcing the data to be reloaded into the cache in an ordered manner and avoiding fragmentation. Of course, once you add, modify or remove data you again get some fragmentation, which will grow over time.

Another, similar approach is called compaction, which exists in traditional relational databases and big data systems. Compaction reduces fragmentation that occurs due to updates, deletions and insertions of new data. The key here is that you can gain performance benefits for your users if you schedule it for times when the system is not used. Surprisingly, people often do not look at compaction, although it has a significant impact on performance and memory usage. Instead they rely only on non-optimal default settings, which are usually tuned not for large-scale analytics but for smaller-scale OLTP usage. For instance, for large-scale analytics it can make sense to schedule compaction after all the data has been loaded and no new data is arriving before the next execution of a batch process.

What data should be in-memory? About temperature of data…

The Concept

It is not realistic to have all your data in-memory. This is not only due to memory fragmentation, but also due to the cost of memory, fault tolerance, backups and many other factors. Hence, you need an approach to decide which data should be in-memory.

As described before, it is important to know your data priorities. Quite often these priorities change, e.g. new data is introduced or data simply becomes outdated. Usually it is reasonable to expect that data that is several months or years old will not be touched often, except for research purposes. This is where the temperature of data, i.e. hot, warm and cold data, comes into play.

Hot data has been used quite frequently recently and is likely to be used quite frequently in the near future.

Warm data has not been used as frequently as hot data recently, and it is NOT likely to be used frequently in the near future.

Cold data has not been used recently and is not likely to be used in the near future.

Usually hot data resides in CPU caches and mainly in main memory. Warm data resides mainly on local disk drives, with only a small fraction in main memory. Cold data resides mostly on slow external storage, potentially accessed via the network or in the cloud.

Managing Temperature of Data

The concept of data temperature applies equally to batch processes and interactive analytics. However, you need to think about which data needs to be kept hot, warm and cold. Ideally this happens automatically. For example, many common in-memory systems provide the LRU (least recently used) strategy to automatically move hot data to warm and eventually to cold storage, and the other way around. For instance, Memcached and SAP HANA support this as a default strategy.
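
To make the eviction idea concrete, here is a minimal, simplified LRU sketch; real systems such as Memcached implement this internally and far more efficiently, and the "warm storage" step is only indicated as a comment.

```python
# Minimal sketch: LRU eviction using an OrderedDict as the "hot" tier.
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.entries = OrderedDict()

    def get(self, key):
        if key not in self.entries:
            return None
        self.entries.move_to_end(key)        # mark as most recently used
        return self.entries[key]

    def put(self, key, value):
        if key in self.entries:
            self.entries.move_to_end(key)
        self.entries[key] = value
        if len(self.entries) > self.capacity:
            evicted_key, evicted_value = self.entries.popitem(last=False)
            # here the least recently used entry would be demoted to warm storage
```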

This seems to be a good default strategy, especially if you cannot or do not want to look into the temperature of your data in more detail. Indeed, it also rests on some sound assumptions, since it is based on the principle of locality, which is also key to distributed Big Data processes and many machine learning algorithms.

However, there are alternative strategies to LRU that you may want to think about:

  • Most recently used (MRU): The most recently used memory element is moved to warm and eventually to cold storage. This assumes that there is stable data that is more relevant than the newest data.
  • Least frequently used (LFU): The data which has been used least frequently will be moved to warm storage and eventually to cold storage. The advantage here is that recently used data that has only been accessed once is quickly moved out, while data which has been accessed quite frequently, but not in the recent past, will stay in-memory.
  • Most frequently used (MFU): The data which has been used most frequently in the past will be moved to warm storage and eventually to cold storage. The idea here is that the more data has been used, the less valuable it will be and hence the less it will be accessed in the near future.
  • Any combination of the above

Obviously, the perfect strategy would predict which data will be used least in the future ("clairvoyant algorithms") and move data accordingly to hot, warm and cold storage. This is of course not exactly possible, but a sound understanding of how people use your Big Data platform can come pretty close to that ideal.

Of course, you can also implement more sophisticated machine learning algorithms that take the environment into account to predict which data and computations will be required in the future given a new task (cf. here for an approach to scheduling multimedia tasks in the cloud based on machine learning algorithms – the use case is different but the general idea is the same). Unfortunately, most of the popular Big Data and in-memory solutions do not implement such an approach yet.

How should the data be structured?

Many people, including business people, have only the traditional world of tables, consisting of rows or columns, in mind when using data. In fact, a lot of analysis is based on this assumption. However, while tables are simple they might not be the most efficient way to store data in-memory or even to process it.

In fact, depending on your analysis different formats make sense, such as:

  • Use the correct data type: If you have numbers, use a data type that supports numbers, such as integer or double. Dates can often be represented as integers. This requires less storage, and the CPU can read an integer stored as an integer orders of magnitude faster than an integer stored as a string. Similarly, even with integers you should select an appropriate size: if your numbers fit into a 32-bit integer, then you should prefer storing them as 32-bit instead of 64-bit. This will increase your performance significantly. The key message is: store the data with the right data type, use the available types and understand their advantages as well as their limitations (see the sketch after this list).
  • Column-based: Data is stored in columns instead of rows. This is usually beneficial if you need to access one or more full columns of a given data set. Furthermore, it enables avoiding reads of irrelevant data by using storage indexes (min/max) or Bloom filters.
  • Graph-based: Data is stored as so-called adjacency lists or sometimes as adjacency matrices. For graph algorithms, such as strongly connected components or shortest paths, this performs much better than row-based or column-based storage. These algorithms are useful for social network analytics, financial risk of assets sold by different organizations, dependencies between financial assets etc.
  • Tree-based: Data is stored in a tree structure. Trees can usually be searched comparably fast and are often used for database indexes to find out in which data block a row is stored.
  • Search indexes for terms in unstructured text: These are usually useful for matching data objects that are similar but do not have unique identifiers. Alternatively, they can be used for sentiment analysis. Traditional database technology, for example, shows terrible performance for these use cases – even in-memory.
  • Hash-clustering indexes: These can be used in column stores by generating a hash from the values of several columns of one row. This hash is stored as another column and can be used to quickly search for several criteria at the same time by using only one column. This reduces the amount of data to be processed at the expense of additional storage.
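
As a small illustration of the "correct data type" point (referenced in the first bullet above), the following sketch compares the memory footprint of the same one million values stored as strings, 64-bit integers and 32-bit integers.

```python
# Minimal sketch: memory footprint of the same values in different data types.
import numpy as np
import pandas as pd

n = 1_000_000
df = pd.DataFrame({
    "as_string": np.arange(n).astype(str),        # each value is a Python string object
    "as_int64": np.arange(n, dtype=np.int64),     # 8 bytes per value
    "as_int32": np.arange(n, dtype=np.int32),     # 4 bytes per value
})

print(df.memory_usage(deep=True))
# The string column needs roughly an order of magnitude more memory than the
# int32 column, and integer comparisons/aggregations are also much faster.
```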

Furthermore, the same data might be stored in different formats on warm or cold storage, meaning that you have to decide whether you want to keep redundant data or generate the optimal storage layout for a given computation from scratch each time.

Compression can make sense for data in-memory, because it enables storing more data in-memory instead of slower disk drives.

Unfortunately, contrary to the strategies for managing data temperature, there are currently no mature strategies that automatically decide how to store the data. This is a manual decision and thus requires good knowledge of how your Big Data platform is used.

Do we always need in-memory?

With the need to process large data sets one thing became apparent: even with new technologies, such as in-memory or Big Data platforms, it is sometimes very inefficient to process data by looking at all of it – it is better not to read the data at all!

Of course, this means you should not read irrelevant data. For instance, in traditional databases it was very common to read all rows to find the rows matching a query. Even when using indexes, some irrelevant data is read when scanning the index, although storing the index as a tree structure already increased search performance a lot.

More modern solutions use storage indexes and/or Bloom filters to decide which rows they need to read. This means they can skip blocks of data that do not contain rows matching a query (cf. here for the implementation in Apache Hive).

Similarly, probabilistic data structures, such as HyperLogLog, or approaches based on sampling (cf. here) enable one to avoid reading all the data again or at all. In fact, here you can even skip "relevant" data – as long as you read enough data to provide correct results within a small error margin.
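
As an illustration, the following minimal PySpark sketch compares an exact distinct count with the HyperLogLog-based approximate version, which keeps only a small sketch in memory instead of all distinct values. Path and column name are hypothetical.

```python
# Minimal sketch: exact vs. HyperLogLog-based approximate distinct count.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("approx-analytics").getOrCreate()
events = spark.read.parquet("hdfs:///data/events.parquet")   # hypothetical path

exact = events.select(F.countDistinct("user_id")).first()[0]
approx = events.select(F.approx_count_distinct("user_id", rsd=0.02)).first()[0]

print(exact, approx)   # approx stays within ~2% relative error but needs far
                       # less memory and shuffle than the exact count
```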

Hence, even with in-memory technologies it is always better to avoid reading data that is not needed. Even if the data is already in-memory, the CPU needs more time the more data it has to process – a simple but often overlooked fact.

The impact of Scheduling: Pre-emption

Once your Big Data platform or in-memory platform grows, you will not only get more data but also more users working on it in parallel. This means that if they use interactive queries or schedule Big Data processes, they need to share resources, including memory and CPU – especially when taking speculative execution into account. As described before, you ideally have a general big picture of what will happen, especially with main memory. However, in peak times – and for some Big Data deployments most of the time – the resources are not sufficient, because of cost or other reasons.

This means you need to introduce scheduling according to scheduling policies. We briefly touched on this topic before, because the concept of data temperature implies some kind of scheduling. However, if you have a lot of users, the bottleneck is mainly the number of processors that process data. Hence, the analytics of some users are sometimes partially interrupted (pre-empted) to free resources for other users. These users may use different data sets, meaning that some data might also be moved from main memory to disk drives. After the interrupted tasks are resumed, they may need to reload data from disk into memory.

This can sometimes make the perceived performance unpredictable, and you should be aware of it so that you can react properly to incidents created by users or do more informed capacity management.

Big Data technologies for in-memory

In-memory batch processing

There are several in-memory batch processing technologies for Big Data platforms, for example Apache Spark or Apache Flink. In the beginning, these platforms, especially Spark, had some drawbacks because they represented everything as Java objects in memory. This would mean, for instance, that a 10-character string would consume about 6 times more memory than representing it as an array of bytes.

Luckily this has changed, and data is now stored in-memory in a columnar fashion, which also supports skipping data on disk that is not relevant (via predicate pushdown and an appropriate disk storage format, such as ORC or Parquet).
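
A minimal sketch of this behavior with PySpark and Parquet: the filter and the column selection are pushed down to the reader, so irrelevant columns and non-matching row groups are skipped. Path and columns are hypothetical.

```python
# Minimal sketch: predicate pushdown and column pruning with Parquet.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("pushdown").getOrCreate()

sales = (spark.read.parquet("hdfs:///data/sales.parquet")   # hypothetical path
         .filter(F.col("year") == 2017)        # pushed down to the Parquet reader
         .select("customer_id", "amount"))     # only these two columns are read

sales.explain()   # the physical plan shows the pushed filters for the file scan
```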

Additionally, both support graph batch processing and processing of continuous streams in-memory. However, both rely on a common abstraction for a data structure in which they represent other data structures, such as graphs. In the case of Spark, for example, these are Resilient Distributed Datasets (RDDs)/DataFrames. This means they do not offer as much performance as a highly specialized graph engine, but they are more generic and it is possible to integrate them with other data structures. For most current use cases this is sufficient.

Additionally, different processing algorithms, mainly in the area of machine learning are supported.

Sometimes you will see them advertised as interactive platforms as well. However, this is not their core strength, because they do not support, for example, the concept of data temperature automatically, i.e. the developer is fully responsible for taking hot, warm and cold data into account or for implementing a strategy as described above. Additionally, they do not provide index support for data in-memory, because this is usually much less relevant for batch processes. Hence, if you want to use these technologies for interactive analysis, you have to develop some common IT components and strategies for addressing data temperature and the "do not read irrelevant data" paradigm.

In any case you have to think about scheduling strategies to optimize your resource usage of your available infrastructure.

Depending on your requirements, in-memory batch processing is not needed in all cases, and to be efficient your big data platform should support both in-memory and non-in-memory batch processes. In particular, if your batch process loads and processes the data only once without re-reading parts of it, then you won't benefit much from in-memory.

Interactive in-memory analytics

There are several technologies enabling interactive in-memory analytics. One of the older – but still highly relevant – ones is memcached for databases. Its original use case was to speed up web applications where many users access, i.e. write and read, the same data in parallel. Similar technologies are also used for Master Data Management (MDM) systems, because they need to deliver data to and receive data from a lot of sources, different systems and business processes with many users. This would be difficult if one relied only on databases.
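
For illustration, here is a minimal cache-aside sketch with memcached via the pymemcache client: results of an expensive query are served from memory and only recomputed on a cache miss. Key, TTL and the query function are hypothetical.

```python
# Minimal sketch: cache-aside pattern for interactive analytics with memcached.
from pymemcache.client.base import Client

cache = Client(("localhost", 11211))

def report_revenue(region):
    key = f"revenue:{region}"
    cached = cache.get(key)                 # returns bytes or None
    if cached is not None:
        return float(cached.decode())       # cache hit: no database work
    value = run_expensive_database_query(region)   # hypothetical function
    cache.set(key, str(value), expire=300)  # keep the result hot for 5 minutes
    return value
```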

Other technologies focus on the emerging Big Data platforms based on Hadoop, but also augment in-memory batch processing engines, such as Spark. For instance, Apache Ignite provides functionality similar to memcached, but it also supports Big Data platforms and in-memory batch processing engines. For example, you can create shared RDDs for Spark, or cache Hive tables or partitions in-memory. Alternatively, you can use the Ignite DataGrid to cache selected queries in-memory.

These technologies support advanced in-memory indexes (keep in mind: it is always better not to read data!) and automated data temperature management. Another example is Apache Tachyon.

There are also very specialized interactive in-memory analytics engines, such as TitanDB for graphs. TitanDB is based on the Tinkerpop graph stack including the interactive Graph query (or graph traversal) language Gremlin. SAP HANA is a specific in-memory column database for OLTP, OLAP, text-analytics and graph applications. It has been extended to a full application stack cloud platform based on in-memory technology.

Taking scheduling into account is much trickier with interactive analytics, because one does not know exactly what the users will do, and prediction engines for user behavior in interactive analytics are currently nearly non-existent.

However, you can define different categories of interactive analytics (e.g. simple queries, complex queries, machine learning, graphs ….) and determine your infrastructure as well as its configuration based on these categories.

Conclusions

It makes sense to distinguish between in-memory batch processes and interactive in-memory analytics. In-memory batch processes can be planned and scheduled in advance more easily, and resources can be optimized better for them. They are also more focused on processing all data. Specific technologies for distributed in-memory Big Data exist and are complementary to technologies for interactive in-memory analytics. The main differences are additional indexes and automated support for the concept of data temperature.

Even for in-memory technology, the key Big Data concept of not reading data that is not relevant is of high importance. Processing terabytes of data in-memory even though only a subset is relevant is a waste of resources and, particularly, time. This is especially difficult to handle for interactive in-memory analytics, where users can do whatever they want. Hence, automated and intelligent mechanisms to support this are highly desirable and should be preferred over manually developing the right data model and structures.

Another key concept is to have the right data structure in-memory for optimal processing. For instance, for graph algorithms, graph structures perform much better than relational row- or column-based structures, which need to be joined very often. Furthermore, probabilistic data structures and probabilistic sampling queries are of growing importance. Depending on your needs, you might have the same data represented redundantly in different data structures for different analysis purposes.

Finally, distinguishing between interactive analytics and batch processing is not always that straightforward. For instance, you can have a batch process running for 5 minutes whose results are queried 1000 times, so avoiding 5 minutes of runtime each time can be very beneficial. On the other hand, you can have an interactive query by one user which takes 60 minutes but is only needed once. This may also change over time, so it is important that, even after a solution has been developed, business users and technical users monitor and audit it regularly to check whether the current approach still makes sense or another approach would make more sense. This requires a regular dialogue even after the go-live of a Big Data application.

The convergence of both concepts requires more predictive algorithms for managing data in-memory and for queries. These are only in their first stages, but I expect much more over the coming years.