Pages

SQL Change Data Capture

Dec 18, 2014

IC156272

重要資料表要做差異 Log 檔,可以不用手刻,有方便的作法。

 

過去手刻作法

過去筆者針對系統中主要的資料表要做交易歷程功能,作法是在資料庫中新增結構一樣的資料表,並賦予另一個自動序號欄位或時間欄位做為 Primary Key,接著撰寫一個 Trigger 當原本資料表新增、修改、刪除時另外抄寫到 Log 資料表。

 

image

圖上左側是原本的資料表,右側是做為交易歷程的 Log 資料表。

 

其實可以更容易的

近期在做兩地資料同步時,發現早在 SQL Server 2008 時就有提供兩種解決方案:Change Data Capture 與 Change Tracking,兩者比較可參考 Comparing Change Data Capture and Change Tracking

 

筆者採用 Change Data Capture (以下簡稱 CDC) 取代上述原本資料歷程手動的做法,CDC 可以記錄執行 DML 時資料當時的原貌, 並可以區分資料被異動狀態 (新增、修改前、修改後、刪除),當然也可以查到資料被刪除前最後儲存的值。

以 Pub 資料庫中的 jobs 資料表為例,在資料庫與指定某一資料表啟用 CDC 功能,語法:

/*整個資料庫啟用*/
exec sys.sp_cdc_enable_db;

/*設定單一資料表*/
exec sys.sp_cdc_enable_table
@source_schema='dbo',
@source_name='jobs',
@role_name='cdc_jobs',
@capture_instance='dbo_jobs',
@supports_net_changes=1;

/*新增-異動資料表*/
select * from cdc.dbo_jobs_CT;

完成後,每指定一個資料表啟用 CDC 就會建立兩個相對的函式

 

image

 

PS: CDC 需要 SQL Server Agent 服務啟用下才能正常運作。

 

在原本的資料表進行測試新增、修改、刪除等,可以查詢CDC資料表會有所有歷程,若是資料更新,則會新增兩筆資料分別是更新前與更新後。

select * from cdc.dbo_jobs_CT order by 1;

image

 

這個異動資料表上有原本資料表所有的欄位,另外有幾個欄位

  • _$start_lsn -- commit log sequence number (LSN) within the same Transaction
  • _$end_lsn -
  • _$seqval -- order changes within a transaction
  • _$operation -- 1=delete, 2=insert,3=updatebefore,4=updateafter
  • _$update_mask -- for insert,delete all bits are set, for update bits set correspond to columns changed

不過其儲存的值不容易直接識別,可以透過 CDC 提供的函式來讀取資料


DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_jobs')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all' --'all, 'all update old'
/*查出所有歷程*/
select * from [cdc].[fn_cdc_get_all_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
/*查出最後狀態*/
select * from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)

查詢結果

image

 

CDC 參考 T-SQL 範例

筆者應用在異地資料庫同步上,定時讀取出需要做同步資料的查詢語法



DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_jobs')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all'--'all, 'all with merge', 'all with mask'

--DECLARE ChangeSet_cursor CURSOR FOR
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=2
union
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=4
union
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]from [cdc].[fn_cdc_get_all_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=1
order by __$start_lsn
;

 

預存程序範例 - 讀取所有異動記錄,區分新增、修改、刪除做處理,並清除本次所有異動記錄。


/*--------------------------------------------------
description: 讀取 jobs 異動記錄,
author: Robin
date: 2014/12/18
testing code:
--------------------------------------------------
update jobs set job_desc=job_desc+' *' where job_id=1

EXEC SYNC_Changes_jobs;

SELECT * FROM [cdc].[dbo_jobs_CT];
--------------------------------------------------*/
CREATE Procedure [dbo].[SYNC_Changes_jobs]

AS
BEGIN TRAN;

DECLARE @start_lsn binary(10), @operation int,
@job_id smallint,
@job_desc varchar(50),
@min_lvl tinyint,
@max_lvl tinyint


DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_jobs')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all'--'all, 'all with merge', 'all with mask'

DECLARE ChangeSet_cursor CURSOR FOR
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=2
union
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=4
union
select __$start_lsn, __$operation
, [job_id],[job_desc],[min_lvl],[max_lvl]from [cdc].[fn_cdc_get_all_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=1
order by __$start_lsn
;

OPEN ChangeSet_cursor

FETCH NEXT FROM ChangeSet_cursor
INTO @start_lsn, @operation, @job_id,@job_desc, @min_lvl, @max_lvl;

WHILE @@FETCH_STATUS = 0
BEGIN

IF @operation=2
BEGIN
PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 新增'
END

IF @operation=4
BEGIN
PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 異動'
END

IF @operation=1
BEGIN
PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 刪除'
END

FETCH NEXT FROM ChangeSet_cursor
INTO @start_lsn, @operation, @job_id,@job_desc, @min_lvl, @max_lvl;
END
CLOSE ChangeSet_cursor;
DEALLOCATE ChangeSet_cursor;


COMMIT TRAN;

delete from [cdc].[dbo_jobs_CT];

 

最後若要取消 CDC 功能


/*取消單一資料表*/
exec sys.sp_cdc_disable_table
@source_schema='dbo',
@source_name='jobs',
@capture_instance='dbo_jobs';


/*整個資料庫停用*/
exec sys.sp_cdc_disable_db;


 

 

相關連結