Hello, readers đ Yes, I am truly sorry for not being more active on this âPython For Hackersâ series for such a long period. However, I will now carry on with this series. In this âPython for Hackersâ blog post, Iâll explain how to create a Network scanner in Python that will enable you to identify clients that are available in your subnet and determine which ports are open on those clients.
So letâs start đ«Ą:
đ§đŁ What is Network Scanning
Network scanning is the process of gathering information about the devices and services
on a network.
How does network scanning helps hackers?
- To find open ports and services
- To identify Vulnerable devices
- To map out the network Topology
đ§ Building The tool
đĄBefore building the tool itself let me give a brief of what you are going to build.
đ Importing all the necessary modules in our file
from datetime import datetime
import argparse
import socket
import re
import concurrent.futures
from queue import Queue
import scapy.all as scapy
from  termcolor import colored
These all the modules which will be used.
â Taking command-line arguments
We will be using the argparse
module to accept command line inputs in this blog. If you are unfamiliar with using the argparse
module, you can read my prior blogs, where I have taught the fundamentals of using it as well as detailed how to take user inputs.
Creating get_args() function
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("target", help="Target URL or IP address")
parser.add_argument(
"-arp",
dest="arp",
help="Use this for ARP ping!",
required=False,
action="store_true",
)
parser.add_argument(
"-pT",
dest="tcpPortScan",
help="TCP Port Scan",
required=False,
action="store_true",
)
parser.add_argument(
"-pU",
dest="udpPortScan",
help="UDP Port Scan",
required=False,
action="store_true",
)
parser.add_argument(
"-p",
"--ports",
dest="ports",
help="Port range to scan, default is 1-65535 (all ports)",
required=False,
action="store",
default="1-65535",
)
parser.add_argument(
"-t",
"--threads",
dest="threads",
help="Threads for speed, default is 100",
required=False,
action="store",
default=100,
type=int,
)
return parser.parse_args()
Let me firstly explain the various arguments we are adding.
-arp
: This argument is responsible to accept , if user want to scan the whole subnet and identify the various hosts in his subnet.
-pT
: This argument is responsible to do a TCP port scan on the specified IP address
- -
pU
: This argument is responsible to do a UDP port scan on the specified IP address
-p
: This argument is supposed to be used with TCP or UDP port scan, this allows you to explicitly decide which port you want to scan for open or close.
-t
: This argument lets you decide the thread intensity.
Coding out code outside functions
if __name__ == "__main__":
main()
We are simply verifying that the namespace in the global scope is {main}. Should the program determine that this is the case, the main() function takes over as the execution context.
Creating the main
function
def main():
args = get_args()
host = args.target
start_port, end_port = map(int, args.ports.split("-"))
scan_type = ""
port_queue = Queue()
print(colored("-"*65, 'cyan', attrs=['dark']))
print(colored(
f"\tNetwork scanner starting at {datetime.now().strftime('%d/%m/%Y %H:%M:%S')}", 'cyan', attrs=['dark']))
print(colored("-"*65, 'cyan', attrs=['dark']))
if args.arp:
print(colored("-"*50,'light_red'))
print(colored("\tARP Ping Scan Results",'light_red'))
print(colored("-"*50,'light_red'))
print(colored("="*30,'black'))
print(colored("\tPort\tState",'black',attrs=['bold']))
print(colored("="*30,'black'))
arp_ping(host)
if ((args.tcpPortScan)or(args.udpPortScan)):
print(colored("-"*50,'light_red'))
if args.tcpPortScan:
print(colored("\tTCP Port Scan Results",'light_red'))
scan_type="T"
elif (args.udpPortScan):
print(colored("\tUDP Port Scan Results",'light_red'))
scan_type="U"
print(colored("-"*50,'light_red'))
print()
print(colored("="*30,'dark_grey'))
print(colored("\tPort\tState",'dark_grey',attrs=['bold']))
print(colored("="*30,'dark_grey'))
for port in range(start_port, end_port + 1):
port_queue.put(port)
with concurrent.futures.ThreadPoolExecutor(
max_workers=args.threads
) as executor:
for _ in range(args.threads):
executor.submit(scan_thread, host, scan_type, port_queue)
port_queue.join()
This is the main function which handles all the various functionalities. let me give a high-level overview of the whole function.
- The code in the first few lines simply extracts the command-line arguments to be used and assigns them to a variable associated with them and we also assign a queue data structure to the variable named
port_queue
and produces a banner to make the console more fancy.
- Then we developed two alternatives, depending on whether the user chose to run an ARP scan or a TCP or UDP scan.
- If the user selects an Arp scan, we simply print the banner and shift the execution context to the arp_ping function.
- If the user chooses a TCP or an UDP, then we simply print the banner and then we change the variable
scan_type
to T if TCP scan and U for UDP scan.
- Then we proceed to the main and most important part of the code, in which we use a for loop to iterate from start_port number to end_point number, passing each iteration variable to the queue
- Next, we use the âthreadPoolExectutatorâ from the âconcurrent.futuresâ library to create a thread pool. The value of the command-line argument âargs.threadsâ determines the number of worker threads in the pool.
- A pool is used within the thread pool to submit tasks to the executor, with the number of current works equal to args.thread.
- Each task then goes to theâscan_threadâ function, which is in charge of scanning ports, and we pass scan_type and port_queue as arguments.
- The executor manages the parallel execution of these tasks, up to the maximum number of worker threads permitted. This allows multiple ports to be scanned at the same time, increasing scanning efficiency.
- The threads retrieve port numbers from the âport_queueâ as they run and scan ports, perform the port scans, and report the results.
- The âport_queue.join()â statement is used to halt the execution of the main program until all tasks in the queue are completed. This ensures that the program waits for all threads to complete before moving on.
Creating the arp_ping
function
Letâs code the function which is responsible for generating arp requests in the subnet assigned.
if not re.match(r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]*$", ip):
print(colored("[-] Please provide a valid IP address range for ARP ping!",'red',attrs=['bold']))
exit(1)
try:
arp_request_frame = scapy.ARP(pdst=ip)
ether_broadcast_frame = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
broadcast_arp_packet = ether_broadcast_frame / arp_request_frame
active_clients = scapy.srp(
broadcast_arp_packet, timeout=3, verbose=False)[0]
for _, reply in active_clients:
print(f"[+]\t{reply.psrc}\t{reply.hwsrc}")
except Exception as err:
print(colored(f"[-] Error occurred! Reason: {err}",'red',attrs=['dark']))
Letâs understand the code
- Firstly, we check if the IP address which was passed was given in the right format or not, if not we print an error message and gets out of the loop.
- Now in the try block, we firstly create a create a ARP request frame with the specified destination IP address.
- We also create a Ethernet broadcast frame with destination MAC set to âFF:FF:FF:FF:FFâ.
- Then we combine two frames together to create a broadcast ARP packet.
- Then we sends the broadcast ARP packet and waits for the response for upto 3 seconds.
- Then we iterate over the responses and prints out the IP and MAC address of the clients who replied.
- Then at last if a error occurs , we simply prints that error as well.
Creating the scan_thread
function
Letâs define the scan_thread
which is responsible for running port scanning tasks in parallel within a thread.
def scan_thread(host, scan_type, port_queue):
while True:
try:
port = port_queue.get_nowait()
port_scan(port, host, scan_type)
port_queue.task_done()
except queue.Empty:
break
Letâs how the function is working.
- We firstly defines a infinite loop using the while loop which only terminates if the passed
port_queue
is empty.
- Then in the try block , in the line
port = port_queue.get_nowait()
which retrives a port number form the queue.
- If a port number is successfully retrieved and the port_queue was not empty , then we calls the function
port_scan
which is responsible for building the socket connections, in the function arguments we pass port, host and scan_type .
- After the port_scan is completed, the port_queue.task_done() method is used to signal that task associatedwith the scanned port has been completed.
- At last we code the loop terminate condition which is if the queue is empty then the loop is terminated.
Creating the port_scan
function
Letâs finally code the function which is responsible for building socket connections and tell if the port is open or not.
def port_scan(port, host, scan_type):
try:
if scan_type == "T":
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.settimeout(1)
client.connect((host, port))
client.close()
print(f"[*]\t{port}\tOpen")
elif scan_type == "U":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(1)
client.connect((host, port))
print(f"[*]\t{port}\tOpen")
sock.close()
except KeyboardInterrupt:
print("You pressed Ctrl+C")
exit(1)
except:
pass
Letâs understand the code:
- In the try block we are checking what is the scan_type whether it is âTâ or âUâ depending on the answer we enter in the specified block.
- If the scan_type is T, then we create a socket object of TCP (sock.AF_INET denotes the IP address is a IPV4 address and socket.SOCK_STREAM denotes the data packet follows the tcp protocol)
- We set a timeout of 1 seconds for the connection checking and then we try to connect to host at the designated port if the response comes we then print Port was open.
- If the scan_type is âUâ, then we create a socket object of UDP (sock.AF_INET denotes the IP address is a IPV4 address and socket.DGRAM denotes the data packet follows the UDP protocol.
- We set a timeout of 1 second for connection to build and we try connecting to the host at the port number , if the connection is made safely we print that the port was open.
- At last we are capturing keyboard interrupt to stop the program execution, if the user wants to stop program execution early.
- At the very last for every execution occurred (like connection was refused ) , we are doing nothing for it.
So this is the end of the code , if you want to reach out the whole code you can check it out at : đLink to code
đ· Conclusion
Finally, the above Network and port scanner script is just a mock-up script to show how widely used tools like nmap are created. There are numerous features that can be added to the script and optimized for better performance.
Furthermore, please only use scripts like these if you have network permissions.