"""Parse sshd lines from an auth.log-style file and load them into MySQL.

Consecutive log lines that share the same ``[PID]`` are folded into one
``sshlogentry``.  Entries whose timestamp falls inside the requested date
range are inserted into the ``entries`` table of the ``sshlogs`` database.

Usage::

    script LOWER_DATE UPPER_DATE LOGFILE [DB_PASSWORD]

Dates are written ``YYYY-MM-DD``.  When ``DB_PASSWORD`` is omitted the
password is prompted for interactively.
"""

import time
import datetime
import re
import getpass
import sys

try:
    import mysql.connector
except ImportError:
    # The driver is only needed by UpdateDatabase(); the parsing helpers stay
    # usable without it.  UpdateDatabase() reports the missing driver.
    mysql = None


# Three-letter month abbreviation (as written in the log) -> month number.
MonthCodes = {
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12,
}

# Message prefix -> entry type.  Order matters: the first prefix found in a
# message wins, so the bare "Connection closed by " must come after the more
# specific "Connection closed by ... user " variants.
expressions = {
    "Connection closed by invalid user ": "password",
    "Disconnected from invalid user ": "password",
    "Connection closed by authenticating user ": "password",
    "Disconnected from authenticating user ": "password",
    "Disconnecting authenticating user ": "password",
    "Connection closed by ": "kex",
    "Disconnecting invalid user ": "password",
}

# Keys of sshlogentry.data in the order of the INSERT statement's columns.
OrderedDataKeys = ["PID", "timestamp", "duration", "IP", "port", "user",
                   "attempts", "type"]

# Fragments stripped from a message before it is parsed.
noise = ["\n", ": Too many authentication failures", " [preauth]", "Bye Bye"]

# Address patterns tried, in this order, on the text preceding " port ".
_IPV4_PATTERN = re.compile(r'(\d+)\.(\d+)\.(\d+)\.(\d+)')
_IPV6_PATTERN = re.compile(r'([a-f0-9:]+:+)+[a-f0-9]*(%\w+)*')


def RemoveNoise(textline):
    """Return *textline* without the ``noise`` fragments.

    Everything from ": Change of username or service not allowed" onwards is
    cut off as well.
    """
    updatedtext = textline
    for fragment in noise:
        updatedtext = updatedtext.replace(fragment, "")
    # partition() returns the whole string as head when the marker is absent.
    head, _, _ = updatedtext.partition(
        ": Change of username or service not allowed")
    return head


class sshlogentry:
    """Data collected from the log lines of one sshd process.

    Attributes:
        PID: process id string taken from the ``[...]`` part of the line.
        timestampsList: epoch seconds of every line folded into this entry.
        data: field values keyed by the names used in ``OrderedDataKeys``.
        isValid: False once VerifyData() found the entry incomplete.
    """

    def __init__(self, ProcID):
        self.timestampsList = []
        self.PID = ProcID
        self.data = {
            "timestamp": None,
            "IP": None,
            "port": None,
            "attempts": 0,
            "user": None,
            "duration": None,
            "type": None,
        }
        self.isValid = True

    def VerifyData(self):
        """Fill in the derived fields and update ``isValid``.

        ``timestamp`` becomes the earliest and ``duration`` the span of the
        collected timestamps; a missing user is replaced by ``"n.a"``.  The
        entry is valid only if no field is left as ``None``.

        Returns:
            False when no timestamp was collected (the entry is then marked
            invalid and nothing else is filled in), True otherwise.
        """
        if not self.timestampsList:
            self.isValid = False
            return False
        self.data["timestamp"] = min(self.timestampsList)
        self.data["duration"] = (max(self.timestampsList)
                                 - self.data["timestamp"])
        self.data["PID"] = self.PID
        if self.data["user"] is None:
            self.data["user"] = "n.a"
        # Keep the flag a real bool; an earlier False is never reset.
        self.isValid = bool(self.isValid) and all(
            value is not None for value in self.data.values())
        return True

    def OutputData(self, keyorder):
        """Return the values for *keyorder* as a tuple ("" for unknown keys)."""
        return tuple(self.data.get(datakey, "") for datakey in keyorder)


def FormatLogDate(DateString, ld_year):
    """Convert a "Mon DD HH:MM:SS" log prefix into a ``datetime``.

    The log does not contain the year, so the caller supplies *ld_year*.
    """
    ld_month = MonthCodes[DateString[:3]]
    ld_day = int(DateString[4:6])
    ld_hour, ld_min, ld_sec = [int(part)
                               for part in DateString[7:15].split(":")]
    return datetime.datetime(ld_year, ld_month, ld_day,
                             ld_hour, ld_min, ld_sec)


def _FileEntry(entry, LowerTS, UpperTS, Entries, Errors):
    """Verify *entry* and append it to *Entries* or *Errors*.

    Invalid entries go to *Errors*; valid entries outside the timestamp range
    are dropped.
    """
    entry.VerifyData()
    if not entry.isValid:
        Errors.append(entry)
    elif LowerTS <= entry.data["timestamp"] <= UpperTS:
        Entries.append(entry)


def _ApplyMessage(entry, message):
    """Update *entry* from one noise-free log message."""
    entry.data["attempts"] += message.count("Failed password for")
    if " port " not in message:
        return
    for expression, entryType in expressions.items():
        if expression not in message:
            continue
        fields = message.replace(expression, "").split(" port ")
        if len(fields) != 2:
            return
        entrySublist, entryPort = fields
        try:
            port = int(entryPort)
        except ValueError:
            return
        IPmatch = (_IPV4_PATTERN.search(entrySublist)
                   or _IPV6_PATTERN.search(entrySublist))
        if IPmatch is None:
            # No address found: leave the entry untouched so that it ends up
            # in the error list instead of aborting the whole run.
            return
        address = IPmatch.group()
        entry.data["port"] = port
        entry.data["IP"] = address
        entry.data["user"] = (entrySublist.replace(" " + address, "")
                              .replace(" from", ""))
        entry.data["type"] = entryType
        return


def ParseFile(filename, LowerTS, UpperTS, VerboseFeedback=True, LogYear=2025):
    """Parse *filename* into lists of ``sshlogentry`` objects.

    Consecutive lines with the same ``[PID]`` are merged into one entry.
    Lines without a ``[...]`` part are skipped.

    Args:
        filename: path of the log file to read.
        LowerTS: smallest accepted entry timestamp (epoch seconds, inclusive).
        UpperTS: largest accepted entry timestamp (epoch seconds, inclusive).
        VerboseFeedback: print the date and line number of every parsed line.
        LogYear: year assumed for all lines (the log does not record it).

    Returns:
        ``(Entries, Errors)``: the valid entries inside the timestamp range
        and the entries that failed verification.
    """
    Entries = []
    Errors = []
    CurrentEntry = None
    with open(filename, "r") as DataSource:
        for ds_i, dsLine in enumerate(DataSource):
            dsLine = dsLine.replace("\n", "")
            OpeningBracket = dsLine.find("[")
            ClosingBracket = dsLine.find("]")
            if OpeningBracket == -1 or ClosingBracket == -1:
                continue
            entryID = dsLine[OpeningBracket + 1:ClosingBracket]
            if CurrentEntry is None or entryID != CurrentEntry.PID:
                if CurrentEntry is not None:
                    _FileEntry(CurrentEntry, LowerTS, UpperTS,
                               Entries, Errors)
                CurrentEntry = sshlogentry(entryID)
            entryDateFormatted = FormatLogDate(dsLine[0:15], LogYear)
            entryTimestamp = int(entryDateFormatted.timestamp())
            if VerboseFeedback:
                print(f'{entryDateFormatted.year}-{entryDateFormatted.month}-{entryDateFormatted.day}: line #{ds_i}')
            CurrentEntry.timestampsList.append(entryTimestamp)
            # The message starts after the "]: " that follows the PID.
            _ApplyMessage(CurrentEntry,
                          RemoveNoise(dsLine[ClosingBracket + 3:]))
    # The last entry has no following line with a new PID to trigger filing.
    if CurrentEntry is not None:
        _FileEntry(CurrentEntry, LowerTS, UpperTS, Entries, Errors)
    return Entries, Errors


def CreateFeedFile(EntryList, feedorder, FeedFilename="ssh-logins-feedfile",
                   delim=";"):
    """Write one *delim*-separated line per entry to *FeedFilename*.

    The file is overwritten; fields appear in *feedorder*.
    """
    with open(FeedFilename, "w") as feedfile:
        for entryItem in EntryList:
            feedline = delim.join(
                f'{ei}' for ei in entryItem.OutputData(feedorder))
            feedfile.write(feedline + "\n")


def UpdateDatabase(ParsedEntriesList, keyorder, pword=None):
    """Insert the entries into the ``entries`` table in batches of 100.

    Args:
        ParsedEntriesList: ``sshlogentry`` objects to insert.
        keyorder: data keys in the order of the INSERT statement's columns.
        pword: database password; prompted for when None.

    Raises:
        RuntimeError: if the mysql.connector package is not installed.
    """
    if mysql is None:
        raise RuntimeError(
            "mysql.connector is required to update the database")
    ParsedValuesList = [pel.OutputData(keyorder)
                        for pel in ParsedEntriesList]
    if pword is None:
        SQLpassword = getpass.getpass("SQL user password: ")
    else:
        SQLpassword = pword
    SQLDatabase = mysql.connector.connect(
        host="localhost",
        user="sshlogins",
        password=SQLpassword,
        database="sshlogs")
    SQLstatement = "INSERT INTO entries (PID, timestamp, duration, IPaddress, port, user, attempts, type) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
    batchsize = 100
    # Ceiling division: no empty trailing batch for exact multiples.
    batchcount = (len(ParsedValuesList) + batchsize - 1) // batchsize
    try:
        SQLcursor = SQLDatabase.cursor()
        try:
            for batch in range(batchcount):
                SQLcursor.executemany(
                    SQLstatement,
                    ParsedValuesList[batch * batchsize:
                                     (batch + 1) * batchsize])
                # Commit per batch so finished batches survive a later error.
                SQLDatabase.commit()
                print("batch ", batch + 1, " out of ", batchcount,
                      " inserted")
        finally:
            SQLcursor.close()
    finally:
        SQLDatabase.close()


def CreateUncategorizedFile(UncategorizedList,
                            fname="uncategorized-sshlogs.err", delim=";"):
    """Append one line per item of *UncategorizedList* to *fname*.

    ``sshlogentry`` items are written as their ``OrderedDataKeys`` fields
    joined by *delim*; any other item is written as ``str(item)``.
    """
    with open(fname, "a") as uncat:
        for err in UncategorizedList:
            if isinstance(err, sshlogentry):
                line = delim.join(
                    f'{field}' for field in err.OutputData(OrderedDataKeys))
            else:
                line = str(err)
            uncat.write(line + "\n")


def ConvertBoundaryDates(LB, UB):
    """Convert two "YYYY-MM-DD" dates into epoch-second boundaries.

    Midnight of each date is taken in local time.  The upper boundary is
    reduced by one second so that the upper date itself is excluded.

    Returns:
        ``[lower, upper]`` as integers.
    """
    boundaryTimestamps = []
    for bi, bdate in enumerate([LB, UB]):
        yy, mm, dd = [int(v) for v in bdate.split("-")]
        ts = datetime.datetime(yy, mm, dd).timestamp()
        # bi is 0 for the lower and 1 for the upper boundary.
        boundaryTimestamps.append(int(ts) - bi)
    return boundaryTimestamps


def PerformUpdate(LowerBoundsDate, UpperBoundsDate, RawData, dbpassword=None,
                  FeedFilename=None):
    """Parse *RawData* and insert the entries of the date range into MySQL.

    Args:
        LowerBoundsDate: first accepted day, "YYYY-MM-DD".
        UpperBoundsDate: day after the last accepted day, "YYYY-MM-DD".
        RawData: path of the log file.
        dbpassword: database password; prompted for when None.
        FeedFilename: when given, the parsed entries are also written to this
            file before the database is updated.
    """
    print("Performing full database update:")
    print("Start...")
    LowerBounds, UpperBounds = ConvertBoundaryDates(LowerBoundsDate,
                                                    UpperBoundsDate)
    updateEntries, updateErrors = ParseFile(RawData, LowerBounds,
                                            UpperBounds, True)
    print("File parsed")
    if FeedFilename is not None:
        CreateFeedFile(EntryList=updateEntries, feedorder=OrderedDataKeys,
                       FeedFilename=FeedFilename)
        print("backup feedfile created")
    UpdateDatabase(updateEntries, OrderedDataKeys, dbpassword)
    print("Instances retrieved:", len(updateEntries))
    print("Instances discarded:", len(updateErrors))


# Layout of the `entries` table targeted by the INSERT statement:
#
# +-------------+-----------+
# | COLUMN_NAME | DATA_TYPE |
# +-------------+-----------+
# | lp          | int       |
# | PID         | int       |
# | timestamp   | int       |
# | duration    | mediumint |
# | IPaddress   | varchar   |
# | port        | varchar   |
# | user        | varchar   |
# | attempts    | smallint  |
# | type        | varchar   |
# +-------------+-----------+
# 9 rows in set (0,01 sec)


if __name__ == "__main__":
    if len(sys.argv) < 4:
        sys.exit(f"usage: {sys.argv[0]} LOWER_DATE UPPER_DATE LOGFILE "
                 "[DB_PASSWORD]")
    # Without a password argument UpdateDatabase() prompts for it.
    PerformUpdate(sys.argv[1], sys.argv[2], sys.argv[3],
                  sys.argv[4] if len(sys.argv) > 4 else None)