SQL Change Data Capture


重要資料表要做差異 Log 檔,可以不用手刻,有方便的作法。

過去手刻作法

過去筆者針對系統中主要的資料表要做交易歷程功能,作法是在資料庫中新增結構一樣的資料表,並賦予另一個自動序號欄位或時間欄位做為 Primary Key,接著撰寫一個 Trigger 當原本資料表新增、修改、刪除時另外抄寫到 Log 資料表。

其實可以更容易的

近期在做兩地資料同步時,發現早在 SQL Server 2008 時就有提供兩種解決方案:Change Data Capture 與 Change Tracking,兩者比較可參考 Comparing Change Data Capture and Change Tracking

筆者採用 Change Data Capture (以下簡稱 CDC) 取代上述原本資料歷程手動的做法,CDC 可以記錄執行 DML 時資料當時的原貌, 並可以區分資料被異動狀態 (新增、修改前、修改後、刪除),當然也可以查到資料被刪除前最後儲存的值。
以 Pub 資料庫中的 jobs 資料表為例,在資料庫與指定某一資料表啟用 CDC 功能,語法:
use [MyDataBase];

/*新增一個交易資料表*/
CREATE TABLE dbo.MyTable(
 ID int IDENTITY(1,1) NOT NULL,
 MyCode varchar(4) NULL,
 DESCRIP varchar(128) NULL,
CONSTRAINT PK_MyTable PRIMARY KEY CLUSTERED(ID ASC));

/*資料庫啟用 CDC*/
EXEC sys.sp_cdc_enable_db;

/*單一資料表 dbo.MyTable 啟用 CDC*/
EXEC sys.sp_cdc_enable_table
@source_schema='dbo',
@source_name='MyTable',
@role_name='cdc_Admin',
@capture_instance='dbo_MyTable',
@supports_net_changes=1;

完成後,每指定一個資料表啟用 CDC 就會建立兩個相對的函式

PS: CDC 需要 SQL Server Agent 服務啟用下才能正常運作。

在原本的資料表進行測試新增、修改、刪除等,可以查詢CDC資料表會有所有歷程,若是資料更新,則會新增兩筆資料分別是更新前與更新後。
select * from cdc.dbo_MyTable_CT order by 1; 

這個異動資料表上有原本資料表所有的欄位,另外有幾個欄位
  • _$start_lsn -- commit log sequence number (LSN) within the same Transaction
  • _$end_lsn -
  • _$seqval -- order changes within a transaction
  • _$operation -- 1=delete, 2=insert,3=updatebefore,4=updateafter
  • _$update_mask -- for insert,delete all bits are set, for update bits set correspond to columns changed
不過其儲存的值不容易直接識別,可以透過 CDC 提供的函式來讀取資料

/*測試異動資料 */
insert into  dbo.MyTable(MyCode, DESCRIP) values ('1000', '1000-DESCRIP');

update dbo.MyTable set MyCode='1001', DESCRIP='ABC' where ID = 1;

/*以 CDC 函數查詢*/
DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_MyTable')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all' --'all, 'all update old'
/*查出所有歷程*/
select * from [cdc].[fn_cdc_get_all_changes_dbo_MyTable](@from_lsn, @to_lsn, @row_filter_option)
/*查出最後狀態*/
select * from [cdc].[fn_cdc_get_net_changes_dbo_MyTable](@from_lsn, @to_lsn, @row_filter_option)

查詢結果


/*單一資料表 cox.CUSREP 取消 CDC*/
exec sys.sp_cdc_disable_table 
@source_schema='cox',
@source_name='CUSREP',
@capture_instance='cox_CUSREP';

/*資料庫停用 CDC*/
exec sys.sp_cdc_disable_db;

CDC 參考 T-SQL 範例

筆者應用在異地資料庫同步上,定時讀取出需要做同步資料的查詢語法


DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_jobs')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all'--'all, 'all with merge', 'all with mask'

--DECLARE ChangeSet_cursor CURSOR FOR 
select  __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=2
union
select  __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=4
union
select __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]from [cdc].[fn_cdc_get_all_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=1
order by __$start_lsn
;


預存程序範例 - 讀取所有異動記錄,區分新增、修改、刪除做處理,並清除本次所有異動記錄。

/*--------------------------------------------------
description: 讀取 jobs 異動記錄,
author: Robin
date: 2014/12/18
testing code:
--------------------------------------------------
update jobs set job_desc=job_desc+' *' where job_id=1

EXEC SYNC_Changes_jobs;

SELECT * FROM [cdc].[dbo_jobs_CT];
--------------------------------------------------*/
CREATE Procedure [dbo].[SYNC_Changes_jobs]

AS
BEGIN TRAN;

DECLARE @start_lsn binary(10), @operation int, 
 @job_id smallint,
 @job_desc varchar(50),
 @min_lvl tinyint,
 @max_lvl tinyint


DECLARE @from_lsn binary(10), @to_lsn binary(10), @row_filter_option nvarchar(30)
SET @from_lsn = [sys].[fn_cdc_get_min_lsn]('dbo_jobs')
SET @to_lsn = [sys].[fn_cdc_get_max_lsn]()
SET @row_filter_option = 'all'--'all, 'all with merge', 'all with mask'

DECLARE ChangeSet_cursor CURSOR FOR 
select  __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=2
union
select  __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]
from [cdc].[fn_cdc_get_net_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=4
union
select __$start_lsn, __$operation
 , [job_id],[job_desc],[min_lvl],[max_lvl]from [cdc].[fn_cdc_get_all_changes_dbo_jobs](@from_lsn, @to_lsn, @row_filter_option)
where __$operation=1
order by __$start_lsn
;

OPEN ChangeSet_cursor

FETCH NEXT FROM ChangeSet_cursor 
INTO @start_lsn, @operation, @job_id,@job_desc, @min_lvl, @max_lvl;

WHILE @@FETCH_STATUS = 0
BEGIN
    
 IF @operation=2
 BEGIN
  PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 新增'
 END

 IF @operation=4
 BEGIN
  PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 異動'
 END

 IF @operation=1
 BEGIN
  PRINT CONVERT(varchar(10), @job_id) + ' ' + @job_desc + ' --- 刪除'
 END
    
    FETCH NEXT FROM ChangeSet_cursor 
 INTO @start_lsn, @operation, @job_id,@job_desc, @min_lvl, @max_lvl;
END 
CLOSE ChangeSet_cursor;
DEALLOCATE ChangeSet_cursor;


COMMIT TRAN;

delete from [cdc].[dbo_jobs_CT];


最後若要取消 CDC 功能

/*單一資料表 dbo.MyTable 取消 CDC*/
exec sys.sp_cdc_disable_table 
@source_schema='dbo',
@source_name='MyTable',
@capture_instance='dbo_MyTable';


/*整個資料庫停用*/
exec sys.sp_cdc_disable_db;




相關連結