0x01 版本一
import socket
from multiprocessing import Pool
# 端口扫描-
def port_scan(ip, port):
# 创建socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时时间
s.settimeout(0.1)
# 连接
result = s.connect_ex((ip, port))
# 关闭连接
s.close()
# 判断是否连接成功
if result == 0:
print('端口{}开放'.format(port))
return port
return None
# 多线程调用端口扫描
def multi_port_scan(ip, port_list):
# 创建线程池
pool = Pool()
# 调用端口扫描,并将结果保存到列表中
results = []
for port in port_list:
# 获取线程执行结果
result = pool.apply_async(port_scan, args=(ip, port,results))
if result.get() is not None:
results.append(result.get())
# 关闭线程池
pool.close()
# 等待线程池执行完毕
pool.join()
# 获取开放端口列表
print("开放端口列表:", results)
if __name__ == '__main__':
# ip地址
ip = '192.168.3.12'
# 端口列表
port_list = [i for i in range(1, 10000)]
# 调用多线程端口扫描
multi_port_scan(ip, port_list)
使用 默认进程池个数进行扫描, 速度很慢,感觉像单线程.
result.get() is not None:
results.append(result.get())
每次执行完一个进程, 需要与主进程交互, 速度会很慢
0x02 开辟单独的内存接收数据
import socket
from multiprocessing import Pool, Manager
# 端口扫描-
def port_scan(ip, port, results):
# 创建socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时时间
s.settimeout(0.1)
# 连接
result = s.connect_ex((ip, port))
# 关闭连接
s.close()
# 判断是否连接成功
if result == 0:
results.append(port)
print("端口{}开放".format(port))
# return port
# return None
# 多线程调用端口扫描
def multi_port_scan(ip, port_list):
# 创建线程池
pool = Pool(50)
# 调用端口扫描,并将结果保存到列表中
manager = Manager()
results = manager.list()
for port in port_list:
# 获取线程执行结果
pool.apply_async(port_scan, args=(ip, port, results))
# if result.get() is not None:
# results.append(result.get())
# 关闭线程池
pool.close()
# 等待线程池执行完毕
pool.join()
# 获取开放端口列表
print("开放端口列表:", results)
if __name__ == "__main__":
# ip地址
ip = "192.168.3.12"
# 端口列表
port_list = [i for i in range(1, 10000)]
# 调用多线程端口扫描
multi_port_scan(ip, port_list)
速度快了一倍多, 还是有优化的空间
0x03 使用异步调用
import asyncio
# 端口扫描
async def port_scan(ip, port):
try:
# 创建socket对象
reader, writer = await asyncio.open_connection(ip, port)
writer.close()
# 判断是否连接成功
print(f"端口{port}开放")
return port
except:
return None
# 异步调用端口扫描
async def async_port_scan(ip, port_list):
tasks = []
# 调用端口扫描,并将结果保存到列表中
for port in port_list:
tasks.append(asyncio.create_task(port_scan(ip, port)))
results = await asyncio.gather(*tasks)
# 获取开放端口列表
print("开放端口列表:", [port for port in results if port])
if __name__ == '__main__':
# ip地址
ip = '192.168.3.12'
# 端口列表
port_list = [i for i in range(1, 65535)]
# 调用异步端口扫描
asyncio.run(async_port_scan(ip, port_list))
速度很快, 推荐的方式, 主进程异步调用
0x05 多线程的扫描方式
# -*- coding: utf-8 -*-
# @Time : 2023/2/24 16:45
# @Author : GyArmy
import socket
import threading
# 端口扫描-
def port_scan(ip, port, results):
# 创建socket对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置超时时间
s.settimeout(0.1)
# 连接
result = s.connect_ex((ip, port))
# 关闭连接
s.close()
# 判断是否连接成功
if result == 0:
results.append(port)
print("端口{}开放".format(port))
def multi_port_scan(ip, port_list):
# 创建线程池
thread_list = []
# 用列表保存结果
results = []
for port in port_list:
# 创建线程并添加到线程列表中
t = threading.Thread(target=port_scan, args=(ip, port, results))
thread_list.append(t)
# 启动所有线程
for t in thread_list:
t.start()
# 等待所有线程执行完毕
for t in thread_list:
t.join()
# 获取开放端口列表
print("开放端口列表:", results)
if __name__ == "__main__":
# ip地址
ip = "192.168.6.102"
# 端口列表
port_list = [i for i in range(1, 10000)]
# 调用多线程端口扫描
multi_port_scan(ip, port_list)
测试的响应速度也很快
0x06 局域网常用端口探测器
# -*- coding: utf-8 -*-
# @Time : 2023/3/7 15:38
# @Author : GyArmy
import ipaddress
from queue import Queue
import threading
from scapy.all import *
from scapy.layers.inet import IP, ICMP, TCP
logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
# ping检测主机是否存活
def ping_host(ip, results):
ping = IP(dst=ip) / ICMP()
ping_reply = sr1(ping, timeout=1, verbose=False)
if ping_reply:
# print("主机存活:", ip)
results.put(ip)
def scan_ip(ip_str):
"""
扫描ip地址段
:param ip_str: "192.168.3.1/24"
:return:
"""
ip_list = list(ipaddress.ip_network(ip_str, strict=False).hosts())
results = Queue()
threads = []
for ip in ip_list:
t = threading.Thread(target=ping_host, args=(str(ip), results))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
host_list = list(results.queue)
return host_list
# 检测目标主机端口是否开放
# 1. 生成一个TCP数据包
# 2. 发送数据包
# 3. 接收数据包
# 4. 判断数据包是否有回应
def scan_port(ip, port, results):
# 生成一个TCP数据包
packet = IP(dst=ip) / TCP(dport=port, flags="S")
# 发送数据包
packet_reply = sr1(packet, timeout=1, verbose=False)
# 判断数据包是否有回应
# if packet_reply:
if packet_reply and packet_reply.haslayer(TCP):
# 判断回应的数据包是否是SYN+ACK
if packet_reply[TCP].flags == "SA":
# print(ip, ":", port , "端口开放")
results.put(port)
def scan_port_list(ip, port_list):
"""
扫描端口
:param ip:
:param port_list:
:return:
"""
results = Queue()
threads = []
for port in port_list:
t = threading.Thread(target=scan_port, args=(ip, port, results))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
port_list = list(results.queue)
return port_list
if __name__ == '__main__':
# 扫描ip地址段
ip_list_str = "192.168.3.1/24"
pc_up_list = scan_ip(ip_list_str)
for ip in pc_up_list:
# 扫描端口
# port_list = [i for i in range(1, 1000)]
# 常用端口
port_list = [21, 22, 23, 25, 53, 80, 110, 139, 143, 443, 445, 1433, 1521, 3306, 3389, 6379, 7001, 8080, 8081, 8089, 9090, 9200, 27017,22122, 50070, 50075]
port_up_list = scan_port_list(ip, port_list)
print(ip,port_up_list)
0x07 搭配nmap模块的扫描
# 调用 nmap的扫描功能
import nmap
def namp_tool_scan(ip, port_list):
# 创建一个nmap扫描对象
nm = nmap.PortScanner()
# 扫描主机
port_list_str = ','.join(port_list)
# 探测主机是否存活
host_is_up = nm.scan(hosts=ip, arguments="-sP")
# 'uphosts' -> 0
if host_is_up['nmap']['scanstats']['uphosts'] == '0':
print(ip, ' -> 主机不可达')
return
nm.scan(hosts=ip, arguments="-sS -p " + port_list_str)
# 获取扫描结果
result_host = nm.all_hosts()
result = nm[result_host[0]]['tcp']
# 打印出所有扫描到的端口
# for port in result:
# print(port, result[port]['state'])
open_port_list = []
for port in result:
if result[port]['state'] == 'open':
# print(ip,port,result[port].get('name'),"open")
open_port_list.append(str(port))
if open_port_list:
print(ip, ' -> ', ','.join(open_port_list))
if __name__ == '__main__':
# 常用探测端口
ports = ['21','22','23','25','80','139','443','445','3306','3389','6379','8080','11211']
# ip列表
ip_list = [i for i in range(1, 255)]
# ip列表
ip_list = ['192.168.3.' + str(i) for i in ip_list]
# nmap扫描
for ip in ip_list:
namp_tool_scan(ip, ports)
0则评论给“port scan多线程扫描器”