Will Big Data Save You From MiFID II Hell?

Context

MiFID II and MAD/MAR II implementation projects have become an increasing pressure point on financial firms’ information systems and their teams. As the organizational challenges start to become clearer for CIOs and IT managers, the technical issues are gradually coming under the spotlight, with Big Data playing a central role.

The two promises made by Big Data – scalability and flexibility – may indeed sound particularly appealing for financial firms.

  • Big Data systems are scalable because their capacity to store and process data grows in proportion to any increase in the volume of data. This is especially important since upcoming regulations such as MiFID II should result in a 10-100x increase in the amount of data to be monitored, stored and reported.

  • The second point – flexibility – comes from the ability of Big Data systems to offer flexible data schemas – to seamlessly adapt to new and unanticipated modifications (of both data and data models). This is absolutely key as we see every new regulation widening the scope of asset classes and entities subject to reporting mandates.

While at the beginning of 2015 many actors still wondered whether Big Data should be part of the response, today most (if not all) acknowledge that it is unavoidable. The question has now clearly shifted from “should we use it” to “how can we use it”.

Problem

The “how” is indeed the true pain point – and from our perspective, any answer to the “how” must be articulated around the 3 key regulatory requirements that emerge as the most challenging for your data organization and workflows:

  1. Large and heterogeneous sets of “trade data” must be monitored and stored
    (almost any asset, any format - including voice - from any source, regardless of silos)
     
  2. Trade data must be processed and stored in a way that guarantees a complete and instantly available audit trail
    (supervisory agencies should be able to access it without delay)
     
  3. Transactions must be monitored in near-real-time
    (with a latency not greater than 5 seconds)

Those key requirements translate into three technical challenges:

  • As we said, IT managers familiar with the topic now tend to agree that collecting and storing such very large volumes of data would be difficult to achieve without an appropriate Big Data architecture. Granted, this volume issue on its own is not the hardest challenge: many banks have already deployed Big Data projects on other use cases, and some of them even have large-scale infrastructures already in production. There is now a complete array of mature solutions available on the market: Hadoop companies such as Cloudera, Hortonworks and MapR, or alternative solutions such as MongoDB and DataStax, have developed offers around the need for an enterprise-ready Big Data DBMS.

  • However, large distributed infrastructures for data collection and processing have historically been difficult to reconcile with the level of data consistency and transactional integrity required in a regulatory context. This difficulty can be circumvented through the ability to timestamp and version any modification of a single data point – hence turning a consistency problem into a volume constraint.

  • But the final – and perhaps most critical – layer of complexity comes from the need for near-real-time monitoring of transactions and rendering of the data (needless to say, this difficulty becomes even more acute in the context of algorithmic trading – and especially high-frequency trading), without compromising either scalability or consistency.

When it comes to market surveillance, monitoring and reporting obligations, any choice of architecture and/or vendor should take full account of these 3 challenges. Tackling them from the very beginning of your MiFID II / MAR project will not only prevent unfortunate surprises down the road, it will also allow your organization to gain a decisive competitive advantage. Having a centralized, real-time picture of all your transactional data with full analytics capabilities will drive better and more efficient decisions.

Bottom line

Regardless of the kind of Big Data solution your organization intends to implement to address MiFID II & MAR (in-house project, vendor toolsets, integrated platform...), do make sure that it solves the Scalability / Integrity / Real-Time equation. If it does not, you may end up with a shiny new Big Data platform which, unfortunately, will not solve your regulatory issues.

Read more in our complete White Paper about how big the challenge is, and how Scaled Risk solves these issues in two practical use cases. Download it here.

Leveraging HBase for a scalable distributed In-Memory OLAP/HTAP solution

The financial industry has been looking for a new technology that is able to continuously deliver accurate risk figures. Since the 2008 crisis, stricter regulation has been imposed on the financial industry, including requirements for intraday calculations and greater flexibility.

In this context, most tier-1 banks have identified Hadoop as a good candidate for delivering these high-speed risk analytics. In reality, IT departments face an overabundant offering and high integration costs when building a consistent system that can actually deliver continuous risk computation.

Leveraging HBase

HBase offers a standard, consistent storage solution that gives very powerful capabilities to the Scaled Risk OLAP cube. The distributed architecture of HBase provides true scalability, demonstrated on clusters of up to several thousand servers. Adding servers provides more parallel processing capability to build cubes and then keep them up to date in real time.

Automated data sharding and load balancing

Data sharding is the ability to split tables into several subsets of rows that are managed in parallel on several machines. Scaled Risk uses dynamic and flexible data sharding, which means that adding extra resources is as simple as adding extra nodes to the existing cluster.

The distributed capabilities that Scaled Risk offers should not be confused with distributed solutions that rely on static sharding, which always require huge deployment and administration efforts.
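
For illustration, plain HBase exposes this sharding directly: a table is split into regions, and the client API can show which region server currently hosts each one. A minimal sketch using the generic HBase client API (the table name is hypothetical, and this is not Scaled Risk code):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;

public class RegionLayout {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         RegionLocator locator = connection.getRegionLocator(TableName.valueOf("trades"))) {
      // Each region is a shard of the table's row-key space; HBase splits regions
      // automatically as they grow, and the balancer moves them when nodes are added.
      for (HRegionLocation location : locator.getAllRegionLocations()) {
        System.out.println(location.getRegionInfo().getEncodedName()
            + " -> " + location.getServerName());
      }
    }
  }
}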

Distributed computation

Beyond data storage distribution, data processing is executed on each node for maximum parallelization and minimum network usage. Unitary computations and global aggregation never use locks thanks to the innovative architecture.

Caching capabilities

While data are persisted on disk, Scaled Risk provides a very large read cache backed by each node’s RAM. It is actually a distributed cache that can be filled with various policies depending on data usage: cache on read, cache on write, or a mixed approach. A small cluster of 10 mid-range servers provides 1.2 TB of distributed cache, i.e. on the order of 120 GB of RAM devoted to caching per node.
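
For reference, the underlying HBase building block for this kind of read caching is the block cache, which can be extended off-heap with the BucketCache. A minimal hbase-site.xml sketch (the sizes are purely illustrative, and this shows the generic HBase mechanism, not Scaled Risk’s own cache policies):

<!-- off-heap read cache, sized here at roughly 120 GB per region server -->

<property>
 <name>hbase.bucketcache.ioengine</name>
 <value>offheap</value>
</property>

<property>
 <name>hbase.bucketcache.size</name>
 <value>122880</value>
</property>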

 

By leveraging HBase, Scaled Risk offers a disruptive and powerful OLAP/HTAP solution, at the crossroads of transactional data management, Business Intelligence, Big Data and real-time data processing.

Compared to other available solutions, Scaled Risk offers a unified platform:

  • Easy connectivity and built-in storage
  • Full scalability, so OLAP cube capabilities can be extended by adding extra resources (CPU, memory or HDD/SSD)
  • Low latency real-time architecture based on push technology from data input to the end-user desk
  • The best service continuity on the market with no need for active-passive architecture
  • Advanced built-in aggregations functions and an open API to integrate custom functions
  • Off-the-shelf grids and charts
  • An open and unified service layer to access all Scaled Risk Features 

Read more and discover how Scaled Risk leverages HBase to offer the best scalable distributed In-Memory HTAP solution on the market. Download our complete technical paper on Scalable Distributed In-Memory Analytics Platform.

Why is HBase the key enabler for trade repositories?

Trade and market data repositories contain many different formats, including the space-hungry XML-based FpML format. FpML is the recognized standard for communicating with clearing houses and counterparties. Hadoop, as a distributed file system, is obviously able to store all these documents. An even better option is to use HBase, which enables some very important properties:

  • Structured, interactive and indexed
  • Transactions
  • Versions

Structured, interactive and indexed

In the end, we would like the trade repository to contain:

  • Each individual trade in different formats: the original from the internal system (proprietary, Murex, Misys, Calypso, ...), FpML, and a pivot format if several internal systems are involved
  • All raw messages exchanged with clearing houses and counterparties: FpML, FIX, ...
  • Market data from usual vendors
  • Reports: consolidated risk, VAR, CVA, FpML, ...

We would like to benefit from the best of both worlds: a traditional database for “structured data” and a file system for “unstructured data”. HBase is able to structure data, a bit differently from what a traditional database would do, which enables structured information and even a hierarchical organization of data.

A typical record in HBase: a row is identified by its row key and associated, in a very flexible way, with a set of values.

This property allows building any kind of structure and data model. The key-value data organization enables atomic CRUD operations (create, read, update, delete) in a structured way, which is the prerequisite for an interactive system.

Even if HBase does not implement secondary indexes, each row can be efficiently retrieved thanks to its row key. This provides HBase with a basic form of indexing. In the next section, we will see how we have complemented it with an efficient indexing and searching feature.
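
As an illustration of this key-value CRUD model, here is a minimal sketch using the plain HBase client API; the table, column family and qualifier names are hypothetical:

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class TradeCrud {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = connection.getTable(TableName.valueOf("trades"))) {
      byte[] row = Bytes.toBytes("TRADE-0001");   // row key identifying the trade
      byte[] doc = Bytes.toBytes("doc");          // column family

      // Create / update: store the original FpML document under a flexible qualifier
      Put put = new Put(row);
      put.addColumn(doc, Bytes.toBytes("fpml"), Bytes.toBytes("<FpML>...</FpML>"));
      table.put(put);

      // Read: retrieve the trade directly by its row key
      Result result = table.get(new Get(row));
      System.out.println(Bytes.toString(result.getValue(doc, Bytes.toBytes("fpml"))));

      // Delete: remove the row
      table.delete(new Delete(row));
    }
  }
}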
 

Versions and timestamps

This interesting feature of HBase is very useful for implementing a full audit trail of modifications for every piece of data. It also makes it possible to retrieve the state of the data at a given date.

A modified value: the active (latest) value is shown in bold, but older values can be retrieved; all modifications are kept and time-stamped.
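
A minimal sketch of how such an audit trail can be read back with the plain HBase client API, assuming the column family was created to keep several versions (e.g. VERSIONS => 10 in the HBase shell); table and column names are hypothetical:

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class TradeHistory {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = connection.getTable(TableName.valueOf("trades"))) {
      Get get = new Get(Bytes.toBytes("TRADE-0001"));
      get.setMaxVersions(10);                  // full modification history kept by HBase
      // get.setTimeRange(0L, asOfMillis);     // alternatively: the state as of a given date
      for (Cell cell : table.get(get).rawCells()) {
        System.out.println(cell.getTimestamp() + " -> "
            + Bytes.toString(CellUtil.cloneValue(cell)));
      }
    }
  }
}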

 

Transactions

HBase guarantees that any acknowledged write is actually stored and kept. It is not ACID (see http://hbase.apache.org/acid-semantics.html), but it is atomic and provides a sufficient level of consistency for a trade repository (we have complemented this on top of HBase).
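
HBase itself already exposes per-row atomic operations such as checkAndPut, a compare-and-set on a single row; the sketch below uses the plain HBase client API with hypothetical names (the multi-row guarantees mentioned above are the part that has to be layered on top):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class AtomicStatusUpdate {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         Table table = connection.getTable(TableName.valueOf("trades"))) {
      byte[] row = Bytes.toBytes("TRADE-0001");
      byte[] family = Bytes.toBytes("doc");
      byte[] qualifier = Bytes.toBytes("status");

      Put update = new Put(row);
      update.addColumn(family, qualifier, Bytes.toBytes("CLEARED"));

      // Applied atomically on the row, only if the current status is still "PENDING"
      boolean applied = table.checkAndPut(row, family, qualifier,
          Bytes.toBytes("PENDING"), update);
      System.out.println("update applied: " + applied);
    }
  }
}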
 

Hadoop-based: reliable, HA, extensible and cost-effective

HBase is built over Hadoop and benefits from all the nice properties listed in the previous chapter. 

 

Read more by downloading our white paper. We explain how to use Big Data’s low-level layers as the foundation of your local Trade and Market Data repository, and why Hadoop and HBase are the most efficient and cost-effective option.

Database transactions and distributed computing

A system that supports reliable transactions will:

  • Keep the data consistent at any time, including if failures happen
  • Provide isolation among all applications and users accessing the database

In 1983, transactional database properties were summarized by the acronym ACID:

  • Atomic: all or nothing for a set of operations; either all are accepted or all are rejected
  • Consistent: time consistency, semantic consistency and constraint consistency; certainly the most flexible property of the four
  • Isolated: reads are isolated from transactions being written. There are several isolation levels; the most important point is to avoid dirty reads, and the commonly accepted isolation level is “Read committed”. This is not sufficient for all cases, especially in compliance contexts
  • Durable: once a transaction has been committed, the data will stay forever, whatever happens (e.g. a crash just after the transaction was committed)

In the world of Relational Database Management Systems, any serious solution complies with ACID properties. Most implementations are not distributed, which greatly simplifies the implementation of ACID properties.

Before addressing the ACID question for distributed databases, we must introduce the notions of eventual consistency and strong consistency, which are part of the distributed database vocabulary.

Distributed databases fall into two main categories with respect to how they implement transactions. The first group focuses on performance and simplicity, at the price of no guarantee of consistency over time. The second category pays a small trade-off in performance (when efficiently implemented) to gain the certainty that a transaction has actually been written (per-row atomicity) and will remain written (durability). Eventually consistent systems do not guarantee that a piece of data will be seen identically by all readers while it is being changed (updated or deleted).

  • Eventually consistent systems, by their nature, cannot provide ACID properties
  • Strongly consistent systems are able to implement per-row ACID properties

HBase implements strong consistency combined with replication, which means that a transaction is acknowledged only once it has actually been applied to all replicas; data is not visible as long as the transaction has not been committed, and it is visible and consistent to every reader once committed. However, a transaction in HBase does not have exactly the same properties as a transaction in an ACID RDBMS.

Is HBase ACID?

The straightforward answer is no because, at the very least, HBase does not provide atomicity across a set of operations. On the other hand, HBase claims to be ACID per row. In the end, this question sounds very theoretical. Scaled Risk solves a very practical equation:

HBase + Scaled Risk = Transactional system for finance

Because:

  • Any single-table atomicity issue can actually be solved with HBase
  • Any multi-table problem can actually be addressed with per-row atomicity
  • Scaled Risk is able to isolate multi-row transactions for reads

In practice, ACID properties do not hold on a traditional RDBMS either when inserting, updating or deleting 1 billion rows in a single transaction.
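
In the Big Data approach, such volumes are written as batches of per-row atomic mutations rather than one gigantic transaction. A minimal sketch with HBase’s BufferedMutator (generic HBase client API, table and column names hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BulkTradeLoad {
  public static void main(String[] args) throws Exception {
    try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
         BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("trades"))) {
      for (long i = 0; i < 1_000_000_000L; i++) {
        Put put = new Put(Bytes.toBytes(String.format("TRADE-%012d", i)));
        put.addColumn(Bytes.toBytes("doc"), Bytes.toBytes("payload"), Bytes.toBytes("..."));
        mutator.mutate(put);   // buffered client-side and flushed to region servers in batches
      }
    }                          // close() flushes any remaining mutations
  }
}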


Read more and discover how Scaled Risk extends HBase core features to enable off-the-shelf transactional Big Data for the finance professionals by downloading our complete white paper on Transactional Big Data for Finance.

Hortonworks, Scaled Risk and eBay Collaborate to Improve HBase Mean Time to Recovery (MTTR)

Our business uses Apache HBase to deliver value to our customers in real-time and we are sensitive to any failures because prolonged recovery times significantly degrade site performance and result in material loss of revenue. Over the past few months, we have been testing and using the Mean Time to Recovery (MTTR) feature improvements, delivered in the open HBase community.

eBay runs Apache Hadoop at extreme scale, with tens of petabytes of data. Hadoop was created for computing challenges like ours, and eBay runs some of the largest Hadoop clusters in existence.

Read more in the original blog post from Ming Ma (eBay) on the Hortonworks dev team blog.

Introduction to HBase Mean Time to Recovery (MTTR)

The following post is from Nicolas Liochon (Scaled Risk) and Devaraj Das (Hortonworks) with thanks to all members of the HBase team.

HBase is an always-available service and remains available in the face of machine and rack failures. Machines in the cluster run RegionServer daemons. When a RegionServer crashes or its machine goes offline, the regions it was hosting go offline as well. The focus of the MTTR work in HBase is to detect abnormalities and restore access to the offlined regions as early as possible.

In talking with customers and users, it turned out that the MTTR of HBase regions is one of their significant concerns. A lot of improvements have been implemented recently. In this blog post and a couple following it, we will go over the work that the HBase team at Hortonworks, and the community at large, has done in the area of MTTR. We will also talk about some of it at HBaseCon 2013 in June.

This post explains how HBase manages MTTR and introduces some of the settings available in the released versions of HBase and HDFS.

How HBase is resilient to failures while being consistent

HBase ensures consistency by having a single server responsible for a subset of data. Namely, a region is managed by a single region server at a time.

The resiliency to failures comes from HDFS, as data written in HDFS is replicated on several nodes:

  • HBase writes the data in HFiles, stored in HDFS. HDFS replicates the blocks of these files, by default 3 times.
  • HBase uses a commit log (or Write-Ahead Log, WAL); this commit log is also written in HDFS and also replicated, 3 times by default.

 Steps in the failure detection and recovery process

  • Identifying that a node is down: a node can cease to respond either because it is overloaded or because it is dead.
  • Recovering the writes in progress: reading the commit log and recovering the edits that had not yet been flushed.
  • Reassigning the regions: the dead region server was previously handling a set of regions. This set must be reallocated to other region servers, depending on their respective workloads.

What are the pain points? Until the detection and recovery steps have happened, the client is blocked – a single major pain point! Expediting the process, so that clients see less downtime of their data while preserving data consistency is what MTTR is all about.

Detecting node failures

There are multiple ways for a region server to die. It can be a clean stop, i.e. the administrator calls the stop function on the region server. This allows the region server to properly close its regions and tell the HMaster that the shutdown is in progress. In this case the commit log is purged and the HMaster starts the reassignment of the regions immediately.

Another way for a region server to stop is the silent death of the computer, for example if the network card dies or the ethernet cable is unplugged. In this case, the region server cannot raise an alarm. This is handled in HBase with the help of ZooKeeper: each region server is connected to ZooKeeper, and the master watches these connections. ZooKeeper itself manages a heartbeat with a timeout. So, on a timeout, the HMaster declares the region server dead and starts the recovery process.

Recovering in-progress writes

There is a single semantic commit log, made up of multiple files, for all the user regions in a region server. When a region server dies, these commit logs are recovered. The recovery is done in parallel: as a first step, random region servers pick up the commit logs (from the well-known commit log directory) and split them, by edits-per-region, into separate files on HDFS. The regions are then reassigned to random region servers, and each region server reads the edits from its respective log split file(s) to recover the correct region state.

The difficulty arises when it’s not a simple process crash, but a node failure. The region server on the crashed node would have written its blocks locally on the local DataNode (the standard HDFS client behavior). Assuming a replication factor of three, when a box is lost, you are losing not only a region server but also one of the three replicas. Doing the split means reading the blocks, and as 33% of the replicas are dead, for each block you have a 33% chance of being directed to a dead replica. Moreover, the split process writes new files, each of which is replicated 3 times: any of these writes can be assigned to the dead DataNode; the write will then fail after a timeout and go to another DataNode, slowing the recovery.

Assigning the regions

Here, the job is to reassign as fast as possible. Assignment relies on ZooKeeper, and requires synchronisation between the master and the region servers through ZooKeeper.

The MTTR improvements

Detecting node failures

First, it’s possible to lower the default timeout value. By default, HBase is configured with a 3-minute ZooKeeper (ZK) session timeout. This ensures that Garbage Collection (GC) won’t interfere (GC pauses lead to ZK timeouts, which lead to false failure detections). For a production system, it’s more sensible to configure it to one minute, or 30 seconds if you really care about MTTR. A reasonable minimum is around 20 seconds, even though some users have reported using less. So you can set zookeeper.session.timeout to 60000 in hbase-site.xml. You would also need to tweak your GC settings appropriately (incremental, generational GC with good figures for the young and old generations, etc.; this is a topic in itself) so that you do not have GC pauses longer than the ZK timeout.
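
A minimal hbase-site.xml sketch of this setting (60 seconds instead of the 3-minute default):

<property>
 <name>zookeeper.session.timeout</name>
 <value>60000</value>
</property>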

Recovering in-progress writes

In standard cases, there are enough surviving region servers to split all the files of the commit log in parallel. So the issue is really to get directed only to the live HDFS replicas. The solution is to configure HDFS so that it detects failures faster than HBase: if HBase has a timeout of 60 seconds, HDFS should consider a node as dead after 20 seconds.

Here we must detail how HDFS handles dead nodes. HDFS failure detection also relies on a heartbeat and a timeout, managed by the NameNode. In HDFS, when a node is declared dead, the replicas it contained are re-replicated to the surviving DataNodes. This is an expensive process and, when multiple nodes die simultaneously, it can trigger “replication storms”: all the replicas are replicated again, leading to an overloaded system, then to non-responding nodes, then to more nodes being declared dead, then to new blocks being replicated, and so on. For this reason, HDFS waits a long time before starting this recovery process: a little more than 10 minutes. This is an issue for low-latency software such as HBase: going to dead DataNodes means hitting timeouts.

In the latest HDFS versions 1.0.4 and 1.2, and in branches 2 and 3, it is possible to use a special state: ‘stale’. An HDFS node is stale when it has not sent a heartbeat for more than a configurable amount of time. A node in this state is used only as a last resort for reads and is excluded for writes, so activating these settings makes the recovery much faster. In HDFS 1.1, only the read path takes the stale status into account, but versions 1.2, 2.0 and 3.0 use it for both reads and writes.

The way to set it has changed between releases; the settings are, in hdfs-site.xml [via the Apache jiras HDFS-3912 and HDFS-4350]:

<!-- stale mode - 1.2+ -->

<property>
 <name>dfs.namenode.avoid.read.stale.datanode</name>
 <value>true</value>
</property>

<property>
 <name>dfs.namenode.avoid.write.stale.datanode</name>
 <value>true</value>
</property>

<property>
 <name>dfs.namenode.write.stale.datanode.ratio</name>
 <value>1.0f</value>
</property>

<!-- stale mode - branch 1.1.1+ -->

<property>
 <name>dfs.namenode.check.stale.datanode</name>
 <value>true</value>
</property>

Assigning the regions

This is pure HBase internals. In HBase 0.94+, the assignment process has been improved to allow assigning more regions with less internal synchronisation, especially in the master [example: Apache jira HBASE-7247].

Conclusion

There are no global failures in HBase: if a region server fails, all the other regions remain available. For a given data subset, the MTTR was often considered to be around ten minutes. This rule of thumb actually came from a common case where the recovery took time because it was trying to use replicas on a dead DataNode. Ten minutes would be the time taken by HDFS to declare a node dead. With the new stale mode in HDFS, this is no longer the case, and the recovery is now bounded by HBase alone. If you care about MTTR, with the settings mentioned here, most cases will take less than 2 minutes between the actual failure and the data being available again on another region server.