系统频繁地启动新线程,线程执行完即被销毁;如果线程不能被重复使用,必然会使系统的性能下降。线程池的意义就在于减少线程创建及销毁过程中损耗的系统资源。
Executor类包含两个子类:ThreadPoolExecutor(线程池)和ProcessPoolExecutor(进程池)。
线程池对象的方法:submit()提交单个任务并返回Future对象,map()对可迭代对象中的每个元素并发执行函数,shutdown()关闭线程池。
实战:操作50w数据
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
出车拦截依赖ufs,拦截账号构造
多线程执行ufs写入
"""
import requests
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
# Connection settings used to build the UFS request URLs.
# NOTE(review): the 'xx' values look like redacted placeholders - fill in the real ones before running.
# Prefix that SetUfs/GetUfs prepend to the endpoint path.
ip = 'xx'
# Path appended to `ip` by SetUfs.
set_ufs = 'xx'
# Path appended to `ip` by GetUfs.
get_ufs = 'xx'
def SetUfs(value, city_id=1, timeout=10):
    """
    Write a feature value to UFS with a form-encoded POST request.

    :param value: payload sent as the ``featureValues`` form field
    :param city_id: accepted for backward compatibility; not used by the request
    :param timeout: seconds ``requests`` waits for the server before raising a
        timeout error, so a stalled connection cannot block a pool worker forever
    :return: the response body decoded from JSON
    """
    url = ip + set_ufs
    data = {'featureValues': value}
    headers = {'Content-Type': 'application/x-www-form-urlencoded'}
    response = requests.post(url=url, data=data, headers=headers, timeout=timeout)
    return response.json()
def GetUfs(value, timeout=10):
    """
    Read features from UFS with a GET request.

    :param value: sent as the ``features`` query parameter
    :param timeout: seconds ``requests`` waits for the server before raising a
        timeout error, so a stalled connection cannot hang the caller forever
    :return: the response body decoded from JSON
    """
    url = ip + get_ufs
    data = {'features': value}
    response = requests.get(url=url, params=data, timeout=timeout)
    return response.json()
def ReadFile(filePath):
    """
    Collect the first comma-separated field of every line in a text file.

    Surrounding whitespace (including the line terminator) is stripped from
    each field, and lines whose first field is empty are skipped.

    :param filePath: path of the file to read, one record per line
    :return: list of first-field strings in file order
    """
    driver_id_list = []
    with open(filePath, 'r') as f:
        # Iterate the file object directly so the whole file is not held in
        # memory twice (once as raw lines, once as ids).
        for line in f:
            # A line without a comma would otherwise keep its trailing '\n'
            # inside the id, and a blank line would produce a bogus id.
            driver_id = line.split(',')[0].strip()
            if driver_id:
                driver_id_list.append(driver_id)
    return driver_id_list
def run_main(driver_id_list):
    """
    Split the driver ids into four consecutive batches of request values.

    The first three batches each hold ``len(driver_id_list) // 4`` items and
    the fourth takes everything that is left, so no id is dropped when the
    length is not a multiple of four.

    :param driver_id_list: sequence of driver id strings
    :return: tuple of four lists holding the value built for each driver id
    """
    quarter = len(driver_id_list) // 4
    bounds = (
        (None, quarter),
        (quarter, quarter * 2),
        (quarter * 2, quarter * 3),
        (quarter * 3, None),
    )
    # The value is built with str.replace on a template rather than by string
    # concatenation; "xx" is the placeholder that receives the driver id.
    return tuple(
        ["xxx".replace("xx", driver_id) for driver_id in driver_id_list[start:stop]]
        for start, stop in bounds
    )
if __name__ == '__main__':
    filepath = 'xxx'
    driver_id_list = ReadFile(filepath)
    batches = run_main(driver_id_list)
    now = time.strftime('%Y-%m-%d %H:%M:%S')
    print(f'start time {now}')
    completed = 0
    # The pool keeps at most 5 worker threads and reuses them for every value;
    # it does not start one thread per driver id.
    with ThreadPoolExecutor(max_workers=5) as executor:
        for batch in batches:
            # executor.map yields results in input order and re-raises here
            # any exception raised inside SetUfs.
            for result in executor.map(SetUfs, batch):
                completed += 1
                # Print a sample of the responses: every 100th completed request.
                if completed % 100 == 0:
                    print(f"result {result}")
    print(u'全部结束', time.strftime('%Y-%m-%d %H:%M:%S'))
执行结果:
from concurrent.futures import ThreadPoolExecutor
import time
def get_html(times):
    """
    Stand in for a slow task: sleep, report the delay, and hand it back.

    :param times: number of seconds to sleep
    :return: ``times`` unchanged
    """
    time.sleep(times)
    print(f"just wait {times}")
    return times
if __name__ == '__main__':
    urls = [2, 3, 4]
    # The with-block shuts the pool down once the demo finishes, and the main
    # guard keeps the sleeps from running when this file is merely imported.
    with ThreadPoolExecutor(max_workers=2) as executor:
        # submit() schedules a single call and returns a Future immediately.
        executor.submit(get_html, 3)
        executor.submit(get_html, 2)
        # map() yields results in the order of `urls`. Submitting each item and
        # iterating concurrent.futures.as_completed(...) over the futures would
        # yield them in completion order instead.
        for data in executor.map(get_html, urls):
            print(f"result {data}")
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- ovod.cn 版权所有 湘ICP备2023023988号-4
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务