Browse Source

first commit

zhzhenqin 3 years ago
commit
a95e7e9ab1

+ 25 - 0
.gitignore

@@ -0,0 +1,25 @@
+*.log
+*.pyc
+*.svn
+*.iml
+*.day
+*.tnf
+*.tfz
+*.tdx
+*.egg
+.classpath
+.project
+*.bak
+*.egg-info
+
+# Package Files #
+.svn
+.idea
+.settings
+.ipynb_checkpoints
+.vscode
+target
+db
+build
+dist
+logs

+ 23 - 0
amqp/py-amqp-reciver.py

@@ -0,0 +1,23 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+import amqp
+from amqp.basic_message import Message
+
+def recv_callback(msg):
+    headers = msg.headers
+    print(headers.get("loggerName") + "    " + msg.body.decode('utf-8'))
+
+
+if __name__ == "__main__":
+    with amqp.Connection(
+        host="localhost:5672",
+        userid="admin",
+        password="123456",
+        virtual_host="/",
+        insist=False) as c:
+        channel = c.channel()
+        channel.basic_consume(queue='proxylogtopic',
+                              callback=recv_callback, no_ack=True)
+        while True:
+            c.drain_events()

+ 182 - 0
ftp/ftp.py

@@ -0,0 +1,182 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+import ftplib
+import os
+import sys
+
+class FTPSync(object):
+
+  conn = ftplib.FTP()
+
+  def __init__(self,host,port=21):    
+    self.conn.connect(host,port)    
+
+  def login(self,username,password):
+    self.conn.login(username,password)
+    self.conn.set_pasv(False)
+    print(self.conn.welcome)
+
+  def test(self,ftp_path):
+    print(ftp_path)
+    print(self._is_ftp_dir(ftp_path))
+    #print self.conn.nlst(ftp_path)
+    #self.conn.retrlines( 'LIST ./a/b')
+    #ftp_parent_path = os.path.dirname(ftp_path)
+    #ftp_dir_name = os.path.basename(ftp_path)
+    #print ftp_parent_path
+    #print ftp_dir_name
+
+
+  def _is_ftp_file(self,ftp_path):
+    try:
+      if ftp_path in self.conn.nlst(os.path.dirname(ftp_path)):
+        return True
+      else:
+        return False
+    except ftplib.error_perm as e:
+      return False
+
+  def _ftp_list(self, line):
+    list = line.split(' ')
+    if self.ftp_dir_name==list[-1] and list[0].startswith('d'):
+      self._is_dir = True
+
+  def _is_ftp_dir(self,ftp_path):
+    ftp_path = ftp_path.rstrip('/')
+    ftp_parent_path = os.path.dirname(ftp_path)
+    self.ftp_dir_name = os.path.basename(ftp_path)
+    self._is_dir = False
+    if ftp_path == '.' or ftp_path== './' or ftp_path=='':
+      self._is_dir = True
+    else:
+      #this ues callback function ,that will change _is_dir value
+      try:
+        self.conn.retrlines('LIST %s' %ftp_parent_path,self._ftp_list)
+      except ftplib.error_perm as e:
+        return self._is_dir    
+    return self._is_dir
+
+  def get_file(self,ftp_path,local_path='.'):
+    ftp_path = ftp_path.rstrip('/')
+    if self._is_ftp_file(ftp_path):    
+      file_name = os.path.basename(ftp_path)
+      #如果本地路径是目录,下载文件到该目录
+      if os.path.isdir(local_path):
+        file_handler = open(os.path.join(local_path,file_name), 'wb' )
+        self.conn.retrbinary("RETR %s" %(ftp_path), file_handler.write) 
+        file_handler.close()
+      #如果本地路径不是目录,但上层目录存在,则按照本地路径的文件名作为下载的文件名称
+      elif os.path.isdir(os.path.dirname(local_path)):
+        file_handler = open(local_path, 'wb' )
+        self.conn.retrbinary("RETR %s" %(ftp_path), file_handler.write) 
+        file_handler.close()
+      #如果本地路径不是目录,且上层目录不存在,则退出
+      else:
+        print('EROOR:The dir:%s is not exist' %os.path.dirname(local_path))
+    else:
+      print('EROOR:The ftp file:%s is not exist' %ftp_path)
+
+  def put_file(self,local_path,ftp_path='.'):
+    ftp_path = ftp_path.rstrip('/')
+    if os.path.isfile( local_path ):           
+      file_handler = open(local_path, "r")
+      local_file_name = os.path.basename(local_path)
+      #如果远程路径是个目录,则上传文件到这个目录,文件名不变
+      if self._is_ftp_dir(ftp_path):
+        self.conn.storbinary('STOR %s'%os.path.join(ftp_path,local_file_name), file_handler)
+      #如果远程路径的上层是个目录,则上传文件,文件名按照给定命名
+      elif self._is_ftp_dir(os.path.dirname(ftp_path)): 
+        print('STOR %s'%ftp_path)      
+        self.conn.storbinary('STOR %s'%ftp_path, file_handler)
+      #如果远程路径不是目录,且上一层的目录也不存在,则提示给定远程路径错误
+      else:        
+        print('EROOR:The ftp path:%s is error' %ftp_path)
+      file_handler.close()
+    else:
+      print('ERROR:The file:%s is not exist' %local_path)
+
+  def get_dir(self,ftp_path,local_path='.',begin=True): 
+    ftp_path = ftp_path.rstrip('/')
+    #当ftp目录存在时下载    
+    if self._is_ftp_dir(ftp_path):
+      #如果下载到本地当前目录下,并创建目录
+      #下载初始化:如果给定的本地路径不存在需要创建,同时将ftp的目录存放在给定的本地目录下。
+      #ftp目录下文件存放的路径为local_path=local_path+os.path.basename(ftp_path)
+      #例如:将ftp文件夹a下载到本地的a/b目录下,则ftp的a目录下的文件将下载到本地的a/b/a目录下
+      if begin:
+        if not os.path.isdir(local_path):
+          os.makedirs(local_path)
+        local_path=os.path.join(local_path,os.path.basename(ftp_path))
+      #如果本地目录不存在,则创建目录
+      if not os.path.isdir(local_path):
+        os.makedirs(local_path)
+      #进入ftp目录,开始递归查询
+      self.conn.cwd(ftp_path)
+      ftp_files = self.conn.nlst()
+      for file in ftp_files:
+        local_file = os.path.join(local_path, file)
+        #如果file ftp路径是目录则递归上传目录(不需要再进行初始化begin的标志修改为False)
+        #如果file ftp路径是文件则直接上传文件
+        if self._is_ftp_dir(file):
+          self.get_dir(file,local_file,False)
+        else:
+          self.get_file(file,local_file)
+      #如果当前ftp目录文件已经遍历完毕返回上一层目录
+      self.conn.cwd( ".." )
+      return
+    else:
+      print('ERROR:The dir:%s is not exist' %ftp_path)
+      return
+ 
+  def put_dir(self,local_path,ftp_path='.',begin=True):
+    ftp_path = ftp_path.rstrip('/')
+    #当本地目录存在时上传
+    if os.path.isdir(local_path):      
+      #上传初始化:如果给定的ftp路径不存在需要创建,同时将本地的目录存放在给定的ftp目录下。
+      #本地目录下文件存放的路径为ftp_path=ftp_path+os.path.basename(local_path)
+      #例如:将本地文件夹a上传到ftp的a/b目录下,则本地a目录下的文件将上传的ftp的a/b/a目录下
+      if begin:        
+        if not self._is_ftp_dir(ftp_path):
+          self.conn.mkd(ftp_path)
+        ftp_path=os.path.join(ftp_path,os.path.basename(local_path))          
+      #如果ftp路径不是目录,则创建目录
+      if not self._is_ftp_dir(ftp_path):
+        self.conn.mkd(ftp_path)
+ 
+      #进入本地目录,开始递归查询
+      os.chdir(local_path)
+      local_files = os.listdir('.')
+      for file in local_files:
+        #如果file本地路径是目录则递归上传目录(不需要再进行初始化begin的标志修改为False)
+        #如果file本地路径是文件则直接上传文件
+        if os.path.isdir(file):          
+          ftp_path=os.path.join(ftp_path,file)
+          self.put_dir(file,ftp_path,False)
+        else:
+          self.put_file(file,ftp_path)
+      #如果当前本地目录文件已经遍历完毕返回上一层目录
+      os.chdir( ".." )
+    else:
+      print('ERROR:The dir:%s is not exist' %local_path)
+      return
+
+
+if __name__ == '__main__':
+  ftp = FTPSync('192.168.30.13', 2121)
+  ftp.login('admin','admin')
+  #上传文件,不重命名
+  #ftp.put_file('111.txt','a/b')
+  #上传文件,重命名
+  #ftp.put_file('111.txt','a/112.txt')
+  #下载文件,不重命名
+  #ftp.get_file('/a/111.txt',r'D:\\')
+  #下载文件,重命名
+  #ftp.get_file('/a/111.txt',r'D:\112.txt')
+  #下载到已经存在的文件夹
+  #ftp.get_dir('a/b/c',r'D:\\a')
+  #下载到不存在的文件夹
+  #ftp.get_dir('a/b/c',r'D:\\aa')
+  #上传到已经存在的文件夹
+  ftp.put_dir('hello','process_engine/manual')
+  #上传到不存在的文件夹
+  #ftp.put_dir('b','aa/B/')

+ 52 - 0
ftp/pyftp.py

@@ -0,0 +1,52 @@
+#!/usr/bin/env python3
+
+"""
+pip install pyftpdlib
+"""
+
+import argparse
+import os
+from os import path
+from pyftpdlib.handlers import FTPHandler
+from pyftpdlib.servers import FTPServer
+from pyftpdlib.authorizers import DummyAuthorizer
+
+
+handler = FTPHandler
+
+
+def init_auth(dir):
+    authorizer = DummyAuthorizer()
+    authorizer.add_user('admin', '123456', dir, perm='elradfmwM')
+    handler.authorizer = authorizer
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description='ftp module of argparse')
+
+    # 指定-p/--port的参数
+    # 类型为int
+    # help为简短地说明
+    parser.add_argument(
+        '-p', '--port', default=2121, type=int,
+        help='ftp server port'
+    )
+
+    # 指定-H/--host
+    parser.add_argument(
+        '-H', '--host', default="localhost", type=str,
+        help='ftp server bind host'
+    )
+
+    # 指定-d/--dir参数
+    parser.add_argument(
+        '-d', '--dir', default=".", type=str,
+        help='ftp server user home'
+    )
+    args = parser.parse_args()
+    dir = path.abspath(args.dir)
+    print("current dir: {}", dir)
+    init_auth(dir)
+    server = FTPServer((args.host, args.port), handler)
+    print("ftp server start at: {}:{}".format(args.host, args.port))
+    server.serve_forever()

+ 39 - 0
hadoop/hdfs_test.py

@@ -0,0 +1,39 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+"""
+关于python操作hdfs
+"""
+
+import sys
+from datetime import datetime
+from datetime import date
+from datetime import timedelta
+from hdfs import Config
+from json import dump, load
+from hdfs.client import Client
+
+
+#返回目录下的文件
+def list(client, hdfs_path):
+    return client.list(hdfs_path, status=False)
+
+if __name__ == "__main__":
+    #client = Client(url, root=None, proxy=None, timeout=None, session=None)
+    base_path = "/user/hadoop"
+    now = datetime.now()
+    yestoday = now - timedelta(days=1)  # 一天以前
+    start = datetime(yestoday.year, yestoday.month, yestoday.day)
+    end = datetime(now.year, now.month, now.day)
+    print(start)
+    print(end)
+    client = Client("http://zhenqin-mbp:50070")
+    files = list(client, base_path)
+    for f in files:
+        file_path = base_path + "/" + f
+        status = client.status(file_path)
+        lastmodify = datetime.fromtimestamp(status["modificationTime"]/1000)
+        if lastmodify < end and status["type"] == 'FILE':
+            # delete file
+            print("delete file: " + file_path)
+            client.delete(file_path)

+ 4 - 0
hadoop/requirment.txt

@@ -0,0 +1,4 @@
+pyhive
+sasl
+thrift_sasl
+hdfs

+ 10 - 0
hive/hive_test.py

@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+from pyhive import hive
+from TCLIService.ttypes import TOperationState
+cursor = hive.connect('localhost', 10000).cursor()
+print("connect ok.")
+cursor.execute('select count(*) from userinfo', async = False) 
+print cursor.fetchall()
+

+ 17 - 0
hive/puhs2_test.py

@@ -0,0 +1,17 @@
+#!/usr/bin/env python
+# encoding: utf-8
+
+import pyhs2
+
+cursor = pyhs2.connect(host='localhost', port=10000).cursor()
+print cursor.getDatabases();
+
+#Execute query
+cur.execute("select count(*) from userinfo")
+ 
+#Return column info from query
+print cur.getSchema()
+ 
+#Fetch table results
+for i in cur.fetch():
+    print i

+ 33 - 0
leveldb/leveldb_test.py

@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+
+__author__ = "ZhenQin"
+
+import os, stat
+import os.path as path
+from datetime import datetime
+import shutil
+import logging
+import sys
+import leveldb
+
+db = leveldb.LevelDB('./db')
+
+db.Put('hello'.encode("UTF-8"), 'world'.encode("UTF-8")) 
+print(db.Get('hello'.encode("UTF-8")).decode("UTF-8"))
+
+batch = leveldb.WriteBatch()
+batch.Put('hello'.encode("UTF-8"), 'world'.encode("UTF-8"))
+batch.Put('hello again'.encode("UTF-8"), 'world'.encode("UTF-8")) 
+# batch.Delete('hello'.encode("UTF-8"))
+
+for i in range(1, 100000):
+    key = "hello_" + str(i)
+    val = "world_" + str(i)
+    batch.Put(key.encode("UTF-8"), val.encode("UTF-8")) 
+
+db.Write(batch, sync = True)
+# db.close()
+
+if __name__ == "__main__":
+    pass

+ 79 - 0
local_fs/fg_remove_file.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+
+#
+# Vendor: 9sdata.cn
+# Create At: 2019/1/3 0003
+#
+# 本程序为分光数据接入使用。用于把十所推送的数据(zip)分别 move 到当前整点小时的指定目录下,便于接入程序scan。
+# 因十所 ftp 推送只支持推送到 ftp 根目录下,因此可能会导致根目录下文件数量过多。导致程序在 listDir 时时间过长。
+# 因此开发本程序,分解压力。
+# 该程序可设置为 Linux 定时执行,每 5 分钟执行一次。该程序可能会导致需要升级Python
+#
+__author__ = "ZhenQin"
+
+import os, stat
+import os.path as path
+from datetime import datetime
+import shutil
+import logging
+
+
+BASE_DIR = path.abspath(__file__ + "/../")  # 当前脚本文件目录,无需更改
+DATA_ROOT_PATH = "/Users/zhenqin/temp/ftp"  # 十所推送数据目录,在FTP的根目录
+DATE_TARGET_PATH = "/Users/zhenqin/temp/tar_gz"  # 需要 move 到的目录,在FTP根目录下 /fenguang/,会自动按照小时创建目录
+FILTER_FILE_SUBFIX = (".sql", ".txt")  # 扫描的文件后缀,无需更改
+
+log = logging.getLogger(__name__)
+log.setLevel(logging.INFO)
+
+# 再创建一个handler,用于输出到控制台
+ch = logging.StreamHandler()
+# 定义handler的输出格式formatter
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ch.setFormatter(formatter)
+# 给logger添加handler
+#logger.addFilter(filter)
+log.addHandler(ch)
+
+if not path.exists(path.abspath(DATE_TARGET_PATH)):
+    # 如果目标目录不存在,则创建
+    os.mkdir(path.abspath(DATE_TARGET_PATH))  # 创建目录
+    os.chmod(path.abspath(DATE_TARGET_PATH), stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO) # 授权目录权限为777
+    log.info("mkdir: %s" % path.abspath(DATE_TARGET_PATH))
+
+def moveFile(file):
+    currHour = datetime.now().strftime("%Y%m%d%H")
+    target_dir = path.abspath(path.join(DATE_TARGET_PATH, currHour))
+
+    if not path.exists(target_dir):
+        # 如果目标目录不存在,则创建当前整点数据
+        os.mkdir(target_dir)  # 创建目录
+        os.chmod(target_dir, stat.S_IRWXU + stat.S_IRWXG + stat.S_IRWXO) # 授权目录权限为777
+        log.info("mkdir: %s" % target_dir)
+
+    src_file = file
+    log.info("move file %s to %s" % (src_file, target_dir))
+    shutil.move(src_file, target_dir + "/" + path.basename(file))
+    os.chmod(path.join(target_dir, path.basename(file)), stat.S_IRWXU+stat.S_IRWXG+stat.S_IRWXO)  # move文件,把文件权限改为777
+
+
+def scanFile():
+    """
+    扫描文件,把需要处理的后缀文件move到另一个目录下
+    :return:
+    """
+    files = os.listdir(DATA_ROOT_PATH)
+    for file in files:
+        point_index = file.find(".")
+        subfix = file[point_index:] if point_index > 0 else ""
+        curr_file = path.abspath(path.join(DATA_ROOT_PATH, file))
+        if path.isfile(curr_file) and FILTER_FILE_SUBFIX.count(subfix) > 0:
+            # 处理的文件后缀名包含待处理的文件
+            moveFile(curr_file)
+        else:
+            log.info("skip file: %s" % curr_file)
+
+if __name__ == "__main__":
+    log.info("current dir: %s" % BASE_DIR)
+    scanFile()

+ 85 - 0
protobuf_file/IntHeaderProtobufFile.py

@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+#encoding:utf-8
+
+import string
+import struct
+from google.protobuf import message as _message
+
+class Reader(object):
+    
+    def __init__(self, protobufClass, file):
+        self.protobufClass = protobufClass
+        self.file = file
+        self.f = open(self.file, "rb")
+    
+
+    closed = property(lambda self: self.f.closed)
+
+    def __enter__(self):
+        pass
+
+    def __iter__(self):
+        return self
+    
+    def next(self):
+        header = self.f.read(4)
+        if len(header) < 4:
+            raise StopIteration()
+        length = struct.unpack('>i', header)
+        bytes = self.f.read(length[0])
+        protoObj = self.protobufClass()
+        protoObj.ParseFromString(bytes)
+        return protoObj
+    
+    def __next__(self):
+        self.next()
+    
+    def isclosed(self):
+        return self.f.closed
+    
+    def close(self):
+        self.f.close()
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if not self.closed:
+            self.close()
+
+class Writer(object):
+    
+    def __init__(self, file):
+        self.file = file
+        self.f = open(self.file, "wb")
+    
+    closed = property(lambda self: self.f.closed)
+
+    def __enter__(self):
+        pass
+
+    def write(self, message):
+        if isinstance(message, str):
+            length = len(message)
+            self.f.write(struct.pack(">i", length))
+            self.f.write(message)
+            return
+        if isinstance(message, _message.Message):
+            bytes = message.SerializeToString()
+            length = len(bytes)
+            self.f.write(struct.pack(">i", length))
+            self.f.write(bytes)
+            return
+        
+    def flush(self):
+        self.f.flush()
+
+    def close(self):
+        self.f.close()
+    
+    def isclosed(self):
+        return self.f.closed
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if not self.closed:
+            self.close()
+
+if __name__ == "__main__":
+    pass