Lambda, Kappa, Microservice and Enterprise Architecture for Big Data

A few years after the emergence of the Lambda-Architecture several new architectures for Big Data have emerged. I will present and illustrate their use case scenarios. These architectures describe IT architectures, but I will describe towards the end of this blog the corresponding Enterprise Architecture artefacts, which are sometimes referred to as Zeta architecture.

Lambda Architecture

I have blogged before about the Lambda-Architecture. Basically this architecture consists of three layers:

  • Batch-Layer: This layer executes long-living batch-processes to do analyses on larger amounts of historical data. The scope is data from several hours to weeks up to years. Here, usually Hadoop MapReduce, Hive, Pig, Spark or Flink are used together with orchestration tools, such as Oozie or Falcon.

  • Speed-Layer/Stream Processing Layer: This layer executes (small/”mini”) batch-processes on data according to a time window (e.g. 1 minute) to do analyses on larger amounts of current data. The scope is data from several seconds up to several hours. Here one may use, for example, Flink, Spark or Storm.

  • Serving Layer: This layer combines the results from the batch and stream processing layer to enable fast interactive analyses by users. This layer usually leverages relational databases, but also NoSQL databases, such as Graph databases (e.g. TitanDB or Neo4J), Document databases (e.g. MongoDB, CouchDB), Column-Databases (e.g. HBase), Key-Value Stores (e.g. Redis) or Search technologies (e.g. Solr). For certain use cases, NoSQL databases provide more adequate and better performing data structures, such as graphs/trees, hash maps or inverted indexes.

In addition, I proposed the long-term storage layer to have an even cheaper storage for data that is hardly accessed, but may be accessed eventually. All layers are supported by a distributed file system, such as HDFS, to store and retrieve data. A core concept is that computation is brought to data (cf. here). On the analysis side, usually standard machine learning algorithms, but also on-line machine learning algorithms, are used.

As you can see, the Lambda-Architecture can be realized using many different software components and combinations thereof.
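
To make the interplay of the layers more concrete, here is a minimal sketch in Python. It assumes, purely for illustration, that both the batch layer and the speed layer produce page-view counts per page. The names batch_view, speed_view, serve_query and merged_view are hypothetical and not taken from any of the frameworks mentioned above.

```python
from collections import Counter

# Hypothetical batch view: counts computed over all historical data
# up to the last completed batch run.
batch_view = Counter({"/home": 10_000, "/pricing": 2_500})

# Hypothetical speed-layer view: counts for events that arrived after
# the last batch run (e.g. the most recent minutes or hours).
speed_view = Counter({"/home": 42, "/signup": 7})


def serve_query(page: str) -> int:
    """Serving layer: combine batch and recent results for one key."""
    # A Counter returns 0 for unknown keys, so missing pages are safe.
    return batch_view[page] + speed_view[page]


def merged_view() -> Counter:
    """Full merged view, as a serving database might materialize it."""
    return batch_view + speed_view
```

In a real deployment the two views would live in one of the serving databases listed above instead of in process memory, but the merge step stays conceptually the same.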

While the Lambda architecture is a viable approach to tackle Big Data challenges, other architectures have emerged, especially to focus only on certain aspects, such as data stream processing, or to integrate it with cloud concepts.

Kappa Architecture

The Kappa Architecture focuses solely on data stream processing or “real-time” processing of “live” discrete events. Examples are events emitted by devices from the Internet of Things (IoT), social networks, log files or transaction processing systems. The original motivation was that the Lambda Architecture is too complex if you only need to do event processing.

The following assumptions exist for this architecture:

  • You have a distributed ordered event log persisted to a distributed file system, where stream processing platforms can pick up the events

  • Stream processing platforms can (re-)request events from the event log at any position. This is needed in case of failures or upgrades to the stream processing platform.

  • The event log is potentially large (several Terabytes of data / hour)

  • Mostly online machine learning algorithms are applied due to the constant delivery of new data, which is more relevant than the old already processed data

Technically, the Kappa architecture can be realized using Apache Kafka for managing the data-streams, i.e. providing the distributed ordered event log. Apache Samza enables Kafka to store the event log on HDFS for fault-tolerance and scalability. Examples for stream processing platforms are Apache Flink, Apache Spark Streaming or Apache Storm. The serving layer can in principle use the same technologies as I described for the serving layer in the Lambda Architecture.
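
The central assumption, an ordered event log that can be re-read from any position, can be sketched in a few lines of Python. This is a toy, single-partition, in-memory stand-in and not Kafka's API; the class EventLog and the function count_likes are hypothetical names chosen for illustration.

```python
from typing import Any, Dict, Iterator, List, Tuple


class EventLog:
    """Toy append-only, ordered event log (single partition, in memory)."""

    def __init__(self) -> None:
        self._events: List[Dict[str, Any]] = []

    def append(self, event: Dict[str, Any]) -> int:
        """Append an event and return its offset (position in the log)."""
        self._events.append(event)
        return len(self._events) - 1

    def read_from(self, offset: int) -> Iterator[Tuple[int, Dict[str, Any]]]:
        """Yield (offset, event) pairs starting at an arbitrary position."""
        for position in range(offset, len(self._events)):
            yield position, self._events[position]


def count_likes(log: EventLog, start_offset: int = 0) -> int:
    """Stand-in stream job: count 'like' events from a given offset."""
    return sum(
        1 for _, event in log.read_from(start_offset) if event["type"] == "like"
    )
```

After a failure or an upgrade, the stream job simply calls read_from with the last offset it is sure to have processed, or with 0 to recompute everything from scratch.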

There are some pitfalls with the Kappa architecture that you need to be aware of:

  • End to end ordering of events: While technologies such as Kafka can provide the events in an ordered fashion, they rely on the source system to actually deliver these events in order. For instance, I had the case that a system was sending the events in order during normal operations, but not in case of communication errors, because it stored the events it could not send and retransmitted them at a later point. Meanwhile, once the communication was established again, it sent the new events. The source system had to be adapted to handle these situations correctly. Alternatively, you can ensure only a partial ordering using vector clocks or similar mechanisms implemented at the event log or stream processing level.

  • Delivery paradigms on how the events are delivered (or fetched from) to the stream processing platform

    • At least once: Each event is guaranteed to be delivered at least once, but the same event might be delivered twice or more due to processing errors or communication/operation errors within Kafka. For instance, the stream processing platform might crash before it can mark events as processed although it has already processed them. This might have undesired side effects, e.g. the same event that “user A liked website W” is counted several times.

    • At most once: The event will be delivered at most once (this is the default Kafka behavior). However, it might also get lost and not be delivered. This could have undesired side effects, e.g. the event “user A liked website W” is not taken into account.

    • Once and only once: The event is guaranteed to be delivered once and only once. This means it will not get lost or be delivered twice or more. However, this is not simply a combination of the above scenarios. Technically you need to make sure in a multi-threaded distributed environment that an event is processed exactly once. This means that (1) the same event must only be processed by one sequential process in the stream processing platform and (2) all other processes related to the event need to be made aware that one of them already processes it. Both features can be implemented using distributed system techniques, such as semaphores or monitors. They can be realized using distributed cache systems, such as Ignite, Redis or to a limited extent ZooKeeper. Another simple possibility would be a relational database, but this would quickly fail to scale with large volumes.
      • Needless to say: The source system must also make sure that it delivers the same event once and only once to the ordered event log.

  • Online machine learning algorithms constantly change the underlying model to adapt it to new data. This model is used by other applications to make predictions (e.g. predicting when a machine has to go into maintenance). This also means that in case of failure we may temporarily have an outdated or simply a wrong model (e.g. in case of at least once or at most once delivery). Hence, the applications need to incorporate some business logic to handle this (e.g. do not register a machine twice for maintenance or avoid permanently registering/unregistering a machine for maintenance).

Although technologies such as Kafka can help you with this, it requires a lot of thinking and experienced developers as well as architects to implement such a solution. The batch-processing layer of the Lambda architecture can mitigate the aforementioned pitfalls to some extent, but it can also be affected by them.
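
One common way to soften the side effects of at least once delivery is to make the processing idempotent. The following Python sketch assumes that every event carries a unique event_id assigned by the source system; this field, like the function name count_likes_idempotent, is an assumption made for illustration. In a distributed deployment the set of already seen identifiers would have to live in a shared store (for example one of the cache systems mentioned above) rather than in a local variable.

```python
from collections import Counter
from typing import Dict, Iterable, Set


def count_likes_idempotent(events: Iterable[Dict[str, str]]) -> Counter:
    """Count likes per website, ignoring redelivered events."""
    seen_ids: Set[str] = set()  # assumption: fits in memory for this sketch
    likes: Counter = Counter()
    for event in events:
        if event["event_id"] in seen_ids:
            continue  # duplicate delivery: already counted, skip it
        seen_ids.add(event["event_id"])
        likes[event["website"]] += 1
    return likes


# "user A liked website W" is delivered twice, e.g. after a crash.
delivered = [
    {"event_id": "e1", "user": "A", "website": "W"},
    {"event_id": "e1", "user": "A", "website": "W"},
    {"event_id": "e2", "user": "B", "website": "W"},
]
```

Note that this only handles duplicates. Events lost under at most once delivery cannot be recovered this way.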

Last but not least, although the Kappa Architecture seems to be intriguing due to its simplification in comparison to the Lambda architecture, not everything can be conceptualized as events. For example, company balance sheets, end of the month reports, quarterly publications etc. should not be forced to be represented as events.

Microservice Architecture for Big Data

The Microservice Architecture did not originate in the Big Data movement, but is slowly being picked up by it. It is not a precisely defined style, but several common aspects exist. Basically it is driven by the following technology advancements:

  • The implementation of applications as services instead of big monoliths
  • The emergence of software containers to deploy each of those services in isolation from each other. Isolation means that they are put in virtual environments sharing the same operating system (i.e. they are NOT in different virtual machines) and that they are connected to each other via virtualized networks and virtualized storage. These containers leverage the available resources much better than virtual machines.
    • Additionally the definition of repositories for software containers, such as the Docker registry, to quickly version, deploy, upgrade dependent containers and test upgraded containers.
  • The deployment of container operating systems, such as CoreOS, Kubernetes or Apache Mesos, to efficiently manage software containers, manage their resources, schedule them to physical hosts and dynamically scale applications according to needs.
  • The development of object stores, such as OpenStack Swift, Amazon S3 or Google Cloud Storage. These object stores are needed to store data beyond the lifecycle of a software container in a highly dynamic cloud or scaling on-premise environment.
  • The DevOps paradigm – especially the implementation of continuous integration and delivery processes with automated testing and static code analysis to improve software quality. This also includes quick deliveries of individual services at any time independently of each other into production.

An example of the Microservice architecture is the Amazon Lambda platform (not to be confused with Lambda architecture) and related services provided by Amazon AWS.

Nevertheless, the Microservice Architecture poses some challenges for Big Data architectures:

  • What should be a service: For instance, you have Apache Spark or Apache Flink that form a cluster to run your application. Should you have a dedicated cluster of software containers for each application, or should you provide a shared cluster of software containers? The first solution, i.e. a dedicated cluster per application, can make sense due to the different scaling and performance needs of the applications.
  • The usage of object stores. Object stores are needed as a large scale dynamically scalable storage that is shared among containers. However, currently there are some issues, such as performance and consistency models (“eventually consistent”). Here, the paradigm of “Bring Computation to Data” (cf. here) is violated. Nevertheless, this can be mitigated either by using HDFS as a temporary file system on the containers and fetching the data beforehand from the object store, or by using an in-memory caching solution, such as the one provided by Apache Ignite or to some extent by Apache Spark or Apache Flink.

I expect that in these environments the role of software defined networking (SDN) will become crucial not only in cloud data centers, but also in on-premise data centers. SDN (which should NOT be confused with virtualized networks) enables centrally controlled intelligent routing of network flows, as it is needed in the dynamically scaling platforms required by the Microservice architecture. The old decentralized definition of the network, e.g. in form of decentralized routing, simply does not scale here to enable optimal performance.

Conclusion

I presented here several architectures for Big Data that emerged recently. Although they are based on technologies that are already several years old, I observe that many organizations are overwhelmed by these new technologies and have some issues adopting and fully leveraging them. This has several reasons.

One tool to manage this could be proper Enterprise Architecture Management. While there are many benefits of Enterprise Architecture Management, I want to highlight the benefit of managed evolution. This paradigm makes it possible to align business and IT, although both change constantly, independently (and dependently) of each other, with not necessarily aligned goals, as illustrated in the following picture.

[Figure: Enterprise architecture managed evolution]

As you can see from the picture both are constantly diverging and Enterprise Architecture Management is needed to unite them again.

However, reaching managed evolution of Enterprise Architecture usually requires many years and commitment from business as well as IT. Enterprise Architecture for Big Data is a relatively new concept, which is still subject to change. Nevertheless some common concepts can be identified. Some people also refer to Enterprise Architecture for Big Data as Zeta Architecture. It does not only encompass Big Data processing, but in context of Microservice architecture also web servers providing the user interface for analytics (e.g. Apache Zeppelin) and further management workflows, such as backup or configuration, deployed in form of containers.

This enterprise architecture for Big Data describes some integrated patterns for Big Data and Microservices so that you can consistently document and implement your Lambda, Kappa, Microservice architecture or a mixture of them. Examples for artefacts of such an enterprise architecture are (cf. also here):

  • Global Resource Management to manage the physical and virtualized resources as well as scaling them (e.g. Apache Mesos and Software Defined Networking)

  • Container Management to deploy and isolate containers (e.g. Apache Mesos)

  • Execution engines to manage different processing engines, such as Hadoop MapReduce, Apache Spark, Apache Flink or Oozie

  • Storage Management to provide Object Storage (e.g. OpenStack Swift), Cache Storage (e.g. Ignite HDFS Cache), Distributed Filesystem (e.g. HDFS) and Distributed Ordered Event Log (e.g. Kafka)

  • Solution architecture for one or more services that address one or more business problems. It should be separated from the enterprise architecture, which is focusing more on the strategic global picture. It can articulate a Lambda, Kappa, Microservice architecture or mixture of them.

  • Enterprise applications describe a set of services (including user interfaces)/containers to solve a business problem including appropriate patterns for Polyglot Persistence to provide the right data structure, such as graph, columnar or hash map, for enabling interactive and fast analytics for the users based on SQL and NoSQL databases (see above)

  • Continuous Delivery that describes how Enterprise applications are delivered to production while ensuring their quality (e.g. Jenkins, SonarQube, Gradle, Puppet etc.).

Batch-processing & Interactive Analytics for Big Data – the Role of in-Memory

In this blog post I will discuss various aspects of in-memory technologies and describe how various Big Data technologies fit into this context.

Especially, I will focus on the difference between in-memory batch analytics and interactive in-memory analytics. Additionally, I will illustrate when in-memory technology is really beneficial. In-memory technology leverages the fast main memory and processor caches to deliver superior performance.

While initially the deployment of in-memory technology seems to be attractive, companies have to carefully design how they use the scarce resource memory for big data sets, because the amount of data tends to grow when the company masters successfully Big Data technologies. For instance, they need to think about the issue of memory fragmentation, scheduling, capacity management, the decision how the data should be structured in-memory or making the decision about what data should be represented in-memory.

I will explain that some paradigms introduced for non-in-memory analytics, such as the paradigm that not having to read data at all is better than reading all of it, are still very valid for in-memory technologies.

Finally, I will give an outlook on current Big Data technologies and their strengths and weaknesses with respect to in-memory batch analytics and interactive in-memory analytics.

The Concept of In-Memory

The concept of in-memory became more and more popular around 2007/2008, although the fundamental concepts behind it have existed for decades. It was marketed quite heavily by SAP and its HANA in-memory database at this time.

Around the same time, a different paradigm appeared, the concept of distributed Big Data platforms.

In the beginning, both were rather disconnected: in-memory technologies relied on one “big” machine, while distributed data platforms consisted of a huge set of more commodity-like machines. In-memory was at this time often associated with interactive queries with fast responses for comparably small datasets fitting in the memory of one machine, and Big Data platforms with long-running analytics queries crunching large volumes of data scattered over several nodes.

This changed recently. Particularly, in-memory techniques have been brought to long-running analytics queries and distributed Big Data platforms to interactive analytics. The assumed benefit for both cases is that more data can be handled in more complex ways in a shorter time.

However, you need to look carefully at what kind of business benefits you can gain from doing faster or more analytics.

Public sector organizations over various domains have significant benefits, because their “ROI” is usually measured in non-monetary terms as benefits for society. A faster, fairer and more transparent or scientifically correct analysis can be one example of such a benefit. Additionally, supervision of the private sector needs to be on the same level as the private sector.

Traditional private sector organizations on the other hand will have to invent new business models and convince the customer. Here, new machine learning algorithms on large data volumes are more beneficial in comparison to traditional data warehouse reports. Internet Industries including the Internet of Things and autonomous robots obviously have some benefits, be it from the processing of large data volumes and/or the need to react quickly to events in the real world.

The Difference between in-memory batch processing and interactive analytics

Often people wonder why there is still a difference between batch processing and interactive analytics when using in-memory. In order to answer this question let us quickly recap the difference between the two:

  • Distributed big data processes: They are long-running because they need to query data residing on several nodes and/or calculations are very complex, requiring a lot of computing power. Usually they make calculated/processed data available in a suitable format for interactive analytics. Usually, these processes are planned and scheduled in advance.
  • Interactive analytics: These are often ad-hoc queries from low to very high complexity. Usually it is expected that they return results within seconds or minutes. However, they can also take much longer and are then candidates for distributed big data processes, so that results are precomputed and stored for interactive analytics. Interactive analytics go beyond standard tables to return results faster.

The results of them can be either used by humans or by other applications, e.g. applications that require prediction to provide an automated service to human beings. Basically both approaches fit to the Lambda architecture paradigm.

In-memory technologies can basically speed up both approaches. However, you need to carefully evaluate your business case for this. For example, it makes sense to speed up your Big Data batch processes to finish before your people start working or to have more time to perform additional processes on the same resources. This is particularly interesting if you have a plethora of different large datasets where different analytics can make sense. With respect to interactive analytics, you benefit most if you have specific analytics algorithms that benefit from memory locality, e.g. iterative machine learning algorithms.

If you have people working on large tables using aggregations, then you should make them aware that it makes more sense to work with samples, in-memory indexes and data structures as well as high parallelism. Aggregating data of a large table in-memory is very costly and the speed difference to tables on disk is most likely small. The core paradigm here should be: do not read what is not needed.

To make it short: Be aware of your priorities to leverage speed-ups by using in-memory technology. Not everything has to be in-memory.

Nevertheless, you should first leverage all possible optimizations without using in-memory technology. An inefficient data structure on-disk is not a better structure if it is in-memory. Additionally, you should think about how much data you need and how precise your results need to be. As I wrote in a previous blog post, this can save you a lot of time that you can use to perform further analytic tasks.

We will in the following describe some challenges with in-memory that you need to tackle to be successful with in-memory technologies.

Challenges with in-memory

Memory fragmentation

Problem

Memory fragmentation does not only occur with in-memory technologies, but on any storage. You can have internal fragmentation, where you allocate more memory to an application than needed or external fragmentation, where you deallocate memory, but new data does not fit into the deallocated memory and you have to use additional memory.

However, it can be rather problematic with in-memory technologies because main memory is usually the smallest storage available. In the context of Big Data, where there can be a huge diversity of data sets that grow from time to time as well as different algorithms that use memory in different ways, this becomes apparent much more quickly than if there were just one static data set that does not change and is always processed the same way.

The issue here with memory fragmentation is that you have less memory than physically available – potentially a lot less. This leads to unexpected performance degradation and the need to spill over to slower disk space to continue the computation, which may lead to thrashing.
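
External fragmentation can be illustrated with a deliberately simplified model. The Python sketch below treats memory as a fixed list of slots and uses first-fit allocation; the class ToyMemory and its methods are hypothetical and do not reflect how any particular in-memory system manages its memory.

```python
from typing import List, Optional


class ToyMemory:
    """Fixed-size memory modelled as a list of slots (None = free)."""

    def __init__(self, size: int) -> None:
        self.slots: List[Optional[str]] = [None] * size

    def allocate(self, owner: str, length: int) -> bool:
        """First-fit allocation of `length` contiguous free slots."""
        run = 0  # length of the current run of free slots
        for index, slot in enumerate(self.slots):
            run = run + 1 if slot is None else 0
            if run == length:
                start = index - length + 1
                self.slots[start:index + 1] = [owner] * length
                return True
        return False  # no contiguous hole is large enough

    def free(self, owner: str) -> None:
        """Release all slots held by `owner`."""
        self.slots = [None if slot == owner else slot for slot in self.slots]

    def free_total(self) -> int:
        return self.slots.count(None)

    def largest_hole(self) -> int:
        best = run = 0
        for slot in self.slots:
            run = run + 1 if slot is None else 0
            best = max(best, run)
        return best


memory = ToyMemory(10)
for name, size in [("a", 3), ("b", 2), ("c", 3), ("d", 2)]:
    memory.allocate(name, size)
memory.free("b")
memory.free("d")
# Four slots are free in total, but only as two separate holes of two slots.
fits = memory.allocate("e", 4)
```

In this example fits is False although enough memory is free in total, which is exactly the situation where a real system would start spilling to disk.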

You cannot avoid memory fragmentation, because one cannot predict which data set will be loaded when and what computation will be needed.

Solution

A first step to handle memory fragmentation is to keep a list of processes and interactive queries that are regularly executed and to examine them for potential issues with memory fragmentation. This can be used during monitoring to be aware of memory fragmentation. One indicator can be that the available memory does not match the memory that should be consumed. Another indicator can be a lot of spills to disk.

There are several strategies to handle identified memory fragmentation. In case of in-memory batch processes, one should release all the memory after the batch process has been executed. Furthermore, one should use distributed Big Data technologies, which usually work with fixed block sizes from the distributed file system layer (e.g. HDFS). In this case you can partially avoid external fragmentation. You can avoid it only partially, because many algorithms have some temporary data or temporarily relevant data which needs to be taken into account as well.

If you have interactive analytics, a very common recommendation, even by vendors of popular memcache solutions, is to restart the cache from time to time, thereby forcing the data to be reloaded into the cache in an ordered manner, which avoids fragmentation. Of course, once you add, modify or remove data you again have some fragmentation, which will grow over time.

Another similar approach is called compaction, which exists in traditional relational databases and big data systems. Compaction reduces fragmentation that occurs due to updates, deletion and insertion of new data. The key here is that you can gain performance benefits for your users if you schedule it for a time when the system is not used. Surprisingly, people often do not look at compaction, although it has a significant impact on performance and memory usage. Instead they rely only on non-optimal default settings, which are usually not tuned for large scale analytics, but for smaller scale OLTP usage. For instance, it can make sense for large scale analytics to schedule compaction after all the data has been loaded and while no new data is arriving before the next execution of a batch process.
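
The effect of compaction can be sketched on a toy slot model in Python. The functions largest_hole and compact are hypothetical helpers; real systems compact files, pages or segments and have to update references to the moved data, which this sketch ignores.

```python
from typing import List, Optional


def largest_hole(slots: List[Optional[str]]) -> int:
    """Length of the longest run of free (None) slots."""
    best = run = 0
    for slot in slots:
        run = run + 1 if slot is None else 0
        best = max(best, run)
    return best


def compact(slots: List[Optional[str]]) -> List[Optional[str]]:
    """Move all used slots to the front, keeping their relative order."""
    used = [slot for slot in slots if slot is not None]
    return used + [None] * (len(slots) - len(used))


# Fragmented state after some deletions: two holes of two slots each.
fragmented = ["a", "a", "a", None, None, "c", "c", "c", None, None]
compacted = compact(fragmented)
```

Before compaction the largest contiguous free region has two slots, afterwards it has four, without any additional memory being added.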

What data should be in-memory? About temperature of data…

The Concept

It is not realistic to have all your data in-memory. This is not only due to memory fragmentation, but also costs for memory, fault-tolerance, backups and many others. Hence, you need an approach to decide which data should be in-memory.

As already described before, it is important to know your data priorities. Quite often these priorities change, e.g. new data is introduced, or data simply becomes outdated. Usually it is reasonable to expect that data that is several months or years old will not often be touched, except for research purposes. Here is where the temperature of data, i.e. hot, warm and cold data, comes into play.

Hot data has been used recently quite frequently and is likely to be used quite frequently in the near future.

Warm data has been used recently not as frequently as hot data and it is NOT likely to be used frequently in the near future.

Cold data has not been used recently and is not likely to be used in the near future.

Usually hot data resides on CPU caches and mainly on main memory. Warm data resides mainly on local disk drives and only a small fraction in main memory. Cold data resides mostly on external slow storage potentially accessed via the network or in the cloud.

Managing Temperature of Data

The concept of temperature of data applies to batch processes and interactive analytics equally. However, you need to think about what data needs to be kept hot, warm and cold. Ideally this happens automatically. For example, many common in-memory systems provide the strategy LRU (least recently used) to automatically move hot data to warm data and eventually to cold data and the other way around. For instance, Memcached or SAP HANA support this as a default strategy.

This seems to be a good default strategy, especially if you cannot or do not want to look into more detail about the temperature of data. Indeed, it is also based on sound assumptions, since it relies on the principle of locality, which is also key to distributed Big Data processes and many machine learning algorithms.
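
The LRU idea can be sketched in Python with two tiers. The class LruTieredStore is a hypothetical illustration using plain dictionaries for the hot and the warm tier; it is not how Memcached or SAP HANA implement their strategies.

```python
from collections import OrderedDict
from typing import Any, Dict


class LruTieredStore:
    """Hot tier with limited capacity; evicted entries go to a warm tier."""

    def __init__(self, hot_capacity: int) -> None:
        self.hot_capacity = hot_capacity
        self.hot: "OrderedDict[str, Any]" = OrderedDict()  # oldest use first
        self.warm: Dict[str, Any] = {}  # stand-in for local disk

    def put(self, key: str, value: Any) -> None:
        self.warm.pop(key, None)
        self.hot[key] = value
        self.hot.move_to_end(key)  # mark as most recently used
        while len(self.hot) > self.hot_capacity:
            # Demote the least recently used entry to the warm tier.
            old_key, old_value = self.hot.popitem(last=False)
            self.warm[old_key] = old_value

    def get(self, key: str) -> Any:
        if key in self.hot:
            self.hot.move_to_end(key)
            return self.hot[key]
        value = self.warm[key]  # raises KeyError for unknown keys
        self.put(key, value)  # promote back to the hot tier
        return value
```

A cold tier could be added in the same way by demoting entries from the warm tier once it exceeds its own capacity.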

However, there are alternative strategies to LRU that you may want to think about:

  • Most recently used (MRU): The most recently used memory element is moved to warm and eventually to cold storage. This assumes that there is stable data that is more relevant than having the newest data.
  • Least frequently used (LFU): The data which has been least frequently used will be moved to warm storage and eventually to cold storage. The advantage here is that recently used data that has been accessed only once is quickly moved, while data which has been accessed quite frequently, but not in the recent past, will stay in-memory.
  • Most frequently used (MFU): The data which has been most frequently used in the past will be moved to warm storage and eventually to cold storage. The idea here is that the more data has been used the less valuable it will be, and hence it will be accessed much less in the near future.
  • Any combination of the above
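
As an illustration of one of these alternatives, here is a minimal Python sketch of LFU. The class LfuTieredStore is hypothetical and assumes a hot capacity of at least one entry; ties between equally rarely used entries are broken by insertion order, which is an arbitrary choice made for this sketch.

```python
from collections import Counter
from typing import Any, Dict


class LfuTieredStore:
    """Hot tier that demotes the least frequently used entry when full."""

    def __init__(self, hot_capacity: int) -> None:
        self.hot_capacity = hot_capacity  # assumed to be >= 1
        self.hot: Dict[str, Any] = {}
        self.warm: Dict[str, Any] = {}  # stand-in for local disk
        self.hits: Counter = Counter()  # access count per key

    def put(self, key: str, value: Any) -> None:
        self.warm.pop(key, None)
        self.hot[key] = value
        self.hits[key] += 1
        while len(self.hot) > self.hot_capacity:
            # Never evict the entry that was just written.
            candidates = (k for k in self.hot if k != key)
            victim = min(candidates, key=lambda k: self.hits[k])
            self.warm[victim] = self.hot.pop(victim)

    def get(self, key: str) -> Any:
        if key in self.hot:
            self.hits[key] += 1
            return self.hot[key]
        value = self.warm[key]  # raises KeyError for unknown keys
        self.put(key, value)  # promote; this also counts as an access
        return value
```

With a capacity of two, an entry that was read several times survives the arrival of newer entries that were touched only once, whereas an LRU strategy would demote it as soon as it is the one that has not been used for the longest time.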

Obviously, the perfect strategy would predict what data would be used least in the future (“Clairvoyant algorithms”) and move data accordingly to hot, warm, and cold storage. This is of course not exactly possible, but a sound understanding of how people use your Big Data platform can come pretty close to that ideal.

Of course, you can implement also more sophisticated machine learning algorithms that take into account the environment to predict what data and computation will be required in the future given a new task (cf. here for an approach for scheduling multimedia tasks in the cloud based on machine learning algorithms – the use case is different but the general idea the same). Unfortunately, most of the popular Big Data and in-memory solutions do not implement such an approach yet.

How should the data be structured?

Many people, including business people, have only the traditional world of tables, consisting of rows or columns, in mind when using data. In fact, a lot of analysis is based on this assumption. However, while tables are simple they might not be the most efficient way to store data in-memory or even to process it.

In fact, depending on your analysis different formats make sense, such as:

  • Use the correct data type: If you have numbers, use a data type that supports numbers, such as integer or double. Dates can often be represented as integers. This requires less storage and the CPU can read an integer represented as an integer orders of magnitude faster than an integer represented as a string. Similarly, even with integers, you should select an appropriate size. If your numbers fit into a 32-bit integer then you should prefer storing them as 32-bit instead of 64-bit. This will increase your performance significantly. However, the key message here is: store the data with the right data type, use the available ones and understand their advantages as well as limitations.
  • Column-based: Data is stored in columns instead of rows. This is usually beneficial if you need to access one or more full columns of a given data set. Furthermore, it enables one to avoid reading data that is not relevant using storage indexes (min/max) or bloom filters.
  • Graph-based: Data is stored as so-called adjacency lists or sometimes as adjacency matrices. This performs much better than row-based or column-based storage with respect to graph algorithms, such as strongly connected components, shortest path etc. These algorithms are useful for social network analytics, financial risks of assets sold by different organizations, dependencies between financial assets etc.
  • Tree-based: Data is stored in a tree structure. Trees can usually be searched comparably fast and are often used for database indexes to find out in which data block a row is stored.
  • Search indexes for terms in unstructured text. This is usually useful for matching data objects, which are similar, but do not have unique identifiers. Alternatively, they can be used for sentiment analysis. Traditional database technology shows, for example, a terrible performance for these use cases – even in-memory.
  • Hash-Clustering Indexes: This can be used in column stores by generating a hash out of the values of several columns for one row. This hash is stored as another column. It can be used for quickly searching for several criteria at the same time by using only one column. This reduces the amount of data to be processed at the expense of additional storage needed.
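
Two of these points, the correct data type and the column-based layout, can be illustrated with a short Python sketch using only the standard library. The sample values and the variable names are made up for illustration, and the sketch only compares storage size; it makes no claim about the speed of any particular database.

```python
import struct
from datetime import date

# One thousand seven-digit numbers, stored in three different ways.
values = list(range(1_000_000, 1_001_000))

as_text = ",".join(str(v) for v in values).encode("ascii")  # digits + commas
as_int32 = struct.pack(f"<{len(values)}i", *values)  # 4 bytes per value
as_int64 = struct.pack(f"<{len(values)}q", *values)  # 8 bytes per value

# A date stored as an integer (days since 0001-01-01) instead of a string.
order_day = date(2016, 1, 31).toordinal()

# Row-based layout: every row carries all attributes.
rows = [
    {"customer": "a", "amount": 10},
    {"customer": "b", "amount": 25},
    {"customer": "a", "amount": 5},
]

# Column-based layout: one list per attribute.
columns = {
    "customer": [row["customer"] for row in rows],
    "amount": [row["amount"] for row in rows],
}

# An aggregation over one column only has to touch that column.
total_amount = sum(columns["amount"])
```

Here the 32-bit representation needs half the space of the 64-bit one and also less than the text representation. The same reasoning applies to the column layout: the sum never looks at the customer column.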

Furthermore, the same data might be stored in different formats on warm or cold storage, meaning that you have to decide if you want to have redundant data or generate each time from scratch the optimal storage of data for a given computation.

Compression can make sense for data in-memory, because it enables storing more data in-memory instead of slower disk drives.
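
As a small illustration, the following Python sketch compresses a repetitive column of values with zlib from the standard library. zlib is used here only as a stand-in; the sample data is made up and real in-memory systems typically use their own, often column-specific, compression schemes.

```python
import zlib

# Hypothetical column with many repeated values, as is common in practice.
raw = ("DE,2016-01-31,EUR\n" * 10_000).encode("ascii")

compressed = zlib.compress(raw)
restored = zlib.decompress(compressed)
```

The price for the saved memory is the CPU time needed to decompress the data before it can be processed.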

Unfortunately, contrary to the strategies to manage data temperature, there are currently no mature strategies that automatically decide for you how to store the data. This is a manual decision and thus requires good knowledge of how your Big Data platform is used.

Do we always need in-memory?

With the need to process large data sets one thing became apparent: even with new technologies, such as in-memory or Big Data platforms, it is sometimes very inefficient to process data by looking at all of the data – it is better not to read data at all!

Of course, this means you should not read not-relevant data. For instance, it was very common in traditional databases to read all the rows to find out matching rows according to a query. Even when using indexes, some irrelevant data is read when scanning the index, although storing the index as a tree structure increased search performance already a lot.

More modern solutions use storage indexes and/or bloom filters to decide which rows they need to read. This means they can skip blocks of data where the rows not matching to a query are not contained (cf. here for implementation in Apache Hive).
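
A minimal sketch of such a storage index, assuming a simple min/max statistic per block, could look as follows. The Block class and the functions make_blocks and query_equals are hypothetical names for this illustration and do not reflect the actual implementation in Hive or any storage format.

```python
from dataclasses import dataclass
from typing import List

@dataclass
class Block:
    rows: List[int]   # values of the filtered column in this block
    min_value: int    # storage index: smallest value in the block
    max_value: int    # storage index: largest value in the block

def make_blocks(values, block_size):
    """Split the column into blocks and record min/max for each block."""
    blocks = []
    for start in range(0, len(values), block_size):
        chunk = values[start:start + block_size]
        blocks.append(Block(chunk, min(chunk), max(chunk)))
    return blocks

def query_equals(blocks, wanted):
    """Return matching values and the number of blocks actually read."""
    hits, blocks_read = [], 0
    for block in blocks:
        if wanted < block.min_value or wanted > block.max_value:
            continue  # the block cannot contain a match, skip it unread
        blocks_read += 1
        hits.extend(v for v in block.rows if v == wanted)
    return hits, blocks_read

values = list(range(1000))         # sorted data makes min/max very selective
blocks = make_blocks(values, 100)  # 10 blocks of 100 rows
hits, blocks_read = query_equals(blocks, 742)
print(hits, blocks_read, len(blocks))
```

In this example only one of ten blocks is read. A bloom filter per block plays a similar role for equality lookups when the data is not sorted and min/max ranges overlap.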

Similarly, probabilistic data structures, such as HyperLogLog, or data based on sampling (cf. here) enable one to avoid reading all the data again or at all. In fact, here you can even skip “relevant” data – as long as you read enough data to provide correct results within a small error margin.
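
The sampling idea can be sketched as follows. This is a simplified illustration with made-up data: the function estimate_count, the sample size and the fixed seed are assumptions for this example, and a real engine would also report a confidence interval.

```python
import random

def estimate_count(rows, predicate, sample_size, seed=42):
    """Estimate how many rows satisfy predicate by inspecting only a random sample."""
    rng = random.Random(seed)
    sample = rng.sample(rows, sample_size)
    matching = sum(1 for row in sample if predicate(row))
    # Scale the fraction observed in the sample up to the full data set.
    return matching / sample_size * len(rows)

def is_match(value):
    return value % 10 == 0  # by construction exactly 10% of the rows match

rows = list(range(100_000))

exact = sum(1 for row in rows if is_match(row))               # reads all rows
estimate = estimate_count(rows, is_match, sample_size=5_000)  # reads 5% of the rows
relative_error = abs(estimate - exact) / exact
print(exact, round(estimate), f"{relative_error:.1%}")
```

The estimate is based on only 5% of the rows. Whether the resulting error margin is acceptable depends on the use case.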

Hence, even with in-memory technologies it is always better to avoid reading data. Even if the data is already in-memory, the CPU needs more time the more data it has to process – a simple but often overlooked fact.

The impact of Scheduling: Pre-emption

Once your Big Data platform or in-memory platform grows, you will not only get more data, but also more users working on it in parallel. This means that if they use interactive queries or schedule Big Data processes, they need to share resources, including memory and CPU, especially when taking into account speculative execution. As described before, you ideally have a general big picture of what will happen, especially with main memory. However, in peak times, and for some Big Data deployments also most of the time, the resources are not enough, because of cost or other reasons.

This means you need to introduce scheduling according to scheduling policies. We briefly touched on this topic before, because the concept of temperature of data implies some kind of scheduling. However, if you have a lot of users the bottleneck is mainly the number of processors that process data. Hence, sometimes some analytics by some users are partially interrupted to free up resources for other users. These users may potentially use different data sets, meaning that some data might also be moved from main memory to disk drives. After the interrupted tasks are resumed they may need to reload data from disk drives to memory.

This can sometimes make the perceived performance unpredictable. You should be aware of it so you can react properly to incidents created by users or do more informed capacity management.

Big Data technologies for in-memory

In-memory batch processing

There are several in-memory batch processing technologies for Big Data platforms, for example Apache Spark or Apache Flink. In the beginning, these platforms, especially Spark, had some drawbacks because they represented everything as Java objects in memory. This would mean, for instance, that a 10 character String would consume 6 times more memory than representing it as an array of bytes.

Luckily this changed and data is now stored in-memory in a columnar fashion, which also supports skipping data on disk that is not relevant (via predicate pushdown and an appropriate disk storage format, such as ORC or Parquet).

Additionally, both support graph batch processing and processing of continuous streams in-memory. However, both rely on a common abstraction for a data structure in which they represent other data structures, such as graphs. For example, in case of Spark it is Resilient Distributed Datasets (RDD)/dataframes. This means they do not perform as well as a highly specialized graph engine, but they are more generic and it is possible to integrate them with other data structures. For most of the current use cases this is sufficient.

Additionally, different processing algorithms, mainly in the area of machine learning, are supported.

Sometimes you will see that they are also advertised as interactive platforms. However, this is not their core strength, because they do not support, for example, the concept of data temperature automatically, i.e. the developer is fully responsible for taking into account hot, warm and cold data or for implementing a strategy as described above. Additionally, they do not provide index support for data in-memory, because this is usually much less relevant for batch processes. Hence, if you want to use these technologies for interactive analysis you have to develop some common IT components and strategies for addressing the temperature of data and the “do not read irrelevant data” paradigm.

In any case you have to think about scheduling strategies to optimize the resource usage of your available infrastructure.

Depending on your requirements, in-memory batch processing is not needed in all cases and your big data platform should support both in-memory and non in-memory batch processes to be efficient. In particular, if your batch process loads and processes the data only once without re-reading parts of it, then you won’t benefit a lot from in-memory.

Interactive in-memory analytics

There are several technologies enabling interactive in-memory analytics. One of the older – but still highly relevant – ones is memcached for databases. Its original use case was to speed up web applications accessing the database with many users accessing, i.e. writing and reading, the same data in parallel. Similar technologies are also used for Master Data Management (MDM) systems, because they need to deliver and receive data from a lot of sources to different systems as well as business processes with many users. This would be difficult if one relied only on databases.

Other technologies focus on the emerging Big Data platforms based on Hadoop, but also augment in-memory batch processing engines, such as Spark. For instance, Apache Ignite provides functionality similar to memcached, but also supports Big Data platforms and in-memory batch processing engines. For example, you can create shared RDDs for Spark. Furthermore, you can cache Hive tables or partitions in-memory. Alternatively, you can use the Ignite DataGrid to cache selected queries in-memory.

These technologies support advanced in-memory indexes (keep in mind: it is always better not to read data!) and automated data temperature management. Another example is Apache Tachyon.

There are also very specialized interactive in-memory analytics engines, such as TitanDB for graphs. TitanDB is based on the Tinkerpop graph stack including the interactive Graph query (or graph traversal) language Gremlin. SAP HANA is a specific in-memory column database for OLTP, OLAP, text-analytics and graph applications. It has been extended to a full application stack cloud platform based on in-memory technology.

Taking scheduling into account is much more tricky with interactive analytics, because one does not know exactly what the users will do, and prediction engines for user behavior in interactive analytics are currently nearly non-existent.

However, you can define different categories of interactive analytics (e.g. simple queries, complex queries, machine learning, graphs, ...) and determine your infrastructure as well as its configuration based on these categories.

Conclusions

It makes sense to distinguish between in-memory batch processes and in-memory analytics. In-memory batch processes can be planned and scheduled more easily in advance. Additionally, one can better optimize resources for them. They are also more focused on processing all data. Specific technologies for distributed in-memory Big Data exist and are complementary to technologies for interactive in-memory analytics. The main differences are additional indexes and automated support for the concept of data temperature.

Even for in-memory technology the key Big Data concept of not reading irrelevant data is of high importance. Processing terabytes of data in-memory even though only a subset is relevant is a waste of resources and particularly time. This is especially difficult to handle for interactive in-memory analytics where the users can do what they want. Hence, automated and intelligent mechanisms to support this are highly desirable. They should be preferred to manually developing the right data model and structures.

Another key concept is to have the right data structure in-memory for optimal processing. For instance, graph structures perform much better than relational row or column-based structures, which need to be joined very often to perform graph algorithms. Furthermore, probabilistic data structures and probabilistic sampling queries have a growing importance. Depending on your needs you might have the same data represented redundantly in different data structures for different analysis purposes.

Finally, distinguishing interactive analytics and batch processing is not always that straightforward. For instance, you can have a batch process running 5 minutes whose results are queried 1000 times, so avoiding 5 minutes of run time each time can be very beneficial. On the other hand you can have an interactive query which takes 60 minutes, but is needed only once by one user. This may also change over time, so it is important that, even after development of a solution, business users and technical users monitor and audit it regularly to check if the current approach still makes sense or another approach makes more sense. This requires a regular dialogue even after go-live of a Big Data application.
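
The two examples can be turned into a small back-of-the-envelope calculation. The function total_minutes is a hypothetical helper for this illustration and ignores storage cost and result freshness.

```python
def total_minutes(run_minutes, times_needed, precompute):
    """Total compute time if the result is precomputed once or recomputed per request."""
    return run_minutes if precompute else run_minutes * times_needed

# Case 1: a 5-minute batch process whose result is queried 1000 times.
batch_precomputed = total_minutes(5, 1000, precompute=True)
batch_on_demand = total_minutes(5, 1000, precompute=False)

# Case 2: a 60-minute interactive query needed once by one user.
adhoc_precomputed = total_minutes(60, 1, precompute=True)
adhoc_on_demand = total_minutes(60, 1, precompute=False)

print(batch_precomputed, batch_on_demand)  # 5 vs. 5000 minutes
print(adhoc_precomputed, adhoc_on_demand)  # 60 vs. 60 minutes
```

In the first case precomputing saves 4995 minutes of compute time, in the second case it saves nothing, which is why the usage pattern matters more than the run time of a single execution.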

The convergence of both concepts requires more predictive algorithms for managing data in-memory and for queries. These can be seen only in their first stages, but I expect much more over the coming years.

The Lambda Architecture for Big Data in your Enterprise

I will present in this blog post the Lambda architecture for Big Data. This architecture is about integrating historical Big Data with “live” streaming Big Data. Afterwards, the concept of a large data lake in your enterprise or amongst enterprises in a B2B scenario is explained. This data lake – based on the lambda architecture – can replace a service oriented architecture (SOA), because it is easier to implement and manage for large data volumes in a variety of formats. Hence, a plethora of use cases arises. Finally, I will discuss how this architecture can be implemented using various open source software technologies based on the Hadoop Ecosystem.

The Lambda Architecture

Big Data has become an increasingly popular topic over the last years. Big Data is about processing large volumes of data in a variety of formats, taking into account live streaming or historical data. One large computing cluster is used to store and process all of one or more companies’ data.

Internet companies, such as Google, Yahoo or Facebook, are driven by new business models for which existing technology was not suitable. This led to the development of new technologies known under the common umbrella of NoSQL. Furthermore, there has been the need to integrate them in a flexible standardized architecture to enable Big Data. The lambda architecture is such an architecture and has been coined recently by Nathan Marz and James Warren.

It has the following key features:

  • Standardized fault-tolerant distributed file system that spans the whole cluster – this file system is the base of the data lake that I will explain later.
  • A batch processing layer for processing large amounts of historical data stored in the computing cluster
  • A serving layer for providing fast access to results of batch processed data
  • A real-time processing layer (or “speed layer”) for “live” processing of data streams, such as sensor data or stock market data
  • A long term storage layer optimized for extremely cheap storage of data that is rarely used (e.g. for legal reasons). Usually you do not find this in other articles describing the lambda architecture, but I think it is an important feature to highlight. Here you have very old data (older than multiple years) that you do not need in your day to day business – you can store it on very cheap hardware with a lot of disk space but much less computing power and memory capacity.

These features are not new and have been addressed partly also by other architectures known in other domains, such as Business Intelligence, Complex Event Processing, Data Warehouse or Master Data Management. However, the lambda architecture addresses them in context of huge data volumes, diversity of data formats (polyglot persistence) and integrates them all in one architecture.

The term “lambda” stems from the following function used for doing analytics in context of Big data:

query = λ(all data) = λ (live streaming data) * λ (historical data)

Basically it says that all analytics functions λ combining live streaming data and historical data can be computed on systems implementing the lambda architecture. I will later discuss the implication of this for the implementation of the architecture.
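
To make the function above more concrete, here is a minimal sketch for one simple analytics function, counting page views. It assumes that the combination operator “*” is a plain addition, which holds for counts but not for every analytics function. The event format and the function names batch_view, realtime_view and query are made up for this illustration.

```python
from collections import Counter

def batch_view(historical_events):
    """Batch layer: precompute page-view counts over all historical events."""
    return Counter(event["page"] for event in historical_events)

def realtime_view(live_events):
    """Speed layer: count only events that arrived after the last batch run."""
    return Counter(event["page"] for event in live_events)

def query(page, batch, realtime):
    """Serving side: combine both views to answer a query over all data."""
    return batch[page] + realtime[page]

historical = [{"page": "/home"}, {"page": "/home"}, {"page": "/cart"}]
live = [{"page": "/home"}, {"page": "/checkout"}]

batch = batch_view(historical)
realtime = realtime_view(live)

print(query("/home", batch, realtime))      # 2 historical + 1 live
print(query("/checkout", batch, realtime))  # seen only in the live stream
```

The query never touches the raw events. It only combines two precomputed views, which is what makes it fast.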

The lambda architecture is illustrated in the following figure

[Figure: Lambda architecture]

The lambda architecture provides the data scientist with means and tools to analyze any data occurring in the company, whereby tools can be easily plugged into the architecture without requiring major implementation efforts later.

Machine learning components can autonomously leverage the lambda architecture to do prediction and automatically implement actions. This is known as predictive and prescriptive analytics.

Data Lake

One of the most interesting aspects of the lambda architecture is that you have a cluster of nearly unlimited storage and memory capacity. You can even have an in-memory database with a memory capacity on the terabyte to petabyte scale distributed over the whole cluster. Popular open source frameworks, such as Hadoop, allow you to use commodity hardware, so that deploying such an architecture can be relatively cheap, and they already have built-in fault-tolerance, so that developers do not need to mess around with it.

With such a large cluster you can create a big data lake in your company (see next figure). Basically all your data ends up in this cluster, all applications including the ones in the cloud can share it via simple file system access mechanisms, and you can use the computing power of the whole cluster to do analysis. Needless to say, you save a lot of money, because you avoid a lot of redundant ETL processes, which all have to be made fault-proof and interact with different systems. Modern Big Data architectures take care of this for you.

[Figure: Data lake]

Finally, exchanging data becomes much easier than in a Service-oriented Architecture (SOA), where you need to design interfaces and implement services – here every application simply accesses the distributed file system in the cluster.

Implementing a Lambda Architecture

There are several things to consider when you implement the lambda architecture. Firstly, you can choose from a variety of components to implement it. For instance, on the open source side Apache Hadoop / Apache Spark is very popular and is used by many companies including all popular Internet companies, such as Facebook or Google. You can also use other open source components, such as Apache Cassandra for batch processing and Twitter Storm for stream processing. Additionally, you can also use commercial tools, such as SAP HANA Cloud platform. Finally, you can put your lambda architecture completely on-premise, completely in the cloud (see my example with Amazon Elastic Map Reduce, which partly implements a Lambda Architecture) or have some kind of hybrid model. In the following I will describe an implementation using Apache Hadoop and additional tools that can integrate with Apache Hadoop.

Software Components

You can use the following components for implementing the lambda architecture.

  • Standardized fault-tolerant distributed file system: Hadoop Distributed Filesystem (HDFS). You can use also other distributed file systems. The choice of the file system is transparent to the application, i.e. they won’t need to use different APIs for different file systems. Most of the time you will be fine with HDFS, but, for example, cloud providers, such as Amazon, may implement their own that fits to their infrastructure.
  • Batch Processing layer: Here you can use Hadoop Yarn, which is responsible for distributing Big Data Analytics jobs, such as map reduce jobs. Yarn even allows you to “containerize” your jobs, i.e. define CPU, memory and network limitations across the big data cluster for a specific job. This allows you to do proper capacity management – one of the most important aspects of a lambda architecture. If you need in-memory batch processing then you should check out Apache Spark. If you want to have a more generic job control, e.g. because you have other distributed applications around your cluster that are not based on the MapReduce paradigm, you can use Apache Mesos.
  • Serving layer: The serving layer provides fast access and advanced query mechanisms for results of batch jobs. Here you can use typical Big Data databases and data warehouses, such as Apache Hbase or Apache Shark (for in-memory access). You will probably have multiple different technologies here according to the polyglot persistence NoSQL paradigm. They offer typical interfaces, such as JDBC or ODBC, to integrate with any application.
  • Real-time processing layer: Although Hadoop can process streaming data, most of the time you will choose a software component supporting complex event processing of live streaming data across your cluster, such as Apache Spark Streaming or Twitter Storm.
  • The long term persistence layer is mostly a hardware choice: Here you need a lot of cheap hard disk space, e.g. by not using SSD flash drives, and little computing / memory power. It is usually a separate cluster connected to the other cluster and it leverages the fault tolerance features of HDFS, such as automated replication of data to several nodes and re-replication in case of node failures.

Furthermore, you can have a lot of other software components that automatically build on the aforementioned core technologies, such as Apache Hive or Apache Shark, a Data Warehouse for Hadoop, or Apache Oozie, which is a workflow tool for complex ETL processes distributed over your data lake.

As mentioned before, there is a wide variety of alternatives that you can use to implement the lambda architecture. The standardized fault-tolerant distributed file system is most of the time the base for everything and you can also gradually evolve your architecture and implement it using different components.

Delivery Pipeline

I briefly described before that capacity management is an important part of the lambda architecture. You need to define how big data jobs are programmed and tested as well as how they get into the cluster. I expect that in the future not only programmers, but also business people, such as data scientists, will need to load big data jobs into your cluster. This means you will need to (1) properly define your delivery pipeline, (2) implement and enforce proper capacity management and (3) have a bullet-proof dependency management for different software versions in your cluster.

Luckily, by using Apache Yarn or Apache Mesos together with cluster monitoring software, such as Ganglia, you can do proper capacity management.

Recently, more tools, such as Docker, using advanced virtualization features of the Linux kernel (cgroups) have emerged, making capacity management even easier and more flexible. These technologies also have built-in dependency management to avoid a library/versioning hell. Google developed an open source scheduling system for them, called Kubernetes.

Combining Stream-Processing and Batch Data

One core goal of the lambda architecture is to integrate live streaming and batch processing. In fact, most of the recent articles on lambda architecture are just about providing both as software components. However, you will also need to integrate this on the query level, because complex event processing queries are a little bit different from batch processing queries.

Spark Streaming demonstrates how you can join historical data with stream processed data at the same time.

Hardware Components

Hardware considerations for a lambda architecture have – if at all – only been briefly discussed in most of the publications. Hardware planning is important for your cluster – we have seen this already with the long term storage. Furthermore, if you have a few very old machines in your big data cluster then this will affect all jobs running on your cluster. You will need to have proper monitoring tools and rules deployed to automatically identify these kinds of bottlenecks.

Conclusion

Once you have implemented the lambda architecture you will need to teach everybody to use it. You will need to plan the migration of data storage for analytics from the individual systems to your data lake, i.e. your big data cluster. Keep in mind that the lambda architecture is about analytics. Although it is possible to include transactional systems into this (e.g. a MySQL Cluster), you will probably still use standard transactional databases for your individual ERP systems, CRM systems etc., from which you extract the data and put it into the cluster for analytics.

However, there are also other tools for doing distributed transactions, such as CloudTPS or, even more advanced, the Bitcoin transaction system. They may replace individual transactional databases in the future.

More and more companies are embarking on the journey of a standardized Big Data architecture each year. Most of them use open source technologies to gradually migrate towards one big data lake as it has been described here.

Big Data: Bring Computation to Data

Big Data is the topic of the coming years. Even today large Internet companies store exabytes of data and their revenue model is based on selling products as well as services around this data. Consequently, they need to process data using advanced statistical methods, such as machine learning. Hence, they need to think about how to do this efficiently. Currently, especially in-memory is hyped to address this issue. However, this is only one aspect. A fundamentally more important aspect is where the data is processed in a distributed multi-node data environment.

A brief history on software architectures

In the beginning of software development, many applications were single monolithic applications. They were deployed on a single computer. This led to several problems: developers could hardly reuse code of monolithic applications, and the approach did not scale very well since it was limited to a single computer. The first problem has been addressed by introducing different layers into the architecture. The resulting architectures are usually based on three layers (see next figure): data layer, service layer and presentation layer. The data layer handles any functionality for managing data, such as querying or storing it. The service layer implements business logic, e.g. it implements business processes. The presentation layer allows the user to interact with the implemented business processes, e.g. entering new customer data. The layers communicate with each other using well-defined interfaces implemented today in REST, OData, SOAP, Websockets or HTTP/2.0.

[Figure: Three-layer architecture]

With the emergence of the Internet, these layers had to be put physically on different machines to provide larger scalability. However, they were never designed with this in mind. The network layer has only limited transport bandwidth and capacity. Indeed, for very large data it can be faster to store it on a large drive and transport it by truck to its destination than to send it over the network.

Additionally, during development scalability of data computation is of less interest, because in the Internet world it is often not known how many people will have access to an application and this may change over time. Hence, you need to be able to scale dynamically up and down. I observe that more and more of the development efforts in this area have moved to operations, who need to implement monitors, load balancers and other technology to scale applications. This is also the reason why DevOps is a popular and emerging paradigm for developing and operating Internet-scale web applications, such as Netflix.

Towards New Software Architectures: Bring Computation to Data

The multiple layer approach does make sense and you could even split it into more layers (“services”), but you have to carefully evaluate the complexity and reusability of your service design. More importantly, you will have to think about new interfaces, because if components are located on different machines or different memory instances, your application will spend a lot of time moving data between them. For instance, the application logic on the application server may request all customer transactions from the database and then correlate them to write the results back into the database. This requires a lot of data to be transferred from the database to the application server and potentially costs a lot of performance. Finally, it does not scale at all.
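
The difference can be illustrated with a small sketch that uses Python's built-in sqlite3 module as a stand-in for the data layer. The table, the data and the aggregation are made up for this example. With an in-process database nothing crosses a network, so the number of rows returned is only used as a proxy for the amount of data that would have to be transferred.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE transactions (customer TEXT, amount REAL)")
conn.executemany(
    "INSERT INTO transactions VALUES (?, ?)",
    [("alice", 10.0), ("bob", 5.0), ("alice", 7.5), ("bob", 2.5), ("carol", 1.0)],
)

# Variant 1: move the data to the computation. Every row crosses the interface
# and the service layer aggregates it itself.
rows = conn.execute("SELECT customer, amount FROM transactions").fetchall()
totals_app = {}
for customer, amount in rows:
    totals_app[customer] = totals_app.get(customer, 0.0) + amount
rows_transferred_app = len(rows)

# Variant 2: bring the computation to the data. Only the aggregated result
# is returned to the service layer.
result = conn.execute(
    "SELECT customer, SUM(amount) FROM transactions GROUP BY customer"
).fetchall()
totals_db = dict(result)
rows_transferred_db = len(result)

print(rows_transferred_app, rows_transferred_db, totals_app == totals_db)
```

Both variants produce the same totals, but the second one returns one row per customer instead of one row per transaction. With millions of transactions per customer this difference dominates the run time.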

This problem first emerged when companies introduced the first Online Analytical Processing (OLAP) engines as part of business intelligence solutions for understanding their business. Database queries proved too simple and would require transferring a lot of data to the application server first. Hence, the Structured Query Language (SQL) for databases was extended to cope with these new requirements (e.g. the CUBE operator). Moreover, you can define your own custom functions (e.g. SQL stored procedures), but they have to be implemented in a very vendor-specific way. For instance, distributed databases based on Apache Hadoop support custom functions, and sometimes you can integrate other programming languages, such as Java. While stored procedures are already an improvement in terms of security (protection against SQL injection attacks), they have the problem that it is very difficult to write sophisticated programs to handle modern Big Data applications. For instance, many applications require machine learning, statistical correlation or other statistical methods. It is difficult to write them as stored procedures and to maintain support for different vendors. Furthermore, it leads again to monolithic applications. Finally, they are not dynamic – the application cannot decide to do any new computation on the fly without reimplementing it in the database layer (e.g. implement a new machine learning algorithm). Hence, I suggest another way to address this issue.

A Standard for Bringing Computation to Data?

As mentioned, we want to support modern Big Data applications by providing suitable language support for machine learning and statistical methods on top of any database system (e.g. MySQL, Hadoop, Hbase or IBM DB2). The next figure illustrates the new approach. The communication between the presentation and service layer works as usual. However, the services do not call functions on the data layer, but send any data-intensive computation they want to perform as an R script to the data layer, which executes it and only sends back the result.

[Figure: Bring computation to data architecture]

I observed that the programming language R for statistical computing has been recently integrated in various data environments, such as transactional databases, Apache Hadoop clusters or in-memory databases, such as SAP HANA. Hence, I think R could be a suitable language for describing computation that operates on data. Additionally, R has already a lot of built-in packages for machine learning or statistical data processing. Finally, depending on the openness of the underlying data environment, you can integrate R tightly into it, so you may not have to do extensive in-memory transfers.

The advantages of the approach are:

  • Business logic stays in the service layer and does not move to the data layer
  • You can easily add new services without modifying the data layer – so you avoid a tight coupling, which makes it easier to change the data layer or to introduce new functionality
  • You can mine R scripts generated by services to determine which computation the user is likely to do next to start executing it before the user requests it.
  • Caching and distribution of data processing can be based on a more sophisticated analysis of the R scripts using the R Profiler Rprof
  • R is already known by many business analysts or social scientists/psychologists

However, you will need to have some functionality for governing the execution of the R scripts in the data layer. This includes decisions on when to schedule computation or creating new computing/data nodes (e.g. real-time vs batch). This will require a company-wide enterprise architecture approach where you need to define which data should be real-time and which data should be batch-processed. Furthermore, you need to take into account security and separation of concerns.

In this context, Apache Hadoop might be an interesting solution from the technology perspective.

What is next

The aforementioned approach is only the beginning. By using this solution, you can think about true inter-cloud deployments of your application. Finally, you can enable inter-organizational data-processing business processes.