port scan多线程扫描器

0x01 版本一

import socket
from multiprocessing import Pool

# 端口扫描-
def port_scan(ip, port):
    # 创建socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置超时时间
    s.settimeout(0.1)
    # 连接
    result = s.connect_ex((ip, port))
    # 关闭连接
    s.close()
    # 判断是否连接成功
    if result == 0:
        print('端口{}开放'.format(port))
        return port

    return None


# 多线程调用端口扫描
def multi_port_scan(ip, port_list):
    # 创建线程池
    pool = Pool()
    # 调用端口扫描,并将结果保存到列表中
    results = []

    for port in port_list:
        # 获取线程执行结果
        result = pool.apply_async(port_scan, args=(ip, port,results))
        if result.get() is not None:
            results.append(result.get())

    # 关闭线程池
    pool.close()
    # 等待线程池执行完毕
    pool.join()

    # 获取开放端口列表
    print("开放端口列表:", results)


if __name__ == '__main__':
    # ip地址
    ip = '192.168.3.12'
    # 端口列表
    port_list = [i for i in range(1, 10000)]
    # 调用多线程端口扫描
    multi_port_scan(ip, port_list)

使用 默认进程池个数进行扫描, 速度很慢,感觉像单线程.

 result.get() is not None:
            results.append(result.get())

每次执行完一个进程, 需要与主进程交互, 速度会很慢

0x02 开辟单独的内存接收数据

import socket
from multiprocessing import Pool, Manager

# 端口扫描-
def port_scan(ip, port, results):
    # 创建socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置超时时间
    s.settimeout(0.1)
    # 连接
    result = s.connect_ex((ip, port))
    # 关闭连接
    s.close()
    # 判断是否连接成功
    if result == 0:
        results.append(port)
        print("端口{}开放".format(port))
        # return port

    # return None


# 多线程调用端口扫描
def multi_port_scan(ip, port_list):
    # 创建线程池
    pool = Pool(50)
    # 调用端口扫描,并将结果保存到列表中
    manager = Manager()
    results = manager.list()
    for port in port_list:
        # 获取线程执行结果
        pool.apply_async(port_scan, args=(ip, port, results))
        # if result.get() is not None:
        #     results.append(result.get())

    # 关闭线程池
    pool.close()
    # 等待线程池执行完毕
    pool.join()

    # 获取开放端口列表
    print("开放端口列表:", results)


if __name__ == "__main__":
    # ip地址
    ip = "192.168.3.12"
    # 端口列表
    port_list = [i for i in range(1, 10000)]
    # 调用多线程端口扫描
    multi_port_scan(ip, port_list)

速度快了一倍多, 还是有优化的空间

0x03 使用异步调用

import asyncio

# 端口扫描
async def port_scan(ip, port):
    try:
        # 创建socket对象
        reader, writer = await asyncio.open_connection(ip, port)
        writer.close()
        # 判断是否连接成功
        print(f"端口{port}开放")
        return port
    except:
        return None

# 异步调用端口扫描
async def async_port_scan(ip, port_list):
    tasks = []
    # 调用端口扫描,并将结果保存到列表中
    for port in port_list:
        tasks.append(asyncio.create_task(port_scan(ip, port)))
    results = await asyncio.gather(*tasks)
    # 获取开放端口列表
    print("开放端口列表:", [port for port in results if port])

if __name__ == '__main__':
    # ip地址
    ip = '192.168.3.12'
    # 端口列表
    port_list = [i for i in range(1, 65535)]
    # 调用异步端口扫描
    asyncio.run(async_port_scan(ip, port_list))

速度很快, 推荐的方式, 主进程异步调用

0x05 多线程的扫描方式

# -*- coding: utf-8 -*-
# @Time    : 2023/2/24 16:45
# @Author  : GyArmy

import socket
import threading

# 端口扫描-
def port_scan(ip, port, results):
    # 创建socket对象
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # 设置超时时间
    s.settimeout(0.1)
    # 连接
    result = s.connect_ex((ip, port))
    # 关闭连接
    s.close()
    # 判断是否连接成功
    if result == 0:
        results.append(port)
        print("端口{}开放".format(port))

def multi_port_scan(ip, port_list):
    # 创建线程池
    thread_list = []
    # 用列表保存结果
    results = []
    for port in port_list:
        # 创建线程并添加到线程列表中
        t = threading.Thread(target=port_scan, args=(ip, port, results))
        thread_list.append(t)

    # 启动所有线程
    for t in thread_list:
        t.start()

    # 等待所有线程执行完毕
    for t in thread_list:
        t.join()

    # 获取开放端口列表
    print("开放端口列表:", results)

if __name__ == "__main__":
    # ip地址
    ip = "192.168.6.102"
    # 端口列表
    port_list = [i for i in range(1, 10000)]
    # 调用多线程端口扫描
    multi_port_scan(ip, port_list)

测试的响应速度也很快

0x06 局域网常用端口探测器

# -*- coding: utf-8 -*-
# @Time    : 2023/3/7 15:38
# @Author  : GyArmy
import ipaddress
from queue import Queue
import threading
from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP

logging.getLogger("scapy.runtime").setLevel(logging.ERROR)



# ping检测主机是否存活
def ping_host(ip, results):
    ping = IP(dst=ip) / ICMP()
    ping_reply = sr1(ping, timeout=1, verbose=False)
    if ping_reply:
        # print("主机存活:", ip)
        results.put(ip)


def scan_ip(ip_str):
    """
    扫描ip地址段
    :param ip_str:  "192.168.3.1/24"
    :return:
    """
    ip_list = list(ipaddress.ip_network(ip_str, strict=False).hosts())
    results = Queue()
    threads = []
    for ip in ip_list:
        t = threading.Thread(target=ping_host, args=(str(ip), results))
        threads.append(t)
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()

    host_list = list(results.queue)
    return host_list



# 检测目标主机端口是否开放
# 1. 生成一个TCP数据包
# 2. 发送数据包
# 3. 接收数据包
# 4. 判断数据包是否有回应
def scan_port(ip, port, results):
    # 生成一个TCP数据包
    packet = IP(dst=ip) / TCP(dport=port, flags="S")
    # 发送数据包
    packet_reply = sr1(packet, timeout=1, verbose=False)
    # 判断数据包是否有回应
    # if packet_reply:
    if packet_reply and packet_reply.haslayer(TCP):
        # 判断回应的数据包是否是SYN+ACK
        if packet_reply[TCP].flags == "SA":
            # print(ip, ":", port , "端口开放")
            results.put(port)

def scan_port_list(ip, port_list):
    """
    扫描端口
    :param ip:
    :param port_list:
    :return:
    """
    results = Queue()
    threads = []
    for port in port_list:
        t = threading.Thread(target=scan_port, args=(ip, port, results))
        threads.append(t)
        t.start()

    # 等待所有线程完成
    for t in threads:
        t.join()

    port_list = list(results.queue)
    return port_list


if __name__ == '__main__':
    # 扫描ip地址段
    ip_list_str = "192.168.3.1/24"
    pc_up_list = scan_ip(ip_list_str)


    for ip in pc_up_list:
        # 扫描端口
        # port_list = [i for i in range(1, 1000)]
        # 常用端口
        port_list = [21, 22, 23, 25, 53, 80, 110, 139, 143, 443, 445, 1433, 1521, 3306, 3389, 6379, 7001, 8080, 8081, 8089, 9090, 9200, 27017,22122, 50070, 50075]
        port_up_list = scan_port_list(ip, port_list)
        print(ip,port_up_list)

0x07 搭配nmap模块的扫描

# 调用 nmap的扫描功能
import nmap

def namp_tool_scan(ip, port_list):
    # 创建一个nmap扫描对象
    nm = nmap.PortScanner()
    # 扫描主机
    port_list_str = ','.join(port_list)

    # 探测主机是否存活
    host_is_up = nm.scan(hosts=ip, arguments="-sP")
    # 'uphosts' -> 0
    if host_is_up['nmap']['scanstats']['uphosts'] == '0':
        print(ip, ' -> 主机不可达')
        return

    nm.scan(hosts=ip, arguments="-sS -p " + port_list_str)
    # 获取扫描结果
    result_host = nm.all_hosts()
    result = nm[result_host[0]]['tcp']
    # 打印出所有扫描到的端口
    # for port in result:
    #     print(port, result[port]['state'])

    open_port_list = []
    for port in result:
        if result[port]['state'] == 'open':
            # print(ip,port,result[port].get('name'),"open")
            open_port_list.append(str(port))

    if open_port_list:
        print(ip, ' -> ', ','.join(open_port_list))


if __name__ == '__main__':
    # 常用探测端口
    ports = ['21','22','23','25','80','139','443','445','3306','3389','6379','8080','11211']
    # ip列表
    ip_list = [i for i in range(1, 255)]
    # ip列表
    ip_list = ['192.168.3.' + str(i) for i in ip_list]
    # nmap扫描
    for ip in ip_list:
        namp_tool_scan(ip, ports)

原文链接: port scan多线程扫描器 版权所有,转载时请注明出处,违者必究。
注明出处格式:流沙团 ( https://gyarmy.com/post-817.html )

发表评论

0则评论给“port scan多线程扫描器”