Python 多线程、多进程

旧城等待, 2021-12-01 19:06 511阅读 0赞

什么是线程?

py文件在执行程序中,他会根据程序的编写来区分,假如没有创建子进程,整个程序就是主进程。

那程序中,有主线程而且还有子线程,那他就是一个多线程。

使用多线程可以提升I/O密集型的效率。

什么是进程?

py文件就是一个进程,比如:QQ,360,浏览器。

使用多进程,会消耗很大的资源问题。

GIL锁

GIL锁又称,全局解释器锁。

GIL锁的作用:在同一时刻,只能有一个线程进入解释器。

站在开发Python语言的那一端时,他就是一个神器,而站在使用这门语言这一端时,他就是一个BUG般的存在,而这BUG仅存在于CPython中。

为什么说BUG呢,因为有了GIL锁,我们使用多线程在进行计算密集型中,计算机的核数再多,他也只能使用一核。

I/O密集型,计算密集型

什么是I/O密集型?

说白了,他就是一个堵塞,当我们创建多线程(A、B),A线程在执行,遇到了堵塞,在CPU空闲时,切换到了B线程。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. start = time.time()
  4. def music():
  5. print('I listening Music')
  6. time.sleep(2)
  7. def movie():
  8. print('I watching TV')
  9. time.sleep(3)
  10. t1 = threading.Thread(target=music)
  11. t2 = threading.Thread(target=movie)
  12. t1.start()
  13. t2.start()
  14. end = time.time()
  15. result = end - start
  16. print(result)
  17. print('主线程结束')

什么时计算密集型?

线程在计算过程中,没有遇到堵塞,而是一直在执行计算。

ContractedBlock.gif ExpandedBlockStart.gif

  1. def add():
  2. num = 0
  3. for i in range(1000000):
  4. num += i
  5. print(num)

如何创建多线程?

Threading模块

函数创建

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. start = time.time()
  4. def music():
  5. for i in range(3):
  6. print('I listening Music')
  7. time.sleep(1)
  8. def movie():
  9. for i in range(2):
  10. print('I watching TV')
  11. time.sleep(5)
  12. t1 = threading.Thread(target=music) #创建子线程
  13. t2 = threading.Thread(target=movie) #创建子线程
  14. threads = [t1,t2]
  15. for t in threads:
  16. t.start() #启动子线程

类创建

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. class MyThread(threading.Thread): # 首先要继承这个方法
  3. def __init__(self,count):
  4. super().__init__()
  5. self.count = count
  6. def current_thread_count(self):
  7. print(self.count)
  8. def run(self): #定义每个线程要运行的内容
  9. self.current_thread_count()
  10. t1 = MyThread(threading.active_count())
  11. t2 = MyThread(threading.active_count())
  12. t1.start() #开启线程
  13. t2.start()

join ()方法

主线程A中,创建了子线程B,并且在主线程A中调用了B.join(),那么,主线程A会在调用的地方等待,直到子线程B完成操作后,

才可以接着往下执行,那么在调用这个线程时可以使用被调用线程的join方法。join([timeout]) 里面的参数时可选的,代表线程运行的最大时

间,即如果超过这个时间,不管这个此线程有没有执行完毕都会被回收,然后主线程或函数都会接着执行的,如果线程执行时间小于参数表示的

时间,则接着执行,不用一定要等待到参数表示的时间。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. start = time.time()
  4. def music():
  5. for i in range(3):
  6. print('I listening Music')
  7. time.sleep(1)
  8. def movie():
  9. for i in range(2):
  10. print('I watching TV')
  11. time.sleep(5)
  12. t1 = threading.Thread(target=music) #创建子线程
  13. t2 = threading.Thread(target=movie) #创建子线程
  14. threads = [t1,t2]
  15. for t in threads:
  16. t.start() #启动子线程
  17. t.join() # 代表赋值前的一个,也就是t2
  18. print('主线程结束')

setDaemon()方法

主线程A中,创建了子线程B,并且在主线程A中调用了B.setDaemon(),这个的意思是,把主线程A设置为守护线程,这

时候,要是主线程A执行结束了,就不管子线程B是否完成,一并和主线程A退出.这就是setDaemon方法的含义,这基本和join是相反的。此外,还有

个要特别注意的:必须在start() 方法调用之前设置,如果不设置为守护线程,程序会被无限挂起,只有等待了所有线程结束它才结束。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. start = time.time()
  4. def music():
  5. for i in range(3):
  6. print('I listening Music')
  7. time.sleep(1)
  8. def movie():
  9. for i in range(2):
  10. print('I watching TV')
  11. time.sleep(5)
  12. t1 = threading.Thread(target=music) #创建子线程
  13. t2 = threading.Thread(target=movie) #创建子线程
  14. threads = [t1,t2]
  15. t2.setDaemon(1) #守护线程
  16. for t in threads:
  17. t.start() #启动子线程
  18. print('主线程结束')

同步锁

为什么会有同步锁?

当我们创建多线程时,并且有一个全局变量,而多线程操作这个全局变量。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. def sub():
  4. global number
  5. num = number
  6. time.sleep(0.1)
  7. number = num - 1
  8. number = 10
  9. threads = []
  10. for i in range(10):
  11. t = threading.Thread(target=sub)
  12. t.start()
  13. threads.append(t)
  14. for i in threads:
  15. i.join()
  16. print(number) # 9

结果并不是我们想要的。

为什么出现这种问题?

程序在sleep的一瞬间,cpu来回切换,还没等着修改全局变量,所有的线程已经被创建,而且也已经被赋值。

如何解决?

那就是加锁了。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. def sub():
  4. global number
  5. r.acquire() # 获得锁
  6. num = number
  7. time.sleep(0.1)
  8. number = num - 1
  9. r.release() # 释放锁
  10. number = 10
  11. threads = []
  12. r = threading.Lock()
  13. for i in range(10):
  14. t = threading.Thread(target=sub)
  15. t.start()
  16. threads.append(t)
  17. for i in threads:
  18. i.join()
  19. print(number) # 0

加锁,其实就是不让cup进行线程切换,直到锁被释放。

如果锁没被释放,不会让其他线程进入,也不会影响不进入线程的执行。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. number = 10
  4. threads = []
  5. r = threading.Lock()
  6. def sub():
  7. global number
  8. r.acquire()
  9. num = number
  10. time.sleep(0.1)
  11. number = num - 1
  12. r.release()
  13. def music():
  14. time.sleep(0.5)
  15. print('Music')
  16. t = threading.Thread(target=music)
  17. t.start()
  18. for i in range(10):
  19. t = threading.Thread(target=sub)
  20. t.start()
  21. threads.append(t)
  22. for i in threads:
  23. i.join()
  24. print(number)

递归锁(Rlock)

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. r = threading.Lock()
  3. class MyThread(threading.Thread):
  4. def Thread_1(self):
  5. r.acquire()
  6. print('第一层',self.name)
  7. r.acquire()
  8. print('第二层',self.name)
  9. r.release()
  10. r.release()
  11. def run(self):
  12. self.Thread_1()
  13. for i in range(5):
  14. t = MyThread()
  15. t.start()

死锁

递归锁,与Lock很相似,但是他有一个计数的功能,能解决死锁

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. r = threading.RLock()
  3. class MyThread(threading.Thread):
  4. def Thread_1(self):
  5. r.acquire()
  6. print('第一层',self.name)
  7. r.acquire()
  8. print('第二层',self.name)
  9. r.release()
  10. r.release()
  11. def Thread_2(self):
  12. r.acquire()
  13. print('第一层',self.name)
  14. r.acquire()
  15. print('第二层',self.name)
  16. r.release()
  17. r.release()
  18. def run(self):
  19. self.Thread_1()
  20. self.Thread_2()
  21. for i in range(5):
  22. t = MyThread()
  23. t.start()

信号量(Semaphore)

信号量相当于,可以限制最大进入的线程数量。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. r = threading.Semaphore(2) # 创建信号量,最大进入的线程数量
  4. class MyThread(threading.Thread):
  5. def Thread_1(self):
  6. r.acquire() # 每次进入线程+1,但不能超过信号量设定的值
  7. print(self.name)
  8. time.sleep(2)
  9. r.release() # -1
  10. def run(self):
  11. self.Thread_1()
  12. for i in range(5):
  13. t = MyThread()
  14. t.start()

条件变量(Conditon)

wait():条件不满足时调用,线程会释放锁并进入等待阻塞。

notify():条件创造后调用,通知等待池激活一个线程。

notifyAll():条件创造后调用,通知等待池激活所有线程。

ContractedBlock.gif ExpandedBlockStart.gif

  1. import threading
  2. import time
  3. import random
  4. def producer():
  5. time.sleep(0.2)
  6. global F
  7. while True:
  8. if con_Lock.acquire():
  9. r = random.randrange(0,100)
  10. F.append(r)
  11. print(str(threading.current_thread())+ '--->' + str(r))
  12. con_Lock.notify()
  13. con_Lock.release()
  14. time.sleep(3)
  15. def consumer():
  16. global F
  17. while True:
  18. con_Lock.acquire()
  19. if not F:
  20. print("老板,快点,没有包子了")
  21. con_Lock.wait()
  22. a = F.pop()
  23. print('包子%s已经被吃'%a)
  24. time.sleep(0.5)
  25. con_Lock = threading.Condition()
  26. threads = []
  27. F = []
  28. for i in range(5):
  29. threads.append(producer)
  30. threads.append(consumer)
  31. for i in threads:
  32. t = threading.Thread(target=i)
  33. t.start()

Event

ContractedBlock.gif ExpandedBlockStart.gif

  1. from threading import Event
  2. Event.isSet() #返回event的状态值
  3. Event.wait() #如果 event.isSet()==False将阻塞线程;
  4. Event.set() #设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
  5. Event.clear() #恢复

实例

ContractedBlock.gif ExpandedBlockStart.gif

  1. #首先定义两个函数,一个是连接数据库
  2. # 一个是检测数据库
  3. from threading import Thread,Event,currentThread
  4. import time
  5. e = Event()
  6. def conn_mysql():
  7. '''链接数据库'''
  8. count = 1
  9. while not e.is_set(): #当没有检测到时候
  10. if count >3: #如果尝试次数大于3,就主动抛异常
  11. raise ConnectionError('尝试链接的次数过多')
  12. print('\033[45m%s 第%s次尝试'%(currentThread(),count))
  13. e.wait(timeout=1) #等待检测(里面的参数是超时1秒)
  14. count+=1
  15. print('\033[44m%s 开始链接...'%(currentThread().getName()))
  16. def check_mysql():
  17. '''检测数据库'''
  18. print('\033[42m%s 检测mysql...' % (currentThread().getName()))
  19. time.sleep(5)
  20. e.set()
  21. if __name__ == '__main__':
  22. for i in range(3): #三个去链接
  23. t = Thread(target=conn_mysql)
  24. t.start()
  25. t = Thread(target=check_mysql)
  26. t.start()

ContractedBlock.gif ExpandedBlockStart.gif

  1. from threading import Thread,Event,currentThread
  2. import time
  3. e = Event()
  4. def traffic_lights():
  5. '''红绿灯'''
  6. time.sleep(5)
  7. e.set()
  8. def car():
  9. '''车'''
  10. print('\033[42m %s 等绿灯\033[0m'%currentThread().getName())
  11. e.wait()
  12. print('\033[44m %s 车开始通行' % currentThread().getName())
  13. if __name__ == '__main__':
  14. for i in range(10):
  15. t = Thread(target=car) #10辆车
  16. t.start()
  17. traffic_thread = Thread(target=traffic_lights) #一个红绿灯
  18. traffic_thread.start()
  19. 红绿灯

队列(Queue)

Python Queue模块有三种队列及构造函数:
1、Python Queue模块的FIFO队列先进先出。 class queue.Queue(maxsize)
2、LIFO类似于堆,即先进后出。 class queue.LifoQueue(maxsize)
3、还有一种是优先级队列级别越低越先出来。 class queue.PriorityQueue(maxsize)

此包中的常用方法(q = Queue.Queue()):
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作

ContractedBlock.gif

ContractedBlock.gif

#

如何创建多进程?

multiprocessing 模块

Process

构造方法:

Process([group [, target [, name [, args [, kwargs]]]]])

  group: 线程组,目前还没有实现,库引用中提示必须是None;
  target: 要执行的方法;
  name: 进程名;
  args/kwargs: 要传入方法的参数。

实例方法:

  is_alive():返回进程是否在运行。

  join([timeout]):阻塞当前上下文环境的进程程,直到调用此方法的进程终止或到达指定的timeout(可选参数)。

  start():进程准备就绪,等待CPU调度

  run():strat()调用run方法,如果实例进程时未制定传入target,这star执行t默认run()方法。

  terminate():不管任务是否完成,立即停止工作进程

属性:

  authkey

  daemon:和线程的setDeamon功能一样

  exitcode(进程在运行时为None、如果为–N,表示被信号N结束)

  name:进程名字。

  pid:进程号。

多进程与多线程的创建完全一致,也有两种方式。

函数创建

ContractedBlock.gif ExpandedBlockStart.gif

  1. from multiprocessing import Process
  2. import os
  3. def My_process():
  4. print(os.getpid()) # 获取进程和PID
  5. process_num = []
  6. if __name__ == "__main__":
  7. for i in range(3):
  8. p = Process(target=My_process,args=())
  9. p.start()
  10. process_num.append(p)
  11. for p in process_num:
  12. p.join()
  13. print('主进程结束')

类创建

ContractedBlock.gif ExpandedBlockStart.gif

  1. from multiprocessing import Process
  2. import time
  3. class My_process(Process):# 继承
  4. def __init__(self,name):
  5. super().__init__() # 调用父类的__init__方法
  6. self.name = name
  7. def run(self):
  8. print(self.name)
  9. if __name__ == '__main__':
  10. for i in range(3):
  11. p = My_process(str(i))
  12. p.daemon = True # 启动守护进程
  13. p.start()
  14. time.sleep(0.5)
  15. print('主进程结束')

注意:多进程开启时

windows必须在,if __name__ == “__main__“: 下开启

linux下不用

进程之间通讯

Queue(队列)

这里的Queue跟threading的用法类似:

ContractedBlock.gif ExpandedBlockStart.gif

  1. from multiprocessing import Process,Queue
  2. q = Queue() # 创建队列
  3. def multi_process(i,q):
  4. q.put(i)
  5. process_obj = []
  6. def run():
  7. for i in range(3):
  8. p = Process(target=multi_process,args=(i,q))
  9. p.start()
  10. process_obj.append(p)
  11. for p in process_obj:
  12. p.join()
  13. if __name__ == "__main__":
  14. run()
  15. for i in range(3):
  16. print(q.get())

Pipe(管道)

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

ContractedBlock.gif ExpandedBlockStart.gif

  1. import os
  2. from multiprocessing import Process,Pipe
  3. parent_port,child_port = Pipe()
  4. def multi_process(port):
  5. Pid = os.getpid()
  6. print(port.recv())
  7. port.send('我的Pid是:%s'%Pid)
  8. port.close()
  9. process_obj = []
  10. def run():
  11. for i in range(3):
  12. p = Process(target=multi_process,args=(child_port,))
  13. p.start()
  14. process_obj.append(p)
  15. for p in process_obj:
  16. parent_port.send('你的Pid是多少?')
  17. print(parent_port.recv())
  18. p.join()
  19. if __name__ == "__main__":
  20. run()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

Manager

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array. For example,

ContractedBlock.gif ExpandedBlockStart.gif

  1. import os
  2. from multiprocessing import Process,Manager
  3. def multi_process(*args):
  4. for arg in args:
  5. try:
  6. arg[os.getpid()] = os.getpid()
  7. except:
  8. arg.append(os.getpid())
  9. def start(*args):
  10. for i in range(3):
  11. p = Process(target=multi_process,args=(args))
  12. p.start()
  13. return p
  14. if __name__ == "__main__":
  15. with Manager() as manager:
  16. l = manager.list()
  17. d = manager.dict()
  18. p = start(l,d)
  19. p.join()
  20. print(l)
  21. print(d)

#

进程池

用multiprocess中的Pool起进程池
进程池中开启的个数:默认是cpu个数
提交任务(不能传队列作为子进程的参数,只能传管道)
- apply 同步提交,直接返回结果
- apply_async 异步提交,返回对象,通过对象获取返回值

ContractedBlock.gif ExpandedBlockStart.gif

  1. from multiprocessing import Pool
  2. import time
  3. def Foo(i):
  4. time.sleep(2)
  5. return i + 100
  6. def Back(arg):
  7. print('--->:', arg)
  8. if __name__ == '__main__':
  9. pool = Pool(5)
  10. for i in range(10):
  11. pool.apply_async(func=Foo, args=(i,), callback=Back)
  12. # pool.apply(func=Foo, args=(i,))
  13. print('end')
  14. pool.close()
  15. pool.join()

转载于:https://www.cnblogs.com/py-peng/p/11068296.html

发表评论

表情:
评论列表 (有 0 条评论,511人围观)

还没有评论,来说两句吧...

相关阅读

    相关 python进程线

    有关进程、线程、多进程、多线程 线程与进程 概念 线程:是程序执行流的最小单元,是系统独立调度和分配CPU(独立运行)的基本单位。 进程:是资源分配的基本

    相关 Python 线进程

    什么是线程? py文件在执行程序中,他会根据程序的编写来区分,假如没有创建子进程,整个程序就是主进程。 那程序中,有主线程而且还有子线程,那他就是一个多线程。 使用多