#
#
#
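# This is an example policy which denies SSH password scans:
# if an IP address has already opened 3 sessions that were not longer than
# 1 minute, a further connection made less than 3 minutes after its last
# accepted attempt is denied. The counter is reset once 10 minutes have
# passed since the last accepted attempt.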
#
#
import time
# Module-level attempt log shared by every proxy instance:
# maps a client IP string (client_address.ip_s) to a tuple of
# (time of the last recorded attempt, number of counted attempts).
short_sessions = {}
class MySSHPlugProxy(PlugProxy):
    """Plug proxy that denies clients which open many short sessions.

    Every accepted connection is counted per client IP in the module-level
    ``short_sessions`` dict.  A session that turns out to have lasted longer
    than 60 seconds is un-counted again when it shuts down, so only short
    sessions (typical for password scans) accumulate.
    """

    def config(self):
        """Count this connection attempt and deny it when over the limit.

        Raises:
            DACException: if the client IP has more than 3 counted attempts
                and its last recorded attempt was less than 3 minutes ago.
        """
        PlugProxy.config(self)
        ip = self.session.client_address.ip_s
        now = time.time()

        # Unknown IPs start with an empty history dated "now".
        last_attempt, count = short_sessions.get(ip, (now, 0))

        if now > last_attempt + 600:
            # last attempt more than 10 minutes ago, it is allowed again
            last_attempt = now
            count = 0

        count = count + 1
        if count > 3 and now < last_attempt + 180:
            # more than 3 attempts, the last one less than 3 minutes ago;
            # the entry is deliberately not updated for a denied attempt
            raise DACException("Connections over limit")

        short_sessions[ip] = (now, count)
        # Only set for accepted sessions; shutDown() relies on this to know
        # whether the session was counted at all.
        self.started_time = now

    def shutDown(self):
        """Un-count this session if it lasted longer than 60 seconds."""
        PlugProxy.shutDown(self)

        started = getattr(self, "started_time", None)
        if started is None:
            # config() denied the connection (or never completed), so this
            # session was never counted and there is nothing to undo.
            return

        now = time.time()
        if now - started > 60:
            # this was a session longer than 60 seconds, it was not a
            # real short session
            ip = self.session.client_address.ip_s
            entry = short_sessions.get(ip)
            if entry is not None:
                last_attempt, count = entry
                # Never go below zero: the counter may have been reset
                # while this long session was still running.
                short_sessions[ip] = (last_attempt, max(count - 1, 0))
#