侧边栏壁纸
博主头像
soulballad博主等级

技术文章记录及总结

  • 累计撰写 169 篇文章
  • 累计创建 26 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

【Python】-10.Python线程和进程

soulballad
2020-09-16 / 0 评论 / 0 点赞 / 46 阅读 / 8,987 字
温馨提示:
本文最后更新于 2022-03-03,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

1. 线程

Python中关于线程的主要是 _thread 和 threading 模块

1.1 _thread 模块

1.1.1 创建线程

import time
import datetime
import _thread

date_time_format = "%H:%M:%S"


def get_time_str():
    now = datetime.datetime.now()
    return datetime.datetime.strftime(now, date_time_format)


def thread_function(thread_id):
    print("Thread %d\t start at %s" % (thread_id, get_time_str()))
    print("Thread %d\t sleep" % thread_id)
    time.sleep(4)
    print("Thread %d\t finish at %s" % (thread_id, get_time_str()))


def main():
    print("Main thread start at %s" % get_time_str())
    for i in range(5):
        # 启动新线程,接收一个 元组 作为参数,返回一个标识符
        _thread.start_new_thread(thread_function, (i,))
        time.sleep(1)
    time.sleep(6)
    print("Main thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()

1.1.2 使用锁

import time
import datetime
import _thread

date_time_format = "%H:%M:%S"


def get_time_str():
    now = datetime.datetime.now()
    return datetime.datetime.strftime(now, date_time_format)


def thread_function(thread_id, lock):
    print("Thread %d\t start at %s" % (thread_id, get_time_str()))
    print("Thread %d\t sleep" % thread_id)
    time.sleep(4)
    print("Thread %d\t finish at %s" % (thread_id, get_time_str()))
    lock.release()


def main():
    print("Main Thread start at %s" % get_time_str())
    locks = []
    for i in range(5):
        lock = _thread.allocate_lock()
        lock.acquire()
        locks.append(lock)
    for i in range(5):
        _thread.start_new_thread(thread_function, (i, locks[i]))
        time.sleep(1)
    for i in range(5):
        while locks[i].locked():
            time.sleep(1)
    print("Main Thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()

1.2 threading 模块

1.2.1 创建线程

import time
import datetime
import threading

date_time_format = "%H:%M:%S"


def get_time_str():
    now = datetime.datetime.now()
    return datetime.datetime.strftime(now, date_time_format)


def thread_function(thread_id):
    print("Thread %d\t start at %s" % (thread_id, get_time_str()))
    print("Thread %d\t sleep" % thread_id)
    time.sleep(4)
    print("Thread %d\t finish at %s" % (thread_id, get_time_str()))


def main():
    print("Main Thread start at %s" % get_time_str())
    threads = []

    # 创建线程
    for i in range(5):
        thread = threading.Thread(target=thread_function, args=(i,))
        threads.append(thread)

    # 启动线程
    for i in range(5):
        threads[i].start()
        time.sleep(1)

    # 等待线程执行完毕
    for i in range(5):
        threads[i].join()
    print("Main Thread finish at %s" % get_time_str())


if __name__ == "__main__":
    main()

1.2.2 通过子类创建线程

import time
import datetime
import threading

date_time_format = "%H:%M:%S"


def get_time_str():
    now = datetime.datetime.now()
    return datetime.datetime.strftime(now, date_time_format)


class MyThread(threading.Thread):
    def __init__(self, thread_id):
        super(MyThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        print("Thread %d\t start at %s" % (self.thread_id, get_time_str()))
        print("Thread %d\t sleep" % self.thread_id)
        time.sleep(4)
        print("Thread %d\t finish at %s" % (self.thread_id, get_time_str()))


def main():
    print("Main Thread start at %s" % get_time_str())
    threads = []

    # 创建线程
    for i in range(5):
        thread = MyThread(i)
        threads.append(thread)

    # 启动线程
    for i in range(5):
        threads[i].start()
        time.sleep(1)

    # 等待线程执行完毕
    for i in range(5):
        threads[i].join()

    print("Main Thread finish at %s" % get_time_str())


if __name__ == '__main__':
    main()

1.3 线程同步

import time
import threading

thread_lock = None


class MyThread(threading.Thread):
    def __init__(self, thread_id):
        super(MyThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        # 锁定
        thread_lock.acquire()
        for i in range(3):
            print("Thread %d\t printing! times:%d" % (self.thread_id, i))
        # 释放
        thread_lock.release()

        time.sleep(1)

        # 锁定
        thread_lock.acquire()
        for i in range(3):
            print("Thread %d\t printing! times:%d" % (self.thread_id, i))
        # 释放
        thread_lock.release()


def main():
    print("Main Thread start")
    threads = []

    # 创建线程
    for i in range(5):
        thread = MyThread(i)
        threads.append(thread)

    # 启动线程
    for i in range(5):
        threads[i].start()

    # 等待线程执行完毕
    for i in range(5):
        threads[i].join()

    print("Main Thread finish")


if __name__ == '__main__':
    # 获取锁
    thread_lock = threading.Lock()
    main()

1.4 队列

# 1. 队列使用
print('--------------------------------------- 1 ---------------------------------------')

from queue import Queue

q = Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get())

#
# 2. 队列结合线程使用
print('--------------------------------------- 2 ---------------------------------------')

import time
import threading
import queue

# 创建工作队列并且限制队列的最大元素个数为 10
work_queue = queue.Queue(maxsize=10)

# 创建结果队列并且限制队列的最大元素个数为 10
result_queue = queue.Queue(maxsize=10)


class WorkThread(threading.Thread):
    def __init__(self, thread_id):
        super(WorkThread, self).__init__()
        self.thread_id = thread_id

    def run(self):
        while not work_queue.empty():
            # 从工作队列中获取数据
            work = work_queue.get()
            # 模拟工作耗时3秒
            time.sleep(3)
            out = "Thread %d\t received %s" % (self.thread_id, work)
            # 把结果放入结果队列
            result_queue.put(out)


def main():
    # 工作队列放入数据
    for i in range(10):
        work_queue.put("message id %d" % i)

    # 开启两个工作线程
    for i in range(2):
        thread = WorkThread(i)
        thread.start()

    # 输出10个结果
    for i in range(10):
        result = result_queue.get()
        print(result)


if __name__ == '__main__':
    main()

2. 进程

2.1 os 模块

2.1.1 system 函数

system 函数是最简单的创建进程的方式,函数只有一个参数,就是要执行的命令

import os

# 判断是否是 windows
if os.name == 'nt':
    return_code = os.system("dir")
else:
    return_code = os.system("ls")

# 判断命令返回值是否是0,0代表执行成功
if return_code == 0:
    print("Run success!")
else:
    print("Something wrong!")

2.1.2 fork

fork 调用系统 API 创建子进程。但是 fork 函数在 windows 上并不存在,在 linux和mac上可以使用

import os

print("Main Process ID (%s)" % os.getpid())
pid = os.fork()
if pid == 0:
    print("This is child process(%s) and main process is %s." % (os.getpid, os.getppid()))
else:
    print("Created a child process (%s)." % (pid, ))

2.2 subprocess 模块

2.2.1 call

import os
import subprocess

# 判断是否是 windows
if os.name == 'nt':
    return_code = subprocess.call(["cmd", "/c", "dir"])
else:
    return_code = subprocess.call(["ls", "-l"])

# 判断命令返回值是否是0,0代表执行成功
if return_code == 0:
    print("Run success!")
else:
    print("Something wrong!")

2.2.2 Popen

使用 Popen 调用外部命令

import os
import subprocess

if os.name == 'nt':
    ping = subprocess.Popen("ping -n 5 www.baidu.com", shell=True, stdout=subprocess.PIPE)
else:
    ping = subprocess.Popen("pint -c 5 www.baidu.com", shell=True, stdout=subprocess.PIPE)

# 等待命令执行完毕
ping.wait()

# 答应外部命令的进程id
print(ping.pid)

# 答应外部命令的返回码
print(ping.returncode)

# 打印外部命令的输出内容
out = ping.stdout.read()
print(out)

2.3 multiprocessing Process

2.3.1 创建进程

from multiprocessing import Process
import os


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def f(name):
    info('function f')
    print('hello', name)


if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('Python',))
    p.start()
    p.join()

2.3.2 使用子类创建子进程

from multiprocessing import Process
import os


class MyProcess(Process):
    def __init__(self):
        super(MyProcess, self).__init__()

    def run(self):
        print('module name:', __name__)
        print('parent process:', os.getppid())
        print('process id:', os.getpid())


def main():
    processes = []

    # 创建进程
    for i in range(5):
        processes.append(MyProcess())

    # 启动进程
    for i in range(5):
        processes[i].start()

    # 等待进程结束
    for i in range(5):
        processes[i].join()


if __name__ == '__main__':
    main()

2.3.3 multiprocessing.Queue

使用 multiprocessing.Queue 实现进程同步

from multiprocessing import Process, Queue
import os

# 创建队列
result_queue = Queue()


class MyProcess(Process):
    def __init__(self, q):
        super(MyProcess, self).__init__()
        # 获取队列
        self.q = q

    def run(self):
        output = 'module name %s\n' % __name__
        output += 'parent process: %d\n' % os.getppid()
        output += 'process id: %d\n' % os.getpid()
        self.q.put(output)


def main():
    processes = []

    # 创建进程并把队列传给进程
    for i in range(5):
        processes.append(MyProcess(result_queue))

    # 启动进程
    for i in range(5):
        processes[i].start()

    # 等待进程结束
    for i in range(5):
        processes[i].join()

    while not result_queue.empty():
        output = result_queue.get()
        print(output)


if __name__ == '__main__':
    main()

3. 进程池

3.1 multiprocessing.Pool

使用 multiprocessing 的 Pool 管理进程

import multiprocessing.pool

def process_func(process_id):
    print("process id %d start" % process_id)
    time.sleep(1)
    print("process id %d end" % process_id)


def main():
    pool = multiprocessing.Pool(processes=3)
    for i in range(10):
        # 向进程池中添加要执行的任务
        pool.apply_async(process_func, args=(i,))

    # 先调用close关闭进程池,不能再有新任务加入进程池中
    pool.close()
    # join函数等待所有子进程结束
    pool.join()


if __name__ == '__main__':
    main()

3.2 使用 Pool 的map函数

import multiprocessing.pool

def process_func(process_id):
    print("process id %d start" % process_id)
    time.sleep(1)
    print("process id %d end" % process_id)


def main():
    pool = multiprocessing.Pool(processes=3)
    pool.map(process_func, range(10))

    # 先调用close关闭进程池,不能再有新任务加入进程池中
    pool.close()
    # join函数等待所有子进程结束
    pool.join()


if __name__ == '__main__':
    main()

4. 线程池

4.1 multiprocessing.dummy

使用 multiprocessing 的 dummy 管理线程

import multiprocessing.dummy


def process_func(process_id):
    print("process id %d start" % process_id)
    time.sleep(3)
    print("process id %d end" % process_id)


def main():
    pool = multiprocessing.dummy.Pool(processes=3)
    for i in range(10):
        # 向进程池中添加要执行的任务
        pool.apply_async(process_func, args=(i,))

    # 先调用close关闭进程池,不能再有新任务加入进程池中
    pool.close()
    # join函数等待所有子进程结束
    pool.join()

4.2 使用 dummy.Pool 的map函数

import multiprocessing
import time


def process_func(process_id):
    print("process id %d start" % process_id)
    time.sleep(1)
    print("process id %d end" % process_id)


def main():
    pool = multiprocessing.dummy.Pool(processes=3)
    pool.map(process_func, range(10))

    # 先调用close关闭进程池,不能再有新任务加入进程池中
    pool.close()
    # join函数等待所有子进程结束
    pool.join()


if __name__ == '__main__':
    main()
0

评论区