流水線(xiàn)處理技術(shù)在數據集成中的應用
隨著(zhù)個(gè)人計算機和計算機網(wǎng)絡(luò )的飛速發(fā)展,以及信息化的高速推進(jìn),互聯(lián)網(wǎng)提供的信息總量也在迅猛增長(cháng)。如果企業(yè)和社會(huì )組織實(shí)現數據共享,可以使更多的人更充分地利用已有的數據資源??墒菫椴煌瑧梅?wù)的信息都存儲在許多不同的數據源之中,數據內容以及數據格式千差萬(wàn)別,且其管理系統也各不相同。如何對這些數據進(jìn)行有效的集成管理,屏蔽這些信息的異構,并提供一個(gè)統一的訪(fǎng)問(wèn)接口以透明地訪(fǎng)問(wèn)各信息源,成為一些大型企業(yè)或社會(huì )組織關(guān)心的事情。數據集成正是在這一背景下提出的。
1 基于數據復制方法的集成模式
數據復制方法[1]是當前比較常用的數據集成模式,該方法將各個(gè)數據源的數據復制到與其相關(guān)的其他數據源上,并維護數據源整體上的數據一致性、提高信息共享利用的效率。這種方式可以復制信息源的整個(gè)數據,也可以是信息源的部分信息。數據復制方法在用戶(hù)使用某個(gè)數據源之前,將用戶(hù)可能用到的其他數據源的數據預先復制過(guò)來(lái),如果用戶(hù)要使用的數據已經(jīng)被復制,則只需要查詢(xún)該集成信息源,并與中介器/包裝器的虛擬數據集成[2]相比,大大提高了系統處理用戶(hù)請求的效率。
基于數據復制方式最常見(jiàn)的一種方法是數據倉庫方法[1]。該方法將各個(gè)數據的全部或者部分數據復制到數據倉庫,用戶(hù)像訪(fǎng)問(wèn)普通數據庫一樣直接訪(fǎng)問(wèn)數據倉庫。該方式實(shí)現了對物理數據庫語(yǔ)義異構的屏蔽和數據訪(fǎng)問(wèn)的控制,提供了一個(gè)統一的數據邏輯視圖來(lái)隱藏底層的數據細節。圖1所示為一個(gè)典型的數據倉庫體系結構圖[3]。
在該集成模型中,每一個(gè)數據源對應一個(gè)監視器(Monitor),監視器負責收集所需要集成的信息源中數據的變化以便上報給集成端(收集的方式有如下類(lèi)別:針對信息源有日志的情況,可以通過(guò)日志分析提取要上報的增量;對于沒(méi)有日志情況可以通過(guò)觸發(fā)器方式或者快照差分方式獲取信息源的增量),同時(shí)監視器還具有一個(gè)包裝器的功能,提供信息源的數據查詢(xún)提取以及類(lèi)型轉化功能。當作為數據查詢(xún)功能的時(shí)候,不僅將數據初始化同步到數據倉庫中,同時(shí)也相當于一個(gè)服務(wù)器,不斷偵聽(tīng)來(lái)自于集成器的命令查詢(xún)請求,當有請求到達時(shí),執行查詢(xún),并將該監視器對應的數據源的數據包裝成基于公共類(lèi)型的數據,或以XML文件的方式和固定大小對象數據塊的方式傳遞給集成器,然后集成器負責將提取后的數據進(jìn)行合成。其中監視器與集成器中的通信流程如圖2所示。
2 基于內存控制的流水線(xiàn)處理方法
從上面的數據倉庫體系結構可知,監視器必須具備一個(gè)包裝器(wrapper)的功能。數據倉庫端保存的數據是各底層信息源的部分副本(一般情況為訪(fǎng)問(wèn)非常頻繁),但是不是很頻繁的訪(fǎng)問(wèn)數據還是保持在底層信息源端,當一個(gè)OLAP查詢(xún)(如下鉆)經(jīng)過(guò)查詢(xún)分解后,不能在數據倉庫端獲取,而必須通過(guò)一個(gè)甚至幾個(gè)底層信息源端的查詢(xún),然后在集成器端進(jìn)行結果的合并(如要通過(guò)兩個(gè)底層數據庫中表的連接操作)才能獲取查詢(xún)結果。在實(shí)化視圖初始化時(shí),提高查詢(xún)的效率以及提高實(shí)化視圖初始化的時(shí)間,是非常重要的。
本文關(guān)注的便是如何提高查詢(xún)效率、響應速度、集成端的處理效率,以及在提交查詢(xún)后,如何減少集成端的空閑等待時(shí)間,并且在大數據量的情況下同時(shí)做到內存控制,以防止在大數據量的情況下查詢(xún)導致內存溢出。
在解決提高查詢(xún)效率、響應速度、集成端的插入效率的同時(shí),防止內存溢出以及在減少集成端的空閑等待時(shí)間方面,利用了基于生產(chǎn)者/消費者的流水線(xiàn)處理方法,該方式主要思想是實(shí)現服務(wù)器與客戶(hù)端的流水并行[4],查詢(xún)的結果以固定大小數據塊的形式分塊組裝,并在監視器端與集成端都使用一定大小的緩沖隊列來(lái)暫存這些數據塊,以有效防止內存溢出。以一次實(shí)化視圖的初始化過(guò)程為例,描述該方式的算法流程為:
(1)集成端發(fā)送帶全局查詢(xún)QID(該查詢(xún)QID為全局唯一的,通過(guò)客戶(hù)端API自動(dòng)生成)的SQL查詢(xún)命令(結果查詢(xún)重寫(xiě)),并通過(guò)通信平臺將該查詢(xún)命令放入服務(wù)器端執行隊列中,同時(shí)預設一個(gè)數據塊計數為sum(該計數為服務(wù)器端初始要發(fā)送的數據塊個(gè)數),然后集成端監聽(tīng)接收隊列;
(2)監視器端從命令隊列中取出查詢(xún)命令,創(chuàng )建查詢(xún)管理器(Data Query Manager),并將該查詢(xún)管理器與查詢(xún)QID作為一個(gè)鍵值對放入進(jìn)程全局的哈希表(Concurrent Hash Map)中,然后通過(guò)該查詢(xún)管理器中的excuteQurey()方法啟動(dòng)查詢(xún)線(xiàn)程,該查詢(xún)線(xiàn)程將獲得的記錄組織成數據塊(Data Object Block),放入固定大小的數據塊緩沖隊列中,并在該隊列滿(mǎn)時(shí),查詢(xún)線(xiàn)程暫停,不滿(mǎn)時(shí)繼續查詢(xún),直到最后一塊為止。同時(shí)啟動(dòng)發(fā)送固定大小的數據塊的線(xiàn)程,該發(fā)送線(xiàn)程從緩沖隊列中取出數據塊,發(fā)送給客戶(hù)端,直到發(fā)送的最后一塊,該發(fā)送線(xiàn)程終止;
(3)當有數據塊到達客戶(hù)端的數據塊接收隊列時(shí),判斷該塊是否為最后一塊,如果是,則設置所有塊是否到達的標志“flag=true”,并通知客戶(hù)端進(jìn)行處理,客戶(hù)端處理線(xiàn)程從隊列中取出一個(gè)數據塊進(jìn)行處理(對實(shí)化視圖初始化,處理的方式就是將該數據塊插入到數據倉庫的實(shí)化視圖中),并將數據塊計數n減1,再判斷該數據塊計數是否小于客戶(hù)端要緩沖的個(gè)數N,并同時(shí)判斷flag的值,如果sumN,且flag= =false,則發(fā)送從服務(wù)器端調取固定數目K數據塊的命令(該命令帶QID,以便到服務(wù)器端時(shí)找到之前的查詢(xún)管理器),同時(shí)設置sum=sum+K;
(4)服務(wù)器端接收到客戶(hù)端的數據塊調取命令,分離出里面的QID,從進(jìn)程全局的哈希表中找到與該QID對應的查詢(xún)管理器,并調用里面的發(fā)送固定數據塊的方法以啟動(dòng)發(fā)送固定數目數據塊的線(xiàn)程,該線(xiàn)程與步驟(2)中發(fā)送線(xiàn)程相同;
(5)重復步驟(3)、步驟(4),直到查詢(xún)的最后一塊到達客戶(hù)端,與此同時(shí),服務(wù)器端的查詢(xún)管理器也從全局的哈希表中移除。
3 性能測試與分析
與流水線(xiàn)處理方法相對應的一種方法為同步方法,即通過(guò)查詢(xún)先將底層信息源的結果組裝在一起,一次傳給集成端處理。由于采用的都是對象數據塊的形式,因此用于與流水線(xiàn)對比的同步方法的算法思想為:
(1)客戶(hù)端發(fā)送帶全局查詢(xún)QID(該查詢(xún)QID為全局唯一的,通過(guò)客戶(hù)端API自動(dòng)生成)的SQL查詢(xún)命令(結果查詢(xún)重寫(xiě)),并通過(guò)通信平臺將該查詢(xún)命令放入服務(wù)器端執行隊列中;
(2)服務(wù)器端接收到查詢(xún)命令,執行查詢(xún),將所得的結果存放于文件中,然后一次發(fā)送給客戶(hù)端;
(3)客戶(hù)端接收到關(guān)于本次查詢(xún)結果集的文件,然后處理該結果集文件。
將基于內存控制的流水線(xiàn)處理方法與同步方法在以下實(shí)驗環(huán)境下進(jìn)行測試對比,為減少誤差,多次測試得出平均值,有如下數據:
監視器端與集成端采用相同配置環(huán)境,相關(guān)配置為:
CPU:Intel(R) Core(TM)2 Duo CPU E4500 @ 2.2 GHz;操作系統:Windows XP;內存:2.0 GB;數據庫:Oracle 9i;JDK版本:1.6.0_07;開(kāi)發(fā)工具:Myeclipse6.5。
本實(shí)驗性能測試如圖3所示,可以看出,與傳統的同步方法相比,采用本文算法具有較好的性能特性,主要在于基于內存控制的流水線(xiàn)處理過(guò)程是一個(gè)監視器端與集成器端并行流水線(xiàn)運行的過(guò)程,并充分應用了現在多處理器多線(xiàn)程處理的技術(shù),減少了集成端空閑等待的時(shí)間。
設查詢(xún)信息源并包裝所有數據成公共類(lèi)型數據塊的時(shí)間代價(jià)為Cost(Q),傳輸放入文件中的所有數據塊到集成端的時(shí)間為Cost(T),集成端將傳輸過(guò)來(lái)的數據解析并初始化到數據倉庫的時(shí)間為Cost(P),則基于同步方法的時(shí)間代價(jià)為:Cost(Q)+Cost(T)+Cost(P)。
設查詢(xún)信息源并包裝查詢(xún)的數據成公共類(lèi)型數據塊為一塊的時(shí)間代價(jià)為:Cost(Q1),傳輸其中一塊數據塊到集成端的時(shí)間為Cost(T1),集成端將傳輸過(guò)來(lái)的一塊數據塊解析并初始化到數據倉庫的時(shí)間為Cost(P1),因為這里數據塊是個(gè)固定的常數,則基于本文的算法的時(shí)間代價(jià)為:Cost(Q1)+Cost(T1)+Cost(P1)+max(Cost(Q)-Cost(Q1),Cost(T)-Cost(T1),Cost(P)-Cost(P1)),其中max為各處理邏輯減去初始處理的最大時(shí)間。
從上面理論上可以分析得出,基于內存控制的流水線(xiàn)處理技術(shù)較同步技術(shù)可以更好地提高效率。同時(shí)還存在幾個(gè)問(wèn)題:
(1)當集成端需要OLAP查詢(xún)或實(shí)化視圖初始化比較多時(shí),仍然會(huì )出現內存溢出的問(wèn)題,這時(shí)可以應用線(xiàn)程池技術(shù)[4],有效控制這類(lèi)線(xiàn)程運行的數量,同樣,監視器端也使用這種方案。
(2)當集成端與監視器端進(jìn)行流水線(xiàn)處理時(shí),如果監視器端與集成端出現網(wǎng)絡(luò )中斷,或者其中一個(gè)出現突發(fā)事件(如斷電)時(shí),之前的一些過(guò)程就需要重做,并回滾。特別是針對網(wǎng)絡(luò )中斷的情況,容易造成監視器端查詢(xún)線(xiàn)程的線(xiàn)程泄漏,即集成端認為之前的操作沒(méi)成功,然后重新進(jìn)行操作,然而監視器端的處理線(xiàn)程卻還沒(méi)完。避免這些情況出現的解決方案為:設置一個(gè)超時(shí),當達到設定時(shí)間而這一流水處理過(guò)程未進(jìn)行時(shí),自動(dòng)中斷這些處理流程,或者可以在監視器端對查詢(xún)組裝后數據塊分塊存儲在硬盤(pán)上,然后進(jìn)行文件數據塊的發(fā)送,這樣減少了塊之間的命令的交互邏輯,而且有效地控制了線(xiàn)程泄漏,但是也增加了文件的讀寫(xiě)與控制,增加了I/O開(kāi)銷(xiāo)。
數據集成仍然是一個(gè)比較熱門(mén)的研究點(diǎn),在基于數據倉庫方法的數據集成中,分析了實(shí)化視圖初始化以及OLAP查詢(xún)中面對大數據量處理的問(wèn)題,應用了基于內存控制的流水線(xiàn)處理方法,充分利用了Java的多線(xiàn)程處理技術(shù),并從實(shí)驗和理論上分析了該方法較同步方法的優(yōu)點(diǎn)。
評論