Blockchain Consensus Algorithms – Proof of Anything?

Blockchains have proven over the last years to be stable distributed ledger technologies. Stable refers to the fact that they can recover from attacks and/or bugs without compromising their assets. They are most commonly known for enabling transactions with virtual cryptocurrencies not issued by a central authority. Popular examples are Bitcoin and Ethereum. However, both have been forked to create alternative coins (altcoins) with different features. For instance, Namecoin, the first fork of Bitcoin, is a cryptocurrency that at the same time provides distributed domain names and identities without relying on a central authority. Thus, it is more resilient to censorship or to governance by central authorities that are potentially not democratically elected.

Of course, at the same time this new technology – as all new technologies – carries some risk, because it needs to be properly understood by its users. Initial Coin Offerings (ICOs) that fork from popular blockchains (or do not even do this) may be part of frauds or scams.

Hence, it is important to understand their key mechanisms, and this blog post describes one of them: establishing consensus on the transactions happening on a blockchain. Without secure consensus it is possible to steal value (coins) or manipulate entries in the blockchain. The consensus is usually decided by a participant that can provide a proof of something. This something differs between the various consensus mechanisms.

About Blockchain Consensus

One should not confuse blockchain consensus with consensus in distributed systems. Consensus in distributed systems is about agreeing on a certain data value during computation. The idea is to reach a common state among several copies of data despite failures, network partitioning or even manipulation by replicas. Paxos and Raft are typical algorithms/protocols to reach consensus; they require electing a leader, which can be any node involved in the consensus and which stays leader as long as it does not fail.

Blockchain consensus is an economic consensus, which is different: the economic participants (which are in the end humans) have a common economic interest in reaching it:

  • Preserve the value of their cryptocurrency assets. This especially means double-spending must not be allowed and must be detectable.

  • Leader election: The entity deciding consensus on a given block in the blockchain gets superior benefits (besides value preservation). This means the leader should change very frequently and a transparent mechanism for changing the leader should exist.

  • All participants of the economic consensus can verify that the leader made a correct decision (i.e. produced a valid block). This is also needed so that the leader cannot modify transactions, e.g. their value or destination.

  • Timely communication with all participants before the leader decides on a given block, or for electing a leader, is seen as infeasible.

  • Produce a block with high probability within a given timeframe (e.g. within 10 minutes) to avoid that participants leave the blockchain because they cannot transfer their cryptocurrency assets at any given point in time.

Why would they have such an interest? They all have assets (financial/non-financial) that they want to preserve. Hence, most participants have no incentive to cheat or to let agreement on the state of the ledger fail, because if cheating is detectable or preventable (and it is with an adequate blockchain consensus algorithm) then cheating only diminishes the value for all participants. Of course, outsiders may want to attack and destroy the whole blockchain, because they have no stake in it – this would be comparable to burning physical money of another currency.

Leader election is different here – the participants do not necessarily trust each other and especially do not want one of them to stay leader for a long time, because the leader benefits heavily in economic terms, for example through the fees that are charged, and can decide to include only certain transactions, e.g. those of a minority of peers (“friends”). This would ultimately lead to a situation where the other participants cannot preserve the value of their assets.

Additionally, it can happen in blockchains that participants do not agree on a common data value during computation and this leads to a fork of the blockchain. Such a fork is temporary or permanent, depending on what the participants want. For instance, some participants may want – and this has happened in the past – to create a separate blockchain from an originally common blockchain shared with other participants (e.g. the split between Bitcoin Core and Bitcoin Classic or Ethereum and Ethereum Classic). Then, consensus might be based on different rules. In fact, such a situation has never been considered in distributed consensus.

As you can see, blockchain consensus and consensus in distributed systems are very different. Economic, governance and human factors distinguish the former from consensus in distributed systems. That is why successful blockchain consensus algorithms are fundamentally different from consensus algorithms in distributed systems. One famous example is the proof-of-work algorithm that is used in the Bitcoin blockchain. Additionally, applying consensus algorithms from distributed systems has not been very popular, although some blockchains employ them for closed networks with selected participants only, which arguably makes using blockchain technology meaningless.

Algorithms for Proof of Anything

Proof of Work

Bitcoin, as the first practically successful cryptocurrency, introduced Proof-of-Work (PoW). However, as the Bitcoin paper itself stated, it was not the first to use the concept: Hashcash and others had proposed a similar approach, mainly to address the issue of junk mail. The idea there was that someone has to prove that some investment has been made before sending an email. This would make sending junk mail unprofitable.

Basically, a proof-of-work demonstrates that a participant has done some work, and the participant providing it gets a reward.

For example, in a simplified version of the proof-of-work in Bitcoin, a block including relevant parts of the transactions is hashed together with a nonce, and the nonce is varied until the resulting hash is below a certain target value (determined by the difficulty).
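
A minimal Java sketch of this idea follows (a toy example, not Bitcoin's actual double-SHA-256 block header hashing; the block content string and the choice of 20 leading zero bits are assumptions for illustration):

    import java.math.BigInteger;
    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;

    public class ToyProofOfWork {

        public static void main(String[] args) throws Exception {
            String blockData = "previousHash|merkleRootOfTransactions"; // placeholder block content
            // Target: the hash interpreted as a number must be below this value (the "difficulty").
            BigInteger target = BigInteger.ONE.shiftLeft(256 - 20); // 20 leading zero bits

            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            long nonce = 0;
            while (true) {
                byte[] hash = sha256.digest((blockData + nonce).getBytes(StandardCharsets.UTF_8));
                BigInteger hashValue = new BigInteger(1, hash); // interpret the hash as an unsigned number
                if (hashValue.compareTo(target) < 0) {
                    break; // proof found: (blockData, nonce) hashes below the target
                }
                nonce++;
            }
            System.out.println("Proof-of-work nonce found: " + nonce);
            // Verification is cheap: any participant recomputes a single hash and compares it to the target.
        }
    }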

PoW has the following characteristics:

  • It must be predictably hard to obtain. For example, Bitcoin has as a rule (which of course could be changed if a majority voted for it) that on average every 10 minutes a new proof-of-work (i.e. a block) can be generated.

  • It must adapt to innovation. For instance, new, more powerful hardware may make a proof-of-work obsolete if it does not become more difficult. If a PoW is generated too fast then the network can be subject to double-spending attacks or one participant might gain a monopoly as leader of the network. Additionally, it must be able to withstand new technology, such as quantum computers or ASICs.

  • It must adapt to network power. For example, if the difficulty grows too much and it is too hard to solve, then cryptocurrency assets lose value, because it will take too long to transfer ownership. Hence, the difficulty of the PoW must be able to grow and shrink according to the network's capacity to solve it (a simplified retargeting sketch follows after this list). This is the case for all cryptocurrencies including Bitcoin.

  • It should be equally difficult for anyone to generate it, i.e. there should not be a centralization towards a few entities that are able to generate a PoW. This is not a black-and-white property, but rather a grey area, because even in Bitcoin it is currently not fully ensured due to the appearance of ASICs.

  • It must be extremely fast for any other participant to verify that the work has been done.

  • It must not be possible to give completely new nodes a long fake chain to dissolve the network and make it attackable. In fact, Bitcoin contains checkpoints that are hardcoded as consensus rules, i.e. certain block hashes at given points in time are known to be valid, so new nodes can start validating from a later stage and detect early whether a fake blockchain has been supplied to them. Since Bitcoin is open source, this is a somewhat transparent mechanism that all participants have to agree on.
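
The rule that adapts the difficulty to network power can be sketched roughly as follows, assuming Bitcoin-style parameters (2016 blocks per retarget period, 10 minutes per block, adjustment clamped to a factor of 4); Bitcoin's actual integer arithmetic on the compact target representation differs in detail:

    public class DifficultyRetarget {

        // Bitcoin-style parameters (an assumption for illustration).
        static final int BLOCKS_PER_RETARGET = 2016;
        static final long TARGET_SECONDS_PER_BLOCK = 600; // 10 minutes
        static final long EXPECTED_SECONDS = BLOCKS_PER_RETARGET * TARGET_SECONDS_PER_BLOCK;

        /**
         * Returns the new difficulty given the old difficulty and the time the
         * network actually needed for the last retarget period.
         */
        static double retarget(double oldDifficulty, long actualSeconds) {
            // Clamp the adjustment so a sudden change in network power cannot
            // swing the difficulty by more than a factor of 4 per period.
            long clamped = Math.max(EXPECTED_SECONDS / 4, Math.min(actualSeconds, EXPECTED_SECONDS * 4));
            // More hashing power => blocks found faster => difficulty grows, and vice versa.
            return oldDifficulty * ((double) EXPECTED_SECONDS / clamped);
        }

        public static void main(String[] args) {
            // Example: the network found 2016 blocks in 7 days instead of 14 => difficulty doubles.
            System.out.println(retarget(1000.0, 7L * 24 * 3600));
        }
    }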

Although most PoW systems are CPU-bound, the characteristics do not prevent a PoW from being bound by something else, such as memory. Theoretically, one could also imagine other PoWs, such as ones based on information entropy, colliders, speed of light or quantum-computing-specific aspects (example). However, such a PoW system must be available to all participants and satisfy the characteristics above. Nowadays most PoW systems solve hash-based problems (e.g. SHA2-256, SHA3, Scrypt or mixtures of different hash algorithms).

One main point of criticism of PoW is that a lot of energy is “consumed”. I purposely do not write wasted here, because the PoW ensures the functioning of the cryptocurrency and, as we will see later, no viable alternatives currently exist for public blockchains. Additionally, one should keep in mind that payment processing, clearing, physical money, credit cards, server energy etc. also have an energy footprint, but there has – at least to my knowledge – never been a study comparing whether the PoW is more energy-hungry (first attempts exist, cf. here).

However, there have been attempts to improve the PoW. For example, some cryptocurrencies use as PoW a more or less meaningful problem (Proofs of Useful Work – PoUW); e.g. Primecoin searches for chains of prime numbers. Meaningful usually implies a mathematical problem, which is simple to describe, but fulfills some or, even better, all characteristics above. It needs to be simple to describe, because if it is complex to describe then it is complex to understand, difficult to test and prone to errors. Permacoin uses PoW for distributed storage of archival data, i.e. one needs to provide storage to solve the PoW. Gridcoin attempts to solve scientific problems.

Nevertheless, they may not be able to fulfill all the characteristics mentioned above, which explains their limited popularity for cryptocurrencies. However, there has never been – to my knowledge – a complete study and comparison of all these mechanisms including quality (testing!), ecological, economic and socio-economic effects.

Others try to reuse an existing PoW. In fact, in some sense the PoW is reused in Bitcoin, because if a transaction is included in a block its output can be reused in other transactions. Other approaches, such as merged mining, allow the same work to be used for different blockchains at the same time (e.g. Bitcoin and Namecoin).

Finally, another criticism of PoW is that it is slow. Usually the Bitcoin delay of 10 minutes on average for generating a block is cited. However, these 10 minutes are a deliberate decision by the originators of Bitcoin and not a technological limit. In fact, at any time this could be changed by a majority to be more or less. However, having less time might have significant security and economic impact, which needs to be carefully weighed. Furthermore, with the existence of second-layer approaches, such as the Lightning Network, this rule can probably be avoided more elegantly, allowing payment processing to scale similarly to popular payment networks.

Proof of Stake

Proof of Stake (PoS) is another way to establish economic consensus.

PoS is basically about voting on the next block in a blockchain based on the economic stake in the network. A stake could, for instance, be determined based on a number of cryptocurrency assets in a locked deposit or on the stake of CPU/memory/energy/etc. contributed to the network. Variants differ, for example, in who can propose a new block (consensus) and who can vote on it. The idea is that someone who has a lot of stake will not do anything to endanger this stake, such as cheating, because then it would become less valuable.
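
A core building block of many PoS variants is selecting the next block proposer with a probability proportional to stake. A minimal sketch follows; the stake table and the plain pseudo-randomness are assumptions for illustration, since real protocols derive the randomness verifiably from the chain itself:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Random;

    public class StakeWeightedSelection {

        /** Picks a participant with probability proportional to its locked stake. */
        static String pickProposer(Map<String, Long> stakes, Random random) {
            long totalStake = stakes.values().stream().mapToLong(Long::longValue).sum();
            long ticket = (long) (random.nextDouble() * totalStake); // ticket in [0, totalStake)
            long cumulative = 0;
            for (Map.Entry<String, Long> entry : stakes.entrySet()) {
                cumulative += entry.getValue();
                if (ticket < cumulative) {
                    return entry.getKey();
                }
            }
            throw new IllegalStateException("total stake must be > 0");
        }

        public static void main(String[] args) {
            Map<String, Long> stakes = new LinkedHashMap<>();
            stakes.put("alice", 50L);   // 50% of the locked deposits
            stakes.put("bob", 30L);     // 30%
            stakes.put("carol", 20L);   // 20%
            System.out.println("Next proposer: " + pickProposer(stakes, new Random()));
        }
    }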

However, PoS has not been as successful as Proof-of-Work. Currently, none of the large cryptocurrencies uses it. Nevertheless, for Ethereum it was initially assumed that it would be used instead of PoW (“Slasher”), but currently Ethereum only supports PoW (Ethash). The reason, according to the originators, is that proof-of-stake is non-trivial.

The characteristics of PoS are the following:

  • Votes on a new block are weighted according to the economic stake a participant has in the blockchain. However, a centralization towards the “richest” participant should be avoided. This is usually done by differentiating between block proposers (which might be selected randomly or according to another rule) and block validators (that have a stake).
  • Economic stake may change and is not fixed.
  • It should not be possible to change a vote once it has been cast and has been visible in the network for some (but not too long a) time. It should not be possible to vote on several alternative chains of the same blockchain. This implies that the economic stake must be at risk in case of abuse (the “nothing at stake” problem).
  • Nodes need to be online, i.e. connected to other nodes, to vote with a relevant stake.
  • The vote needs to terminate with an outcome (yes/no) after a certain, short amount of time.

The main difference, it seems, is that the reward for work is replaced by a vote based on stakes. In some sense PoS can be compared to shareholder votes.

One interesting question is how such stakes can be distributed initially. Some cryptocurrencies sell stakes in their initial offering for Fiat currencies or for already working (“bootstrapped”) cryptocurrencies, such as Bitcoin. This has recently led to a number of fraudulent initial coin offerings (ICOs). The reason is that it is virtually impossible for participants to find out whether a cryptocurrency will be successfully adopted or not (or whether it even exists), which implies a very high risk. Even afterwards, wrong decisions can render a cryptocurrency valueless.

Several theoretical ideas have been proposed for PoS, but they rarely end up in public blockchains, because of inherent issues which are non-trivial. PoS is significantly more complex to implement than PoW in the case of decentralized public blockchains. It potentially involves several roles (e.g. proposer, voter and validator) that need to communicate actively (in PoW communication is passive). Furthermore, it can be (but need not be) less transparent than PoW, because the stakes and their development over time might be difficult to monitor (here dedicated analytics software may help). Examples are the previously mentioned Slasher protocol, the new protocol proposed by Ethereum (Casper) or the minting by Peercoin (basically based on coin age).

Practical examples for PoS exist, such as Peercoin, but one disadvantage is that only one entity (of unknown identity) has the ability to invalidate any chain at any point in time from anywhere. The reason for this checkpointing mechanism was the nothing-at-stake problem. However, this mechanism will meanwhile be changed for Peercoin.

These practical examples are nonetheless not as successful currently as cryptocurrencies based on PoW.

However, PoS also has some advantages, such as potentially lower energy consumption or the setup of more sophisticated governance mechanisms (including everyone).

Proof of Burn

Proof of Burn (PoB) is currently only a theoretical concept that has appeared on the Bitcoin mailing lists as an alternative to PoW. It should be seen as work in progress, because it has not yet been formally written down and analyzed.

This should not be confused with burning coins of one cryptocurrency to create coins of another cryptocurrency. This would be a more complex scenario related to PoB.

PoB works as follows: Someone sends some amount of cryptocurrency (e.g. Bitcoins) to a destination from which it cannot be used anymore, i.e. it becomes provably unspendable (hence the analogy of burning it). After a certain amount of time (e.g. two months) a participant can propose a new block and present the burned amount of cryptocurrency as a proof.
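
In Bitcoin-like systems, a burn is typically realized as an output that is provably unspendable, for example a script starting with OP_RETURN. The following minimal sketch shows how a PoB verifier could recognize and sum such burned outputs (the raw script bytes are simplified for illustration):

    public class ProvablyUnspendable {

        // In Bitcoin, an output whose script starts with OP_RETURN (0x6a) can never be spent,
        // so coins sent to such an output are "burned" in a way everyone can verify.
        private static final byte OP_RETURN = (byte) 0x6a;

        static boolean isProvablyUnspendable(byte[] outputScript) {
            return outputScript.length > 0 && outputScript[0] == OP_RETURN;
        }

        /** Sums the burned value of a transaction's outputs (values in satoshi, scripts as raw bytes). */
        static long burnedValue(long[] outputValues, byte[][] outputScripts) {
            long burned = 0;
            for (int i = 0; i < outputValues.length; i++) {
                if (isProvablyUnspendable(outputScripts[i])) {
                    burned += outputValues[i];
                }
            }
            return burned;
        }

        public static void main(String[] args) {
            byte[] burnScript = new byte[] { OP_RETURN, 0x04, 'b', 'u', 'r', 'n' };
            byte[] normalScript = new byte[] { 0x76, (byte) 0xa9 }; // start of a standard pay-to-pubkey-hash script
            System.out.println(burnedValue(new long[] { 100_000L, 50_000L },
                                           new byte[][] { burnScript, normalScript })); // prints 100000
        }
    }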

Some might ask why someone would spend money just to propose a new block. Remember what I said earlier: the proposer of a block gets superior benefits, such as transaction fees. Obviously, for this to work the burned amount of cryptocurrency must be lower than the transaction fees.

The proposal might not be as senseless as it looks, because its supporters argue that even for PoW some money needs to be burned by buying equipment to do the proof of work.

Furthermore, it requires that a certain amount of cryptocurrency is already there (e.g. generated via PoW).

PoB also has further implications that are not yet well understood. A few preliminary implementations exist, such as Slimcoin (based on Peercoin), and they should be viewed with care.

Proof of Elapsed Time

Proof of Elapsed Time (PoET) attempts to address a problem also present in proof-of-stake: a random election of the participants proposing blocks is needed to ensure that every participant has a fair chance to propose a block and thus to generate superior benefits.

The idea is the following: Every participant requests a wait time from its local trusted enclave. The participant with the shortest wait time is the next to propose a block, after it has waited for the assigned time.

Each local trusted enclave signs the wait-time function and its outcome so that other participants can verify that no one has cheated on the wait time.
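
A minimal simulation of this election; the exponentially distributed wait time and the plain random number generator stand in for what the trusted enclave would actually produce and attest:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    public class PoetElectionSimulation {

        /** Stand-in for the signed wait time an enclave would attest to (assumption for illustration). */
        static double drawWaitTimeSeconds(Random enclaveRandom, double meanWaitSeconds) {
            // Exponentially distributed wait times make the minimum over all participants fair:
            // each participant wins with equal probability, like a lottery.
            return -meanWaitSeconds * Math.log(1.0 - enclaveRandom.nextDouble());
        }

        public static void main(String[] args) {
            Random random = new Random();
            Map<String, Double> waitTimes = new HashMap<>();
            for (String participant : new String[] { "node-1", "node-2", "node-3" }) {
                waitTimes.put(participant, drawWaitTimeSeconds(random, 30.0));
            }
            // The participant with the shortest attested wait time proposes the next block
            // (after actually waiting that long).
            String winner = waitTimes.entrySet().stream()
                    .min(Map.Entry.comparingByValue())
                    .get().getKey();
            System.out.println("Wait times: " + waitTimes + " -> next proposer: " + winner);
        }
    }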

It seems – and the proponents of PoET claim – that it fulfills the characteristics of PoS described above.

The concept was first proposed by Intel as part of its Hyperledger Sawtooth open source technology, which is no surprise, because it is another use case for Intel's SGX technology providing the enclave.

Although the approach does not prevent mixing in or using other secure enclaves besides Intel's, such alternatives have – to my knowledge – not yet been proposed (e.g. based on AMD Secure Memory Encryption (SME)/Secure Encrypted Virtualization (SEV)).

There are some things that you need to be aware of when using this approach:

  • The secure enclave is a rather complex technology, which potentially makes breaking it easier than cheating in PoW.

  • In order for participants to verify that a secure enclave has provided the value, they rely on a trusted third-party certification authority or a web of trust that signs the keys of the secure enclaves. Hence, there is a clear tendency towards centralization, which is avoided in other PoW or PoS scenarios.

Practical Byzantine Fault Tolerance

Practical Byzantine Fault Tolerance (PBFT) is a consensus algorithm which is normally used for consensus in distributed systems and, as argued before, does not really fulfill the requirements for economic consensus in blockchains.

Since PBFT becomes infeasible in networks with a large number of nodes due to the required communication, blockchain technologies using PBFT rely only on a trusted subnetwork of participants to establish consensus (e.g. a unique node list for each participant in Ripple). This poses some problems:

  • How large should this list be and how should a “normal” participant know who to include in its trusted network?
  • How can a participant detect forks of the blockchain (e.g. servers changing their trusted subnetwork)?
  • What is the incentive for a participant to participate in a consensus? There is no transaction fee per se foreseen in the consensus.

PBFT is advocated by a few blockchain technologies, such as Ripple (as described here) or Stellar. There, however, the use case is also slightly different: it is more about connecting large banking networks than about letting anyone participate as in other blockchain technologies. Hence, most of the questions stated before may have a clear, convincing answer. Additionally, transaction fees are introduced by burning a certain amount of currency in each transaction – none of the participants has access to the burned amount of currency. This is used to avoid that the blockchain is flooded with a large number of transactions to render it useless or to gain economic benefits from it. Hence, for these kinds of special blockchains with a specialized set of participants this mechanism can still make sense.

Conclusion

Blockchains have proven to be mature technologies, as public examples such as Bitcoin or Ethereum demonstrate. However, from an economic perspective not all mechanisms are well understood, especially due to the huge variety of concepts and their rapid development. This has also led to frauds with fake cryptocurrencies and blockchains as part of certain initial coin offerings (ICOs). Furthermore, different types of participants in different types of blockchains make it even more difficult to understand the context.

Although it seems that PoW is dominating now, it is more suitable for public blockchains independent of any central entity. This might not always be desired, because a central entity can ensure with the right policies that every participant has access to the blockchain, is protected from other participants, and has the same rights as well as responsibilities, similar to how it already is with Fiat money. Hence, PoS systems may gain more traction, because they have a more flexible governance model than PoW. They could evolve into a system of proof of mastery, e.g. a certain subset of participants in a blockchain proposes new blocks, because they have been delegated this task by all the other participants. This subset of participants would use open source software and analytics on the blockchain, and provide transparent mechanisms as well as information to all the other participants that delegated their stake to them.

However, due to the inherent challenges of PoS, combined systems of PoW/PoS/PoB may ultimately be the successful ones. There seems to be a tendency towards this (e.g. Casper for the Ethereum blockchain). Given the different approaches, a lot of different combinations are possible. For instance, one can have PoS for the “daily” business of creating blocks, but PoW for checkpointing the blockchain at certain points in time.

Nevertheless, all these systems can only be successful and transparent if powerful analytics software is available to any participant, so they can track the effectiveness of decisions within certain blockchain technologies and derive appropriate consequences out of it.

Keep in mind that not only the Proof of Anything (PoW, PoS, PoB etc.) is a challenge here, but also other powerful groups, such as the developers who write blockchain technology – they also tend towards centralization into a single group and they have a lot of power.

In the future, we will see more cross-blockchain activities addressing how cryptocurrencies from one chain can end up in another chain (e.g. via PoB). Similar to today's exchanges for Fiat money, there will always be a need for exchanging different cryptocurrencies. There will not be one cryptocurrency, but always many, due to the different interests of participants or the adoption of new technologies.

Furthermore, we will need to deal with automated non-human participants in the blockchain. Robots or “things” may have a certain amount of cryptocurrency to perform tasks, e.g. an autonomous car that needs to pay highway tolls (assuming there is no reason anymore for an autonomous car to be “owned”; it exists on its own and makes money by bringing passengers from A to B). These types of participants may have different incentives/requirements for economic consensus.


Spending Time on Quality in Your Big Data Open Source Projects

Open source libraries are nowadays a critical part of the economy. They are used in commercial and non-commercial applications, directly or indirectly affecting virtually any human being. Ensuring quality should be at the heart of each open source project. Verifying that an open source project ensures quality is mandatory for each stakeholder of this project, especially its users. I describe in this post how software quality can be measured and ensured when integrating open source libraries into your application. Hence, we focus only on evaluating the technical quality of a library, because its suitability for business purposes has to be evaluated within the regular quality assurance process of your business.

The motivation for this post is that other open source projects may also employ transparent software quality measures so that eventually all software becomes better. Ideally, this information can be pulled by application developers to have a quality overview of their software and its dependencies. The key message is that you need to deploy solutions to efficiently measure the software quality progress of your applications including dependencies and make the information transparently available in a dashboard (see screenshot) for everyone in near realtime.

[Screenshot: software quality dashboard]

Although I write here about open source software, you should push your commercial software providers to at least provide the same level of quality assurance and in particular transparency. This also includes the external commercial and open source libraries they depend on.

Software quality is “The totality of functionality and features of a software product that bears on its ability to satisfy stated or implied needs” (cf. ISTQB Glossary). This definition is based on the ISO/IEC 9126 standard (succeeded by ISO/IEC 25000), which describes various quality classes, such as functionality, reliability, usability, efficiency, maintainability and portability.

I chose two open source libraries in which I am deeply involved to demonstrate measures to increase their quality:

  • HadoopCryptoLedger: A library for processing cryptoledgers, such as the Bitcoin Blockchain, on Big Data platforms, such as Hadoop, Hive and Spark

  • HadoopOffice: A library for processing office documents, such as Excel, on Big Data platforms, such as Hadoop and Spark

Before you think about software quality, one important task is to think about which functionality, and which business process supported by the application, brings the maximum value to the customer. Depending on the customer's needs, the same application can bring different value to different customers. When looking at the supported business process you must also consider that not all potential execution variants of it should be evaluated, but only the most valuable for the business.

At the same time, you should evaluate the risk that the application fails and how it impacts the business process. If the risk is low or you have adequate mitigation measures (e.g. insurance), then you can put less emphasis there compared to a business process where the risk is high and no mitigation measures exist. Additionally, I recommend also looking at the value of functionality. If a functionality provides a lot of value then you should test it much more. Sometimes this change of view from risk to value uncovers more important areas of quality of a software product.

Steps for measuring quality

One of the core requirements for increasing software quality is to measure quality for third party libraries. It implies that you have mapped and automated your software development and deployment process (this is also sometimes referred to as DevOps). This can be done in a few steps:

  1. Have a defined repeatable build process and execute it in an automated fashion via a continuous integration tool.

  2. Version your build tool

  3. Software repository not only for source code

  4. Integrate automated unit and integration tests into the build process.

  5. Download dependencies, such as third party libraries, from an artifact management server (repository) that also manages their versions

  6. Evaluate quality of test cases

  7. Code documentation

  8. Code Analytics: Integrate static code analysis into the build process.

  9. Security and license analysis of third party libraries

  10. Store built and tested libraries in a versioned manner in an artifact management server (repository)

  11. Define a Security policy (security.md)

  12. Define a contribution policy (contributing.md)

  13. Ensure proper amount of logging

  14. Transparency about the project activity

Of course, these are only technical means for measuring quality. Out of scope here, but equally important is to define and version requirements/use cases/business process models, system tests, acceptance tests, performance tests, reliability tests and other non-functional tests. Additionally you should take into account automated deployment and deployment testing, which is out of scope of this article.

Evaluate quality

Given the aforementioned points, we now describe how they have been realized for the HadoopOffice and HadoopCryptoLedger libraries. Of course, they can also be realized with different tools or methods, but they give you some indicators of what to look for.

1. Have a defined repeatable automated continuous integration process

For the HadoopOffice and HadoopCryptoLedger libraries we use the continuous integration platform Travis CI, which is offered as a free service for open source projects in the cloud. The aforementioned projects include in the source code repository root folder a file named .travis.yml (example), which describes the build process that is executed by Travis. Every time new code is committed, the library is built again, including the examples, and automated unit and integration tests take place.

Travis CI provides a container infrastructure that allows executing the build process in so-called software containers, which can flexibly integrate different types of software without the overhead of virtual machines. Containers can be pulled from repositories and flexibly composed together (e.g. different Java Development Kits, operating systems etc.). Additionally, one avoids the cost of virtual machines being idle and capacity not being used. Furthermore, you can track previous executions of the build process (see example here) to measure its stability.

Gradle is used for nearly every step in the process: to build the application, generate documentation, execute unit and integration tests and collect reports.

Shell scripts are used to upload the HTML reports and documentation to a branch in Github (gh-pages) so that it can be easily found in search engines by the users and viewed in the browser.

2. Version your build tool

As said, our main build tool is Gradle. Gradle is based on the popular Java Virtual Machine scripting language Groovy and thus scales flexibly with your build process and makes build scripts more readable in comparison to XML-based tools, which are inflexible and a nightmare to manage for larger projects. Furthermore, plugins can be easily implemented using Groovy.

Modern build tools are continuously evolving. Hence, one wants to test new versions of the build tool or upgrade to new versions rather smoothly. Gradle allows this using the Gradle Wrapper – you simply define the desired version and it downloads it (if not already done) to be executed for your current build process. You do not need any manual, error-prone installation.

3. Software Version Repository not only for source code

One key ingredient of the software development process is the software version repository. Most of them now are based on Git. It is extremely flexible and scales from single-person projects to projects with thousands of developers, such as the Linux kernel. Furthermore, it allows you to define your own source code governance process. For example, the source code of the open source libraries that I introduced in the beginning is hosted on a Git server in the cloud: Github. Alternatives are Gitlab or Bitbucket.org, but there are many more. These cloud solutions also allow you to have private repositories, fully backed up, for a small fee.

As I said, these repositories are not only for your software source code, but also for your build scripts and continuous integration scripts. For example, for the aforementioned software libraries the repository stores the Travis scripts and the build files as well as test data. Large test data files (e.g. full blockchains) should not be stored in the version management system, but in a dedicated cloud object storage, such as Amazon S3 or Azure Storage. There you also have the possibility to store them on ultra-low-cost storage, which might take longer to access, but this can be acceptable for a daily CI process execution.

4. Integrate automated unit and integration tests into the build process.

Another important aspect of software delivery is the software quality management process. As a minimum you should have not only automated unit and integration tests, but also daily reporting on their coverage.

The reporting for the aforementioned libraries is done via Sonarqube.com and Codacy. Both report not only results of static code analysis, but also unit/integration test coverage. Two platforms have been chosen because Codacy supports Scala (a programming language used in parts of the aforementioned libraries).

More interestingly, on both platforms you also find results for popular open source libraries. Here you can check, for the open source libraries that you use in your product, how good they are and how their quality improves over time.

Of course there are many more cloud platforms for code coverage and quality analysis (e.g. SourceClear).

Another important testing method beyond unit testing for developers is integration testing. The reason is that complex platforms for Big Data, such as Hadoop, Flink or Spark, have emerged over the past years. These tests are necessary, because unit tests cannot properly reflect the complexity of these platforms, and system tests do not focus on technical aspects, such as code coverage. Furthermore, the execution of integration tests is very close to the final execution on the platforms, but requires fewer resources and virtually no installation effort (an example is Spring Boot for web applications). However, unit tests are still mandatory, because they are very suitable for quickly and automatically verifying that calculations with many different inputs are done correctly.

Nevertheless, these integration tests require additional libraries. For example, for the aforementioned libraries, which support all major Big Data platforms, the Hadoop integration test suites are used as a base. They provide small mini-clusters that can even be executed on our container-based CI infrastructure with very little configuration and no installation effort. Of course we also use the testing features of the platforms on top of this, such as the ones for Spark or Flink.
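
As an illustration, a minimal JUnit test against an HDFS mini-cluster could look roughly like the following sketch (class and path names are made up; the actual test setup in the aforementioned libraries differs):

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.junit.AfterClass;
    import org.junit.BeforeClass;
    import org.junit.Test;

    public class HdfsRoundTripIT {

        private static MiniDFSCluster cluster;

        @BeforeClass
        public static void startMiniCluster() throws Exception {
            Configuration conf = new Configuration();
            conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, "build/minidfs"); // keep test data in the build folder
            cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); // in-process HDFS, no installation
        }

        @AfterClass
        public static void stopMiniCluster() {
            if (cluster != null) {
                cluster.shutdown();
            }
        }

        @Test
        public void writesAndReadsBackAFile() throws Exception {
            FileSystem fs = cluster.getFileSystem();
            Path testFile = new Path("/integrationtest/sample.bin");
            byte[] payload = new byte[] { 0x01, 0x02, 0x03 };
            try (java.io.OutputStream out = fs.create(testFile)) {
                out.write(payload);
            }
            assertEquals(payload.length, fs.getFileStatus(testFile).getLen());
        }
    }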

5. Download dependencies, such as third party libraries, from an artifact management server (repository) that also manages their versions

It should be a no-brainer, but all of the third party libraries that you include in your application should be managed in an artifact management server. Popular choices are Nexus or Artifactory. Both also offer cloud solutions for open source and private libraries (e.g. Maven Central or Bintray). The public cloud solutions should be used with care, as for example the npm gate security affair for JavaScript packages or the PyPI issue for Python packages showed, where attackers replaced popular packages or uploaded packages with names similar to popular packages to trick developers into using malicious libraries in their application code. Having a local artifact management server quickly indicates which of your developed applications are exposed and you can initiate mitigation measures (e.g. directly communicate to the project which library is affected).

These repositories are important, because you do not need to manage access to third party libraries in your software project. You simply define the third party library and its version in your build tool and it will be downloaded during the build process. This also makes it simple to change to and test new versions.

Furthermore, having an artifact management server allows you to restrict access to third party libraries, e.g. ones with known security issues or problematic licenses. This means you can easily make your organisation safer and you can detect where libraries with issues are used in your software products.

Especially with the recent trend of self-service by end users in the Big Data world it is crucial that you track which libraries they are using to avoid vulnerabilities and poor quality.

6. Evaluate quality of test cases

As I wrote before, the quality of test cases is crucial. One key aspect of quality is coverage, i.e. to which extent your tests cover the source code of your software. Luckily, this can easily be measured with tools such as JaCoCo. They give you important information on statement and condition coverage.

There is no silver bullet for how much coverage you need. Some famous software engineers say you should have at least 100% statement coverage, because it does not make sense that someone uses/tests a piece of software if not even the developers themselves have tested it. 100% might be difficult, especially in the case of rapidly changing software.

In the end this is also a risk/value estimation, but achieving high statement coverage is NOT costly and later saves you a lot of costs through fewer bugs and generally more maintainable software. It often also uncovers aspects that nobody has thought about before and that require clarification, which should happen as early as possible.

For the aforementioned libraries we made the decision that we should have at least 80% statement coverage with the aim to have 100% coverage. Condition coverage is another topic. There are some guidelines, such as DO-178B/DO-178C for avionics or ISO 26262 for automotive that describe a certain type of condition coverage for critical software.

For the aforementioned libraries, there was no such guideline or standard that could be applied. However, both libraries support Big Data applications, and these applications, depending on their criticality, will need to be tested accordingly.

Finally, another measure for the quality of test cases is mutation testing. This type of evaluation modifies (mutates) the software under test to see whether the test cases detect the change, i.e. fail for the mutated software. This is very useful in case a method changes frequently or is used in many parts of the application and thus is an important piece. I found it rather useful for the utility functions of the HadoopCryptoLedger library that parse binary blockchain data to extract meaningful information from it (cf. an example mutation testing report based on pitest here). In this way you can cover more of the software with higher quality test cases.

7. Code documentation

Documentation of code is important. Ideally the code is self-documenting, but with comments one can navigate the code more easily and grasp the key concepts of the underlying software. Everybody who has some more in-depth development exposure to popular Big Data platforms appreciates the documentation (but of course also the source code) shipped with them.

Additionally, one can automatically generate a manual (e.g. by using Javadoc or Scaladoc) that is very useful for developers who want to leverage the functionality of your library (find an example for the HadoopCryptoLedger library here). For the aforementioned libraries the documentation is automatically generated and stored in the version management branch “gh-pages”. From there Github Pages takes the documentation and makes it available as web pages. Then it can be found by search engines and viewed in the browser by the users.
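
A small, made-up example of a Javadoc-commented utility method from which such a manual can be generated automatically:

    /**
     * Utility for converting the little-endian byte order used in many binary
     * blockchain formats into Java's big-endian primitive types.
     */
    public final class ByteOrderUtil {

        private ByteOrderUtil() {
            // utility class, not meant to be instantiated
        }

        /**
         * Reads a 4-byte little-endian unsigned integer.
         *
         * @param data   raw bytes, e.g. from a block header
         * @param offset position of the first (least significant) byte
         * @return the decoded value as a long, so the full unsigned range fits
         * @throws ArrayIndexOutOfBoundsException if fewer than 4 bytes are available at offset
         */
        public static long readUInt32LittleEndian(byte[] data, int offset) {
            return (data[offset] & 0xFFL)
                    | (data[offset + 1] & 0xFFL) << 8
                    | (data[offset + 2] & 0xFFL) << 16
                    | (data[offset + 3] & 0xFFL) << 24;
        }
    }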

This publishing step is very important. Usually the problem is not that the developers do not write documentation, but that the documentation is not published in time. The documentation for the aforementioned libraries is published with every build in its most recent version. In companies this is usually not the case: it is a manual process that is done infrequently or not at all. However, solutions exist, for example Confluence with its docs plugin or the Jenkins Javadoc plugin, among many others.

Furthermore, one should look beyond standard programming language documentation. Nowadays you can integrate UML diagrams (example) and markdown (example) in your source code comments, which means that your technical specification can be updated with every build, e.g. at least once a day!

Finally, tools such as Sonarqube or Openhub can provide you with information on the share of documentation in comparison to other software.

8. Code Analytics: Integrate static code analysis into the build process.

Besides testing, static code analysis can highlight areas of improvement for your code. From my experience, if you start with it right away, then it is a useful tool with very few false positives.

Even if you have a project that has grown over 10 years and did not start with code analysis, it can be a useful tool, because it also shows the quality of newly added or modified code, which you can improve right away. Of course, for code that has never been touched and has seemed to work for many years, one should carefully evaluate whether it makes sense to adapt it to the recommendations of a static code analysis tool.

Static code analysis is very useful in many areas, such as security or maintainability. It can show you code duplication, which should be avoided as much as possible. However, some code may always have some duplication. For example, the aforementioned libraries support two Hadoop APIs that do essentially the same thing: mapred and mapreduce. Nevertheless, some Big Data tools, such as Hive, Flink or Spark, use one API but not the other. Since they are very similar, some duplication exists and has to be maintained in any case. Nevertheless, in most cases duplication should be avoided and removed.

One interesting aspect is that popular static code analysis tools in the cloud, such as Codacy, SourceClear or Sonarqube, also show you the quality – or, even better, the evolution of quality – of many popular open source projects, so that you can easily evaluate whether you want to include them or not.

9. Security & License analysis of third party libraries

Many open source libraries use other open source libraries. The aforementioned libraries are no exception. Since they provide services to Big Data applications, they need to integrate with the libraries of popular Big Data platforms.

Here, it is crucial for developers and security managers to get an overview of the libraries the application depends on, including transitive dependencies (dependencies of dependencies). Then, ideally, the dependencies and their versions are identified and matched against security databases, such as the NIST National Vulnerability Database (NVD).

For the aforementioned libraries the OWASP dependency check is used. After each build the dependencies are checked and problematic ones are highlighted to the developer. The dependency check is free, but it identifies dependencies and matches them to vulnerabilities based on the library name. This matching is not perfect and requires that you use a build tool with dependency management, such as Gradle, where there are clear names and versions for libraries. Furthermore, the NVD entries are not always of good quality, which makes matching even more challenging.

Commercial tools, such as Jfrog Xray or Nexus IQ have their own databases with enriched security vulnerability information based on various sources (NVD, Github etc.). Matching is done based on hashes and not names. This makes it much more difficult to include a library with wrong version information (or even with manipulated content) in your application.

Additionally, they check the libraries for open source licenses that may have certain legal implications on your organization. Licensing is a critical issue and you should take care that all source code files have a license header (this is the case for the libraries introduced in the beginning).

Finally, they can indicate good versions of a library (e.g. not snapshots, popular by other users, few known security vulnerabilities).

Since the libraries introduced in the beginning are just used by Big Data applications, only the free OWASP dependency check is used. Additionally, the key libraries are of course kept in sync with proper versions. Nevertheless, the libraries themselves do not bundle other libraries. When a Big Data application is developed with these libraries, the developer has to choose the dependent libraries for the Big Data platform.

The development process of Big Data application that use these libraries can employ a more sophisticated process based on their needs or risk exposure.

10. Store built and tested libraries in a versioned manner in an artifact management server (repository)

Since you already retrieve all your libraries in the right version from the artifact management server, you should also make your own libraries or application modules available there.

The libraries introduced in the beginning are stored on Maven Central in a versioned fashion, from where developers can pull them in the right version for their platform.

You can also consider making your Big Data application available on an on-premise or cloud artifact management server (private or public). This is very useful for large projects where an application is developed by several software teams and you want, for example, to test the integration of new versions of an application module and easily switch back if they do not work.

Furthermore, you can use it as part of your continuous deployment process to provide the right versions of the application module for automated deployment up to the production environment.

11. Define a Security policy (security.md)

A security policy (example) is a shared understanding between developers and users on security of the software. It should be stored in the root folder of the source code so that it can be easily found/identified.

As a minimum it defines the process for reporting security issues to the developers. For instance, reported issues should not – in the beginning – be made public as issues that everyone can see. The reason is that developers and users need to have some time to upgrade to the fixed software. After some grace period after fixing the issue, it should of course be published (depending on how you define it in your security policy).

Depending on your software you have different security policies – there is no universal template. However, certain elements can be applied to many different types of software, such as the security reporting process, usage of credentials, encryption/signing algorithms, ports used by the software or anything else that might affect security.

Although many open source projects nowadays have such a policy, it is by far not common or easy to find. Furthermore, there is clearly a lack of standardization and awareness of security policies. Finally, what is completely missing is automatically combining the different security policies of the libraries your application depends on to enable a more sophisticated security analysis.

12. Define a contribution policy (contributing.md)

Open source libraries live from the voluntary contributions of people from all over the world. However, one needs to define a proper process that sets minimum quality expectations for contributions and describes how people can contribute. This is defined in a policy that is stored in the root folder of the source code (example).

This also avoids conflicts, because the rules of contribution are clear. Source code repositories, such as Github, show the contribution policy when issues or pull requests are created.

13. Ensure proper amount of logging

Logging is necessary for various purposes. One important purpose is log analytics:

  • Troubleshooting of issues

  • Security analysis

  • Forensics

  • Improving user interactions

  • Proactive proposal for users what to do next based on other users behavior

Finding the right amount of logging is crucial. Too much logging decreases system performance, too little logging does not give the right information. Different configurable log levels support tuning this effort; however, usually the standard log levels (error, warn, info, debug etc.) are used and they do not distinguish the use cases mentioned before. Even worse, there is usually no standard across an enterprise. Thus, logs do not take into account the integration of development and operations. For instance, if there is a warning, then operations should know whether the warning needs to be taken care of immediately or whether there is time to prioritize their efforts.
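
A small SLF4J-based sketch of how log statements could carry such operational context explicitly; the marker names are assumptions illustrating the idea of distinguishing use cases beyond plain log levels:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Marker;
    import org.slf4j.MarkerFactory;

    public class PaymentImporter {

        private static final Logger LOG = LoggerFactory.getLogger(PaymentImporter.class);

        // Markers (names are an assumption) let operations filter by use case,
        // not only by severity: security analysis vs. troubleshooting vs. user analytics.
        private static final Marker SECURITY = MarkerFactory.getMarker("SECURITY");
        private static final Marker OPS_ACT_NOW = MarkerFactory.getMarker("OPS_ACT_NOW");

        public void importFile(String fileName, String user) {
            LOG.info("Import of file {} started by user {}", fileName, user); // troubleshooting / user interaction
            try {
                // ... actual import logic would go here ...
            } catch (RuntimeException e) {
                // Tells operations explicitly that this warning requires immediate action.
                LOG.warn(OPS_ACT_NOW, "Import of file {} failed and must be retried manually", fileName, e);
            }
            LOG.debug(SECURITY, "Access check passed for user {}", user);
        }
    }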

This also implies that there is a need to have standards for logging in open source libraries, which do not exist. As a first step they would need to document their logging approach to get this discussion in the right direction.

Finally, although certain programming languages, such as Java with log4j, have rather sophisticated logging support, this is not necessarily the case for other programming languages, especially higher-level languages, such as R or Pig. Here you need to define standards and libraries across your enterprise.

14. Transparency about the project activity

This is the last point (in this blog), but also one of the more important ones. Currently, organizations do not collect any information about the open source libraries they are using. This is very risky, because some open source projects may show less and less activity and eventually be abandoned. If an organization bases a highly valuable business process on them, then this can cause a serious issue. In fact, many applications (even commercial ones) nowadays consist to a large extent of open source libraries.

Additionally, you may want to know how much effort is put into an open source library, how frequently it changes, the degree of documentation etc. Of course, all these should be shown in context with other libraries.

OpenHub is an example of a platform that does exactly this (find here an example based on one of the aforementioned libraries). Here you can see the activity and the community of a project based on data it pulls out of the source code repository Github. It provides cost estimates (what it would cost to develop the software yourself) and security information.

Several popular open source projects have been scanned by OpenHub and you can track the activity over years to see if it is valuable to include a library or not.

Conclusion

Most of the aspects presented here are not new. They should have been part of your organization for several years already. Here, we put some emphasis on how open source developers can increase quality and its transparency using free cloud services. This may lead to better open source projects. Furthermore, open source projects have always been a driver for better custom development projects in companies by employing best practices in the area of continuous integration and deployment. Hence, the points mentioned here should also serve as a checklist to improve your organization's software development quality.

Measuring software quality increases the transparency of your software product, which is key to successful software development. This in itself is a significant benefit, because if you do not know the quality then you do not have quality.

There are a lot of tools supporting this, and even if you have to develop unit/integration tests for 100% coverage this usually does not take more than 5-10% of your developers' time. The benefits, however, are enormous. Not only do developers and users understand the product better, they are also more motivated when they see the coverage and keep a high standard of quality in the product. If your software provider tells you that unit/integration tests are not worth the effort or that they do not have time for them – get rid of your software provider – there is no excuse. They must make quality information available in real time: this is the standard.

Given the increase in automation and documentation of software products, I expect a significant increase in developer productivity. Furthermore, given the convergence of Big Data platforms, automated assistants (bots) will guide users to develop high-quality analyses themselves more easily and quickly, depending on their data. Hence, they will be more empowered and less dependent on external developers – they will become developers themselves, with the guidance of artificial intelligence doing most of the routine work so that they can focus on more valuable activities.

Nevertheless, this depends also on their discipline and the employment of scientific methods in their analysis and execution of business processes.

Lambda, Kappa, Microservice and Enterprise Architecture for Big Data

A few years after the emergence of the Lambda architecture, several new architectures for Big Data have appeared. I will present and illustrate their use case scenarios. These are IT architectures, but towards the end of this blog I will describe the corresponding Enterprise Architecture artefacts, which are sometimes referred to as the Zeta architecture.

Lambda Architecture

I have blogged before about the Lambda-Architecture. Basically this architecture consists of three layers:

  • Batch-Layer: This layer executes long-living batch-processes to do analyses on larger amounts of historical data. The scope is data from several hours to weeks up to years. Here, usually Hadoop MapReduce, Hive, Pig, Spark or Flink are used together with orchestration tools, such as Oozie or Falcon.

  • Speed-Layer/Stream Processing Layer: This layer executes (small/”mini”) batch-processes on data according to a time window (e.g. 1 minute) to do analyses on larger amounts of current data. The scope is data from several seconds up to several hours. Here one may use, for example, Flink, Spark or Storm.

  • Serving Layer: This layer combines the results from the batch and stream processing layer to enable fast interactive analyses by users. This layer leverages usually relational databases, but also NoSQL databases, such as Graph databases (e.g. TitanDB or Neo4J), Document databases (e.g. MongoDB, CouchDB), Column-Databases (e.g. Hbase), Key-Value Stores (e.g. Redis) or Search technologies (e.g. Solr). NoSQL databases provide for certain use cases more adequate and better performing data structures, such as graphs/trees, hash maps or inverse indexes.

In addition, I proposed the long-term storage layer to have an even cheaper storage for data that is hardly accessed, but may be accessed eventually. All layers are supported by a distributed file system, such as HDFS, to store and retrieve data. A core concept is that computation is brought to data (cf. here). On the analysis side, usually standard machine learning algorithms, but also on-line machine learning algorithms, are used.

As you can see, the Lambda-Architecture can be realized using many different software components and combinations thereof.

While the Lambda architecture is a viable approach to tackle Big Data challenges, other architectures have emerged, especially ones focusing only on certain aspects, such as data stream processing, or integrating it with cloud concepts.

Kappa Architecture

The Kappa architecture focuses solely on data stream processing or “real-time” processing of “live” discrete events. Examples are events emitted by devices from the Internet of Things (IoT), social networks, log files or transaction processing systems. The original motivation was that the Lambda architecture is too complex if you only need to do event processing.

The following assumptions exist for this architecture:

  • You have a distributed ordered event log persisted to a distributed file system, where stream processing platforms can pick up the events

  • Stream processing platforms can (re-)request events from the event log at any position. This is needed in case of failures or upgrades to the stream processing platform.

  • The event log is potentially large (several Terabytes of data / hour)

  • Mostly online machine learning algorithms are applied due to the constant delivery of new data, which is more relevant than the old already processed data

Technically, the Kappa architecture can be realized using Apache Kafka for managing the data-streams, i.e. providing the distributed ordered event log. Apache Samza enables Kafka to store the event log on HDFS for fault-tolerance and scalability. Examples for stream processing platforms are Apache Flink, Apache Spark Streaming or Apache Storm. The serving layer can in principle use the same technologies as I described for the serving layer in the Lambda Architecture.
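
A minimal sketch of the “re-request events from any position” property using Kafka's Java consumer API; the topic name, bootstrap server and replay offset are assumptions for illustration:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class EventLogReplay {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "stream-processor-replay");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Manually assign a partition of the ordered event log and jump back to an
                // earlier offset, e.g. after a failure or an upgrade of the stream processor.
                TopicPartition partition = new TopicPartition("iot-events", 0); // assumption
                consumer.assign(Collections.singletonList(partition));
                consumer.seek(partition, 42_000L); // replay everything from offset 42000 onwards

                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset=" + record.offset() + " event=" + record.value());
                }
            }
        }
    }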

There are some pitfalls with the Kappa architecture that you need to be aware of:

  • End-to-end ordering of events: While technologies such as Kafka can provide the events in an ordered fashion, this relies on the source system actually delivering the events in order. For instance, I had a case where a system in normal operation sent its events in order, but in case of communication errors it stored the events it could not send and retransmitted them at a later point; meanwhile, once the connection was re-established, it already sent the new events. The source system had to be adapted to handle these situations correctly. Alternatively, you can only ensure a partial ordering using vector clocks or a similar mechanism implemented at the event log or stream processing level.

  • Delivery guarantees for how events are delivered to (or fetched by) the stream processing platform:

    • At least once: Every event is guaranteed to be delivered, but the same event might be delivered twice or more due to processing errors or communication/operation errors within Kafka. For instance, the stream processing platform might crash before it can mark events as processed although it has already processed them. This might have undesired side effects, e.g. the same event that “user A liked website W” is counted several times.

    • At most once: The event will be delivered at most once (depending on when offsets are committed, this can be the behavior with Kafka). However, it might also get lost and not be delivered at all. This could have undesired side effects, e.g. the event “user A liked website W” is not taken into account.

    • Once and only once (“exactly once”): The event is guaranteed to be delivered once and only once. This means it will not get lost or be delivered twice or more times. However, this is not simply a combination of the above scenarios. Technically, you need to make sure in a multi-threaded distributed environment that an event is processed exactly once. This means the same event needs to (1) be processed by only one sequential process in the stream processing platform and (2) all other processes related to the event need to be made aware that one of them is already processing it. Both features can be implemented using distributed-system techniques, such as semaphores or monitors. They can be realized using distributed cache systems, such as Ignite, Redis or, to a limited extent, ZooKeeper. Another simple possibility would be a relational database, but this would quickly fail to scale with large volumes. A simplified deduplication sketch follows after this list.
      • Needless to say: The source system must also make sure that it delivers the same event once and only once to the ordered event log.

  • Online machine learning algorithms constantly change the underlying model to adapt it to new data. This model is used by other applications to make predictions (e.g. predicting when a machine has to go into maintenance). This also means that in case of failure we may temporarily have an outdated or simply wrong model (e.g. in case of at-least-once or at-most-once delivery). Hence, the applications need to incorporate some business logic to handle this (e.g. do not register a machine twice for maintenance, or avoid permanently registering/unregistering a machine for maintenance).
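
The following simplified Python sketch illustrates the “once and only once” idea from the list above: processing is made idempotent by remembering already-seen event IDs. In production the seen-set would live in a distributed cache such as Redis or Apache Ignite rather than in local memory; the event IDs and fields are assumptions.

    like_counts = {}
    processed_event_ids = set()   # stands in for a distributed deduplication store

    def process_event(event):
        if event["id"] in processed_event_ids:
            return                # duplicate delivery (at-least-once) is ignored
        processed_event_ids.add(event["id"])
        like_counts[event["website"]] = like_counts.get(event["website"], 0) + 1

    # At-least-once delivery may hand us the same event twice:
    process_event({"id": "e-1", "website": "W", "user": "A"})
    process_event({"id": "e-1", "website": "W", "user": "A"})   # counted only once
    print(like_counts)            # {'W': 1}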

Although technologies such as Kafka can help you with this, it still requires careful design and experienced developers as well as architects to implement such a solution. The batch-processing layer of the Lambda architecture can somewhat mitigate the aforementioned pitfalls, but it can also be affected by them.

Last but not least, although the Kappa Architecture seems intriguing due to its simplification in comparison to the Lambda architecture, not everything can be conceptualized as events. For example, company balance sheets, end-of-month reports, quarterly publications etc. should not be forced into an event representation.

Microservice Architecture for Big Data

The Microservice Architecture did not originate in the Big Data movement, but is slowly being picked up by it. It is not a precisely defined style, but several common aspects exist. Basically, it is driven by the following technology advancements:

  • The implementation of applications as services instead of big monoliths
  • The emergence of software containers to deploy each of those services in isolation from each other. Isolation means that they are put in virtual environments sharing the same operating system (i.e. they are NOT in different virtual machines) and are connected to each other via virtualized networks and virtualized storage. These containers make much better use of the available resources than virtual machines.
    • Additionally, the definition of repositories for software containers, such as the Docker registry, to quickly version, deploy and upgrade dependent containers as well as test upgraded containers.
  • The deployment of container operating systems and orchestration platforms, such as CoreOS, Kubernetes or Apache Mesos, to efficiently manage software containers, manage their resources, schedule them to physical hosts and dynamically scale applications according to needs.
  • The development of object stores, such as OpenStack Swift, Amazon S3 or Google Cloud Storage. These object stores are needed to store data beyond the lifecycle of a software container in a highly dynamic cloud or scaling on-premise environment.
  • The DevOps paradigm – especially the implementation of continuous integration and delivery processes with automated testing and static code analysis to improve software quality. This also includes quick deliveries of individual services at any time independently of each other into production.

An example of the Microservice architecture is the AWS Lambda platform (not to be confused with the Lambda architecture) and related services provided by Amazon AWS.

Nevertheless, the Microservice Architecture poses some challenges for Big Data architectures:

  • What should be a service: For instance, you have Apache Spark or Apache Flink forming a cluster to run your application. Should you have a dedicated cluster of software containers for each application, or should you provide one shared cluster of software containers? It can make sense to go for the first option, e.g. a dedicated cluster per application, due to different scaling and performance needs of the applications.
  • The usage of object stores: Object stores are needed as a large-scale, dynamically scalable storage that is shared among containers. However, currently there are some issues, such as performance and consistency models (“eventually consistent”). Here, the paradigm of “bring computation to data” (cf. here) is violated. Nevertheless, this can be mitigated either by using HDFS as a temporary file system on the containers and fetching the data beforehand from the object store, or by using an in-memory caching solution, such as the one provided by Apache Ignite or, to some extent, Apache Spark or Apache Flink.

I see that in these environments the role of software-defined networking (SDN) will become crucial, not only in cloud data centers but also in on-premise data centers. SDN (which should NOT be confused with virtualized networks) enables centrally controlled, intelligent routing of network flows, as needed in the dynamically scaling platforms required by the Microservice architecture. The old decentralized definition of the network, e.g. in form of decentralized routing, simply does not scale here to enable optimal performance.

Conclusion

I presented here several architectures for Big Data that emerged recently. Although they are based on technologies that have existed for several years, I observe that many organizations are overwhelmed with these new technologies and struggle to adopt and fully leverage them. This has several reasons.

One tool to manage this could be proper Enterprise Architecture Management. While there are many benefits of Enterprise Architecture Management, I want to highlight the benefit of managed evolution. This paradigm enables aligning business and IT, although both change constantly and independently (as well as dependently) with not necessarily aligned goals, as illustrated in the following picture.

[Figure: Enterprise architecture managed evolution]

As you can see from the picture, both constantly diverge and Enterprise Architecture Management is needed to unite them again.

However, reaching managed evolution of Enterprise Architecture usually requires many years and business as well as IT commitment to it. Enterprise Architecture for Big Data is a relatively new concept, which is still subject to change. Nevertheless, some common concepts can be identified. Some people refer to Enterprise Architecture for Big Data also as the Zeta Architecture, and it does not only encompass Big Data processing, but in the context of the Microservice architecture also web servers providing the user interface for analytics (e.g. Apache Zeppelin) and further management workflows, such as backup or configuration, deployed in form of containers.

This enterprise architecture for Big Data describes some integrated patterns for Big Data and Microservices so that you can consistently document and implement your Lambda, Kappa or Microservice architecture, or a mixture of them. Examples of artefacts of such an enterprise architecture are (cf. also here):

  • Global Resource Management to manage the physical and virtualized resources as well as scaling them (e.g. Apache Mesos and Software Defined Networking)

  • Container Management to deploy and isolate containers (e.g. Apache Mesos)

  • Execution engines to manage different processing engines, such as Hadoop MapReduce, Apache Spark, Apache Flink or Oozie

  • Storage Management to provide Object Storage (e.g. Openstack Swift), Cache Storage (e.g. Ignite HDFS Cache), Distributed Filesystem (e.g. HDFS) and Distributed Ordered Event Log (e.g. Kafka)

  • Solution architecture for one or more services that address one or more business problems. It should be separated from the enterprise architecture, which is focusing more on the strategic global picture. It can articulate a Lambda, Kappa, Microservice architecture or mixture of them.

  • Enterprise applications describe a set of services (including user interfaces)/containers to solve a business problem including appropriate patterns for Polyglot Persistence to provide the right data structure, such as graph, columnar or hash map, for enabling interactive and fast analytics for the users based on SQL and NoSQL databases (see above)

  • Continuous Delivery that describes how enterprise applications are delivered to production while ensuring their quality (e.g. Jenkins, SonarQube, Gradle, Puppet etc.).

Batch-processing & Interactive Analytics for Big Data – the Role of in-Memory

In this blog post I will discuss various aspects of in-memory technologies and describe how various Big Data technologies fit into this context.

Especially, I will focus on the difference between in-memory batch analytics and interactive in-memory analytics. Additionally, I will illustrate when in-memory technology is really beneficial. In-memory technology leverages the fast main memory and processor caches to deliver superior performance.

While the deployment of in-memory technology seems attractive initially, companies have to carefully design how they use the scarce resource of memory for big data sets, because the amount of data tends to grow once the company successfully masters Big Data technologies. For instance, they need to think about memory fragmentation, scheduling, capacity management, how the data should be structured in-memory and what data should be represented in-memory at all.

I will explain that some paradigms introduced for non-in-memory analytics, such as the paradigm that it is better not to read data at all than to read all of it, are still very valid for in-memory technologies.

Finally, I will give an outlook on current Big Data technologies and their strengths and weaknesses with respect to in-memory batch analytics and interactive in-memory analytics.

The Concept of In-Memory

The concept of in-memory became more and more popular around 2007/2008, although the fundamental concepts behind it have existed for decades. It was marketed quite heavily by SAP and its HANA in-memory database at that time.

Around the same time, a different paradigm appeared, the concept of distributed Big Data platforms.

In the beginning, both were rather disconnected: in-memory technologies relied on one “big” machine, while distributed data platforms consisted of a huge set of more commodity-like machines. In-memory was at that time often associated with interactive queries with fast responses for comparably small datasets fitting in memory on one machine, and Big Data platforms with long-running analytics queries crunching large volumes of data scattered over several nodes.

This changed recently. Particularly, in-memory techniques have been brought to long-running analytics queries and distributed Big Data platforms to interactive analytics. The assumed benefit for both cases is that more data can be handled in more complex ways in a shorter time.

However, you need to carefully look what kind of business benefits you can gain from doing faster or more analytics.

Public sector organizations across various domains can see significant benefits, because their “ROI” is usually measured in non-monetary terms, as benefits for society. A faster, fairer, more transparent or scientifically correct analysis can be one example of such a benefit. Additionally, supervision of the private sector needs to be on the same level as the private sector itself.

Traditional private sector organizations, on the other hand, will have to invent new business models and convince the customer. Here, new machine learning algorithms on large data volumes are more beneficial in comparison to traditional data warehouse reports. Internet industries, including the Internet of Things and autonomous robots, obviously benefit, be it from the processing of large data volumes and/or the need to react quickly to events in the real world.

The Difference between in-memory batch processing and interactive analytics

Often people wonder why there is still a difference between batch processing and interactive analytics when using in-memory. In order to answer this question let us quickly recap the difference between the two:

  • Distributed big data processes: They are long-running because they need to query data residing on several nodes and/or their calculations are very complex, requiring a lot of computing power. Usually they make the calculated/processed data available in a format suitable for interactive analytics. Usually, these processes are planned and scheduled in advance.
  • Interactive analytics: These are often ad-hoc queries of low to very high complexity. Usually it is expected that they return results within seconds or minutes. However, they can also take much longer and are then candidates for distributed big data processes, so that results are precomputed and stored for interactive analytics. Interactive analytics goes beyond standard tables (e.g. using indexes or pre-aggregated structures) to return results faster.

The results of both can be used either by humans or by other applications, e.g. applications that require predictions to provide an automated service to human beings. Basically, both approaches fit the Lambda architecture paradigm.

In-memory technologies can basically speed up both approaches. However, you need to carefully evaluate your business case for this. For example, it makes sense to speed up your Big Data batch processes so that they finish before your people start working, or to have more time to perform additional processes on the same resources – this is particularly interesting if you have a plethora of different large datasets where different analytics can make sense. With respect to interactive analytics, you benefit most if you have specific analytics algorithms that benefit from memory locality, e.g. iterative machine learning algorithms.

If you have people working on large tables using aggregations, then you should make them aware that it makes more sense to work with samples, in-memory indexes and appropriate data structures as well as high parallelism. Aggregating data of a large table in-memory is very costly and the speed difference to tables on disk is most likely not large. The core paradigm should be: do not read what is not needed.
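
A hedged PySpark sketch of this advice (the dataset path and column names are assumptions): instead of aggregating the full table, aggregate a random sample.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("sample-aggregation").getOrCreate()
    transactions = spark.read.parquet("/data/transactions")   # hypothetical path

    # a 5 % random sample is often enough for decision-making aggregates
    sample = transactions.sample(False, 0.05, seed=42)
    sample.groupBy("country").avg("amount").show()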

To make it short: Be aware of your priorities to leverage speed-ups by using in-memory technology. Not everything has to be in-memory.

Nevertheless, you should first leverage all possible optimizations that do not require in-memory technology. An inefficient data structure on disk does not become a better structure just because it is in-memory. Additionally, you should think about how much data you need and how precise your results need to be. As I wrote in a previous blog post, this can save you a lot of time that you can use to perform further analytic tasks.

In the following, I describe some challenges that you need to tackle to be successful with in-memory technologies.

Challenges with in-memory

Memory fragmentation

Problem

Memory fragmentation does not only occur with in-memory technologies, but on any storage. You can have internal fragmentation, where you allocate more memory to an application than needed, or external fragmentation, where you deallocate memory, but new data does not fit into the deallocated space and you have to use additional memory.

However, it can be rather problematic with in-memory technologies, because main memory is usually the smallest storage available. In the context of Big Data, where there can be a huge diversity of data sets that grow over time, as well as different algorithms that use memory in different ways, this becomes apparent much more quickly than if there were just one static data set that never changes and is always processed the same way.

The issue with memory fragmentation is that you can effectively use less memory than is physically available – potentially a lot less. This leads to unexpected performance degradation and the need to spill over to slower disk space to continue the computation, which may lead to thrashing.

You cannot fully avoid memory fragmentation, because one cannot foresee when which data set will be loaded and what computation will be needed.

Solution

A first step to handle memory fragmentation is to have a list of processes and interactive queries that are regularly executed and to review them for potential memory fragmentation issues. This list can be used during monitoring to be aware of memory fragmentation. One indicator can be that the available memory does not match the memory that should be consumed. Another indicator can be a lot of spills to disk.

There are several strategies to handle identified memory fragmentation. In the case of in-memory batch processes, one should release all the memory after the batch process has been executed. Furthermore, one should use distributed Big Data technologies, which usually work with fixed block sizes from the distributed file system layer (e.g. HDFS). In this case you can partially avoid external fragmentation. You can avoid it only partially, because many algorithms have some temporary data or temporarily relevant data which needs to be taken into account as well.

If you have interactive analytics, a very common recommendation, even by vendors of popular memcache solutions, is to restart the cache from time to time, thereby forcing the data to be reloaded into the cache in an ordered manner and avoiding fragmentation. Of course, once you add, modify or remove data, you again get some fragmentation, which will grow over time.

A similar approach is called compaction, which exists in traditional relational databases and Big Data systems. Compaction reduces fragmentation that occurs due to updates, deletions and insertions of new data. The key here is that you can gain performance benefits for your users if you schedule it for times when the system is not used. Surprisingly, people often do not look at compaction, although it has significant impact on performance and memory usage. Instead they rely only on non-optimal default settings, which are usually tuned not for large-scale analytics but for smaller-scale OLTP usage. For instance, it can make sense for large-scale analytics to schedule compaction after all the data has been loaded and no new data is arriving before the next execution of a batch process.

What data should be in-memory? About temperature of data…

The Concept

It is not realistic to have all your data in-memory. This is not only due to memory fragmentation, but also due to the cost of memory, fault tolerance, backups and many other factors. Hence, you need an approach to decide which data should be in-memory.

As already described before, it is important to know your data priorities. Quite often these priorities change, e.g. new data is introduced, or data simply becomes outdated. Usually it is reasonable to expect that data that is several months or years old will not be touched often, except for research purposes. This is where the temperature of data, i.e. hot, warm and cold data, comes into play.

Hot data has been used quite frequently recently and is likely to be used quite frequently in the near future.

Warm data has been used recently, but not as frequently as hot data, and it is NOT likely to be used frequently in the near future.

Cold data has not been used recently and is not likely to be used in the near future.

Usually hot data resides in CPU caches and mainly in main memory. Warm data resides mainly on local disk drives with only a small fraction in main memory. Cold data resides mostly on external slow storage, potentially accessed via the network or in the cloud.

Managing Temperature of Data

The concept of temperature of data applies to batch processes and interactive analytics equally. However, you need to think about what data needs to be kept hot, warm and cold. Ideally this happens automatically. For example, many common in-memory systems provide the LRU (least recently used) strategy to automatically move hot data to warm and eventually to cold storage, and the other way around (a minimal LRU sketch follows after the list of alternatives below). For instance, Memcached or SAP HANA support this as a default strategy.

This seems to be a good default strategy, especially if you cannot or do not want to look into more detail about the temperature of your data. Indeed, it rests on sound assumptions, since it is based on the principle of locality, which is also key to distributed Big Data processes and many machine learning algorithms.

However, there are alternative strategies to LRU that you may want to think about:

  • Most recently used (MRU): The most recently used memory element is moved to warm and eventually to cold storage. This assumes that there is stable data that is more relevant than the newest data.
  • Least frequently used (LFU): The data which has been least frequently used is moved to warm storage and eventually to cold storage. The advantage here is that recently used data that has only been accessed once is quickly moved out, while data that has been accessed quite frequently, but not in the recent past, stays in-memory.
  • Most frequently used (MFU): The data which has been most frequently used in the past is moved to warm storage and eventually to cold storage. The idea here is that the more data has been used, the less valuable it will be and hence it will be accessed much less in the near future.
  • Any combination of the above
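
As announced above, here is a minimal sketch of the default LRU strategy using a plain ordered dictionary. The capacity and keys are illustrative; real systems such as Memcached, SAP HANA or Ignite implement this internally.

    from collections import OrderedDict

    class LRUCache:
        def __init__(self, capacity):
            self.capacity = capacity
            self.entries = OrderedDict()      # oldest (coldest) entries first

        def get(self, key):
            if key not in self.entries:
                return None                   # would be fetched from warm/cold storage
            self.entries.move_to_end(key)     # mark as hot again
            return self.entries[key]

        def put(self, key, value):
            if key in self.entries:
                self.entries.move_to_end(key)
            self.entries[key] = value
            if len(self.entries) > self.capacity:
                evicted_key, _ = self.entries.popitem(last=False)  # demote coldest entry
                print("moving to warm/cold storage:", evicted_key)

    cache = LRUCache(capacity=2)
    cache.put("customer:1", {"name": "A"})
    cache.put("customer:2", {"name": "B"})
    cache.get("customer:1")                   # customer:1 becomes hot again
    cache.put("customer:3", {"name": "C"})    # evicts customer:2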

Obviously, the perfect strategy would predict what data will be used least in the future (“clairvoyant algorithms”) and move data accordingly to hot, warm and cold storage. This is of course not exactly possible, but a sound understanding of how people use your Big Data platform can come pretty close to that ideal.

Of course, you can also implement more sophisticated machine learning algorithms that take the environment into account to predict what data and computation will be required in the future given a new task (cf. here for an approach to scheduling multimedia tasks in the cloud based on machine learning algorithms – the use case is different but the general idea is the same). Unfortunately, most of the popular Big Data and in-memory solutions do not implement such an approach yet.

How should the data be structured?

Many people, including business people, have only the traditional world of tables, consisting of rows and columns, in mind when using data. In fact, a lot of analysis is based on this assumption. However, while tables are simple, they might not be the most efficient way to store data in-memory or to process it.

In fact, depending on your analysis different formats make sense, such as:

  • Use the correct data type: If you have numbers, use a data type that supports numbers, such as integer or double. Dates can often be represented as integers. This requires less storage and the CPU can read an integer represented as an integer orders of magnitude faster than an integer represented as a string. Similarly, even with integers you should select an appropriate size. If your numbers fit into a 32-bit integer then you should prefer storing them as 32-bit instead of 64-bit. This can increase your performance significantly. The key message here is: store the data with the right data type, use the available types and understand their advantages as well as limitations (see the sketch after this list).
  • Column-based: Data is stored in columns instead of rows. This is usually beneficial if you need to access one or more full columns of a given data set. Furthermore, it enables one to avoid reading data that is not relevant using storage indexes (min/max) or bloom filters.
  • Graph-based: Data is stored as so-called adjacency lists or sometimes as adjacency matrices. This performs much better than row-based or column-based storage for graph algorithms, such as strongly connected components, shortest path etc. These algorithms are useful for social network analytics, financial risks of assets sold by different organizations, dependencies between financial assets etc.
  • Tree-based: Data is stored in a tree structure. Trees can usually be searched comparably fast and are often used for database indexes to find out in which data block a row is stored.
  • Search indexes for terms in unstructured text: This is usually useful for matching data objects which are similar but do not have unique identifiers. Alternatively, they can be used for sentiment analysis. Traditional database technology shows, for example, terrible performance for these use cases – even in-memory.
  • Hash-clustering indexes: These can be used in column stores by generating a hash out of the values of several columns for one row. This hash is stored as another column. It can be used to quickly search for several criteria at the same time by using only one column. This reduces the amount of data to be processed at the expense of additional storage.
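
As referenced in the first bullet, here is a small illustration of the data type advice, using NumPy arrays as a stand-in for an in-memory column store; sizes and values are illustrative.

    import numpy as np

    n = 1000000
    as_int32 = np.arange(n, dtype=np.int32)    # 4 bytes per value
    as_int64 = np.arange(n, dtype=np.int64)    # 8 bytes per value
    as_string = np.array([str(i) for i in range(n)], dtype=object)  # Python strings

    print("int32 column :", as_int32.nbytes // 1024, "KiB")
    print("int64 column :", as_int64.nbytes // 1024, "KiB")
    # an object column additionally pays per-string overhead on top of the pointers
    print("string column:", as_string.nbytes // 1024, "KiB of pointers alone")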

Furthermore, the same data might be stored in different formats on warm or cold storage, meaning that you have to decide whether you want to keep redundant data or generate the optimal storage layout from scratch each time for a given computation.

Compression can make sense for data in-memory, because it enables storing more data in-memory instead of slower disk drives.

Unfortunately, contrary to the strategies for managing data temperature, there are currently no mature strategies that automatically decide how to store your data. This is a manual decision and thus requires good knowledge of how your Big Data platform is used.

Do we always need in-memory?

With the need for processing large data sets, one thing became apparent: even with new technologies, such as in-memory or Big Data platforms, it is sometimes very inefficient to process data by looking at all of it – it is better not to read data at all!

Of course, this means you should not read irrelevant data. For instance, it was very common in traditional databases to read all the rows to find the rows matching a query. Even when using indexes, some irrelevant data is read when scanning the index, although storing the index as a tree structure already increased search performance a lot.

More modern solutions use storage indexes and/or Bloom filters to decide which rows they need to read. This means they can skip blocks of data that do not contain rows matching a query (cf. here for the implementation in Apache Hive).
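
A simplified Python sketch of such a min/max storage index: per data block only the minimum and maximum value of a column is kept, so whole blocks can be skipped without reading them. The block contents are illustrative.

    blocks = [
        {"min": 1,   "max": 99,  "rows": list(range(1, 100))},
        {"min": 100, "max": 199, "rows": list(range(100, 200))},
        {"min": 200, "max": 299, "rows": list(range(200, 300))},
    ]

    def rows_equal_to(value):
        for block in blocks:
            if value < block["min"] or value > block["max"]:
                continue              # skip the whole block, no I/O needed
            for row in block["rows"]:
                if row == value:
                    yield row

    print(list(rows_equal_to(150)))   # only the second block is scanned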

Similarly, probabilistic data structures, such as HyperLogLog, or approaches based on sampling (cf. here) enable one to avoid reading all the data again or at all. In fact, here you can even skip “relevant” data – as long as you read enough data to provide correct results within a small error margin.

Hence, even with in-memory technologies it is always better to avoid reading data. Even if the data is already in-memory, the CPU needs more time the more data it has to process – a simple but often overlooked fact.

The impact of Scheduling: Pre-emption

Once your Big Data platform or in-memory platform grows, you will not only get more data, but also more users working on it in parallel. This means that if they use interactive queries or schedule Big Data processes, they need to share resources, including memory and CPU – especially when taking speculative execution into account. As described before, you ideally have a general big picture of what will happen, especially with main memory. However, in peak times – and for some Big Data deployments most of the time – the resources are not enough, because of cost or other reasons.

This means you need to introduce scheduling according to scheduling policies. We briefly touched on this topic before, because the concept of data temperature implies some kind of scheduling. However, if you have a lot of users, the bottleneck is often simply the number of processors that process data. Hence, sometimes the analytics of some users are partially interrupted to free resources for other users. These users may use different data sets, meaning that some data might also be moved from main memory to disk drives. After the interrupted tasks are resumed, they may need to reload data from disk drives into memory.

This can sometimes make the performance experience unpredictable, and you should be aware of it so you can react properly to incidents created by users or do more informed capacity management.

Big Data technologies for in-memory

In-memory batch processing

There are several in-memory batch processing technologies for Big Data platforms, for example Apache Spark or Apache Flink. In the beginning, these platforms, especially Spark, had some drawbacks from representing everything as Java objects in memory. This meant, for instance, that a 10-character string could consume 6 times more memory than representing it as an array of bytes.

Luckily this has changed, and data is now stored in-memory in a columnar fashion, which also supports skipping irrelevant data on disk (via predicate pushdown and an appropriate disk storage format, such as ORC or Parquet).
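
A hedged PySpark sketch of this combination (the file path and column names are assumptions): the columnar Parquet format plus predicate pushdown lets Spark skip irrelevant data on disk before caching the remainder in memory in columnar form.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col

    spark = SparkSession.builder.appName("in-memory-batch").getOrCreate()

    events = (spark.read.parquet("/data/events")            # hypothetical dataset
              .filter(col("event_date") >= "2016-01-01")    # pushed down to Parquet
              .select("device_id", "value"))                # column pruning
    events.cache()                                          # keep the reduced set in memory
    print(events.count())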

Additionally, both support graph batch processing and processing of continuous streams in-memory. However, both rely on a common abstraction for a data structure in which they represent other data structures, such as graphs. For example, in the case of Spark these are Resilient Distributed Datasets (RDDs)/DataFrames. This means they do not offer as much performance as a highly specialized graph engine, but they are more generic and it is possible to integrate them with other data structures. For most of the current use cases this is sufficient.

Additionally, different processing algorithms, mainly in the area of machine learning are supported.

Sometimes you will see that they are also advertised as interactive platforms. However, this is not their core strength, because they do not support, for example, the concept of data temperature automatically, i.e. the developer is fully responsible for taking hot, warm and cold data into account or for implementing a strategy as described above. Additionally, they do not provide index support for data in-memory, because this is usually much less relevant for batch processes. Hence, if you want to use these technologies for interactive analysis, you have to develop some common IT components and strategies for addressing data temperature and the “do not read irrelevant data” paradigm.

In any case you have to think about scheduling strategies to optimize your resource usage of your available infrastructure.

Depending on your requirements, in-memory batch processing is not needed in all cases, and your Big Data platform should support both in-memory and non-in-memory batch processes to be efficient. Especially if your batch process loads and processes the data only once, without re-reading parts of it, you won't benefit much from in-memory.

Interactive in-memory analytics

There are several technologies enabling interactive in-memory analytics. One of the older – but still highly relevant – ones is memcached for databases. Its original use case was to speed up web applications accessing the database with many users writing and reading the same data in parallel. Similar technologies are also used for Master Data Management (MDM) systems, because they need to deliver data to and receive data from a lot of sources, different systems and business processes with many users. This would be difficult if one relied only on databases.

Other technologies focus on the emerging Big Data Platforms based on Hadoop, but also augment in-memory batch processing engines, such as Spark. For instance, Apache Ignite provides functionality similar to memcached, but also supporting Big Data platforms and in-memory batch processing engines. For example, you can create shared RDDs for Spark. Furthermore, you can cache Hive tables or partitions in-memory. Alternatively, you can use the Ignite DataGrid to cache selected queries in-memory.

These technologies support advanced in-memory indexes (keep in mind: it is always better not to read data!) and automated data temperature management. Another example is Apache Tachyon.

There are also very specialized interactive in-memory analytics engines, such as TitanDB for graphs. TitanDB is based on the Tinkerpop graph stack including the interactive Graph query (or graph traversal) language Gremlin. SAP HANA is a specific in-memory column database for OLTP, OLAP, text-analytics and graph applications. It has been extended to a full application stack cloud platform based on in-memory technology.

Taking scheduling into account is much more tricky with interactive analytics, because one does not know exactly what the users will do, and prediction engines for user behavior in interactive analytics are currently nearly non-existent.

However, you can define different categories of interactive analytics (e.g. simple queries, complex queries, machine learning, graphs ….) and determine your infrastructure as well as its configuration based on these categories.

Conclusions

It makes sense to distinguish between in-memory batch processes and interactive in-memory analytics. In-memory batch processes can be planned and scheduled more easily in advance, and one can better optimize resources for them. They are also more focused on processing all data. Specific technologies for distributed in-memory Big Data exist and are complementary to technologies for interactive in-memory analytics. The main differences are additional indexes and automated support for the concept of data temperature.

Even for in-memory technology, the key Big Data concept of not reading irrelevant data is of high importance. Processing terabytes of data in-memory even though only a subset is relevant is a waste of resources and, particularly, time. This is especially difficult to handle for interactive in-memory analytics, where the users can do whatever they want. Hence, automated and intelligent mechanisms to support this are highly desirable. They should be preferred to manually developing the right data model and structures.

Another key concept is to have the right data structure in-memory for optimal processing. For instance, graph structures perform much better than relational row- or column-based structures, which need to be joined very often to perform graph algorithms. Furthermore, probabilistic data structures and probabilistic sampling queries are of growing importance. Depending on your needs, you might have the same data represented redundantly in different data structures for different analysis purposes.

Finally, distinguishing interactive analytics and batch processing is not always straightforward. For instance, you can have a batch process running 5 minutes whose results are queried 1000 times, so avoiding the 5-minute run time each time can be very beneficial. On the other hand, you can have an interactive query by one user which takes 60 minutes, but is only needed once by that one user. This may also change over time, so it is important that, even after development of a solution, you monitor and audit it regularly with business users and technical users to check whether the current approach still makes sense or another approach makes more sense. This requires a regular dialogue even after go-live of a Big Data application.

The convergence of both concepts requires more predictive algorithms for managing data in-memory and for queries. These can be seen only in their first stages, but I expect much more over the coming years.

Big Data – What is next? OLTP, OLAP, Predictive Analytics, Sampling and Probabilistic Databases

Big Data has matured over the last years and is becoming more and more a standard technology used in various industries. Starting from established concepts, such as OLAP and OLTP in the context of Big Data, I go beyond them in this blog post, describing what is needed for next-generation applications, such as autonomous cars, Industry 4.0 and smart cities. I will cover three new aspects: (1) making the underlying technology of predictive analytics transparent to the data scientist, (2) avoiding Big Data processing of one large-scale dataset by employing sampling and probabilistic data structures, and (3) ensuring quality and consistency of predictive analytics using probabilistic databases. Finally, I will talk about how these aspects change the Big Data Lambda architecture and briefly address some technologies covering the three new aspects.

Big Data

Big Data has emerged over the last years as a concept to handle data that requires new data modeling concepts, data structures, algorithms and/or large-scale distributed clusters. This has several reasons, such as large data volumes, new analysis models, but also changing requirements in the light of new use cases, such as industry 4.0 and smart cities.

During investigation of these new use cases it quickly became apparent that current technologies, such as relational databases, would not be sufficient to address the new requirements. This was due to inefficient data structures and algorithms for certain analytics questions, but also to the inherent limitations in scaling them.

Hence, Big Data technologies have been developed and are subject to continuous improvement for old and new use cases.

Big Online Transaction Processing (OLTP)

OLTP has been around for a long time and focuses on transaction processing. When the concept of OLTP emerged, it was usually a synonym for simply using relational databases to store various information related to an application – most people forgot that it was about processing transactions. Additionally, it was not about technical database transactions, but business transactions, such as ordering products or receiving money. Nevertheless, most relational databases secure business transactions via technical transactions by adhering to the ACID criteria.

Today OLTP is relevant given its numerous implementations in enterprise systems, such as Enterprise Resource Planning, Customer Relationship Management or Supply Chain Management systems. Due to the growing complexity of international organisations these systems tend to hold more and more data and – from a data volume point of view – they tend to generate a lot of data. For instance, a large online vendor can have several exabytes of transaction data. Hence, Big Data also happens for OLTP, particularly if this data needs to be historized for analytical purposes (see next section).

However, one important difference from other systems is the access pattern: usually, there are a lot of concurrent users, each interested in a small part of the data. For instance, a customer relations agent adds some details about a conversation with a customer, or an order is updated. Hence, you need to be able to find/update a small data set within a much larger data set. Different mechanisms to handle a lot of data for OLTP usage have existed in relational database systems for a long time.

Big Online Analytical Processing (OLAP)

OLAP has been around nearly as long as OLTP, because most analyses have been done on historized transactional data. Due to historization and different analysis needs, the amount of data is significantly higher than in OLTP systems. However, OLAP has a different access pattern: fewer concurrent users, but they are interested in the whole set of data, because they want to generate aggregated statistics over it. Hence, a lot of data is usually transferred into an OLAP system from different source systems, and afterwards it is read frequently but rarely modified.

This led very early to the development of special OLAP databases storing data for multidimensional analysis in cubes to match the aforementioned access pattern. They can be seen as very early NoSQL databases, although they were not described as such at the time, because the term NoSQL appeared only much later.

While data from OLTP systems was originally the primary source for OLAP systems, new sources of data have appeared, such as sensor data or social network graphs. This data goes beyond the capability of OLTP or special OLAP databases and requires new approaches.

Going beyond OLTP and OLAP

Aspect 1: Predictive Analytics

Data scientists employing predictive analytics use statistical and machine learning techniques to predict how a situation may evolve in the future. For example, they predict how sales will evolve given existing sales and patterns. Some of these techniques have existed for decades, but only recently have they become more useful, because more data can be processed with Big Data technologies.

However, current Big Data technologies, such as Hadoop, are not transparent to the end user. This is not really an issue with the Big Data technologies themselves, but with the tools used for accessing and processing the data, such as R, Matlab or SAS.

They require that the end user thinks about writing distributed analysis algorithms, e.g. via map/reduce programs in R or other languages, to do their analysis. The standard library functions for statistics can be included in such distributed programs, but the user still has to think about how to design the distributed program. This is undesirable, because these users are usually not skilled enough to design them optimally. Hence, frustration with respect to performance and effort is likely to occur.

Furthermore, organisations have to think about an integrated repository where they store these programs to enable reuse and consistent analytics. This is particularly difficult, because these programs are usually maintained by business users, who lack proper software management skills.

Unfortunately, it cannot be expected that the situation changes very soon.

Aspect 2: Sampling & Probabilistic Data Structures

Surprisingly often when dealing with Big Data, end users tend to execute queries over the whole data set, independent of whether it has 1 million rows or 1 billion rows.

Sampling databases

While it is certainly possible to process a data set of nearly any size with modern Big Data technologies, one should carefully think if this is desired due to increased costs, time and efforts needed.

For instance, if I want to know the average value of all transactions, I can of course calculate the average over all transactions. However, I could also take a random sample of 5% of the transactions and know that the average of this sample is correct within an error of +-1% compared to the total population. For most decision making this is perfectly fine. However, I only needed to process a fraction of the data and can now do further analysis with the saved time and resources. This may even lead to better informed decisions.
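
A small Python sketch of this argument with synthetic data: the average transaction value is estimated from a 5% random sample and reported with a 95% confidence interval based on the standard error.

    import random
    import statistics

    random.seed(42)
    transactions = [random.expovariate(1 / 50.0) for _ in range(200000)]

    sample = random.sample(transactions, k=len(transactions) // 20)   # 5 % sample
    mean = statistics.mean(sample)
    stderr = statistics.stdev(sample) / len(sample) ** 0.5
    print("estimated avg: %.2f +/- %.2f (95%% confidence)" % (mean, 1.96 * stderr))
    print("exact avg    : %.2f" % statistics.mean(transactions))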

Luckily, there are already technologies allowing this. For example, BlinkDB, which allows – amongst others – the following SQL queries:

  • Calculate the average of the transaction values within 2 seconds with some error: SELECT avg(transactionValue) FROM table WITHIN 2 SECONDS
  • Calculate the average of the transaction values within given error boundaries: SELECT avg(transactionValue) FROM table ERROR 0.1 CONFIDENCE 95.0%

These queries over large-scale datasets execute much faster than in any other Big Data technology that does not employ sampling methods.

Particularly for predictive algorithms this makes sense, because they anyway have underlying assumptions about statistical errors, which a data scientist can easily combine with the errors introduced by sampling databases.

Bloom filters

Probabilistic data structures, such as Bloom filters or HyperLogLog, aim in the same direction. They are increasingly implemented in traditional SQL databases and NoSQL databases.

Bloom filters can tell you whether an element is part of a set of elements without browsing through the set, by employing a smart hash structure. This means you can skip trying to access elements on disk which do not exist anyway. For instance, if you want to join two large datasets, you only need to load the data for which there is a corresponding value in the other dataset. This dramatically improves performance, because you need to load less data from slow storage.

However, a Bloom filter can only tell you definitively that an element is NOT in the set; if it reports an element as being in the set, this is only true with a certain probability. For the typical use cases of Bloom filters this is no problem.
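
A minimal Bloom filter sketch in Python illustrating this behavior: a bit array plus several hash functions; “maybe in the set” can be a false positive, but “not in the set” is always correct. Sizes and keys are illustrative.

    import hashlib

    class BloomFilter:
        def __init__(self, size_bits=1024, num_hashes=3):
            self.size = size_bits
            self.num_hashes = num_hashes
            self.bits = [False] * size_bits

        def _positions(self, item):
            for i in range(self.num_hashes):
                digest = hashlib.sha256(("%d:%s" % (i, item)).encode()).hexdigest()
                yield int(digest, 16) % self.size

        def add(self, item):
            for pos in self._positions(item):
                self.bits[pos] = True

        def might_contain(self, item):
            return all(self.bits[pos] for pos in self._positions(item))

    keys_on_disk = BloomFilter()
    keys_on_disk.add("customer-42")
    print(keys_on_disk.might_contain("customer-42"))   # True – maybe in the set
    print(keys_on_disk.might_contain("customer-99"))   # False – skip the disk read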

HyperLogLog

Hyperloglog structures allow you to count the number of unique elements in a set without storing the whole set.

For example, let us assume you want to count the unique listeners of a song on the web.

If you use traditional SQL technologies, then for 5 million unique listeners (not uncommon on Spotify) you need to store around 80 MB of data for one song. It also takes several seconds for each web site request just to do the count of unique listeners or to insert a new unique listener.

By using HyperLogLog you need to store at most only a few kilobytes of information (usually much less) and can read/update the count of unique listeners instantaneously.

Usually these results are correct within a minor configurable error margin, such as 0.12%.

Find some calculations in my lecture materials.
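
For illustration, here is a very small HyperLogLog-style sketch in Python: a few hundred bucket registers are enough to estimate the number of unique listeners. The constants follow the standard HyperLogLog estimator; the precision and data are illustrative, and real systems use hardened implementations.

    import hashlib

    NUM_BUCKETS = 256                 # 2^8 buckets, roughly 6.5 % standard error
    registers = [0] * NUM_BUCKETS

    def add(listener_id):
        h = int(hashlib.sha256(listener_id.encode()).hexdigest(), 16)
        bucket = h & (NUM_BUCKETS - 1)            # lowest 8 bits choose the bucket
        rest = h >> 8
        rank = 1
        while (rest & 1) == 0 and rank < 64:      # position of the first 1-bit
            rank += 1
            rest >>= 1
        registers[bucket] = max(registers[bucket], rank)

    def estimate():
        alpha = 0.7213 / (1 + 1.079 / NUM_BUCKETS)
        harmonic = sum(2.0 ** -r for r in registers)
        return alpha * NUM_BUCKETS * NUM_BUCKETS / harmonic

    for i in range(100000):
        add("listener-%d" % i)
    print(int(estimate()))            # close to 100000 within the error margin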

Aspect 3: Probabilistic Databases

Your company has deployed a Big Data technology and uses it productively. Data scientists generate new statistical models on a daily basis all over the world based on your enormous data reservoir. However, all these models have only a very selective view on the real world and different data scientists use different methods and assumptions to do the same analysis, i.e. they have a different statistical view on the same object in the real world.

This is a big issue: your organization has an inconsistent view of the market, because different data scientists may use different underlying data, assumptions and probabilities. This can lead to a loss of revenue, because you may define contradictory strategies. For example, data scientist A may do a regression analysis on sales of ice cream based on surveys in northern France with a population of 1000. Data scientist B may independently do a regression analysis on sales of ice cream in southern Germany with a population of 100. Hence, both may come up with different predictions for ice cream sales in Central Europe, because they have different views of the world.

This is not an issue only with collaboration. Current Big Data technologies do not support a consistent statistical view of the world on your data.

This is where probabilistic databases will play a key role. They provide an integrated statistical view of the world and can be queried efficiently by employing new techniques, while still supporting SQL-style queries. For example, one can query the location of a truck from such a database. However, instead of just one location of one truck, you get several locations with different probabilities associated with them. Similarly, you may join all the trucks that with a certain probability are close to goods at a certain warehouse.

Current technologies are based on academic prototypes, such as MayBMS by Cornell University, BayesStore by the University of California, Berkeley, or Trio by Stanford University.

However, these technologies still lack commercial maturity and more research is needed.

Many organizations are not ready for this next big step in Big Data, and it is expected that it will take at least 5-10 years until the first ones are ready to employ such technology.

Context of the Lambda architecture

You may wonder how this all fits into the Big Data Lambda architecture and I will briefly explain it to you here.

Aspect 1: Integration of analytics tools in the cluster

Aspect 1, the integration of analytics tools with your cluster, has not really been the focus of the Lambda Architecture. In fact, it is missing, although it has significant architectural consequences, since it affects the resources used, reusability (cf. also here) and security.

Aspect 2: Sampling databases and probabilistic data structures

Sampling databases and probabilistic data structures are most suitable for the speed and serving layers. They allow fast processing of data while only being as accurate as needed. If one is satisfied with their accuracy, which can be expected for most business cases after thoughtful reconsideration, then one won't even need a batch layer anymore.

Aspect 3: Probabilistic databases

Probabilistic databases will initially be part of the serving layer, because this is the layer data scientists directly interact with in most cases. However, later they will be an integral part of all layers. Especially the deployment of quantum computing, as we already see it at big Internet and high-tech companies, will drive this.

Conclusion

I presented in this blog post important aspects for the future of Big Data technologies. This covered the near-term future, medium-term future and long-term future.

In the near-term future we will see a better and better integration of analytics tools into Big Data technology, enabling transparent computation of sophisticated analytics models over a Big Data cluster. In the medium-term future we will see more and more usage of sampling databases and probabilistic data structures to avoid unnecessary processing of large data to save costs and resources. In the long-term future we will see that companies will build up an integrated statistical view of the world to avoid inconsistent and duplicated analysis. This enables proper strategy definition and execution of 21st century information organizations in traditional as well as new industries.

Master Data Management and the Internet of Things

Master Data Management (MDM) has matured and grown significantly over the last years. The main motivation for master data management is to have a complete and accurate view on master data objects in your organization. Master data objects describe key assets, such as machines or customers, generating value for your organization. Hence, MDM fosters processes to enhance and improve the quality of master data, so that the key assets are used properly to generate value. However, most of these processes still require manual intervention by humans. Furthermore, master data is usually not up to date due to its manual improvement and tracking. Especially the current state of master data is usually only entered into the system after hours or even days. This makes it difficult to act upon this state or to predict changes to it. Clearly, this can be a disadvantage compared to competitors who leverage real-time information when using their master data. For instance, one cannot predict that a customer might move to another city in the near future, or that the planes one operates will require maintenance at an inconvenient time, delaying an offered flight.

The vision of the Internet of Things (IoT) is to connect things sensing and acting upon their environments to the Internet and exchanging data about their state as well as their environment. IoT enables a real-time 360° view on your key assets and their interaction with the environment. Current studies estimate that by 2020 several billions of things will be connected via the internet.

Hence, it makes sense to combine MDM and IoT to improve your business processes acting upon master data. These processes will benefit from an up-to-date state of master data and can use this data to enable predictive analytics applications, such as predictive maintenance or customer retention.

I will describe both concepts in more detail and how they can be integrated. Afterwards I will discuss current challenges with respect to architectures, data models and predictive analytics applications. Finally, I will provide insights into what next generation MDM systems will look like.

What is Master Data Management?

Master Data is data about the key assets in a company. Examples are customers, machines, products, suppliers, financial assets or business partners.

One should differentiate master data from transactional data, which always refers to master data. Master data objects can exist on their own and do not necessarily need to refer to other data, i.e. they make sense without any relations. For instance, a customer can exist without other customers. However, the customer usually has (social) relations to other customers. A transaction for buying a product, in contrast, cannot exist without a customer and a product.

One of the key issues for MDM is the integration of various systems containing master data. Usually this data is inconsistent and incomplete due to various reasons. This has significant impact on the business processes using master data, which leads to significant cost and waste of resources.

Hence, master data management solutions provide various means to improve master data quality automatically and manually. For instance, they offer rules engines to validate data quality and workflow engines to assign tasks to data stewards to fix incorrect data. Currently, most effort related to improving master data quality is spent on manual improvement.

What is the Internet of Things (IoT)?

The Internet of Things is a paradigm that connects things, such as machines, cars, smartphones, thermostats or smoke detectors, to the Internet, where they provide information about their state and their environment to other things as well as to humans.

For example, a machine can report its utilization to other machines and inform its users about alternative machines to use in case of high utilization.

The Internet of Things does not only take into account the current state of things, but also the future state of things by employing predictive analytics applications.

For instance, a car can predict based on its sensor information that the engine is likely to fail within the next seven days. It can schedule maintenance with the manufacturer so it does not fail when it is needed by the driver.

Challenges Combining MDM and IoT

The main benefits of integrating MDM and IoT are the following:

  • Automatically update master data and its state to improve value-flow of current business processes.
  • Enable prediction on master data, such as predictive maintenance of machines or predictive customer behavior, to enable new types of business processes and models.

However, currently there are some challenges integrating them.

Internet of Things and Semantic Challenges

The Internet of Things only brings value to an organization if the organization can use the IoT information within a proper analytics model that describes the semantic relations between things and master data objects.

For instance, if the company only collects information such as sensor “A4893983” reporting its location as “50.106529,8.662162”, then this data is of very little value to the company.

However, with a proper semantic description of MDM and IoT data, it can leverage this data to generate information such as the following: “Customer Max Mustermann is currently at Frankfurt central station and is using one of our products. His friend, Martha Musterfrau, is currently near him, but is having problems with one of our products.”

These types of predictive analytics and semantic models, as well as the IoT information itself, require new database technologies, which will be described later.

Combining Big Data and Master Data Management

Traditional master data management solutions have not been designed with “Big Data” in mind. However, combining MDM and IoT requires “Big Data”:

  • Higher data volumes due to IoT Data
  • Complex analytics queries over existing MDM data with a lot of relationships
  • Variety of information in master data objects and IoT database

This also calls for new database technologies.

Providing Prediction to Business Processes

Traditional master data management solutions only support the provision of master data to business processes. However, modern master data management solutions supporting IoT will also have to provide predictive analytics to business processes, for example answers to questions such as the following:

  • Which of my machines is likely to fail next and which ones should be sent to maintenance?
  • What product is the customer most likely to buy next and which material do I need to buy to build it?

Relational databases are suitable for descriptive statistics, but they quickly reach their limits with even simple prediction models. Hence, new database technologies have to be supported.

Technology Support

Current MDM solutions are mostly based on relational SQL databases together with caching solutions. This is suitable for integrating master data objects from MDM systems into today’s business processes. Unfortunately, it makes them less suitable for predictive analytics applications due to the limitations of relational algebra. They also cannot handle the large number of relations between master data objects that is required today (e.g. many different versions of master data objects, or master reference data such as social network graphs or dependency graphs). This also limits the opportunities for data quality enhancements and results in poorer data quality, which leads to higher costs in the business processes using master data.

Modern MDM solutions leverage graph databases to store and analyze master data objects as well as to provide them to business processes. They offer transactional guarantees similar to relational databases, but have different storage and index structures that are more suitable for MDM. However, they have not yet become first-class citizens in companies, which currently have to build up knowledge in this area. Nevertheless, large software vendors, such as SAP or Oracle, are starting to offer graph databases as part of their database solutions. Popular open source graph databases and graph processing solutions, such as OrientDB, Neo4j, Spark GraphX or TitanDB, have existed for several years and can cope with large amounts of data.
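
As a minimal sketch of how such a master data graph could be built, here is an example using the Neo4j Python driver (5.x API); the connection details, node labels, properties and relationship types are assumptions for illustration, not a prescribed data model:

```python
# Sketch: store two customers, their friendship and the devices they own
# in a graph database (Neo4j). Labels, properties and credentials are hypothetical.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

def create_example_graph(tx):
    tx.run(
        """
        MERGE (max:Customer {name: 'Max Mustermann'})
        MERGE (martha:Customer {name: 'Martha Musterfrau'})
        MERGE (max)-[:FRIEND_OF]->(martha)
        MERGE (d1:Device {serial: 'A4893983', status: 'Operating'})
        MERGE (d2:Device {serial: 'B1234567', status: 'Broken'})
        MERGE (max)-[:OWNS]->(d1)
        MERGE (martha)-[:OWNS]->(d2)
        """
    )

with driver.session() as session:
    session.execute_write(create_example_graph)
driver.close()
```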

Furthermore, relational databases integrate IoT data only poorly, since IoT requires ingesting large volumes of data and running analytics on them. This can no longer be handled with vertical scaling, the prominent paradigm for relational databases; instead, a database cluster consisting of several communicating nodes is needed. Column stores, such as Apache Cassandra (together with an analytics framework such as Hadoop MapReduce or Apache Spark), Hadoop/HBase (Parquet) or SAP HANA, seem to be most suitable for this scenario. They offer high read/write throughput and are thus able to cope with the high volume of IoT data. Furthermore, they can be scaled horizontally by adding new database nodes to an existing network of nodes. Finally, you can manage the incoming load by putting a messaging technology such as Apache Kafka in front of the database.
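
A minimal sketch of such an ingestion path, assuming the kafka-python and cassandra-driver packages, a hypothetical iot_readings topic and an equally hypothetical sensor_state table:

```python
# Sketch: buffer IoT readings in Kafka and write them into a Cassandra table.
# Topic name, keyspace, table and schema are hypothetical.
import json
from kafka import KafkaConsumer
from cassandra.cluster import Cluster

consumer = KafkaConsumer(
    "iot_readings",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

session = Cluster(["localhost"]).connect("mdm_iot")
insert = session.prepare(
    "INSERT INTO sensor_state (sensor_id, ts, location, status) VALUES (?, ?, ?, ?)"
)

for message in consumer:
    reading = message.value  # e.g. {"sensor_id": "A4893983", "ts": ..., ...}
    session.execute(
        insert,
        (reading["sensor_id"], reading["ts"], reading["location"], reading["status"]),
    )
```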

Find here my university-level lecture material on NoSQL & Big Data platforms.

Example

The following figure illustrates the concept of MDM and IoT by means of an exemplary data model. Master data objects are represented as nodes of a graph with relations to other nodes. The following master data objects can be identified: two electronic devices and two customers. The customers, Max Mustermann and Martha Musterfrau, are friends, and this is represented in the master data object graph. Furthermore, each customer has an ownership relation to a product (an electronic device) sold by the company.

Finally, IoT data is illustrated in the figure. This data is connected to the master data objects and provides information about their state. For example, the smartphones of the customers provide information about their location (“Central Station, Frankfurt, Germany”). The IoT data of the electronic devices provides information about their operating status: one electronic device is operating normally and the other one is broken.

[Figure: Example data model combining master data objects and IoT data]

The example demonstrates only a small excerpt of what is possible with a next-generation master data and IoT management system. Some examples of queries that can be answered (see the sketch after this list):

  • Who is the owner of devices in the state “Broken”?
  • Which customers can support other customers nearby with devices in state “Broken”?
  • Which customers influence their friends to buy new devices or recommend devices?
  • Which devices in Frankfurt are likely to fail within the next week and need replacement?
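
For instance, the first of these queries could be expressed against the hypothetical graph model sketched earlier, again via the Neo4j Python driver; labels and properties remain assumptions for illustration:

```python
# Sketch: find the owners of all devices currently reported as "Broken".
# Continues the hypothetical graph model from the previous sketch.
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    result = session.run(
        """
        MATCH (c:Customer)-[:OWNS]->(d:Device {status: 'Broken'})
        RETURN c.name AS owner, d.serial AS device
        """
    )
    for record in result:
        print(record["owner"], "owns broken device", record["device"])

driver.close()
```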

Additional information from IoT data enables superior data quality. For instance, we can properly identify customers and devices. This avoids costly maintenance of working devices or costly replacement of non-working ones.

It is obvious that such a new system enables enhanced sales to customers because more information allows more targeted advertisement and more customization. Based on prediction models one can offer completely new value-added services.

Conclusions

Master Data Management is entering a new era: new database technologies and the Internet of Things enable superior data quality and open up new business cases, such as predictive analytics. Ultimately, this leads to new business processes offering superior value.

Nevertheless, only a few MDM solutions leverage these new technologies yet, although they are already quite mature. Additionally, the Internet of Things has to become more pervasive, and organizations need to pressure their suppliers and customers to engage more with it.

The Lambda Architecture for Big Data in your Enterprise

In this blog post I will present the Lambda architecture for Big Data. This architecture is about integrating historical Big Data with “live” streaming Big Data. Afterwards, the concept of a large data lake in your enterprise, or among enterprises in a B2B scenario, is explained. This data lake, based on the lambda architecture, can replace a service-oriented architecture (SOA), because it is easier to implement and manage for large data volumes in a variety of formats. Hence, a plethora of use cases arises. Finally, I will discuss how this architecture can be implemented using various open source software technologies based on the Hadoop ecosystem.

The Lambda Architecture

Big Data has become an increasingly popular topic over the last years. Big Data is about processing large volumes of data in a variety of formats, taking into account live streaming as well as historical data. One large computing cluster is used to store and process all of one or more companies’ data.

Internet companies, such as Google, Yahoo or Facebook, are driven by new business models for which existing technology was not suitable. This led to the development of new technologies known under the common umbrella of NoSQL. Furthermore, there has been a need to integrate them into a flexible, standardized architecture to enable Big Data. The lambda architecture is such an architecture and was coined recently by Nathan Marz and James Warren.

It has the following key features:

  • Standardized fault-tolerant distributed file system that spans the whole cluster; this file system is the base of the data lake that I will explain later.
  • A batch processing layer for processing large amounts of historical data stored in the computing cluster
  • A serving layer for providing fast access to results of batch processed data
  • A real-time processing layer (or “speed layer”) for “live” processing of data streams, such as sensor data or stock market data
  • A long-term storage layer optimized for extremely cheap storage of data that is rarely used (e.g. kept for legal reasons). You usually do not find this layer in other articles describing the lambda architecture, but I think it is an important feature to highlight. Here you keep very old data (several years and older) that you do not need in your day-to-day business; you can store it on very cheap hardware with a lot of disk space but much less computing power and memory capacity.

These features are not new and have been partly addressed by other architectures known from other domains, such as Business Intelligence, Complex Event Processing, Data Warehousing or Master Data Management. However, the lambda architecture addresses them in the context of huge data volumes and a diversity of data formats (polyglot persistence), and integrates them all in one architecture.

The term “lambda” stems from the following function used for analytics in the context of Big Data:

query = λ(all data) = λ(live streaming data) * λ(historical data)

Basically, it says that any analytics function λ combining live streaming data and historical data can be computed on systems implementing the lambda architecture. I will discuss the implications of this for the implementation of the architecture later.
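
To make the formula concrete, here is a toy sketch of the idea (the views and counts are made up): the batch layer precomputes a view over all historical data, the speed layer maintains an incremental view over recent events, and a query merges both:

```python
# Toy sketch of the lambda idea: a query merges a precomputed batch view
# with an incremental real-time view. The event counts are made-up examples.
from collections import Counter

# Batch view: precomputed over all historical data (recomputed periodically).
batch_view = Counter({"sensor_A": 10_000, "sensor_B": 7_500})

# Real-time view: incrementally updated from the stream since the last batch run.
realtime_view = Counter({"sensor_A": 12, "sensor_C": 3})

def query(sensor_id: str) -> int:
    """query = λ(all data) = combination of batch view and real-time view."""
    return batch_view[sensor_id] + realtime_view[sensor_id]

print(query("sensor_A"))  # 10012
```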

The lambda architecture is illustrated in the following figure:

[Figure: The lambda architecture]

The lambda architecture provides data scientists with the means and tools to analyze any data occurring in the company, whereby tools can easily be plugged into the architecture without requiring major implementation efforts later on.

Machine learning components can autonomously leverage the lambda architecture to do prediction and automatically implement actions. This is known as predictive and prescriptive analytics.

Data Lake

One of the most interesting aspects of the lambda architecture is that you have a cluster of nearly unlimited storage and memory capacity. You can even have an in-memory database with a memory capacity on the terabyte to petabyte scale distributed over the whole cluster. Popular open source frameworks, such as Hadoop, allow you to use commodity hardware, so that deploying such an architecture can be relatively cheap. They also have fault tolerance built in, so that developers do not need to deal with it themselves.

With such a large cluster you can create a big data lake in your company (see the next figure). Basically, all your data ends up in this cluster, and all applications, including those in the cloud, can share it via simple file system access mechanisms while using the computing power of the whole cluster for analysis. Needless to say, you save a lot of money, because you avoid many redundant ETL processes, which would all have to be made fault-proof and would have to interact with different systems. Modern Big Data architectures take care of this for you.

[Figure: The enterprise data lake]

Finally, exchanging data becomes much easier than in a service-oriented architecture (SOA), where you need to design interfaces and implement services; here, every application simply accesses the distributed file system in the cluster.
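
As a small sketch of what this shared access can look like, assuming PySpark and hypothetical paths and columns:

```python
# Sketch: two applications sharing data through the data lake simply by reading and
# writing files on the distributed file system. Paths and columns are hypothetical.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("data-lake-access").getOrCreate()

# Application A writes its results into the lake ...
orders = spark.createDataFrame(
    [("order-1", "Max Mustermann", 49.99)], ["order_id", "customer", "amount"]
)
orders.write.mode("overwrite").parquet("hdfs:///datalake/sales/orders")

# ... and application B later reads them directly, without any service interface.
spark.read.parquet("hdfs:///datalake/sales/orders").show()
```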

Implementing a Lambda Architecture

There are several things to consider when you implement the lambda architecture. Firstly, you can choose from a variety of components to implement it. For instance, on the open source side, Apache Hadoop / Apache Spark is very popular and is used by many companies, including all the popular Internet companies, such as Facebook or Google. You can also use other open source components, such as Apache Cassandra for batch processing and Twitter Storm for stream processing. Additionally, you can use commercial tools, such as the SAP HANA Cloud Platform. Finally, you can put your lambda architecture completely on premises, completely in the cloud (see my example with Amazon Elastic MapReduce, which partly implements a lambda architecture) or use some kind of hybrid model. In the following, I will describe an implementation using Apache Hadoop and additional tools that integrate with Apache Hadoop.

Software Components

You can use the following components for implementing the lambda architecture.

  • Standardized fault-tolerant distributed file system: Hadoop Distributed Filesystem (HDFS). You can use also other distributed file systems. The choice of the file system is transparent to the application, i.e. they won’t need to use different APIs for different file systems. Most of the time you will be fine with HDFS, but, for example, cloud providers, such as Amazon, may implement their own that fits to their infrastructure.
  • Batch processing layer: Here you can use Hadoop YARN, which is responsible for distributing Big Data analytics jobs, such as MapReduce jobs. YARN even allows you to “containerize” your jobs, i.e. define CPU, memory and network limits for a specific job across the big data cluster. This enables proper capacity management, one of the most important aspects of a lambda architecture. If you need in-memory batch processing, then you should check out Apache Spark (see the sketch after this list). If you want more generic job control, e.g. because you have other distributed applications around your cluster that are not based on the MapReduce paradigm, you can use Apache Mesos.
  • Serving layer: The serving layer provides fast access and advanced query mechanisms for the results of batch jobs. Here you can use typical Big Data databases and data warehouses, such as Apache HBase or Apache Shark (for in-memory access). You will probably have multiple different technologies here, following the polyglot persistence NoSQL paradigm. They offer typical interfaces, such as JDBC or ODBC, to integrate with any application.
  • Real-time processing layer: Although Hadoop can process streaming data, most of the time you will choose a software component supporting complex event processing of live streaming data across your cluster, such as Apache Spark Streaming or Twitter Storm.
  • The long-term persistence layer is mostly a hardware choice: here you need a lot of cheap hard disk space (e.g. by not using SSD flash drives) and little computing/memory power. It is usually a separate cluster connected to the main cluster, and it leverages the fault tolerance features of HDFS, such as automated replication of data to several nodes and re-replication in case of node failures.
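
As a minimal sketch of such a batch-layer job on the historical data in the lake (again PySpark; the input path, columns and output location are hypothetical):

```python
# Sketch of a batch-layer job: aggregate historical sensor readings per device
# and write the result for the serving layer. Paths and columns are hypothetical.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("batch-sensor-aggregation").getOrCreate()

readings = spark.read.parquet("hdfs:///datalake/iot/sensor_readings")

daily_stats = (
    readings.groupBy("device_id", F.to_date(F.col("ts")).alias("day"))
    .agg(
        F.avg("temperature").alias("avg_temp"),
        F.count("device_id").alias("num_readings"),
    )
)

# A serving-layer store (e.g. HBase or a warehouse table) would be loaded from this result.
daily_stats.write.mode("overwrite").parquet("hdfs:///datalake/views/daily_sensor_stats")
```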

Furthermore, there are many other software components that build on the aforementioned core technologies, such as Apache Hive or Apache Shark, data warehouses for Hadoop, or Apache Oozie, a workflow tool for complex ETL processes distributed over your data lake.

As mentioned before, there is a wide variety of alternatives that you can use to implement the lambda architecture. The standardized fault-tolerant distributed file system is most of the time the base for everything and you can also gradually evolve your architecture and implement it using different components.

Delivery Pipeline

I briefly mentioned before that capacity management is an important part of the lambda architecture. You need to define how big data jobs are programmed and tested, as well as how they get into the cluster. I expect that in the future not only programmers, but also business people, such as data scientists, will need to load big data jobs into your cluster. This means you will need to (1) properly define your delivery pipeline, (2) implement and enforce proper capacity management and (3) have bullet-proof dependency management for the different software versions in your cluster.

Luckily, by using Apache YARN or Apache Mesos together with cluster monitoring software, such as Ganglia, you can implement proper capacity management.

Recently, tools such as Docker, which use advanced virtualization features of the Linux kernel (cgroups), have emerged, making capacity management even easier and more flexible. These technologies also have built-in dependency management to avoid a library/versioning hell. Google has open-sourced a scheduling system for such containers, called Kubernetes.

Combining Stream-Processing and Batch Data

One core goal of the lambda architecture is to integrate live streaming and batch processing. In fact, most recent articles on the lambda architecture are just about providing both as software components. However, you will also need to integrate them on the query level, because complex event processing queries differ somewhat from batch processing queries.

Spark Streaming demonstrates how you can join historical data with stream-processed data within the same application.
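
A minimal sketch of this idea with the classic Spark Streaming (DStream) API, assuming a hypothetical socket source and a small, made-up reference dataset standing in for historical data:

```python
# Sketch: enrich a live stream with historical/reference data inside Spark Streaming.
# The socket source, port and reference data are hypothetical.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="stream-batch-join")
ssc = StreamingContext(sc, 10)  # 10-second micro-batches

# "Historical" reference data, e.g. device -> customer, loaded once as an RDD.
reference = sc.parallelize([("A4893983", "Max Mustermann"), ("B1234567", "Martha Musterfrau")])

# Live events arriving as "device_id,status" lines on a socket.
events = ssc.socketTextStream("localhost", 9999) \
    .map(lambda line: tuple(line.split(",")))  # -> (device_id, status)

# Join each micro-batch with the historical reference data.
enriched = events.transform(lambda rdd: rdd.join(reference))
enriched.pprint()  # (device_id, (status, customer))

ssc.start()
ssc.awaitTermination()
```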

Hardware Components

Hardware considerations for a lambda architecture have been discussed only briefly, if at all, in most publications. Hardware planning is important for your cluster; we have already seen this with the long-term storage layer. Furthermore, if your big data cluster contains a few very old machines, this will affect all jobs running on the cluster. You will need to have proper monitoring tools and rules deployed to identify these kinds of bottlenecks automatically.

Conclusion

Once you have implemented the lambda architecture, you will need to teach everybody to use it. You will need to plan the migration of data storage for analytics from the individual systems to your data lake, i.e. your big data cluster. Keep in mind that the lambda architecture is about analytics. Although it is possible to include transactional systems (e.g. a MySQL cluster), you will probably still use standard transactional databases for your individual ERP systems, CRM systems etc., and extract their data into the cluster for analytics.

However, there are also other tools for doing distributed transactions, such as CloudTPS or, even more advanced, the Bitcoin transaction system. They may replace individual transactional databases in the future.

Each year, more and more companies embark on the journey towards a standardized Big Data architecture. Most of them use open source technologies to gradually migrate towards one big data lake as described here.