# # # This is an example policy which denies ssh password scans: # if a IP has opened 3 sessions which is shorter than 1 minute # three times in the last 10 minute, the connection will be denied # # import time short_sessions = {} class MySSHPlugProxy(PlugProxy): def config(self): global short_sessions PlugProxy.config(self) ip = self.session.client_address.ip_s now = time.time() try: (last_attempt, count) = short_sessions[ip] except KeyError: last_attempt = now count = 0 if now > last_attempt + 600: # last attempt more than 10 minutes ago, it is allowed again last_attempt = now count = 0 count = count + 1 if count > 3 and now < last_attempt + 180: # more than 3 attempts in the last 3 minutes raise DACException, "Connections over limit" short_sessions[ip] = (now, count) self.started_time = now def shutDown(self): global short_sessions PlugProxy.shutDown(self) now = time.time() if now - self.started_time > 60: # this was a session longer than 60 seconds, it was not a # real short session ip = self.session.client_address.ip_s (last_attempt, count) = short_sessions[ip] short_sessions[ip] = (last_attempt, count - 1)