-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathjudge.py
More file actions
142 lines (126 loc) · 4.36 KB
/
judge.py
File metadata and controls
142 lines (126 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import asyncio
import random
from urllib.parse import urlparse
import aiohttp
from .errors import ResolveError
from .resolver import Resolver
from .utils import get_headers, log
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
class Judge:
"""Proxy Judge."""
available = {'HTTP': [], 'HTTPS': [], 'SMTP': []}
ev = {
'HTTP': asyncio.Event(),
'HTTPS': asyncio.Event(),
'SMTP': asyncio.Event(),
}
def __init__(self, url, timeout=8, verify_ssl=False):
self.url = url
self.scheme = urlparse(url).scheme.upper()
self.host = urlparse(url).netloc
self.path = url.split(self.host)[-1]
self.ip = None
self.is_working = False
self.marks = {'via': 0, 'proxy': 0}
self.timeout = timeout
self.verify_ssl = verify_ssl
self._loop = asyncio.get_running_loop()
#self._resolver = Resolver(loop=self._loop)
self._resolver = Resolver()
def __repr__(self):
return '<Judge [%s] %s>' % (self.scheme, self.host)
@classmethod
def get_random(cls, proto):
if proto == 'HTTPS':
scheme = 'HTTPS'
elif proto == 'CONNECT:25':
scheme = 'SMTP'
else:
scheme = 'HTTP'
return random.choice(cls.available[scheme])
@classmethod
def clear(cls):
cls.available['HTTP'].clear()
cls.available['HTTPS'].clear()
cls.available['SMTP'].clear()
cls.ev['HTTP'].clear()
cls.ev['HTTPS'].clear()
cls.ev['SMTP'].clear()
async def check(self, real_ext_ip):
# TODO: need refactoring
try:
self.ip = await self._resolver.resolve(self.host)
except ResolveError:
return
if self.scheme == 'SMTP':
self.is_working = True
self.available[self.scheme].append(self)
self.ev[self.scheme].set()
return
page = False
headers, rv = get_headers(rv=True)
#connector = aiohttp.TCPConnector(
# loop=self._loop, ssl=self.verify_ssl, force_close=True
#)
connector = aiohttp.TCPConnector(
ssl=self.verify_ssl, force_close=True
)
try:
timeout = aiohttp.ClientTimeout(total=self.timeout)
async with aiohttp.ClientSession(
#connector=connector, timeout=timeout, loop=self._loop
connector=connector, timeout=timeout
) as session, session.get(
url=self.url, headers=headers, allow_redirects=False
) as resp:
page = await resp.text()
except (
asyncio.TimeoutError,
aiohttp.ClientOSError,
aiohttp.ClientResponseError,
aiohttp.ServerDisconnectedError,
) as e:
log.debug('%s is failed. Error: %r;' % (self, e))
return
page = page.lower()
if resp.status == 200 and real_ext_ip in page and rv in page:
self.marks['via'] = page.count('via')
self.marks['proxy'] = page.count('proxy')
self.is_working = True
self.available[self.scheme].append(self)
self.ev[self.scheme].set()
log.debug('%s is verified' % self)
else:
log.debug(
(
'{j} is failed. HTTP status code: {code}; '
'Real IP on page: {ip}; Version: {word}; '
'Response: {page}'
).format(
j=self,
code=resp.status,
page=page,
ip=(real_ext_ip in page),
word=(rv in page),
)
)
def get_judges(judges=None, timeout=8, verify_ssl=False):
judges = judges or [
'http://httpbin.org/get?show_env',
'https://httpbin.org/get?show_env',
'smtp://smtp.gmail.com',
'smtp://aspmx.l.google.com',
'http://azenv.net/',
'https://www.proxy-listen.de/azenv.php',
'http://www.proxyfire.net/fastenv',
'http://proxyjudge.us/azenv.php',
'http://ip.spys.ru/',
'http://www.proxy-listen.de/azenv.php',
]
_judges = []
for j in judges:
j = j if isinstance(j, Judge) else Judge(j)
j.timeout = timeout
j.verify_ssl = verify_ssl
_judges.append(j)
return _judges