Do you watch the warm-up laps just before every NASCAR and Indy Car race, where everyone is following the pace car and doing slalom turns, making sure the track is clear of debris, warming the tires and checking their gear? Moving data from a single data file setup to a multiple file setup is kinda like that; you’re moving data out of pit row where everything is in single file and spreading out so everyone can get to top speed as soon as the green flag flies.
In my project, I had the dubious distinction of using a RANGE LEFT partition function and failing to split the function and scheme in a timely fashion, which left my most recent year of data bottled up in a filegroup meant for a single quarter. Code-wise, the easiest way to correct the problem would be to just bite the bullet and do a partition split.
Dan Guzman, who blogs at www.dbdelta.com, has a handy script to plot out row counts by object and partition scheme…
SELECT
     OBJECT_NAME(p.object_id) AS ObjectName
    ,i.name AS IndexName
    ,p.index_id AS IndexID
    ,ds.name AS PartitionScheme
    ,p.partition_number AS PartitionNumber
    ,fg.name AS FileGroupName
    ,prv_left.value AS LowerBoundaryValue
    ,prv_right.value AS UpperBoundaryValue
    ,CASE pf.boundary_value_on_right WHEN 1 THEN 'RIGHT' ELSE 'LEFT' END AS Range
    ,p.rows AS Rows
FROM sys.partitions AS p
INNER JOIN sys.indexes AS i
    ON i.object_id = p.object_id
    AND i.index_id = p.index_id
INNER JOIN sys.data_spaces AS ds
    ON ds.data_space_id = i.data_space_id
INNER JOIN sys.partition_schemes AS ps
    ON ps.data_space_id = ds.data_space_id
INNER JOIN sys.partition_functions AS pf
    ON pf.function_id = ps.function_id
INNER JOIN sys.destination_data_spaces AS dds2
    ON dds2.partition_scheme_id = ps.data_space_id
    AND dds2.destination_id = p.partition_number
INNER JOIN sys.filegroups AS fg
    ON fg.data_space_id = dds2.data_space_id
LEFT JOIN sys.partition_range_values AS prv_left
    ON ps.function_id = prv_left.function_id
    AND prv_left.boundary_id = p.partition_number - 1
LEFT JOIN sys.partition_range_values AS prv_right
    ON ps.function_id = prv_right.function_id
    AND prv_right.boundary_id = p.partition_number
WHERE OBJECTPROPERTY(p.object_id, 'ISMSShipped') = 0
UNION ALL
--non-partitioned table/indexes
SELECT
     OBJECT_NAME(p.object_id) AS ObjectName
    ,i.name AS IndexName
    ,p.index_id AS IndexID
    ,NULL AS PartitionScheme
    ,p.partition_number AS PartitionNumber
    ,fg.name AS FileGroupName
    ,NULL AS LowerBoundaryValue
    ,NULL AS UpperBoundaryValue
    ,NULL AS Boundary
    ,p.rows AS Rows
FROM sys.partitions AS p
INNER JOIN sys.indexes AS i
    ON i.object_id = p.object_id
    AND i.index_id = p.index_id
INNER JOIN sys.data_spaces AS ds
    ON ds.data_space_id = i.data_space_id
INNER JOIN sys.filegroups AS fg
    ON fg.data_space_id = i.data_space_id
WHERE OBJECTPROPERTY(p.object_id, 'ISMSShipped') = 0
ORDER BY
     ObjectName
    ,IndexID
    ,PartitionNumber;
Picking up from my last post, here is the current data distribution…
So, let’s add a new filegroup and redistribute the data…
USE [master]
GO
ALTER DATABASE [TPC_H] ADD FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_01', FILENAME = N'D:\SQL\TPC_H_DATA_1996_01.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_02', FILENAME = N'D:\SQL\TPC_H_DATA_1996_02.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_03', FILENAME = N'D:\SQL\TPC_H_DATA_1996_03.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_04', FILENAME = N'D:\SQL\TPC_H_DATA_1996_04.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_05', FILENAME = N'D:\SQL\TPC_H_DATA_1996_05.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_06', FILENAME = N'D:\SQL\TPC_H_DATA_1996_06.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_07', FILENAME = N'D:\SQL\TPC_H_DATA_1996_07.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO
ALTER DATABASE [TPC_H] ADD FILE ( NAME = N'TPC_H_DATA_1996_08', FILENAME = N'D:\SQL\TPC_H_DATA_1996_08.ndf' , SIZE = 4096KB , FILEGROWTH = 1024KB ) TO FILEGROUP [DATA_1996]
GO

USE [TPC_H]
GO
ALTER PARTITION SCHEME PS_YEAR_LEFT NEXT USED [DATA_1995]
GO
ALTER PARTITION FUNCTION [PF_YEAR_LEFT]() SPLIT RANGE (N'1996-01-01')
GO
ALTER PARTITION SCHEME PS_YEAR_LEFT NEXT USED [DATA_1996]
GO
ALTER PARTITION FUNCTION [PF_YEAR_LEFT]() SPLIT RANGE (N'1997-01-01')
GO
It looks pretty straightforward: add a new filegroup, mark it as NEXT USED in the partition scheme, then split the function at a new boundary. But how does our data look now?
Well, we set the boundary between the 1995 and 1996 data and moved the 1996 data into its own filegroup, but we still have 1997 data sitting in the 1995 filegroup until we add its filegroup and do another split, incurring another data shuffle. Furthermore, the 1995 filegroup will forever be the spillover filegroup for rows beyond the highest boundary; it's unintuitive, and it will be a hassle when I try to age out this filegroup with a sliding window. I'll be stuck with that filegroup for the life of the partition scheme.
Remember, this is a small example with a single table. Imagine the runtime when doing this with 100 tables and 500 GB of data, with everything locked at one point or another. Can you do this within the maintenance window you're allowed?
A more controlled approach may be in order: build a new partition function and scheme, build staging tables for each partition, bulk load the data into each staging table based on the indicated boundaries, then do a partition switch to stitch all the slices together. Here's one possible checklist, followed by a short sketch of a few of the post-load steps…
1. Make a Backup
2. Set to Simple Recovery Model
3. Create New Partition Function
4. Create New Filegroups with Proper Data Files
5. Create New Partition Scheme with New Filegroups
6. For Each Table
   1. Create New Final Table on New Partition Scheme
   2. Create Staging Tables in Each Partition Scheme Filegroup
   3. Add Check Constraints on Each Staging Table Based on Partition Scheme Boundaries
   4. Bulk Copy Data into Each Staging Table with Data Sorted on the Clustered Key
   5. Switch Staging Tables into the New Final Table
   6. Confirm Row Counts Between the Old and New Final Tables
   7. Drop Staging Tables
   8. Rebuild Nonclustered Indexes on the New Final Table
   9. Rename the New Table as the Old Table
   10. Run DBCC CHECKIDENT to Make Sure All Identities Are Set for Future Inserts
7. Lather. Rinse. Repeat.
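Most of the heavy lifting lives in step 6, but a few of the later sub-steps, like the row count check, the rename, and the identity reseed, are simple enough to sketch up front. Here's a minimal T-SQL sketch using the dbo.Orders and dbo.OrdersRight names that appear later in this post; the rename convention is just an assumption, so adapt it to your own naming.

-- Confirm row counts between the old and new final tables
SELECT (SELECT COUNT_BIG(*) FROM dbo.Orders)      AS OldRows,
       (SELECT COUNT_BIG(*) FROM dbo.OrdersRight) AS NewRows;
GO

-- Swap the names once the counts match (Orders_Old is an assumed name for the retired table)
EXEC sp_rename 'dbo.Orders', 'Orders_Old';
EXEC sp_rename 'dbo.OrdersRight', 'Orders';
GO

-- Make sure the current identity value is at least the maximum value already loaded into the table
DBCC CHECKIDENT ('dbo.Orders', RESEED);
GO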
Step 6, as you can tell, will be a very complex scripting exercise. Of particular note is the bulk copy sub-step: I said bulk loading, and not a straight INSERT INTO…SELECT or SELECT…INTO statement, because I want to minimize logging as much as possible. You can bulk load with bcp or the BULK INSERT statement, but they aren't meant for table-to-table transfers. You can use the Import/Export Wizard, but that handles one table at a time and, if you look at the SSIS package it creates, it's single-threaded with one data flow task.
If you're working with Enterprise edition, you may be familiar with the database engine's read-ahead feature. While SQL Server is running a query and fetching data in 64 KB chunks (the size of an extent), it may anticipate that you'll need additional data and start reading ahead in 512 KB chunks. A large fact table, hundreds of gigabytes in size, is a good candidate for persuading SQL Server to do read-ahead operations. How can we do that using SSIS?
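If you want to see whether a given scan is already driving read-ahead, STATISTICS IO reports it; here's a quick check against the dbo.Orders table from this example (nothing assumed beyond that table name):

SET STATISTICS IO ON;
GO
-- The Messages tab reports logical reads, physical reads, and read-ahead reads for the scan
SELECT COUNT_BIG(*) FROM dbo.Orders;
GO
SET STATISTICS IO OFF;
GO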
One possible implementation is a Data Flow task with a set of OLE DB sources running nearly identical SELECT statements, the only difference being a predicate that does not overlap any other predicate in the set; essentially, each source is bound to the upper and lower boundaries of a given partition. In our example, with data from 1992 through 1997, that's six years of data and six OLE DB sources with the same SELECT clause but different WHERE clauses. Each OLE DB destination would then have table lock and fast load enabled to maximize throughput. During execution, you'll have multiple concurrent OLE DB sources feeding individual target tables, each with its specific year of data.
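For illustration, the first two of those source queries might look like the following. This is only a sketch; it assumes the O_ORDERDATE column referenced by the loader code later in this post, so substitute your own date column as needed.

-- OLE DB source #1: the 1992 slice only
SELECT * FROM dbo.Orders
WHERE O_ORDERDATE >= '1992-01-01' AND O_ORDERDATE < '1993-01-01';

-- OLE DB source #2: the 1993 slice only; the predicate does not overlap source #1
SELECT * FROM dbo.Orders
WHERE O_ORDERDATE >= '1993-01-01' AND O_ORDERDATE < '1994-01-01';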
Once all the data flows have completed, you'd do a partition switch with the ALTER TABLE statement to stitch all the individual target tables together. Before stitching them, you have to make sure the proper CHECK constraints are in place so that the column used in the partition function, and referenced in the partition scheme, has no values overlapping any other target table. Here's an example of a staging table, the new partition function and partition scheme, and the new target table, along with the switch operation…
CREATE TABLE [dbo].[Orders_1992](
    [OrderID] [int] IDENTITY(1,1) NOT NULL,
    [OrderDate] [date] NOT NULL,
    CONSTRAINT [PK_Orders_1992] PRIMARY KEY CLUSTERED
    (
        [OrderID] ASC,
        [OrderDate] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [DATA_1992]
)
GO

ALTER TABLE [dbo].[Orders_1992]
    ADD CONSTRAINT PK_CK_OrderDate_1992 CHECK (OrderDate >= '1992-01-01' AND OrderDate < '1993-01-01')
GO

CREATE PARTITION FUNCTION [PF_YEAR_RIGHT](date)
    AS RANGE RIGHT FOR VALUES (N'1992-01-01', N'1993-01-01', N'1994-01-01', N'1995-01-01', N'1996-01-01', N'1997-01-01', N'1998-01-01')
GO

CREATE PARTITION SCHEME [PS_YEAR_RIGHT]
    AS PARTITION [PF_YEAR_RIGHT] TO ([PRIMARY], [DATA_1992], [DATA_1993], [DATA_1994], [DATA_1995], [DATA_1996], [DATA_1997], [DATA_1998])
GO

CREATE TABLE [dbo].[OrdersRight](
    [OrderID] [int] IDENTITY(1,1) NOT NULL,
    [OrderDate] [date] NOT NULL,
    CONSTRAINT [PK_OrdersRight] PRIMARY KEY CLUSTERED
    (
        [OrderID] ASC,
        [OrderDate] ASC
    ) WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON PS_YEAR_RIGHT(OrderDate)
)
GO

ALTER TABLE dbo.Orders_1992 SWITCH TO dbo.OrdersRight PARTITION 2
GO
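As a sanity check before the switch, you can ask the partition function directly which partition a given date maps to; a quick sketch against the function created above:

-- Should return 2: with RANGE RIGHT and a first boundary of 1992-01-01, the 1992 slice is partition 2
SELECT $PARTITION.PF_YEAR_RIGHT(CAST('1992-06-15' AS date)) AS PartitionNumber;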
This gets pretty complicated to construct for a single table, so imagine what it's going to be like when you have to do it for a hundred tables. This is a good candidate for BIML, which uses SMO and XML markup to dynamically create SSIS packages. With a BIML approach, you'd loop through a collection of source tables and create an individual package for each one to port its data to a new target table on the correct partition scheme. Ultimately, it would be a series of Execute SQL tasks to perform DDL like the statements above, plus the complex Data Flow task described above to load the data in parallel. If it weren't for the Data Flow, you could do the work as a console app in your .NET language of choice or in PowerShell.
Well, you actually can mimic the parallel data flow in a console app.
Back in the day, I was a VB developer, starting with VB6 and then moving on to the early .NET versions. While not nearly as popular as C#, Java, or PowerShell nowadays, VB still gets the job done. You just need to do a little homework to find the syntax for the same job that's already published in those languages.
To simulate a Data Flow performing a bulk insert operation, you're going to use the SqlBulkCopy class. To implement it, it's just a matter of instantiating a SqlDataReader, indicating the number of rows per batch, specifying which target table to insert into, and calling the WriteToServer method.
However, to do a parallel load of multiple staging tables, you need a little more code.
By default, a SqlBulkCopy object runs synchronously, meaning it completes its one bulk insert operation before code execution continues. However, you can run multiple bulk copy operations asynchronously by using .NET's multi-threading functionality: encapsulate each SqlBulkCopy call in a Task, build an array of those Tasks, and wait on all of them to finish before continuing with further code execution. Here's a sample stub looping through four years of data.
Imports System.Data
Imports System.Data.SqlClient
Imports System.Threading.Tasks

Module Main

    Sub Main()
        Dim TaskArray(3) As Task
        Dim CurrTask As Integer = 0

        For CurrTask = LBound(TaskArray) To UBound(TaskArray)
            'make sure the rows returned will fulfill the check constraint
            Dim CurrSourceTSQL As String = "SELECT * FROM dbo.Orders WHERE YEAR(O_ORDERDATE)=" & (1992 + CurrTask).ToString
            Dim CurrSourceTable As String = "dbo.Orders"
            'each target table is a staging table for the proper filegroup in the partition scheme
            Dim CurrTargetTable As String = "dbo.Orders_" & (1992 + CurrTask).ToString
            'adds an asynchronous bulk copy call to an array of Task objects
            TaskArray(CurrTask) = Async_BulkCopy(CurrSourceTable, CurrSourceTSQL, CurrTargetTable)
        Next

        Try
            'block here until all of the tasks succeed or fail
            Task.WaitAll(TaskArray)
        Catch ex As AggregateException
            'the sum of all the exceptions thrown by the tasks
            Console.WriteLine(CompleteExceptionMessage(ex))
        Finally
            TaskArray = Nothing
        End Try
    End Sub

    Private Async Function Async_BulkCopy(ByVal CurrSourceTable As String, ByVal CurrSourceTSQL As String, ByVal CurrTargettable As String) As Task
        'Note the ApplicationName is specific to the SourceTable so you can identify it readily with Extended Events or Profiler
        Dim SourceConn As SqlConnection = New SqlConnection("Data Source=" & My.Settings.SourceServer & ";Integrated Security=true;Application Name=SourceTable_" & CurrSourceTable & ";Initial Catalog=" & My.Settings.SourceDB)
        SourceConn.Open()

        Dim SourceCmd As SqlCommand = New SqlCommand(CurrSourceTSQL, SourceConn)
        SourceCmd.CommandType = CommandType.Text
        SourceCmd.CommandTimeout = 0

        'SequentialAccess is important to stream the data from the source. Without it, all data has to be buffered by the SqlDataReader before the insert begins
        Dim reader As SqlDataReader = SourceCmd.ExecuteReader(CommandBehavior.SequentialAccess)

        'Again, using a unique ApplicationName property to rapidly eyeball the process with XE or Profiler
        Dim TargetConnString As String
        TargetConnString = "Data Source=" & My.Settings.TargetServer & ";Integrated Security=true;Application Name=TargetTable_" & CurrTargettable & ";Initial Catalog=" & My.Settings.TargetDB

        'Don't forget to check if the table uses an IDENTITY value. The last thing you want to do is reset all the IDENTITY values.
        Using bulkcopy As SqlBulkCopy = New SqlBulkCopy(TargetConnString, SqlBulkCopyOptions.KeepIdentity)
            bulkcopy.DestinationTableName = CurrTargettable
            Try
                bulkcopy.BatchSize = My.Settings.SQLBulkCopyBatchSize 'Test out different settings to optimize throughput
                bulkcopy.BulkCopyTimeout = 0
                'control flow returns to the caller, which waits for this function call to complete
                Await bulkcopy.WriteToServerAsync(reader)
            Catch ex As Exception
                Console.WriteLine(CompleteExceptionMessage(ex))
            Finally
                reader.Close()
                SourceConn.Close()
            End Try
        End Using
    End Function

    Private Function CompleteExceptionMessage(ex2 As Exception) As String
        Dim _Message As String = ""
        Try
            _Message = ex2.Message
            If Not (ex2.InnerException Is Nothing) Then
                'appends all nested InnerException messages for a complete error message
                _Message += CompleteExceptionMessage(ex2.InnerException)
            End If
        Catch ex As Exception
            Console.WriteLine("{0}: {1}", System.Reflection.MethodInfo.GetCurrentMethod().Name, "Error During CompleteExceptionMessage: " & ex.Message)
        Finally
            CompleteExceptionMessage = _Message
        End Try
    End Function

End Module
In brief, you're defining a function that returns a Task executing an asynchronous bulk copy operation, you're creating an array of those Tasks so they run concurrently, and you're reading the source data sequentially so it streams from the source table through the SqlDataReader straight into the SqlBulkCopy object; otherwise, you'd run the risk of running out of memory while buffering the source table in the SqlDataReader.
It'll take some trial and error, but you'll want to find the balance between the memory you can spare for SQL Server and the number of concurrent threads pumping data from the old table into the staging tables for the new partitioned table. Watch out for memory grant requirements, especially if you have Resource Governor enabled on your instance. If you see RESOURCE_SEMAPHORE waits, one or more of your threads is waiting for a memory grant in order to fulfill its query. For my 768 GB instance, I set a maximum memory grant of 5% and was able to run eight threads concurrently to bulk load my partition slices.
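If you suspect grant contention, sys.dm_exec_query_memory_grants shows which sessions are still waiting; here's a quick check (it requires VIEW SERVER STATE, and nothing else here is assumed):

-- Sessions with a NULL granted_memory_kb are still waiting on their grant (RESOURCE_SEMAPHORE)
SELECT session_id,
       requested_memory_kb,
       granted_memory_kb,
       wait_time_ms
FROM sys.dm_exec_query_memory_grants
ORDER BY wait_time_ms DESC;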
After all of your data has been written to the slices, you can issue the partition SWITCH commands to stitch them together into a new single table. Before you can do that switch, every table slice needs a check constraint, with CHECK enabled, on the column being supplied to the partition scheme. This is very important: the database engine will not accept a partition slice whose data hasn't been verified to fit within the upper and lower boundaries of the target partition. If you didn't add the constraint when creating the staging table in the first place, you'll need to add it before issuing the ALTER TABLE command.
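If you do need to add the constraint after the fact, it would look something like this; a sketch for the 1993 slice, assuming the naming convention from the example above (WITH CHECK forces the existing rows to be validated so the engine trusts the constraint):

-- Validate existing rows so the constraint is trusted before the switch
ALTER TABLE dbo.Orders_1993 WITH CHECK
    ADD CONSTRAINT CK_Orders_1993_OrderDate
    CHECK (OrderDate >= '1993-01-01' AND OrderDate < '1994-01-01');
GO

-- With RANGE RIGHT boundaries starting at 1992-01-01, the 1993 slice lands in partition 3
ALTER TABLE dbo.Orders_1993 SWITCH TO dbo.OrdersRight PARTITION 3;
GO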
Couple the bulk copy code with your favorite SMO implementation to duplicate table and index objects and to create the new partition functions and schemes, and you'll end up with a very valuable piece of code for restriping badly partitioned data into new tables without doing the manual work in SSIS or figuring out the mechanics of BIML.
With our new hardware racked ‘n’ stacked, disk subsystem stress-tested, consumption rates calculated for our server and our workload, and data restriped for optimal sequential access, all that’s left is to open the data warehouse to the users for validation and business use.