
Datamart synchronization

Published Jan 20, 2019

When your live database grows large enough to hold millions of records across many tables, it becomes quite hard to create reliable reports on the fly without hurting performance. Reports usually require many aggregations, and these cannot be computed directly on the fly in a reasonable time.

A different approach is needed: the creation and use of a datamart. A datamart is a database, or a subset of a database, that contains pre-computed/aggregated data ready to be presented in a report without any further processing.

So we need to split the on-the-fly report creation into several steps:

  1. Create the datamart
  2. Synchronize data
  3. Read data from the datamart and display it inside the report.

In this post I will describe an approach for accomplishing the first two steps: creating the datamart and synchronizing the data. I will present the implementation using C# (.NET) and MS SQL Server (T-SQL).

What is our goal?

Of course, the main goal is to display a report with all the needed data in a timely manner, but in order to do that we need a way to implement the first two points described above.

We also need to be able to easily create unit/integration tests for the datamart loaders we build.

We need to accomplish the following steps:

Datamart creation

We need to create the proper tables that will contain the pre-computed/aggregated data, and we need to ensure data integrity. The datamart must be creatable not only in the live database but in any available database, hosted on any server, even in Azure if needed.

Synchronization of the data

We need to be able to synchronize a large volume of data safely and within a reasonable time frame. We also need a mechanism for synchronization based on a scheduler (data needs to be synchronized at certain times: once per day, twice per day, and so on).

Datamarts can live in any database server (not just the live database) and may even be a separate database hosted elsewhere, so we need a mechanism for moving a large volume of data between two distinct databases, possibly located on separate database servers, or even between an on-premises database and an Azure database.

We need a synchronization container for scheduling and a synchronization executor for executing the synchronization.

Implementation

This part can be implemented in several ways, depending on the current needs, but I think the simplest container is a console application with a daemon thread and a configuration value indicating how many milliseconds the thread should sleep between synchronizations.
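A minimal sketch of such a container is shown below; the RunAllLoaders method and the hard-coded interval are illustrative placeholders for real configuration and for the executor assembly described next, not code from the original article:

using System;
using System.Threading;

public static class SyncContainerProgram
{
    public static void Main()
    {
        // The sleep interval would normally come from configuration;
        // it is hard-coded here for brevity (e.g. run once per hour).
        int sleepMilliseconds = 60 * 60 * 1000;

        var worker = new Thread(() =>
        {
            while (true)
            {
                try
                {
                    // Run every configured datamart loader from the executor assembly.
                    RunAllLoaders();
                }
                catch (Exception exc)
                {
                    Console.WriteLine(exc);
                }

                Thread.Sleep(sleepMilliseconds);
            }
        })
        {
            IsBackground = true // daemon thread
        };

        worker.Start();
        Console.WriteLine("Synchronization container started. Press Enter to stop.");
        Console.ReadLine();
    }

    private static void RunAllLoaders()
    {
        // Placeholder: in the real application this would instantiate the loaders
        // from the executor class library and call ExecuteLoader() on each of them.
    }
}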

Synchronization executor

Since we are using C#, the executor is a new assembly (a class library) that can be used anywhere, but especially inside the synchronization container.

Terminology

There are specific naming conventions, which are presented below:

  1. The live database is called the Source database
  2. The datamart itself, as a whole, is called the Destination
  3. The temporary tables used to move data are called Staging tables
  4. The datamart's tables are called Target tables
  5. Information about the Destination, such as the server's endpoint, is called Metadata

Algorithm

In order to create and synchronize a datamart, several steps are required:

  1. Create or update the structure of the Destination
  2. Execute a pre-synchronization operation, like removing the existing data
  3. Read data from the Source
  4. For every Target, execute the following operations:
     - Execute aggregations or calculations
     - Move, in bulk, the transformed data to a Staging table
     - Move the data from the Staging table to the Target table

For every datamart loader there is a folder with at least five .sql files (all scripts are written so that they can safely be rerun):

  1. Script file for the creation of all Target tables.
  2. Script file for the creation of the Staging tables; each Target table should have a corresponding Staging table. The Staging tables have the same structure as the Target tables and act as temporary tables used for moving data between Source and Target.
  3. Script file with scripts for clearing the no-longer-needed data from the Target tables; this script is run before the synchronization starts.
  4. Script file with scripts for getting data from the Source.
  5. Script file with scripts used for moving data from one Staging table to its corresponding Target table. Depending on the number of these Staging/Target pairs, there can be several script files of this type. These files use a special T-SQL construct called the MERGE statement, which moves data in bulk from one table to another and performs well. An example of its usage is the following:

MERGE dbo.Client AS Target
USING (SELECT [ID], [ClientID] FROM dbo.StagingClient) AS Source
ON (Target.[ID] = Source.[ID])
WHEN MATCHED THEN
    UPDATE SET [ClientID] = Source.[ClientID]
WHEN NOT MATCHED BY TARGET THEN
    INSERT ([ID], [ClientID]) VALUES (Source.[ID], Source.[ClientID]);

In plain terms, this statement says: merge into the table dbo.Client (the Target) the rows produced by the query (the Source); for every Source row, check whether its ID matches an ID in the Target. If it does, update the matching Target row; otherwise insert the Source row into the Target table.

The reason for using a Staging table is that the bulk insert operation can then be used to move the data into the Destination database; at that point we do not need to care about updates, because we know we only need to do inserts.

The update problem is solved by the MERGE statement, which is considerably faster than a row-by-row approach.
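For reference, here is a minimal sketch of how such a BulkInsert helper could be implemented with ADO.NET's SqlBulkCopy; the class name, connection string parameter and timeout handling are illustrative assumptions, not the article's actual IDestinationDbManager implementation:

using System.Data;
using Microsoft.Data.SqlClient; // or System.Data.SqlClient on older projects

public static class StagingBulkWriter
{
    // Copies an in-memory DataTable into the staging table whose name matches
    // DataTable.TableName (e.g. "dbo.StagingClient").
    public static void BulkInsert(string connectionString, DataTable stagingData, int timeoutSeconds)
    {
        using (var connection = new SqlConnection(connectionString))
        {
            connection.Open();

            using (var bulkCopy = new SqlBulkCopy(connection))
            {
                bulkCopy.DestinationTableName = stagingData.TableName;
                bulkCopy.BulkCopyTimeout = timeoutSeconds;

                // Map columns by name so column order does not matter.
                foreach (DataColumn column in stagingData.Columns)
                {
                    bulkCopy.ColumnMappings.Add(column.ColumnName, column.ColumnName);
                }

                bulkCopy.WriteToServer(stagingData);
            }
        }
    }
}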

The datamart loader class diagram is below:

The following code snippet describes the data synchronization implemented in the DatamartLoaderBase class:

public virtual void ExecuteLoader()
{
    // Run the optional pre-synchronization step, then the synchronization itself.
    ExecuteOperationBeforeSynch();
    ExecuteSync();
}

private void ExecuteOperationBeforeSynch()
{
    string finalQuery = GetQueryToBeExecutedBeforeSync();
    if (string.IsNullOrEmpty(finalQuery))
    {
        return;
    }
    try
    {
        SetupDestinationSqlConnection();
        _datamartDestinationDbManager.ExecuteNonQuery(finalQuery, 0);
    }
    catch (Exception exc)
    {
        Logger.LogException(exc);
    }
    finally
    {
        CleanupDestinationSqlConnection();
    }
}

private void ExecuteSync()
{
    DataSet dataSource = GetDataFromSource();
    MoveDataSourceToDestinationDb(dataSource);
}

// Reads the raw data from the Source database; implemented by every concrete loader.
protected abstract DataSet GetDataFromSource();

private void MoveDataSourceToDestinationDb(DataSet dataSource)
{
    List<DataTable> transformedDataSource = null;
    try
    {
        transformedDataSource = Transform(dataSource);
        SetupDestinationSqlConnection();
        foreach (var dataTable in transformedDataSource)
        {
            // For every Target: create its Staging table, bulk-insert the
            // transformed rows into it, then move them into the Target table.
            CreateStagingTable(dataTable);
            BulkSaveDataTableInStaging(dataTable);
            MoveDataFromStagingToActualTables(dataTable.TableName);
        }
    }
    catch (Exception exc)
    {
        Console.WriteLine(exc.Message);
        Logger.LogException(exc);
    }
    finally
    {
        if (transformedDataSource != null)
        {
            RemoveStagingTables(transformedDataSource.Select(p => p.TableName).ToList());
        }
        CleanupDestinationSqlConnection();
    }
}

// Applies the aggregations/calculations and returns one DataTable per Target.
protected abstract List<DataTable> Transform(DataSet inputDatasource);

private void CreateStagingTable(DataTable transformedData)
{
    string sqlQuery = GetCreateStagingTableSqlScript(transformedData);
    _datamartDestinationDbManager.ExecuteNonQuery(string.Format(sqlQuery, transformedData.TableName));
}

protected abstract string GetCreateStagingTableSqlScript(DataTable transformedData);

private void BulkSaveDataTableInStaging(DataTable transformedData)
{
    _datamartDestinationDbManager.BulkInsert(transformedData.TableName, transformedData, BulkSaveDataTableInStagingTimeout);
}

private void MoveDataFromStagingToActualTables(string stagingTableName)
{
    string finalQuery = GetStagingMoveTargetTableSqlScript(stagingTableName);
    _datamartDestinationDbManager.ExecuteNonQuery(finalQuery, MoveDataFromStagingToActualTablesTimeout);
}

private void RemoveStagingTables(List<string> stagingTableNames)
{
    string sqlText = GetRemoveStagingTablesSqlScript(stagingTableNames);
    _datamartDestinationDbManager.ExecuteNonQuery(sqlText);
}

private string GetRemoveStagingTablesSqlScript(List<string> stagingTableNames)
{
    StringBuilder builder = new StringBuilder();
    stagingTableNames.ForEach(p =>
    {
        builder.Append(string.Format(" IF (object_id('{0}') IS NOT NULL) ", p));
        builder.Append(" BEGIN ");
        builder.Append(string.Format("DROP TABLE {0}", p));
        builder.Append(" END ");
    });

    return builder.ToString();
}
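To make the abstract members more concrete, here is a minimal sketch of a loader inheriting from DatamartLoaderBase. All table names, columns and queries are illustrative; it also assumes that GetQueryToBeExecutedBeforeSync and GetStagingMoveTargetTableSqlScript are overridable members of the full base class and that ISourceDbManager exposes a helper for running a query into a DataSet:

using System.Collections.Generic;
using System.Data;

// Hypothetical loader for a Client target table; names are illustrative only.
public class ClientDatamartLoader : DatamartLoaderBase
{
    private readonly ISourceDbManager _sourceDbManager;

    public ClientDatamartLoader(IDestinationDbManager destinationDbManager,
                                ISourceDbManager sourceDbManager)
        : base(destinationDbManager)
    {
        _sourceDbManager = sourceDbManager;
    }

    // Optional cleanup executed before the synchronization starts.
    protected override string GetQueryToBeExecutedBeforeSync()
    {
        return "DELETE FROM dbo.Client;";
    }

    // Reads the raw rows from the Source database.
    protected override DataSet GetDataFromSource()
    {
        // ExecuteDataSet is an assumed helper on ISourceDbManager.
        return _sourceDbManager.ExecuteDataSet("SELECT ID, ClientID FROM dbo.Client");
    }

    // Applies aggregations/calculations and names each DataTable after its staging table.
    protected override List<DataTable> Transform(DataSet inputDatasource)
    {
        DataTable clients = inputDatasource.Tables[0].Copy();
        clients.TableName = "dbo.StagingClient";
        return new List<DataTable> { clients };
    }

    // Scripts would normally be read from the loader's .sql folder; inlined for brevity.
    protected override string GetCreateStagingTableSqlScript(DataTable transformedData)
    {
        return @"IF object_id('dbo.StagingClient') IS NULL
                     CREATE TABLE dbo.StagingClient (ID INT PRIMARY KEY, ClientID INT);";
    }

    protected override string GetStagingMoveTargetTableSqlScript(string stagingTableName)
    {
        // Would return the MERGE statement shown earlier (dbo.StagingClient into dbo.Client).
        return "/* MERGE dbo.Client ... */";
    }
}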

All abstract methods are implemented in classes which inherit from DatamartLoaderBase. Some loaders can inherit directly from DatamartLoaderBase (when no dependencies are needed and/or there is just one target table), but there are cases where the datamart contains related tables with foreign keys between them. In that case we need to synchronize the dependencies first and only then the aggregated target tables. For this use case I created another base class called CompositeLoaderBase, which has the following implementation:

public abstract class CompositeLoaderBase : DatamartLoaderBase, ICompositeLoader
{
    protected CompositeLoaderBase(IDestinationDbManager datasourceManager) : base(datasourceManager)
    {
    }

    private Queue<ICompositeLoader> _children;

    // The loaders this loader depends on; they are processed before the current loader.
    public Queue<ICompositeLoader> Children
    {
        get
        {
            if (_children == null)
            {
                _children = new Queue<ICompositeLoader>();
            }
            return _children;
        }
    }

    public override void CreateDatamartEntities()
    {
        // Create the children's tables first so that foreign keys can be satisfied.
        ExecuteChildrenCreateDatamartEntities();
        base.CreateDatamartEntities();
    }

    public override void ExecuteLoader()
    {
        // Synchronize the dependencies first, then the current loader.
        ExecuteChildrenLoaders();
        base.ExecuteLoader();
    }

    private void ExecuteChildrenCreateDatamartEntities()
    {
        foreach (ICompositeLoader childLoader in Children)
        {
            childLoader.CreateDatamartEntities();
        }
    }

    private void ExecuteChildrenLoaders()
    {
        foreach (ICompositeLoader childLoader in Children)
        {
            childLoader.ExecuteLoader();
        }
    }
}

The Children property holds the dependencies; they are synchronized first, and only then does the synchronization of the current loader take place.
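As a usage sketch (the loader names and the factory methods for the database managers are illustrative assumptions), a composite loader could be wired and executed like this:

// Hypothetical wiring; OfficeDatamartLoader and ClientReportLoader are illustrative names.
IDestinationDbManager destination = CreateDestinationDbManager(); // placeholder factory method
ISourceDbManager source = CreateSourceDbManager();                // placeholder factory method

var reportLoader = new ClientReportLoader(destination, source);   // inherits CompositeLoaderBase

// Enqueue the loaders the report table depends on; they run before reportLoader itself.
reportLoader.Children.Enqueue(new OfficeDatamartLoader(destination, source));
reportLoader.Children.Enqueue(new ClientDatamartLoader(destination, source));

// Create or update the datamart tables, then synchronize the data.
reportLoader.CreateDatamartEntities();
reportLoader.ExecuteLoader();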

The advantages of this implementation are the following:

  1. New datamart loaders can be added easily while respecting the Open-Closed principle (a new class can be added without affecting the existing ones).
  2. Since the dependencies (IDestinationDbManager and ISourceDbManager) are injected through the constructor, they can be mocked and unit tests can be created; integration tests can also be created when the dependencies are not mocked but point to real databases (see the sketch after this list).
  3. The template method pattern used in DatamartLoaderBase.MoveDataSourceToDestinationDb(...) is reused in every newly created loader and is not subject to frequent change.
  4. The Composite pattern used through ICompositeLoader makes it possible to synchronize the dependencies first.
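As an illustration of the second point, here is a minimal unit-test sketch. It assumes the IDestinationDbManager members invoked in the snippets above (ExecuteNonQuery and BulkInsert), a similar hand-rolled fake for ISourceDbManager, and NUnit as the test framework; none of this test code comes from the original article:

using System.Collections.Generic;
using System.Data;
using NUnit.Framework;

// Hand-rolled fake that records what a loader sends to the Destination database.
// The interface members are assumed from the calls made in DatamartLoaderBase.
public class FakeDestinationDbManager : IDestinationDbManager
{
    public List<string> ExecutedQueries { get; } = new List<string>();
    public List<string> BulkInsertedTables { get; } = new List<string>();

    public void ExecuteNonQuery(string sql, int timeoutSeconds = 0) => ExecutedQueries.Add(sql);

    public void BulkInsert(string tableName, DataTable data, int timeoutSeconds) => BulkInsertedTables.Add(tableName);
}

[TestFixture]
public class ClientDatamartLoaderTests
{
    [Test]
    public void ExecuteLoader_BulkInsertsTransformedDataIntoStagingTable()
    {
        var destination = new FakeDestinationDbManager();
        var source = new FakeSourceDbManager(); // analogous fake returning a small in-memory DataSet

        var loader = new ClientDatamartLoader(destination, source);
        loader.ExecuteLoader();

        // The transformed table should have been bulk-inserted into its staging table.
        Assert.That(destination.BulkInsertedTables, Does.Contain("dbo.StagingClient"));
    }
}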

Conclusions

Using this approach, the performance of the synchronization improved considerably: from a couple of hours (with the row-by-row approach) down to a few minutes (at most about thirty).
