区块链技术博客
www.b2bchain.cn

Python3学习笔记_H(进程、线程)

这篇文章主要介绍了Python3学习笔记_H(进程、线程)的讲解,通过具体代码实例进行19593 讲解,并且分析了Python3学习笔记_H(进程、线程)的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=19593

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

文章目录

      • 建议不要光看,要多动手敲代码。眼过千遭,不如手读一遍。
      • 进程
        • 进程定义
        • 创建进程
          • linux专供版
        • getpid、getppid
          • multiprocessing
            • Process
            • Process 子类
        • 进程池Pool
          • multiprocessing.Pool常用函数
        • 进程间的通信Queue
          • Queue使用
          • 进程池中的Queue
      • shutil模块
      • 线程
        • 多线程
          • 线程模块
          • 查看线程数目
          • threading 注意点
          • 多线程共享全局变量
          • 列表当实参传递到线程中
          • 进程VS线程
          • 线程同步
            • 互斥锁
            • 死锁
          • 生产者消费者模式
          • ThreadLocal
          • 使用ThreadLocal
          • 异步
          • GIL(全局解释锁)
          • 分布式进程
        • 异步IO
          • asyncio、async/await、aiohttp

笔记中代码均可运行在Jupyter NoteBook下(实际上Jupyter-lab使用体验也很棒)。

建议不要光看,要多动手敲代码。眼过千遭,不如手读一遍。

相关笔记的jupiter运行代码已经上传,请在资源中自行下载。

进程

进程定义

参考网址:https://www.cnblogs.com/gengyi/p/8564052.html

''' 进程:  进程是程序的一次动态执行过程,它对应了从代码加载、 执行到执行完毕的一个完整过程。  进程是系统进行资源分配和调度的一个独立单位。 进程是由代码(堆栈段)、数据(数据段)、内核状态和一组寄存器组成。   在多任务操作系统中,通过运行多个进程来并发地执行多个任务。 由于每个线程都是一个能独立执行自身指令的不同控制流, 因此一个包含多个线程的进程也能够实现进程内多任务的并发执行。   进程是一个内核级的实体,进程结构的所有成分都在内核空间中, 一个用户程序不能直接访问这些数据。    进程的状态:   创建、准备、运行、阻塞、结束。   进程间的通信方式:      (1)管道(pipe)及有名管道(named pipe):                                  管道可用于具有亲缘关系的父子进程间的通信,有名管道除了具有管道所具有的功能外,它还允许无亲缘关系进程间的通信。      (2)信号(signal):              信号是在软件层次上对中断机制的一种模拟,它是比较复杂的通信方式,         用于通知进程有某事件发生,一个进程收到一个信号与处理器收到一个中断请求效果上可以说是一致的。      (3)消息队列(message queue):                  消息队列是消息的链接表,它克服了上两种通信方式中信号量有限的缺点,         具有写权限的进程可以按照一定得规则向消息队列中添加新信息;对消息队列有读权限的进程则可以从消息队列中读取信息。      (4)共享内存(shared memory):                  可以说这是最有用的进程间通信方式。         它使得多个进程可以访问同一块内存空间,不同进程可以及时看到对方进程中对共享内存中数据得更新。         这种方式需要依靠某种同步操作,如互斥锁和信号量等。      (5)信号量(semaphore):              主要作为进程之间及同一种进程的不同线程之间得同步和互斥手段。      (6)套接字(socket):         这是一种更为一般得进程间通信机制,它可用于网络中不同机器之间的进程间通信,应用非常广泛。  Python中进程创建:      os.fork()      subprocess      processing      multiprocessing ''' 

创建进程

linux专供版

这里的fork()只是用于Linux(unix)仅作为了解

不要在jupyter notebook中进行大量或多次fork执行,会大量占用cpu,导致卡顿

fork 侧重于父子进程都有任务,适合于少量的子进程创建

父进程、子进程执行顺序没有规律,完全取决于操作系统的调度算法

import os  pid = os.fork() 

fork()是唯一调用一次返回两次的函数,因为操作系统将父进程复制一份形成新的进程(子进程),然后分别在父进程和子进程内返回。

父进程和子进程都会从fork()函数中得到一个返回值,子进程永远返回0,而父进程返回子进程的ID。

一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

通过os.getpid()获取当前进程的pid通过os.getppid()获取当前进程父进程的pid

这里可以把fork理解为鸣人影分身的deepcopy模式,完全克隆出一个自己,对分分支后的任务进行执行
Python3学习笔记_H(进程、线程)

# fork() 例子 import os # fork()在os模块下  def prt():     print('我在fork前')      prt()  def use_fork():     pid = os.fork() # 进程在这里fork,产生一个新的进程,返回新进程的pid      if pid<0:         print('fork调用失败')      elif pid == 0: # 子进程         print('我是子进程<{}>,我的父进程为:<{}>'.format(os.getpid(), os.getppid()))         # 打印子进程pid,父进程pid      else: # 父进程         print('我是父进程<{}>,我的子进程为<{}>'.format(os.getpid(), pid))         # 打印父进程pid,子进程pid      print('父子进程都可以执行这行') # 分支执行结束,回归执行程序主干  use_fork() 
我在fork前 我是父进程<4134>,我的子进程为<5573> 父子进程都可以执行这行 我在fork前 我是子进程<5573>,我的父进程为:<4134> 父子进程都可以执行这行 
'''多进程中,每个进程中所有数据(包括全局变量)都各有拥有一份,互不影响''' # fork() 内修改全局变量 例子 import os # fork()在os模块下  num = 12 print('未fork的全局变量:', num) pid = os.fork() # 进程在这里fork,产生一个新的进程,返回新进程的pid  if pid<0:     print('fork调用失败')  elif pid == 0: # 子进程     print('我是子进程<{}>,我的父进程为:<{}>'.format(os.getpid(), os.getppid()))     num += 3     print('子进程修改num为num+=3:{}'.format(num))      else: # 父进程     print('我是父进程<{}>,我的子进程为<{}>'.format(os.getpid(), pid))     num += 5     print('父进程修改num为 num+=5:{}'.format(num))      print('num:',num) # 分支执行结束,回归执行程序主干 
未fork的全局变量: 12 我是父进程<6203>,我的子进程为<6301> 父进程修改num为:17 num: 17 我是子进程<6301>,我的父进程为:<6203> 子进程修改num为:15 num: 15 
'''多个fork()同时出现''' # 多个fork()同时出现 import os, time  ret = os.fork() if ret ==0:     print("1".center(10, "*")) else:     print("2".center(10,"*")) ret = os.fork() if ret == 0:     print("11".center(10,"-")) else:     print(print("22".center(10,"+")))      # 父进程、子进程执行顺序没有规律,完全取决于操作系统的调度算法  # 注意:这里的结果中,包含了两个11,22 # 这里和前边说的在fork()处分开后,代码都会分别执行, # 也就是后边的fork()是在单个进程中的再一次分进程了 
****1***** ****2***** ++++22++++ None ++++22++++ ----11---- ----11---- None ERROR! Session/line number was not unique in database. History logging moved to new session 194 ERROR! Session/line number was not unique in database. History logging moved to new session 195 

getpid、getppid

getpid()获得当前进程的pid

getppid()获得当前进程的父进程

在新建进程之后子进程和父进程执行的基本一样(这里只是不执行父进程专有的代码,其他的代码子进程同样会执行)

全局变量在多个进程中不共享

# getpid getppid 例子 import os, time  g_num = 666 tmp = os.fork() if tmp == 0:     print("*"*20)     print(g_num)     g_num +=111     time.sleep(2)     print("this is son pid = {0}, ppid = {1}, g_num_id = {2}, g_num = {3}".format(os.getpid(), os.getppid(), id(g_num), g_num)) else:     print("*"*20)     print(g_num)     print("this is father pid = {0}, ppid = {1}, g_num_id = {2}, g_num = {3}".format(os.getpid(), os.getppid(), id(g_num), g_num)) 
******************** 666 this is father pid = 6203, ppid = 6087, g_num_id = 140675625067056, g_num = 666 ******************** 666 this is son pid = 6941, ppid = 6203, g_num_id = 140675685461968, g_num = 777 
multiprocessing

multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象

代码if __name__=='__main__':在Process使用中必须有

'''Process 创建子进程'''  from multiprocessing import Process  import os  # 子进程要执行的代码 def run_proc(name):     print('子进程运行中,name= %s ,pid=%d...' % (name, os.getpid()))  if __name__=='__main__': # 判断是否在当前模块下     print('父进程 %d.' % os.getpid())     p = Process(target=run_proc, args=('test',)) # 创建子进程     print('子进程将要执行')     p.start()     p.join()     print('子进程已结束') 
父进程 6203. 子进程将要执行 子进程运行中,name= test ,pid=7030... 子进程已结束 
''' 对例子的说明:  创建子进程时,只需要传入一个执行函数和函数的参数, 创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。      join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。 ''' 
Process

Process 语法:
Process([group[, target[, name[, args[, kwargs]]]]])

参数解释:

    group: 线程组,目前还没有实现,库引用中提示必须是None;      target: 要执行的方法; 进程实例所调用对象     name: 进程名; 当前进程的别名     args/kwargs: 要传入方法的参数(参数元组/参数字典)。 

Process类常用方法:

    is_alive() 判断进程实例是否还在执行      join([timeout]) 是否等待进程实例执行结束,timeout为等待超时时间(秒级)         需要注意:p.join只对start方式开启的进程有效,对run方式开启的进程无效。      start() 启动进程实例,并调用该子进程中的p.run()      run() 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法      terminate() 不管任务是否完成,立即终止         不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。         如果p还保存了一个锁那么也将不会被释放,进而导致死锁 

Process 属性:

    daemon:守护进程标识,:默认值为False,如果设为True,代表p为后台运行的守护进程,         当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程。         必须在p.start()之前设置      exitcode:进程的退出状态码(进程运行时为None,-N表示被信号N结束)      name:进程名      pid:进程号          authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。         这个键的用途是为涉及网络连接的底层进程间通信提供安全性,         这类连接只有在具有相同的身份验证键时才能成功(了解即可) 
# 例子 from multiprocessing import Process  import os  from time import sleep  # 子进程执行代码 def run_proc(name, age, **kwargs):     for i in range(10):         print('子进程运行中,i = {}, name = {}, age = {}, pid = {}'.format(             i, name, age, os.getpid()         ))             print(kwargs)         sleep(0.5) # 挂起0.5s      if __name__=='__main__':     print('父进程:',os.getpid())     p = Process(target=run_proc, args=('test',22), kwargs={'num':123456})     print('子进程将执行')     p.start()     sleep(1) # 挂起1s     p.terminate()     p.join()     print('子进程结束') 
父进程: 6203 子进程将执行 子进程运行中,i = 0, name = test, age = 22, pid = 7697 {'num': 123456} 子进程运行中,i = 1, name = test, age = 22, pid = 7697 {'num': 123456} 子进程结束 
# process 例子2  from multiprocessing import Process  import time  import os  #两个子进程将会调用的两个方法 def worker_1(interval):     print('worker_1, 父进程<{}>,当前进程<{}>'.format(os.getppid(), os.getpid()))     t_start = time.time()     time.sleep(interval) # 挂起interval秒     t_end = time.time()     print('worker_1执行时间为{}'.format(t_end-t_start))      def worker_2(interval):     print('worker_2, 父进程<{}>,当前进程<{}>'.format(os.getppid(), os.getpid()))     t_start = time.time()     time.sleep(interval) # 挂起interval秒     t_end = time.time()     print('worker_2执行时间为{}'.format(t_end-t_start))      # 输出当前程序的id print('进程pid:', os.getpid()) # 进程pid: 6203  # 创建两个进程对象,target指向进程对象要执行的方法, # args后的元组是要传递给work_1()中的interval参数 # 若不指定name参数,默认的进程对象名称为Process-N,N为一个递增的整数 p1=Process(target=worker_1,args=(3,)) p2=Process(target=worker_2,name='Dog',args=(1,))  # 使用<进程对象.start()>来创建并执行一个子进程 # 创建的两个进程在start后分别执行worker_1和worker_2中的代码 p1.start() p2.start()  # 同时父进程仍然向下执行,若p2进程还在执行,将返回Ture print('p2.isalive=',p2.is_alive())  # 输出p1和p2进程的别名和pid print('p1.name={}tp1.pid={}'.format(p1.name, p1.pid)) # p1.name=Process-12	p1.pid=8048 print('p2.name={}tp2.pid={}'.format(p2.name, p2.pid)) # p2.name=Dog	p2.pid=8049   # join() timeout不设置,表示父进程要在这个位置等待p1 # 进程执行完成后再执行下面的语句  p1.join(1) # 设置超时时间为1s print('set timeout=1, p1.is_alive=',p1.is_alive()) # 这里应该打印True  # 一般用于进程间的数据同步,若不写这一句,下面的is_alive判断将为True, # 在shell(cmd)中调用此程序可以完整的看到这个过程 p1.join() # 设置等待子进程执行完毕再执行父进程的后续语句 print('p1.is_alive=',p1.is_alive()) # 等待子进程执行完毕,此处打印可能有输出延迟  
进程pid: 6203 worker_1, 父进程<6203>,当前进程<8048> worker_2, 父进程<6203>,当前进程<8049> p2.isalive= True p1.name=Process-12	p1.pid=8048 p2.name=Dog	p2.pid=8049 worker_2执行时间为1.0010364055633545 set timeout=1, p1.is_alive= True worker_1执行时间为3.002727746963501 p1.is_alive= False 
Process 子类

创建新的进程还能够使用类的方式:

可以自定义一个类,继承Process类,  每次实例化这个类的时候,就等同于实例化一个进程对象 

用Process子类来进行进程的控制可以达到简化的效果

# Process 子类 例子 from multiprocessing import Process  import time  import os  # 继承Process类 class ProcessSon(Process):     # Process类本身有__init__方法,重写Process的__init__方法,会有一个问题     # 并没有完全初始化一个Process类,所以不能使用这个类继承的一些属性和方法     # 最好的方法是将继承类本身传递给Process.__init__方法,完成对自类的初始化     def __init__(self, interval):         Process.__init__(self)         self.interval = interval              # 重写Process 的run()方法     def run(self):         print('子进程{}开始执行,父进程为{}'.format(os.getpid(), os.getppid()))         t_start = time.time()         time.sleep(self.interval)         t_stop = time.time()         print('进程{}执行结束,用时{}'.format(os.getpid(), t_stop-t_start))          if __name__=='__main__': # Process 标配     t_start = time.time()     print('当前进程为:{}'.format(os.getpid()))     p1=ProcessSon(3) # 用子类创建一个新的进程对象     # 对一个不包含target属性的Process类执行start()方法,会执行此类的run()方法,     # 这里执行的是p1.run()[也就是ProcessSon.run()]     p1.start()     p1.join()     t_stop = time.time()          print('进程{}执行结束,用时{}nover'.format(os.getpid(), t_stop-t_start))      
当前进程为:2769 子进程25847开始执行,父进程为2769 进程25847执行结束,用时3.0029664039611816 进程2769执行结束,用时3.0183212757110596 over 

进程池Pool

需要创建的子进程数量不多时,可以直接用multiprocessing.Process动态生成多个进程,但是对于数目过大时,手动创建工作量太大,可以用multiprocessing模块中的Pool方法

初始化Pool时可指定最大进程数,当有新的请求提交到Pool时,在Pool未满下,会创建新进程来执行请求,但若Pool已满,则请求会等待,直到进程池有空位才会创建新的进程来执行请求

Pool进程池并不是进程数越大越好

multiprocessing.Pool常用函数

apply_async(func[, args[,kwds]]) 异步调用

使用非阻塞方式调用func(并行执行,阻塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键子参数列表; 

apply(func[, args[, kwds]]) 同步调用

使用阻塞的方式调用func 

close()

关闭Pool,使其不再接受新的任务 

terminate()

强制终止任务,不管是否完成 

join([timeout])

 主进程阻塞,等待子进程的退出,必须在close()或terminate()后使用  timeout为超时时间,秒级(pool.join(1) 设定超时时间为1s) 
# Pool 例子(apply_async() 异步) #!/usr/bin/env python3 #-*- coding: utf-8 -*- from multiprocessing import Pool;  import os  import time  import random  run_start = time.time()  def worker(msg):     t_start = time.time()     print('{}开始执行,进程号为:{}'.format(msg, os.getpid()))     # random.random()随机生成0-1之间的浮点数     sleep_num = random.random()*2     time.sleep(sleep_num)     t_stop = time.time()     use_time = t_stop-t_start     print('{}执行完毕,用时{}'.format(msg, use_time))      print('start'.center(30, '*'))  po = Pool(3) # 定义一个进程池,设置最大进程数为3  for i in range(10):     # Pool.apply_async(要调用的目标,(传递给目标的参数元组,))     # 每次循环将会用空闲出的子进程去调用目标     po.apply_async(worker, (i, )) # 运行worker函数,并且worker传递参数为i      po.close() # 关闭进程池,关闭后po不再接受新的请求  # 进程阻塞,如果不添加此句,会出现主进程执行结束后直接关闭, # 子进程无法执行 po.join() # 等待po中所有子进程执行完成,必须放在close语句后 print('end'.center(30, '*'))  run_stop = time.time() run_time = run_stop-run_start print('总用时:',run_time) 
************start************* 1开始执行,进程号为:8905 2开始执行,进程号为:8906 0开始执行,进程号为:8904 1执行完毕,用时1.102046012878418 3开始执行,进程号为:8905 0执行完毕,用时1.7687046527862549 4开始执行,进程号为:8904 2执行完毕,用时1.7871208190917969 5开始执行,进程号为:8906 4执行完毕,用时0.04486870765686035 6开始执行,进程号为:8904 3执行完毕,用时1.059372901916504 7开始执行,进程号为:8905 5执行完毕,用时0.5280158519744873 8开始执行,进程号为:8906 6执行完毕,用时0.6856362819671631 9开始执行,进程号为:8904 8执行完毕,用时0.30577731132507324 9执行完毕,用时0.3760497570037842 7执行完毕,用时1.3986475467681885 *************end************** 总用时: 3.6309187412261963 
'''apply阻塞式''' # Pool 例子 #!/usr/bin/env python3 #-*- coding: utf-8 -*- from multiprocessing import Pool;  import os  import time  import random   run_start = time.time()  def worker(msg):     t_start = time.time()     print('{}开始执行,进程号为:{}'.format(msg, os.getpid()))     # random.random()随机生成0-1之间的浮点数     sleep_num = random.random()*2     time.sleep(sleep_num)     t_stop = time.time()     use_time = t_stop-t_start     print('{}执行完毕,用时{}'.format(msg, use_time))      print('start'.center(30, '*'))  po = Pool(3) # 定义一个进程池,设置最大进程数为3  for i in range(10):     # Pool.apply(要调用的目标,(传递给目标的参数元组,))     # 每次循环将会用空闲出的子进程去调用目标     po.apply(worker, (i, )) # 运行worker函数,并且worker传递参数为i      po.close() # 关闭进程池,关闭后po不在接受新的请求  # 进程阻塞,如果不添加此句,会出现主进程执行结束后直接关闭, # 子进程无法执行 po.join() # 等待po中所有子进程执行完成,必须放在close语句后 print('end'.center(30, '*'))  run_stop = time.time() run_time = run_stop-run_start print('总用时:',run_time) 
************start************* 0开始执行,进程号为:9181 0执行完毕,用时0.21720004081726074 1开始执行,进程号为:9182 1执行完毕,用时1.266624927520752 2开始执行,进程号为:9183 2执行完毕,用时0.8020675182342529 3开始执行,进程号为:9181 3执行完毕,用时1.9233331680297852 4开始执行,进程号为:9182 4执行完毕,用时0.48862147331237793 5开始执行,进程号为:9183 5执行完毕,用时0.8913943767547607 6开始执行,进程号为:9181 6执行完毕,用时0.03457212448120117 7开始执行,进程号为:9182 7执行完毕,用时0.5286092758178711 8开始执行,进程号为:9183 8执行完毕,用时0.452136754989624 9开始执行,进程号为:9181 9执行完毕,用时1.9772589206695557 *************end************** 总用时: 8.642189741134644 

进程间的通信Queue

可以使用multiprocessing模块的Queue实现多进程之间的数据传递,

Queue本身是一个消息列队程序

# Queue 例子 from multiprocessing import Queue  q = Queue(3) # 初始化一个Queue对象,最多接收三条put消息 q.put('msg1') q.put('msg2') print(q.full()) # False q.put('msg3') print(q.full()) # True  # 消息队列已满,抛出异常, # 第一个try会等待2s后再抛出, # 第二个try会立刻抛出异常 try:     q.put('msg4',True, 2) except:     print('消息队列已满,现有消息数量:{}'.format(q.qsize()))  try:     q.put_nowait('msg4') except:     print('消息队列已满,现有消息数量:{}'.format(q.qsize()))      # 推荐 # 先判断消息队列是否已满,再写入 if not q.full():     q.put_nowait('msg4')  # 读取消息时,先判断消息队列是否为空,再读取 if not q.empty():     for i in range(q.qsize()):         print(q.get_nowait()) 
False True 消息队列已满,现有消息数量:3 消息队列已满,现有消息数量:3 msg1 msg2 msg3 
Queue使用

初始化Queue()对象时(如:q = Queue()),若Queue中未指定值,或为负值,表示可接受消息数列无上限(直到内存存满)

Queue.qsize()

返回当前队列包含的消息数量 

Queue.empty()

消息队列判空,队列为空返回True,队列已满返回False 

Queue.full()

消息队列判满,已满,返回True,否则返回False 

Queue.get([block[, timeout]])

获取队列的一条消息,然后将其从队列中移除,block默认值为True;  block参数设置:      1. 如果block使用默认值,且没有设置timeout(单位秒),消息队列如果已经没有空间写入,此时程序将被阻塞(停在写入状态),直到从消息队列腾出空间为止,若设置了timeout,则会等待timeout秒,若还没读取到任何消息,抛出”Queue.Empty“异常;          2. 若block值为False,消息队列为空,会立刻抛出”Queue.Empty“异常 

Queue.get_nowait()

相当于`Queue.get(False)` 

Queue.put(item[, block[, timeout]])

参数解释: [     item:将要写入消息队列的消息          block:阻塞设置                  block使用默认值(True),且未设置timeout(单位秒),消息队列已满,此时程序将被阻塞,直到消息队列有空间为止,若设置了timeout,等待超时后会抛出”Queue.full“异常                  block=False,消息队列已满,会立刻抛出”Queue.Full”异常          timeout:超时时间 ] 

Queue.put_nowait(item)

相当于Queue.put(item,False) 
# Queue 实例 # 这里注意:py文件名一定不能和Python中的保留名相同,不然会报错  from multiprocessing import Process, Queue  import os,time, random  # 写数据进程执行 def write(q):     for value in ['A', 'B', 'C']:         print('put {} to queue'.format(value))         q.put(value)         time.sleep(random.random())          # 读数据操作 def read(q):     while True:         if not q.empty():             value = q.get(True)             print('get {} from queue'.format(value))             time.sleep(random.random())                      else:             break              if __name__=='__main__':     # 父进程创建Queue,并传给各个子进程     q = Queue()     pw = Process(target=write, args=(q, ))     pr = Process(target=read, args=(q, ))          # 启动子进程pw,写入消息     pw.start()     # 等待子进程pw结束(加入进程阻塞)     pw.join()     # 启动读消息子进程pr     pr.start()     pr.join(5) # 针对read(q)的while设置,防止等待过长     # 设置超时时间     pw.terminate()     print('n所有数据读写完成!')       
put A to queue put B to queue put C to queue get A from queue get B from queue get C from queue  所有数据读写完成! 
进程池中的Queue

Pool中使用Queue:

使用的是multiprocessing.Manager()中的Queue(),不是multiprocessing.Queue() 否则会报错“RuntimeError: Queue objects should only be shared between processes through inheritance.” 
# Pool中使用Queue通信 # pool中使用的是multiprocessing.Manager的Queue from multiprocessing import Manager, Pool  import os, time, random   def reader(q):     print('reader <{}> run, father is <{}>'.format(os.getpid(), os.getppid()))          for i in range(q.qsize()):         print('reader get msg from Queue', q.get(True))          def writer(q):     print('writer <{}> run, father is <{}>'.format(os.getpid(), os.getppid()))          for i in range(5):         q.put(i)          if __name__=='__main__':     print('pid:{} start'.format(os.getpid()))     q = Manager().Queue() # 消息队列初始化使用的是Manager().Queue()     po = Pool()          # 阻塞模式创建进程,在writer()完全执行完后再执行reader(),     # 免去reader()中的死循环     po.apply(writer, (q, ))     po.apply(reader, (q, ))          po.close() # 关闭Pool          po.join() # 进程阻塞          print('pid:{} end'.format(os.getpid()))           
pid:2126 start writer <3475> run, father is <2126> reader <3477> run, father is <2126> reader get msg from Queue 0 reader get msg from Queue 1 reader get msg from Queue 2 reader get msg from Queue 3 reader get msg from Queue 4 pid:2126 end 
# 多进程拷贝文件 #-*- coding:utf-8 -*- from multiprocessing import Pool, Manager  import os, shutil # 刚好和os模块互补  def cp_file(name, oldFolderName, newFolderName, queue):     '''将旧文件夹中的文件拷贝到新文件夹'''     with open(os.path.join(oldFolderName,name), 'r+', encoding='utf-8') as fr:         content = fr.read()              with open(os.path.join(newFolderName, name), 'w', encoding='utf-8') as fw:         fw.write(content)              queue.put(name)  def main():      # 获取文件要copy的文件夹的名字     oldFolderName = input('input Folder Name:')     # 新建一个文件夹     newFolderName = oldFolderName+'-复件'     # 判断文件夹是否存在     if os.path.exists(newFolderName):         # 存在就删除后再新建         shutil.rmtree(newFolderName)         os.mkdir(newFolderName)          # 获取需要copy的文件夹中的所有文件名     fileNames = os.listdir(oldFolderName)     # 使用多进程方式copy文件     pool = Pool(6)     queue = Manager().Queue()          for name in fileNames:         pool.apply_async(cp_file, args=(name, oldFolderName, newFolderName, queue))     num = 0     all_num = len(fileNames)     while num<all_num:         queue.get()         num +=1         copyRate = num/all_num         print('rcopy的进度是{:.2f}%'.format((copyRate*100)), end='')          if __name__=='__main__':     main()          # 僵尸进程:父进程死了,但是子进程没死,也叫孤儿进程 
input Folder Name:multiprocessing_test copy的进度是100.00% 

shutil模块

参考于jb51

网址:https://www.jb51.net/article/145522.htm

copyfileobj(fsrc, fdst, length=16*1024): 将fsrc文件内容复制至fdst文件,length为fsrc每次读取的长度,用做缓冲区大小      fsrc: 源文件     fdst: 复制至fdst文件     length: 缓冲区大小,即fsrc每次读取的长度  import shutil f1 = open("file.txt","r") f2 = open("file_copy.txt","a+") shutil.copyfileobj(f1,f2,length=1024)  copyfile(src, dst): 将src文件内容复制至dst文件      src: 源文件路径     dst: 复制至dst文件,若dst文件不存在,将会生成一个dst文件;若存在将会被覆盖     follow_symlinks:设置为True时,若src为软连接,则当成文件复制;如果设置为False,复制软连接。默认为True。Python3新增参数  import shutil shutil.copyfile("file.txt","file_copy.txt")  copymode(src, dst): 将src文件权限复制至dst文件。文件内容,所有者和组不受影响      src: 源文件路径     dst: 将权限复制至dst文件,dst路径必须是真实的路径,并且文件必须存在,否则将会报文件找不到错误     follow_symlinks:设置为False时,src, dst皆为软连接,可以复制软连接权限,如果设置为True,则当成普通文件复制权限。默认为True。Python3新增参数  import shutil shutil.copymode("file.txt","file_copy.txt")  copystat(src, dst): 将权限,上次访问时间,上次修改时间以及src的标志复制到dst。文件内容,所有者和组不受影响      src: 源文件路径     dst: 将权限复制至dst文件,dst路径必须是真实的路径,并且文件必须存在,否则将会报文件找不到错误     follow_symlinks:设置为False时,src, dst皆为软连接,可以复制软连接权限、上次访问时间,上次修改时间以及src的标志,如果设置为True,则当成普通文件复制权限。默认为True。Python3新增参数       import shutil shutil.copystat("file.txt","file_copy.txt")  copy(src, dst): 将文件src复制至dst。dst可以是个目录,会在该目录下创建与src同名的文件,若该目录下存在同名文件,将会报错提示已经存在同名文件。权限会被一并复制。本质是先后调用了copyfile与copymode而已      src:源文件路径     dst:复制至dst文件夹或文件     follow_symlinks:设置为False时,src, dst皆为软连接,可以复制软连接权限,如果设置为True,则当成普通文件复制权限。默认为True。Python3新增参数   improt shutil,os shutil.copy("file.txt","file_copy.txt") # 或者 shutil.copy("file.txt",os.path.join(os.getcwd(),"copy"))  copy2(src, dst): 将文件src复制至dst。dst可以是个目录,会在该目录下创建与src同名的文件,若该目录下存在同名文件,将会报错提示已经存在同名文件。权限、上次访问时间、上次修改时间和src的标志会一并复制至dst。本质是先后调用了copyfile与copystat方法而已      src:源文件路径     dst:复制至dst文件夹或文件     follow_symlinks:设置为False时,src, dst皆为软连接,可以复制软连接权限、上次访问时间,上次修改时间以及src的标志,如果设置为True,则当成普通文件复制权限。默认为True。Python3新增参数   improt shutil,os shutil.copy2("file.txt","file_copy.txt") # 或者 shutil.copy2("file.txt",os.path.join(os.getcwd(),"copy"))  ignore_patterns(*patterns): 忽略模式,用于配合copytree()方法,传递文件将会被忽略,不会被拷贝      patterns:文件名称,元组  copytree(src, dst, symlinks=False, ignore=None): 拷贝文档树,将src文件夹里的所有内容拷贝至dst文件夹      src:源文件夹     dst:复制至dst文件夹,该文件夹会自动创建,需保证此文件夹不存在,否则将报错     symlinks:是否复制软连接,True复制软连接,False不复制,软连接会被当成文件复制过来,默认False     ignore:忽略模式,可传入ignore_patterns()     copy_function:拷贝文件的方式,可以传入一个可执行的处理函数,默认为copy2,Python3新增参数     ignore_dangling_symlinks:sysmlinks设置为False时,拷贝指向文件已删除的软连接时,将会报错,如果想消除这个异常,可以设置此值为True。默认为False,Python3新增参数       import shutil,os folder1 = os.path.join(os.getcwd(),"aaa") # bbb与ccc文件夹都可以不存在,会自动创建 folder2 = os.path.join(os.getcwd(),"bbb","ccc") # 将"abc.txt","bcd.txt"忽略,不复制 shutil.copytree(folder1,folder2,ignore=shutil.ignore_patterns("abc.txt","bcd.txt")  rmtree(path, ignore_errors=False, onerror=None): 移除文档树,将文件夹目录删除      ignore_errors:是否忽略错误,默认False     onerror:定义错误处理函数,需传递一个可执行的处理函数,该处理函数接收三个参数:函数、路径和excinfo  import shutil,os folder1 = os.path.join(os.getcwd(),"aaa") shutil.rmtree(folder1)  move(src, dst): 将src移动至dst目录下。若dst目录不存在,则效果等同于src改名为dst。若dst目录存在,将会把src文件夹的所有内容移动至该目录下面      src:源文件夹或文件     dst:移动至dst文件夹,或将文件改名为dst文件。如果src为文件夹,而dst为文件将会报错     copy_function:拷贝文件的方式,可以传入一个可执行的处理函数。默认为copy2,Python3新增参数   import shutil,os # 示例一,将src文件夹移动至dst文件夹下面,如果bbb文件夹不存在,则变成了重命名操作 folder1 = os.path.join(os.getcwd(),"aaa") folder2 = os.path.join(os.getcwd(),"bbb") shutil.move(folder1, folder2) # 示例二,将src文件移动至dst文件夹下面,如果bbb文件夹不存在,则变成了重命名操作 file1 = os.path.join(os.getcwd(),"aaa.txt") folder2 = os.path.join(os.getcwd(),"bbb") shutil.move(file1, folder2) # 示例三,将src文件重命名为dst文件(dst文件存在,将会覆盖) file1 = os.path.join(os.getcwd(),"aaa.txt") file2 = os.path.join(os.getcwd(),"bbb.txt") shutil.move(file1, file2)  disk_usage(path): 获取当前目录所在硬盘使用情况。Python3新增方法      path:文件夹或文件路径。windows中必须是文件夹路径,在linux中可以是文件路径和文件夹路径  import shutil.os path = os.path.join(os.getcwd(),"aaa") info = shutil.disk_usage(path) print(info)   # usage(total=95089164288, used=7953104896, free=87136059392)  chown(path, user=None, group=None): 修改路径指向的文件或文件夹的所有者或分组。Python3新增方法      path:路径     user:所有者,传递user的值必须是真实的,否则将报错no such user     group:分组,传递group的值必须是真实的,否则将报错no such group  import shutil,os path = os.path.join(os.getcwd(),"file.txt") shutil.chown(path,user="root",group="root")  which(cmd, mode=os.F_OK | os.X_OK, path=None): 获取给定的cmd命令的可执行文件的路径。Python3新增方法  import shutil info = shutil.which("python3") print(info)   # /usr/bin/python3  归档操作  shutil还提供了创建和读取压缩和存档文件的高级使用程序。内部实现主要依靠的是zipfile和tarfile模块  make_archive(base_name, format, root_dir, …): 生成压缩文件      base_name:压缩文件的文件名,不允许有扩展名,因为会根据压缩格式生成相应的扩展名     format:压缩格式     root_dir:将制定文件夹进行压缩   import shutil,os base_name = os.path.join(os.getcwd(),"aaa") format = "zip" root_dir = os.path.join(os.getcwd(),"aaa") # 将会root_dir文件夹下的内容进行压缩,生成一个aaa.zip文件 shutil.make_archive(base_name, format, root_dir)  get_archive_formats(): 获取支持的压缩文件格式。目前支持的有:tar、zip、gztar、bztar。在Python3还多支持一种格式xztar  unpack_archive(filename, extract_dir=None, format=None): 解压操作。Python3新增方法      filename:文件路径     extract_dir:解压至的文件夹路径。文件夹可以不存在,会自动生成     format:解压格式,默认为None,会根据扩展名自动选择解压格式   import shutil,os zip_path = os.path.join(os.getcwd(),"aaa.zip") extract_dir = os.path.join(os.getcwd(),"aaa") shutil.unpack_archive(zip_path, extract_dir)  get_unpack_formats(): 获取支持的解压文件格式。目前支持的有:tar、zip、gztar、bztar和xztar。Python3新增方法 

线程

线程是应用程序中工作的最小单元。

线程可以被抢占(中断)

在其他线程正在运行时,线程可以暂时搁置(睡眠),这是线程的退让。

线程的执行顺序由操作系统的调度算法决定
线程调度
Python3学习笔记_H(进程、线程)线程可以分为:

内核线程:由操作系统内核创建和撤销。  用户线程:不需要内核支持而在用户程序中实现的线程。 

Python使用线程:

函数:_thread模块 `_thread.start_new_thread(function, args[, kwargs])` 不建议使用,推荐使用threading  类:Thread类 
# 未使用模块的单线程 例子 import time   def about():     print('我是单线程,我就是个弟弟')     time.sleep(0.5)      if __name__=='__main__':     for i in range(5):         about() 
我是单线程,我就是个弟弟 我是单线程,我就是个弟弟 我是单线程,我就是个弟弟 我是单线程,我就是个弟弟 我是单线程,我就是个弟弟 
''' 调用thread模块的函数式单线程 import _thread  _thread.start_new_thread(fucntion, args[, kwargs])  参数说明: [     funciton 线程调用函数,          args 传入被调用线程函数的参数,必须为tuple类型“(1,)”          kwargs 可选参数,dict类型(关键字参数) ] ''' # thread 线程例子 import _thread import time  # 定义调用函数 def print_time(threadName, delay):     count = 0     while count<3:         time.sleep(delay)         count +=1         print('线程:{}, 时间:{}'.format(threadName, time.ctime(time.time())))          # 创建两个线程 try:     _thread.start_new_thread(print_time, ('Thread-1', 1))     _thread.start_new_thread(print_time, ('Thread-2', 3)) except Exception:     print("Error:can't start thread" )      
线程:Thread-1, 时间:Fri Jan 11 16:31:51 2019 线程:Thread-1, 时间:Fri Jan 11 16:31:52 2019 线程:Thread-2, 时间:Fri Jan 11 16:31:53 2019 线程:Thread-1, 时间:Fri Jan 11 16:31:53 2019 线程:Thread-2, 时间:Fri Jan 11 16:31:56 2019 线程:Thread-2, 时间:Fri Jan 11 16:31:59 2019 

多线程

多线程类似于同时执行多个不同程序,多线程优点:

(1)易于调度。  (2)提高并发性。通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。  (3)开销少。创建线程比创建进程要快,所需开销很少 
线程模块

Python3中通过两个标准库_thread(python2中为thread)和threading提供对线程的支持。
_thread 提供低级别的、原始的线程以及一个简单的锁。

threading 提供包括_thread提供的方法以及其他方法:

threading.currentThread() 返回当前的线程变量

threading.enumerate ()返回一个包含正在运行的线程的list,正在运行指线程启动后、结束前,不包括启动前和终止后的线程

threading.activeCount() 返回正在运行的线程数量,
len(threading.enumerate())作用相同

线程模块同样提供Thread类处理线程,Thread提供的方法:
run() 用以表示线程活动的方法

start() 启动线程

join(timeout) 阻塞调用,直到join()被中止、正常退出、抛出异常或是超时

isAlive() 判断线程是否活动 这里isAlive()=is_alive()

getName() 获取线程名

setName() 设置线程名

# threading 多线程 例子 import threading import time   def about():     print('我是多线程,我就是快')     time.sleep(0.5)      if __name__=='__main__':     for i in range(5):         # 创建新的线程         t = threading.Thread(target=about) # 这里about不要加()         t.start() # 启动线程         # 快的不要不要的 
我是多线程,我就是快 我是多线程,我就是快 我是多线程,我就是快 我是多线程,我就是快 我是多线程,我就是快 
# 主线程会等待所有子线程结束后才结束 import threading  from time import sleep, ctime   def sing():     for i in range(3):         print('singing', i)         sleep(1)          def dance():     for i in range(3):         print('dancing', i)         sleep(1)          if __name__=='__main__':     print('start'.center(30, '*'))          # 创建线程     t1 = threading.Thread(target=sing)     t2 = threading.Thread(target=dance)          # 启动线程     t1.start()      t2.start()          sleep(6) # 不写此行,会出现歌舞未完晚会先完的笑话          print('end'.center(30, '*')) 
************start************* singing 0 dancing 0 singing 1 dancing 1 singing 2 dancing 2 *************end************** 
查看线程数目
# 查看线程数目例子 #!/usr/bin/python3 # -*- coding:utf-8 -*- import threading  from time import sleep, ctime   def sing():     for i in range(3):         print('singing', i)         sleep(1)          def dance():     for i in range(3):         print('dancing', i)         sleep(1)          if __name__=='__main__':     print('start'.center(30, '*'), ctime())     t1 = threading.Thread(target=sing)     t2 = threading.Thread(target=sing)          t1.start()     t2.start()          if threading.activeCount()>1:         print('当前线程数为:', threading.activeCount())         sleep(0.5)     print('end'.center(30, '*'), ctime())     '''这里jupyter notebook运行此处代码会出现死循环,但是在linux下无此问题,     所以这里不运行代码,用Linux截图显示''' 
************start************* Sat Feb 15 14:09:55 2020 singing 0 singing 0 当前线程数为: 7 *************end************** Sat Feb 15 14:09:56 2020 singing 1 singing 1 singing 2 singing 2 

Python3学习笔记_H(进程、线程)

threading 注意点
''' 线程执行代码的封装:     在使用threading模块时,定义一个新的子类,继承threading.Thread,     然后重写run方法 ''' # 线程执行代码的封装 例子 import threading  import time   class MyThread(threading.Thread): # 继承threading.Thread     def run(self): # 重写run()         for i in range(3):             time.sleep(1)             msg = '当前线程为:'+self.name+'@'+str(i)             print(msg)              if __name__=='__main__':     mt = MyThread()     mt.start() 
当前线程为:[email protected] 当前线程为:[email protected] 当前线程为:[email protected] 
''' 例子说明:  python的threading.Thread类有一个run()方法,用于定义线程的功能函数,  可在自己的线程类中覆盖该方法,在创建自己的线程实例后, python会调用用户自定义的run() ''' 
多线程共享全局变量

在一个进程内的所有线程共享全局变量,能够在不适用其他方式的前提下完成多线程之间的数据共享(这点要比多进程要好)

缺点就是,线程是对全局变量随意修改可能造成多线程之间对全局变量的混乱(即线程非安全)

# 多线程共享全局变量 例子 from threading import Thread  import time   g_num = 50  def work1():     global g_num          for i in range(5):         g_num +=2         print('nin work1, g_num = {}'.format(g_num))  def work2():     global g_num          for i in range(3):         g_num += 3         print('nin work2, g_num = {}'.format(g_num))          if __name__=='__main__':     t1 = Thread(target=work1)     t2 = Thread(target=work2)          t1.start()     t2.start()     # 这里会出现全局变量混乱现象 
in work1, g_num =  52  in work1, g_num =  54  in work1, g_num =  56  in work1, g_num =  58  in work1, g_num =  60  in work2, g_num =  63  in work2, g_num =  66  in work2, g_num =  69 

Python3学习笔记_H(进程、线程)

列表当实参传递到线程中
# 例子 from threading import Thread  import time   def work1(nums):     nums.append(44)     print('in work1'.center(30, "*"), nums)      def work2(nums):     # 延时,保证t1线程执行完毕     time.sleep(1)     print('in work2'.center(30, "*"), nums)      g_nums = [11, 22, 33]  t1 = Thread(target=work1, args=(g_nums, )) t1.start()  t2 = Thread(target=work2, args=(g_nums, )) t2.start() 
***********in work1*********** [11, 22, 33, 44] ***********in work2*********** [11, 22, 33, 44] 
进程VS线程

进程:进程是系统进行资源分配和调度的一个独立单位.

线程线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.

进程可以理解为一台电脑上可以同时运行多个QQ
线程可以理解为一个QQ可以开多个聊天窗口

进程和线程的关系:

(1)一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程。

(2)资源分配给进程,同一进程的所有线程共享该进程的所有资源。

(3)处理机分给线程,即真正在处理机上运行的是线程

(4)线程在执行过程中,需要协作同步。不同进程的线程间要利用消息通信的办法实现同步。线程是指进程内的一个执行单元,也是进程内的可调度实体.

进程与线程的区别:

(1)调度:线程作为调度和分配的基本单位,进程作为拥有资源的基本单位

(2)并发性:不仅进程之间可以并发执行,同一个进程的多个线程之间也可并发执行

(3)拥有资源:进程是拥有资源的一个独立单位,线程不拥有系统资源,但可以访问隶属于进程的资源.

(4)系统开销:在创建或撤消进程时,由于系统都要为之分配和回收资源,导致系统的开销明显大于创建或撤消线程时的开销。

各自优缺点

线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。

线程同步

线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。

互斥锁为资源引入一个状态:锁定/非锁定。

某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。

使用 Thread 对象的 Lock 和 Rlock 可以实现简单的线程同步,这两个对象都有 acquire 方法和 release 方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到 acquire 和 release 方法之间。

# 线程不安全 例子 from threading import Thread  import time   g_num = 0 g_number = 0  def test1():     global g_num          for i in range(1000000):         g_num +=1     print('in test1, g_num = {}'.format(g_num))          def test2():     global g_num          for i in range(1000000):         g_num +=1     print('***in test2, g_num = {}'.format(g_num))          def test3():     global g_number          for i in range(1000000):         g_number +=1     print('***in test3, g_number = {}'.format(g_number))          def test4():     global g_number          for i in range(1000000):         g_number +=1     print('***in test4, g_number = {}'.format(g_number))          # 未对线程运行进行调节 t1 = Thread(target=test1) t1.start()  t2 = Thread(target=test2) t2.start()  print('g_num = {}'.format(g_num))  time.sleep(5) # 保证线程t1、t2执行完毕  # 对线程运行进行调节 t3 = Thread(target=test3) t3.start()  time.sleep(5) # 睡眠5s,保证线程t3可以完全执行完毕  t4 = Thread(target=test4) t4.start() t4.join()# 线程阻塞 print('g_number = {}'.format(g_number)) 
g_num = 362029 in test1, g_num = 1057030 ***in test2, g_num = 1584212 ***in test3, g_number = 1000000 ***in test4, g_number = 2000000 g_number = 2000000 
互斥锁

当多个线程几乎同时修改某一个共享数据的时候,需要进行同步控制
线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。

互斥锁为资源引入一个状态:锁定/非锁定。

添加锁的原则是:在保证程序正确的前提下,尽可能的少锁住代码。(这样锁死的时间就越少)

threading模块中的Lock类,可以方便的处理锁定:

创建锁: mutex = threading.Lock()

锁定:mutex.acquire([blocking])

blocking参数解释:  若设定blocking为True,则当前线程堵塞,直到获得这个锁为止(默认为True)  若blocking设置为False,则当前线程不会堵塞 

释放:mutex.release()

# 用互斥锁实现上例的线程同步 例子 from threading import Thread, Lock  from time import sleep   g_num = 0  def test1():     global g_num     mutexFlag = mutex.acquire(True)     # True表示阻塞,即若这个锁在要上锁之前已经被上锁,那么此线程会一直等待到解锁     # False表示非阻塞,即不管本次上锁能否成功,都不会卡在这里,会继续执行下面的代码         if mutexFlag:         for i in range(100000):                 g_num +=1         mutex.release()     print('in test1, g_num={}'.format(g_num))      def test2():     global g_num     mutexFlag = mutex.acquire(True)     # True表示阻塞,即若这个锁在要上锁之前已经被上锁,那么此线程会一直等待到解锁     # False表示非阻塞,即不管本次上锁能否成功,都不会卡在这里,会继续执行下面的代码         if mutexFlag:         for i in range(100000):             g_num +=1         mutex.release()     print('**in test2, g_num={}'.format(g_num))      if __name__ == "__main__":     # 创建一个互斥锁,这个锁默认是未上锁状态     mutex = Lock()      t1 = Thread(target=test1)     t1.start()      t2 = Thread(target=test2)     t2.start()     sleep(3)     print('for total g_num = {}'.format(g_num)) 
in test1, g_num=100000 **in test2, g_num=200000 for total g_num = 200000 
''' 上锁解锁过程  当一个线程调用Lock的acquire()获得锁时,锁就进入“locked’状态  每次只有一个线程可以获得锁,如果此时另一个线程试图获得此锁, 尝试获取锁线程会变为”blocked“状态,称为”阻塞“ 直到拥有锁的线程调用锁的release()方法释放锁,锁进入”unlocked“状态  线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁, 并使得该线程进入运行(running)状态。 '''  ''' 锁的好处:     确保了某段关键代码只能由一个线程从头到尾完整地执行      锁的坏处:     阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,     效率就大大地下降了          由于可以存在多个锁,不同的线程持有不同的锁,     并试图获取对方持有的锁时,可能会造成死锁 '''  ''' 对多线程-非共享数据(非全局变量):     在多线程开发中,全局变量是多个线程都共享的数据,     而局部变量等是各自线程的,是非共享的 ''' 
死锁

在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
Python3学习笔记_H(进程、线程)
避免死锁
程序设计时要尽量避免(银行家算法)
添加超时时间等

银行家算法

一个银行家拥有一定数量的资金,有若干个客户要贷款。每个客户须在一开始就声明他所需贷款的总额。若该客户贷款总额不超过银行家的资金总数,银行家可以接收客户的要求。客户贷款是以每次一个资金单位(如1万RMB等)的方式进行的,客户在借满所需的全部单位款额之前可能会等待,但银行家须保证这种等待是有限的,可完成的。

例如:有三个客户C1,C2,C3,向银行家借款,该银行家的资金总额为10个资金单位,其中C1客户要借9各资金单位,C2客户要借3个资金单位,C3客户要借8个资金单位,总计20个资金单位。某一时刻的状态如图所示。
Python3学习笔记_H(进程、线程)
对于a图的状态,按照安全序列的要求,我们选的第一个客户应满足该客户所需的贷款小于等于银行家当前所剩余的钱款,可以看出只有C2客户能被满足:C2客户需1个资金单位,小银行家手中的2个资金单位,于是银行家把1个资金单位借给C2客户,使之完成工作并归还所借的3个资金单位的钱,进入b图。同理,银行家把4个资金单位借给C3客户,使其完成工作,在c图中,只剩一个客户C1,它需7个资金单位,这时银行家有8个资金单位,所以C1也能顺利借到钱并完成工作。最后(见图d)银行家收回全部10个资金单位,保证不赔本。那麽客户序列{C1,C2,C3}就是个安全序列,按照这个序列贷款,银行家才是安全的。否则的话,若在图b状态时,银行家把手中的4个资金单位借给了C1,则出现不安全状态:这时C1,C3均不能完成工作,而银行家手中又没有钱了,系统陷入僵持局面,银行家也不能收回投资。

综上所述,银行家算法是从当前状态出发,逐个按安全序列检查各客户谁能完成其工作,然后假定其完成工作且归还全部贷款,再进而检查下一个能完成工作的客户,…。如果所有客户都能完成工作,则找到一个安全序列,银行家才是安全的。

'''同步应用''' # 使用互斥锁完成多个任务,有序的进行工作 import queue from threading import Lock, Thread  flag = True # 循环标记 i = 0 # 运行次数   def task1(q, lock, another):     '''同步函数1'''     global flag     global i     while flag:         if q.full() != True:             if lock.acquire():                 i += 1                 try: # 队列满后退出循环                     q.put(1, timeout=0.1)                 except Exception:                     break                 print("this is task1 at NO.{}".format(i))                 another.release()         else:             flag = False   def task2(q, lock, another):     '''同步函数2'''     global flag     global i     while flag:         if q.full() != True:             if lock.acquire():                 i += 1                 try:                     q.put(1, timeout=0.1)                 except Exception:                     break                 print("this is task2 at NO.{}".format(i))                 another.release()         else:             flag = False   if __name__ == "__main__":     q = queue.Queue(4) # 创建一个大小为4的消息队列          # 创建两把锁     lock1 = Lock()     lock2 = Lock()     lock2.acquire()          # 初始化线程t1和t2     t1 = Thread(         target=task1, args=(             q,             lock1,             lock2,         ))      t2 = Thread(         target=task2, args=(             q,             lock2,             lock1,         ))          # 启动线程并对线程设置阻塞时间     t1.start()     t2.start()     t1.join(1)     t2.join(1) 
this is task1 at NO.1 this is task2 at NO.2 this is task1 at NO.3 this is task2 at NO.4 
生产者消费者模式

队列

先进先出

后进先出

线程优先级队列( Queue):

Python 的 Queue 模块中提供了同步的、线程安全的队列类,  包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列 PriorityQueue 

这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),
能够在多线程中直接使用。可以使用队列来实现线程间的同步。

import queue

q = queue.Queue()
这里的Queue和进程里Queue的方法和属性功能相同,不再赘述
Queue.task_done()
在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号

# 用FIFO队列实现生成者消费者问题 例子: from time import time, sleep from queue import Queue from threading import Thread  q = Queue(maxsize=10)  # 声明队列   def producter(pro_name):     '''生产者'''     number = 0  # 产品批次     # 队列不满时,生产产品     while number < 20:         number += 1         try:  # 处理队满异常,队满则中止生产,不满再打印生产信息             q.put(number, timeout=0.1)  # 放入产品         except Exception:             break         else:             print("<{}>生产了第<{}>个产品".format(pro_name, number))   def consumer(con_name):     '''消费者'''     # 当队列非空时消费产品     while not (q.empty()):         try:  # 处理空队列异常,队空再停止消费,不空则打印消费信息             seq = q.get(timeout=0.1)         except Exception:             break         else:             print("<{}>消费了第<{}>个产品".format(con_name, seq))   if __name__ == "__main__":     p1 = Thread(target=producter, args=("老王", ))  # 老王生产     c1 = Thread(target=consumer, args=("张三", ))  # 张三消费     c2 = Thread(target=consumer, args=("李四", ))  # 李四消费      p1.start()     # 当producter中的while 条件为 not(q.full())时,需要添加如下:     # sleep(1)  # 暂停1秒,防止生产队列因多线程消费而混乱     c1.start()     c2.start()      p1.join(2)     c1.join(3)     c2.join(3)      if q.empty():         sleep(1)         print("产出的产品全部消费完了!")  
<老王>生产了第<1>个产品 <老王>生产了第<2>个产品 <老王>生产了第<3>个产品 <老王>生产了第<4>个产品 <老王>生产了第<5>个产品 <老王>生产了第<6>个产品 <老王>生产了第<7>个产品 <老王>生产了第<8>个产品 <老王>生产了第<9>个产品 <老王>生产了第<10>个产品 <张三>消费了第<1>个产品 <张三>消费了第<2>个产品 <老王>生产了第<11>个产品 <老王>生产了第<12>个产品 <李四>消费了第<3>个产品<张三>消费了第<4>个产品<老王>生产了第<13>个产品 <老王>生产了第<14>个产品  <李四>消费了第<5>个产品 <李四>消费了第<6>个产品 <李四>消费了第<7>个产品 <李四>消费了第<8>个产品 <李四>消费了第<9>个产品 <李四>消费了第<10>个产品 <李四>消费了第<11>个产品 <李四>消费了第<12>个产品 <李四>消费了第<13>个产品 <李四>消费了第<14>个产品  <老王>生产了第<15>个产品 <老王>生产了第<16>个产品 <老王>生产了第<17>个产品 <老王>生产了第<18>个产品 <老王>生产了第<19>个产品 <老王>生产了第<20>个产品 <李四>消费了第<15>个产品 <李四>消费了第<16>个产品 <李四>消费了第<17>个产品 <李四>消费了第<18>个产品 <李四>消费了第<19>个产品 <李四>消费了第<20>个产品 产出的产品全部消费完了! 
ThreadLocal

多线程环境下,每个线程都有自己的数据。一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。

''' 使用函数传参的方法 每层都要传入参数,啰嗦的很 ''' def process_student(name):     std = Student(name)     # std是局部变量,但是每个函数都要用它,因此必须传进去:     do_task_1(std)     do_task_2(std)  def do_task_1(std):     do_subtask_1(std)     do_subtask_2(std)  def do_task_2(std):     do_subtask_2(std)     do_subtask_2(std)      ''' 使用全局字典的方法 这种方式理论上是可行的, 它最大的优点是消除了std对象在每层函数中的传递问题, 但是,每个函数获取std的代码有点low。 ''' global_dict = {}  def std_thread(name):     std = Student(name)     # 把std放到全局变量global_dict中:     global_dict[threading.current_thread()] = std     do_task_1()     do_task_2()  def do_task_1():     # 不传入std,而是根据当前线程查找:     std = global_dict[threading.current_thread()]     ...  def do_task_2():     # 任何函数都可以查找出当前线程的std变量:     std = global_dict[threading.current_thread()]     ... 
使用ThreadLocal

threadlocal是用于解决多线程之间的共享数据的参数紊乱问题

import threading  # 用threading.local() 生成全局变量 global_var = threading.local() 

ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。

一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题

# ThreadLocal 例子 from threading import local  global_var = local()  # 创建全局ThreadLocal对象   def thread_student():     '''学生线程'''     student = global_var.student  # 学生     print("Hello, Student: %s" % (student))   def process_thread(name):     '''绑定ThreadLocal对象'''     global_var.student = name     thread_student()   if __name__ == "__main__":     t1 = Thread(target=process_thread, args=("张三", ))     t2 = Thread(target=process_thread, args=("老王", ))      t1.start()     t2.start()      t1.join(1)     t2.join(1)  # 全局变量local_school就是一个ThreadLocal对象, # 每个Thread对它都可以读写student属性,但互不影响。 # 你可以把local_school看成全局变量, # 但每个属性如local_school.student都是线程的局部变量, # 可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。  # 可以理解为全局变量local_school是一个dict, # 不但可以用local_school.student,还可以绑定其他变量, # 如local_school.teacher等等。  
Hello, Student: 张三 Hello, Student: 老王 
异步

老张爱喝茶,废话不说,煮开水。
出场人物:老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。
1 老张把水壶放到火上,立等水开。(同步阻塞)
老张觉得自己有点傻
2 老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)
老张还是觉得自己有点傻,于是变高端了,买了把会响笛的那种水壶。水开之后,能大声发出嘀的噪音。
3 老张把响水壶放到火上,立等水开。(异步阻塞)
老张觉得这样傻等意义不大
4 老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)
老张觉得自己聪明了。

所谓同步异步,只是对于水壶而言。
普通水壶,同步;响水壶,异步。
虽然都能干活,但响水壶可以在自己完工之后,提示老张水开了。这是普通水壶所不能及的。
同步只能让调用者去轮询自己(情况2中),造成老张效率的低下。

所谓阻塞非阻塞,仅仅对于老张而言。
立等的老张,阻塞;看电视的老张,非阻塞。
情况1和情况3中老张就是阻塞的,媳妇喊他都不知道。虽然3中响水壶是异步的,可对于立等的老张没有太大的意义。所以一般异步是配合非阻塞使用的,这样才能发挥异步的效用。

# 异步例子 from multiprocessing import Pool  import time  import os   def test_1():     print('进程池中的进程:pid = {}, ppid = {}'.format(os.getpid(), os.getppid()))     for i in range(3):         print('{}'.format(i).center(10, '*'))         time.sleep(0.5)              return 'function test_1'  def test_2(args):     print('callback func pid = {}'.format(os.getpid()))     print('callback func args = {}'.format(args))      pool = Pool(3) pool.apply_async(func=test_1, callback=test_2)  time.sleep(3)  print('主进程:pid = {}'.format(os.getpid())) 
进程池中的进程:pid = 8988, ppid = 2110 ****0***** ****1***** ****2***** callback func pid = 2110 callback func args = function test_1 主进程:pid = 2110 
GIL(全局解释锁)

这里特别注意:在python中多进程比多线程的效率要高

#加载动态库 from ctypes import *  lib = cdll.LoadLibrary("loadName") #这里是用于导入库 
分布式进程

参考网址

https://www.liaoxuefeng.com/wiki/1016959663602400/1017631559645600

ThreadProcess中,应当优选Process,因为Process更稳定,而且,Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。
Python的multiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。由于managers模块封装很好,不必了解网络通信的细节,就可以很容易地编写分布式多进程程序。
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。

# task_master.py  import random, time, queue from multiprocessing.managers import BaseManager  # 发送任务的队列: task_queue = queue.Queue() # 接收结果的队列: result_queue = queue.Queue()  # 从BaseManager继承的QueueManager: class QueueManager(BaseManager):     pass  # 把两个Queue都注册到网络上, callable参数关联了Queue对象: QueueManager.register('get_task_queue', callable=lambda: task_queue) QueueManager.register('get_result_queue', callable=lambda: result_queue) # 绑定端口5000, 设置验证码'abc': manager = QueueManager(address=('', 5000), authkey=b'abc') # 启动Queue: manager.start() # 获得通过网络访问的Queue对象: task = manager.get_task_queue() result = manager.get_result_queue() # 放几个任务进去: for i in range(10):     n = random.randint(0, 10000)     print('Put task %d...' % n)     task.put(n) # 从result队列读取结果: print('Try get results...') for i in range(10):     r = result.get(timeout=10)     print('Result: %s' % r) # 关闭: manager.shutdown() print('master exit.')  # 当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用, # 但是,在分布式多进程环境下, # 添加任务到Queue不可以直接对原始的task_queue进行操作, # 那样就绕过了QueueManager的封装, # 必须通过manager.get_task_queue()获得的Queue接口添加。  
# 在另一台机器上启动任务进程(本机上启动也可以): # task_worker.py  import time, sys, queue from multiprocessing.managers import BaseManager  # 创建类似的QueueManager: class QueueManager(BaseManager):     pass  # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字: QueueManager.register('get_task_queue') QueueManager.register('get_result_queue')  # 连接到服务器,也就是运行task_master.py的机器: server_addr = '127.0.0.1' print('Connect to server %s...' % server_addr) # 端口和验证码注意保持与task_master.py设置的完全一致: m = QueueManager(address=(server_addr, 5000), authkey=b'abc') # 从网络连接: m.connect() # 获取Queue的对象: task = m.get_task_queue() result = m.get_result_queue() # 从task队列取任务,并把结果写入result队列: for i in range(10):     try:         n = task.get(timeout=1)         print('run task %d * %d...' % (n, n))         r = '%d * %d = %d' % (n, n, n*n)         time.sleep(1)         result.put(r)     except Queue.Empty:         print('task queue is empty.') # 处理结束: print('worker exit.') 

异步IO

为解决CPU高速执行能力和IO设备的龟速严重不匹配,提供了多线程和多进程方法,但是还有一种解决方法:异步IO。(当代码需要执行一个耗时的IO操作时,它只发出IO指令,并不等待IO结果,然后就去执行其他代码了。一段时间后,当IO返回结果时,再通知CPU进行处理。)

异步IO模型需要一个消息循环,在消息循环中,主线程不断地重复“读取消息-处理消息”这一过程:

loop = get_event_loop()
while True:
event = loop.get_event()
process_event(event)

 ##### 协程  **协程**,又称微线程,纤程。英文名Coroutine。  协程其实可以认为是比线程更小的执行单元。 为啥说他是一个执行单元,因为他自带CPU上下文。这样只要在合适的时机, 我们可以把一个协程 切换到另一个协程。 只要这个过程中保存或恢复 CPU上下文那么程序还是可以运行的。  通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定  **协程和线程差异**  线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。  **协程问题**  系统并不感知,所以操作系统不会帮你做切换。 那么谁来帮你做切换?让需要执行的协程更多的获得CPU时间才是问题的关键。  >**例子** >>目前的协程框架一般都是设计成 1:N 模式。所谓 1:N 就是一个线程作为一个容器里面放置多个协程。 那么谁来适时的切换这些协程?答案是由协程自己主动让出CPU,也就是每个协程池里面有一个调度器, 这个调度器是被动调度的。意思就是他不会主动调度。而且当一个协程发现自己执行不下去了(比如异步等待网络的数据回来,但是当前还没有数据到), 这个时候就可以由这个协程通知调度器,这个时候执行到调度器的代码,调度器根据事先设计好的调度算法找到当前最需要CPU的协程。 切换这个协程的CPU上下文把CPU的运行权交个这个协程,直到这个协程出现执行不下去需要等待的情况,或者它调用主动让出CPU的API之类,触发下一次调度。 >>>**这个实现有一个问题:**假设这个线程中有一个协程是CPU密集型的他没有IO操作, 也就是自己不会主动触发调度器调度的过程,那么就会出现其他协程得不到执行的情况, 所以这种情况下需要程序员自己避免。   ```python # 协程简单例子 from time import sleep  def A():     a = 1     while a<4:         print('A a = {}'.format(a))         yield a         a += 1         sleep(0.3)  def B(c):     while True:         try:             print('B'.center(20, '*'))             next(c)             sleep(0.3)         except StopIteration:             break  if __name__ == '__main__':     a = A()     B(a) 
*********B********** A a = 1 *********B********** A a = 2 *********B********** A a = 3 *********B********** 
#!/usr/bin/env python3 # -*- coding:utf-8 -*- # greenlet 版协程  from greenlet import greenlet from time import sleep  max = 5   def A():     a = 1     while a < max:         print('A a = {}'.format(a).center(20, '*'))         gr2.switch()  # 切换至B()         a += 1         sleep(0.3)   def B():     b = 1     while b < max:         print("b in B is: %d" % b)         gr1.switch()         b += 1         sleep(0.3)   gr1 = greenlet(A) gr2 = greenlet(B)  # 协程切换要注意:切换出协程后要记得切回,以便使携程运行完,防止出现孤儿 gr1.switch() print("gr1.dead = {}, gr2.dead = {}".format(gr1.dead, gr2.dead)) gr2.switch() # 若不切回B会出现B协程一直存活的现象 print("gr1.dead = {}, gr2.dead = {}".format(gr1.dead, gr2.dead)) 
******A a = 1******* b in B is: 1 ******A a = 2******* b in B is: 2 ******A a = 3******* b in B is: 3 ******A a = 4******* b in B is: 4 gr1.dead = True, gr2.dead = False gr1.dead = True, gr2.dead = True 
'''grevent:当遇到IO时,自动切换''' # 使用例子 # -*- coding: utf-8 -*-  import gevent  def test(n):     for i in range(n):         print('this is "{}" at NO.{}'.format(gevent.getcurrent(), i))  g1 = gevent.spawn(test, 3) g2 = gevent.spawn(test, 3)  g1.join() g2.join() 
this is "<Greenlet "Greenlet-0" at 0x1c4eb6f69d8: test(3)>" at NO.0 this is "<Greenlet "Greenlet-0" at 0x1c4eb6f69d8: test(3)>" at NO.1 this is "<Greenlet "Greenlet-0" at 0x1c4eb6f69d8: test(3)>" at NO.2 this is "<Greenlet "Greenlet-1" at 0x1c4ec265268: test(3)>" at NO.0 this is "<Greenlet "Greenlet-1" at 0x1c4ec265268: test(3)>" at NO.1 this is "<Greenlet "Greenlet-1" at 0x1c4ec265268: test(3)>" at NO.2 
# gevent 切换执行 # -*- coding: utf-8 -*-  import gevent  def test(n):     for i in range(n):         print('this is "{}" at NO.{}'.format(gevent.getcurrent(), i))         gevent.sleep(0.3)  g1 = gevent.spawn(test, 3) g2 = gevent.spawn(test, 3)  g1.join() g2.join()         
this is "<Greenlet "Greenlet-0" at 0x1c4ec265378: test(3)>" at NO.0 this is "<Greenlet "Greenlet-1" at 0x1c4ec265488: test(3)>" at NO.0 this is "<Greenlet "Greenlet-0" at 0x1c4ec265378: test(3)>" at NO.1 this is "<Greenlet "Greenlet-1" at 0x1c4ec265488: test(3)>" at NO.1 this is "<Greenlet "Greenlet-0" at 0x1c4ec265378: test(3)>" at NO.2 this is "<Greenlet "Greenlet-1" at 0x1c4ec265488: test(3)>" at NO.2 
asyncio、async/await、aiohttp

参考自廖雪峰的Python教程,不在此赘述,下文给出参考网址

asyncio

async/await

aiohttp

本文转自互联网,侵权联系删除Python3学习笔记_H(进程、线程)

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » Python3学习笔记_H(进程、线程)
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们