Python实现的Google IP 可用性检测脚本
需要Python3.4+,一个参数用来选择测试搜索服务还是GAE服务。测试GAE服务的话需要先修改开头的两个变量。从标准输入读取IP地址或者IP段(形如192.168.0.0/16)列表,每行一个。可用IP输出到标准输出。实时测试结果输出到标准错误。50线程并发。
checkgoogleip
#!/usr/bin/envpython3 importsys fromipaddressimportIPv4Network importhttp.clientasclient fromconcurrent.futuresimportThreadPoolExecutor importargparse importssl importsocket #先按自己的情况修改以下几行 APP_ID='your_id_here' APP_PATH='/fetch.py' context=ssl.SSLContext(ssl.PROTOCOL_TLSv1) context.verify_mode=ssl.CERT_REQUIRED context.load_verify_locations('/etc/ssl/certs/ca-certificates.crt') classHTTPSConnection(client.HTTPSConnection): def__init__(self,*args,hostname=None,**kwargs): self._hostname=hostname super().__init__(*args,**kwargs) defconnect(self): super(client.HTTPSConnection,self).connect() ifself._tunnel_host: server_hostname=self._tunnel_host else: server_hostname=self._hostnameorself.host sni_hostname=server_hostnameifssl.HAS_SNIelseNone self.sock=self._context.wrap_socket(self.sock, server_hostname=sni_hostname) ifnotself._context.check_hostnameandself._check_hostname: try: ssl.match_hostname(self.sock.getpeercert(),server_hostname) exceptException: self.sock.shutdown(socket.SHUT_RDWR) self.sock.close() raise defcheck_ip_p(ip,func): iffunc(ip): print(ip,flush=True) defcheck_for_gae(ip): return_check(APP_ID+'.appspot.com',APP_PATH,ip) defcheck_for_search(ip): return_check('www.google.com','/',ip) def_check(host,path,ip): forchanceinrange(1,-1,-1): try: conn=HTTPSConnection( ip,timeout=5, context=context, hostname=host, ) conn.request('GET',path,headers={ 'Host':host, }) response=conn.getresponse() ifresponse.status<400: print('GOOD:',ip,file=sys.stderr) else: raiseException('HTTPError%s%s'%( response.status,response.reason)) returnTrue exceptKeyboardInterrupt: raise exceptExceptionase: ifisinstance(e,ssl.CertificateError): print('WARN:%sisnotGoogle\'s!'%ip,file=sys.stderr) chance=0 ifchance==0: print('BAD:',ip,e,file=sys.stderr) returnFalse else: print('RE:',ip,e,file=sys.stderr) defmain(): parser=argparse.ArgumentParser(description='CheckGoogleIPs') parser.add_argument('service',choices=['search','gae'], help='servicetocheck') args=parser.parse_args() func=globals()['check_for_'+args.service] count=0 withThreadPoolExecutor(max_workers=50)asexecutor: forlinsys.stdin: l=l.strip() if'/'inl: foripinIPv4Network(l).hosts(): executor.submit(check_ip_p,str(ip),func) count+=1 else: executor.submit(check_ip_p,l,func) count+=1 print('%dIPchecked.'%count) if__name__=='__main__': main()