# Python 定时器参考

import time
import json
import requests
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
import pymysql.cursors
from tableau_rest_api import TableauRestApi
from read_config import get_value,REST_API_VERSION
from log_utils import logger,logger_no_found,logger_error


# JSON array describing the tasks to monitor. It is parsed with json.loads in
# the __main__ block, and each entry is passed as keyword arguments to the
# scheduled checker. Fields per entry:
#   name     - task / flow name looked up by the checker
#   details  - human-readable description (not read by the code in this file)
#   title    - romanized title passed along to the logging/report helpers
#   type     - "mysql" selects checkMysqlStatus; any other value selects
#              checkRestApiStatus
#   schedule - daily run time in HH:MM:SS form (parsed with '%H:%M:%S')
my_str = '''
[
    {
        "name": "SBJ_CON_BASE_INFO_PRO",
        "details": "销售明细表",
        "title": "xiao shou ming xi biao",
        "type":"mysql",
        "schedule": "07:15:20"
    },
    {
        "name": "RM_PH_ENR_DT",
        "details": "1.1.2余额明细",
        "title": "1.1.2 yu e ming xi",
        "type":"mysql",
        "schedule": "13:15:20"
    },
   {
        "name": "DMM_INNO_EPO_ENR_DTL",
        "details": "EPO及余额明细",
        "title": "EPO ji yu e ming xi",
        "type":"mysql",
        "schedule": "12:15:20"
    },
   {
        "name": "DMM_INNO_CUST_COMPLAINT_BOARD",
        "details": "客诉监控看板模型",
        "title": "ke su jian kong kan ban",
        "type":"mysql",
        "schedule": "07:45:20"
    },
   {
        "name": "销售明细Flow",
        "title": "xiao shou ming xi flow",
        "details": "销售明细Flow",
        "type":"restApi",
        "schedule": "07:30:20"
    }
]
'''


def getRestInfo(timeout=10):
    """Fetch the remote configuration and return it as a name -> content dict.

    The endpoint URL is read from the ``REST_URL`` config value. The response
    body is expected to be JSON with a top-level ``data`` list whose items
    each carry a ``name`` and a ``content`` field.

    Args:
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout a stalled endpoint would block the scheduler
            job indefinitely.

    Returns:
        dict mapping each item's ``name`` to its ``content``.

    Raises:
        requests.RequestException: on connection failure, timeout, or a
            non-2xx HTTP status.
        ValueError: if the body is not valid JSON.
        KeyError: if the payload lacks ``data`` / ``name`` / ``content``.
    """
    rest_url = get_value('REST_URL')
    response = requests.get(rest_url, timeout=timeout)
    # Fail loudly on HTTP errors instead of trying to parse an error page.
    response.raise_for_status()

    payload = json.loads(response.text)
    config_dict = {item['name']: item['content'] for item in payload['data']}

    # Log only the key names: callers read credentials (e.g. TABLEAU_PASSWORD)
    # from this payload, so the raw body must not end up in the log files.
    logger.info('rest config keys: {}'.format(list(config_dict)))
    return config_dict


def fetchServerInfo(tableName, tableTitle, data_time):
    """Look up a task's status row for a given date and log the outcome.

    Connects to MySQL using the ``MYSQL_*`` config values, reads one row
    joining ``bdsp_tab_task_stat`` with ``bdsp_tab_task`` for the task name
    and update date, then:

    * no row found            -> ``logger_no_found(tableName, tableTitle)``
    * ``STAT_MSG != 'SUCCESS'`` -> ``logger_error(tableName, tableTitle)``
    * otherwise               -> an info log line with the status

    Args:
        tableName: Task name matched against ``bdsp_tab_task.name``.
        tableTitle: Title forwarded to the logging helpers.
        data_time: Date string compared with ``DATE(a.DATE_UPDATED)``
            (callers in this file pass ``YYYY-MM-DD``).
    """
    connection = pymysql.connect(host=get_value('MYSQL_HOST'),
                                 port=int(get_value('MYSQL_PORT')),
                                 database=get_value('MYSQL_DBNAME'),
                                 user=get_value('MYSQL_USER'),
                                 password=get_value('MYSQL_PASSWORD'),
                                 cursorclass=pymysql.cursors.DictCursor,
                                 charset='utf8')

    # Placeholders let the driver quote the values; building the statement
    # with str.format would break on quotes and allow SQL injection.
    sql = '''
        select b.name,b.details,a.date_updated,a.STAT_MSG
        from bdsp_tab_task_stat a
        inner join bdsp_tab_task b
        on a.id_bdsp_tab_task=b.id_bdsp_tab_task
        and b.name = %s
        and DATE(a.DATE_UPDATED) = %s
    '''

    try:
        with connection.cursor() as cursor:
            cursor.execute(sql, (tableName, data_time))
            row = cursor.fetchone()
    finally:
        # This function runs on every scheduled tick; always release the
        # connection so they do not pile up on the server.
        connection.close()

    logger.info(row)

    if row is None:
        logger_no_found(tableName, tableTitle)
        return

    status = row['STAT_MSG']
    if status != 'SUCCESS':
        logger_error(tableName, tableTitle)
    else:
        logger.info('{} -- {} -- task status :{}'.format(data_time, tableName, status))


def just_echo():
    """Write a timestamp banner line to the log (at error level)."""
    rule = '-' * 10
    # strftime without an explicit time tuple formats the current local time.
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    logger.error(' '.join([rule, stamp, rule]))


def checkMysqlStatus(**options):
    """Check today's MySQL status row for the task described by ``options``.

    ``options`` must contain ``name`` and ``title``; any other keys are
    ignored. The date passed on is the current local date as ``YYYY-MM-DD``.
    """
    today = time.strftime('%Y-%m-%d')
    fetchServerInfo(options["name"], options["title"], today)


def checkRestApiStatus(**options):
    """Check today's run status of a Tableau flow via the REST API.

    Connection settings (server URL, access-token name and secret, site) are
    read from the remote configuration returned by ``getRestInfo()``; they
    are never hard-coded here. ``options`` must contain ``name`` (the flow to
    look up) and ``title`` (forwarded to the status/log helpers).

    If the flow cannot be found, ``logger_no_found`` is called instead of
    querying its run status.
    """
    rest_config = getRestInfo()
    tableau_url = rest_config['TABLEAU_SERVER']
    token_name = rest_config['TABLEAU_USER']
    token_secret = rest_config['TABLEAU_PASSWORD']
    site = rest_config['TABLEAU_SITE']

    api = TableauRestApi(REST_API_VERSION, tableau_url)
    api.sign_in(
        personal_access_token_name=token_name,
        personal_access_token_secret=token_secret,
        site_name=site,
    )

    flow = api.quert_flow_for_user(options["name"])
    logger.info(flow)

    if flow is None:
        logger_no_found(options["name"], options["title"])
        return

    # Current local date, YYYY-MM-DD.
    today = time.strftime('%Y-%m-%d')
    api.get_flow_run_status(flow, options['title'], today)


if __name__ == '__main__':
    # Build a blocking scheduler, register one cron job per configured task,
    # then hand control to the scheduler (start() does not return).
    scheduler = BlockingScheduler()
    task_specs = json.loads(my_str)

    # Banner job: fires whenever the clock's seconds field is 0.
    scheduler.add_job(just_echo, 'cron', second='0')

    for spec in task_specs:
        logger.info(spec)
        run_at = datetime.strptime(spec['schedule'], '%H:%M:%S')
        # "mysql" entries are checked in the database; everything else goes
        # through the Tableau REST API checker.
        checker = checkMysqlStatus if spec['type'] == 'mysql' else checkRestApiStatus
        scheduler.add_job(
            checker,
            'cron',
            hour=run_at.hour,
            minute=run_at.minute,
            second=run_at.second,
            kwargs=spec,
        )

    scheduler.start()

# 你可能感兴趣的