• 當前位置:首頁 > IT技術 > 移動平臺 > 正文

    016、【前程貸—簡化版,實戰 09】 用 faker庫 生成的隨機手機號碼,去注冊,看是否注冊過 。
    2021-09-05 09:04:31

    ?

    ?

    在 上一節? (015、【前程貸—簡化版,實戰 08】 用pymysql庫,封裝 handler_mysql? 操作數據庫 )? 的基礎上,增加 驗證手機號是否被注冊過?如果注冊過再繼續生成隨機手機號碼,直到得到的手機號碼是未被注冊的。

    ?

    如何實現成功注冊呢 :

    步驟1、用faker庫生成一個隨機手機號碼 ;

    步驟2、用這個手機號碼去數據庫比對,是否注冊過;如果注冊過再生成一個隨機手機號碼 ;

    步驟3、如果未被注冊過,用生成的隨機手機號碼 (如:13123456789) 替換,excel表格中的標記字符 #phone# ;

    步驟4、替換成功后,得到一個臨時字典,用這個臨時字典做為數據發送requests請求 ;

    ?

    1、項目層級結構如下 :

    ?

    2、excel表格設計如下,注意 #phone#

    ?

    ?

    3、各代碼如下,標記紅色部分為主要修改點

    a、config.ini? 代碼如下:

    # 定義日志相關的配置
    # name表示自定義日志搜集器的名字;
    # level表示日志級別;
    # file_name 表示生成的日志名字
    # show_stream_handler表示是否在控制臺輸出日志。False表示不在控制臺顯示。
    # when,表示按H(小時)、D(天)、M(分鐘)、S(秒)生成日志 ;注意字母是大寫 ;
    [log]
    name=future_logging_collector
    level=INFO
    file_name=future_loan_test.log
    show_stream_handler=False
    when=H

    b、mysql_ini.py 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 13:20
    # Project: Future_Loan_day15
    # Module:  mysql_ini.py
    
    
    # 此配置文件用來配置鏈接mysql數據庫
    # key已經和代碼關聯了,不能更改key名;
    MYSQL_INI = {
        "host": "api.lemonban.com",
        "port": 3306,
        "user": "future",
        "password": "123456",
        "database": "futureloan",
        "charset": "utf8"
    }

    c、test_1_register.py? 代碼如下(有代碼修改)

    import os
    import json
    
    import jsonpath
    import pytest
    
    from tools.handle_phone import HandlePhone
    from tools.handler_request import HandlerRequests
    from tools.handler_assert_list import HandlerAssertList
    from tools.handler_excel import HandlerExcel
    from tools.handler_logging import handler_logger
    
    # 一、準備 數據 cases
    # A、用os.path找到 xlsx 文件
    file_dir = os.path.dirname(os.path.realpath('__file__'))
    base_dir = os.path.dirname(file_dir)
    print(f'根目錄:{base_dir}')
    
    file_name = os.path.join(base_dir, 'test_datas', 'test_register_cases.xlsx')
    print(f'excel表格的路徑:{file_name}')
    
    
    # B、調用common——>excel_handler.py 中已封裝好的ExcelHandler類來操作excel表格
    # # 1、打開xlsx表格;2、根據表單名字獲取表格數據;3、讀取數據;
    excel_handler = HandlerExcel(file_name)
    excel_handler.select_sheet_by_name('register_case')
    all_cases = excel_handler.read_all_rows_data()
    
    # print(all_cases)
    # handler_logger.info(f'=====all_cases=======:{all_cases}')
    
    
    mrq = HandlerRequests()
    add_assert = HandlerAssertList()
    hp = HandlePhone()
    
    
    class TestRegister:
        # 三、參數化測試用例
        # pytest.mark.parametrize("item", cases) 中的 "item" 必須 和 def test_my_requests(item): 中的 item 名字一樣
        @pytest.mark.parametrize("item", all_cases)
        def test_my_register(self, item):
            # handler_logger.info('=====注冊接口測試=======')
            handler_logger.info(f'"#phone#"未替換的前的數據:{item["req_data"]}=======')
    
            # 1、替換占位符
            # item 的值為:一行的數據,如下:
            """
                {
                    'case_id': 'test_register_002',
                    'case_name': '注冊成功,普通用戶',
                    'method': 'POST',
                    'url': 'http://api.lemonban.com/futureloan/member/register',
                    'req_data': '{"mobile_phone": "#phone#","pwd": "12345678","reg_name":"Sky","type":1}',
                    'assert_response_value_list': '[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"OK","type":"eq"}]',
                    'actual_results': None
                }
            """
    
            # 1、用隨機生成的,未被注冊過的手機號碼,把 標記 "#phone#"的字符串,替換;
         # a、查詢數據庫,并得到一個未被注冊的手機號碼 new_phone = hp.get_phone() # handler_logger.info(f'=====new_phone:{new_phone}=======') # handler_logger.info(f'=====對比結果:{item["req_data"].find("#phone#") != -1}=======')       

         # b、判斷字符串中是否含有 #phone#,如果有把它替換成 剛隨機生成的手機號碼 if item["req_data"] and item["req_data"].find("#phone#") != -1: item['req_data'] = item['req_data'].replace("#phone#", new_phone) if item["assert_response_value_list"] and item["assert_response_value_list"].find("#phone#") != -1: item["assert_response_value_list"] = item["assert_response_value_list"].replace("#phone#", new_phone) # 2、把json格式的字符串轉換成 字典 temp_req_data_dict = json.loads(item['req_data']) handler_logger.info(f'用隨機手機號碼替換"#phone#"得到的數據:{temp_req_data_dict}=======')
    # 3、發起請求 resp = mrq.send_reuqests(item['method'], item['url'], temp_req_data_dict) print(resp.json()) # 4、斷言 add_assert.assert_response_value(item["assert_response_value_list"], resp.json())

    ?

    d、handle_phone.py? 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    
    from faker import Faker
    from tools.handler_mysql import handler_mysql
    from tools.handler_logging import handler_logger
    
    
    class HandlePhone:
        def __init__(self):
            self.fk = Faker(locale='zh-cn')
    
        # 如果系統對手機號前綴有要求的,要做前綴校驗
        def __faker_phone(self):
            phone = self.fk.phone_number()
            handler_logger.info(f'隨機生成的手機號為:{phone}')
            return phone
    
        # 獲取數據庫查詢結果
        def __select_phone(self, phone):
            sql = "select * from member where mobile_phone = '{}'".format(phone)
            select_phone_result = handler_mysql.get_db_all_data(sql=sql)
            return select_phone_result
    
        # 獲取未注冊的手機號
        # 1、生成一個隨機手機號碼
        # 2、用生成的手機號碼去數據庫查詢,如果查詢結果大于1行說明已經存在,被注冊過;
        def get_phone(self):
            while True:
                phone = self.__faker_phone()  # 隨機生成手機號
                select_phone_result = self.__select_phone(phone)  # 拿到數據庫執行結果
                if len(select_phone_result) > 0:
                    continue
                else:
                    return phone
    
    
    if __name__ == '__main__':
        cl = HandlePhone()
        result = cl.get_phone()
        print(result)

    ?

    e、handler_assert_list.py 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 16:41
    # Project: Future_Loan_day15
    # Module:  handler_assert_list.py
    
    import jsonpath
    import ast
    
    
    # 封裝 assert_list列 添加斷言
    class HandlerAssertList:
    
        def assert_response_value(self, assert_response_value_list, response_dict):
            """
            根據響應的值斷言
            :param assert_response_value_list: 從excel表格中獲取到的斷言字符串,比如:'[{"expr":"$.code","expected":0,"type":"eq"},{"expr":"$.msg","expected":"賬號已存在","type":"eq"}]'
            :param response_dict: 發起請求后得到的響應結果
            :return:
            """
            # 把字符串轉換成python列表
            # 比eval安全一點,轉換成列表。eval去掉最外層引號后還會自動計算,literal_eval僅去掉最外層引號;
            check_list = ast.literal_eval(assert_response_value_list)
            # print(check_list)
    
            # 每一個單元格所有斷言的比對結果存放在check_res
            check_res = []
            for check in check_list:
                # 通過jsonpath表達式,從響應結果中拿到實際結果
                actual = jsonpath.jsonpath(response_dict, check["expr"])
                if isinstance(actual, list):
                    actual = actual[0]
                # 與實際結果比對
                if check["type"] == "eq":
                    # print(f'
    實際比對結果:{actual == check["expected"]}')
                    check_res.append(actual == check["expected"])
    
            # print(f'所有斷言結果:{check_res}')
            # 如果 斷言列表中有False,拋出 斷言異常
            if False in check_res:
                raise AssertionError
    
    
    if __name__ == '__main__':
        # 測試代碼
        assert_response_value_list = '[{"expr":"$.code","expected":0,"type":"eq"},' 
                                     '{"expr":"$.msg","expected":"OK","type":"eq"}]'
        response_dict = {
            "code": 0,
            "msg": "OK",
            "data": {
                "id": 123660458,
                "reg_name": "Sky",
                "mobile_phone": "13321886699"
            },
            "copyright": "Copyright 檸檬班 ? 2017-2020 湖南省零檬信息技術有限公司 All Rights Reserved"
        }
    
        add_assert = HandlerAssertList()
        check_res = add_assert.assert_response_value(assert_response_value_list, response_dict)

    ?

    f、handler_conf.py? 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 17:58
    # Project: Future_Loan_day15
    # Module:  handler_conf.py
    
    from configparser import ConfigParser
    
    
    class HandlerConf(ConfigParser):
    
        def __init__(self, file_name):
            super().__init__()
            self.read(file_name, encoding='utf-8')

    ?

    g、handler_excel.py? 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 16:34
    # Project: Future_Loan_day15
    # Module:  handler_excel.py
    
    import openpyxl
    
    from tools.handler_logging import handler_logger
    
    
    # 封裝一個xlsx表格操作類
    class HandlerExcel:
        # 操作一個excel表格:
        # 第一步:打開工作簿
        # 第二步:選取表單
        # 第三步:讀取數據
        # 第四步:關閉打開的工作簿
    
        def __init__(self, xlsx_file_path: str):
            """
            傳入一個xlsx文件路徑,用load_workbook()方法加載,如果文件加載不成功,拋出異常。如果成功,打開一個工作簿。
            :param xlsx_file_path: xlsx文件路徑
            """
            try:
                self.wb = openpyxl.load_workbook(xlsx_file_path)
            except FileNotFoundError as ffe:
                # print('打開文件失敗')
                handler_logger.error(ffe)
                raise
            # 不確定打開的是哪個表單
            self.sh = None
    
        def close_workbook(self):
            """
            關閉當前打開的工作簿
            :return:
            """
            self.wb.close()
    
        def select_sheet_by_name(self, sheet_name: str):
            """
            根據傳入的工作表的名字,打開工作表。
            :param sheet_name: 作表的名字
            """
            self.sh = self.wb[f'{sheet_name}']
    
        def read_all_rows_data(self):
            """
            從選定的表單當中,第一行作為key.
            將后面的每一行數據,與第一行拼接成一個字典數據,作為一條測試用例數據。
            將所有測試用例數據,添加到一個列表當中。
           :return: 測試用例數據列表
            """
            # 獲取表單的所有行,即獲取表單的所有數據
            sheet_all_rows = list(self.sh.values)
            # 把第一行作為數據的keys
            keys = sheet_all_rows[0]
            # print(keys)
    
            # 定義 cases_list 存放測試用例
            cases_list = []
    
            # 以下代碼功能:excel表單第2行開始的每一行測試數據,與第一行的keys拼接成一個字典。
            for single_row in sheet_all_rows[1:]:
                case_dict = dict(zip(keys, single_row))
                cases_list.append(case_dict)
            return cases_list
    
    
    if __name__ == '__main__':
        eh = HandlerExcel(r'D:SkyWorkSpaceWorkSpaceAPI_testlmFuture_Loan'
                          r'Future_Loan_day16	est_datas	est_register_cases.xlsx')
        eh.select_sheet_by_name('register_case')
        print(eh.read_all_rows_data())
        eh.close_workbook()

    ?

    h、handler_logging.py 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    
    
    import logging
    # 自定義一個日志模塊
    import os
    # 導入 ConfigParser 類 ,用來操作 config.ini 文件 ;
    import time
    from configparser import ConfigParser
    from logging import Logger
    from logging import handlers
    
    
    class HandlerLogger(Logger):
    
        def __init__(self):
    
            # 一、用 ConfigParser類 來操作 config.ini 配置文件 ;
            # 實例化一個 ConfigParser ;
            conf = ConfigParser()
    
            # 1、獲取config.ini文件
            common_dir = os.path.dirname(os.path.realpath(__file__))
            base_dir = os.path.dirname(common_dir)
            config_ini_file = os.path.join(base_dir, 'conf', 'config.ini')
    
            # 2、從配置文件獲取值
            conf.read(config_ini_file, encoding='utf-8')
            logger_name = conf.get('log', 'name')
            level = conf.get('log', 'level')
            file_name = conf.get('log', 'file_name')
            show_stream_handler = conf.get('log', 'show_stream_handler')
            when = conf.get('log', 'when')
    
            # 二、設置自定義日志搜集器名字、設置日志級別;
            super().__init__(logger_name, level)
    
            # 三、定義日志輸出格式, 使用Formatter類實例化一個日志格式類;
            fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s, %(pathname)s,line=%(lineno)d'
            # fmt = '%(asctime)s, %(levelname)s, %(message)s, %(name)s,line=%(lineno)d'
            formatter = logging.Formatter(fmt)
    
            # 四A、日志默認輸出到控制臺,如果設置為False,日志將不輸出到控制臺;
            if show_stream_handler == 'True':
                stream_handler = logging.StreamHandler()
                # 設置渠道當中的日志格式
                stream_handler.setFormatter(formatter)
                # 將渠道與實例日志搜集器綁定
                self.addHandler(stream_handler)
    
            # 四B、把日志輸出到文件file
            # 首先拼接存放log的file文件
            logs_file = os.path.join(base_dir, 'logs', file_name)
            print(logs_file)
            if logs_file:
                file_handle = handlers.TimedRotatingFileHandler(filename=logs_file,
                                                                when=when,
                                                                encoding='utf-8',
                                                                interval=1,
                                                                backupCount=5)
    
                # 設置渠道當中的日志格式
                file_handle.setFormatter(formatter)
                # 將渠道與實例日志搜集器綁定
                self.addHandler(file_handle)
    
    
    # 生成一個 handler_logger 實例,在其他所有模塊中導入該模塊時,共用這一個日志搜集實例。handler_logger 類似于 全局變量
    # 日志搜集是典型的單列設計模式 (單實例模式) 。
    handler_logger = HandlerLogger()
    
    if __name__ == '__main__':
        handler_logger = HandlerLogger()
        for i in range(10):
            time.sleep(1)
            handler_logger.debug('=====debug=====')
            handler_logger.info('=====info=====')
            handler_logger.warning('=====warning=====')
            handler_logger.error('=====error=====')

    ?

    i、handler_mysql.py 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 13:18
    # Project: Future_Loan_day15
    # Module:  handler_mysql.py
    
    import pymysql
    
    from conf.mysql_ini import MYSQL_INI
    
    
    # 封裝操作Mysql數據庫類
    class HandleMysql:
        def __init__(self):
            """
            1、初始化建立連接,創建數據庫連接
            charset='utf8', 注意,不是utf-8 哦
            返回數據格式控制: cursorclass=pymysql.cursors.DictCursor  加上這個表示返回字典格式的數據;不加的話,以元組的形式返回;
            """
            self.connection = pymysql.connect(
                host=MYSQL_INI['host'],
                port=MYSQL_INI['port'],
                user=MYSQL_INI['user'],
                password=MYSQL_INI['password'],
                database=MYSQL_INI['database'],
                charset=MYSQL_INI['charset'],
                cursorclass=pymysql.cursors.DictCursor)
            # 2、創建游標
            self.cur = self.connection.cursor()
    
        # 獲取 查詢得到的行數(數量)
        def get_count(self, sql):
            count = self.cur.execute(sql)
            return count
    
        # 獲取一條數據,一般都是最前面的那條數據
        def get_db_one_data(self, sql):
            self.cur.execute(sql)
            return self.cur.fetchone()
    
        # 獲取全部數據
        def get_db_all_data(self, sql):
            self.cur.execute(sql)
            return self.cur.fetchall()
    
        # 關閉數據庫連接
        def close(self):
            self.cur.close()
            self.connection.close()
    
    
    # 使用單例模式,后續導入數據庫就只導入 handler_mysql
    handler_mysql = HandleMysql()
    
    
    if __name__ == '__main__':
    
        # 1、建立鏈接
        handler_mysql = HandleMysql()
    
        # 2、執行 sql 語句
        # 先在 sql客戶端 (Navicat Premium 12免安裝) 執行以下sql語句,查詢結果
        phone = 18837906872
        sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二
        # 執行sql語句
        results = handler_mysql.get_db_one_data(sql_str_2)
        print(results)

    ?

    j、handler_request.py? 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    import requests
    from tools.handler_logging import handler_logger
    
    
    class HandlerRequests:
    
        def __init__(self):
            """
            初始化方法,初始化請求頭;
            """
            self.headers = {"X-Lemonban-Media-Type": "lemonban.v2"}
            handler_logger.info(f'請求頭為:{self.headers}')
    
        # 方法 post/put...  json=xxx, get方法用 params=xxx
        def send_reuqests(self, method, url, req_data, token=None):
            """
            調用requests庫里面的方法去發起請求,并得到響應結果;
            :param url: 接口url
            :param method: 請求方法,get,psot
            :param req_data: 請求數據
            :param token: 如果有token,添加token
            """
            handler_logger.info(f'請求方法為:{method}')
            handler_logger.info(f'請求url為:{url}')
            handler_logger.info(f'請求數據為:{req_data}')
    
            # 如果有token,添加token
            self.__del_header(token)
    
            # 注意 request() 方法是不帶s的,requests庫是帶s的;
            if method.upper() == "GET":
                resp = requests.request(method, url, params=req_data, headers=self.headers)
                return resp
            if method.upper() == "POST":
                # 為了便于學習,簡單的認為就是用 json 格式傳;
                resp = requests.request(method, url, json=req_data, headers=self.headers)
                return resp
    
        def __del_header(self, token=None):
            """
            如果有token,添加token處理
            :param token: 如果有token,添加token處理
            """
            if token:
                self.headers["Authorization"] = f"Bearer {token}"
    
    
    if __name__ == '__main__':
        handler_requests = HandlerRequests()

    ?

    k、操作mysql.py? 代碼如下:

    # -*- coding:utf-8 -*-
    # Author:  Sky
    # Email:   2780619724@qq.com
    # Time:    2021/9/4 10:02
    # Project: Future_Loan_day15
    # Module:  操作mysql.py
    
    
    import pymysql
    
    # 1、建立連接,創建數據庫連接
    # charset='utf8', 注意,不是utf-8 哦
    # 返回數據格式控制: cursorclass=pymysql.cursors.DictCursor  加上這個表示返回字典格式的數據;不加的話,以元組的形式返回;
    connection = pymysql.connect(host='api.lemonban.com',
                                 port=3306,
                                 user='future',
                                 password='123456',
                                 database='futureloan',
                                 charset='utf8',
                                 cursorclass=pymysql.cursors.DictCursor)
    
    # 2、創建游標
    cur = connection.cursor()
    
    # 3、執行sql語句 , 返回數據
    sql_str = "select * from member where reg_name='娜娜'"   # 方式一
    
    # phone = 18837906872
    # sql_str_2 = f"select * from member where mobile_phone='{phone}'"    # 方式二
    
    affected_rows = cur.execute(sql_str)    # 返回影響的行數
    print(f'返回影響的行數:{affected_rows}')
    
    # 4、獲取查詢的結果
    first_data = cur.fetchone()  # 獲取一條數據,第一條數據;字典形式返回 ;
    all_data = cur.fetchall()  # 獲取全部數據;列表形式返回 ;
    two_data = cur.fetchmany(size=2)   # 獲取前2行數據;列表形式返回 ;
    print(f'獲取第一條數據:{first_data}')
    print(f'獲取全部數據:{all_data}')
    print(f'獲取前2行數據:{two_data}')
    
    # 5、關閉數據庫連接
    cur.close()     # 首先關閉游標
    connection.close()      # 關閉數據庫

    運行方式如下:

    ?

    執行結果如下:

    ?

    本文摘自 :https://www.cnblogs.com/

    開通會員,享受整站包年服務
    国产呦精品一区二区三区网站|久久www免费人咸|精品无码人妻一区二区|久99久热只有精品国产15|中文字幕亚洲无线码