Python既支持多进程又支持多线程,因此使用Python实现并发编程主要有3种方式:
Unix和Linux操作系统上提供了fork()
系统调用来创建进程,调用fork()
函数的是父进程,创建出的是子进程,子进程是父进程的一个拷贝,但是子进程拥有自己的PID。
fork()
函数非常特殊它会返回两次,父进程中可以通过fork()
函数的返回值得到子进程的PID,而子进程中的返回值永远都是0。
Python的os模块提供了fork()
函数。由于Windows系统没有fork()
调用,因此要实现跨平台的多进程编程,可以使用multiprocessing模块的Process
类来创建子进程,而且该模块还提供了更高级的封装,例如批量启动进程的进程池(Pool
)、用于进程间通信的队列(Queue
)和管道(Pipe
)等。
注意点: 使用多进程、多线程之前,一定先要单独把程序跑通过再来改,不然里面报错,程序是不会终端,也没有任何提示,即便有打印消息,它也是在主进程结束时,一起打印出来。
import threading
import time
# 并行:真的多任务
# 并发:假的多任务“一起”,就是在不停的来回切
def sing():
for i in range(5):
print("正在唱歌")
time.sleep(1)
def dance():
for i in range(5):
print("正在跳舞----------")
time.sleep(1)
def main():
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
print(threading.enumerate()) # 只有主线程
t1.start() # 当主线程执行到这的时候,主线程会继续往下,同时会生成子线程去执行t1的sing函数
print(threading.enumerate())
t2.start() # 子线程执行顺序不确定,由操作系统来调度,想让谁先完成就加sleep多给时间
# 若想所有子线程都结束后,主线程再结束,子线程都必须在这有.join()
print(threading.enumerate()) # 主线程会比上面的子线程还先执行完(会看到这句还先打印出来)
if __name__ == '__main__':
main()
==join所完成的工作就是线程同步==:即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止。(注意第24行的备注)
class MyThread(threading.Thread): # 继承类的方式写多线程
def run(self): # 必须要有run方法
for i in range(5):
print("在唱第{}首歌曲".format(i))
time.sleep(1)
if __name__ == '__main__':
t = MyThread()
t.start() # 也是start调用,这回默认调用run方法
可以直接使用threading模块的Thread类来创建线程,但是我们之前讲过一个非常重要的概念叫“继承”,我们可以从已有的类创建新类,因此也可以通过继承Thread类的方式来创建自定义的线程类,然后再创建线程对象并启动线程。
from threading import Thread
class DownloadTask(Thread):
def __init__(self, file_name):
super(DownloadTask, self).__init__()
self._file_name = file_name
def run(self):
#名字必须是run,这是方法重写;Thread这个类里已经有自动调用了run
print("开始下载{}".format(self._file_name))
time_to_load = np.random.randint(5, 10)
time.sleep(time_to_load)
print("{}下载完成,花费{}秒!".format(self._file_name, time_to_load))
def main():
start = time.time()
t1 = DownloadTask("Python从入门到住院.pdf")
t1.start()
t2 = DownloadTask("Pejing Hot.avi")
t2.start()
t1.join()
t2.join()
end = time.time()
print("总共花了{}秒".format(end - start))
if __name__ == '__main__':
main()
# 定义一个全局变量
nums = 0
def func1(temp):
global nums # 当使用的全局变量是不可变类型的时候,还是用global声明一下
for i in range(temp):
nums += 1
print("现在nums={}".format(nums))
def func2(temp):
global nums
for i in range(temp):
nums += 1
print("现在nums={}".format(nums))
def main():
t1 = threading.Thread(target=func1, args=(1000000,), daemon=True) # 通过args去传调用函数的参数,注意必须是一个tuple(或者是()中一个列表)
t2 = threading.Thread(target=func2, args=(1000000,), daemon=True)
t1.start()
t2.start()
# 等待子线程运行完(一般是使用子线程.join()来等待)
time.sleep(3)
print("此时muns的值为:{}".format(nums)) # 当值很大的时候就会发现此时的nums并不等于2000000
# 那是因为没有锁,一个子线程+1后还没写入nums,就被另外一个子线程读取去+1了,那最后肯定小于2000000
if __name__ == '__main__':
main()
注:daemon属性(daemon属性默认值为False)
就是多线程之间资源共享,所以加互斥锁来保证数据的一致,但这也可能会导致死锁。
nums = 0
mutex = threading.Lock() # 整一个互斥锁,默认是没有上锁的
def test1(temp):
global nums
# mutex.acquire() # 上锁:如果之前没有上锁,则上锁成功;如果上锁之前被锁上了,就会堵塞在这里,直到这个锁被解开为止
# for i in range(temp):
# nums += 1
# mutex.release()
# print("现在nums={}".format(nums)) # 这种这里永远会是1000000,就有点单线程的意思了,这里执行完了才放开
for i in range(temp):
mutex.acquire()
nums += 1
mutex.release()
print("现在nums={}".format(nums)) # 这种这里的值就是不一定的,每次运行结果都不一样,但最终的结果一定是2000000
'''故上锁的代码越少越好'''
def test2(temp):
global nums
# mutex.acquire()
# for i in range(temp):
# nums += 1
# mutex.release()
for i in range(temp):
mutex.acquire()
nums += 1
mutex.release()
print("现在nums={}".format(nums))
def main():
t1 = threading.Thread(target=test1, args=(1000000,))
t2 = threading.Thread(target=test2, args=(1000000,))
t1.start()
t2.start()
time.sleep(1)
print("此时muns的值为:{}".format(nums))
if __name__ == '__main__':
main()
临界资源,锁:
因为多个线程可以共享进程的内存空间,因此要实现多个线程间的通信相对简单,大家能想到的最直接的办法就是设置一个全局变量,多个线程共享这个全局变量即可。但是当多个线程共享同一个变量(我们通常称之为“资源”)的时候,很有可能产生不可控的结果从而导致程序失效甚至崩溃。如果一个资源被多个线程竞争使用,那么我们通常称之为“临界资源”,对“临界资源”的访问需要加上保护,否则资源会处于“混乱”的状态。
下面的例子演示了100个线程向同一个银行账户转账(转入1元钱)的场景,在这个例子中,银行账户就是一个临界资源,在没有保护的情况下我们很有可能会得到错误的结果。
import time
from threading import Thread
class Account:
def __init__(self):
self._balance = 0
def deposit(self, money):
new_balance = self._balance + money
time.sleep(0.01)
self._balance = new_balance
@property #这个装饰器是将下面的self._balance属性弄成一个方法(这个方法其实就是属性了)
def balance(self):
return self._balance
class AddMoneyThread(Thread):
def __init__(self, account, money):
super(AddMoneyThread, self).__init__()
self._account = account
self._money = money
def run(self):
self._account.deposit(self._money)
def main():
account = Account()
threads = []
for i in range(100):
t = AddMoneyThread(account, 1)
threads.append(t)
t.start()
#结合上面的看,所有的线程都要先start(),之后又全都要join()
for t in threads:
t.join()
print("账户余额为:{}元".format(account.balance))
#这里直接点balabce,就是因为这个装饰器@property,不然不是返回的值,可以试
if __name__ == '__main__':
main()
运行上面的程序,100个线程分别向账户中转入1元钱,结果居然远远小于100元。之所以出现这种情况是因为我们没有对银行账户这个“临界资源”加以保护,多个线程同时向账户中存钱时,会一起执行到new_balance = self._balance + money
这行代码,多个线程得到的账户余额都是初始状态下的0
,所以都是0
上面做了+1的操作,因此得到了错误的结果。在这种情况下,“锁” 就可以派上用场了。我们可以通过“锁”来保护“临界资源”,只有获得“锁”的线程才能访问“临界资源”,而其他没有得到“锁”的线程只能被阻塞起来,直到获得“锁”的线程释放了“锁”,其他线程才有机会获得“锁”,进而访问被保护的“临界资源”。
下面的代码演示了如何使用“锁”来保护对银行账户的操作,从而获得正确的结果。
from threading import Thread, Lock #多了一个"Lock"
class Account:
def __init__(self):
self._balance = 0
self._lock = Lock() #这个"Lock"是一个类,先实例化
def deposit(self, money):
self._lock.acquire() #先获取锁后才能执行后续的代码
try:
new_balance = self._balance + money
time.sleep(0.01)
self._balance = new_balance
finally:
self._lock.release() #保证是否操作正常都能将锁释放掉
"""其他剩下部分的跟上面的都是一样的,这样的结果就是100"""
用户192.168.12,端口9999
import threading
import socket
def send(s_socket):
while True:
send_data = input("请输入要发送的消息:")
if send_data == "exit":
break
s_socket.sendto(send_data.encode('utf-8'), ("192.168.1.6", 7777))
def rec(r_socket):
while True:
receive_data = r_socket.recvfrom(1024)
out = receive_data[0].decode('utf-8')
print(out)
if out == "exit":
break
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(("", 9999))
t1 = threading.Thread(target=send, args=(udp_socket,))
t2 = threading.Thread(target=rec, args=(udp_socket,))
t1.start()
t2.start()
if __name__ == '__main__':
main()
用户192.168.1.6, 端口7777
import threading
import socket
def send(s_socket):
while True:
send_data = input("请输入要发送的消息:")
if send_data == "exit":
break
s_socket.sendto(send_data.encode('utf-8'), ("192.168.1.2", 9999))
def rec(r_socket):
while True:
receive_data = r_socket.recvfrom(1024)
out = receive_data[0].decode('utf-8')
print(out)
if out == "exit":
break
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(("", 7777))
t1 = threading.Thread(target=send, args=(udp_socket,))
t2 = threading.Thread(target=rec, args=(udp_socket,))
t1.start()
t2.start()
if __name__ == '__main__':
main()
这里是写了一个麦克风的示例,持续录音(录音部分用子线程),按下enter结束整个程序。且不再是使用.join()卡住主线程,而是用while循环,参考的是海康摄像头的持续抓取示例代码:
- 注意:全局的变量,在函数中若做了修改就需要加“global”关键字,没做修改的不用。同样在“if _name_ == ‘_main_’:” 这行函数下可随意用或是修改全局变量。
import pyaudio
import wave # python标准库
import traceback
import threading
import time
CHUNK = 1024 # 定义数据流块,,或者给1024?
FORMAT = pyaudio.paInt16 # 量化位数(音量级划分)
CHANNELS = 1 # 声道数;声道数:可以是单声道或者是双声道
RATE = 44100 # 采样率;采样率:一秒内对声音信号的采集次数,常用的有8kHz, 16kHz, 32kHz, 48kHz, 11.025kHz, 22.05kHz, 44.1kHz(44100)
RECORD_SECONDS = 1 # 录音秒数
WAVE_OUTPUT_FILENAME = "123.wav" # wav文件路径
DEVICE_INDEX = 1 # usb设备的index,一般插入的usb麦克风索引依次是 1、2、3、4,且名字里是带 “USB Audio Device”
g_Exit = False
def record(stream_obj, wf_obj):
global g_Exit # 注意这要加global(不修改这个变量的话,可以不加)
counts = 0
while True:
if g_Exit:
break
for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
data = stream_obj.read(CHUNK)
wf_obj.writeframes(data)
counts += 1
print(counts)
def run():
# 因为CHANNELS, RATE在下面做了修改,所以要加global
global CHANNELS, RATE, g_Exit # 像 FORMAT 这变量函数里没做修改,就不用加global
p = pyaudio.PyAudio()
try:
deviceInfo = p.get_device_info_by_index(DEVICE_INDEX)
except Exception as e:
traceback.print_exc()
else:
deviceName = deviceInfo["name"]
assert "USB Audio Device" in deviceName, "Error: wrong mic device index!"
CHANNELS = deviceInfo["maxInputChannels"] if deviceInfo.get("maxInputChannels") is not None else CHANNELS
RATE = deviceInfo["defaultSampleRate"] if deviceInfo.get("defaultSampleRate") is not None else RATE
RATE = int(RATE)
stream = p.open(format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
input_device_index=1)
# 写入文件
wf = wave.Wave_write(WAVE_OUTPUT_FILENAME) # 一个意思,默认就是wb,这样才有提示
wf.setnchannels(CHANNELS) # 配置声道数
wf.setsampwidth(p.get_sample_size(FORMAT)) # 配置量化位数
wf.setframerate(RATE) # 配置采样率
# 创建子线程
t1 = threading.Thread(target=record, args=(stream, wf))
t1.start() # 开始运行子线程
# 使用while阻塞主线程,按enter退出,而不是用t1.join()
print("Press enter to exit!")
while True:
str = input()
if str == "":
g_Exit = True
break
time.sleep(1.5)
stream.stop_stream()
stream.close()
p.terminate()
wf.close()
if __name__ == '__main__':
run()
进程会把资源都复制一份,代码一般都共用一份,然后是写时拷贝。
线程和进程的优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。
这个文章也讲的很不错。
特别注意: Process(target=download_task, args=(“Python.pdf”,))
参数的 target、args一定都要写上,==一定==
import os
import time
import multiprocessing
def download_task(file_name):
print("启动下载进程,进程号{}".format(os.getpid()))
# os.getpid()获取进程号
print("开始下载{}".format(file_name))
time_to_download = 5
time.sleep(time_to_download)
print("{}下载完成!花费了{}秒".format(file_name, time_to_download))
def main():
start = time.time()
# 注意:args里必须有括号,且有“逗号”
p1 = multiprocessing.Process(target=download_task, args=("Python.pdf",))
p1.start()
p2 = multiprocessing.Process(target=download_task, args=("Peking Hot.avi",))
p2.start()
p1.join()
p2.join()
end = time.time()
print("总共消耗了{}秒".format(end - start))
if __name__ == '__main__':
main()
Tips:
可能会:
import multiprocessing as mp mp.Pool(processes=max(mp.cpu_count() // 2, 4)) # mp.cpu_count()获取cpu核心数量
"""
当需要创建的子进程数量不多时,可以直接利用上面multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大﹐此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时﹐可以指定一个最大进程数﹐当有新的请求提交到Pool中时﹐如果池还没有满﹐那么就会创建一个新的进程用来执行该请求﹔
但如果池中的进程数已经达到指定的最大值﹐那么该请求就会等待﹐直到池中有进程结束﹐才会用之前的进程来执行新的任务﹐请看下面的实例︰ (Ps:注意这最后一句)
"""
import multiprocessing
import os, time, random
def worker(msg):
t_start = time.time()
print("{}开始执行,进程号为{}".format(msg, os.getpid()))
# 会发现结果中始终只有3个进程号,这个进程完了,再用这个进程号继续调度下一个任务
time.sleep(random.random()*2) # random.random()随机生成0-1之间的浮点数
t_stop = time.time()
print("{}执行完毕,耗时{:.2f}".format(msg, (t_stop - t_start)))
if __name__ == '__main__':
# 建议就直接用这种(就是写在if __name__ == '__main__':后)
# force设为True就是无论起那么是否设置过方法,就以这为准了;如果是False,则是如果前面没设置过,那现在就设置,如果前面设置过,这里设置就没有,不会覆盖。
multiprocessing.set_start_method(method="spawn", force=True)
pool = multiprocessing.Pool(3) # 定义一个进程池,最大进程数3,这个值算是超参数吧
for i in range(10):
# Pool().apply_async(要调用的目标函数名, (传递给目标的参数元祖,))
# 每次循环将会用空闲出来的子进程取调用目标
po.apply_async(worker, (i, )) # 重要的就是这里调用的这个函数
print("----------start---------")
pool.close() # 关闭进程池,关闭后po不再接受新的请求
pool.join() # 等待po中所有的子进程执行完毕,注意必须放在close语句之后;;且一定要有,不然主进程都结束了就啥都没了
print("----------end-----------")
注意:
po.apply_async(worker, (i, ))中的函数这些,一定不能有错误,比如这里给函数少传了一个参数等,都是不会报错的,程序会直接结束,不会有任何报错信息的。
method有三种:
之前写的pdf转txt
import os
import tqdm
import multiprocessing
import sys
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from pdfminer.pdfdevice import PDFDevice, TagExtractor
from pdfminer.pdfpage import PDFPage
from pdfminer.converter import XMLConverter, HTMLConverter, TextConverter
from pdfminer.cmapdb import CMapDB
from pdfminer.layout import LAParams
from pdfminer.image import ImageWriter
"""
-P password : PDF password.
-o output : Output file name.
-t text|html|xml|tag : Output type. (default: automatically inferred from the output file name.)
-O output_dir : Output directory for extracted images.
-c encoding : Output encoding. (default: utf-8)
-s scale : Output scale.
-R rotation : Rotates the page in degree.
-Y normal|loose|exact : Specifies the layout mode. (only for HTML output.)
-p pagenos : Processes certain pages only.
-m maxpages : Limits the number of maximum pages to process.
-S : Strips control characters.
-C : Disables resource caching.
-n : Disables layout analysis.
-A : Applies layout analysis for all texts including figures.
-V : Automatically detects vertical writing.
-M char_margin : Speficies the char margin.
-W word_margin : Speficies the word margin.
-L line_margin : Speficies the line margin.
-F boxes_flow : Speficies the box flow ratio.
-d : Turns on Debug output.
"""
def to_txt(argv):
import getopt
def usage():
print(f'usage: {argv[0]} [-P password] [-o output] [-t text|html|xml|tag]'
' [-O output_dir] [-c encoding] [-s scale] [-R rotation]'
' [-Y normal|loose|exact] [-p pagenos] [-m maxpages]'
' [-S] [-C] [-n] [-A] [-V] [-M char_margin] [-L line_margin]'
' [-W word_margin] [-F boxes_flow] [-d] input.pdf ...')
return 100
try:
(opts, args) = getopt.getopt(argv[1:], 'dP:o:t:O:c:s:R:Y:p:m:SCnAVM:W:L:F:')
except getopt.GetoptError:
return usage()
if not args: return usage()
# debug option
debug = 0
# input option
password = b''
pagenos = set()
maxpages = 0
# output option
outfile = None
outtype = None
imagewriter = None
rotation = 0
stripcontrol = False
layoutmode = 'normal'
encoding = 'utf-8'
pageno = 1
scale = 1
caching = True
showpageno = True
laparams = LAParams()
for (k, v) in opts:
if k == '-d': debug += 1
elif k == '-P': password = v.encode('ascii')
elif k == '-o': outfile = v
elif k == '-t': outtype = v
elif k == '-O': imagewriter = ImageWriter(v)
elif k == '-c': encoding = v
elif k == '-s': scale = float(v)
elif k == '-R': rotation = int(v)
elif k == '-Y': layoutmode = v
elif k == '-p': pagenos.update( int(x)-1 for x in v.split(',') )
elif k == '-m': maxpages = int(v)
elif k == '-S': stripcontrol = True
elif k == '-C': caching = False
elif k == '-n': laparams = None
elif k == '-A': laparams.all_texts = True
elif k == '-V': laparams.detect_vertical = True
elif k == '-M': laparams.char_margin = float(v)
elif k == '-W': laparams.word_margin = float(v)
elif k == '-L': laparams.line_margin = float(v)
elif k == '-F': laparams.boxes_flow = float(v)
#
PDFDocument.debug = debug
PDFParser.debug = debug
CMapDB.debug = debug
PDFPageInterpreter.debug = debug
#
rsrcmgr = PDFResourceManager(caching=caching)
if not outtype:
outtype = 'text'
if outfile:
if outfile.endswith('.htm') or outfile.endswith('.html'):
outtype = 'html'
elif outfile.endswith('.xml'):
outtype = 'xml'
elif outfile.endswith('.tag'):
outtype = 'tag'
if outfile:
outfp = open(outfile, 'w', encoding=encoding)
else:
outfp = sys.stdout
if outtype == 'text':
device = TextConverter(rsrcmgr, outfp, laparams=laparams,
imagewriter=imagewriter)
elif outtype == 'xml':
device = XMLConverter(rsrcmgr, outfp, laparams=laparams,
imagewriter=imagewriter,
stripcontrol=stripcontrol)
elif outtype == 'html':
device = HTMLConverter(rsrcmgr, outfp, scale=scale,
layoutmode=layoutmode, laparams=laparams,
imagewriter=imagewriter, debug=debug)
elif outtype == 'tag':
device = TagExtractor(rsrcmgr, outfp)
else:
return usage()
for fname in args:
with open(fname, 'rb') as fp:
interpreter = PDFPageInterpreter(rsrcmgr, device)
for page in PDFPage.get_pages(fp, pagenos,
maxpages=maxpages, password=password,
caching=caching, check_extractable=True):
page.rotate = (page.rotate+rotation) % 360
interpreter.process_page(page)
device.close()
outfp.close()
return
if __name__ == '__main__':
txt_save_path = r"/home/songhui/new_for_re/temp_txt"
total_path = r"/home/songhui/new_for_re/temp_data"
# error_file_path = os.path.join(txt_save_path, "error.txt")
# error_file = open(error_file_path, "w")
name_files = os.listdir(total_path)
# 使用多进程加快处理
print("开始处理:")
po = multiprocessing.Pool(35)
for com_code_name in tqdm.tqdm(name_files, desc="进度"):
save_path = os.path.join(txt_save_path, com_code_name)
os.makedirs(save_path, exist_ok=True)
com_path = os.path.join(total_path, com_code_name)
pdf_files = os.listdir(com_path)
for pdf_name in pdf_files:
txt_name = pdf_name.replace(".pdf", ".txt")
abs_pdf_path = os.path.join(com_path, pdf_name).replace("\\", "/")
abs_txt_path = os.path.join(save_path, txt_name).replace("\\", "/")
try:
po.apply_async(to_txt, args=(['', '-o', abs_txt_path, abs_pdf_path], ))
# to_txt(['', '-o', abs_txt_path, abs_pdf_path])
except Exception as e:
pass
# error_file.write(str(e) + "\n")
# error_file.flush()
po.close()
po.join()
print("处理完毕!")
# error_file.close()
线程池和进程池的区别如下:(但线程、进程数量少时,使用‘池’来做管理,可能开销更大)
python中使用线程池:看opencv源码中samples中的python示例“==video_threaded.py==”,这里简单放下几行关键代码(源码启发很大,并不是只有循环才去开一个线程)(这种方式可能会十分消耗CPU,很容易就把CPU拉满了)
from multiprocessing.pool import ThreadPool
from collections import deque
import cv2
def main():
pending = deque() # 这是python中的双端队列
threadn = cv2.getNumberOfCPUs() # 得到核心数量
# 还有多进程库中:multiprocessing.cpu_count()
pool = ThreadPool(processes = threadn)
# 后面的代码就不写了,只写几行核心的:
while True:
# process_frame是一个函数,里面不一定要有循环,也可以是一个密集计算的函数,可带返回值的那种。
if 多线程模式:
task = pool.apply_async(process_frame, (frame.copy(), t))
# task就是process_frame函数的返回值
else:
task = process_frame(frame, t)
特别注意:因为上面是while True死循环,所以可以这样,如果是自己的for循环,一定要加等待,不然主线程会直接退出 (没完全验证后,不确定对不对)
for i in range(50):
pool.apply_async(run, (300, ))
pool.close() # 关闭线程池,等待所有任务完成
pool.join() # 阻塞主线程,等待所有工作线程退出
3.1简单的拷贝
"""直接用循环,一次拷贝一个"""
t_start = time.time()
old_path = r"C:\Users\Administrator\Desktop\1"
files_name = os.listdir(old_path)
target_path = r"C:\Users\Administrator\Desktop\2"
for file_name in files_name:
file_abspath = os.path.join(old_path, file_name)
shutil.copy(file_abspath, target_path)
t_end = time.time()
print("用时:{:.2f}秒".format(t_end - t_start))
3.2使用多进程
Ps:这一定要在if ___name__ == ‘__main__‘:里面执行
import multiprocessing
if __name__ == '__main__':
t_start = time.time()
old_path = r"C:\Users\Administrator\Desktop\1"
files_name = os.listdir(old_path)
target_path = r"C:\Users\Administrator\Desktop\2"
po = multiprocessing.Pool(5) # 创建进程池
for file_name in files_name:
file_abspath = os.path.join(old_path, file_name)
po.apply_async(shutil.copy, args=(file_abspath, target_path))
# 每个文件的复制就是一个进程
po.close()
po.join()
t_end = time.time()
print("用时:{:.2f}秒".format(t_end - t_start))
Ps:可以使用shutil.copytree(old_dir_path, destination_path, ignore=ignore_patterns('*.pyc', 'tmp*'))
from multiprocessing import Queue
:这个队列可以实现各进程之间的通信
from queue import Queue
:这个是普通的队列,就在自己进程间通讯
"""队列里什么都能放""" # 自己测的有点问题,还是用另外那个吧
from multiprocessing import Queue #
q = Queue(3) # 初始化一个对象,最多可接受 3 调数据;;要是不给,默认根据硬件来给到合适的最大值
# 放数据
q.put("消息1")
q.put([11, "nihao", (12, 34)])
print(q.full()) # False
q.put("任意数据")
print(q.full()) # True
q.put("第4个数据") # 同样这里,因为队列已经满了,再放就会堵塞在这里
# 取数据
print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 因为上面定义了只能放三个数据,所以这时已经取空了,再取就会堵塞在这里
q.get_nowait() # 这种就是不等待,若是空,会直接抛出异常
print(q.empty()) # 可以通过这来判断是否为空
Ps:若是子进程与主进程间的通信,是不能直接用这个Queue的,用下面这:
multiprocessing.Manager().Queue()
"""Ps:若是用进程池中的进程和主进程进行通信:
不能用这个队列 from multiprocessing import Queue q = Queue()
而是要用这个队列 from multiprocessing import Manager q = Manager().Queue()"""
import shutil
import os
import multiprocessing
def mycopy(q, source, destination):
shutil.copy(source, destination)
q.put(source) # 复制完一个就放消息进队列
if __name__ == '__main__':
old_path = r"C:\Users\Administrator\Desktop\1"
files_name = os.listdir(old_path)
target_path = r"C:\Users\Administrator\Desktop\2"
# 创建进程池
po = multiprocessing.Pool(5)
# 创建队列
queue = multiprocessing.Manager().Queue()
# 向进程池添加copy任务
for file_name in files_name:
file_abspath = os.path.join(old_path, file_name)
po.apply_async(mycopy, args=(queue, file_abspath, target_path))
po.close() # 还是关闭进程池,但是不需要下面这句等待了
# po.join() # 不再通过join()的方式让主进程去等待;而是下面这种
nums = 0
while True: # while True就永远不会退出了,那就设置条件
file = queue.get() # 当队列里为空时,get取不到数据,就会阻塞在这里
nums += 1
# print("已经完成拷贝:{}".format(file)) # {:.2%}和{:0.2%}是一样的
print("\r拷贝进度:{:.2%}".format(nums / len(files_name)), end="")
# end不换行, \r 好像是到行首,使得始终只有一个 “拷贝进度:---”
if nums == len(files_name):
break
print() # 为了linux好看,可以最后换行一下
同理(第33行):如果队列里满了,put时,也会阻塞在put函数那里。
from collections.abc import Iterable, Iterator
a = (11, 22, 33) list(a) # 这种强制类型转换也是用的迭代器 先生成一个空列表,然后调用 next 一个个去取,再放进去
第一步:
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
print(isinstance(classmate, Iterable)) # 结果是False
for name in classmate: # 这并不是一个可迭代的对象
print(name) # 这就会报错,说TypeError: 'Classmate' object is not iterable
第二步:
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
def __iter__(self): # 要想自己写的类是可迭代的对象,必须要有这个方法
pass # 哪怕什么都不写,错误也变成了TypeError: iter() returned non-iterator of type 'NoneType'
# 要想真正能返回值,那这个方法必须返回一个具有 iter() next()的对象引用
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
print(isinstance(classmate, Iterable)) # 结果是True
for name in classmate:
print(name) # 这会报错
第三步:搞一个有 iter、next 方法的对象
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
def __iter__(self):
return ClassIterator()
class ClassIterator():
def __iter__(self):
pass
def __next__(self):
pass
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
a = [11, 22, 33]
for name in a:
print(name)
这种循环就是分三步走:
首先isinstance判断 classmate 是否是可迭代的,里面有 iter 方法,就是True;
然后会调用里面的 iter 函数,得到 classmate 对象的 __iter__方法的返回值,
而 __iter__ 方法的返回值是一个迭代器,每循环一下,它就调用 __next__ 去取值
当一个对象是可定迭代的,那么使用iter()装起来(此时就是自动调用里面的 __iter__ 方法),就是一个迭代器,
print(isinstance(classmate, Iterable)) # 是否可迭代 True classmate_iter = iter(classmate) print(isinstance(classmate_iter, Iterator)) # 是否是迭代器 True
第四步:进一步完善
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
def __iter__(self):
return ClassIterator()
class ClassIterator():
def __iter__(self):
pass
def __next__(self):
return 11 # 相比第三步,只是这里把pass弄一个具体的返回值了
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
for name in classmate:
print(name) # 这就会一直打印 11 ,因为上面的next就只返回了 11
第五步:再完善(只打印一个人名)
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
def __iter__(self):
return ClassIterator(self) # 把自己传进去
class ClassIterator():
def __init__(self, obj):
self.obj = obj
def __iter__(self):
pass
def __next__(self):
return self.obj.names[0] # 相比第步,只是这里把pass弄一个具体的返回值了
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
# 既然想要打印class Classmate()中的name,那next中就要有这个啊,那就如上面把自己 self 传入进去
for name in classmate:
print(name) # 这里就一直打印张三
第六步:再完善(把所有人名都打印)
class Classmate():
def __init__(self):
self.names = []
def add(self, name):
self.names.append(name)
def __iter__(self):
return ClassIterator(self)
class ClassIterator():
def __init__(self, obj):
self.obj = obj
self.current_num = 0
def __iter__(self):
pass
def __next__(self): # 想要打印所有,就要有一个来计数替代self.obj.names[0]中的 0
if self.current_num < len(self.obj.names):
ret = self.obj.names[self.current_num]
self.current_num += 1
return ret # 每次都返回取值,当取完后,就raise这个,就会自动停止
else:
raise StopIteration
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
for name in classmate:
print(name) # 就能把所有人名打出来
第七步:最终版,只留下一个类
class Classmate():
def __init__(self):
self.names = []
self.current_num = 0
def add(self, name):
self.names.append(name)
def __iter__(self):
return self # 返回自己,因为自己是满足了同时有 __iter__ __next__ 方法的
# 循环时是会自己取调用 __next__ 的
def __next__(self):
if self.current_num < len(self.names):
ret = self.names[self.current_num]
self.current_num += 1
return ret
else:
raise StopIteration
classmate = Classmate()
classmate.add("张三")
classmate.add("李四")
classmate.add("王五")
for name in classmate:
print(name)
class FibIterator():
def __init__(self, n):
self.n = n
self.a = 0
self.b = 1
self.current_num = 0
def __iter__(self):
return self
def __next__(self):
if self.current_num < self.n:
ret = self.a
self.current_num += 1
self.a, self.b = self.b, self.a + self.b
return ret
else:
raise StopIteration
fibs = FibIterator(12)
for fib in fibs:
print(fib)
生成器:它也是一种特殊的迭代器
(1)可以用推导式生成 a = (i for i in range(10))
(2)通过函数中加yield
def fib_gen(nums):
a, b = 0, 1
current_num = 0
while current_num < nums:
yield a # 如果一个函数中有yield语句,那这就不再是函数,而是一个生成器模板
a, b = b, a + b
current_num += 1
return """这是调用 对象.value 后才能得到"""
x = fib_gen(10) # 这在调用时,不是调用函数,而是生成一个生成器对象
print(next(x)) # 运行到yield时,就会停到那里,然后把值返回过去,
print(next(x)) # 这里再调用时,就会接着上yield下面的语句接着执行,然后再执行到yield时挺住;
for i in x:
print(i) # 用循环最后没有代码时就会自动停止
y = fib_gen(2)
while True: # 如果一直用next去取值,最后没有时就会报错,因为超出了,这不会自动停
try:
ret = next(y)
print(ret)
except Exception as e:
print(e, type(e)) # 抛出异常的时候就会把对象函数的返回值打出来
print(e.value, type(e.value)) # 结果一样,只是这个是 str类型
break
def fib_gen(nums):
a, b = 0, 1
current_num = 0
while current_num < nums:
ret = yield a # 这个yield语句可以这样写,然后通过 send 传参进来,这个 ret 的值就是传进来的值
print("这是传进来的参数:", ret)
if ret:
current_num = ret
a, b = b, a + b
current_num += 1
obj = fib_gen(4)
value1 = next(obj)
print(value1) # 0
value2 = obj.send(None) # 注意着是传到yield那里,所以一般不放在取第一个值,不然参数传进去没人接收,会出错
print(value2) # 1
print(next(obj)) # 1
print(next(obj)) # 2
# print(next(obj)) # 这本应该是3,但是就会报错了,因为超出了
val = obj.send(2) # 然后这样子传参进去就改变了 current_num, 就可以继续得到值
print(val) # 3
import time
def task_1():
while True:
print("---1---")
time.sleep(0.1)
yield # 给函数加yield弄成生成器
def task_2():
while True:
print("---2---")
time.sleep(0.1)
yield
def main():
t1 = task_1()
t2 = task_2()
while True:
next(t1) # 执行到这个,到里面的yield的时候,就会停住,就会往下,然后就往下
next(t2) # 就执行这个,执行到这个的yield时,就会停住,循环又会执行next(t1),就就一直切换,这就是协程
if __name__ == '__main__':
main()
from greenlet import greenlet # 这其实就是对yield进行了封装
import time
def test_1():
while True:
print("---A---")
gr2.switch() # 注意这是用的 gr2 ,就会切到下面对应的gr1.switch()
time.sleep(0.5) # 若是这里sleep很久,那程序就会卡主在这里,就根本不是多线程
print(1321111111111)
def test_2():
while True:
print("---B---")
gr1.switch() # 这是用的 gr1 , 就会切到上面对应的gr2.switch()
time.sleep(0.5)
print(46554654)
gr1 = greenlet(test_1)
gr2 = greenlet(test_2)
gr1.switch() # A B A B
greenlet就是遇到耗时的(time.sleep(5))就会卡那里,下面的gevent就是遇到耗时的就自动切换
协程最大的意义就是把耗时的操作利用起来去做别的,如socket的监听、receive、connect等待等,推荐就是使用这个了
理解demo:
import gevent
import time
def f1(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5)
def f2(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5)
def f3(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5) # 得用这种才能自动切换
g1 = gevent.spawn(f1, 5) # 固定方式生成一个对象
g2 = gevent.spawn(f2, 5)
g3 = gevent.spawn(f3, 5)
# 这个本来应该是在有耗时任务时自动切换,但是在time.sleep(0.5)是却没有,得用gevent.sleep()
g1.join()
g2.join()
g3.join() # join()的作用都是让主线程等待协程先完成,如果是在while True中就不需要这个了
为了解决上面的问题,使用下面的最终版本:
import time
import random
import gevent
from gevent import monkey
monkey.patch_all()
# 必须加这一句,将程序中耗时操作的代买,换成gevent中自己实现的模块
def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())
print(f"------------{coroutine_name}已经运行完了-------------")
gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2"),
gevent.spawn(coroutine_work, "work3")
])
# 可以以joinall 列表的方式去全部启动,不用一个个的 join
Ps:迭代器和生成器都是Python中特有的概念,迭代器可以看作是一个特殊的对象,每次调用该对象时会返回自身的下一个元素。 一个可迭代的对象必须是定义了 _iter_()方法的对象(如列表,元组等),而一个迭代器必须是定义了__iter__()方法和next()方法的对象。
比较遗憾的一件事情是Python的多线程并不能发挥CPU的多核特性,这一点只要启动几个执行死循环的线程就可以得到证实了。之所以如此,是因为Python的解释器有一个“全局解释器锁”(GIL)的东西,任何线程执行前必须先获得GIL锁,然后每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行,这是一个历史遗留问题,但是即便如此,就如我们之前举的例子,使用多线程在提升执行效率和改善用户体验方面仍然是有积极意义的。
可以把任务分为计算密集型和I/O密集型。计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如对视频进行编码解码或者格式转换等等,这种任务全靠CPU的运算能力,虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低。计算密集型任务由于主要消耗CPU资源,这类任务用Python这样的脚本语言去执行效率通常很低,最能胜任这类任务的是C语言。
除了计算密集型任务,其他的涉及到网络、存储介质I/O的任务都可以视为I/O密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待I/O操作完成(因为I/O的速度远远低于CPU和内存的速度)。对于I/O密集型任务,如果启动多任务,就可以减少I/O等待时间从而让CPU高效率的运转。有一大类的任务都属于I/O密集型任务,这其中包括了我们很快会涉及到的网络应用和Web应用。
对于Python开发者来说,以下情况需要考虑使用多线程:
那么在遇到下列情况时,应该考虑使用多进程:
在Python语言中,单线程+异步I/O的编程模型称为协程,有了协程的支持,就可以基于事件驱动编写高效的多任务程序。==协程最大的优势就是极高的执行效率,因为子程序切换不是线程切换,而是由程序自身控制,因此,没有线程切换的开销==。==协程的第二个优势就是不需要多线程的锁机制==,因为只有一个线程,也不存在同时写变量冲突,在协程中控制共享资源不用加锁,只需要判断状态就好了,所以执行效率比多线程高很多。如果想要充分利用CPU的多核特性,最简单的方法是==多进程+协程==,既充分利用多核,又充分发挥协程的高效率,可获得极高的性能。
例子1:将耗时间的任务放到线程中以获得更好的用户体验。
如下所示的界面中,有“下载”和“关于”两个按钮,用休眠的方式模拟点击“下载”按钮会联网下载文件需要耗费10秒的时间,如果不使用“多线程”,我们会发现,当点击“下载”按钮后整个程序的其他部分都被这个耗时间的任务阻塞而无法执行了(最直观的是无法再点击“关于”按钮)
import time
import tkinter
import tkinter.messagebox
def download():
time.sleep(10) #模拟下载需要10秒
tkinter.messagebox.showinfo("提示", "下载完成!")
def show_about():
tkinter.messagebox.showinfo("关于", "python学习")
def main():
top = tkinter.Tk()
top.title("单线程")
top.geometry("200x150") #注意这里不是乘号,而是字母x
top.wm_attributes("-topmost", True)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text = "下载", command = download)
button1.pack(side = "left")
button2 = tkinter.Button(panel, text = "关于", command = show_about)
button2.pack(side = "right")
panel.pack(side = "bottom")
tkinter.mainloop()
if __name__ == '__main__':
main()
如果使用多线程将耗时间的任务放到一个独立的线程中执行,这样就不会因为执行耗时间的任务而阻塞了主线程,修改后的代码如下所示。
import time
import tkinter
import tkinter.messagebox
from threading import Thread
def main():
class DownloadTaskHandler(Thread):
# 这好像就是将下载任务放进了线程里
def run(self):
time.sleep(10)
tkinter.messagebox.showinfo("提示", "下载完成!")
button1.config(state=tkinter.NORMAL) # 启用下载按钮
def download():
button1.config(state=tkinter.DISABLED) # 禁用下载按钮
DownloadTaskHandler(daemon=True).start() # 在线程中处理耗时间的下载任务
# 通过daemon参数将线程设置为守护线程(主程序退出就不再保留执行)
def show_about():
tkinter.messagebox.showinfo("关于", "Python学习")
top = tkinter.Tk()
top.title("单线程")
top.geometry("250x150")
top.wm_attributes("-topmost", 1)
panel = tkinter.Frame(top)
button1 = tkinter.Button(panel, text="下载", command=download)
button1.pack(side="left")
button2 = tkinter.Button(panel, text="关于", command=show_about)
button2.pack(side="right")
panel.pack(side="bottom")
tkinter.mainloop()
if __name__ == '__main__':
main()
Tips:
也可以使用subprocess模块中的类和函数来创建和启动子进程,然后通过管道来和子进程通信
还可以通过subprocess
模块的call
函数执行其他的命令来创建子进程,相当于就是在我们的程序中调用其他程序