-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFile
More file actions
111 lines (86 loc) · 3.48 KB
/
File
File metadata and controls
111 lines (86 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.
from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
from twisted.internet import defer, protocol, reactor
from twisted.python import log
import struct, sys, getpass, os
# Login name handed to SimpleUserAuth and shown in the password prompt.
USER = 'z3p' # replace this with a valid username
# Host the client connects to (port 22) and that appears in the password prompt.
HOST = 'localhost' # and a valid host
class SimpleTransport(transport.SSHClientTransport):
    """
    Client transport that reports the server's host key fingerprint and,
    once the connection is secure, requests user authentication followed
    by a L{SimpleConnection}.
    """

    # Fingerprint the server is expected to present.  ``None`` (the default)
    # accepts every host key, which is convenient for a demo but gives no
    # protection against a man-in-the-middle.  Set it to the fingerprint
    # value printed by verifyHostKey to enable checking.
    expectedFingerprint = None

    def verifyHostKey(self, hostKey, fingerprint):
        """
        Print the server's fingerprint and decide whether to trust the key.

        @param hostKey: the host key supplied by the server (unused here).
        @param fingerprint: the fingerprint of that key.
        @return: a Deferred that has already fired with C{1} when the key is
            accepted, or has already failed with C{ValueError} when
            C{expectedFingerprint} is set and does not match.
        """
        print('host key fingerprint: %s' % fingerprint)
        expected = self.expectedFingerprint
        if expected is not None and fingerprint != expected:
            # NOTE(review): Twisted documents an errback from this method as
            # "key not valid" -- confirm the transport disconnects as expected.
            return defer.fail(
                ValueError('unexpected host key fingerprint: %r' % (fingerprint,)))
        return defer.succeed(1)

    def connectionSecure(self):
        """
        Start the user-auth service, which in turn starts L{SimpleConnection}
        once authentication succeeds.
        """
        authService = SimpleUserAuth(USER, SimpleConnection())
        self.requestService(authService)
class SimpleUserAuth(userauth.SSHUserAuthClient):
    """
    User-auth client that answers password and keyboard-interactive
    challenges from the terminal and offers the key pair stored at
    C{~/.ssh/id_dsa} for public key authentication.
    """

    def _privateKeyPath(self):
        """
        Return the expanded path of the private key file.

        This works with rsa too: just change the file name here.
        """
        return os.path.expanduser('~/.ssh/id_dsa')

    def getPassword(self):
        """
        Prompt for the password on the terminal (blocking).

        @return: a Deferred that has already fired with the typed password.
        """
        prompt = "%s@%s's password: " % (USER, HOST)
        return defer.succeed(getpass.getpass(prompt))

    def getGenericAnswers(self, name, instruction, questions):
        """
        Answer a keyboard-interactive challenge from the terminal.

        @param name: printed before the questions are asked.
        @param instruction: printed after C{name}.
        @param questions: iterable of C{(prompt, echo)} pairs; a true C{echo}
            means the answer may be shown while it is typed.
        @return: a Deferred that has already fired with the list of answers,
            in the same order as C{questions}.
        """
        print(name)
        print(instruction)
        try:
            readEchoed = raw_input  # Python 2
        except NameError:
            readEchoed = input  # Python 3 renamed raw_input to input
        answers = []
        for prompt, echo in questions:
            reader = readEchoed if echo else getpass.getpass
            answers.append(reader(prompt))
        return defer.succeed(answers)

    def getPublicKey(self):
        """
        Return the blob of the public key to offer, or C{None} to skip
        public key authentication.

        C{None} is returned when a public key has already been tried, or
        when either half of the key pair is missing on disk: the private
        key is needed later for signing and the C{.pub} file is what gets
        read here.
        """
        privatePath = self._privateKeyPath()
        publicPath = privatePath + '.pub'
        if self.lastPublicKey:
            # we've already tried a public key
            return None
        if not (os.path.exists(privatePath) and os.path.exists(publicPath)):
            # one of the key files doesn't exist
            return None
        # NOTE(review): newer Twisted releases document this method as
        # returning a Key object rather than a blob -- confirm against the
        # installed version before changing.
        return keys.Key.fromFile(filename=publicPath).blob()

    def getPrivateKey(self):
        """
        Load the private key that matches the offered public key.

        @return: a Deferred that has already fired with the C{keyObject} of
            the key read from disk.
        """
        privateKey = keys.Key.fromFile(self._privateKeyPath())
        return defer.succeed(privateKey.keyObject)
class SimpleConnection(connection.SSHConnection):
    """
    Connection service that opens the three demo channels as soon as it
    starts.
    """

    def serviceStarted(self):
        """
        Open one L{TrueChannel}, one L{FalseChannel} and one L{CatChannel},
        in that order.
        """
        # NOTE(review): the third positional argument is this connection;
        # confirm it lines up with SSHChannel's constructor signature.
        for channelClass in (TrueChannel, FalseChannel, CatChannel):
            self.openChannel(channelClass(2**16, 2**15, self))
class TrueChannel(channel.SSHChannel):
    """
    Session channel that asks the server to execute C{true} and prints the
    exit status it reports.
    """

    # 'session' channel type -- needed for commands
    name = 'session'

    def openFailed(self, reason):
        """
        Report that the channel could not be opened.
        """
        print('%s %s' % ('true failed', reason))

    def channelOpen(self, ignoredData):
        """
        Send the C{exec} request for C{true} once the channel is open.
        """
        command = common.NS('true')
        self.conn.sendRequest(self, 'exec', command)

    def request_exit_status(self, data):
        """
        Handle the C{exit-status} request: C{data} is unpacked as a single
        big-endian unsigned 32-bit integer, printed, and the channel is
        then closed.
        """
        (exitCode,) = struct.unpack('>L', data)
        print('true status was: %s' % exitCode)
        self.loseConnection()
class FalseChannel(channel.SSHChannel):
    """
    Session channel that asks the server to execute C{false} and prints the
    exit status it reports.
    """

    # 'session' channel type, as used by the exec request below
    name = 'session'

    def openFailed(self, reason):
        """
        Report that the channel could not be opened.
        """
        print('%s %s' % ('false failed', reason))

    def channelOpen(self, ignoredData):
        """
        Send the C{exec} request for C{false} once the channel is open.
        """
        command = common.NS('false')
        self.conn.sendRequest(self, 'exec', command)

    def request_exit_status(self, data):
        """
        Handle the C{exit-status} request: C{data} is unpacked as a single
        big-endian unsigned 32-bit integer, printed, and the channel is
        then closed.
        """
        (exitCode,) = struct.unpack('>L', data)
        print('false status was: %s' % exitCode)
        self.loseConnection()
class CatChannel(channel.SSHChannel):
    """
    Session channel that runs C{cat} on the server, sends it one line, and
    prints whatever comes back.  Its end (successful or not) stops the
    reactor and therefore the whole example.
    """

    # 'session' channel type, as used by the exec request below
    name = 'session'

    def openFailed(self, reason):
        """
        Report that the channel could not be opened and stop the reactor.

        The only other reactor.stop() call is in closed(); without stopping
        here a failed open would leave the program blocked in reactor.run().
        NOTE(review): assumes closed() is not also invoked for a channel
        whose open failed -- confirm against the Twisted version in use.
        """
        print('%s %s' % ('cat failed', reason))
        reactor.stop()

    def channelOpen(self, ignoredData):
        """
        Reset the receive buffer and ask the server to execute C{cat},
        requesting a reply so that input is only sent once the request has
        succeeded.
        """
        self.data = ''
        d = self.conn.sendRequest(self, 'exec', common.NS('cat'), wantReply=1)
        d.addCallback(self._cbRequest)

    def _cbRequest(self, ignored):
        """
        The exec request succeeded: send one line and then EOF.
        """
        self.write('hello conch\n')
        self.conn.sendEOF(self)

    def dataReceived(self, data):
        """
        Accumulate output received from the remote command.
        """
        self.data += data

    def closed(self):
        """
        Print everything received, close the channel and stop the reactor.
        """
        print('got data from cat: %s' % repr(self.data))
        self.loseConnection()
        reactor.stop()
# Connect a SimpleTransport to the SSH server on HOST, port 22, then hand
# control to the reactor; reactor.run() returns once something calls
# reactor.stop() (CatChannel does this in the code above).
_clientCreator = protocol.ClientCreator(reactor, SimpleTransport)
_clientCreator.connectTCP(HOST, 22)
reactor.run()