目錄
- 1. 前言
- 2. Python 實現文件上傳
- 2-1獲取文件信息及切片數目
- 2-2切片及分段上傳
- 2-3合并文件
- 2-4文件路徑參數化
- 3. Jmeter 并發執行
- 4. 最后
本篇文章以文件上傳為例,聊聊 Jmeter 并發執行 Python 腳本的完整流程
1. 前言
大家好,我是安果!
最近有小伙伴后臺給我留言,說自己用 Django 寫了一個大文件上傳的 Api 接口,現在想本地檢驗一下接口并發的穩定性,問我有沒有好的方案
本篇文章以文件上傳為例,聊聊Jmeter 并發執行 Python 腳本的完整流程
2. Python 實現文件上傳
大文件上傳包含 3 個步驟,分別是:
- 獲取文件信息及切片數目
- 分段切片,并上傳- API
- 文件合并- API
- 文件路徑參數化
2-1獲取文件信息及切片數目
首先,獲取文件的大小
然后,利用預設的切片大小獲取分段總數
最后,獲取文件名及 md5 值
import os
import math
import hashlib
def get_file_md5(self, file_path):
"""獲取文件的md5值"""
with open(file_path, 'rb') as f:
data = f.read()
return hashlib.md5(data).hexdigest()
def get_filename(self, filepath):
"""獲取文件原始名稱"""
# 文件名帶后綴
filename_with_suffix = os.path.basename(filepath)
# 文件名
filename = filename_with_suffix.split('.')[0]
# 后綴名
suffix = filename_with_suffix.split('.')[-1]
return filename_with_suffix, filename, suffix
def get_chunk_info(self, file_path):
"""獲取分段信息"""
# 獲取文件總大小(字節)
file_total_size = os.path.getsize(file_path)
print(file_total_size)
# 分段總數
total_chunks_num = math.ceil(file_total_size / self.chunk_size)
# 文件名(帶后綴)
filename = self.get_filename(file_path)[0]
# 文件的md5值
file_md5 = self.get_file_md5(file_path)
return file_total_size, total_chunks_num, filename, file_md5
2-2切片及分段上傳
利用分段總數和分段大小,對文件進行切片,調用分段文件上傳接口
import requests
def do_chunk_and_upload(self, file_path):
"""將文件分段處理,并上傳"""
file_total_size, total_chunks_num, filename, file_md5 = self.get_chunk_info(file_path)
# 遍歷
for index in range(total_chunks_num):
print('第{}次文件上傳'.format(index + 1))
if index + 1 == total_chunks_num:
partSize = file_total_size % chunk_size
else:
partSize = chunk_size
# 文件偏移量
offset = index * chunk_size
# 生成分片id,從1開始
chunk_id = index + 1
print('開始準備上傳文件')
print("分片id:", chunk_id, "文件偏移量:", offset, ",當前分片大小:", partSize, )
# 分段上傳文件
self.__upload(offset, chunk_id, file_path, file_md5, filename, partSize, total_chunks_num)
def __upload(self, offset, chunk_id, file_path, file_md5, filename, partSize, total):
"""分次上傳文件"""
url = 'http://**/file/brust/upload'
params = {'chunk': chunk_id,
'fileMD5': file_md5,
'fileName': filename,
'partSize': partSize,
'total': total
}
# 根據文件路徑及偏移量,讀取文件二進制數據
current_file = open(file_path, 'rb')
current_file.seek(offset)
files = {'file': current_file.read(partSize)}
resp = requests.post(url, params=params, files=files).text
print(resp)
2-3合并文件
最后調用合并文件的接口,將分段小文件合成大文件
def merge_file(self, filepath):
"""合并"""
url = 'http://**/file/brust/merge'
file_total_size, total_chunks_num, filename, file_md5 = self.get_chunk_info(filepath)
payload = json.dumps(
{
"fileMD5": file_md5,
"chunkTotal": total_chunks_num,
"fileName": filename
}
)
print(payload)
headers = {
"Content-Type": "application/json"
}
resp = requests.post(url, headers=headers, data=payload).text
print(resp)
2-4文件路徑參數化
為了并發執行,將文件上傳路徑參數化
# fileupload.py
...
if __name__ == '__main__':
filepath = sys.argv[1]
# 每一段切片的大小(MB)
chunk_size = 2 * 1024 * 1024
fileApi = FileApi(chunk_size)
# 分段上傳
fileApi.do_chunk_and_upload(filepath)
# 合并
fileApi.merge_file(filepath)
3. Jmeter 并發執行
在使用 Jmeter 創建并發流程前,我們需要編寫批處理腳本
其中,執行批處理腳本時,需要跟上文件路徑一起執行
# cmd.bat
@echo off
set filepath=%1
python C:\Users\xingag\Desktop\rpc_demo\fileupload.py %*
然后,在本地新建一個 CSV 文件,寫入多個文件路徑
# 準備多個文件路徑(csv)
C:\\Users\\xingag\\Desktop\\charles-proxy-4.6.1-win64.msi
C:\\Users\\xingag\\Desktop\\V2.0.pdf
C:\\Users\\xingag\\Desktop\\HBuilder1.zip
C:\\Users\\xingag\\Desktop\\HBuilder2.zip
接著,就可以使用 Jmeter 創建并發流程了
完整步驟如下:
這里線程組數目與上面文件數目保持一致即可
同步定時器中的「模擬用戶組的數量」和上面參數數量保持一致
指向上面準備的 csv 數據文件,設置文件格式為 UTF-8,變量名稱設置為file_path,最后將線程共享模式設置為「當前線程組」
選擇上面創建的批處理文件,命令行參數設置為「${file_path}」
4. 最后
運行上面創建的 Jmeter 并發流程,在結果數中可以查看并發上傳文件的結果
當然,我們可以增加并發數量去模擬真實的使用場景,只需要修改 CSV 數據源及 Jmeter 參數即可
到此這篇關于Jmeter并發執行 Python 腳本的問題詳解的文章就介紹到這了,更多相關Jmeter并發執行 Python 腳本內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Jmeter并發執行Python 腳本的完整流程
- python軟件測試Jmeter性能測試JDBC Request(結合數據庫)的使用詳解
- Jmeter調用Python腳本實現參數互相傳遞的實現
- 基于Python組裝jmx并調用JMeter實現壓力測試
- python利用JMeter測試Tornado的多線程
- python Django編寫接口并用Jmeter測試的方法