成人国产在线小视频_日韩寡妇人妻调教在线播放_色成人www永久在线观看_2018国产精品久久_亚洲欧美高清在线30p_亚洲少妇综合一区_黄色在线播放国产_亚洲另类技巧小说校园_国产主播xx日韩_a级毛片在线免费

資訊專欄INFORMATION COLUMN

python crontab類

curried / 1802人閱讀

摘要:解析配置文件中的五個(gè)數(shù)間參數(shù)分時(shí)日月周,獲取他們對(duì)應(yīng)的取值范圍將時(shí)間戳與配置中一行時(shí)間參數(shù)對(duì)比,判斷該時(shí)間戳是否在配置設(shè)定的時(shí)間范圍內(nèi)按整型時(shí)間戳獲取格式化時(shí)間分時(shí)日月周為傳入的值為時(shí)間戳整形,如經(jīng)過(guò)轉(zhuǎn)換后變成返回分時(shí)日月周從字符

#/usr/bin/env python
#-*- coding:utf-8 -*-
 
"""
1.解析 crontab 配置文件中的五個(gè)數(shù)間參數(shù)(分 時(shí) 日 月 周),獲取他們對(duì)應(yīng)的取值范圍
2.將時(shí)間戳與crontab配置中一行時(shí)間參數(shù)對(duì)比,判斷該時(shí)間戳是否在配置設(shè)定的時(shí)間范圍內(nèi)
"""
 
#$Id $
 
import re, time, sys
#from Core.FDateTime.FDateTime import  FDateTime
 
def get_struct_time(time_stamp_int):
    """
    按整型時(shí)間戳獲取格式化時(shí)間 分 時(shí) 日 月 周
    Args:
        time_stamp_int 為傳入的值為時(shí)間戳(整形),如:1332888820
        經(jīng)過(guò)localtime轉(zhuǎn)換后變成
        time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6, tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
    Return:
        list____返回 分 時(shí) 日 月 周
    """
 
    st_time = time.localtime(time_stamp_int)
    return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, st_time.tm_wday]
 
 
def get_strptime(time_str, str_format):
    """從字符串獲取 整型時(shí)間戳
    Args:
        time_str 字符串類型的時(shí)間戳 如 "31/Jul/2013:17:46:01"
        str_format 指定 time_str 的格式 如 "%d/%b/%Y:%H:%M:%S"
    Return:
        返回10位整型(int)時(shí)間戳,如 1375146861
    """
    return int(time.mktime(time.strptime(time_str, str_format)))
 
def get_str_time(time_stamp, str_format="%Y%m%d%H%M"):
    """
    獲取時(shí)間戳,
    Args:
        time_stamp 10位整型(int)時(shí)間戳,如 1375146861
        str_format 指定返回格式,值類型為 字符串 str
    Rturn:
        返回格式 默認(rèn)為 年月日時(shí)分,如2013年7月9日1時(shí)3分 :201207090103
    """
    return time.strftime("%s" % str_format, time.localtime(time_stamp))
 
def match_cont(patten, cont):
    """
    正則匹配(精確符合的匹配)
    Args:
        patten 正則表達(dá)式
        cont____ 匹配內(nèi)容
    Return:
        True or False
    """
    res = re.match(patten, cont)
    if res:
        return True
    else:
        return False
 
def handle_num(val, ranges=(0, 100), res=list()):
    """處理純數(shù)字"""
    val = int(val)
    if val >= ranges[0] and val <= ranges[1]:
        res.append(val)
    return res
 
def handle_nlist(val, ranges=(0, 100), res=list()):
    """處理數(shù)字列表 如 1,2,3,6"""
    val_list = val.split(",")
    for tmp_val in val_list:
        tmp_val = int(tmp_val)
        if tmp_val >= ranges[0] and tmp_val <= ranges[1]:
            res.append(tmp_val)
    return res
 
def handle_star(val, ranges=(0, 100), res=list()):
    """處理星號(hào)"""
    if val == "*":
        tmp_val = ranges[0]
        while tmp_val <= ranges[1]:
            res.append(tmp_val)
            tmp_val = tmp_val + 1
    return res
 
def handle_starnum(val, ranges=(0, 100), res=list()):
    """星號(hào)/數(shù)字 組合 如 */3"""
    tmp = val.split("/")
    val_step = int(tmp[1])
    if val_step < 1:
        return res
    val_tmp = int(tmp[1])
    while val_tmp <= ranges[1]:
        res.append(val_tmp)
        val_tmp = val_tmp + val_step
    return res
 
def handle_range(val, ranges=(0, 100), res=list()):
    """處理區(qū)間 如 8-20"""
    tmp = val.split("-")
    range1 = int(tmp[0])
    range2 = int(tmp[1])
    tmp_val = range1
    if range1 < 0:
        return res
    while tmp_val <= range2 and tmp_val <= ranges[1]:
        res.append(tmp_val)
        tmp_val = tmp_val + 1
    return res
 
def handle_rangedv(val, ranges=(0, 100), res=list()):
    """處理區(qū)間/步長(zhǎng) 組合 如 8-20/3 """
    tmp = val.split("/")
    range2 = tmp[0].split("-")
    val_start = int(range2[0])
    val_end = int(range2[1])
    val_step = int(tmp[1])
    if (val_step < 1) or (val_start < 0):
        return res
    val_tmp = val_start
    while val_tmp <= val_end and val_tmp <= ranges[1]:
        res.append(val_tmp)
        val_tmp = val_tmp + val_step
    return res
 
def parse_conf(conf, ranges=(0, 100), res=list()):
    """解析crontab 五個(gè)時(shí)間參數(shù)中的任意一個(gè)"""
    #去除空格,再拆分
    conf = conf.strip(" ").strip(" ")
    conf_list = conf.split(",")
    other_conf = []
    number_conf = []
    for conf_val in conf_list:
        if match_cont(PATTEN["number"], conf_val):
            #記錄拆分后的純數(shù)字參數(shù)
            number_conf.append(conf_val)
        else:
            #記錄拆分后純數(shù)字以外的參數(shù),如通配符 * , 區(qū)間 0-8, 及 0-8/3 之類
            other_conf.append(conf_val)
    if other_conf:
        #處理純數(shù)字外各種參數(shù)
        for conf_val in other_conf:
            for key, ptn in PATTEN.items():
                if match_cont(ptn, conf_val):
                    res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
    if number_conf:
        if len(number_conf) > 1 or other_conf:
            #純數(shù)字多于1,或純數(shù)字與其它參數(shù)共存,則數(shù)字作為時(shí)間列表
            res = handle_nlist(val=",".join(number_conf), ranges=ranges, res=res)
        else:
            #只有一個(gè)純數(shù)字存在,則數(shù)字為時(shí)間 間隔
            res = handle_num(val=number_conf[0], ranges=ranges, res=res)
    return res
 
def parse_crontab_time(conf_string):
    """
    解析crontab時(shí)間配置參數(shù)
    Args:
        conf_string   配置內(nèi)容(共五個(gè)值:分 時(shí) 日 月 周)
                      取值范圍 分鐘:0-59 小時(shí):1-23 日期:1-31 月份:1-12 星期:0-6(0表示周日)
    Return:
    crontab_range      list格式,分 時(shí) 日 月 周 五個(gè)傳入?yún)?shù)分別對(duì)應(yīng)的取值范圍
    """
    time_limit    = ((0, 59), (1, 23), (1, 31), (1, 12), (0, 6))
    crontab_range = []
    clist = []
    conf_length = 5
    tmp_list = conf_string.split(" ")
    for val in tmp_list:
        if len(clist) == conf_length:
            break
        if val:
            clist.append(val)
 
    if len(clist) != conf_length:
        return -1, "config error whith [%s]" % conf_string
    cindex = 0
    for conf in clist:
        res_conf = []
        res_conf = parse_conf(conf, ranges=time_limit[cindex], res=res_conf)
        if not res_conf:
            return -1, "config error whith [%s]" % conf_string
        crontab_range.append(res_conf)
        cindex = cindex + 1
    return 0, crontab_range
 
def time_match_crontab(crontab_time, time_struct):
    """
    將時(shí)間戳與crontab配置中一行時(shí)間參數(shù)對(duì)比,判斷該時(shí)間戳是否在配置設(shè)定的時(shí)間范圍內(nèi)
    Args:
        crontab_time____crontab配置中的五個(gè)時(shí)間(分 時(shí) 日 月 周)參數(shù)對(duì)應(yīng)時(shí)間取值范圍
        time_struct____ 某個(gè)整型時(shí)間戳,如:1375027200 對(duì)應(yīng)的 分 時(shí) 日 月 周
    Return:
    tuple 狀態(tài)碼, 狀態(tài)描述
    """
    cindex = 0
    for val in time_struct:
        if val not in crontab_time[cindex]:
            return 0, False
        cindex = cindex + 1
    return 0, True
 
def close_to_cron(crontab_time, time_struct):
    """coron的指定范圍(crontab_time)中 最接近 指定時(shí)間 time_struct 的值"""
    close_time = time_struct
    cindex = 0
    for val_struct in time_struct:
        offset_min = val_struct
        val_close = val_struct
        for val_cron in crontab_time[cindex]:
            offset_tmp = val_struct - val_cron
            if offset_tmp > 0 and offset_tmp < offset_min:
                val_close = val_struct
                offset_min = offset_tmp
        close_time[cindex] = val_close
        cindex = cindex + 1
    return close_time
 
def cron_time_list(
        cron_time,
        year_num=int(get_str_time(time.time(), "%Y")),
        limit_start=get_str_time(time.time(), "%Y%m%d%H%M"),
        limit_end=get_str_time(time.time() + 86400, "%Y%m%d%H%M")
    ):
    #print "
from ", limit_start , " to " ,limit_end
    """
    獲取crontab時(shí)間配置參數(shù)取值范圍內(nèi)的所有時(shí)間點(diǎn) 的 時(shí)間戳
    Args:
        cron_time 符合crontab配置指定的所有時(shí)間點(diǎn)
        year_num____指定在哪一年內(nèi) 獲取
        limit_start 開(kāi)始時(shí)間
    Rturn:
        List   所有時(shí)間點(diǎn)組成的列表(年月日時(shí)分 組成的時(shí)間,如2013年7月29日18時(shí)56分:201307291856)
    """
    #按小時(shí) 和 分鐘組裝
    hour_minute = []
    for minute in cron_time[0]:
        minute = str(minute)
        if len(minute) < 2:
            minute = "0%s" % minute
        for hour in cron_time[1]:
            hour = str(hour)
            if len(hour) < 2:
                hour = "0%s" % hour
            hour_minute.append("%s%s" % (hour, minute))
    #按天 和 小時(shí)組裝
    day_hm = []
    for day in cron_time[2]:
        day = str(day)
        if len(day) < 2:
            day = "0%s" % day
        for hour_mnt in hour_minute:
            day_hm.append("%s%s" % (day, hour_mnt))
    #按月 和 天組裝
    month_dhm = []
    #只有30天的月份
    month_short = ["02", "04", "06", "09", "11"]
    for month in cron_time[3]:
        month = str(month)
        if len(month) < 2:
            month = "0%s" % month
        for day_hm_s in day_hm:
            if month == "02":
                if (((not year_num % 4 ) and (year_num % 100)) or (not year_num % 400)):
                    #閏年2月份有29天
                    if int(day_hm_s[:2]) > 29:
                        continue
                else:
                    #其它2月份有28天
                    if int(day_hm_s[:2]) > 28:
                        continue
            if month in month_short:
                if int(day_hm_s[:2]) > 30:
                    continue
            month_dhm.append("%s%s" % (month, day_hm_s))
    #按年 和 月組裝
    len_start = len(limit_start)
    len_end = len(limit_end)
    month_dhm_limit = []
    for month_dhm_s in month_dhm:
        time_ymdhm = "%s%s" % (str(year_num), month_dhm_s)
        #開(kāi)始時(shí)間結(jié)束時(shí)間以外的排除
        if (int(time_ymdhm[:len_start]) < int(limit_start)) or 
         (int(time_ymdhm[:len_end]) > int(limit_end)):
            continue
        month_dhm_limit.append(time_ymdhm)
    if len(cron_time[4]) < 7:
        #按不在每周指定時(shí)間的排除
        month_dhm_week = []
        for time_minute in month_dhm_limit:
            str_time = time.strptime(time_minute, "%Y%m%d%H%M%S")
            if str_time.tm_wday in cron_time[4]:
                month_dhm_week.append(time_minute)
        return month_dhm_week
    return month_dhm_limit
 
 
#crontab時(shí)間參數(shù)各種寫法 的 正則匹配
PATTEN = {
    #純數(shù)字
    "number":"^[0-9]+$",
    #數(shù)字列表,如 1,2,3,6
    "num_list":"^[0-9]+([,][0-9]+)+$",
    #星號(hào) *
    "star":"^*$",
    #星號(hào)/數(shù)字 組合,如 */3
    "star_num":"^*/[0-9]+$",
    #區(qū)間 如 8-20
    "range":"^[0-9]+[-][0-9]+$",
    #區(qū)間/步長(zhǎng) 組合 如 8-20/3
    "range_div":"^[0-9]+[-][0-9]+[/][0-9]+$"
    #區(qū)間/步長(zhǎng) 列表 組合,如 8-20/3,21,22,34
    #"range_div_list":"^([0-9]+[-][0-9]+[/][0-9]+)([,][0-9]+)+$"
    }
#各正則對(duì)應(yīng)的處理方法
PATTEN_HANDLER = {
    "number":handle_num,
    "num_list":handle_nlist,
    "star":handle_star,
    "star_num":handle_starnum,
    "range":handle_range,
    "range_div":handle_rangedv
}
 
 
def  isdo(strs,tips=None):
    """
    判斷是否匹配成功!
    """
    try:
        tips = tips==None and "文件名稱格式錯(cuò)誤:job_月-周-天-時(shí)-分_文件名.txt" or tips
        timer = strs.replace("@","*").replace("%","/").split("_")[1]
        month,week,day,hour,mins = timer.split("-")
        conf_string = mins+" "+hour+" "+day+" "+month+" "+week
        res, desc = parse_crontab_time(conf_string)
        if res == 0:
            cron_time = desc
        else:
            return  False
 
        now  =FDateTime.now()
        now =  FDateTime.datetostring(now, "%Y%m%d%H%M00")
 
        time_stamp = FDateTime.strtotime(now, "%Y%m%d%H%M00")
 
        #time_stamp = int(time.time())
        #解析 時(shí)間戳對(duì)應(yīng)的 分 時(shí) 日 月 周
        time_struct = get_struct_time(time_stamp)
        match_res = time_match_crontab(cron_time, time_struct)
        return match_res[1]
    except:
        print tips
        return False
 
def main():
    """測(cè)試用實(shí)例"""
    #crontab配置中一行時(shí)間參數(shù)
    conf_string = "*/10 * * * * (cd /opt/pythonpm/devpapps; /usr/local/bin/python2.5 data_test.py>>output_error.txt)"
    #conf_string = "*/10 * * * *"
    #時(shí)間戳
    time_stamp = int(time.time())
 
    #解析crontab時(shí)間配置參數(shù) 分 時(shí) 日 月 周 各個(gè)取值范圍
    res, desc = parse_crontab_time(conf_string)
 
    if res == 0:
        cron_time = desc
    else:
        print desc
        sys, exit(-1)
 
    print "
config:", conf_string
    print "
parse result(range for crontab):"
 
    print " minute:", cron_time[0]
    print " hour: ", cron_time[1]
    print " day: ", cron_time[2]
    print " month: ", cron_time[3]
    print " week day:", cron_time[4]
 
    #解析 時(shí)間戳對(duì)應(yīng)的 分 時(shí) 日 月 周
    time_struct = get_struct_time(time_stamp)
    print "
struct time(minute hour day month week) for %d :" % 
         time_stamp, time_struct
 
    #將時(shí)間戳與crontab配置中一行時(shí)間參數(shù)對(duì)比,判斷該時(shí)間戳是否在配置設(shè)定的時(shí)間范圍內(nèi)
    match_res = time_match_crontab(cron_time, time_struct)
    print "
matching result:", match_res
 
    #crontab配置設(shè)定范圍中最近接近時(shí)指定間戳的一組時(shí)間
    most_close = close_to_cron(cron_time, time_struct)
    print "
in range of crontab time which is most colse to struct ", most_close
 
    time_list = cron_time_list(cron_time)
    print "

 %d times need to tart-up:
" % len(time_list)
    print time_list[:10], "..."
 
 
if __name__ == "__main__":
    #請(qǐng)看 使用實(shí)例
    main()
    #strs = "job_@-@-@-@-@_test02.txt.sh"
    #print isdo(strs)
 
    #main()0")

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://systransis.cn/yun/38001.html

相關(guān)文章

  • Django定時(shí)任務(wù)Django-crontab的使用方法介紹

      Python中的功能還是相當(dāng)?shù)亩嗟?,比如,?huì)涉及到使用Django定時(shí)任務(wù),在工作當(dāng)中,它的用途是比較的多的,其中,測(cè)試工程師用到這種的功能是比較多,所以我們要去進(jìn)行詳細(xì)的了解下,下面就給大家詳細(xì)的解答下?! ≡谑褂玫膁jango做測(cè)試平臺(tái)時(shí),,多多少少都會(huì)遇到需要定時(shí)任務(wù)的功能,比如定時(shí)執(zhí)行任務(wù),檢查訂單之類的??赡苁且欢螘r(shí)間,比如每隔10分鐘執(zhí)行一次,也可能是定點(diǎn)時(shí)間,比如14:00執(zhí)行,...

    89542767 評(píng)論0 收藏0
  • django開(kāi)發(fā)-定時(shí)任務(wù)的使用

    摘要:今天介紹在中使用定時(shí)任務(wù)的兩種方式。添加并啟動(dòng)定時(shí)任務(wù)其它命令顯示當(dāng)前的定時(shí)任務(wù)刪除所有定時(shí)任務(wù)今天的定時(shí)任務(wù)就說(shuō)到這里,有錯(cuò)誤之處,歡迎交流指正 今天介紹在django中使用定時(shí)任務(wù)的兩種方式。 方式一: APScheduler1)安裝: pip install apscheduler 2)使用: from apscheduler.scheduler import Scheduler...

    wean 評(píng)論0 收藏0
  • [新手開(kāi)源] 爬取韓寒“一個(gè)”文章且自動(dòng)郵件發(fā)送功能

    摘要:源碼地址準(zhǔn)備一臺(tái)云服務(wù)器寫好的腳本效果因?yàn)楝F(xiàn)在一個(gè)的客戶端啟動(dòng)越來(lái)越慢,而且很多自己不感興趣的東西我只是想看看文章,所以就寫了這個(gè)小爬蟲。因?yàn)橐粋€(gè)是每天點(diǎn)會(huì)更新,所以自己的服務(wù)器要做一個(gè)定時(shí)服務(wù),下自帶了定時(shí)任務(wù)。 源碼地址:https://github.com/xcc3641/pySendOneToEmail 準(zhǔn)備 一臺(tái)云服務(wù)器 寫好的Python腳本 效果 因?yàn)楝F(xiàn)在一個(gè)的And...

    zhkai 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<