-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRPC_Server.py
More file actions
71 lines (61 loc) · 2.4 KB
/
Copy pathRPC_Server.py
File metadata and controls
71 lines (61 loc) · 2.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
from OpenSSL import SSL
from twisted.web import server
from twisted.internet import reactor
from twisted.internet import ssl
from RPC_functions import JSONRPCServer
# Show detailed information for invalid certificates
def verifyCallback(connection, x509, errnum, errdepth, ok):
    """Print diagnostics for a rejected client certificate and return the verdict.

    This function is passed to ``ctx.set_verify`` in ``main`` as the
    certificate verification callback.

    Args:
        connection: The connection being verified (unused here).
        x509: The certificate under inspection; must provide
            ``has_expired``, ``get_subject``, ``get_issuer`` and
            ``get_version``.
        errnum: Verification error number supplied by the caller; printed
            for rejected certificates.
        errdepth: Depth in the certificate chain supplied by the caller;
            printed for rejected certificates.
        ok: Truthy when the preceding verification succeeded.

    Returns:
        True when ``ok`` is truthy, False otherwise (including when
        inspecting the certificate raises).
    """
    try:
        if ok:
            return True

        print('\033[91mInvalid client certificate\033[0m')
        # The error number and chain depth identify what failed and where.
        print("Error number: " + str(errnum) + ", depth: " + str(errdepth))
        if x509.has_expired():
            # Expiry is reported on its own; no further details are printed.
            print("\033[91mCertificate has expired\033[0m")
            return False
        print(str(x509.get_subject()))
        print("Issuer: " + str(x509.get_issuer()))
        print("Version: " + str(x509.get_version()))
        return False
    except Exception as e:
        # A failure while inspecting the certificate must never be treated
        # as acceptance, so report it and reject.
        print("error in verifyCallback: " + str(e))
        return False
def main():
    """Serve the JSON-RPC resource over TLS with mandatory client certificates.

    Builds an OpenSSL context from the server's private key and certificate,
    requires every peer to present a certificate that verifies against the
    accepted CA file, starts listening on the configured port and then runs
    the Twisted reactor.
    """
    listen_port = 12345

    # Placeholders: path to the server's private key and certificate.
    server_key_path = "<PATH TO YOUR PRIVATE KEY>"
    server_cert_path = "<PATH TO YOUR CERTIFICATE>"
    # Placeholder: CA file used to verify client certificates; a signed
    # client certificate is the only authentication method.
    trusted_ca_path = "<PATH TO YOUR ACCEPTED CERTIFICATE AUTHORITY>"

    # OpenSSL protocol method handed to the context factory.
    tls_method = SSL.TLSv1_2_METHOD

    # The RPC resource, imported from RPC_functions.py.
    rpc_resource = JSONRPCServer()

    # Context factory providing the encryption settings for the listener.
    context_factory = ssl.DefaultOpenSSLContextFactory(
        server_key_path,
        server_cert_path,
        sslmethod=tls_method,
    )
    openssl_ctx = context_factory.getContext()

    # Demand a peer certificate and route verification results through
    # verifyCallback so rejected certificates are reported in detail.
    verify_flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
    openssl_ctx.set_verify(verify_flags, verifyCallback)

    # Trust only the configured CA/certificate.
    openssl_ctx.load_verify_locations(trusted_ca_path)

    # Expose the RPC resource over the TLS listener.
    site = server.Site(rpc_resource)
    reactor.listenSSL(listen_port, site, contextFactory=context_factory)

    banner = "\033[94mJSON-RPC server started at port: \033[0m"
    print(banner + str(listen_port))

    # Hand control to the reactor.
    reactor.run()
if __name__ == '__main__':
    # Refuse to run the server as root. os.geteuid() exists only on Unix,
    # so the check is skipped on platforms that do not provide it.
    if hasattr(os, "geteuid") and os.geteuid() == 0:
        # Raising SystemExit with a message prints it to stderr and exits
        # with status 1, without relying on the exit() helper that the
        # site module installs for interactive use.
        raise SystemExit('\033[91mSorry, server cannot run as root user!\n\033[0m')
    main()