-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy patharin-ip
More file actions
executable file
·136 lines (119 loc) · 4.99 KB
/
arin-ip
File metadata and controls
executable file
·136 lines (119 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python2.7
###########
# IMPORTS #
###########
import sys
import re
import argparse
from multiprocessing import Pool
import signal
import socket
import requests
#############
# FUNCTIONS #
#############
def resolve(address):
max_attempts = 3
attempt = 0
while attempt < max_attempts:
try:
url = "http://whois.arin.net/ui/query.do"
headers = {'Accept': 'application/json'}
payload = {'flushCache': 'false', 'q': address}
response = requests.post(url, headers=headers, data=payload)
except:
#import traceback
#traceback.print_exc()
attempt += 1
if attempt == max_attempts:
sys.stderr.write('%s => Request timed out.\n' % (address))
continue
else:
try:
if get_orgRef(response) == 'APNIC' or get_orgRef(response) == 'RIPE' or get_orgRef(response) == 'LACNIC' or get_orgRef(response) == 'AFRINIC':
sys.stderr.write('%s => Registered with another regional internet registry.\n' % (address))
break
try: # Reallocated IP space
organisation = response.json()['ns4:pft']['customer']['name']['$']
description = response.json()['ns4:pft']['customer']['handle']['$']
except KeyError:
try: # Direct allocation
organisation = response.json()['ns4:pft']['net']['orgRef']['@name']
description = response.json()['ns4:pft']['net']['orgRef']['@handle']
except KeyError:
sys.stderr.write("%s => Error querying response" % address)
break
netblocks = response.json()['ns4:pft']['net']['netBlocks']['netBlock']
if type(netblocks) == dict: # single net block
netblock = response.json()['ns4:pft']['net']['netBlocks']['netBlock']
cidr = netblock['cidrLength']['$']
description = netblock['description']['$']
endAddress = netblock['endAddress']['$']
startAddress = netblock['startAddress']['$']
host_netblock = "%s/%s" % (startAddress, cidr)
elif type(netblocks) == list: # multiple netblocks
host_netblock = ""
for netblock in netblocks:
cidr = netblock['cidrLength']['$']
description = netblock['description']['$']
endAddress = netblock['endAddress']['$']
startAddress = netblock['startAddress']['$']
nb = "%s/%s" % (startAddress, cidr)
host_netblock += "%s|" % nb
host_netblock = re.sub(r'\|*$', '', host_netblock)
sys.stdout.write('%s,%s,%s,%s\n' % (address, host_netblock, organisation, description))
sys.stdout.flush()
except TypeError as te:
sys.stderr.write('%s => Error consuming data, %s\n' % (address, te))
import traceback
traceback.print_exc()
break
except ValueError as ve:
sys.stderr.write('%s => Error consuming data, %s\n' % (address, ve))
import traceback
traceback.print_exc()
break
except Exception as e:
import traceback
traceback.print_exc()
break
# break out of the loop
attempt = max_attempts
def get_orgRef(response):
try:
handle = response.json()['ns4:pft']['net']['orgRef']['@handle']
except KeyError:
return None
return handle
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
########
# MAIN #
########
if __name__ == '__main__':
desc = 'Query the ARIN Whois RWS service to retrive netblock and organisation information about an IP address.'
parser = argparse.ArgumentParser(description=desc)
parser.add_argument('file',
nargs='?',
type=argparse.FileType('r'),
action='store',
help='file containing a list of IP addresses split by a newline, otherwise read from STDIN',
metavar='FILE',
default=sys.stdin)
args = parser.parse_args()
try:
addresses = [line.strip() for line in args.file if len(line.strip())>0 and line[0] is not '#']
except KeyboardInterrupt:
exit()
# remove duplicates and sort
addresses = list(set(addresses))
addresses = sorted(addresses, key=lambda item: socket.inet_aton(item))
pool = Pool(processes=10, initializer=initializer)
try:
pool.map(resolve, addresses)
pool.close()
pool.join()
except KeyboardInterrupt:
pool.terminate()
pool.join()