日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
Spark在供應(yīng)鏈核算中的應(yīng)用總結(jié)

一、業(yè)務(wù)背景

從上圖可以看到供應(yīng)鏈核算一腳在業(yè)務(wù)(計費/結(jié)算可以理解為財務(wù)視角的業(yè)務(wù)),一腳在財務(wù),職責(zé)上既要滿足核算團隊月結(jié)出賬的訴求,又要提供業(yè)財對賬的能力,基于此我們將數(shù)據(jù)處理統(tǒng)一為如下流程:

二、離線 SQL 模式存在的問題

從第1章節(jié)圖2可以看到,核算的流程就是ETL的過程,在早期的方案中通過離線+在線的實現(xiàn)方式,其中離線完成原始憑證的加工,業(yè)務(wù)接入的邏輯通過SQL實現(xiàn),在線系統(tǒng)完成記賬+拋賬,同時由于在線系統(tǒng)處理能力有限,在原始憑證加工中進行了業(yè)務(wù)單據(jù)的聚合,此種實現(xiàn)方式主要存在以下問題。

1.對賬問題定位困難,核算小二主要通過下載分錄及對應(yīng)的業(yè)務(wù)單據(jù)匯總數(shù)據(jù)進行對賬,如果某一分錄和業(yè)務(wù)數(shù)據(jù)有出入,只能逐一業(yè)務(wù)要素分析,由于缺乏通過分錄精確追溯到關(guān)聯(lián)業(yè)務(wù)單據(jù)的下鉆能力,問題定位耗時較長,造成這一問題的主要原因在于通過離線SQL實現(xiàn)的原始加工邏輯無法精確的建立業(yè)務(wù)單據(jù)和原始憑證的關(guān)聯(lián)關(guān)系。

2.日常運維困難,隨著業(yè)務(wù)的不斷發(fā)展,業(yè)務(wù)接入離線任務(wù)在不斷的膨脹,最終成為一個橫跨4個項目空間,150+離線任務(wù)、100+離線表的工程,任一節(jié)點的錯誤都會造成月結(jié)數(shù)據(jù)出錯。

3.行業(yè)實施效率較低,每次新接入行業(yè)都需要開發(fā)小二新建一套離線表+離線任務(wù),相應(yīng)的也造成運維問題的持續(xù)惡化。

三、為什么選擇Spark

1.核心訴求

在核算主版本的建設(shè)中,我們希望能夠通過打造穩(wěn)定可復(fù)用的產(chǎn)品能力最大程度的解決上述問題,核心訴求如下:

1)核算規(guī)則(業(yè)務(wù)接入/記賬/拋賬)可配、可視,不存在黑盒的加工邏輯,加工流程對核算小二全透明(提升實施+對賬效率)

2)建立整個核算鏈路單據(jù)維度的關(guān)聯(lián)關(guān)系(業(yè)務(wù)單據(jù)<->原始憑證<->記賬憑證<->拋賬憑證),具備雙向的單據(jù)追溯能力(提升對賬效率)

基于以上訴求,我們抽象了標(biāo)準的規(guī)則模型,滿足用戶多場景下各個鏈路(業(yè)務(wù)接入、記賬、拋賬)的加工邏輯配置(規(guī)則相關(guān)設(shè)計方案不再此文展開),與之配套的會計引擎完成基于核算規(guī)則的數(shù)據(jù)處理,另外在主版本的設(shè)計中,原始憑證需要1V1還原業(yè)務(wù)單據(jù),每月原始憑證數(shù)據(jù)量達到了10億級別,為了滿足月結(jié)時效性的要求,我們需要采用高性能、支持大數(shù)據(jù)量、且編程友好(便于建立單據(jù)關(guān)系)的計算引擎。

2.Spark VS MapReduce

基于上述訴求,我們重點調(diào)研了Spark和MapReduce兩款計算引擎,差異如下所示:

引擎

MapReduce

Spark

編程友好

一般,支持Map/Reduce兩種算子

較好,支持的算子豐富(map/filter/reduce/aggregate等)

性能

一般,中間態(tài)數(shù)據(jù)需要落盤,計算邏輯相對復(fù)雜時,MapReduce會涉及到多MapReduce任務(wù)執(zhí)行(多次shuffle),每次shuffle也會涉及到大量的磁盤IO

較好,基于內(nèi)存計算,基于DAG可以構(gòu)建RDD的血緣關(guān)系,在調(diào)度過程中可以避免大量無效的磁盤IO,另外rdd共享機制可以降低網(wǎng)絡(luò)IO的開銷

集團生態(tài)

較好,odps提供MapReduce計算框架支持,可以通過LogView查看日志

較好,odps提供Spark計算引擎支持,可以通過LogView查看日志,目前提供了stand-alone、集群及client三種模式的支持

比較形象的對比(并不是說spark不會落盤,在基于DAG圖拆分stage時,也會涉及到shuffle,但整體的磁盤IO消耗比MapReduce要低)。

3.編程模式優(yōu)勢: RDD + DataFrame 的編程模式

如上面和MapReduce的比較中看到 Spark 在編程友好性上比MapReduce好一些,比較適合后端開發(fā)人員。

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)
// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")
// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

上面是一個官方的例子,在schema控制,可編程性和 sql 操作等能較好的結(jié)合,邏輯比較類同后端開發(fā)。

基于上述spark特點及優(yōu)勢,我們最終選擇spark實現(xiàn)會計引擎邏輯。

四、spark基礎(chǔ)介紹

1.基礎(chǔ)概念

  • Rdd(Resilient distributed dataset):不可變的彈性分布式數(shù)據(jù)集(不可變性似于docker中的只讀鏡像層),只能通過其他的transformation算子創(chuàng)建新的RDD。
  • Operations:算子,spark包括兩類算子,transformation(轉(zhuǎn)換算子,通過對前置rdd的處理生成新的rdd)/action(觸發(fā)spark job的拆分及執(zhí)行,負責(zé)將rdd輸出)。
  • Task:執(zhí)行器執(zhí)行的任務(wù)單元,一般基于當(dāng)前rdd的分區(qū)數(shù)量拆分。
  • Job:包含多個task的集合,基于Action算子拆分。
  • Stage:基于當(dāng)前rdd處理邏輯的寬窄依賴拆分,spark中非常重要的概念,stage的切換會涉及到IO。
  • Narrow/Wide dependencies:參考下圖,區(qū)分的重要依據(jù)在于父節(jié)點是否會被多個子節(jié)點使用。

2.Spark on MaxCompute(ODPS)

我們在實踐中,主要基于spark on odps提供的client模式實現(xiàn),client模式的詳細介紹可以參考相關(guān)文檔。

  • Spark 有很多的后端的 Runtime,例如其商業(yè)化公司的Databricks Runtime, 彈內(nèi)我們使用的是 AliSpark,是集團的適配MaxComputer,同時在離線交互是使用了 Cupid-SDK 的 Client模式,這個模式不是獨立集群的模式,類Serveless模式,整體的成本上比獨立集群要低,當(dāng)然資源保障上沒有獨立集群好。

Client模式原理參考相關(guān)文檔,比調(diào)度模式有更好的應(yīng)用交互性。

  • 集團client模式將spark session作為服務(wù)提供,可以方便地與在線系統(tǒng)交互,包括任務(wù)的提交、關(guān)閉、實例的關(guān)閉等;
  • 在使用集團提供的spark能力時,比較麻煩的在于如何方便的查看日志,從我們的實踐看主要有以下2個路徑。

申請odps對應(yīng)項目空間的logview權(quán)限,可以直接在https://logview.alibaba-inc.com/中基于sparkInstanceId定位到具體的日志;

借助odps client+提交spark任務(wù)時返回的實例ID獲取log地址,代碼參考如下:

//instanceIdd對應(yīng)odps client中的lookupName
Account account = new AliyunAccount(sparkSessionConfig.getAccessId(), sparkSessionConfig.getAccessKey());
Odps odps = new Odps(account);
odps.setEndpoint(sparkSessionConfig.getEndPoint());
odps.setDefaultProject(sparkSessionConfig.getNamespace());
//日志地址目前設(shè)定有效期為7*24小時
try {
return odps.logview().generateLogView(odps.instances().get(sparkInstanceId), 7 * 24L);
} catch (OdpsException e) {
LOGGER.error("生成logView地址失敗,config:{},instanceId:{},e:{}", sparkSessionConfig, sparkInstanceId, e);
}

五、技術(shù)方案

1.整體方案

spark作為大數(shù)據(jù)處理引擎,在實例數(shù)量較少的情況下采用odps任務(wù)目前的運維方式來管理的話成本并不高,但是在供應(yīng)鏈核算的場景下,需要支持每天將近600+(行業(yè)*核算場景)數(shù)量的實例運行,且需滿足核算完整性、準確性、及時性的要求,另外由于目前我們的spark任務(wù)(cupid)與odps任務(wù)共享項目空間資源,意味著我們需要在有限的資源下支持核算的業(yè)務(wù),基于以上背景及訴求,供應(yīng)鏈核算整體的應(yīng)用架構(gòu)設(shè)計如下:

其中ascp-finance-accounting負責(zé)任務(wù)調(diào)度,組件交互如下:

  • spark任務(wù)管理:負責(zé)spark任務(wù)相關(guān)生命周期的管理,承接核算任務(wù)和spark session之間的交互;
  • spark session管理:負責(zé)spark實例的創(chuàng)建、銷毀、job提交等,另外針對不同類型的session,支持自定義所需資源,包括實例worker數(shù)量、分區(qū)大小等,主要與spark on odps交互;
  • 核算任務(wù)管理:負責(zé)業(yè)務(wù)接入、記賬、拋賬等核算任務(wù)的生命周期管理;
  • spark job版本管理:spark任務(wù)所需jar包會不斷的迭代,針對不同的核算場景可以定制所需的job版本。

ascp-finance-accounting-spark負責(zé)spark job的開發(fā)維護,spark on odps client模式下需要基于服務(wù)上傳jar包,若jar包較大,性能較差,所以基于client模式下提供的resource管理能力,我們將項目module拆分如下:

包名

作用

accounting-spark-client

對外提供spark任務(wù)的啟動、查詢及終止服務(wù)

accounting-spark-common

公共包,包括常量、工具類等

accounting-spark-job

spark任務(wù)包,封裝了任務(wù)接入和記賬兩個任務(wù)的實現(xiàn)

accounting-spark-dependency

spark任務(wù)包依賴的二方包,client模式下若job包過大,會造成上傳失敗的問題,所以部分job依賴的二方包可以放在dependency中,單獨打包,手工在datawork中上傳,通過resources傳遞參數(shù)

2.數(shù)據(jù)處理流程

核算接入、記賬、拋賬等主流程的spark處理邏輯如下所示:

六、運維及調(diào)優(yōu)

基于spark的特性,完成數(shù)據(jù)處理邏輯的編寫對我們來說并不困難,問題主要集中在如何用盡可能低的成本滿足業(yè)務(wù)需求,特別是在目前控制成本的背景下,在供應(yīng)鏈核算的落地過程中,我們主要采用了以下優(yōu)化方式。

1.數(shù)據(jù)量評估

spark任務(wù)的運行效率很大程度上受到分區(qū)數(shù)量的影響,spark提供了如下手段來進行分區(qū)數(shù)量的調(diào)整(部分為spark on odps能力),供應(yīng)鏈核算在實現(xiàn)過程中主要用到了odps離線表和lindorm兩種數(shù)據(jù)源。

1)spark.hadoop.odps.input.split.size:用于設(shè)置spark讀取odps離線表的分區(qū)大小,默認為256M,在實踐過程中需要結(jié)合當(dāng)前分區(qū)的大小進行調(diào)整,比如當(dāng)前分區(qū)大小為1GB,那么默認情況下會拆分為4個分區(qū);

2)spark讀寫lindorm(類hbase)的分區(qū)數(shù)主要受到region數(shù)量的影響,在供應(yīng)鏈核算系統(tǒng)的實踐中,由于初始region數(shù)量較少,導(dǎo)致分區(qū)數(shù)量很少,spark執(zhí)行效率很差,針對此問題我們實踐了兩種處理策略;

  • 進行重分區(qū)(repartition算子):針對數(shù)據(jù)傾斜進行重新分區(qū),但是會拆分stage,觸發(fā)shuffle,增加額外的IO成本。
  • lindorm進行預(yù)分區(qū),比如預(yù)分區(qū)為128個region,但此種實現(xiàn)方案需要結(jié)合rowkey的設(shè)計一起使用,會影響到scan的效率。

2.代碼邏輯相關(guān)job/stage/task評估

除了六中所述數(shù)據(jù)量以外,數(shù)據(jù)處理邏輯的實現(xiàn)方法也會影響到任務(wù)的執(zhí)行效率,spark比mapreduce執(zhí)行效率高的一個原因就在于spark會先基于處理流程構(gòu)建DAG,這樣可以有效評估每個stage是否需要落盤(IO成本),在邏輯實現(xiàn)過程中我們在保證數(shù)據(jù)處理無誤的情況下需要盡可能得降低IO(減少shuffle),比如可以執(zhí)行以下策略。

  • 慎用效率角度的算子,比如groupBy。
  • 盡量減少stage數(shù)量。

3.計算存儲資源評估

計算存儲資源同樣是spark執(zhí)行效率優(yōu)化的關(guān)鍵,spark也提供了多種手段來調(diào)整資源的使用情況;

  • spark.executor.instances executor:設(shè)置當(dāng)前實例的worker數(shù)量;
  • spark.executor.cores:核數(shù),每個Executor中的可同時運行的task數(shù)目;
  • spark.executor.memory:executor內(nèi)存。

4.其他參數(shù)

odps.cupid.clientmode.heartbeat.timeout 此配置用來調(diào)節(jié)cupid(spark on odps) client模式下的心跳超時時間,默認為30分鐘,若任務(wù)執(zhí)行較長,需要進行調(diào)整。

hbase.client.write.buffer:用來調(diào)節(jié)lindorm的flush磁盤的buffer大小,lindorm mput數(shù)量限制為100(經(jīng)咨詢?yōu)槿窒拗?,無法調(diào)整),所以在spark寫lindorm時我們主要采用此配置項調(diào)節(jié)批量寫入的數(shù)量,這點比較坑。

spark.hadoop.odps.cupid.job.priority:用于調(diào)節(jié)任務(wù)資源獲取的優(yōu)先級。

5.Spark UI

spark 本身的 UI 中有整體的job/stage/task的可視化分析數(shù)據(jù),比較方便的查詢到對應(yīng)的執(zhí)行過程,如下圖:

通過SparkUI 可以看到任務(wù)的驅(qū)動步驟和對應(yīng)的執(zhí)行的日志。通過分析可以針對性的優(yōu)化提升。

6.交互式開發(fā)測試

ODPS 有一個非常好的所見所得的 dataworks 平臺,大大提升了開發(fā)的效率,spark 當(dāng)前在dataworks沒有直接的交互的IDE,需要通過 zeppelin 來實現(xiàn)。zeppelin在數(shù)據(jù)技術(shù)棧中的定位如下:

Web-based notebook that enables data-driven,interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.

可以在交互中實現(xiàn)結(jié)果的快速反饋。

支持 scala 的 UDF 驗證等,提升了測試驗證效率。

7.效果

經(jīng)過以上優(yōu)化,在2500萬數(shù)據(jù)量60worker數(shù)的場景,接入+記賬+拋賬流程由之前的2小時提效至10分鐘,同時在編程模式上更加匹配服務(wù)端技術(shù)的研發(fā)模式,提升了研發(fā)效率。

七、總結(jié)

核算業(yè)務(wù)的特征比較偏向數(shù)據(jù)和規(guī)則的處理,大數(shù)據(jù)引擎的引入有助于整體業(yè)務(wù)的交付效率提升和成本降低。目前我們對Spark的認知主要在完成數(shù)據(jù)處理邏輯開發(fā)及日常的調(diào)優(yōu)上,隨著運行實例的增多以及業(yè)務(wù)的不斷發(fā)展,當(dāng)前的技術(shù)方案也會不斷的迭代演進。

參考文檔

通過spark訪問lindorm:https://help.aliyun.com/document_detail/174657.html


分享題目:Spark在供應(yīng)鏈核算中的應(yīng)用總結(jié)
網(wǎng)站路徑:http://m.5511xx.com/article/cdsopoi.html