Python模块化开发组织代码程序示例(2)

def user_remain_payamount_byweek(curr_week, his_week):
    os.system("""/usr/bin/MySQL -hMysqlHost -P6603 -uHadoop -pMysqlPass -e "use funnyai_data; \
                delete from bi_user_remain_payamount_byweek where data_week='%s' and remain_week='%s'; \
                " """ % (his_week, curr_week))

newuser_remain_pay_data = os.popen("""source /etc/profile; \
        /usr/lib/hive-current/bin/hive -e " \
        add jar /home/hadoop/nisj/udf-jar/hadoop_udf_radixChange.jar; \
        create temporary function RadixChange as 'com.kascend.hadoop.RadixChange'; \
        with his_new_user as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
        from bi_newuser_byweek \
        where pt_week = '%s' \
        ), \
        curr_week_data as (select appsource,appkey,identifier,RadixChange(uid,16,10) uid \
        from bi_all_access_log \
        where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
        group by appsource,appkey,identifier,RadixChange(uid,16,10)), \
        curr_week_pay as (select uid,sum(amount) amount \
        from data_chushou_pay_info \
        where state=0 and \
        case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
        group by uid) \
        select b1.appkey,b1.appsource,sum(b2.amount) pay_amount from \
        (select a1.appkey,a1.appsource,a1.uid \
        from his_new_user a1 \
        inner join curr_week_data a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource \
        group by a1.appkey,a1.appsource,a1.uid) b1 \
        left join curr_week_pay b2 on b1.uid=b2.uid \
        group by b1.appkey,b1.appsource \
        ;" \
        """ % (his_week, curr_week, curr_week)).readlines();

nrpd_list = []
    for nrp_list in newuser_remain_pay_data:
        nrp = re.split('\t', nrp_list.replace('\n', ''))
        nrpd_list.append(nrp)
    for nrpd in nrpd_list:
        remain_week = curr_week
        appkey = nrpd[0]
        appsource = nrpd[1]
        pay_amount = nrpd[2]
        etl_time = time.strftime('%Y-%m-%d %X', time.localtime())

os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
        insert into bi_user_remain_payamount_byweek(data_week,appsource,appkey,remain_week,pay_amount,etl_time) \
        select '%s','%s','%s','%s','%s','%s'; \
        " """ % (his_week, appsource, appkey, remain_week, pay_amount, etl_time))


def user_remain_pay_byweek(curr_week, his_week):
    os.system("""/usr/bin/mysql -hMysqlHost -P6603 -uhadoop -pMysqlPass -e "use funnyai_data; \
                delete from bi_user_remain_pay_byweek where data_week='%s' and remain_week='%s'; \
                " """ % (his_week, curr_week))

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/4dfc40b6546e197aed6111875b47551c.html