Lambda, Kappa, Microservice and Enterprise Architecture for Big Data

A few years after the emergence of the Lambda Architecture, several new architectures for Big Data have emerged. I will present them and illustrate their use case scenarios. These are IT architectures, but towards the end of this blog post I will describe the corresponding Enterprise Architecture artefacts, which are sometimes referred to as Zeta Architecture.

Lambda Architecture

I have blogged before about the Lambda-Architecture. Basically this architecture consists of three layers:

  • Batch-Layer: This layer executes long-running batch processes to analyse larger amounts of historical data. The scope is data from several hours to weeks up to years. Here, usually Hadoop MapReduce, Hive, Pig, Spark or Flink are used together with orchestration tools, such as Oozie or Falcon.

  • Speed-Layer/Stream Processing Layer: This layer executes (small/“mini”) batch-processes on data according to a time window (e.g. 1 minute) to do analyses on larger amounts of current data. The scope is data from several seconds up to several hours. Here one may use, for example, Flink, Spark or Storm.

  • Serving Layer: This layer combines the results from the batch and stream processing layers to enable fast interactive analyses by users. This layer usually leverages relational databases, but also NoSQL databases, such as Graph databases (e.g. TitanDB or Neo4J), Document databases (e.g. MongoDB, CouchDB), Column-Databases (e.g. HBase), Key-Value Stores (e.g. Redis) or Search technologies (e.g. Solr). For certain use cases, NoSQL databases provide more adequate and better performing data structures, such as graphs/trees, hash maps or inverted indexes.
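
To make the time-window idea of the speed layer concrete, here is a minimal Python sketch that groups events into fixed, non-overlapping one-minute windows and counts them per key. The function name `tumbling_window_counts`, the shape of the event tuples and the sample data are assumptions for illustration only; engines such as Flink, Spark or Storm provide their own windowing APIs, which are not shown here.

```python
from collections import Counter


def tumbling_window_counts(events, window_seconds=60):
    """Count events per key within fixed, non-overlapping time windows.

    `events` is an iterable of (timestamp_in_seconds, key) tuples.
    Returns a dict mapping each window start to a Counter of keys.
    """
    windows = {}
    for timestamp, key in events:
        # Align the timestamp to the start of the window it falls into.
        window_start = (timestamp // window_seconds) * window_seconds
        windows.setdefault(window_start, Counter())[key] += 1
    return windows


# Hypothetical sample events: (seconds since start, page that was viewed).
events = [(0, "page_a"), (15, "page_b"), (59, "page_a"), (60, "page_a"), (125, "page_b")]
counts = tumbling_window_counts(events)
```

Each window can then be handed to the serving layer as soon as it is complete, which is what keeps the latency of this layer in the range of seconds to minutes.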

In addition, I proposed the long-term storage layer to have an even cheaper storage for data that is hardly accessed, but may be accessed eventually. All layers are supported by a distributed file system, such as HDFS, to store and retrieve data. A core concept is that computation is brought to data (cf. here). On the analysis side, usually standard machine learning algorithms, but also on-line machine learning algorithms, are used.

As you can see, the Lambda-Architecture can be realized using many different software components and combinations thereof.

While the Lambda architecture is a viable approach to tackle Big Data challenges, other architectures have emerged that focus only on certain aspects, such as data stream processing, or on integrating it with cloud concepts.

Kappa Architecture

The Kappa Architecture focuses solely on data stream processing or “real-time” processing of “live” discrete events. Examples are events emitted by devices from the Internet of Things (IoT), social networks, log files or transaction processing systems. The original motivation was that the Lambda Architecture is too complex if you only need to do event processing.

The following assumptions exist for this architecture:

  • You have a distributed ordered event log persisted to a distributed file system, where stream processing platforms can pick up the events

  • Stream processing platforms can (re-)request events from the event log at any position. This is needed in case of failures or upgrades to the stream processing platform.

  • The event log is potentially large (several Terabytes of data / hour)

  • Mostly online machine learning algorithms are applied due to the constant delivery of new data, which is more relevant than the old already processed data

Technically, the Kappa architecture can be realized using Apache Kafka for managing the data-streams, i.e. providing the distributed ordered event log. Apache Samza enables Kafka to store the event log on HDFS for fault-tolerance and scalability. Examples for stream processing platforms are Apache Flink, Apache Spark Streaming or Apache Storm. The serving layer can in principle use the same technologies as I described for the serving layer in the Lambda Architecture.
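
The central assumption, an ordered event log that can be re-read from any position, can be illustrated with a small in-memory stand-in. This is only a sketch: the class `EventLog` and its methods are hypothetical names and do not correspond to the Kafka API, and a real log would be persisted and partitioned across machines.

```python
class EventLog:
    """Append-only, ordered event log (in-memory stand-in for illustration)."""

    def __init__(self):
        self._events = []

    def append(self, event):
        """Append an event and return its offset, i.e. its position in the log."""
        self._events.append(event)
        return len(self._events) - 1

    def read_from(self, offset):
        """Yield (offset, event) pairs starting at the given offset."""
        for position in range(offset, len(self._events)):
            yield position, self._events[position]


log = EventLog()
for name in ["sensor_on", "reading_1", "reading_2"]:
    log.append(name)

# A consumer that failed after handling offset 0 can replay from offset 1.
replayed = list(log.read_from(1))
```

Because the log itself never changes, a restarted or upgraded stream processor only has to remember the last offset it handled in order to continue or to reprocess older events.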

There are some pitfalls with the Kappa architecture that you need to be aware of:

  • End to end ordering of events: While technologies such as Kafka can provide the events in an ordered fashion, this relies on the source system actually delivering the events in order. For instance, I had the case that a system sent the events in order during normal operations, but not when communication errors occurred: it stored the events it could not send and retransmitted them at a certain point later, while it already sent new events as soon as the communication was established again. The source system had to be adapted to handle these situations correctly. Alternatively, you can ensure only a partial ordering, using vector clocks or a similar mechanism implemented at the event log or stream processing level.

  • Delivery paradigms, i.e. how the events are delivered to (or fetched by) the stream processing platform:

    • At least once: Every event is guaranteed to be delivered at least once, but the same event might be delivered twice or more due to processing errors or communication/operation errors within Kafka. For instance, the stream processing platform might crash before it can mark events as processed, although it has already processed them. This might have undesired side effects, e.g. the same event that “user A liked website W” is counted several times.

    • At most once: The event will be delivered at most once, so it might also get lost and never be delivered. With Kafka this happens, for example, when a consumer commits its position before it has processed the events (Kafka's documented default guarantee is at least once). This could have undesired side effects, e.g. the event “user A liked website W” is not taken into account.

    • Once and only once: The event is guaranteed to be delivered once and only once. This means it will not get lost or be delivered twice or more times. However, this is not simply a combination of the above scenarios. Technically you need to make sure in a multi-threaded distributed environment that an event is processed exactly once. This means that (1) the same event needs to be processed by only one sequential process in the stream processing platform and (2) all other processes related to the event need to be made aware that one of them already processes it. Both features can be implemented using distributed system techniques, such as semaphores or monitors. They can be realized using distributed cache systems, such as Ignite, Redis or to a limited extent ZooKeeper. Another simple possibility would be a relational database, but this would quickly stop scaling with large volumes.
      • Needless to say: The source system must also make sure that it delivers the same event once and only once to the ordered event log.

  • Online machine learning algorithms constantly change the underlying model to adapt it to new data. This model is used by other applications to make predictions (e.g. predicting when a machine has to go into maintenance). This also means that in case of failure we may temporarily have an outdated or simply a wrong model (e.g. in case of at least once or at most once delivery). Hence, the applications need to incorporate some business logic to handle this (e.g. do not register a machine twice for maintenance, or avoid constantly registering and unregistering a machine for maintenance).
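
One common way to soften the duplicate problem of at least once delivery is to make the processing step idempotent. The following minimal sketch assumes that every event carries a unique identifier, which is an assumption not stated above, and it keeps the set of seen identifiers in local memory. The class name `LikeCounter` is hypothetical. In a distributed setting the set of seen identifiers would have to live in a shared store, such as the distributed caches mentioned in the list above.

```python
class LikeCounter:
    """Counts 'like' events while ignoring redelivered duplicates."""

    def __init__(self):
        self.likes = {}
        self._seen_event_ids = set()

    def process(self, event_id, website):
        """Apply the event once; return False if it was already applied."""
        if event_id in self._seen_event_ids:
            return False
        self._seen_event_ids.add(event_id)
        self.likes[website] = self.likes.get(website, 0) + 1
        return True


counter = LikeCounter()
# Hypothetical deliveries: event "e1" arrives twice due to a retry.
deliveries = [("e1", "W"), ("e2", "W"), ("e1", "W")]
results = [counter.process(event_id, site) for event_id, site in deliveries]
```

This does not give you once and only once delivery; it only ensures that a redelivered event has no additional effect on the result.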

Although technologies such as Kafka can help you with this, it requires a lot of thinking and experienced developers as well as architects to implement such a solution. The batch-processing layer of the Lambda architecture can mitigate the aforementioned pitfalls to some extent, but it can also be affected by them.
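
The partial ordering mentioned in the first pitfall can be sketched with vector clocks. The following is a minimal, illustrative Python version in which a clock is a plain dictionary from node name to counter; the function names and the two nodes "A" and "B" are assumptions for the example.

```python
def increment(clock, node):
    """Return a copy of the clock with the node's own counter advanced by one."""
    updated = dict(clock)
    updated[node] = updated.get(node, 0) + 1
    return updated


def merge(clock_a, clock_b):
    """Element-wise maximum, applied when a node receives another node's clock."""
    nodes = set(clock_a) | set(clock_b)
    return {n: max(clock_a.get(n, 0), clock_b.get(n, 0)) for n in nodes}


def happened_before(clock_a, clock_b):
    """True if clock_a causally precedes clock_b."""
    nodes = set(clock_a) | set(clock_b)
    not_greater = all(clock_a.get(n, 0) <= clock_b.get(n, 0) for n in nodes)
    strictly_less = any(clock_a.get(n, 0) < clock_b.get(n, 0) for n in nodes)
    return not_greater and strictly_less


def concurrent(clock_a, clock_b):
    """True if the clocks differ and neither precedes the other."""
    nodes = set(clock_a) | set(clock_b)
    differ = any(clock_a.get(n, 0) != clock_b.get(n, 0) for n in nodes)
    return differ and not happened_before(clock_a, clock_b) and not happened_before(clock_b, clock_a)


a1 = increment({}, "A")             # node A emits an event
b1 = increment({}, "B")             # node B emits an independent event
b2 = increment(merge(b1, a1), "B")  # node B has seen a1 and emits again
```

Here `a1` is ordered before `b2`, while `a1` and `b1` are concurrent, so no order between them can be derived. This is exactly why the ordering is only partial.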

Last but not least, although the Kappa Architecture seems to be intriguing due to its simplification in comparison to the Lambda architecture, not everything can be conceptualized as events. For example, company balance sheets, end of the month reports, quarterly publications etc. should not be forced to be represented as events.

Microservice Architecture for Big Data

The Microservice Architecture did not originate in the Big Data movement, but is slowly being picked up by it. It is not a precisely defined style, but several common aspects exist. Basically it is driven by the following technology advancements:

  • The implementation of applications as services instead of big monoliths
  • The emergence of software containers to deploy each of those services in isolation from each other. Isolation means that they are put in virtual environments sharing the same operating system (i.e. they are NOT in different virtual machines) and that they are connected to each other via virtualized networks and virtualized storage. These containers make much better use of the available resources than virtual machines.
    • Additionally the definition of repositories for software containers, such as the Docker registry, to quickly version, deploy, upgrade dependent containers and test upgraded containers.
  • The deployment of container operating systems and cluster managers, such as CoreOS, Kubernetes or Apache Mesos, to efficiently manage software containers, manage their resources, schedule them to physical hosts and dynamically scale applications according to needs.
  • The development of object stores, such as OpenStack Swift, Amazon S3 or Google Cloud Storage. These object stores are needed to store data beyond the lifecycle of a software container in a highly dynamic cloud or scaling on-premise environment.
  • The DevOps paradigm – especially the implementation of continuous integration and delivery processes with automated testing and static code analysis to improve software quality. This also includes quick deliveries of individual services at any time independently of each other into production.

An example of the Microservice architecture is the AWS Lambda platform (not to be confused with the Lambda architecture) and related services provided by Amazon Web Services.

Nevertheless, the Microservice Architecture poses some challenges for Big Data architectures:

  • What should be a service: For instance, you have Apache Spark or Apache Flink that form a cluster to run your application. Should you have a dedicated cluster of software containers for each application running on them, or should you provide a shared cluster of software containers? The first solution, i.e. a dedicated cluster per application, can make sense due to the different scaling and performance needs of the applications.
  • The usage of object stores: Object stores are needed as large scale, dynamically scalable storage that is shared among containers. However, currently there are some issues, such as performance and consistency models (“eventually consistent”). Here, the paradigm of “Bring Computation to Data” (cf. here) is violated. Nevertheless, this can be mitigated either by using HDFS as a temporary file system on the containers and fetching the data beforehand from the object store, or by using an in-memory caching solution, such as provided by Apache Ignite or to some extent Apache Spark or Apache Flink.
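
The caching mitigation for object stores can be sketched as a read-through cache: the first access fetches an object from the remote store, and later accesses are served from a local copy. The class `CachedObjectReader`, the fake store and the object key below are hypothetical and only stand in for a real object store client and a real caching layer such as Apache Ignite.

```python
class CachedObjectReader:
    """Read-through cache in front of a slow, remote object store."""

    def __init__(self, fetch_from_store):
        self._fetch_from_store = fetch_from_store  # callable: key -> bytes
        self._cache = {}
        self.store_reads = 0

    def get(self, key):
        if key not in self._cache:
            # Cache miss: fetch once from the remote store and keep a local copy.
            self._cache[key] = self._fetch_from_store(key)
            self.store_reads += 1
        return self._cache[key]


# Hypothetical object store contents, standing in for a bucket.
fake_store = {"logs/part-0001": b"a,b,c"}
reader = CachedObjectReader(fake_store.__getitem__)
first = reader.get("logs/part-0001")
second = reader.get("logs/part-0001")
```

Note that such a cache inherits the consistency problem: if the object changes in the store, the local copy is stale until it is invalidated.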

I expect that in these environments the role of software defined networking (SDN) will become crucial not only in cloud data centers, but also in on-premise data centers. SDN (which should NOT be confused with virtualized networks) enables centrally controlled intelligent routing of network flows, as it is needed in the dynamically scaling platforms required by the Microservice architecture. The old decentralized definition of the network, e.g. in the form of decentralized routing, simply does not scale here to enable optimal performance.

Conclusion

I presented here several architectures for Big Data that emerged recently. Although they are based on technologies that are already several years old, I observe that many organizations are overwhelmed by these new technologies and have difficulty adopting and fully leveraging them. This has several reasons.

One tool to manage this could be proper Enterprise Architecture Management. While there are many benefits of Enterprise Architecture Management, I want to highlight the benefit of managed evolution. This paradigm makes it possible to align business and IT, although both change constantly, independently (and dependently) of each other, with goals that are not necessarily aligned, as illustrated in the following picture.

[Figure: managed evolution in Enterprise Architecture]

As you can see from the picture both are constantly diverging and Enterprise Architecture Management is needed to unite them again.

However, reaching managed evolution of Enterprise Architecture usually requires many years and commitment from business as well as IT. Enterprise Architecture for Big Data is a relatively new concept, which is still subject to change. Nevertheless, some common concepts can be identified. Some people also refer to Enterprise Architecture for Big Data as Zeta Architecture. It does not only encompass Big Data processing, but, in the context of the Microservice architecture, also web servers providing the user interface for analytics (e.g. Apache Zeppelin) and further management workflows, such as backup or configuration, deployed in the form of containers.

This enterprise architecture for Big Data describes some integrated patterns for Big Data and Microservices so that you can consistently document and implement your Lambda, Kappa, Microservice architecture or a mixture of them. Examples for artefacts of such an enterprise architecture are (cf. also here):

  • Global Resource Management to manage the physical and virtualized resources as well as scaling them (e.g. Apache Mesos and Software Defined Networking)

  • Container Management to deploy and isolate containers (e.g. Apache Mesos)

  • Execution engines to manage different processing engines, such as Hadoop MapReduce, Apache Spark, Apache Flink or Oozie

  • Storage Management to provide Object Storage (e.g. Openstack Swift), Cache Storage (e.g. Ignite HDFS Cache), Distributed Filesystem (e.g. HDFS) and Distributed Ordered Event Log (e.g. Kafka)

  • Solution architecture for one or more services that address one or more business problems. It should be separated from the enterprise architecture, which is focusing more on the strategic global picture. It can articulate a Lambda, Kappa, Microservice architecture or mixture of them.

  • Enterprise applications describe a set of services (including user interfaces)/containers to solve a business problem including appropriate patterns for Polyglot Persistence to provide the right data structure, such as graph, columnar or hash map, for enabling interactive and fast analytics for the users based on SQL and NoSQL databases (see above)

  • Continuous Delivery that describes how Enterprise applications are delivered to production while ensuring their quality (e.g. Jenkins, SonarQube, Gradle, Puppet etc.).

The Lambda Architecture for Big Data in your Enterprise

I will present in this blog post the Lambda architecture for Big Data. This architecture is about integrating historical Big Data with “live” streaming Big Data. Afterwards, the concept of a large data lake in your enterprise or amongst enterprises in a B2B scenario is explained. This data lake – based on the lambda architecture – can replace a service oriented architecture (SOA), because it is easier to implement and manage for large data volumes in a variety of formats. Hence, a plethora of use cases arises. Finally, I will discuss how this architecture can be implemented using various open source software technologies based on the Hadoop Ecosystem.

The Lambda Architecture

Big Data has become an increasingly popular topic over the last years. Big Data is about processing large volumes of data in a variety of formats, taking into account live streaming or historical data. One large computing cluster is used to store and process all of one or more companies’ data.

Internet companies, such as Google, Yahoo or Facebook, are driven by new business models for which existing technology was not suitable. This led to the development of new technologies known under the common umbrella of NoSQL. Furthermore, there has been the need to integrate them in a flexible standardized architecture to enable Big Data. The lambda architecture is such an architecture and has been coined recently by Nathan Marz and James Warren.

It has the following key features:

  • Standardized fault-tolerant distributed file system that spans the whole cluster – this file system is the base of the data lake that I will explain later.
  • A batch processing layer for processing large amounts of historical data stored in the computing cluster
  • A serving layer for providing fast access to results of batch processed data
  • A real-time processing layer (or “speed layer”) for “live” processing of data streams, such as sensor data or stock market data
  • A long term storage layer optimized for extremely cheap storage of data that is rarely used (e.g. for legal reasons). Usually you do not find this in other articles describing the lambda architecture, but I think it is an important feature to highlight. Here you have very old data (several years old or more) that you do not need in your day to day business – you can store it on very cheap hardware with a lot of disk space but much less computing power and memory capacity.

These features are not new and have been addressed partly also by other architectures known in other domains, such as Business Intelligence, Complex Event Processing, Data Warehouse or Master Data Management. However, the lambda architecture addresses them in context of huge data volumes, diversity of data formats (polyglot persistence) and integrates them all in one architecture.

The term “lambda” stems from the following function used for doing analytics in context of Big data:

query = λ(all data) = λ (live streaming data) * λ (historical data)

Basically it says that all analytics functions λ combining live streaming data and historical data can be computed on systems implementing the lambda architecture. I will later discuss the implication of this for the implementation of the architecture.
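
As a small illustration of the formula, consider a λ that counts occurrences per key. The sketch below reads the `*` as a merge operation that adds the per-key counts of both sides; this reading, the function names and the sample data are assumptions for the example, and other analytics functions need other merge operations.

```python
from collections import Counter


def count_by_key(records):
    """The analytics function λ in this example: count occurrences per key."""
    return Counter(records)


def combine(batch_view, realtime_view):
    """The '*' operator for this particular λ: add up the per-key counts."""
    return batch_view + realtime_view


# Hypothetical data: websites that were liked.
historical = ["W", "W", "X"]
live = ["W", "Y"]
query_result = combine(count_by_key(historical), count_by_key(live))
```

For this λ, the merged result equals the result of running λ over all data at once. Not every analytics function decomposes this easily: a median, for example, cannot simply be merged from two partial medians.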

The lambda architecture is illustrated in the following figure

[Figure: the lambda architecture]

The lambda architecture provides the data scientist with means and tools to analyze any data occurring in the company, whereby tools can be easily plugged into the architecture without requiring major implementation efforts later.

Machine learning components can autonomously leverage the lambda architecture to do prediction and automatically implement actions. This is known as predictive and prescriptive analytics.

Data Lake

One of the most interesting aspects of the lambda architecture is that you have a cluster of nearly unlimited storage and memory capacity. You can even have an in-memory database with a memory capacity on the terabyte to petabyte scale distributed over the whole cluster. Popular open source frameworks, such as Hadoop, allow you to use commodity hardware, so that deploying such an architecture can be relatively cheap, and they already have built-in fault-tolerance, so that developers do not need to mess around with it.

With such a large cluster you can create a big data lake in your company (see next figure). Basically all your data ends up in this cluster, all applications, including the ones in the cloud, can share it via simple file system access mechanisms, and you can use the computing power of the whole cluster to do analysis. Needless to say, you save a lot of money, because you avoid many redundant ETL processes, each of which would have to be made fault-tolerant and interact with different systems. Modern Big Data architectures take care of this for you.

[Figure: the data lake]

Finally, exchanging data becomes much easier than in a Service-oriented Architecture (SOA), where you need to design interfaces and implement services – here every application simply accesses the distributed file system in the cluster.

Implementing a Lambda Architecture

There are several things to consider when you implement the lambda architecture. Firstly, you can choose from a variety of components to implement it. For instance, on the open source side Apache Hadoop / Apache Spark is very popular and is used by many companies, including popular Internet companies such as Facebook or Google. You can also use other open source components, such as Apache Cassandra for batch processing and Twitter Storm for stream processing. Additionally, you can use commercial tools, such as the SAP HANA Cloud platform. Finally, you can put your lambda architecture completely on-premise, completely in the cloud (see my example with Amazon Elastic MapReduce, which partly implements a Lambda Architecture) or have some kind of hybrid model. In the following I will describe an implementation using Apache Hadoop and additional tools that can integrate with Apache Hadoop.

Software Components

You can use the following components for implementing the lambda architecture.

  • Standardized fault-tolerant distributed file system: Hadoop Distributed Filesystem (HDFS). You can use also other distributed file systems. The choice of the file system is transparent to the application, i.e. they won’t need to use different APIs for different file systems. Most of the time you will be fine with HDFS, but, for example, cloud providers, such as Amazon, may implement their own that fits to their infrastructure.
  • Batch Processing layer: Here you can use Hadoop YARN, which is responsible for distributing Big Data analytics jobs, such as MapReduce jobs. YARN even allows you to “containerize” your jobs, i.e. define CPU, memory and network limitations across the big data cluster for a specific job. This allows you to do proper capacity management – one of the most important aspects of a lambda architecture. If you need in-memory batch processing then you should check out Apache Spark. If you want to have a more generic job control, e.g. because you have other distributed applications around your cluster that are not based on the MapReduce paradigm, you can use Apache Mesos.
  • Serving layer: The serving layer provides fast access and advanced query mechanisms for results of batch jobs. Here you can use typical Big Data databases and data warehouses, such as Apache HBase or Apache Shark (for in-memory access). You will probably have multiple different technologies here according to the polyglot persistence NoSQL paradigm. They offer typical interfaces, such as JDBC or ODBC, to integrate with any application.
  • Real-time processing layer: Although Hadoop can process streaming data, most of the time you will choose a software component supporting complex event processing of live streaming data across your cluster, such as Apache Spark Streaming or Twitter Storm.
  • The long term persistence layer is mostly a hardware choice: Here you need a lot of cheap hard disk space, e.g. by not using SSD flash drives, and little computing / memory power. It is usually a separate cluster connected to the other cluster and it leverages the fault tolerance features of HDFS, such as automated replication of data to several nodes and re-replication in case of node failures.
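
The re-replication behaviour mentioned for the long term persistence layer can be sketched as follows. This is a strongly simplified illustration and not HDFS's actual placement policy, which also takes racks and node load into account. The function name, the node names and the block names are hypothetical; the replication factor of 3 matches the HDFS default.

```python
def rereplicate(block_locations, failed_node, live_nodes, replication_factor=3):
    """Drop the failed node and restore the replica count for each block.

    `block_locations` maps a block name to the list of nodes holding a replica.
    Returns a new mapping; the input is left unchanged.
    """
    repaired = {}
    for block, nodes in block_locations.items():
        survivors = [n for n in nodes if n != failed_node]
        # Nodes that are alive and do not yet hold a replica of this block.
        candidates = [n for n in live_nodes if n != failed_node and n not in survivors]
        missing = max(0, replication_factor - len(survivors))
        repaired[block] = survivors + candidates[:missing]
    return repaired


blocks = {"block_1": ["n1", "n2", "n3"], "block_2": ["n2", "n3", "n4"]}
repaired = rereplicate(blocks, failed_node="n2", live_nodes=["n1", "n3", "n4", "n5"])
```

After the failure of node `n2`, every block is again stored on three different nodes, which is the property that lets you build this layer from cheap hardware.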

Furthermore, you can have a lot of other software components that automatically build on the aforementioned core technologies, such as Apache Hive or Apache Shark, a Data Warehouse for Hadoop, or Apache Oozie, which is a workflow tool for complex ETL processes distributed over your data lake.

As mentioned before, there is a wide variety of alternatives that you can use to implement the lambda architecture. The standardized fault-tolerant distributed file system is most of the time the base for everything and you can also gradually evolve your architecture and implement it using different components.

Delivery Pipeline

I briefly described before that capacity management is an important part of the lambda architecture. You need to define how big data jobs are programmed and tested as well as how they get into the cluster. I expect that in the future not only programmers, but also business people, such as data scientists, will need to load big data jobs into your cluster. This means you will need to (1) properly define your delivery pipeline, (2) implement and enforce proper capacity management and (3) have bullet-proof dependency management for different software versions in your cluster.

Luckily, by using Hadoop YARN or Apache Mesos together with cluster monitoring software, such as Ganglia, you can do proper capacity management.
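
The core idea of capacity management, admitting a job only if its resource request fits into the free capacity, can be sketched in a few lines. The classes `Job` and `Cluster`, the resource figures and the job names are hypothetical; resource managers such as YARN or Mesos implement far more elaborate scheduling (queues, priorities, preemption) that is not shown here.

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    cpu_cores: int
    memory_gb: int


class Cluster:
    """Tracks free capacity and admits jobs only if they fit."""

    def __init__(self, cpu_cores, memory_gb):
        self.free_cpu_cores = cpu_cores
        self.free_memory_gb = memory_gb
        self.running = []

    def try_admit(self, job):
        """Reserve resources for the job; return False if it does not fit."""
        if job.cpu_cores > self.free_cpu_cores or job.memory_gb > self.free_memory_gb:
            return False
        self.free_cpu_cores -= job.cpu_cores
        self.free_memory_gb -= job.memory_gb
        self.running.append(job.name)
        return True


cluster = Cluster(cpu_cores=16, memory_gb=64)
jobs = [Job("etl", 8, 32), Job("report", 4, 16), Job("training", 8, 32)]
admitted = [cluster.try_admit(job) for job in jobs]
```

In this example the third job is rejected because only 4 cores remain free. A real scheduler would typically queue it instead of rejecting it.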

Recently, more tools, such as Docker, that use advanced virtualization features of the Linux kernel (cgroups) have emerged, making capacity management even easier and more flexible. These technologies also have built-in dependency management to avoid a library/versioning hell. Google developed an open source scheduling system for such containers, called Kubernetes.

Combining Stream-Processing and Batch Data

One core goal of the lambda architecture is to integrate live streaming and batch processing. In fact, most of the recent articles on lambda architecture are just about providing both as software components. However, you will also need to integrate this on the query level, because complex event processing queries are a little bit different from batch processing queries.

Spark Streaming demonstrates how you can join historical data with stream processed data at the same time.
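
Independently of any particular engine, the idea of joining live events with historical data can be sketched in plain Python. This is not the Spark Streaming API; the function name, the customer names and the segments are made up for illustration, and the historical table is assumed to be small enough to be held as a dictionary.

```python
def join_stream_with_history(live_events, historical_table, default=None):
    """Enrich each live (key, payload) event with the historical value for its key."""
    for key, payload in live_events:
        yield key, payload, historical_table.get(key, default)


# Hypothetical result of a batch job: customer segment per customer.
historical_segments = {"alice": "frequent_buyer", "bob": "new_customer"}
# Hypothetical live events: (customer, action).
live_events = [("alice", "viewed_cart"), ("carol", "signed_up")]

joined = list(join_stream_with_history(live_events, historical_segments))
```

Events for keys without history, such as `carol` here, still pass through with a default value, so the query has to decide explicitly how to treat them.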

Hardware Components

Hardware considerations for a lambda architecture have – if at all – only been briefly discussed in most of the publications. Hardware planning is important for your cluster – we have seen this already with the long term storage. Furthermore, if you have a few very old machines in your big data cluster, then this will affect all jobs running on your cluster. You will need to have proper monitoring tools and rules deployed to automatically identify these kinds of bottlenecks.
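
A very simple monitoring rule for such bottlenecks is to flag nodes whose average task duration is far above the median of all nodes. The sketch below assumes that task durations per node are already collected; the function name, the factor of 2 and the sample timings are assumptions for illustration and would have to be tuned for a real cluster.

```python
from statistics import median


def find_slow_nodes(task_seconds_by_node, factor=2.0):
    """Return nodes whose average task time exceeds `factor` times the median average."""
    averages = {
        node: sum(times) / len(times)
        for node, times in task_seconds_by_node.items()
    }
    threshold = factor * median(averages.values())
    return sorted(node for node, average in averages.items() if average > threshold)


# Hypothetical task durations in seconds, measured per node.
timings = {"node1": [10, 12], "node2": [11, 9], "node3": [40, 44], "node4": [10, 10]}
slow = find_slow_nodes(timings)
```

Using the median rather than the mean keeps a single very slow machine from shifting the threshold too much.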

Conclusion

Once you have implemented the lambda architecture you will need to teach everybody to use it. You will need to plan the migration of data storage for analytics from the individual systems to your data lake, i.e. your big data cluster. Keep in mind that the lambda architecture is about analytics. Although it is possible to include transactional systems in it (e.g. a MySQL Cluster), you will probably still use standard transactional databases for your individual ERP systems, CRM systems etc., from which you extract the data and put it into the cluster for analytics.

However, there are also other tools for doing distributed transactions, such as CloudTPS or, even more advanced, the Bitcoin transaction system. They may replace individual transactional databases in the future.

More and more companies are embarking on the journey of a standardized Big Data architecture each year. Most of them use open source technologies to gradually migrate towards one big data lake as it has been described here.