您現在的位置是:網站首頁> 內容頁

HBase之Table.put客戶端流程

  • e寶博官網手機版
  • 2019-08-23
  • 63人已閱讀
簡介  首先,讓我們從HTable.put方法開始。由于這一節有很多方法只是簡單的參數傳遞,我就簡單略過,但是,關鍵的方法我還是會截圖講解,所以希望大家盡可能對照源碼進行流程分析。另外,在
  首先,讓我們從HTable.put方法開始。由于這一節有很多方法只是簡單的參數傳遞,我就簡單略過,但是,關鍵的方法我還是會截圖講解,所以希望大家盡可能對照源碼進行流程分析。另外,在這一節,我單單介紹put操作在客戶端的流程,畢竟,這個內容已經很多了。至于具體服務端的流程,我會在后面的章節中介紹到,歡迎大家到時候閱讀?! ∮捎谶@一節的方法還是比較復雜的,我特地畫了一張思維導圖,大家可以先通過思維導圖來對本節的內容有一個大概的了解,置于具體的流程,我在下面將對照源碼的貼圖一一為大家講解(在這里聲明一點,我在這一節只介紹單個put操作的流程,至于put批處理,大家有興趣可以自己研究一下)?! ∈紫?,讓我們來到HTable.put方法,如下圖所示:  這里我先講一下這一節的最后調用流程,也同時讓大家明確一下在本節我著重要講解的流程是哪塊。在上圖中我已經表示出來了,后面方法的調用最后調用到了上面新創建的ClientServiceCallable中覆寫的rpcCall方法,也就是調用到了ClientServiceCallable.doMutate。關于這個方法中具體與服務端的交互流程在本節我就略過,但是,在后面的內容中,我會談到類似的情況,如果大家感興趣的話,可以繼續后面的內容?! 〗酉聛碜屛覀兓氐奖竟澋闹攸c。首先是RpcRetryingCallerFactory.newCaller方法的調用,該方法使用RpcRetryingCallerFactory的成員參數創建了RpcRetryingCaller,用于后面對于RetryingCallable的調用(該方法在后面也會多次調用,在后面我就不貼圖了)?! 〗酉聛碜屛覀儊淼絉pcRetryingCallerImpl.callWithRetries。這個方法是本節中最為重要的方法,在后面也會多次用到。方法雖然比較長,但大多是異常的情況的解決,在本節中我們就單單介紹callable.prepare與callable.call兩個方法。至于interceptor.intercept,由于在構造RpcRetryingCallerFactory時默認的interceptor類型為RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR,在本節并不會有其它影響,所以我們暫時不需要關注?! ∩厦娴姆椒ㄕ{用的callable具體類型為覆寫了rpcCall方法的ClientServiceCallable,下面讓我們來到ClientServiceCallable類的內部。ClientServiceCallable繼承自RegionServerCallable,因此,上面方法實際調用的是RegionServerCallable.prepare與RegionServerCallable.call?! ∈紫茸屛覀儊淼絉egionServerCallable.prepare方法。這里比較重要的方法我已經框選出來了。需要大家特別留意的是最后的setStubByServiceName,一是因為他比較重要,二是我在后面的內容才會介紹,大家到時候可能忘記了,所以在這里特別提醒一下大家?! ∪菀卓吹?,首先調用了connection.getRegionLocator獲得一個新構建的HRegionLocator(這里就不截圖了,因為實在是沒有什么內容需要講),不過大家需要注意的是,這里的tableName是我們實際要查詢到tableName,而后面會用到META_TABLE_NAME,容易混淆,我在這里簡單提一下。接下來調用了HRegionLocator.getRegionLocation?! ≡谡{用HRegionLocator.getRegionLocation時,這里會有一系列簡單方法的調用,由于在上面的導圖中我并沒有畫出,在這里我就一一貼圖描述?! ∫幌盗蟹椒ㄗ呦聛?,到這里就到了比較重要的方法。由于這個是長圖,沒有辦法框選除重點,我就在文字中一一介紹該方法中調用的比較好重要的方法?! ?.getCachedLocation,該方法簡介調用到了metaCache.getCachedLocation,但此時,由于我們是第一次調用該表的信息,并沒有放到緩存中,因此,這里返回的locations = null?! ?.然后我們來到RegionInfo.createRegionName,需要注意的是,其入參row就是我們put操作創建的rowKey,也就是我們常說的行鍵。另外,在metaStartKey中傳入的id為HConstants.NINES(NINES = "99999999999999"),而在metaStopKey中傳入的id為空字符串?! ?.接著構造了Scan。其中withStartRow與withStopRow中的inclusive入參都為true。將reversed設置為true,并且將catalog family設置為"info"(CATALOG_FAMILY_STR = "info")。大家可能注意到了,這里的info列族在我們的表中并不一定存在。到了這里,大家可能就猜到我在前面埋的伏筆了。沒錯,這里構建的Scan是為了后面的查詢后面的META_TABLE_NAME做準備?! ?.緊接著來到fro循環中,這里連著調用了兩次getCachedLocation,后面的那次調用加了鎖,類似我們在單例設計模式中流程,加鎖以確保對象不會重復?! ?.然后構建了ReversedClientScanner對象。(鑒于之前經驗,貼太多圖容易擾亂大家的思維,我在這里盡量用文字來介紹)。ReversedClientScanner是ClientScanner的子類,另外,大家需要注意的是,在構造ReversedClientScanner時傳入的tableName為TableName.META_TABLE_NAME。在ReversedClientScanner的構造過程中,雖然有一些需要注意的地方,不過,我還是放在后面來描述,以便大家能夠更好的理解整個流程?! ?.接下來調用了ReversedClientScanner.next,大家千萬不要小看這個方法,這個方法里面的一系列調用時非常復雜的,也是本節的另外一個重點,我將在后面詳細介紹?! ?.然后調用了MetaTableAccessor.getRegionLocations,其入參為ReversedClientScanner.next的返回值。這個方法的詳細流程也比較重要,同樣,我放到后面為大家講解?! ?.最后調用了cacheLocation,也就是將當前tableName放到緩存中?! ∩厦?,我將ConnectionImplementation.locateRegionInMeta方法中調用的各個流程都簡單介紹了一下,下面,我就選擇其中比較重要的方法來詳細描述?! ∈紫茸屛覀儊淼絉eversedClientScanner.next。這個方法調用了ClientScanner.nextWithSyncCache如下圖所示:  上圖框選的兩個方法都比較重要,讓我們首先介紹比較復雜的loadCache,如下圖所示?! 】吹竭@個方法大家可能比較慌,沒有關系,我會在這里為大家一一介紹?! ?.首先調用了moveToNextRegion。該方法首先調用closeScanner(其間首先調用了成員變量callable.setClose方法,然后調用了ClientScanner.call方法,這個方法我在后面也會提到,最后將當前成員變量callable中的值置為null,簡而言之,將成員變量callable.setClose置為null)?! ∪缓髽嬙炝薙cannerCallableWithReplicas并賦給成員變量callable。在構造ScannerCallableWithReplicas時需要注意的是其中創建了ReversedScannerCallable。也就是說ScannerCallableWithReplicas的成員變量currentScannerCallable為ReversedScannerCallable。順便提一下,ScannerCallableWithReplicas的成員變量scan為我們在上面構造的scan?! ?.接著調用了ClientScanner.call方法。這里的調用流程比較繁瑣。為了更清楚的解釋清楚loadCache方法,我們先跳過這里,假設其中已經有了返回值?! ?.然后調用了scanResultCache.addAndGet。簡單提示一下我們這里的scanResultCache類型為CompleteScanResultCache?! ?.然后將結果集中的內容遍歷放到成員變量cache中。這里我們可以回過頭來看看上面的圖。上面圖中我框選了cache.poll方法。也就是說cache.poll將在loadCache方法中放入的結果集取出來?! ∩厦嫖姨岬竭^很多次ClientScanner.call方法,但是都沒有詳細描述,下面我就特意來講解該方法。其實這個方法很簡單,只是調用了方法RpcRetryingCaller.callWithoutRetries。這里的caller是在ReversedClientScanner方法構造時創建的(上面只是提到說構造ReversedClientScanner有需要注意的地方,也就是這里,其截圖我在上面也已經貼出來了)?! 〗酉聛碜屛覀儊淼絉pcRetryingCallerImpl.callWithoutRetries。這里的入參callable我在上面的方法loadCache已經介紹過了。其類型為ScannerCallableWithReplicas。由于ScannerCallableWithReplicas.prepare方法為空實現,我在這里就不貼圖了,接下來將重點放在ScannerCallableWithReplicas.call?! ∽屛覀儊淼絊cannerCallableWithReplicas.call,如下圖所示?! ?.在ClientScanner.closeScanner方法調用時,會走上面的if判斷。由于currentScannerCallable.closed的值為true?! ?.由于默認的成員變量regionReplication,因此會調用RpcRetryingCallerWithReadReplicas.getRegionLocations。這個方法的調用與我們今天的主要流程并沒有什么太多的聯系,因此,在這里簡單略過。該方法我可能會放在后面的章節中講到?! ?.構造了ResultBoundedCompletionService。這個方法比較重要,在后面的流程中我會反復講到?! ?.調用了addCallsForCurrentReplica,將成員變量currentScannerCallable封裝到ScannerCallableWithReplicas.RetryingRPC中,并交由ResultBoundedCompletionService提交?! ?.接著調用cs.poll,獲取其提交的任務的返回值?! 『竺嫖覍⒃敿氈v解?! ∈紫葋淼絊cannerCallableWithReplicas.addCallsForCurrentReplica方法。容易看到,將成員變量currentScannerCallable封裝到RetryingRPC中。然后調用了ResultBoundedCompletionService.submit。這里著重提醒一下大家,這里的currentScannerCallable類型為ReversedScannerCallable?! 〗又屛覀儊淼絉esultBoundedCompletionService.submit,如下圖所示?! ∵@里將傳入的RetryingRPC封裝到QueueingFuture,然后調用了executor.execute。由于QueueingFuture繼承自java.util.concurrent.RunnableFuture,也就是在調用executor.execute時,QueueingFuture.run方法會執行?! 〗酉聛碜屛覀儊淼絈ueueingFuture。在下圖中,我框選出了其中比較重要的方法?! ∈紫冗@里調用了RpcRetryingCallerImpl.callWithRetries方法(由于這個方法我在上面已經提到過了,因此在這里就不貼圖了)。重要的是其中的入參future類型為ScannerCallableWithReplicas.RetryingRPC。另外后面將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中?! ∽屛覀儊淼絊cannerCallableWithReplicas.RetryingRPC.prepare方法。如下圖所示。大家可能對這里的成員變量callable比較模糊了,大家可以往上翻到方法addCallsForCurrentReplica的描述,沒錯這里的callable就是ScannerCallableWithReplicas的成員變量currentScannerCallable。而ScannerCallableWithReplicas.currentScannerCallable正是在構造ScannerCallableWithReplicas時傳入的ReversedScannerCallable?! 〗酉聛碜屛覀儊淼絉eversedScannerCallable.prepare。由于這是第一次調用prepare方法,因此其成員變量instantiated為false。這里簡單提一下,這里的getRow方法獲取的是我們調用put時的行鍵,也就是我們對于目標表的rowKey。由于這里的tableName為TableName.META_TABLE_NAME,其rowKey在后面并沒有用到?! ∪缓笳{用了ReversedScannerCallable.setStub方法。為成員變量stub的賦值。其值為getConnection().getClient(getLocation().getServerName())調用的返回值?! ∽屛覀儊淼紺onnectionImplementation.getClient方法??催^我博文《HBase之HRegionServer啟動(含與HMaster交互)》的同學看到這里可能就比較熟悉。 沒錯,這里正是通過ClientProtos.ClientService.newBlockingStub構造了協議ClientProtos.ClientService的客戶端stub。關于與服務端交互的流程,我在《HBase之HRegionServer啟動(含與HMaster交互)》中已經具體介紹了,大家感興趣的可以去看一下,我們這里來描述比較重要一個點?! 【褪莄omputeIfAbsentEx的最后一個入參IOExceptionSupplier。他類似于java中的Supplier(類似的方法調用我在后面講解方法MetaTableAccessor.getRegionLocations)?! ≡诘谝淮握{用時,我們的stubs中并沒有到該serverName的客戶端stub,因此調用了入參supplier的get方法。也就是我們上面看到的lambda表達式方法內容被調用?! 〉竭@里,ReversedScannerCallable.prepare方法就調用完成了。這個還有一個需要注意的點就是ReversedScannerCallable.prepare方法的最后將其成員變量instantiated置為true?! 〗酉聛碜屛覀儊淼絊cannerCallableWithReplicas.RetryingRPC.call方法(這里的callable類型為ReversedScannerCallable)?! ∵@里再次調用了RpcRetryingCallerImpl.callWithRetries,由于ReversedScannerCallable.prepare方法已經調用,并且其成員變量instantiated被置為true,所以上面描述的內容并不會再次調用(這里框選的內容作為后面的伏筆)?! ∫簿褪钦f,接下來應該調用的是ReversedScannerCallable.call。由于其并沒有call方法,因此,會一直調用到其父類RegionServerCallable.call。如下圖所示。這里的rpcController類型為HBaseRpcControllerImpl。接下來調用了rpcCall方法。由于ReversedScannerCallable中并沒有rpcCall方法的實現,而在其父類ScannerCallable有實現rpcCall?! 〗酉聛?,讓我們來到ScannerCallable.rpcCall。由于默認的成員變量scannerId為-1,因此,會調用openScanner。由于openScanner方法僅僅是通過Client協議發送到服務端。關于rpc流程我在博客《hbase之RPC調用流程簡介》中已經介紹過了,感興趣的同學可以去看一下,那篇博文講的比較淺顯,我會在春節期間將那篇內容更新,大家可以關注我,到時候有更新大家也就收到通知了?! ∪缓笳{用了ResponseConverter.getResults,將服務端的返回的ScanResponse轉換為Result?! ∽屛覀儊淼絉esponseConverter.getResults。這個方法的主要作用是將CellScanner中Cell的或ScanResponse中的PB類型的results轉換為java類型的Result。至于該方法的詳細描述我要放到后面開設的第二章節,也就是HBase中客戶端協議各個操作中來講解,因為這里流程是比較復雜的,要結合上服務端的流程才能講述清楚。所以這里暫時略過?! 〉竭@里,一個完整的RpcRetryingCallerImpl.callWithRetries方法調用流程可以說是完結了。然后在ResultBoundedCompletionService.QueueingFuture.run方法的后面,將當前QueueingFuture添加到ResultBoundedCompletionService成員變量completedTasks中(雖然我在上面提到過,但這里還是重述一下,以便我們后面的理解)?! 《谖覀儽竟澝枋龅恼w流程中,ScannerCallableWithReplicas.addCallsForCurrentReplica方法調用完結?! 〗酉聛碜屛覀儊淼絉esultBoundedCompletionService.poll,由于其間接調用了ResultBoundedCompletionService.pollForSpecificCompletedTask,如下圖所由于在QueueingFuture.run方法的最后,將自身添加到了completedTasks。因此,上面的方法獲取的正是剛剛添加的QueueingFuture。接著調用了ResultBoundedCompletionService.QueueingFuture.get方法。如下圖所示。也就是說,這里將result返回。這里result的類型我們需要注意一下,以便后面在類型上面的理解。由于這里QueueingFuture成員變量future的實際類型為ScannerCallableWithReplicas.RetryingRPC。大家可以往上翻到ScannerCallableWithReplicas.RetryingRPC.call,就可以發現,這里的result是從ResponseConverter.getResults獲得的Result數組與成員變量callable封裝后的Pair對象。接著,將r.getFirst(),也就是實際獲得的結果返回?! 〉竭@里,大家可能以為要結束了,很遺憾,這里只是到了ClientScanner.call方法的返回?! ∮捎诮酉聛淼氖莾蓚€單獨的流程了。一個是MetaTableAccessor.getRegionLocations,另外一個是ConnectionImplementation.cacheLocation。至于這兩個流程之外的后續流程比較簡單,我就不一一敘述了,相信大家跟著源碼與我在前面的提示很容易就可以弄清楚了。而前面提到的那兩個單獨的流程我將放在后面的一節《HBase之Table.put客戶端流程(續)》中介紹。到時候歡迎大家閱讀?! 〈蠹铱梢躁P注我的博客,或者發送郵件到我的郵箱[email protected]來溝通交流大數據相關的知識。感謝大家的閱讀,如果覺得不錯,希望您可以點擊下面的推薦。

文章評論

Top 沈阳娱网棋牌手机版下载 浙江20选5中奖规则 江西省十一选五走势图一定牛 真准网四川快乐12遗漏 北京快3开奖遗漏一牛 甘肃快3一定牛25日开奖结果 中原福彩22选5开奖结果 排列五天天开奖的彩票 排列三 重庆快乐十分技巧 江苏11选五奖结果