大佬教程收集整理的这篇文章主要介绍了Linux下pyinotify的安装和使用,大佬教程觉得挺不错的,现在分享给大家,也给大家做个参考。
介绍:pyinotify 的功能是监控目录,检测其中发生的操作事件(如文件的创建、修改、删除等)。
1.安装
在百度云盘下载,或者在GitHub上下载安装包
链接:https://pan.baidu.com/s/1Lqt872YEgEo_bNPEnEJMaw
提取码:bjl2
# git clone https://github.com/seb-m/pyinotify.git
# cd pyinotify/
# ls
# python setup.py install
2.使用:在代码中直接导入就可以
# -*- coding: utf-8 -*- # !/usr/bin/env python import os import Queue import datetiR_850_11845@e import pyinotify import logging #debug import threading,time pos = 0 save_pos = 0 filename ="" # filename1 ="" pathname ="" class Singleton(typE): def __call__(cls,*args,**kwargs): if not hasattr(cls,‘instance‘): cls.instance = super(Singleton,cls).__call__(*args,**kwargs) return cls.instance def __new__(cls,name,bases,dct): return type.__new__(cls,dct) def __init__(cls,dct): super(Singleton,cls).__init__(name,dct) class AuditLog(object): __Metaclass__ = Singleton def __init__(self): FORMAT = (‘%(asctimE)s.%(msecs)d-%(levelName)s‘ ‘[%(fileName)s:%(lineno)d:%(funcName)s]: %(messagE)s‘) DATEFMT = ‘%Y-%m-%d %H:%M:%s‘ logging.basicConfig(level=logging.DEBUG,format=FORMAT,datefmt=DATEFMT) logging.debug("__init__") self.log_queue = Queue.Queue(maxsize=1000) self.str = "AuditLog" def start(self,path): # path as: /var/log/auth.log try: print "start:",threading.currentThread() if( not os.path.exists(path)): logging.debug("文件路径不存在") return logging.debug("文件路径:{}".format(path)) global pathname pathname = path # # 输出前面的log # printlog() file = path[:path.rfind(‘/‘)] global filename global filename1 filename = path[path.rfind(‘/‘)+1:] filename1 = filename + ".1" print "filename:",filename," filename1 ",filename1 # watch manager wm = pyinotify.WatchManager() wm.add_watch(file,pyinotify.all_EVENTS,rec=TruE) eh = MyEventHandler() # notifier notifier = pyinotify.Notifier(wm,eh) notifier.loop() except Exception as e: logging.error(‘[AuditLog]:send error: %s‘,str(E)) return def read_log_file(self): global pos global save_pos try: # if( not os.pathname.exists(pathName)): # logging.debug("读取的文件不存在") # return fd = open(pathName) if pos != 0: fd.seek(pos,0) while True: line = fd.readline() if line.Strip(): logging.debug("put queue:{}".format(line.Strip())) try: self.log_queue.put_Nowait(line.Strip()) except Queue.Full: logging.warn(‘send_log_queue is full Now‘) pos = fd.tell() save_pos = pos 
else: break fd.close() except Exception,e: logging.error(‘[AuditLog]:send error: %s‘,str(E)) def read_log1_file(self): try: global save_pos global pos pos = 0 pathname1 = pathname + ".1" # if( not os.pathname1.exists(pathname1)): # logging.debug("读取的文件不存在") # return fd = open(pathname1) if save_pos != 0: fd.seek(save_pos,0) while True: line = fd.readline() if line.Strip(): if save_pos > fd.tell() : print save_pos," ",fd.tell() conTinue try: self.log_queue.put_Nowait(line.Strip()) logging.debug("put queue:{}".format(line.Strip())) except Queue.Full: logging.warn(‘send_log_queue is full Now‘) else: save_pos = 0 break fd.close() except Exception,str(E)) def __del__(self): print "del" class MyEventHandler(pyinotify.ProcessEvent): print "@H_160_81@myEventHandler :",threading.currentThread() def __init__(self): print "@H_160_81@myEventHandler __init__" self.auditlogobject = AuditLog() print self.auditlogobject.str # 当文件被修改时调用函数 def process_IN_MODIFY(self,event): #文件修改 print "process_IN_MODIFY",event.name def process_IN_CREATE(self,event): #文件创建 # logging.debug("process_IN_CREATE") print "create event:",event.name # log.log.1 global save_pos try: if(event.name == fileName): print "==" self.auditlogobject.read_log1_file() else: logging.debug("审计的文件不相同{} {}".format(filename1).format(event.Name)) except Exception as e: logging.error(‘[AuditLog]:send error: %s‘,str(E)) def process_IN_deletE(self,event): #文件删除 # logging.debug("process_IN_deletE") print "delete event:",event.name def process_IN_ACCESS(self,event): #访问 # logging.debug("process_IN_ACCESS") print "ACCESS event:",event.name def process_IN_ATTRIB(self,event): #属性 # logging.debug("process_IN_ATTRIB") print "ATTRIB event: 文件属性",event.name def process_IN_CLOSE_NowRITE(self,event): # logging.debug("process_IN_CLOSE_NowRITE") print "CLOSE_NowRITE event:",event.name def process_IN_CLOSE_WRITE(self,event): # 关闭写入 # logging.debug("process_IN_CLOSE_WRITE") print "CLOSE_WRITE event:",event.name try: if(event.name == 
fileName): self.auditlogobject.read_log_file() else: logging.debug("文件名不对 不是审计文件") return except Exception as e: logging.error(‘[AuditLog]:send error: %s‘,str(E)) def process_IN_OPEN(self,event): # 打开 # logging.debug("process_IN_OPEN") print "OPEN event:",event.name def Producer(): try: auditlogobj = AuditLog() auditlogobj.start("./log/log.log") except Exception as e: logging.error(‘[auditLog]:send error: %s‘,str(E)) def Consumer(): try: print "Consumer" auditlogobject = AuditLog() print auditlogobject.str while True: print "ddd" time.sleep(1) while auditlogobject.log_queue.qsize() != 0 : print "queue size",auditlogobject.log_queue.qsize() print auditlogobject.log_queue.get() # time.sleep(5) except Exception as e: logging.error(‘[AuditLog]:send error: %s‘,str(E)) if __name__ == ‘__main__‘: # a = threading.Thread(target=Producer,) # a.start() b = threading.Thread(target=Consumer,) b.start()
以上是大佬教程为你收集整理的Linux pyinotify的安装和使用全部内容,希望文章能够帮你解决Linux pyinotify的安装和使用所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。