Python模块化开发组织代码程序示例

样例包含三部分代码,周的处理函数部分、业务数据处理部分及多线程跑批调度处理部分。

代码按功能分类存放,有助于使代码更清晰,通过from...import的方式,使代码重复使用。

另外,多线程的调用部分,有效处理了程序先后依赖及多程序串并行跑批问题,为以后相似问题的处理,提供了借鉴。

1、周处理函数

/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/WeekCalc.py

# -*- coding=utf-8 -*-
import warnings
import datetime

warnings.filterwarnings("ignore")

def getNowYearWeek():
    # 当前时间年第几周的计算
    timenow = datetime.datetime.now() - datetime.timedelta(days=7)
    NowYearWeek = timenow.isocalendar()
    return str(NowYearWeek[0])+"#"+str(NowYearWeek[1])

def dateRange(beginDate, endDate):
    dates = []
    dt = datetime.datetime.strptime(beginDate, "%Y-%m-%d")
    date = beginDate[:]
    while date <= endDate:
        dates.append(date)
        dt = dt + datetime.timedelta(1)
        date = dt.strftime("%Y-%m-%d")
    return dates

def weekRang(beginDate, endDate):
    week = set()
    for date in dateRange(beginDate, endDate):
        week.add(datetime.date(int(date[0:4]), int(date[5:7]), int(date[8:10])).isocalendar()[0:2])

wk_l = []
    for wl in sorted(list(week)):
        wk_l.append(str(wl[0])+'#'+str(wl[1]))
    return wk_l

def currWeekList(his_week):
    last_wk = datetime.datetime.now() - datetime.timedelta(days=7)
    end_day = str(last_wk)[0:10]
    curr_week_list = []
    for week in weekRang('2015-07-01', end_day):
        if (int(week[0:4]) == int(his_week[0:4]) and int(week[5:]) >= int(his_week[5:])) or (int(week[0:4]) > int(his_week[0:4])):
            curr_week_list.append(week)
    return curr_week_list

def hisRunWeekList(his_week):
    batch_week_list = []
    for curr_week in currWeekList(his_week):
        if (int(his_week[0:4]) == int(curr_week[0:4]) and int(his_week[5:]) <= int(curr_week[5:])) or (int(his_week[0:4]) < int(curr_week[0:4])):
            batch_week_list.append(([curr_week, his_week],None))
    return batch_week_list

def RuningWeekList():
    curr_week = getNowYearWeek()
    batch_week_list = []
    for his_week in currWeekList('2015#27'):
            if (int(his_week[0:4]) == int(curr_week[0:4]) and int(his_week[5:]) <= int(curr_week[5:])) or (int(his_week[0:4]) < int(curr_week[0:4])):
                batch_week_list.append(([curr_week, his_week],None))
    return batch_week_list

def getWeekFristday(weekflag):
    yearnum = weekflag[0:4]  # 取到年份
    weeknum = weekflag[5:7]  # 取到周
    stryearstart = yearnum + '0101'  # 当年第一天
    yearstart = datetime.datetime.strptime(stryearstart, '%Y%m%d')  # 格式化为日期格式
    yearstartcalendarmsg = yearstart.isocalendar()  # 当年第一天的周信息
    yearstartweekday = yearstartcalendarmsg[2]
    yearstartyear = yearstartcalendarmsg[0]
    if yearstartyear < int(yearnum):
        daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 1) * 7
    else:
        daydelat = (8 - int(yearstartweekday)) + (int(weeknum) - 2) * 7

week1day = (yearstart + datetime.timedelta(days=daydelat)).date()
    return week1day

# Batch Test
# his_week_list = ['2015#46', '2015#45', '2016#2']
# batch_week_list = []
# for his_week in his_week_list:
#    batch_week_list.extend(hisRunWeekList(his_week))
# print batch_week_list
# print getWeekFristday('2016#11')
# his_week = '2016#11'
# print currWeekList(his_week)

# print getNowYearWeek()

2、业务处理部分
/Users/nisj/PycharmProjects/EsDataProc/Remain_Data_Proc/Hive_remain_byWeek_proc.py

# -*- coding=utf-8 -*-
import time
import os
import re
from WeekCalc import *

warnings.filterwarnings("ignore")


def newuser_byweek_proc(batch_week):
    week1day = getWeekFristday(batch_week)
    os.system("""/usr/lib/hive-current/bin/hive -e " \
    alter table bi_newuser_byweek drop if exists partition(pt_week='%s'); \
    alter table bi_newuser_byweek add partition(pt_week='%s'); \
    insert into table bi_newuser_byweek partition (pt_week='%s') \
    select a1.appsource,a1.appkey,a1.identifier,a1.uid from ( \
    select appsource,appkey,identifier,uid \
    from bi_all_access_log \
    where case when weekofyear(pt_day)>=52 and month(pt_day)=1 then concat(year(pt_day)-1,'#',weekofyear(pt_day)) else concat(year(pt_day),'#',weekofyear(pt_day)) end = '%s' \
    group by appsource,appkey,identifier,uid) a1 \
    left join \
    (select appsource,appkey,identifier,uid \
    from bi_all_access_log \
    where pt_day < '%s' ) a2 on a1.appkey=a2.appkey and a1.identifier=a2.identifier and a1.appsource=a2.appsource and a1.uid=a2.uid \
    where a2.identifier is null \
    ;" \
    """ % (batch_week, batch_week, batch_week, batch_week, week1day));

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/4dfc40b6546e197aed6111875b47551c.html