2016-11-10 3 views
-2

Я хотел бы агрегировать (считать) в двух разных условиях и попытаться присвоить двум различным столбцам. Может ли кто-нибудь предложить мне любой простой способ?Pyspark Сумма агрегации Dataframe при разных условиях

В растворе, ниже которого я попытался отсчеты поступают так же, хотя условия фильтра различны,

num_ins_rec_cnt = F.count(col("ins_upd_flag") == "I").alias("ins_rec_cnt") 
num_upd_rec_cnt = F.count(col("ins_upd_flag") == "U").alias("upd_rec_cnt") 
delta_process_max_ld_df = cdc_all_record_sk_ld_df.agg(F.max('delta_account_sk_id').alias("max_account_sk_id"),(num_ins_rec_cnt),(num_upd_rec_cnt)).withColumn("lkp_process_name",lit(process_name)).withColumn("history_tbl_cnt",lit(base_rec_count)).withColumn("delta_tbl_cnt",lit(delta_rec_count)) 

Выход,

+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|max_account_sk_id|ins_rec_cnt|upd_rec_cnt| lkp_process_name|history_tbl_cnt|delta_tbl_cnt| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 
|   25099|  5100|  5100|amc_account_delta_ld|   19999|  20099| 
+-----------------+-----------+-----------+--------------------+---------------+-------------+ 

Но это должно было быть,

+-------+---------------+--+ 
| _c0 | ins_upd_flag | 
+-------+---------------+--+ 
| 5100 | I    | 
| 5000 | U    | 

Sample Data: 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
|delta_acct_nbr|delta_account_sk_id|delta_zip_code|delta_primary_state|delta_eff_start_date|delta_eff_end_date|  delta_load_tm|  delta_hash_key|delta_eff_flag|ins_upd_flag| 
+--------------+-------------------+--------------+-------------------+--------------------+------------------+-------------------+--------------------+--------------+------------+ 
| ID330020000|    20000|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020001|    20001|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y|   I| 
| ID330020002|    20002|   02345|     CA|   2016-11-10|  3099-12-31|2016-11-10 14:53:52|19DEDD4F9A55845E8...|    Y| 
+0

Да .. правильно .. Как я должен писать в таблицу в отдельной колонке. Мне нужен выходной фильтр t два отдельных столбца. – user3858193

ответ

-1

Я решил проблему ниже.

delta_process_max_ld_df = cdc_all_record_sk_ld_df.withColumn ('ins_upd_flag_cnt', **** F.when (cdc_all_record_sk_ld_df.ins_upd_flag == 'I', 1) .when (cdc_all_record_sk_ld_df.ins_upd_flag == 'U', 0) .otherwise (0)) .agg (F.max ('delta_account_sk_id'). Alias ​​("max_surrogate_id"), F.sum ('ins_upd_flag_cnt'). Alias ​​("insert_record_cnt"), F.count ('*'). Alias ​​("ins_upd_count")) .withColumn ("process_name", lit (имя_процесса)). withColumn ("process_run_date", lit (load_dt)). withColumn ("base_tbl_cnt", lit (base_rec_count)). withColumn ("delta_tbl_cnt", горит (delta_rec_count)) withColumn ("load_tm", lit (load_tm)) .Column ("tbl_name", lit (tgt_tbl_nm)) .Column ("load_date", lit (process_run_date)) #. select (* status_tbl_columns) delta_process_max_ld_df1 = delta_process_max_ld_df.withColu млн ("upd_record_cnt", горит (delta_process_max_ld_df.ins_upd_count - delta_process_max_ld_df.insert_record_cnt)) .select (* status_tbl_columns)