Parsing FineBI datasets with sqllineage and importing the lineage into DataHub

2023-05-10 11:18:52

Requirements

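The current data warehouse architecture is shown in the figure below. It does not support end-to-end data lineage, so troubleshooting data anomalies and analyzing their impact is largely reactive. We need end-to-end data lineage and metadata management.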

[Figure: current data warehouse architecture]

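Business systems: various manufacturing business systems (iterating rapidly, being refactored, or newly under construction).
Data warehouse development platform: the Shuqi (数栖) platform, which supports DAG scheduling lineage graphs across all warehouse layers.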

Warehouse export database: PG.
BI visualization system: FineBI, which supports lineage for its internal datasets and charts.

After research and analysis, we introduced DataHub as the metadata management platform. The result is shown in the figures below.

[Figures: end-to-end lineage in DataHub]
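
Once lineage from business systems through to dashboards lives in one graph, the impact analysis mentioned in the requirements becomes a downstream traversal. The sketch below illustrates the idea with a plain Python adjacency dict and breadth-first search; it does not query DataHub, and every asset name in `EDGES` is hypothetical (only the two warehouse table names are taken from the SQL later in this article).

```python
from collections import deque
from typing import Dict, List

# Hypothetical upstream -> downstream edges following this article's chain:
# business system -> warehouse tables -> PG export -> BI dataset -> chart -> dashboard
EDGES: Dict[str, List[str]] = {
    "erp.orders": ["hive.bda.bda_sd_so"],
    "hive.bda.bda_sd_so": ["hive.sda.sda_delivety_complete_sr_sum"],
    "hive.sda.sda_delivety_complete_sr_sum": ["pg.export.delivery_summary"],
    "pg.export.delivery_summary": ["finebi.dataset.delivery"],
    "finebi.dataset.delivery": ["finebi.chart.on_time_rate"],
    "finebi.chart.on_time_rate": ["finebi.dashboard.delivery_overview"],
}


def downstream_impact(edges: Dict[str, List[str]], start: str) -> List[str]:
    """Return every asset reachable downstream of `start`, in breadth-first order."""
    seen = {start}
    order: List[str] = []
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in edges.get(node, []):
            if child not in seen:  # guards against cycles and repeated visits
                seen.add(child)
                order.append(child)
                queue.append(child)
    return order


if __name__ == "__main__":
    # Which assets are affected if bda_sd_so has a data problem?
    for asset in downstream_impact(EDGES, "hive.bda.bda_sd_so"):
        print(asset)
```

Running the same traversal over the reversed edges gives upstream root-cause candidates for a broken dashboard.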

Solution

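Build the following end-to-end lineage: BI report/dashboard -> BI component (chart) -> BI dataset -> warehouse export database (PG) -> warehouse data assets (Shuqi platform) -> upstream business systems.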
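
In DataHub, each hop of this chain is an entity addressed by a URN. The sketch below only formats URN strings, following the patterns produced by `make_dataset_urn`, `make_chart_urn` and `make_dashboard_urn` in `datahub.emitter.mce_builder`, so it runs without the `datahub` package. It assumes FineBI assets live on the custom `fine_bi` platform, that PG refers to PostgreSQL (`postgres` platform), and that warehouse tables use `hive` as in the script further down; all asset names are hypothetical.

```python
# Format DataHub URNs for each hop of the end-to-end chain.
# Mirrors the string patterns of datahub.emitter.mce_builder helpers
# without requiring the datahub package.


def dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"


def chart_urn(platform: str, chart_id: str) -> str:
    return f"urn:li:chart:({platform},{chart_id})"


def dashboard_urn(platform: str, dashboard_id: str) -> str:
    return f"urn:li:dashboard:({platform},{dashboard_id})"


# Hypothetical assets, ordered downstream -> upstream as in the chain above.
CHAIN = [
    dashboard_urn("fine_bi", "delivery_overview"),
    chart_urn("fine_bi", "on_time_rate"),
    dataset_urn("fine_bi", "delivery_dataset"),
    dataset_urn("postgres", "dw_export.public.delivery_summary"),
    dataset_urn("hive", "sda.sda_delivety_complete_sr_sum"),
]

if __name__ == "__main__":
    print("\n".join(CHAIN))
```

In the ingestion scripts themselves, calling the `mce_builder` helpers directly avoids hand-maintaining these formats.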

Work items:

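  • ✅ Define custom platforms and icons for FineBI and the Shuqi platform in DataHub.
  • ✅ Parse the FineBI database to obtain the BI report/dashboard -> BI component (chart) -> BI dataset lineage, then call the DataHub REST emitter to create that lineage in DataHub.
  • ✅ Fetch the SQL of each BI dataset, use sqllineage to parse the lineage between the BI dataset and the warehouse export database (PG), then call the DataHub REST emitter to create that lineage in DataHub.
  • ✅ Read the relationships between workflows and Hive tasks from the Shuqi platform database, fetch each Hive task's SQL, parse its lineage with sqllineage, then call the DataHub REST emitter to create that lineage in DataHub.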
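
The FineBI step boils down to turning relation rows into "upstream list per downstream entity" maps, which is the shape lineage emitters consume. FineBI's internal table and column names are not given in this article, so the sketch assumes hypothetical `(dashboard_id, chart_id, dataset_id)` rows already read from its metadata database.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple

# Hypothetical row shape read from the FineBI metadata database:
# (dashboard_id, chart_id, dataset_id). Real FineBI table names are not shown here.
Row = Tuple[str, str, str]


def group_relations(rows: Iterable[Row]) -> Tuple[Dict[str, List[str]], Dict[str, List[str]]]:
    """Build dashboard -> charts and chart -> datasets maps, de-duplicated and sorted."""
    dashboard_charts: Dict[str, Set[str]] = defaultdict(set)
    chart_datasets: Dict[str, Set[str]] = defaultdict(set)
    for dashboard_id, chart_id, dataset_id in rows:
        dashboard_charts[dashboard_id].add(chart_id)
        chart_datasets[chart_id].add(dataset_id)
    return (
        {k: sorted(v) for k, v in dashboard_charts.items()},
        {k: sorted(v) for k, v in chart_datasets.items()},
    )


if __name__ == "__main__":
    sample = [
        ("dash_delivery", "chart_on_time", "ds_delivery"),
        ("dash_delivery", "chart_cycle", "ds_delivery"),
        ("dash_delivery", "chart_cycle", "ds_orders"),
    ]
    dashboards, charts = group_relations(sample)
    print(dashboards)
    print(charts)
```

These maps would then feed DataHub's dashboard and chart metadata, for example the `charts` field of `DashboardInfo` and the `inputs` field of `ChartInfo`.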

This article covers:

  • ✅ Defining custom platforms and icons for FineBI and the Shuqi platform in DataHub
  • ✅ Parsing SQL with sqllineage to derive lineage
  • ✅ Calling the DataHub REST emitter to create lineage in DataHub

Prerequisites

  • Install DataHub: see "Setting up a DataHub lab environment"
  • Install sqllineage

Custom platform icons in DataHub

```shell
[cloud@dp-web-uic1 datahub_ingest]$ datahub put platform --name fine_bi --display_name "FineBI" --logo "https://www.finebi.com/images/logo-FineBI.png"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:fine_bi to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 ~]$ datahub put platform --name yuan_xiang --display_name "源象" --logo "https://www.dtwave.com/images/index/product/shuqi.svg"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:yuan_xiang to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 ~]$ datahub put platform --name dolphinscheduler --display_name "海豚调度" --logo "https://dolphinscheduler.apache.org/img/hlogo_white.svg"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:dolphinscheduler to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
[cloud@dp-web-uic1 datahub_ingest]$ datahub put platform --name statrocks --display_name "StarRocks" --logo "https://docs.starrocks.io/static/b660bcde69091ea56bd94cac0a907018/95f17/starrocks-logo_en-us.png"
✅ Successfully wrote data platform metadata for urn:li:dataPlatform:statrocks to DataHub (DataHubRestEmitter: configured to talk to http://localhost:8080)
```
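
To keep the custom platforms reproducible across environments, the same `datahub put platform` commands can be generated from a single table. This sketch only builds shell-quoted command strings with `shlex.join` (Python 3.8+); `PLATFORMS` copies three of the name/display-name/logo triples used above, and actually running the commands (for example with `subprocess.run`) is left out.

```python
import shlex
from typing import Dict, Tuple

# Platform name -> (display name, logo URL), copied from the commands above.
PLATFORMS: Dict[str, Tuple[str, str]] = {
    "fine_bi": ("FineBI", "https://www.finebi.com/images/logo-FineBI.png"),
    "yuan_xiang": ("源象", "https://www.dtwave.com/images/index/product/shuqi.svg"),
    "dolphinscheduler": ("海豚调度", "https://dolphinscheduler.apache.org/img/hlogo_white.svg"),
}


def put_platform_command(name: str, display_name: str, logo: str) -> str:
    """Build one `datahub put platform` command line, shell-quoted."""
    return shlex.join(
        ["datahub", "put", "platform", "--name", name,
         "--display_name", display_name, "--logo", logo]
    )


def platform_urn(name: str) -> str:
    """URN that DataHub reports for a registered platform."""
    return f"urn:li:dataPlatform:{name}"


if __name__ == "__main__":
    for name, (display, logo) in PLATFORMS.items():
        print(put_platform_command(name, display, logo))
        print("  ->", platform_urn(name))
```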

Parsing SQL with sqllineage to derive lineage

  • Test: parsing a SQL script with sqllineage to derive its lineage
```python
from sqllineage.runner import LineageRunner


def test_create_as():
    sql = """
-- From MES data, get the first online scan time of each batch
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00
as
select min(produce_date) min_produce_DATE,
       mo_lot_no,
       organization_id
from bda${db_para}.BDA_MES_PRODUCT_SUMMARY
where factory_no = 'CY-SR'
  and step_name in ('OC上线组装', '整机组装1')
group by mo_lot_no,
         organization_id;

-- Order promises
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1
as
select t1.version_id, t1.promise_id, t1.organization_id, t1.order_id, t1.order_no,
       t1.order_stage, t1.order_type, t1.so_type, t1.order_status, t1.order_priority,
       t1.promise_status, t1.product_id, t1.product_no, t1.product_model, t1.order_qty,
       t1.bu_name, t1.rcv_client_name, t1.prepared_client_name, t1.order_source, t1.om_user_name,
       t1.term_cust, t1.to_pur_time, t1.factory_no, t1.mo_lot_no, t1.completed_qty,
       t1.mo_audit_status, t1.req_arrival_time, t1.mtr_ready_time, t1.plan_promise_time, t1.promise_date_change_reason,
       t1.schedule_start_time, t1.schedule_end_time, t1.pps_type, t1.pps_exception_info, t1.promise_diff_day,
       t1.promise_delivery_cycle, t1.change_reason, t1.client_abbr, t1.item_type_product, t1.match_forecast,
       t1.software_flag, t1.risk_level, t1.risk_reason, t1.ckd_type, t1.crt_user,
       t1.crt_time, t1.upd_user, t1.upd_time, t1.crt_user_name, t1.upd_user_name
from bda${db_para}.bda_whole_pto_order t1
left join bda${db_para}.bda_promise_history_record t2
       on t1.promise_id = t2.promise_id
      and coalesce(t2.afterchangereason, '') = 'AGAIN_PLAN'
where t1.version_id like '%最新版本%'
  and t2.promise_id is null
union all
select t1.version_id, t1.promise_id, t1.organization_id, t1.order_id, t1.order_no,
       t1.order_stage, t1.order_type, t1.so_type, t1.order_status, t1.order_priority,
       t1.promise_status, t1.product_id, t1.product_no, t1.product_model, t1.order_qty,
       t1.bu_name, t1.rcv_client_name, t1.prepared_client_name, t1.order_source, t1.om_user_name,
       t1.term_cust, t1.to_pur_time, t1.factory_no, t1.mo_lot_no, t1.completed_qty,
       t1.mo_audit_status, t1.req_arrival_time, t1.mtr_ready_time, t1.plan_promise_time, t1.promise_date_change_reason,
       t1.schedule_start_time, t1.schedule_end_time, t1.pps_type, t1.pps_exception_info, t1.promise_diff_day,
       t1.promise_delivery_cycle, t1.change_reason, t1.client_abbr, t1.item_type_product, t1.match_forecast,
       t1.software_flag, t1.risk_level, t1.risk_reason, t1.ckd_type, t1.crt_user,
       t1.crt_time, t1.upd_user, t1.upd_time, t1.crt_user_name, t1.upd_user_name
from (
    select t1.version_id, t1.promise_id, t1.organization_id, t1.order_id, t1.order_no,
           t1.order_stage, t1.order_type, t1.so_type, t1.order_status, t1.order_priority,
           t1.promise_status, t1.product_id, t1.product_no, t1.product_model, t1.order_qty,
           t1.bu_name, t1.rcv_client_name, t1.prepared_client_name, t1.order_source, t1.om_user_name,
           t1.term_cust, t1.to_pur_time, t1.factory_no, t1.mo_lot_no, t1.completed_qty,
           t1.mo_audit_status, t1.req_arrival_time, t1.mtr_ready_time, t1.plan_promise_time, t1.promise_date_change_reason,
           t1.schedule_start_time, t1.schedule_end_time, t1.pps_type, t1.pps_exception_info, t1.promise_diff_day,
           t1.promise_delivery_cycle, t1.change_reason, t1.client_abbr, t1.item_type_product, t1.match_forecast,
           t1.software_flag, t1.risk_level, t1.risk_reason, t1.ckd_type, t1.crt_user,
           t1.crt_time, t1.upd_user, t1.upd_time, t1.crt_user_name, t1.upd_user_name,
           row_number() over (partition by t1.promise_id order by t1.version_id desc) rn
    from bda${db_para}.bda_whole_pto_order t1
    where version_id not like '%最新版本%'
      and not exists (select 1 from bda${db_para}.bda_whole_pto_order t2
                      where version_id like '%最新版本%' and t1.promise_id = t2.promise_id)
) t1
left join bda${db_para}.bda_promise_history_record t2
       on t1.promise_id = t2.promise_id
      and coalesce(t2.afterchangereason, '') = 'AGAIN_PLAN'
where t2.promise_id is null
  and t1.rn = 1;

-- Join CRM orders with work orders
drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01;
create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01
as
select bu.dept_name bu_name
     , t2.organization_id  -- 20220701 wyr
     -- , '514' Organization_Id
     , t1.item_code item_code
     , cus.cus_name  -- receiving customer
     , t1.so_header_id
     , t1.so_line_id so_line_id
     , t1.so_code so_header_code
     , t1.line_no so_line_code
     , t2.wip_entity_name  -- work order number
     , t2.lot_number  -- batch
     , t2.Project_Name
     , t1.om_user_name Om_User_Name  -- sales admin
     , t1.sale_name sales_user  -- salesperson
     , case when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造'
                 and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20)
            when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造'
                 and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 35)
            when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造'
                 and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 25)
            when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造'
                 and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 45)
            when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造'
                 and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20)
            when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造'
                 and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 30)
            else t1.pur_start_time
       end stat_date  -- stat date: purchase submission date + the applicable number of days
     , substr(t1.expected_delivery_date, 1, 10) delivety_time  -- planned shipping date
     , substr(t1.crt_time, 1, 10) crm_create_time  -- sales order creation time
     , substr(t1.pur_start_time, 1, 10) purchase_date  -- purchase submission time
     , substr(t1.produce_start_time, 1, 10) produce_date  -- production release time
     , substr(t2.Xwh_Creation_Date, 1, 10) wip_create_date  -- outsourced work order creation date
     , substr(t2.Scheduled_Start_Date, 1, 10) Scheduled_Start_Date  -- work order material-ready (kitting) date
     , substr(t2.Mc_Creation_Date, 1, 10) Mc_Creation_Date  -- production control confirmation time
     , substr(t2.first_trx_date, 1, 10) first_finish_date  -- first completion/warehouse receipt date
     , substr(t2.last_trx_date, 1, 10) last_finish_date  -- final completion/warehouse receipt date
     , t1.so_type_name order_type  -- order type
     , t2.wip_job_status  -- work order status
     , t2.Job_Type  -- work order type
     , t2.Class_Code  -- work order class
     , t2.Quantity_Completed  -- work order completed quantity
     , t1.qty  -- order quantity
     , case when t6.order_no is not null then t6.match_forecast else bsse.is_source_forecast end as is_source_forecast  -- whether the order has a forecast
     , mio.planning_make_buy_code  -- finished-unit mode: make (制造) / buy (采购)
     , case when mig.min_class like '%PC模块%' then 'PC模块' else '其他' end prod_type
     , datediff(t2.last_trx_date, t1.pur_start_time) supply_cycle  -- supply chain cycle (computed from the earliest completion/receipt time across work orders)
     , case when t1.so_type_name <> '备品订单' and t2.first_trx_date is not null then 'Y' else 'N' end supply_cycle_flag  -- supply chain cycle flag
     , case when t1.so_type_name = '客户订单' and t2.Job_Type = '标准'
                 and (
                      (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造'
                       and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20)
                   or (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造'
                       and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35)
                   or (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造'
                       and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 25)
                   or (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造'
                       and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 45)
                   or (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造'
                       and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20)
                   or (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造'
                       and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35)
                 ) and t2.first_trx_date is not null then 'Y'
            else 'N'
       end delivety_complete_flag  -- delivery fulfilment flag
     , case when t1.so_type_name in ('客户订单', '销售订单') and t2.Job_Type = '标准' then 'Y' else 'N' end is_delivety_complete_flag  -- delivery fulfilment flag
     , t1.expected_delivery_date overseas_stat_date  -- overseas orders: date used to attribute delivery fulfilment
     , case when t1.so_type_name in ('客户订单', '销售订单')  -- and bsse.is_source_forecast is not null
                 and datediff(t2.last_trx_date, t1.expected_delivery_date) <= 0 and t2.last_trx_date is not null then 'Y'
            else 'N'
       end overseas_is_delivety_complete_flag  -- overseas order delivery fulfilment flag
     , case when t1.so_type_name in ('客户订单', '销售订单')  -- and bsse.is_source_forecast is not null
                 and (datediff('${bizDate}', t1.expected_delivery_date) >= 0
                      or (datediff('${bizDate}', t1.expected_delivery_date) < 0 and datediff(t2.last_trx_date, t1.expected_delivery_date) <= 0)
                     ) then 'Y'
            else 'N'
       end overseas_delivety_complete_flag  -- overseas order delivery fulfilment data scope
     , row_number() over (partition by t2.Lot_Number order by t1.pur_start_time) rn
     , t2.Start_Quantity wip_qty
     , t2.fisrt_picking_date  -- first material picking time
     , t3.first_ship_date
     , t3.last_ship_date
     , -1 * trx33.shipped_qty shipped_qty  -- shipped quantity
     , t2.Quantity_Completed + trx33.shipped_qty as difference_qty  -- difference
     , dmpm.screen_size  -- screen size
     , t2.Created_By as pm_user  -- production control owner
     , substr(t3.min_scheduled_date, 1, 10) as min_scheduled_date  -- actual material-ready date
     , substr(t5.min_produce_DATE, 1, 10) min_produce_date
     , t1.bt_name  -- add by tjl 2022.07.21
     , bsse.so_line_group_id
     , substr(t3.online_date, 1, 10) as online_date
     , datediff(substr(t1.expected_delivery_date, 1, 10), substr(t1.pur_start_time, 1, 10)) as cus_expect_cycle  -- customer expected cycle
     , case when t6.order_no is not null and t6.plan_promise_time is not null
                 then datediff(substr(t6.plan_promise_time, 1, 10), substr(t1.pur_start_time, 1, 10))  -- has a promise date: estimated cycle = promise date - purchase date
            when t6.order_no is not null and t6.plan_promise_time is null and t2.wip_entity_name is null
                 then datediff(date_add(substr(t6.mtr_ready_time, 1, 10), 6), substr(t1.pur_start_time, 1, 10))  -- no promise date, no work order yet: material-ready date + 6
            when t2.wip_entity_name is not null and t3.online_date is not null
                 then datediff(date_add(substr(t3.online_date, 1, 10), 4), substr(t1.pur_start_time, 1, 10))  -- work order opened, online date known: online date + 4
            when t2.wip_entity_name is not null and t3.online_date is null
                 then datediff(date_add(substr(t2.Scheduled_Start_Date, 1, 10), 6), substr(t1.pur_start_time, 1, 10))  -- work order opened, no online date yet: material-ready date + 6
       end as estimate_supply_cycle  -- estimated supply chain cycle
     , t8.cus_level
-- from bda${db_para}.bda_oms_so_lines t1
FROM bda${db_para}.bda_sd_so t1
left join bda${db_para}.bda_sd_so_ext bsse
       on t1.so_line_id = bsse.so_line_id
      and bsse.part_dt IN ('crm_so', 'oms_so')
join bda${db_para}.bda_job_inv_trx_zj_dtl t2
  on bsse.so_line_group_id = t2.source_line_id
  -- and t1.so_header_id = t2.source_header_id
left join dim${db_para}.dim_hcm_orgunit bu
       on t1.bill_bu_id = bu.dept_oid
left join bda${db_para}.comm_market_cus cus
       on t1.rec_cus_code = cus.id
-- join (select item_value, fullname
--         from o_crm${db_para}.comm_dictionary_detail
--        where parentcode = '$CRM_DELIVERY_SO_TYPE') cdd
--   on cdd.item_value = t1.so_type
left join dim${db_para}.md_item_group mig
       on t2.item_code = mig.item_code
left join dim${db_para}.md_item_org mio
       on t1.item_code = mio.item_code
      and mio.Organization_Id = '514'
left join dim${db_para}.dim_md_prod_model dmpm
       on mig.product_model = dmpm.prod_model
left join bda${db_para}.bda_job_dtl t3
       on t2.wip_entity_name = t3.wip_entity_name
left join o_md${db_para}.md_prod_model t4
       on mig.product_model = t4.product_model
left join (select sum(trx_so.trx_qty) shipped_qty
                , trx_so.bch_nbr
             from bda${db_para}.bda_inv_item_trx_bach_dtl trx_so
            where trx_so.trx_type_id = 33
            group by trx_so.bch_nbr) trx33
       on trx33.bch_nbr = t2.lot_number
left join sda${db_para}.tmp_sda_delivety_complete_sr_sum_00 t5
       on t5.mo_lot_no = t2.lot_number
left join sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1 t6
       on t1.line_code = t6.order_no
left join bda${db_para}.bda_wip_mo_header t7
       on t3.wip_entity_name = t7.ebs_mo_code
left join (select t.cus_code
                , t2.hcm_dept_oid as dept_oid
                , max(t.cus_level) as cus_level_id
                , max(t1.fullname) as cus_level
                , t2.hcm_dept_name as dept_name
             from o_crm${db_para}.cus_bu_ext_info t
             left join o_crm${db_para}.comm_dictionary_detail t1
                    on t.cus_level = t1.item_value
                   and t1.parentcode = '$CRM_CUS_LEVEL'
            inner join dim${db_para}.dim_hcm_crm_org_map t2
                    on t.bu_code = t2.dept_code
            where t2.dept_name not like '%失效%'
              and t.is_deleted = '0'
              and t2.hcm_dept_oid is not null
            group by t.cus_code, t2.hcm_dept_oid, t2.hcm_dept_name) t8
       on t1.rec_cus_code = t8.cus_code
      and bu.dept_oid = t8.dept_oid
where t1.pur_start_time is not null
  and t1.is_onhand_out in ('0', '否')
  and t4.finished_or_semi_finished_prod = '成品'
  AND t1.part_dt IN ('crm_so', 'oms_so')
  and t3.wip_job_status <> '已取消' and (t3.wip_job_status <> '已关闭' or t3.quantity_completed > 0)
  and coalesce(t7.source_demand_max, '') <> '相关需求';

insert overwrite table sda${db_para}.sda_delivety_complete_sr_sum
select t.bu_name, t.Organization_Id, t.item_code
     , t.cus_name  -- receiving customer
     , t.so_header_code, t.so_line_code, t.wip_entity_name, t.lot_number, t.Project_Name
     , t.Om_User_Name  -- sales admin
     , t.sales_user  -- salesperson
     , t.delivety_time  -- planned shipping date
     , t.crm_create_time  -- sales order creation time
     , t.purchase_date  -- purchase submission time
     , t.produce_date  -- production release time
     , t.stat_date  -- stat date: purchase submission date + the applicable number of days
     , t.wip_create_date  -- outsourced work order creation date
     , t.Scheduled_Start_Date  -- work order material-ready date
     , t.Mc_Creation_Date  -- production control confirmation time
     , t.first_finish_date  -- first completion/warehouse receipt date
     , t.last_finish_date  -- final completion/warehouse receipt date
     , t.order_type  -- order type
     , t.job_type
     , t.supply_cycle  -- supply chain cycle
     , t.supply_cycle_flag  -- supply chain cycle flag
     , t.delivety_complete_flag  -- delivery fulfilment flag
     , t.is_delivety_complete_flag, t.overseas_stat_date
     , t.overseas_is_delivety_complete_flag, t.overseas_delivety_complete_flag
     , t.is_source_forecast is_source_forecast
     , t.wip_qty, t.fisrt_picking_date, t.first_ship_date, t.last_ship_date
     , 'MTO' order_mode
     , current_timestamp()
     , '${bizDate}'
     , t.shipped_qty  -- shipped quantity
     , t.difference_qty  -- difference
     , t.screen_size  -- screen size
     , t.pm_user  -- production control owner
     , t.min_scheduled_date, t.min_produce_date
     , t.bt_name  -- add by tjl 2022.07.21
     , t.so_line_group_id
     , t.Class_Code  -- add by wyr 2022.09.23
     , t.cus_level as cus_level  -- tjl 2022.11.02
     , t.cus_expect_cycle as cus_expect_cycle  -- customer expected cycle; add by tjl 2022.11.02
     , t.estimate_supply_cycle as estimate_supply_cycle  -- estimated supply chain cycle; add by tjl 2022.11.02
from sda${db_para}.tmp_sda_delivety_complete_sr_sum_01 t
where t.rn = 1;
"""
    # Strip the ${db_para} environment suffix, then parse table-level lineage
    result = LineageRunner(sql.replace("${db_para}", ''))
    print(result.source_tables)
    print(result.target_tables)


if __name__ == "__main__":
    test_create_as()
```
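
The test replaces only `${db_para}`; the script also contains `${bizDate}` and `$CRM_...` placeholders. Those sit inside string literals, so they don't change table-level lineage, but a general way to render a script before parsing is `string.Template.safe_substitute`, which fills the variables you pass and leaves unknown ones untouched. The query and parameter values below are illustrative assumptions, not taken from the scheduler.

```python
from string import Template


def render_sql(sql: str, **params: str) -> str:
    """Fill ${name} / $name placeholders found in `params`; leave all others as-is."""
    return Template(sql).safe_substitute(**params)


if __name__ == "__main__":
    # Illustrative query reusing table names from the script above.
    raw = (
        "insert overwrite table sda${db_para}.sda_delivety_complete_sr_sum "
        "select * from sda${db_para}.tmp_sda_delivety_complete_sr_sum_01 t "
        "where t.delivety_time <= '${bizDate}' and t.cus_level <> '$CRM_CUS_LEVEL'"
    )
    # Assumed values: empty environment suffix, an example business date.
    print(render_sql(raw, db_para="", bizDate="2023-05-10"))
```

The rendered text can then be passed to `LineageRunner` exactly as in the test.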

Calling the DataHub REST emitter to create lineage in DataHub

```python
#!/usr/bin/python3
# coding=utf8
# -----------------------------------------------------------------------------------
# Date:    2022.08.30
# Author:  zds
# Purpose: data warehouse Hive lineage
#          1. Query the database via Trino to get the Shuqi platform's DAG scheduling lineage.
#          2. Note: when permissions are changed directly in the database, BI caches for a few
#             minutes, so wait for the data to refresh.
#          3. Note: create_type=3 in fine_pack_filter is a user role; the rowid used is the id
#             in fine_user, configured on the end-user permission.
#          4. "AND" (且) = 34; "OR" (或) = 35
#          5. Depends on warehouse tables prefixed with "manual"; these are collected by a
#             crawler and lag by one day.
# -----------------------------------------------------------------------------------
import base64

import pandas as pd
from sqlalchemy import create_engine
from sqllineage.runner import LineageRunner

import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter


class DWHiveLineage:
    def __init__(self):
        # Shuqi platform metadata database (credentials masked)
        self.shuxi_db = create_engine("mysql+pymysql://xxxx@p-dbsec-mysql.gz.cvte.cn:10006/uic")

    def get_task_sql(self):
        # tasktype_id in (4,8,11,12,16) are all task types that carry source code;
        # this query only selects tasktype_id 12 and 16.
        sql = """
select cata_id, flow_id, task_id, task_name, task_type_name, source, parameter
from (
    select rtc.task_id, rtc.source, rtc.parameter, bt.task_name, bt.tasktype_id,
           btt.task_type_name, bc.cata_id, bc.flow_id
    from dipper.rel_task_config rtc
    left join (
        select task_name, tasktype_id, task_id, flow_id
        from dipper.bas_task
        where tasktype_id in (12, 16) and tasktype_id is not null and ws_id = 11 and invalid = 0
    ) bt on rtc.task_id = bt.task_id
    left join dipper.bas_tasktype btt on btt.tasktype_id = bt.tasktype_id
    left join (select * from dipper.bas_cata where invalid = 0 and ws_id = 11) bc
           on bc.flow_id = bt.flow_id
) t
where t.source is not null and t.task_name is not null
order by flow_id
"""
        return pd.read_sql(sql=sql, con=self.shuxi_db)

    def list_lineages(self):
        """Return {target_table: [source_tables]} for tasks whose SQL has exactly one target."""
        df = self.get_task_sql()
        dataset_lineages = {}
        idx = 0
        for row in df.to_dict(orient="records"):
            try:
                # Task source code is stored base64-encoded
                sql = base64.b64decode(row['source']).decode('utf-8')
                print("============" + row['task_name'] + "========")
                result = LineageRunner(sql.replace("${db_para}", ''))
                targets = result.target_tables
                # A task may contain several SQL statements; those must be split first
                if len(targets) > 1:
                    print("Multiple target tables, split the SQL before computing lineage: [{}]".format(targets))
                elif targets:
                    dataset_lineages[str(targets[0])] = [str(t) for t in result.source_tables]
                    idx += 1
            except Exception as e:
                print("Failed to parse the SQL of task [{}].".format(row['task_name']))
                print(e)
                break
            # Test run: stop after about ten tables
            if idx > 10:
                break
        return dataset_lineages

    def generate_lineages(self):
        result_tables = self.list_lineages()
        # Create one emitter for the GMS REST API and reuse it
        emitter = DatahubRestEmitter("http://xx.xx.xx.xx:8080")
        for target_table, source_tables in result_tables.items():
            input_tables_urn = [builder.make_dataset_urn("hive", t) for t in source_tables]
            # Construct a lineage object
            lineage_mce = builder.make_lineage_mce(
                input_tables_urn,
                builder.make_dataset_urn("hive", target_table),
            )
            try:
                # Emit metadata (once per target table)
                emitter.emit_mce(lineage_mce)
                print("Added lineage for warehouse table [{}]".format(target_table))
            except Exception as e:
                print("Failed to add lineage for warehouse table [{}]".format(target_table))
                print(e)
                break


if __name__ == "__main__":
    dw = DWHiveLineage()
    dw.generate_lineages()
```
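
The script records lineage only for tasks with a single target table and just logs the rest. One way to cover multi-statement tasks is to split the script into statements and merge the per-statement lineage into one `{target: sources}` map. Below is a minimal sketch: a splitter that ignores semicolons inside quotes and `--` comments (it does not handle `/* */` block comments or backslash escapes), and a merge step over per-statement `(sources, targets)` pairs. The pairs in the example are written by hand; in the pipeline above they would presumably come from running `LineageRunner` on each statement.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Set, Tuple


def split_statements(script: str) -> List[str]:
    """Split on ';' outside quotes and '--' line comments; drop empty statements."""
    statements: List[str] = []
    buf: List[str] = []
    quote = None        # active quote character, or None
    in_comment = False  # inside a '--' comment until end of line
    for i, ch in enumerate(script):
        if in_comment:
            buf.append(ch)
            if ch == "\n":
                in_comment = False
        elif quote:
            buf.append(ch)
            if ch == quote:  # '' inside a string simply closes and reopens
                quote = None
        elif ch in ("'", '"', "`"):
            quote = ch
            buf.append(ch)
        elif script.startswith("--", i):
            in_comment = True
            buf.append(ch)
        elif ch == ";":
            stmt = "".join(buf).strip()
            if stmt:
                statements.append(stmt)
            buf = []
        else:
            buf.append(ch)
    tail = "".join(buf).strip()
    if tail:
        statements.append(tail)
    return statements


def merge_lineage(pairs: Iterable[Tuple[Iterable[str], Iterable[str]]]) -> Dict[str, List[str]]:
    """Combine per-statement (sources, targets) into target -> sorted sources."""
    merged: Dict[str, Set[str]] = defaultdict(set)
    for sources, targets in pairs:
        for target in targets:
            merged[target].update(sources)
    return {t: sorted(s) for t, s in merged.items()}


if __name__ == "__main__":
    script = (
        "-- step 1; not a split\ndrop table if exists sda.t0;\n"
        "create table sda.t0 as select ';' as c from bda.a;\n"
        "insert overwrite table sda.t1 select * from sda.t0"
    )
    for stmt in split_statements(script):
        print(repr(stmt))
```

Intermediate tables such as `tmp_sda_delivety_complete_sr_sum_00` then appear as separate nodes; whether to keep them or collapse them into their final target is a presentation choice.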

Results

[Figure: resulting lineage in DataHub]


