-- Daily export of device event logs for biz_type 299212 / 306804 / 600358 into
-- bi_ads.ads_log_device_export_csv_out (static dt partition, dynamic biz_type partition).
--
-- event_id values produced by the UNION ALL branches below, by source table:
--   1 / 2  ONLINE / OFFLINE rows of ods_clear_smart_device_event
--   3      ods_clear_smartapollobizdata_gateway_active
--   4      ods_clear_smartapollobizdata_gateway_reset
--   5      ods_clear_datapoint_publish
--   6      ods_clear_apollobizdata_device_upgrade_all
--   7      ods_clear_datapoint_report
--   8      runstat rows of ods_log_data_analysis_hades_analyzer_data_pipline
--   9      ods_clear_hadesanalyzerdata_device_restart
--   10     ods_log_data_analysis_sigma_facade_biz_data_pipline
with
-- Device dimension restricted to the exported biz types, shared by every branch.
dim_device as (
    select
        dev_id
        ,biz_type
        ,product_id
    from bi_dw.dim_device_basic_info_item_df
    where dt = '20230809'
        and biz_type in ('299212', '306804', '600358')
),
-- Datapoint metadata (code, name, mode) per product and dp_id, de-duplicated by the GROUP BY.
-- Used to resolve datapoint codes for the publish (event_id 5) and report (event_id 7) branches.
dp_schema as (
    select
        p.id as product_id
        ,s.dp_id
        ,s.code
        ,s.name
        ,s.mode
    from bi_ods.ods_smart_schema_datapoint s
    left join bi_ods.ods_smart_product p
        on s.schema_id = p.schema_id
        and p.dt = '20230809'
    where s.dt = '20230809'
        and s.status = 1
        -- and s.selected = 1
    group by p.id, s.dp_id, s.code, s.name, s.mode
)
insert overwrite table bi_ads.ads_log_device_export_csv_out
partition (dt = '20230809', biz_type)
-- repartition(1) collapses the result into a single Spark partition before the write.
select /*+ repartition(1) */
    device_id
    ,event_from
    ,event_id
    ,status
    -- Values whose text form is 10 characters long (presumably epoch seconds) are scaled to
    -- milliseconds by appending three zeros. Everything else is passed through unchanged.
    ,if(length(cast(event_time as string)) = 10
        ,cast(concat(cast(event_time as string), '000') as bigint)
        ,event_time) as event_time
    ,code
    ,value
    -- event_detail is built by every branch below but is not written to the target table.
    ,biz_type
from (
    -- event_id 1 for ONLINE, 2 for OFFLINE. The reason is kept as event_detail for OFFLINE rows.
    select
        a.dev_id as device_id
        ,'' as code
        ,'' as value
        ,1 as event_from
        ,if(a.event_type = 'ONLINE', 1, 2) as event_id
        ,1 as status
        ,coalesce(a.time, 0) as event_time
        ,if(a.event_type = 'OFFLINE', a.reason, '') as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_smart_device_event a
    inner join dim_device b
        on a.dev_id = b.dev_id
    where a.dt = '20230809'
        and a.event_type in ('ONLINE', 'OFFLINE')

    union all

    -- event_id 3: gateway_active rows, timed by active_time.
    select
        a.id as device_id
        ,'' as code
        ,'' as value
        ,1 as event_from
        ,3 as event_id
        ,1 as status
        ,a.active_time as event_time
        ,'' as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_smartapollobizdata_gateway_active a
    inner join dim_device b
        on a.id = b.dev_id
    where a.dt = '20230809'

    union all

    -- event_id 4: gateway_reset rows. event_time is the second dash-separated token of row_key
    -- (Spark arrays are 0-based, so index 1 is the second element).
    select
        a.gw_id as device_id
        ,'' as code
        ,'' as value
        ,coalesce(a.reason, 1) as event_from
        ,4 as event_id
        ,1 as status
        ,split(a.row_key, '-')[1] as event_time
        ,'' as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_smartapollobizdata_gateway_reset a
    inner join dim_device b
        on a.gw_id = b.dev_id
    where a.dt = '20230809'

    union all

    -- event_id 5: datapoint publish. event_from is derived from the publish reason and
    -- the datapoint code is looked up via the device product_id.
    select
        a.subid as device_id
        ,c.code
        ,a.dp_value as value
        ,case
            when a.reason = 'voice_control' then 3
            when a.reason = 'rule' then 4
            when a.reason = 'app' then 2
            else 1
        end as event_from
        ,5 as event_id
        ,1 as status
        ,a.dp_time as event_time
        ,'' as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_datapoint_publish a
    inner join dim_device b
        on a.subid = b.dev_id
    left join dp_schema c
        on c.product_id = b.product_id
        and c.dp_id = a.dp_id
    where a.dt = '20230809'

    union all

    -- event_id 6: device upgrade logs. Casting a timestamp to bigint yields epoch seconds.
    select
        a.dev_id as device_id
        ,'' as code
        ,'' as value
        ,case
            -- IS NULL / IS NOT NULL is required here. Comparing with = null or != null always
            -- evaluates to NULL, which previously made these two branches unreachable.
            when a.log_type in ('device_upgrade_begin', 'upgrade_device_info')
                and a.upgrade_type is not null then a.upgrade_type + 7
            when a.log_type in ('device_upgrade_begin', 'upgrade_device_info')
                and a.upgrade_type is null then 2
            when a.log_type in (
                'device_upgrade_status'
                ,'device_upgrade_url'
                ,'device_upgrade_progress'
                ,'device_upgrade_trigger'
                ,'device_upgrade_version'
                ,'device_upgrade_inactive_connect'
            ) then 1
            else 2
        end as event_from
        ,6 as event_id
        ,1 as status
        ,cast(to_timestamp(a.timestamp) as bigint) as event_time
        ,'' as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_apollobizdata_device_upgrade_all a
    inner join dim_device b
        on a.dev_id = b.dev_id
    where a.dt = '20230809'

    union all

    -- event_id 7: datapoint report, timed by dp_time with ts as fallback.
    -- NOTE(review): the publish branch matches dp_schema on b.product_id from the device
    -- dimension, but this branch matches on a.pid from the report row. Confirm this is intended.
    select
        a.dev_id as device_id
        ,c.code
        ,a.dp_value as value
        -- ,case when c.mode = 'ro' then '1'
        --       when c.mode = 'rw' then '2'
        --  else '-1' end   as event_from
        ,1 as event_from
        ,7 as event_id
        ,1 as status
        ,coalesce(a.dp_time, a.ts) as event_time
        ,'' as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_datapoint_report a
    inner join dim_device b
        on a.dev_id = b.dev_id
    left join dp_schema c
        on c.product_id = a.pid
        and c.dp_id = a.dp_id
    where a.dt = '20230809'

    union all

    -- event_id 8: runstat records parsed out of jsondata. The raw reportData is kept as event_detail.
    select
        a.dev_id as device_id
        ,'' as code
        ,'' as value
        ,1 as event_from
        ,8 as event_id
        ,1 as status
        ,a.event_time as event_time
        ,a.event_detail as event_detail
        ,b.biz_type
    from (
        select
            get_json_object(jsondata, '$.gwId') as dev_id
            -- NOTE(review): the regex strips the outermost braces of reportData before $.t is read.
            -- If reportData is an ordinary JSON object, the remaining text is no longer a JSON
            -- object and get_json_object presumably returns NULL. Confirm the payload format.
            ,get_json_object(regexp_extract(get_json_object(jsondata, '$.reportData'), '^\\{(.+)\\}$', 1), '$.t') as event_time
            ,get_json_object(jsondata, '$.reportData') as event_detail
        from bi_ods_log.ods_log_data_analysis_hades_analyzer_data_pipline
        where dt = '20230809'
            and get_json_object(jsondata, '$.logType') = 'runstat'
    ) a
    inner join dim_device b
        on a.dev_id = b.dev_id

    union all

    -- event_id 10: sigma facade biz data. Device id and create time come from jsondata fields.
    select
        get_json_object(a.jsondata, '$.fields.devId') as device_id
        ,'' as code
        ,'' as value
        ,-1 as event_from
        ,10 as event_id
        ,1 as status
        ,get_json_object(a.jsondata, '$.fields.gmtCreate') as event_time
        ,get_json_object(a.jsondata, '$.msgJson') as event_detail
        ,b.biz_type
    from bi_ods_log.ods_log_data_analysis_sigma_facade_biz_data_pipline a
    inner join dim_device b
        on get_json_object(a.jsondata, '$.fields.devId') = b.dev_id
    where a.dt = '20230809'

    union all

    -- event_id 9: device restart rows. The raw report_data is kept as event_detail.
    select
        a.gw_id as device_id
        ,'' as code
        ,'' as value
        ,1 as event_from
        ,9 as event_id
        ,1 as status
        ,a.ts as event_time
        ,a.report_data as event_detail
        ,b.biz_type
    from bi_ods_clear.ods_clear_hadesanalyzerdata_device_restart a
    inner join dim_device b
        on a.gw_id = b.dev_id
    where a.dt = '20230809'
) t;
# spark-sql launch command for the job above. The notes after it discuss the broadcast settings used here.
#   --driver-cores 2 and --conf spark.driver.cores=2 set the same driver property.
#   spark.sql.broadcastTimeout is measured in seconds.
#   spark.sql.autoBroadcastJoinThreshold=1G allows tables up to 1G to be broadcast automatically.
spark-sql \
  --master yarn \
  --queue default \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 6g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf spark.driver.cores=2 \
  --conf spark.eventLog.enabled=false \
  --conf spark.sql.broadcastTimeout=600000 \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.blacklist.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.sql.autoBroadcastJoinThreshold=1G

 

 分析原因:

因为 spark.sql.adaptive.enabled=true 开启了自适应查询执行（AQE），同时 spark.sql.autoBroadcastJoinThreshold 设置得过大（上面命令中为 1G），

Spark 会把体积很大的表自动广播（autoBroadcast）到各个 executor。将 spark.sql.autoBroadcastJoinThreshold 设为 -1 关闭自动广播即可解决。

如果资源充足、希望保留广播关联，则需要增加 driver 内存（被广播的表会先收集到 driver 端），并把 spark.sql.autoBroadcastJoinThreshold 调整为合适的值（它是可被广播的表大小阈值，而不是内存参数）；还可以根据需要开启 spark.broadcast.compress=true（该参数默认已为 true）。

在多表关联的场景中要慎重开启 spark.sql.adaptive.enabled=true：AQE 会根据运行时统计信息把 sort-merge join 转换为 broadcast join，配合过大的广播阈值更容易触发上述问题。

 

 

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐