本文共 35852 字,大约阅读时间需要 119 分钟。
(1)线程的概念
线程是进程内同步执行的代码段,每个线程对应一个栈。(2)创建线程的2种方式
方式1:该方式常用(通过Thread类的args参数调用线程函数)
注意:线程函数传入的是元组类型,如果只有一个参数,需要加入逗号.#!/usr/bin/env python3
“”"
author:zhang ming yang #创建线程的第一种方式. “”"from threading import Thread
def task(arg):
print(arg)#创建2个线程对象.
t1 = Thread(target=task,args=(1,)) t2 = Thread(target=task,args=(2,))t1.start()
t2.start()print(“主线程”)
实例2:#!/usr/bin/env python3
“”"
author:zhang ming yang #创建线程的第一种方式. “”"from threading import Thread
def task(arg):
print("==================================>") print(arg)#创建2个线程对象.
for i in range(30): t = Thread(target=task,args=(i,)) #启动线程. t.start()print(“主线程”)
方式2:继承Thread类,重写run方法(同Java,但是在python当中很少用这种方式)
#!/usr/bin/env python3“”"
author:zhang ming yang #创建线程的第2种方式:继承thread类,重写run方法. “”"from threading import Thread
import timeclass MyThread(Thread):
#初始化构造方法不写也可以. def init(self,*args,**kwargs): super().init(*args,**kwargs)#重写Thread当中的run方法.def run(self): for i in range(0,5): print("%s"%(self.getName()),i)
t1 = MyThread()
t1.setName(“线程1”) t2 = MyThread() t2.setName(“线程2”)t1.start()
t2.start()for i in range(0,5):
print(i) (3)threading.local()方法介绍 (4)Python线程当中常用的一些方法 join(): 如果一个线程A调用了某个线程B对象的join() 方法,此时执行代码的线程A将会从可运行状态(RUNNABLE)转变为无限时等待状态(WAITING)状态,直到线程B执行完之后,等待它的线程A才会从无限时等待状态(WAITING)状态转变为可运行状态(RUNNABLE);
如果一个线程调用了有参数的的join方法,那么这个线程就会从可运行状态(RUNNABLE)转变为有限时等待状态(TIMED_WAITING),
该状态不同于WAITING,在达到一定时间的后它们会自动唤醒,即从有限时等待状态(TIMED_WAITING)转变为可运行状态(RUNNABLE);
threading.current_thread():当前线程对象
threading.current_thread().name:当前线程的名字
实例程序1:
#!/usr/bin/env python3
from threading import Thread
import timedef task(arg):
time.sleep(arg) print(arg)for i in range(0,5):
t = Thread(target=task,args=(i,)) t.start() t.join() #这个地方加会并行变为串行.print(">主线程==>")
实例程序2:只有其余的线程执行完毕,主线程才会执行完毕,同Java.#!/usr/bin/env python3
from threading import Thread
import timedef func(*args,**kwargs):
print(args) time.sleep(args[0])thread_list = []
for i in range(0,5):
t = Thread(target=func,args=(i,)) t.start() thread_list.append(t)#只有其余的线程都运行完毕,主线程才会运行完毕.
for r in thread_list: r.join()print(">主线程====>")
(6)守护线程
daemon:守护线程,类似于海底捞的服务员,守护线程守护非守护线程,这种线程一般用于服务类线程.如果进程中剩余的线程都是守护线程,那么该进程就结束了.
实例程序:
#!/usr/bin/env python3
“”"
author:zhang ming yang #创建线程的第一种方式. “”"from threading import Thread
import timedef func(*args,**kwargs):
time.sleep(1) print(args) print(kwargs)for i in range(0,30):
t = Thread(target=func,args=(i,)) t.setDaemon(True) #守护线程守护的是非守护线程,如果进程中剩余的线程都是守护线程,那么该进程就结束了. t.start()print(“主线程!”)
print(“主线程!”) 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
主线程! 主线程!Process finished with exit code 0
(8)线程(同步)安全的概念以及不安全导致的原因
所谓线程同步(安全)的问题就是多个线程在处理相同资源的时候,要保证共享数据的数据一致性和变化一致性;导致线程同步异常的原因共有两个:
①多个线程彼此之间处理的是相同的资源;
②多个线程彼此之间在处理相同关键步骤的时候,某个线程在这些关键的步骤没有执行完毕的时候,CPU切换到了另外一个线程去执行
这些关键的步骤,导致共享数据的一致性出现问题.
示例程序:买票异常程序
#!/usr/bin/env python3
import threading
import timetickets = 100
def saler():
while tickets >= 1: global tickets print(“当前线程:%s 正在售第:%s张票!”%(threading.current_thread().name,tickets)) tickets -= 1 time.sleep(1)t1 = threading.Thread(target=saler,args=())
t1.setName(“售票员1”) t2 = threading.Thread(target=saler,args=()) t2.setName(“售票员2”)t1.start()
t2.start()print("+++++++++++主线程+++++++++++++++")
异常运行结果:(9)线程安全的解决方案:互斥锁
线程同步(安全)的问题是通过线程锁机制来解决的,通过线程锁机制,保证这些关键的步骤在被某一个线程执行的时候,将不允许其它线程来执行这些步骤,直到该线程将这些关键的步骤执行完毕,才允许其他线程执行这些步骤.
使用lock = threading.Lock()函数创建线程锁、使用lock.acquire()方法加锁、使用lock.release()方法解锁.
实例程序1:
#!/usr/bin/env python3
import threading
import timetickets = 100
lock = threading.Lock()
def saler():
while tickets >= 1: global tickets #加锁. lock.acquire() print(“当前线程:%s 正在售第:%s张票!”%(threading.current_thread().name,tickets)) tickets -= 1 #释放锁. lock.release() time.sleep(1)t1 = threading.Thread(target=saler,args=())
t1.setName(“售票员1”) t2 = threading.Thread(target=saler,args=()) t2.setName(“售票员2”)t1.start()
t2.start()print("+++++++++++主线程+++++++++++++++")
运行结果:
pass
(10)semaphore信号量
信号量是一个计数器,用于记录资源的消耗情况,当资源消耗时递减,当资源释放时递增,可以认为信号量代表资源是否可用。semaphore是一个内置的计数器
每当调用acquire()时,内置计数器-1
每当调用release()时,内置计数器+1
计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。
互斥锁:同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,
后面的人只能等里面有人出来了才能再进去。
可以这么理解:同一时刻可以有n个线程可以使用到锁!
实例程序1:
#!/usr/bin/env python3
import threading
semaphore = threading.BoundedSemaphore(3)
semaphore.acquire() #使用资源
print(semaphore._value) semaphore.acquire() print(semaphore._value) semaphore.acquire() print(semaphore._value)semaphore.release() #释放资源.
print(semaphore._value) semaphore.release() print(semaphore._value) semaphore.release() print(semaphore._value) 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
2 1 0 1 2 3Process finished with exit code 0
实例程序2:
#!/usr/bin/env python3
import threading
import timesemaphore = threading.BoundedSemaphore(3)
def func(*args,**kwargs):
print(args,kwargs,time.ctime()) time.sleep(2)for i in range(0,10):
t = threading.Thread(target=func,args=(i,)) t.start()print(">主线程>")
运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
(0,) {} Wed Feb 5 18:13:17 2020 (1,) {} Wed Feb 5 18:13:17 2020 (2,) {} Wed Feb 5 18:13:17 2020 (3,) {} Wed Feb 5 18:13:17 2020 (4,) {} Wed Feb 5 18:13:17 2020 (5,) {} Wed Feb 5 18:13:17 2020 (6,) {} Wed Feb 5 18:13:17 2020 (7,) {} Wed Feb 5 18:13:17 2020 (8,) {} Wed Feb 5 18:13:17 2020 (9,) {} Wed Feb 5 18:13:17 2020 >主线程>Process finished with exit code 0
可以看到,程序会在很短的时间内生成10个线程来打印一句话,如果在主机执行IO密集型任务的时候再执行这种类型的程序时,计算机就有很大可能会宕机。
这时候就可以为这段程序添加一个计数器功能,来限制一个时间点内的线程数量。
优化程序:
#!/usr/bin/env python3
import threading
import time#同时有三个线程可以使用锁,进行处理,即多个人同时使用锁:信号量.
semaphore = threading.BoundedSemaphore(3)def func(*args,**kwargs):
semaphore.acquire() print(args,kwargs,time.ctime()) time.sleep(2) semaphore.release()for i in range(0,10):
t = threading.Thread(target=func,args=(i,)) t.start()print(">主线程>")
运行结果:
D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
(0,) {} Wed Feb 5 18:15:24 2020 (1,) {} Wed Feb 5 18:15:24 2020 (2,) {} Wed Feb 5 18:15:24 2020 >主线程> (3,) {} Wed Feb 5 18:15:26 2020 (4,) {} Wed Feb 5 18:15:26 2020 (5,) {} Wed Feb 5 18:15:26 2020 (7,) {} Wed Feb 5 18:15:28 2020 (6,) {} Wed Feb 5 18:15:28 2020 (8,) {} Wed Feb 5 18:15:28 2020 (9,) {} Wed Feb 5 18:15:30 2020Process finished with exit code 0
形象例子:上厕所.#!/usr/bin/env python3
“”"
author:zhang ming yang “”"from threading import BoundedSemaphore
import threadingsemaphore = BoundedSemaphore(3)
def go_wc(name):
semaphore.acquire() print("%s 正在上厕所!"%(name)) import time time.sleep(10) print(“上厕所完毕!”) semaphore.release()if name == ‘main’:
for i in range(0,5): t = threading.Thread(target=go_wc,args=(i,)) t.start()print("++++++++++主线程++++++++++++")
(11)线程其余锁介绍
a.递归锁 threading.RLock() lock = threading.Lock() 只能上一把,可以理解为进厕所自己只能上一把锁,但是别人不能进去.lock = threading.RLock() 可以上多把,可以理解为进厕所自己可以上多把锁,同时别人不能进去.
#!/usr/bin/env python3
import threading
import timetickets = 100
lock = threading.RLock()
def saler():
while tickets >= 1: global tickets # 加3把锁. lock.acquire() lock.acquire() lock.acquire() print(“当前线程:%s 正在售第:%s张票!” % (threading.current_thread().name, tickets)) tickets -= 1 print(time.ctime()) time.sleep(3) # 释放3把锁. lock.release() lock.release() lock.release()t1 = threading.Thread(target=saler, args=())
t1.setName(“售票员1”) t2 = threading.Thread(target=saler, args=()) t2.setName(“售票员2”)t1.start()
t2.start()print("+++++++++++主线程+++++++++++++++")
运行结果:
D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py:15: SyntaxWarning: name ‘tickets’ is used prior to global declaration global tickets 当前线程:售票员1 正在售第:100张票! Wed Feb 5 20:16:50 2020 +++++++++++主线程+++++++++++++++ 当前线程:售票员2 正在售第:99张票! Wed Feb 5 20:16:50 2020 当前线程:售票员2 正在售第:98张票! Wed Feb 5 20:16:53 2020 当前线程:售票员1 正在售第:97张票! Wed Feb 5 20:16:53 2020 当前线程:售票员2 正在售第:96张票! Wed Feb 5 20:16:56 2020 当前线程:售票员1 正在售第:95张票! Wed Feb 5 20:16:56 2020 当前线程:售票员2 正在售第:94张票! Wed Feb 5 20:16:59 2020 当前线程:售票员1 正在售第:93张票! Wed Feb 5 20:16:59 2020 当前线程:售票员2 正在售第:92张票! Wed Feb 5 20:17:02 2020 当前线程:售票员1 正在售第:91张票! Wed Feb 5 20:17:02 2020 当前线程:售票员1 正在售第:90张票! Wed Feb 5 20:17:05 2020 当前线程:售票员2 正在售第:89张票! Wed Feb 5 20:17:05 2020 当前线程:售票员1 正在售第:88张票! Wed Feb 5 20:17:08 2020 当前线程:售票员2 正在售第:87张票! Wed Feb 5 20:17:08 2020 当前线程:售票员1 正在售第:86张票! Wed Feb 5 20:17:11 2020 当前线程:售票员2 正在售第:85张票! Wed Feb 5 20:17:11 2020 当前线程:售票员1 正在售第:84张票! Wed Feb 5 20:17:14 2020 当前线程:售票员2 正在售第:83张票! Wed Feb 5 20:17:14 2020 当前线程:售票员1 正在售第:82张票! Wed Feb 5 20:17:17 2020 当前线程:售票员2 正在售第:81张票! Wed Feb 5 20:17:17 2020 当前线程:售票员1 正在售第:80张票! Wed Feb 5 20:17:20 2020 当前线程:售票员2 正在售第:79张票! Wed Feb 5 20:17:20 2020 当前线程:售票员2 正在售第:78张票! Wed Feb 5 20:17:23 2020 当前线程:售票员1 正在售第:77张票! Wed Feb 5 20:17:23 2020 当前线程:售票员2 正在售第:76张票! Wed Feb 5 20:17:26 2020 当前线程:售票员1 正在售第:75张票! Wed Feb 5 20:17:26 2020 当前线程:售票员2 正在售第:74张票! Wed Feb 5 20:17:29 2020 当前线程:售票员1 正在售第:73张票! Wed Feb 5 20:17:29 2020 当前线程:售票员2 正在售第:72张票! Wed Feb 5 20:17:32 2020 当前线程:售票员1 正在售第:71张票! Wed Feb 5 20:17:32 2020 当前线程:售票员1 正在售第:70张票! Wed Feb 5 20:17:35 2020 当前线程:售票员2 正在售第:69张票! Wed Feb 5 20:17:35 2020 当前线程:售票员2 正在售第:68张票! Wed Feb 5 20:17:38 2020 当前线程:售票员1 正在售第:67张票! Wed Feb 5 20:17:38 2020 当前线程:售票员2 正在售第:66张票! Wed Feb 5 20:17:41 2020 当前线程:售票员1 正在售第:65张票! Wed Feb 5 20:17:41 2020 当前线程:售票员2 正在售第:64张票! Wed Feb 5 20:17:44 2020 当前线程:售票员1 正在售第:63张票! Wed Feb 5 20:17:44 2020 当前线程:售票员2 正在售第:62张票! Wed Feb 5 20:17:47 2020 当前线程:售票员1 正在售第:61张票! Wed Feb 5 20:17:47 2020 当前线程:售票员2 正在售第:60张票! Wed Feb 5 20:17:50 2020 当前线程:售票员1 正在售第:59张票! Wed Feb 5 20:17:50 2020 当前线程:售票员1 正在售第:58张票! Wed Feb 5 20:17:53 2020 当前线程:售票员2 正在售第:57张票! Wed Feb 5 20:17:53 2020 当前线程:售票员2 正在售第:56张票! Wed Feb 5 20:17:56 2020 当前线程:售票员1 正在售第:55张票! Wed Feb 5 20:17:56 2020 当前线程:售票员2 正在售第:54张票! Wed Feb 5 20:17:59 2020 当前线程:售票员1 正在售第:53张票! Wed Feb 5 20:17:59 2020 当前线程:售票员2 正在售第:52张票! Wed Feb 5 20:18:02 2020 当前线程:售票员1 正在售第:51张票! Wed Feb 5 20:18:02 2020 当前线程:售票员1 正在售第:50张票! Wed Feb 5 20:18:05 2020 当前线程:售票员2 正在售第:49张票! Wed Feb 5 20:18:05 2020 当前线程:售票员2 正在售第:48张票! Wed Feb 5 20:18:08 2020 当前线程:售票员1 正在售第:47张票! Wed Feb 5 20:18:08 2020 当前线程:售票员1 正在售第:46张票! Wed Feb 5 20:18:11 2020 当前线程:售票员2 正在售第:45张票! Wed Feb 5 20:18:11 2020 当前线程:售票员1 正在售第:44张票! Wed Feb 5 20:18:14 2020 当前线程:售票员2 正在售第:43张票! Wed Feb 5 20:18:14 2020 当前线程:售票员2 正在售第:42张票! Wed Feb 5 20:18:17 2020 当前线程:售票员1 正在售第:41张票! Wed Feb 5 20:18:17 2020 当前线程:售票员1 正在售第:40张票! Wed Feb 5 20:18:20 2020 当前线程:售票员2 正在售第:39张票! Wed Feb 5 20:18:20 2020 当前线程:售票员1 正在售第:38张票! Wed Feb 5 20:18:23 2020 当前线程:售票员2 正在售第:37张票! Wed Feb 5 20:18:23 2020 当前线程:售票员2 正在售第:36张票! Wed Feb 5 20:18:26 2020 当前线程:售票员1 正在售第:35张票! Wed Feb 5 20:18:26 2020 当前线程:售票员2 正在售第:34张票! Wed Feb 5 20:18:29 2020 当前线程:售票员1 正在售第:33张票! Wed Feb 5 20:18:29 2020 当前线程:售票员1 正在售第:32张票! Wed Feb 5 20:18:32 2020 当前线程:售票员2 正在售第:31张票! Wed Feb 5 20:18:32 2020 当前线程:售票员2 正在售第:30张票! Wed Feb 5 20:18:35 2020 当前线程:售票员1 正在售第:29张票! Wed Feb 5 20:18:35 2020 当前线程:售票员2 正在售第:28张票! Wed Feb 5 20:18:38 2020 当前线程:售票员1 正在售第:27张票! Wed Feb 5 20:18:38 2020 当前线程:售票员2 正在售第:26张票! Wed Feb 5 20:18:41 2020 当前线程:售票员1 正在售第:25张票! Wed Feb 5 20:18:41 2020 当前线程:售票员1 正在售第:24张票! Wed Feb 5 20:18:44 2020 当前线程:售票员2 正在售第:23张票! Wed Feb 5 20:18:44 2020 当前线程:售票员2 正在售第:22张票! Wed Feb 5 20:18:47 2020 当前线程:售票员1 正在售第:21张票! Wed Feb 5 20:18:47 2020 当前线程:售票员2 正在售第:20张票! Wed Feb 5 20:18:50 2020 当前线程:售票员1 正在售第:19张票! Wed Feb 5 20:18:50 2020 当前线程:售票员2 正在售第:18张票! Wed Feb 5 20:18:53 2020 当前线程:售票员1 正在售第:17张票! Wed Feb 5 20:18:53 2020 当前线程:售票员2 正在售第:16张票! Wed Feb 5 20:18:56 2020 当前线程:售票员1 正在售第:15张票! Wed Feb 5 20:18:56 2020 当前线程:售票员1 正在售第:14张票! Wed Feb 5 20:18:59 2020 当前线程:售票员2 正在售第:13张票! Wed Feb 5 20:18:59 2020 当前线程:售票员1 正在售第:12张票! Wed Feb 5 20:19:02 2020 当前线程:售票员2 正在售第:11张票! Wed Feb 5 20:19:02 2020 当前线程:售票员1 正在售第:10张票! Wed Feb 5 20:19:05 2020 当前线程:售票员2 正在售第:9张票! Wed Feb 5 20:19:05 2020 当前线程:售票员1 正在售第:8张票! Wed Feb 5 20:19:08 2020 当前线程:售票员2 正在售第:7张票! Wed Feb 5 20:19:08 2020 当前线程:售票员2 正在售第:6张票! Wed Feb 5 20:19:11 2020 当前线程:售票员1 正在售第:5张票! Wed Feb 5 20:19:11 2020 当前线程:售票员2 正在售第:4张票! Wed Feb 5 20:19:14 2020 当前线程:售票员1 正在售第:3张票! Wed Feb 5 20:19:14 2020 当前线程:售票员2 正在售第:2张票! Wed Feb 5 20:19:17 2020 当前线程:售票员1 正在售第:1张票! Wed Feb 5 20:19:17 2020 。。。。 。。。。b.事件(event)
python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,
那么event.wait 方法时便不再阻塞。
clear:将Flag设置为False
set:将“Flag”设置为True event_obj = threading.Event()event_obj.wait() #让当前线程进入锁旗标的等待队列.
event_obj.set() #唤醒锁旗标等待队列当中的所有线程.
event_obj.clear():暂时未确定作用.
代码:
#!/usr/bin/env python3
import threading
event_obj = threading.Event()
def func(*args,**kwargs):
print(“start”) event_obj.wait() #让当前线程进入锁旗标的等待队列. print(“execute”)for i in range(0,10):
t = threading.Thread(target=func,args=(10,20)) t.start()print("==>主线程=>")
inp = input(‘请输入你的命令:’).strip() if inp == ‘true’: event_obj.set() #唤醒锁旗标等待队列当中的所有线程. 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
start start start start start start start start start start ==>主线程=> 请输入你的命令:true execute execute execute execute execute execute execute execute execute executeProcess finished with exit code 0
c.条件(Condition)
参考:https://www.cnblogs.com/wupeiqi/articles/5040827.html(12)线程池
参考博客:https://www.cnblogs.com/xiao-apple36/p/9499000.html
https://www.jb51.net/article/170571.htm
在刚开始学多进程或多线程时,我们迫不及待地基于多进程或多线程实现并发的套接字通信,然而这种实现方式的致命缺陷是:服务的开启的
进程数或线程数都会随着并发的客户端数目地增多而增多,这会对服务端主机带来巨大的压力,甚至于不堪重负而瘫痪,于是我们必须对服务
端开启的进程数或线程数加以控制,让机器在一个自己可以承受的范围内运行,这就是进程池或线程池的用途,
例如进程池,就是用来存放进程的池子,本质还是基于多进程,只不过是对开启进程的数目加上了限制.
a. 池的作用
池的功能:限制进程数或线程数.什么时候限制: 当并发的任务数量远远大于计算机所能承受的范围,即无法一次性开启过多的任务数量 我就应该考虑去限制我进程数或线程数,从
保证服务器不崩.
b. concurrent.futures
1.concurent.future模块是用来创建并行的任务,提供了更高级别的接口为了异步执行调用2.concurent.future这个模块用起来非常方便,它的接口也封装的非常简单
3.concurent.future模块既可以实现进程池,也可以实现线程池
4.模块导入进程池和线程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
p = ProcessPoolExecutor(max_works)对于进程池如果不写max_works:默认的是cpu的数目
p = ThreadPoolExecutor(max_works)对于线程池如果不写max_works:默认的是cpu的数目*5 (但是我并没有从源码看到.)
c. 基本方法
1、submit(fn, *args, **kwargs):异步提交任务2、map(func, *iterables, timeout=None, chunksize=1): 取代for循环submit的操作
3、shutdown(wait=True):相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续,源代码的内部其实就是通过join()方法来实现的:wait=False,立即返回,并不会等待池内的任务执行完毕,
但不管wait参数为何值,整个程序都会等到所有任务执行完毕, submit和map必须在shutdown之前4、result(timeout=None):取得结果
5、add_done_callback(fn):回调函数
d. 同步和异步
理解为提交任务的两种方式
同步: 提交了一个任务,必须等任务执行完了(拿到返回值),才能执行下一行代码,相当于执行任务的串行执行,类似于Kafka的同步阻塞的方式:
此时并行会变成串行!,不建议这么用,没有什么意义了!
同步代码示例:
#!/usr/bin/env python3
“”"
author:zhang ming yang “”"“”"
author:zhang ming yang “”"from concurrent.futures import ThreadPoolExecutor
import threading import os, timedef task(i):
print(“线程:%s 在运行当中…:%s” % (threading.current_thread().name, time.ctime())) time.sleep(2) return i * 2pool = ThreadPoolExecutor(4)
res = []
for i in range(0, 10): #同步提交方式,只有当前任务执行完毕之后,才能执行下一个任务,所以并行会变成串行,没有什么意义. future = pool.submit(task, i) print(future.result()) #一旦调用future.result(),相当于调用了kafka的future.get()print("++++++++++++++主线程继续执行++++++++++++++++++")
运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
线程:Thread-1 在运行当中…:Thu Feb 6 20:50:27 2020 0 线程:Thread-1 在运行当中…:Thu Feb 6 20:50:29 2020 2 线程:Thread-2 在运行当中…:Thu Feb 6 20:50:31 2020 4 线程:Thread-1 在运行当中…:Thu Feb 6 20:50:33 2020 6 线程:Thread-3 在运行当中…:Thu Feb 6 20:50:35 2020 8 线程:Thread-2 在运行当中…:Thu Feb 6 20:50:37 2020 10 线程:Thread-4 在运行当中…:Thu Feb 6 20:50:39 2020 12 线程:Thread-1 在运行当中…:Thu Feb 6 20:50:41 2020 14 线程:Thread-3 在运行当中…:Thu Feb 6 20:50:43 2020 16 线程:Thread-2 在运行当中…:Thu Feb 6 20:50:45 2020 18 ++++++++++++++主线程继续执行++++++++++++++++++Process finished with exit code 0
异步: 提交了一个任务,不要等执行完了,可以直接执行下一行代码,类似于Kafka的异步提交任务方式:这种方式也是我们推荐的!
代码示例:
#!/usr/bin/env python3
“”"
author:zhang ming yang “”"“”"
author:zhang ming yang “”"from concurrent.futures import ThreadPoolExecutor
import threading import os, timedef task(i):
print(“线程:%s 在运行当中…:%s” % (threading.current_thread().name, time.ctime())) time.sleep(2) return i * 2pool = ThreadPoolExecutor(4)
res = []
for i in range(0, 10): #异步提交方式. future = pool.submit(task, i) res.append(future)#主线程只有等待其余线程执行完毕之后,才会从无限时等待状态waiting转为可运行状态runnable
pool.shutdown(wait=True)print("++++++++++++++主线程继续执行++++++++++++++++++")
for r in res: print(r.result()) # 获取每个线程对应的执行结果. 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
线程:Thread-1 在运行当中…:Thu Feb 6 21:01:54 2020 线程:Thread-2 在运行当中…:Thu Feb 6 21:01:54 2020 线程:Thread-3 在运行当中…:Thu Feb 6 21:01:54 2020 线程:Thread-4 在运行当中…:Thu Feb 6 21:01:54 2020 线程:Thread-1 在运行当中…:Thu Feb 6 21:01:56 2020 线程:Thread-2 在运行当中…:Thu Feb 6 21:01:56 2020 线程:Thread-4 在运行当中…:Thu Feb 6 21:01:56 2020 线程:Thread-3 在运行当中…:Thu Feb 6 21:01:56 2020 线程:Thread-1 在运行当中…:Thu Feb 6 21:01:58 2020 线程:Thread-2 在运行当中…:Thu Feb 6 21:01:58 2020 ++++++++++++++主线程继续执行++++++++++++++++++ 0 2 4 6 8 10 12 14 16 18Process finished with exit code 0
e. 异步+回调函数
#!/usr/bin/env python3“”"
author:zhang ming yang “”"from concurrent.futures import ThreadPoolExecutor
import threading import timedef task(i):
print("%s: 线程:%s 在运行%s 任务." % (time.ctime(),threading.current_thread().name, i)) time.sleep(2) return i * 2def parse(future):
#处理每个线程的运行结果. print("%s: 线程结束了任务,相应的运行结果是:%s"%(threading.current_thread().name,future.result()))pool = ThreadPoolExecutor(4)
for i in range(20): #异步+回调函数. future = pool.submit(task,i) ‘’’ 给当前执行的任务绑定了一个函数,在当前任务结束的时候就会触发这个函数(称之为回调函数) 会把future对象作为参数传给函数 注:这个称为回调函数,当前任务处理结束了,就回来调parse这个函数 ‘’’ future.add_done_callback(parse)print("++++++++++++++主线程继续执行++++++++++++++++++")
运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
Thu Feb 6 21:15:31 2020: 线程:Thread-1 在运行0 任务. Thu Feb 6 21:15:31 2020: 线程:Thread-2 在运行1 任务. Thu Feb 6 21:15:31 2020: 线程:Thread-3 在运行2 任务. Thu Feb 6 21:15:31 2020: 线程:Thread-4 在运行3 任务. ++++++++++++++主线程继续执行++++++++++++++++++ Thread-1: 线程结束了任务,相应的运行结果是:0 Thread-2: 线程结束了任务,相应的运行结果是:2 Thu Feb 6 21:15:33 2020: 线程:Thread-1 在运行4 任务. Thu Feb 6 21:15:33 2020: 线程:Thread-2 在运行5 任务. Thread-4: 线程结束了任务,相应的运行结果是:6 Thread-3: 线程结束了任务,相应的运行结果是:4 Thu Feb 6 21:15:33 2020: 线程:Thread-4 在运行6 任务. Thu Feb 6 21:15:33 2020: 线程:Thread-3 在运行7 任务. Thread-2: 线程结束了任务,相应的运行结果是:10 Thread-1: 线程结束了任务,相应的运行结果是:8 Thu Feb 6 21:15:35 2020: 线程:Thread-2 在运行8 任务. Thu Feb 6 21:15:35 2020: 线程:Thread-1 在运行9 任务. Thread-4: 线程结束了任务,相应的运行结果是:12 Thu Feb 6 21:15:35 2020: 线程:Thread-4 在运行10 任务. Thread-3: 线程结束了任务,相应的运行结果是:14 Thu Feb 6 21:15:35 2020: 线程:Thread-3 在运行11 任务. Thread-1: 线程结束了任务,相应的运行结果是:18 Thu Feb 6 21:15:37 2020: 线程:Thread-1 在运行12 任务. Thread-2: 线程结束了任务,相应的运行结果是:16 Thu Feb 6 21:15:37 2020: 线程:Thread-2 在运行13 任务. Thread-3: 线程结束了任务,相应的运行结果是:22 Thu Feb 6 21:15:37 2020: 线程:Thread-3 在运行14 任务. Thread-4: 线程结束了任务,相应的运行结果是:20 Thu Feb 6 21:15:37 2020: 线程:Thread-4 在运行15 任务. Thread-2: 线程结束了任务,相应的运行结果是:26 Thread-1: 线程结束了任务,相应的运行结果是:24 Thu Feb 6 21:15:39 2020: 线程:Thread-2 在运行16 任务. Thu Feb 6 21:15:39 2020: 线程:Thread-1 在运行17 任务. Thread-4: 线程结束了任务,相应的运行结果是:30 Thread-3: 线程结束了任务,相应的运行结果是:28 Thu Feb 6 21:15:39 2020: 线程:Thread-3 在运行18 任务. Thu Feb 6 21:15:39 2020: 线程:Thread-4 在运行19 任务. Thread-1: 线程结束了任务,相应的运行结果是:34 Thread-2: 线程结束了任务,相应的运行结果是:32 Thread-3: 线程结束了任务,相应的运行结果是:36 Thread-4: 线程结束了任务,相应的运行结果是:38Process finished with exit code 0
爬虫代码示例:
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
import threading import requestsdef handle(future):
response = future.result() print(response.url,len(response.text),response.status_code)def download(url):
“”" :param url: :return: 返回链接的下载结果,response包含了下载的所有内容. “”" response = requests.get(url) return responseurl_list = [
“http://www.sogou.com.com”, “http://www.baidu.com”, “https://www.taobao.com”, ]pool = ThreadPoolExecutor(2)
for url in url_list: future = pool.submit(download,url) future.add_done_callback(handle)print("++++++++++++主线程++++++++++++++++++")
运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
++++++++++++主线程++++++++++++++++++ http://www.baidu.com/ 2381 200 https://www.taobao.com/ 133374 200 https://com.com/results?q=www.sogou 7107 200Process finished with exit code 0
f. 线程池的创建方式
思考:直接创建线程的方式肯定比线程池的速度快吗?不一定,因为线程需要考虑cpu上下文切换的问题.线程池创建的内部原理:最开始创建的时候线程池里面并没有线程,只有任务来的时候才会创建相应的线程.
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
import time import threadingdef task(*args, **kwargs):
time.sleep(2) print(args, kwargs, time.ctime(), threading.current_thread().name)pool = ThreadPoolExecutor(2)
for i in range(0, 10): # 去连接池当中获取连接. print(i) pool.submit(task, i, i + 1) “”" 相当于: t = threading.Thread(target=task,args=(i,i+1,)) t.start() “”"print("+++++++++++主线程++++++++++++")
print("+++++++++++主线程++++++++++++")运行结果:
D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
0 1 2 3 4 5 6 7 8 9 +++++++++++主线程++++++++++++ +++++++++++主线程++++++++++++ (0, 1) {} Wed Feb 5 22:00:14 2020 Thread-1 (1, 2) {} Wed Feb 5 22:00:14 2020 Thread-2 (2, 3) {} Wed Feb 5 22:00:16 2020 Thread-1 (3, 4) {} Wed Feb 5 22:00:16 2020 Thread-2 (5, 6) {} Wed Feb 5 22:00:18 2020 Thread-1 (4, 5) {} Wed Feb 5 22:00:18 2020 Thread-2 (6, 7) {} Wed Feb 5 22:00:20 2020 Thread-1 (7, 8) {} Wed Feb 5 22:00:20 2020 Thread-2 (9, 10) {} Wed Feb 5 22:00:22 2020 Thread-2 (8, 9) {} Wed Feb 5 22:00:22 2020 Thread-1Process finished with exit code 0
应用示例1:爬取网页
#!/usr/bin/env python3
from concurrent.futures import ThreadPoolExecutor
import threading import requestsdef handle(future):
response = future.result() “”" response中封装了Http请求响应的所有数据 response.url 请求的URL response.status_code 响应状态码 response.text 响应内容(字符串格式) response.content 响应内容(字节格式) “”" print(response.url,response.status_code,len(response.text),len(response.content))def download(url):
“”" :param url: :return: 返回链接的下载结果,response包含了下载的所有内容. “”" response = requests.get(url) return responseurl_list = [
“http://www.sogou.com.com”, “http://www.baidu.com”, “https://www.taobao.com”, ]pool = ThreadPoolExecutor(2)
for url in url_list: future = pool.submit(download,url) future.add_done_callback(handle)print("++++++++++++主线程++++++++++++++++++")
运行结果:
D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/Teacher.py”
++++++++++++主线程++++++++++++++++++ http://www.baidu.com/ 200 2381 2381 https://www.taobao.com/ 200 133374 135048 https://com.com/results?q=www.sogou 200 7107 7115Process finished with exit code 0
e. 主线程等待线程池的线程执行完任务之后才继续执行
#!/usr/bin/env python3“”"
author:zhang ming yang “”"from concurrent.futures import ThreadPoolExecutor
import threading import os,timedef task(i):
print(“线程:%s 在运行当中…:%s”%(threading.current_thread().name,time.ctime())) time.sleep(2) return i * 2pool = ThreadPoolExecutor(4)
res = []
for i in range(0,10): future = pool.submit(task,i) #异步提交方式. res.append(future)#主线程只有等待其余线程执行完毕之后,才会从无限时等待状态waiting转为可运行状态runnable
pool.shutdown(wait=True)print("++++++++++++++主线程继续执行++++++++++++++++++")
for r in res: print(r.result()) #获取每个线程对应的执行结果. 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
线程:Thread-1 在运行当中…:Thu Feb 6 20:31:11 2020 线程:Thread-2 在运行当中…:Thu Feb 6 20:31:11 2020 线程:Thread-3 在运行当中…:Thu Feb 6 20:31:11 2020 线程:Thread-4 在运行当中…:Thu Feb 6 20:31:11 2020 线程:Thread-1 在运行当中…:Thu Feb 6 20:31:13 2020 线程:Thread-2 在运行当中…:Thu Feb 6 20:31:13 2020 线程:Thread-3 在运行当中…:Thu Feb 6 20:31:13 2020 线程:Thread-4 在运行当中…:Thu Feb 6 20:31:13 2020 线程:Thread-3 在运行当中…:Thu Feb 6 20:31:15 2020 线程:Thread-2 在运行当中…:Thu Feb 6 20:31:15 2020 ++++++++++++++主线程继续执行++++++++++++++++++ 0 2 4 6 8 10 12 14 16 18Process finished with exit code 0
f. 综合运用示例1
MyThread模块:#!/usr/bin/env python3
“”"
author:zhang ming yang “”"from concurrent.futures import ThreadPoolExecutor
import threading import time import requestsdef download(url):
response = requests.get(url) return responsedef run(url_list):
pool = ThreadPoolExecutor(2) for item in url_list: url = item[‘url’] func = item[‘func’] #一旦拿到函数的引用,就可以调用这个函数了. #异步+回调函数. future = pool.submit(download,url) future.add_done_callback(func) 启动模块:#!/usr/bin/env python3
“”"
author:zhang ming yang “”"import MyThread
def handle(future):
response = future.result() print(response.url,len(response.text),response.status_code)url_list = [
{“url”:“http://www.sogou.com.com”,‘func’:handle}, {“url”:“http://www.baidu.com”,‘func’:handle}, {“url”:“https://www.taobao.com”,‘func’:handle} ]MyThread.run(url_list)
运行结果:#!/usr/bin/env python3
“”"
author:zhang ming yang “”"import MyThread
def handle(future):
response = future.result() print(response.url,len(response.text),response.status_code)url_list = [
{“url”:“http://www.sogou.com.com”,‘func’:handle}, {“url”:“http://www.baidu.com”,‘func’:handle}, {“url”:“https://www.taobao.com”,‘func’:handle} ]MyThread.run(url_list)
运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
http://www.baidu.com/ 2381 200 https://www.taobao.com/ 133374 200 https://com.com/results?q=www.sogou 7107 200Process finished with exit code 0
g. 综合运用示例2
1、在urls.txt文件中包含了若干个图像url,一行一个url,请使用多线程下载这些图像文件,并按url出现的顺序保存为0.jpg、1.jpg、2.jpg代码示例:
#!/usr/bin/env python3
“”"
author:zhang ming yang “”" from concurrent.futures import ThreadPoolExecutor import requests import timeurl_list = []
with open(“urls.txt”,mode=“r”) as fr: for line in fr.readlines(): if line.strip(): url_list.append(line.strip())def download(url,name):
response = requests.get(url) with open(name,mode=“wb”) as fw: fw.write(response.content)pool = ThreadPoolExecutor(2)
for index,url in enumerate(url_list): pool.submit(download,url,"%s.jpg"%(index))pool.shutdown(wait=True)
print("+++++++++++++++++++++++++")
url.txt:https://ss1.bdstatic.com/70cFvXSh_Q1YnxGkpoWK1HF6hhy/it/u=1764209685,1216799602&fm=26&gp=0.jpg
https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1581007666242&di=8f1ecfe8adb80ea8e88d080d0b867e3c&imgtype=0&src=http%3A%2F%2Fb-ssl.duitang.com%2Fuploads%2Fitem%2F201508%2F12%2F20150812214120_UekJn.thumb.700_0.jpeg https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1581604112916&di=32153f85208c92c8f65f42ec81daf17f&imgtype=0&src=http%3A%2F%2Fimg.pconline.com.cn%2Fimages%2Fupload%2Fupc%2Ftx%2Fitbbs%2F1310%2F07%2Fc45%2F27055241_1381140263580.jpg(13)MyThread综合组件应用(重点)
网友开发的:#!/usr/bin/python
import threading
from .MetaBaseRestrict import *class MyThread(BaseRestrictClass,metaclass=MetaRestrictClass):
__doc__ = '线程工具'def __init__(self, func_list=None):#所有线程函数的返回值汇总,用来判断线程是否执行异常 self.ret_flag = 0 self.func_list = func_listdef set_thread_func_list(self, func_list): """ @note: func_list是一个list,每个元素是一个dict,有func和args两个参数 func_list:[{"func":self.add_tableinfos,"args":(mart_table,)},{"func":self.add_tableinfos,"args":(mart_table,)}] """ self.func_list = func_listdef start(self): """ @note: 启动多线程执行,并阻塞到结束 """ self.ret_flag = 0 self.threads = [] for func_dict in self.func_list: if func_dict["args"]: new_arg_list = [] new_arg_list.append(func_dict["func"]) for arg in func_dict["args"]: new_arg_list.append(arg) new_arg_tuple = tuple(new_arg_list) t = threading.Thread(target=self.trace_func, args=new_arg_tuple) else: t = threading.Thread(target=self.trace_func, args=(func_dict["func"],)) self.threads.append(t) for thread_obj in self.threads: thread_obj.start() for thread_obj in self.threads: thread_obj.join()def ret_value(self): """ @note: 所有线程函数的返回值之和 """ return self.ret_flagdef trace_func(self, func, *args, **kwargs): """ @note:替代profile_func,新的跟踪线程返回值的函数,对真正执行的线程函数包一次函数,以获取返回值 """ func(*args, **kwargs) self.ret_flag += 1
组件1:创建多个线程执行任务,同时可以判断出所有的线程是否执行成功,同时主线程需要等待所有子线程运行完毕后才会继续执行.
缺点:如果开1万个任务,就会开启1万个线程,机器可能挂掉.#!/usr/bin/python
import threading
class MyThread(object):
__doc__ = '线程工具:多线程并行执行,同时判断出多个线程是否全部执行成功.'def __init__(self, func_list): """ :param func_list: @note: func_list是一个list,每个元素是一个dict,有func和args两个参数 """ #所有线程执行成功个数的汇总值,初始值是0个. self.ret_flag = 0 # 创建一个互斥锁. self.lock = threading.Lock() self.func_list = func_listdef start(self): """ @note: 启动多线程执行,并阻塞到结束 """ thread_list = [] for func_dict in self.func_list: func = func_dict["func"] args = func_dict["args"] t = threading.Thread(target=self.trace_func,args=(func,args)) t.start() thread_list.append(t) for thread_obj in thread_list: thread_obj.join()def trace_func(self, func, args): """ :param func: 用户真正的线程函数. :param args: 线程参数. :return: """ func(*args) #当前线程执行成功的话,标记数+1 self.lock.acquire() self.ret_flag += 1 self.lock.release()
应用示例1:
#!/usr/bin/env python3
from MyThread import MyThread
import traceback import time#线程函数.
def func(m): try: print(1/m) time.sleep(2) except Exception as e: #注意:这个地方必须用traceback,如果没用的话,某个线程即使执行时出现异常,返回标志依然是成功. traceback.print_exc(str(e))func_list = []
for i in range(0,10):
t = {“func”:func,“args”:(i,)} func_list.append(t)mt = MyThread(func_list)
mt.start()if (mt.ret_flag) == len(func_list):
print(“全部线程:%s 全部执行成功!”%(len(func_list))) else: print(“全部线程:%s 执行成功:%s 个,失败:%s 个.”%(len(func_list),mt.ret_flag,len(func_list)-mt.ret_flag))print("+++++++++++++主线程++++++++++++++")
print("+++++++++++++主线程++++++++++++++") print("+++++++++++++主线程++++++++++++++") print("+++++++++++++主线程++++++++++++++") 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test.py”
1.0 Traceback (most recent call last): 0.5 0.3333333333333333 0.25 0.2 0.16666666666666666 0.14285714285714285 0.125 0.1111111111111111 Exception in thread Thread-1: Traceback (most recent call last): File “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test.py”, line 12, in func print(1/m) ZeroDivisionError: division by zeroDuring handling of the above exception, another exception occurred:
Traceback (most recent call last):
File “D:\Python34\lib\threading.py”, line 911, in _bootstrap_inner self.run() File “D:\Python34\lib\threading.py”, line 859, in run self._target(*self.args, **self.kwargs) File "D:\Python Work Location\DBS_NEW@\DBS_NEW\MyThread.py", line 46, in trace_func func(*args) File "D:/Python Work Location/DBS_NEW@/DBS_NEW/test.py", line 16, in func traceback.print_exc(str(e)) File “D:\Python34\lib\traceback.py”, line 252, in print_exc print_exception(*sys.exc_info(), limit=limit, file=file, chain=chain) File “D:\Python34\lib\traceback.py”, line 169, in print_exception for line in _format_exception_iter(etype, value, tb, limit, chain): File “D:\Python34\lib\traceback.py”, line 153, in _format_exception_iter yield from _format_list_iter(_extract_tb_iter(tb, limit=limit)) File “D:\Python34\lib\traceback.py”, line 18, in _format_list_iter for filename, lineno, name, line in extracted_list: File “D:\Python34\lib\traceback.py”, line 58, in _extract_tb_or_stack_iter while curr is not None and (limit is None or n < limit): TypeError: unorderable types: int() < str()全部线程:10 执行成功:9 个,失败:1 个.
+++++++++++++主线程++++++++++++++ +++++++++++++主线程++++++++++++++ +++++++++++++主线程++++++++++++++ +++++++++++++主线程++++++++++++++Process finished with exit code 0
应用示例2:
#!/usr/bin/env python3
from MyThread import MyThread
import traceback import timeclass Obj_Thread(object):
# 线程函数.def func(self,m): try: print(1 / m) time.sleep(2) except Exception as e: # 注意:这个地方必须用traceback,如果没用的话,某个线程即使执行时出现异常,返回标志依然是成功. traceback.print_exc(str(e))def run(self): func_list = [] for i in range(0, 10): t = {"func": self.func, "args": (i,)} func_list.append(t) mt = MyThread(func_list) mt.start() if (mt.ret_flag) == len(func_list): print("全部线程:%s 全部执行成功!" % (len(func_list))) else: print("全部线程:%s 执行成功:%s 个,失败:%s 个." % (len(func_list), mt.ret_flag, len(func_list) - mt.ret_flag))
if name == ‘main’:
obj = Obj_Thread() obj.run() 运行结果:同上.组件2:通过线程池并行执行任务,同时可以判断出所有的线程是否执行成功,同时主线程需要等待所有子线程运行完毕后才会继续执行.
#!/usr/bin/pythonimport threading
from concurrent.futures import ThreadPoolExecutorclass MyThread(object):
__doc__ = '线程工具:多线程并行执行,同时判断出多个线程是否全部执行成功.'def __init__(self, func_list,thread_workers): """ :param func_list: @note: func_list是一个list,每个元素是一个dict,有func和args两个参数 :param max_workers: 线程池的工作个数. """ #所有线程执行成功个数的汇总值,初始值是0个. self.ret_flag = 0 # 自定义一个锁. self.lock = threading.Lock() # 自定义一个线程池. self.pool = ThreadPoolExecutor(thread_workers) self.func_list = func_list def start(self): """ @note: 启动多线程执行,并阻塞到结束 """ for func_dict in self.func_list: func = func_dict["func"] args = func_dict["args"] #放入线程池,开始执行任务. self.pool.submit(self.trace_func,func,args) #主线程只有等待线程池所有任务都执行完之后,才会从无限时等待状态waiting转为可运行状态runnable self.pool.shutdown(wait=True)def trace_func(self, func, args): """ :param func: 用户真正的线程函数. :param args: 线程参数. :return: """ func(*args) #当前线程执行成功的话,标记数+1,一旦某个线程执行的过程当中出现异常,下面的代码就不会执行. self.lock.acquire() self.ret_flag += 1 self.lock.release()
应用实例1:
#!/usr/bin/env python3
“”"
author:zhang ming yang “”" import time import threading from concurrent.futures import ThreadPoolExecutor import traceback from MyThread import MyThreaddef func(m):
try: print(1 / m) time.sleep(2) except Exception as e: # 注意:这个地方必须用traceback,如果没用的话,某个线程即使执行时出现异常,返回标志依然是成功. traceback.print_exc(str(e))func_list = []
for i in range(0, 10):
t = {“func”: func, “args”: (i,)} func_list.append(t)mt = MyThread(func_list,thread_workers=4)
mt.start()if (mt.ret_flag) == len(func_list):
print(“全部线程:%s 全部执行成功!” % (len(func_list))) else: print(“全部线程:%s 执行成功:%s 个,失败:%s 个.” % (len(func_list), mt.ret_flag, len(func_list) - mt.ret_flag)) 运行结果:D:\Python34\python.exe “D:/Python Work Location/DBS_NEW_@/DBS_NEW/test2.py”
1.0 0.5 0.3333333333333333 0.25 Traceback (most recent call last): 0.2 0.16666666666666666 0.14285714285714285 0.125 0.1111111111111111 全部线程:10 执行成功:9 个,失败:1 个.Process finished with exit code 0
实例2:#!/usr/bin/env python3
“”"
author:zhang ming yang “”" import time import threading from concurrent.futures import ThreadPoolExecutor import traceback from MyThread import MyThreadclass Obj_Thread(object):
# 线程函数.def func(self,m): try: print(1 / m) time.sleep(2) except Exception as e: print(str(e)) # 注意:这个地方必须用traceback,如果没用的话,某个线程即使执行时出现异常,返回标志依然是成功. traceback.print_exc(str(e))def run(self): func_list = [] for i in range(0, 10): t = {"func": self.func, "args": (i,)} func_list.append(t) mt = MyThread(func_list,thread_workers=4) mt.start() if (mt.ret_flag) == len(func_list): print("全部线程:%s 全部执行成功!" % (len(func_list))) else: print("全部线程:%s 执行成功:%s 个,失败:%s 个." % (len(func_list), mt.ret_flag, len(func_list) - mt.ret_flag))
if name == ‘main’:
obj = Obj_Thread() obj.run() 运行结果:同上。(14)GIL全局解释器锁
GIL,全局解释器锁:简单的总结下就是:Python的多线程在多核CPU上,只对于IO密集型计算产生正面效果;而当有至少有一个CPU密集型线程存在,那么多线程效率会由于GIL而大幅下降。
参考博客:
https://www.jianshu.com/p/9eb586b64bdb
https://www.cnblogs.com/cjaaron/p/9166538.html
应用场景:
IO密集型:线程 计算密集型:进程对比图:
Java、C#等语言:同一个进程当中的多个线程可以同时被多个CPU调度,可以利用多核的优势;
Python:同一个进程当中只有1个线程可以被CPU调度,不能利用多核的优势;