Linux   发布时间:2022-03-31  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了 Linux 下 pyinotify 的安装和使用,大佬教程觉得挺不错的,现在分享给大家,也给大家做个参考。

概述

介绍:此功能用于检测目录上发生的操作事件。1. 安装:在百度云盘下载,或者在 GitHub 上下载安装包。链接:https://pan.baidu.com/s/1Lqt872YEgEo_bNPEnEJMaw   提取码:bjl2 # git clone https://github.com/seb-m/pyinotify.git # cd pyinotify/ # ls # python setup.py install

介绍:此功能用于检测目录上发生的操作事件。

1.安装 

在百度云盘下载,或者在 GitHub 上下载安装包

  链接:https://pan.baidu.com/s/1Lqt872YEgEo_bNPEnEJMaw
  提取码:bjl2

# git clone https://github.com/seb-m/pyinotify.git     
# cd pyinotify/
# ls
# python setup.py install

2.使用:在代码中直接导入就可以

# -*- coding: utf-8 -*-
# !/usr/bin/env python
import datetime
import logging
import os
import threading  # used by the debug prints and the consumer thread
import time

try:
    import Queue  # Python 2 module name
except ImportError:
    import queue as Queue  # Python 3 renamed the module; keep the alias the code uses

import pyinotify

# Shared reader state; the classes below rebind these through ``global``.
pos = save_pos = 0  # byte offset reached in the audited file / offset to resume from
filename = filename1 = pathname = ""  # audited file name, that name + ".1", full path




class Singleton(type):
    """Metaclass that makes a class hand out one shared instance.

    The first call to the class builds the object and stores it on the class
    as ``instance``; every later call returns that stored object and ignores
    the arguments it was given.
    """

    def __call__(cls, *args, **kwargs):
        """Return the cached instance, creating it on the first call."""
        # The attribute name must be a string: a bare ``instance`` is an
        # undefined name and raises NameError on every instantiation.
        if not hasattr(cls, "instance"):
            cls.instance = super(Singleton, cls).__call__(*args, **kwargs)
        return cls.instance

    def __new__(cls, name, bases, dct):
        """Create the class object; ``type.__new__`` needs name, bases and dict."""
        return type.__new__(cls, name, bases, dct)

    def __init__(cls, name, bases, dct):
        """Initialise the class object using the standard metaclass signature."""
        super(Singleton, cls).__init__(name, bases, dct)


# Deriving from a throwaway base built by calling the metaclass applies
# Singleton on both Python 2 and Python 3.  A ``__metaclass__`` class attribute
# is honoured only by Python 2 and is silently ignored by Python 3.
class AuditLog(Singleton("_AuditLogBase", (object,), {})):
    """Read lines of an audited log file into a bounded queue.

    ``start`` records the target path in the module globals and watches its
    directory with pyinotify.  ``read_log_file`` and ``read_log1_file`` push
    stripped, non-empty lines onto ``log_queue`` (capacity 1000), tracking the
    read offset in the module-level ``pos`` / ``save_pos``.
    """

    def __init__(self):
        """Configure root logging and create the line queue."""
        # The published listing lost its quotes; this is the format rebuilt as
        # two adjacent literals with the standard LogRecord attribute names.
        log_format = ("%(asctime)s.%(msecs)d-%(levelname)s "
                      "[%(filename)s:%(lineno)d:%(funcName)s]: %(message)s")
        date_format = "%Y-%m-%d %H:%M:%S"
        logging.basicConfig(level=logging.DEBUG, format=log_format,
                            datefmt=date_format)

        logging.debug("__init__")
        self.log_queue = Queue.Queue(maxsize=1000)
        self.str = "AuditLog"

    def start(self, path):
        """Watch the directory holding *path* and run the notifier loop.

        ``path`` is the file to audit, e.g. ``/var/log/auth.log``.  Returns
        ``None`` immediately when the path does not exist; otherwise the call
        stays inside ``notifier.loop()``.  Any exception is logged, not raised.
        """
        global pathname, filename, filename1
        try:
            print("start: %s" % threading.current_thread())
            if not os.path.exists(path):
                logging.debug("文件路径不存在")
                return
            logging.debug("文件路径:{}".format(path))
            pathname = path

            # dirname/basename instead of slicing at rfind("/"): for a path
            # without a slash rfind returns -1 and the slice would drop the
            # last character instead of selecting a directory.
            watch_dir = os.path.dirname(path) or "."
            filename = os.path.basename(path)
            filename1 = filename + ".1"
            print("filename: %s     filename1  %s" % (filename, filename1))

            watch_manager = pyinotify.WatchManager()
            watch_manager.add_watch(watch_dir, pyinotify.ALL_EVENTS, rec=True)
            handler = MyEventHandler()
            notifier = pyinotify.Notifier(watch_manager, handler)
            notifier.loop()
        except Exception as e:
            logging.error("[AuditLog]:send error: %s", str(e))
            return

    def _enqueue(self, text):
        """Put one stripped line on the queue; warn and drop it when full."""
        logging.debug("put queue:{}".format(text))
        try:
            self.log_queue.put_nowait(text)
        except Queue.Full:
            logging.warning("send_log_queue is full now")

    def read_log_file(self):
        """Queue the lines appended to the audited file since the last read.

        Resumes at the global ``pos`` and stores the new offset in both ``pos``
        and ``save_pos``.  Exceptions are logged, not raised.
        """
        global pos, save_pos
        try:
            with open(pathname) as fd:
                if pos != 0:
                    fd.seek(pos, 0)
                # readline() (not file iteration) keeps tell() usable.  Only an
                # empty string means end of file: a blank line is skipped and
                # the offset still advances, so reading cannot stall on it.
                for line in iter(fd.readline, ""):
                    text = line.strip()
                    if text:
                        self._enqueue(text)
                    pos = fd.tell()
                    save_pos = pos
        except Exception as e:
            logging.error("[AuditLog]:send error: %s", str(e))

    def read_log1_file(self):
        """Queue the unread tail of ``pathname + ".1"`` and reset the offsets.

        Resets ``pos`` to 0 first, reads from ``save_pos`` to end of file, then
        resets ``save_pos`` to 0.  Exceptions are logged, not raised.
        """
        global pos, save_pos
        try:
            pos = 0
            rotated_path = pathname + ".1"
            with open(rotated_path) as fd:
                if save_pos != 0:
                    fd.seek(save_pos, 0)
                # Blank lines are skipped, not treated as end of file.
                for line in iter(fd.readline, ""):
                    text = line.strip()
                    if text:
                        self._enqueue(text)
            save_pos = 0
        except Exception as e:
            logging.error("[AuditLog]:send error: %s", str(e))

    def __del__(self):
        """Print a marker when the object is finalised."""
        print("del")






class MyEventHandler(pyinotify.ProcessEvent):
    """pyinotify handler that feeds the shared AuditLog from file events.

    pyinotify looks handlers up by name (``process_`` + event name), so each
    method name must match the event constant exactly.  Every handler prints
    the event; CREATE and CLOSE_WRITE also trigger reads when the event is for
    the audited file name stored in the module-level ``filename``.
    """

    # Runs once, when the class body is executed.
    print("MyEventHandler : %s" % threading.current_thread())

    def __init__(self):
        """Initialise the pyinotify base class and fetch the AuditLog object."""
        # Explicit base call rather than super(): the base initialiser was
        # never invoked before, and the explicit form also works if the base
        # is an old-style class under Python 2.
        pyinotify.ProcessEvent.__init__(self)
        print("MyEventHandler  __init__")
        self.auditlogobject = AuditLog()
        print(self.auditlogobject.str)

    def process_IN_MODIFY(self, event):
        """A file was modified: print the event only."""
        print("process_IN_MODIFY %s" % event.name)

    def process_IN_CREATE(self, event):
        """A file was created.

        When the new file has the audited name, read the remainder of the
        ``.1`` file (presumably the log was just rotated -- confirm against
        the rotation setup in use).
        """
        print("create event: %s" % event.name)
        try:
            if event.name == filename:
                print("==")
                self.auditlogobject.read_log1_file()
            else:
                # One format() call with both values; chaining two calls
                # raised IndexError because the first supplied one argument
                # for two placeholders.
                logging.debug("审计的文件不相同{}  {}".format(filename1, event.name))
        except Exception as e:
            logging.error("[AuditLog]:send error: %s", str(e))

    def process_IN_DELETE(self, event):
        """A file was deleted: print the event only."""
        print("delete event: %s" % event.name)

    def process_IN_ACCESS(self, event):
        """A file was accessed: print the event only."""
        print("ACCESS event: %s" % event.name)

    def process_IN_ATTRIB(self, event):
        """File attributes changed: print the event only."""
        print("ATTRIB event:  文件属性 %s" % event.name)

    def process_IN_CLOSE_NOWRITE(self, event):
        """A file not opened for writing was closed: print the event only."""
        print("CLOSE_NOWRITE event: %s" % event.name)

    def process_IN_CLOSE_WRITE(self, event):
        """A file opened for writing was closed.

        When it is the audited file, queue the newly appended lines.
        """
        print("CLOSE_WRITE event: %s" % event.name)
        try:
            if event.name == filename:
                self.auditlogobject.read_log_file()
            else:
                logging.debug("文件名不对 不是审计文件")
                return
        except Exception as e:
            logging.error("[AuditLog]:send error: %s", str(e))

    def process_IN_OPEN(self, event):
        """A file was opened: print the event only."""
        print("OPEN event: %s" % event.name)



def Producer():
    """Start auditing ``./log/log.log`` with the shared AuditLog object.

    ``AuditLog.start`` stays inside the notifier loop for an existing path,
    so this is meant to be a thread target.  Exceptions are logged, not raised.
    """
    try:
        auditlogobj = AuditLog()
        auditlogobj.start("./log/log.log")
    except Exception as e:
        logging.error("[auditLog]:send error: %s", str(e))
 
 
 
def Consumer():
    """Poll the shared AuditLog queue once a second and print what it holds.

    Loops forever: prints a heartbeat, sleeps one second, then drains the
    queue, printing its size before each item.  Exceptions are logged and end
    the loop.
    """
    try:
        print("Consumer")
        auditlogobject = AuditLog()
        print(auditlogobject.str)
        log_queue = auditlogobject.log_queue
        while True:
            print("ddd")
            time.sleep(1)

            while log_queue.qsize() != 0:
                print("queue size %s" % log_queue.qsize())
                print(log_queue.get())
    except Exception as e:
        logging.error("[AuditLog]:send error: %s", str(e))





# Script entry point.  The guard must compare against the string "__main__";
# the bare name is undefined and raises NameError.
if __name__ == "__main__":
    # The producer thread is disabled in the published listing and is left
    # that way here; enable these two lines to actually watch the log file.
    # a = threading.Thread(target=Producer,)
    # a.start()
    b = threading.Thread(target=Consumer,)
    b.start()

大佬总结

以上是大佬教程为你收集整理的 Linux pyinotify 的安装和使用全部内容,希望文章能够帮你解决 Linux pyinotify 的安装和使用所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。