线程是cpu计算的最小单元。
对于Python来说他的进程和线程和其他语言有差异,是有GIL锁。
GIL锁保证一个进程中同一时刻只有一个线程被cpu调度。
GIL锁,全局解释器锁。用于限制一个进程中同一时刻只有一个线程被cpu调度。
扩展:默认GIL锁在执行100个cpu指令(过期时间)。
查看GIL切换的指令个数
import sys v1 = sys。getcheckinterval() print(v1)
一、通过threading.Thread类创建线程
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() print('主线程')
from threading import Thread import time class Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print('%s say hello' % self.name) if __name__ == '__main__': t = Sayhi('nick') t.start() print('主线程')
from threading import Thread from multiprocessing import Process import os def work(): print('hello',os.getpid()) if __name__ == '__main__': # part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样 t1=Thread(target=work) t2=Thread(target=work) t1.start() t2.start() print('主线程/主进程pid',os.getpid()) # part2:开多个进程,每个进程都有不同的pid p1=Process(target=work) p2=Process(target=work) p1.start() p2.start() print('主线程/主进程pid',os.getpid())
from threading import Thread from multiprocessing import Process import os def work(): print('hello') if __name__ == '__main__': # 在主进程下开启线程 t=thread(target=work) t.start() print('主线程/主进程') ''' 打印结果: hello 主线程/主进程 ''' # 在主进程下开启子进程 t=Process(target=work) t.start() print('主线程/主进程') ''' 打印结果: 主线程/主进程 hello '''
from threading import Thread from multiprocessing import Process import os def work(): global n n=0 if __name__ == '__main__': # n=100 # p=Process(target=work) # p.start() # p.join() # print('主',n) # 毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100 n=1 t=Thread(target=work) t.start() t.join() print('主',n) # 查看结果为0,因为同一进程内的线程之间共享进程内的数据
Thread实例对象的方法:
isAlive()
:返回线程是否活动的。getName()
:返回线程名。setName()
:设置线程名。threading模块提供的一些方法:
threading.currentThread()
:返回当前的线程变量。threading.enumerate()
:返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。threading.activeCount()
:返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。from threading import Thread import threading from multiprocessing import Process import os def work(): import time time.sleep(3) print(threading.current_thread().getName()) if __name__ == '__main__': # 在主进程下开启线程 t=Thread(target=work) t.start() print(threading.current_thread().getName()) print(threading.current_thread()) # 主线程 print(threading.enumerate()) # 连同主线程在内有两个运行的线程 print(threading.active_count()) print('主线程/主进程') ''' 打印结果: MainThread <_MainThread(MainThread, started 140735268892672)> [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>] 主线程/主进程 Thread-1 '''
from threading import Thread import time def sayhi(name): time.sleep(2) print('%s say hello' %name) if __name__ == '__main__': t=Thread(target=sayhi,args=('nick',)) t.start() t.join() print('主线程') print(t.is_alive()) ''' nick say hello 主线程 False '''
import multiprocessing import threading import socket s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.bind(('127.0.0.1',8080)) s.listen(5) def action(conn): while True: data=conn.recv(1024) print(data) conn.send(data.upper()) if __name__ == '__main__': while True: conn,addr=s.accept() p=threading.Thread(target=action,args=(conn,)) p.start()
无论是进程还是线程,都遵循:守护xx会等待主xx运行完毕后被销毁。需要强调的是:运行完毕并非终止运行。
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
from threading import Thread import time def foo(): print(123) time.sleep(10) print("end123") def bar(): print(456) time.sleep(10) print("end456") t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon= True #必须在t.start()之前设置 # t1.setDaemon(True) t1.start() t2.start() print("main-------") print(t1.is_alive()) # 123 # 456 # main------- # end456
from threading import Thread import os,time def work(): global n temp=n time.sleep(0.1) n=temp-1 if __name__ == '__main__': n=100 l=[] for i in range(100): p=Thread(target=work) l.append(p) p.start() for p in l: p.join() print(n) #结果可能为99
对公共数据的操作
import threading R=threading.Lock() R.acquire() ''' 对公共数据的操作 ''' R.release()
不加锁:并发执行,速度快,数据不安全
from threading import current_thread,Thread,Lock import os,time def task(): global n print('%s is running' %current_thread().getName()) temp=n time.sleep(0.5) n=temp-1 if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:0.5216062068939209 n:99 '''
加锁:未加锁部分并发执行,加锁部分串行执行,速度慢,数据安全
from threading import current_thread,Thread,Lock import os,time def task(): #未加锁的代码并发运行 time.sleep(3) print('%s start to run' %current_thread().getName()) global n #加锁的代码串行运行 lock.acquire() temp=n time.sleep(0.5) n=temp-1 lock.release() if __name__ == '__main__': n=100 lock=Lock() threads=[] start_time=time.time() for i in range(100): t=Thread(target=task) threads.append(t) t.start() for t in threads: t.join() stop_time=time.time() print('主:%s n:%s' %(stop_time-start_time,n)) ''' Thread-1 is running Thread-2 is running ...... Thread-100 is running 主:53.294203758239746 n:0 '''
所谓死锁:是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
from threading import Lock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
解决方法:递归锁,在Python中为了支持在同一线程中多次请求同一资源,python提供了可重入锁RLock。
这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁。
from threading import RLock as Lock import time mutexA=Lock() mutexA.acquire() mutexA.acquire() print(123) mutexA.release() mutexA.release()
递归锁解决死锁问题
import time from threading import Thread,RLock fork_lock = noodle_lock = RLock() def eat1(name): noodle_lock.acquire() print('%s 抢到了面条'%name) fork_lock.acquire() print('%s 抢到了叉子'%name) print('%s 吃面'%name) fork_lock.release() noodle_lock.release() def eat2(name): fork_lock.acquire() print('%s 抢到了叉子' % name) time.sleep(1) noodle_lock.acquire() print('%s 抢到了面条' % name) print('%s 吃面' % name) noodle_lock.release() fork_lock.release() for name in ['哪吒','nick','tank']: t1 = Thread(target=eat1,args=(name,)) t2 = Thread(target=eat2,args=(name,)) t1.start() t2.start()
queue队列:使用import queue
,用法与进程Queue一样
当必须在多个线程之间安全地交换信息时,队列在线程编程中特别有用。
通过双向列表实现的
class queue.Queue(maxsize=0)
import queue q=queue.Queue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(先进先出): first second third '''
通过堆实现
class queue.LifoQueue(maxsize=0)
import queue q=queue.LifoQueue() q.put('first') q.put('second') q.put('third') print(q.get()) print(q.get()) print(q.get()) ''' 结果(后进先出): third second first '''
PriorityQueue类和LifoQueue类继承Queue类然后重写了_init、_qsize、_put、_get这四个类的私有方法.
通过list来实现的。
class queue.PriorityQueue(maxsize=0)
优先队列的构造函数。maxsize是一个整数,它设置可以放置在队列中的项数的上限。一旦达到此大小,插入将阻塞,直到队列项被使用。如果maxsize小于或等于0,则队列大小为无穷大。
import queue q=queue.PriorityQueue() #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高 q.put((20,'a')) q.put((10,'b')) q.put((30,'c')) print(q.get()) print(q.get()) print(q.get()) ''' 结果(数字越小优先级越高,优先级高的优先出队): (10, 'b') (20, 'a') (30, 'c') '''
更多方法说明
官方文档:https://docs.python.org/dev/library/concurrent.futures.html
concurrent.futures模块提供了高度封装的异步调用接口:
两者都实现了由抽象Executor类定义的相同接口。
ThreadPoolExecutor(线程池)与ProcessPoolExecutor(进程池)都是concurrent.futures模块下的,主线程(或进程)中可以获取某一个线程(进程)执行的状态或者某一个任务执行的状态及返回值。
通过submit返回的是一个future对象,它是一个未来可期的对象,通过它可以获悉线程的状态。
ThreadPoolExecutor构造实例的时候,传入max_workers参数来设置线程中最多能同时运行的线程数目 。
使用submit函数来提交线程需要执行任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意submit()不是阻塞的,而是立即返回。
通过submit函数返回的任务句柄,能够使用done()方法判断该任务是否结束。
使用result()方法可以获取任务的返回值,查看内部代码,发现这个方法是阻塞的。
对于频繁的cpu操作,由于GIL锁的原因,多个线程只能用一个cpu,这时多进程的执行效率要比多线程高。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ProcessPoolExecutor(max_workers=3) futures=[] for i in range(11): future=executor.submit(task,i) futures.append(future) executor.shutdown(True) print('+++>') for future in futures: print( future.result())
wait方法可以让主线程阻塞,直到满足设定的要求。
wait(fs, timeout=None, return_when=ALL_COMPLETED),wait接受3个参数,
import time from concurrent.futures import ( ThreadPoolExecutor, wait ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [1, 2, 3, 4]] i = 1 for task in task_list: print("task{}:{}".format(i, task)) i += 1 print(wait(task_list, timeout=2.5)) # wait在2.5秒后返回线程的状态,result: # task1:<Future at 0x7ff3c885f208 state=running> # task2:<Future at 0x7ff3c885fb00 state=running> # task3:<Future at 0x7ff3c764b2b0 state=running> # task4:<Future at 0x7ff3c764b9b0 state=running> # DoneAndNotDoneFutures( # done={<Future at 0x7ff3c885f208 state=finished returned int>, <Future at 0x7ff3c885fb00 state=finished returned int>}, # not_done={<Future at 0x7ff3c764b2b0 state=running>, <Future at 0x7ff3c764b9b0 state=running>}) # # 可以看到在timeout 2.5时,task1和task2执行完毕,task3和task4仍在执行中
map(fn, *iterables, timeout=None),第一个参数fn是线程执行的函数;第二个参数接受一个可迭代对象;第三个参数timeout跟wait()的timeout一样,但由于map是返回线程执行的结果,如果timeout小于线程执行时间会抛异常TimeoutError。
map的返回是有序的,它会根据第二个参数的顺序返回执行的结果:
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import os,time,random def task(n): print('%s is runing' %os.getpid()) time.sleep(random.randint(1,3)) return n**2 if __name__ == '__main__': executor=ThreadPoolExecutor(max_workers=3) # for i in range(11): # future=executor.submit(task,i) executor.map(task,range(1,12)) #map取代了for+submit
上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断,有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用as_completed方法一次取出所有任务的结果。
import time from collections import OrderedDict from concurrent.futures import ( ThreadPoolExecutor, as_completed ) def get_thread_time(times): time.sleep(times) return times start = time.time() executor = ThreadPoolExecutor(max_workers=4) task_list = [executor.submit(get_thread_time, times) for times in [2, 3, 1, 4]] task_to_time = OrderedDict(zip(["task1", "task2", "task3", "task4"],[2, 3, 1, 4])) task_map = OrderedDict(zip(task_list, ["task1", "task2", "task3", "task4"])) for result in as_completed(task_list): task_name = task_map.get(result) print("{}:{}".format(task_name,task_to_time.get(task_name))) # task3: 1 # task1: 2 # task2: 3 # task4: 4
task1、task2、task3、task4的等待时间分别为2s、3s、1s、4s,通过as_completed返回执行完的线程结果,as_completed(fs, timeout=None)接受2个参数,第一个是执行的线程列表,第二个参数timeout与map的timeout一样,当timeout小于线程执行时间会抛异常TimeoutError。
通过执行结果可以看出,as_completed返回的顺序是线程执行结束的顺序,最先执行结束的线程最早返回。
Future对象也可以像协程一样,当它设置完成结果时,就可以立即进行回调别的函数。add_done_callback(fn),则表示 Futures 完成后,会调⽤fn函数。
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor from multiprocessing import Pool import requests import json import os def get_page(url): print('<进程%s> get %s' %(os.getpid(),url)) respone=requests.get(url) if respone.status_code == 200: return {'url':url,'text':respone.text} def parse_page(res): res=res.result() print('<进程%s> parse %s' %(os.getpid(),res['url'])) parse_res='url:<%s> size:[%s]\n' %(res['url'],len(res['text'])) with open('db.txt','a') as f: f.write(parse_res) if __name__ == '__main__': urls=[ 'https://www.baidu.com', 'https://www.python.org', 'https://www.openstack.org', 'https://help.github.com/', 'http://www.sina.com.cn/' ] # p=Pool(3) # for url in urls: # p.apply_async(get_page,args=(url,),callback=pasrse_page) # p.close() # p.join() p=ProcessPoolExecutor(3) for url in urls: p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:mmqy2019@163.com进行举报,并提供相关证据,查实之后,将立刻删除涉嫌侵权内容。
长按识别二维码并关注微信
更方便到期提醒、手机管理