[并发编程专题]多线程

左手的ㄟ右手 2024-04-19 08:15 140阅读 0赞

线程是执行线程的缩写。程序员可以将他的工作拆分到线程中,这些线程同时运行并共享同一内存上下文。
多线程应该在多处理器或者多核机器上执行,将每个CPU核上并行化每个线程执行,从而使程序更快。
竞争冒险:
如果两个线程更新相同的没有任何保护的数据,则会发生竟态条件。
而锁机制有助于保护数据,在多线程编程中总要确保以安全的方式访问资源。
死锁:
两个线程锁一个资源,并尝试获取另外一个线程锁定的资源,他们讲永远彼此等待的情况叫死锁。

python多线程

多线程应用案例:假设需要使用多个查询从一些web服务器获取数据,这些查询无法通过单个大型的HTTP请求批量处理
使用API的地理编码端点解析。
单线程:

  1. import time
  2. api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
  3. PLACES = (
  4. 'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
  5. 'NewYork'
  6. )
  7. def fetch_place(place):
  8. gecode = api[place]
  9. print("place:",place,"gecode:",gecode)
  10. def main():
  11. for place in PLACES:
  12. fetch_place(place)
  13. if __name__ == '__main__':
  14. main()

多线程:加入线程池。
启动一些预定义数量的线程,从队列中消费工作项,直到完成。当没有其他工作要做,线程将返回。

  1. import time
  2. from queue import Queue,Empty
  3. from threading import Thread
  4. THREAD_POOL_SIZE=4
  5. def worker(work_queue):
  6. while not work_queue.empty():
  7. try:
  8. item = work_queue.get(block=False)
  9. except Empty:
  10. break
  11. else:
  12. fetch_place(item)
  13. work_queue.task_done()
  14. api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
  15. PLACES = (
  16. 'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
  17. 'NewYork'
  18. )
  19. def fetch_place(place):
  20. gecode = api[place]
  21. print("place:",place,"gecode:",gecode,"\n")
  22. def main():
  23. work_queue = Queue()
  24. for place in PLACES:
  25. work_queue.put(place)
  26. threads = [
  27. Thread(target=worker,args=(work_queue,))
  28. for _ in range(THREAD_POOL_SIZE)
  29. ]
  30. for thread in threads:
  31. thread.start()
  32. work_queue.join()
  33. while threads:
  34. threads.pop().join()
  35. if __name__ == '__main__':
  36. main()

在这里插入图片描述
从打印结果可以看出输出有点问题,更好的方案是启动另外的线程打印,而不是在主线程中进行。我们可以通过一种队列来实现,这个队列主要负责从我们的工作线程收集结果。

  1. import time
  2. from queue import Queue,Empty
  3. from threading import Thread
  4. THREAD_POOL_SIZE=4
  5. def worker(work_queue,result_queue):
  6. while not work_queue.empty():
  7. try:
  8. item = work_queue.get(block=False)
  9. except Empty:
  10. break
  11. else:
  12. result_queue.put(
  13. fetch_place(item)
  14. )
  15. work_queue.task_done()
  16. api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
  17. PLACES = (
  18. 'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
  19. 'NewYork'
  20. )
  21. def fetch_place(place):
  22. geo_dict = []
  23. gecode = api[place]
  24. geo_dict.append((place,gecode))
  25. return geo_dict
  26. def present_result(geocode):
  27. print(geocode)
  28. def main():
  29. work_queue = Queue()
  30. result_queue = Queue()
  31. for place in PLACES:
  32. work_queue.put(place)
  33. threads = [
  34. Thread(target=worker,args=(work_queue,result_queue))
  35. for _ in range(THREAD_POOL_SIZE)
  36. ]
  37. for thread in threads:
  38. thread.start()
  39. work_queue.join()
  40. while threads:
  41. threads.pop().join()
  42. while not result_queue.empty():
  43. present_result(result_queue.get())
  44. if __name__ == '__main__':
  45. main()

在这里插入图片描述

处理错误与速率限制

有时候会遇到外部服务器提供商施加的速率限制。当客户端超过请求速率会抛出异常,但是这个异常是单独引起的,不会导致整个程序崩溃,工作线程会立即退出,但是主线程将等待work_queue上存储的所有任务完成(使用work_queue.join()调用)。我们的线程应该尽可能的处理异常,并确保队列中的所有项目都会被处理。如果不做进一步处理改进,以上的案例一旦遇到异常,一些工作线程将崩溃,并且不会退出。
针对以上问题,可以在result_queue队列中设置一个错误实例,并将当前任务标记已完成,与没有错误时一样。这样,我们确保主线程在work_queue.join()中等待时间不会无限期地锁定。主线程然后可以检查结果并重新提出在结果队列中发现的任何异常。

  1. import time
  2. from queue import Queue,Empty
  3. from threading import Thread
  4. THREAD_POOL_SIZE=4
  5. def worker(work_queue,results_queue):
  6. while True:
  7. try:
  8. item = work_queue.get(block=False)
  9. except Empty:
  10. break
  11. else:
  12. try:
  13. result = fetch_place(item)
  14. except Exception as err:
  15. results_queue.put(err)
  16. else:
  17. results_queue.put(result)
  18. finally:
  19. work_queue.task_done()
  20. api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
  21. PLACES = (
  22. 'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
  23. 'NewYork'
  24. )
  25. def fetch_place(place):
  26. geo_dict = []
  27. gecode = api[place]
  28. geo_dict.append((place,gecode))
  29. return geo_dict
  30. def present_result(geocode):
  31. print(geocode)
  32. def main():
  33. work_queue = Queue()
  34. results_queue = Queue()
  35. for place in PLACES:
  36. work_queue.put(place)
  37. threads = [
  38. Thread(target=worker,args=(work_queue,results_queue))
  39. for _ in range(THREAD_POOL_SIZE)
  40. ]
  41. for thread in threads:
  42. thread.start()
  43. work_queue.join()
  44. while threads:
  45. threads.pop().join()
  46. while not results_queue.empty():
  47. result = results_queue.get()
  48. if isinstance(result,Exception):
  49. raise result
  50. present_result(result)
  51. if __name__ == '__main__':
  52. main()

对工作速度的限制成为节流。
令牌桶算法:

  • 存在具有预定量的令牌桶
  • 每个令牌响应单个权限以处理一项工作
  • 每次工作者要求一个或者多个令牌权限时
    – 我们测量从上次重新装满桶所花费的shijan
    – 如果时间差允许,用这个时间差响应的令牌量重新填充桶
    – 如果存储的令牌的数量大于或者等于请求的数量,我们减少存储的令牌的数量并且返回那个值;
    – 如果存储的令牌的数量小于请求的数量,我们返回零

    import time
    from queue import Queue,Empty
    from threading import Thread
    from threading import Lock
    THREAD_POOL_SIZE=4

    class Throttle(object):

    1. def __init__(self,rate):
    2. self._consume_lock = Lock()
    3. self.rate = rate
    4. self.tokens = 0
    5. self.last = 0
    6. def consume(self,amount = 1):
    7. with self._consume_lock:
    8. now = time.time()
    9. # 时间测量在第一个令牌请求上初始化以避免初始突变
    10. if self.last == 0:
    11. self.last = now
    12. elapsed = now - self.last
    13. # 请确认传递时间的量足够大一添加新的令牌
    14. if int(elapsed*self.rate):
    15. self.tokens+=int(elapsed*self.rate)
    16. self.last = now
    17. # 不要过度装满桶
    18. self.tokens = (
    19. self.rate
    20. if self.tokens>self.rate
    21. else self.tokens
    22. )
    23. # 如果可用最终分派令牌
    24. if self.tokens>=amount:
    25. self.tokens-=amount
    26. else:
    27. amount=0
    28. return amount

    def worker(work_queue,results_queue,throttle):

    1. while True:
    2. try:
    3. item = work_queue.get(block=False)
    4. except Empty:
    5. break
    6. else:
    7. while not throttle.consume():
    8. pass
    9. try:
    10. result = fetch_place(item)
    11. except Exception as err:
    12. results_queue.put(err)
    13. else:
    14. results_queue.put(result)
    15. finally:
    16. work_queue.task_done()
  1. api = { "Reykjavik":"123,231","Vien":"234,342","Zadar":"334,345","Venice":"345,243","Wrocla":"45,34","Bolognia":"895,569","Berlin":"56,23","Slubice":"2,3","NewYork":"345,6"}
  2. PLACES = (
  3. 'Reykjavik','Vien','Zadar','Venice','Wrocla','Bolognia','Berlin','Slubice',
  4. 'NewYork'
  5. )
  6. def fetch_place(place):
  7. geo_dict = []
  8. gecode = api[place]
  9. geo_dict.append((place,gecode))
  10. return geo_dict
  11. def present_result(geocode):
  12. print(geocode)
  13. def main():
  14. work_queue = Queue()
  15. results_queue = Queue()
  16. throttle = Throttle(10)
  17. for place in PLACES:
  18. work_queue.put(place)
  19. threads = [
  20. Thread(target=worker,args=(work_queue,results_queue,throttle))
  21. for _ in range(THREAD_POOL_SIZE)
  22. ]
  23. for thread in threads:
  24. thread.start()
  25. work_queue.join()
  26. while threads:
  27. threads.pop().join()
  28. while not results_queue.empty():
  29. result = results_queue.get()
  30. if isinstance(result,Exception):
  31. raise result
  32. present_result(result)
  33. if __name__ == '__main__':
  34. main()

发表评论

表情:
评论列表 (有 0 条评论,140人围观)

还没有评论,来说两句吧...

相关阅读

    相关 [并发编程专题]线

    程是执行线程的缩写。程序员可以将他的工作拆分到线程中,这些线程同时运行并共享同一内存上下文。 多线程应该在多处理器或者多核机器上执行,将每个CPU核上并行化每个线程执...

    相关 Java 线并发编程

    一、多线程 1、操作系统有两个容易混淆的概念,进程和线程。 进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空

    相关 Java线并发编程

    Java线程:概念与原理 一、操作系统中线程和进程的概念 > 现在的操作系统是多任务操作系统。多线程是实现多任务的一种方式。 > > 进程是指一个内存中运行的应用程序,每

    相关 Java 线 并发编程

    一、多线程 1、操作系统有两个容易混淆的概念,进程和线程。 进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空

    相关 Java 线 并发编程 整理

    一、多线程 1、操作系统有两个容易混淆的概念,进程和线程。 进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地