在日常的工作中,python用到的概率還是比較的大的,那么,我們應該怎么去學習關于python的知識呢?平常用到的腳本都是有哪些呢?下面小編給大家詳細介紹下。
前言
日常生活中常會遇到一些小任務,如果人工處理會很麻煩。
用python做些小腳本處理,能夠提高不少效率?;蛘呖梢园裵ython當工具使用,輔助提高一下辦公效率。(比如我常拿python當計算器,計算和字符轉換用)
以下總結下個人用到的一些python小腳本留作備忘。
打印16進制字符串
用途:通信報文中的hex數(shù)據(jù)不好看,可以打印為16進制的字符串顯示出來。
#coding=utf-8 #name:myutil.py def print_hex1(s,prev='0x'): for c in s: print'%s%02x'%(prev,ord(c)), print def print_hex(s): for c in s: print'%02x'%(ord(c)), print print'myutil' def print_hex3(s,prev='0x'): i=0 for c in s: print'%s%s,'%(prev,s[i:i+2]), i+=2 print
文件合并
之前搞單片機時生成的hex應用程序文件不能直接刷到單片機里,還需要把iap程序合并成一個文件才能燒寫到單片機。每次打包麻煩,做個腳本處理:
#path='C:\\Users\\test\\IAP_CZ_v204w.hex' #file=open(path,'r') #for ll in file.readlines() #print ll #coding=gb18030 import time import os def prr(): print'file combination begin..' path0=os.getcwd() print path0 path=path0 #path1=path0 path2=path0 path+='\\IAP_CZ_v204w.hex' #path1+='\\NC_armStaSystem.hex' path2+='\\' print path s=raw_input('enter file path:') path1=s #path1+='\\NC_armStaSystem.hex' print path1 s=raw_input('enter file name:') path2+=s path2+=time.strftime('_%y%m%d%H%M%S') path2+='.hex' print path2 prr() try: f1=open(path,'r') count=0 for l in f1.readlines(): #print l count+=1 #print count f1.close() f1=open(path,'r') f2=open(path1,'r') f3=open(path2,'w') while(count>1): l=f1.readline() #print l f3.write(l) count-=1 #print count f3.flush() for l in f2.readlines(): f3.write(l) f3.flush() f3.close() print'combination success!' except Exception,ex: print'excettion occured!' print ex s=raw_input('press any key to continue...') finally: f1.close() f2.close() s=raw_input('press any key to continue...')
多線程下載圖集
網(wǎng)上好看的動漫圖集,如果手工下載太費時了。簡單分析下網(wǎng)頁地址規(guī)律,寫個多線程腳本搞定。
#!/usr/bin/python #-*-coding:utf-8-*- #filename:paxel.py '''It is a multi-thread downloading tool It was developed follow axel. Author:volans E-mail:volansw[at]gmail.com ''' import sys import os import time import urllib from threading import Thread local_proxies={'http':'http://131.139.58.200:8080'} class AxelPython(Thread,urllib.FancyURLopener): '''Multi-thread downloading class. run()is a vitural method of Thread. ''' def __init__(self,threadname,url,filename,ranges=0,proxies={}): Thread.__init__(self,name=threadname) urllib.FancyURLopener.__init__(self,proxies) self.name=threadname self.url=url self.filename=filename self.ranges=ranges self.downloaded=0 def run(self): '''vertual function in Thread''' try: self.downloaded=os.path.getsize(self.filename) except OSError: #print'never downloaded' self.downloaded=0 #rebuild start poind self.startpoint=self.ranges[0]+self.downloaded #This part is completed if self.startpoint>=self.ranges[1]: print'Part%s has been downloaded over.'%self.filename return self.oneTimeSize=16384#16kByte/time print'task%s will download from%d to%d'%(self.name,self.startpoint,self.ranges[1]) self.addheader("Range","bytes=%d-%d"%(self.startpoint,self.ranges[1])) self.urlhandle=self.open(self.url) data=self.urlhandle.read(self.oneTimeSize) while data: filehandle=open(self.filename,'ab+') filehandle.write(data) filehandle.close() self.downloaded+=len(data) #print"%s"%(self.name) #progress=u'\r...' data=self.urlhandle.read(self.oneTimeSize) def GetUrlFileSize(url,proxies={}): urlHandler=urllib.urlopen(url,proxies=proxies) headers=urlHandler.info().headers length=0 for header in headers: if header.find('Length')!=-1: length=header.split(':')[-1].strip() length=int(length) return length def SpliteBlocks(totalsize,blocknumber): blocksize=totalsize/blocknumber ranges=[] for i in range(0,blocknumber-1): ranges.append((i*blocksize,i*blocksize+blocksize-1)) ranges.append((blocksize*(blocknumber-1),totalsize-1)) return ranges def islive(tasks): for task in tasks: if task.isAlive(): return True return False def paxel(url,output,blocks=6,proxies=local_proxies): '''paxel ''' size=GetUrlFileSize(url,proxies) ranges=SpliteBlocks(size,blocks) threadname=["thread_%d"%i for i in range(0,blocks)] filename=["tmpfile_%d"%i for i in range(0,blocks)] tasks=[] for i in range(0,blocks): task=AxelPython(threadname<i>,url,filename<i>,ranges<i>) task.setDaemon(True) task.start() tasks.append(task) time.sleep(2) while islive(tasks): downloaded=sum([task.downloaded for task in tasks]) process=downloaded/float(size)*100 show=u'\rFilesize:%d Downloaded:%d Completed:%.2f%%'%(size,downloaded,process) sys.stdout.write(show) sys.stdout.flush() time.sleep(0.5) filehandle=open(output,'wb+') for i in filename: f=open(i,'rb') filehandle.write(f.read()) f.close() try: os.remove(i) pass except: pass filehandle.close() if __name__=='__main__': url="http://xz1.mm667.com/xz84/images/001.jpg" output='001.jpg' paxel(url,output,blocks=4,proxies={})
多線程下載圖片
多線程下載圖片并存儲到指定目錄中,若目錄不存在則自動創(chuàng)建。
#-*-coding:UTF-8-*- ''' import re import urllib urls='http://xz5.mm667.com/xz82/images/01.jpg' def getHtml(url): page=urllib.urlopen(url) html=page.read() return html def getImg(html): reg=r'src="(.+?\.jpg)"pic_ext' imgre=re.compile(reg) imglist=imgre.findall(html) x=0 for imgurl in imglist: urllib.urlretrieve(imgurl,'%s.jpg'%x) x=x+1 html=getHtml("http://tieba.baidu.com/p/2460150866") getImg(html) ''' import re import urllib import threading import time import socket socket.setdefaulttimeout(30) urls=[] j=0 for i in xrange(1,81): if(i-1)%4==0: j+=1 if((j-1)%5)==0: j=1 site='http://xz%d.mm667.com/xz%02d/images/'%(j,i) urls.append(site) print urls[i-1] #print urls ''' urls.append('http://xz1.mm667.com/xz01/images/') urls.append('http://xz1.mm667.com/xz02/images/') urls.append('http://xz1.mm667.com/xz03/images/') urls.append('http://xz1.mm667.com/xz04/images/') urls.append('http://xz1.mm667.com/xz84/images/') urls.append('http://xz2.mm667.com/xz85/images/') urls.append('http://xz3.mm667.com/xz86/images/') urls.append('http://xz1.mm667.com/s/') urls.append('http://xz1.mm667.com/p/') ''' def mkdir(path): #引入模塊 import os #去除首位空格 path=path.strip() #去除尾部\符號 path=path.rstrip("\\") #判斷路徑是否存在 #存在True #不存在False isExists=os.path.exists(path) #判斷結果 if not isExists: #如果不存在則創(chuàng)建目錄 print path+u'創(chuàng)建成功' #創(chuàng)建目錄操作函數(shù) os.makedirs(path) return True else: #如果目錄存在則不創(chuàng)建,并提示目錄已存在 print path+u'目錄已存在' return False def cbk(a,b,c): '''''回調函數(shù) a:已經(jīng)下載的數(shù)據(jù)塊 b:數(shù)據(jù)塊的大小 c:遠程文件的大小 ''' per=100.0*a*b/c if per>100: per=100 print'%.2f%%'%per #url='http://www.sina.com.cn' local='d:\\mysite\\pic1\\' d=0 mutex=threading.Lock() #mutex1=threading.Lock() class MyThread(threading.Thread): def __init__(self,url,name): threading.Thread.__init__(self) self.url=url self.name=name def run(self): mutex.acquire() print print'down from%s'%self.url time.sleep(1) mutex.release() try: urllib.urlretrieve(self.url,self.name) except Exception,e: print e time.sleep(1) urllib.urlretrieve(self.url,self.name) threads=[] for u in urls[84:]: d+=1 local='d:\\mysite\\pic1\\%d\\'%d mkdir(local) print'download begin...' for i in xrange(40): lcal=local url=u url+='%03d.jpg'%i lcal+='%03d.jpg'%i th=MyThread(url,lcal) threads.append(th) th.start() #for t in threads: #t.join() print'over!download finished' 爬蟲抓取信息 #!/usr/bin/env python #-*-coding:utf-8-*- """ Python爬蟲,抓取一卡通相關企業(yè)信息 Anthor:yangyongzhen Version:0.0.2 Date:2014-12-14 Language:Python2.7.5 Editor:Sublime Text2 """ import urllib2,re,string import threading,Queue,time import sys import os from bs4 import BeautifulSoup #from pprint import pprint reload(sys) sys.setdefaultencoding('utf8') _DATA=[] FILE_LOCK=threading.Lock() SHARE_Q=Queue.Queue()#構造一個不限制大小的的隊列 _WORKER_THREAD_NUM=3#設置線程的個數(shù) _Num=0#總條數(shù) class MyThread(threading.Thread): def __init__(self,func,num): super(MyThread,self).__init__()#調用父類的構造函數(shù) self.func=func#傳入線程函數(shù)邏輯 self.thread_num=num def run(self): self.func() #print u'線程ID:',self.thread_num def worker(): global SHARE_Q while not SHARE_Q.empty(): url=SHARE_Q.get()#獲得任務 my_page=get_page(url) find_data(my_page)#獲得當前頁面的數(shù)據(jù) #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url): """ 根據(jù)所給的url爬取網(wǎng)頁HTML Args: url:表示當前要爬取頁面的url Returns: 返回抓取到整個頁面的HTML(unicode編碼) Raises: URLError:url引發(fā)的異常 """ try: html=urllib2.urlopen(url).read() my_page=html.decode("gbk",'ignore') #my_page=unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page=urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError,e: if hasattr(e,"code"): print"The server couldn't fulfill the request." print"Error code:%s"%e.code elif hasattr(e,"reason"): print"We failed to reach a server.Please check your url and read the Reason" print"Reason:%s"%e.reason return my_page def find_data(my_page): """ 通過返回的整個網(wǎng)頁HTML,正則匹配名稱 Args: my_page:傳入頁面的HTML文本用于正則匹配 """ global _Num temp_data=[] items=BeautifulSoup(my_page).find_all("div",style="width:96%;margin:10px;border-bottom:1px#CCC dashed;padding-bottom:10px;") for index,item in enumerate(items): #print item #print item.h1 #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href=item.find(re.compile("^a")) #soup=BeautifulSoup(item) #公司名稱 if item.a: data=item.a.string.encode("gbk","ignore") print data temp_data.append(data) goods=item.find_all("div",style="font-size:12px;") #經(jīng)營產品與聯(lián)系方式 for i in goods: data=i.get_text().encode("gbk","ignore") temp_data.append(data) print data #b=item.find_all("b") #print b #鏈接地址 pat=re.compile(r'href="([^"]*)"') h=pat.search(str(item)) if h: #print h.group(0) href=h.group(1) print href temp_data.append(h.group(1)) _Num+=1 #b=item.find_all(text=re.compile("Dormouse")) #pprint(goods) #print href #pat=re.compile(r'title="([^"]*)"') #h=pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) #headers={'User-Agent':"Mozilla/5.0(Windows NT 6.1;WOW64)AppleWebKit/537.1(KHTML,like Gecko)Chrome/22.0.1207.1 Safari/537.1"}##瀏覽器請求頭(大部分網(wǎng)站沒有這個請求頭會報錯、請務必加上哦) #all_url='http://www.mzitu.com/all'##開始的URL地址 #start_html=requests.get(all_url,headers=headers)##使用requests中的get方法來獲取all_url(就是:http://www.mzitu.com/all這個地址)的內容headers為上面設置的請求頭、請務必參考requests官方文檔解釋 #print(start_html.text)##打印出start_html(請注意,concent是二進制的數(shù)據(jù),一般用于下載圖片、視頻、音頻、等多媒體內容是才使用concent,對于打印網(wǎng)頁內容請使用text) def main(): global SHARE_Q threads=[] start=time.clock() douban_url="http://company.yktworld.com/comapny_search.asp?page={page}" #向隊列中放入任務,真正使用時,應該設置為可持續(xù)的放入任務 for index in xrange(20): SHARE_Q.put(douban_url.format(page=index*1)) for i in xrange(_WORKER_THREAD_NUM): thread=MyThread(worker,i) thread.start()#線程開始處理任務 threads.append(thread) for thread in threads: thread.join() SHARE_Q.join() i=0 with open("down.txt","w+")as my_file: for page in _DATA: i+=1 for name in page: my_file.write(name+"\n") print"Spider Successful!!!" end=time.clock() print u'抓取完成!' print u'總頁數(shù):',i print u'總條數(shù):',_Num print u'一共用時:',end-start,u'秒' if __name__=='__main__': main() 爬蟲多線程下載電影名稱 #!/usr/bin/env python #-*-coding:utf-8-*- """ Python爬蟲 Anthor:yangyongzhen Version:0.0.2 Date:2014-12-14 Language:Python2.7.8 Editor:Sublime Text2 """ import urllib2,re,string import threading,Queue,time import sys import os from bs4 import BeautifulSoup reload(sys) sys.setdefaultencoding('utf8') _DATA=[] FILE_LOCK=threading.Lock() SHARE_Q=Queue.Queue()#構造一個不限制大小的的隊列 _WORKER_THREAD_NUM=3#設置線程的個數(shù) rootpath=os.getcwd()+u'/抓取的內容/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #創(chuàng)建抓取的根目錄 #makedir(rootpath) #顯示下載進度 def Schedule(a,b,c): ''''' a:已經(jīng)下載的數(shù)據(jù)塊 b:數(shù)據(jù)塊的大小 c:遠程文件的大小 ''' per=100.0*a*b/c if per>100: per=100 print'%.2f%%'%per class MyThread(threading.Thread): def __init__(self,func): super(MyThread,self).__init__()#調用父類的構造函數(shù) self.func=func#傳入線程函數(shù)邏輯 def run(self): self.func() def worker(): print'work thread start...\n' global SHARE_Q while not SHARE_Q.empty(): url=SHARE_Q.get()#獲得任務 my_page=get_page(url) find_title(my_page)#獲得當前頁面的電影名 #write_into_file(temp_data) time.sleep(1) SHARE_Q.task_done() def get_page(url): """ 根據(jù)所給的url爬取網(wǎng)頁HTML Args: url:表示當前要爬取頁面的url Returns: 返回抓取到整個頁面的HTML(unicode編碼) Raises: URLError:url引發(fā)的異常 """ try: html=urllib2.urlopen(url).read() my_page=html.decode("utf8") #my_page=unicode(html,'utf-8','ignore').encode('utf-8','ignore') #my_page=urllib2.urlopen(url).read().decode("utf8") except urllib2.URLError,e: if hasattr(e,"code"): print"The server couldn't fulfill the request." print"Error code:%s"%e.code elif hasattr(e,"reason"): print"We failed to reach a server.Please check your url and read the Reason" print"Reason:%s"%e.reason return my_page def find_title(my_page): """ 通過返回的整個網(wǎng)頁HTML,正則匹配前100的電影名稱 Args: my_page:傳入頁面的HTML文本用于正則匹配 """ temp_data=[] movie_items=BeautifulSoup(my_page).findAll('h1') for index,item in enumerate(movie_items): #print item #print item.h1 pat=re.compile(r'href="([^"]*)"') h=pat.search(str(item)) if h: #print h.group(0) href=h.group(1) print href temp_data.append(h.group(1)) #print h.group() #temp_data.append(item) #print item.find(re.compile("^a")) href=item.find(re.compile("^a")) #soup=BeautifulSoup(item) if item.a: #print item.a.string temp_data.append(item.a.string) #print href #pat=re.compile(r'title="([^"]*)"') #h=pat.search(str(href)) #if h: #print h.group(1) #temp_data.append(h.group(1)) _DATA.append(temp_data) def main(): global SHARE_Q threads=[] start=time.clock() douban_url="http://movie.misszm.com/page/{page}" #向隊列中放入任務,真正使用時,應該設置為可持續(xù)的放入任務 for index in xrange(5): SHARE_Q.put(douban_url.format(page=index*1)) for i in xrange(_WORKER_THREAD_NUM): thread=MyThread(worker) thread.start()#線程開始處理任務 threads.append(thread) for thread in threads: thread.join() SHARE_Q.join() with open("movie.txt","w+")as my_file: for page in _DATA: for movie_name in page: my_file.write(movie_name+"\n") print"Spider Successful!!!" end=time.clock() print u'抓取完成!' print u'一共用時:',end-start,u'秒' if __name__=='__main__': main() 串口轉tcp工具 #coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer-Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import* from struct import*; #from myutil import*; #name:myutil.py mylock=threading.RLock() Server_IP='' Srever_Port='' def print_hex1(s,prev='0x'): for c in s: print'%s%02x'%(prev,ord(c)), print def print_hex(s): for c in s: print'%02x'%(ord(c)), print def hexto_str(s): r='' for c in s: r+='%02x'%(ord(c)) return r def strto_hex(s): r=s.decode('hex') return r #''代表服務器為localhost #在一個非保留端口號上進行監(jiān)聽 class ComThread: def __init__(self,Port=0): self.l_serial=None; self.alive=False; self.waitEnd=None; self.port=Port; #TCP部分 #self.sockobj=socket.socket(socket.AF_INET,socket.SOCK_STREAM) self.connection=None #數(shù)據(jù) self.snddata='' self.rcvdata='' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive=False; self.stop(); def start(self): self.l_serial=serial.Serial(); self.l_serial.port=self.port; self.l_serial.baudrate=115200; self.l_serial.timeout=2;#秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd=threading.Event(); self.alive=True; print'open serial port%d ok!\n'%(self.port+1) print'baudrate:115200\n' self.thread_read=None; self.thread_read=threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write=None; self.thread_write=threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpClient=None; self.thread_TcpClient=threading.Thread(target=self.TcpClient); self.thread_TcpClient.setDaemon(1); self.thread_TcpClient.start(); self.thread_TcpSend=None; self.thread_TcpSend=threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: #接收間隔 time.sleep(0.1); try: data=''; n=self.l_serial.inWaiting(); if n: data=data+self.l_serial.read(n); #for l in xrange(len(data)): #print'%02X'%ord(data[l]), #發(fā)送數(shù)據(jù) print u'->請求:' print data; mylock.acquire() self.snddata=data mylock.release() #print_hex(data); #判斷結束 except Exception,ex: print str(ex); self.waitEnd.set(); self.alive=False; def FirstWriter(self): while self.alive: #接收間隔 time.sleep(0.1); try: #snddata=raw_input('\nenter data send:\n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print u'-<應答:' print self.rcvdata; mylock.acquire() self.rcvdata=''; mylock.release() #print_hex(snddata); except Exception,ex: print str(ex); self.waitEnd.set(); self.alive=False; def TcpClient(self): while True: #接收間隔 time.sleep(0.1); self.connection=socket(AF_INET,SOCK_STREAM); self.connection.connect((Server_IP,int(Server_Port))); print'Connect to Server OK!'; self.snddata='' self.rcvdata='' while True: #讀取客戶端套接字的下一行 data=self.connection.recv(1024) #如果沒有數(shù)量的話,那么跳出循環(huán) if not data:break #發(fā)送一個回復至客戶端 mylock.acquire() self.snddata='' self.rcvdata=data mylock.release() #connection.send('Echo=>'+data) self.connection.close() self.waitEnd.set(); self.alive=False; def TcpSend(self): while True: #接收間隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata!='': self.connection.send(self.snddata) mylock.acquire() self.rcvdata='' self.snddata='' mylock.release() except Exception,ex: pass def stop(self): self.alive=False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #測試用部分 if __name__=='__main__': print'Serial to Tcp Tool 1.00\n' print'Author:yangyongzhen\n' print'QQ:534117529\n' print'Copyright(c)**cap 2015-2016.\n' Server_IP=raw_input('please enter ServerIP:') print'Server_IP:%s'%(Server_IP) Server_Port=raw_input('please enter ServerPort:') print'Server_Port:%s'%(Server_Port) com=raw_input('please enter com port(1-9):') rt=ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print''; print'End OK.'; del rt; 遠程讀卡器server端 很早之前做過一個遠程讀卡器工具,原理就是在現(xiàn)場客服電腦上裝個python做的tcpserver服務端,操控現(xiàn)場的讀卡器。在公司內部做個客戶端連接過去,這樣實現(xiàn)在公司調試現(xiàn)場的卡片業(yè)務。 這個就是服務端工具的實現(xiàn): #coding=utf-8 #author:yangyongzhen #QQ:534117529 #'CardTest TcpServer-Simple Test Card Tool 1.00' import sys,threading,time; import serial; import binascii,encodings; import re; import os; from socket import* from struct import*; #from myutil import*; #name:myutil.py mylock=threading.RLock() def print_hex1(s,prev='0x'): for c in s: print'%s%02x'%(prev,ord(c)), print def print_hex(s): for c in s: print'%02x'%(ord(c)), print def hexto_str(s): r='' for c in s: r+='%02x'%(ord(c)) return r def strto_hex(s): r=s.decode('hex') return r #''代表服務器為localhost #在一個非保留端口號上進行監(jiān)聽 class ComThread: def __init__(self,Port=0): self.l_serial=None; self.alive=False; self.waitEnd=None; self.port=Port; #TCP部分 self.myHost='' self.myPort=5050 self.sockobj=socket(AF_INET,SOCK_STREAM) self.connection=None #數(shù)據(jù) self.snddata='' self.rcvdata='' def waiting(self): if not self.waitEnd is None: self.waitEnd.wait(); def SetStopEvent(self): if not self.waitEnd is None: self.waitEnd.set(); self.alive=False; self.stop(); def start(self): self.l_serial=serial.Serial(); self.l_serial.port=self.port; self.l_serial.baudrate=115200; self.l_serial.timeout=2;#秒 self.l_serial.open(); if self.l_serial.isOpen(): self.waitEnd=threading.Event(); self.alive=True; print'open serial port%d ok!\n'%(self.port+1) print'baudrate:115200\n' self.thread_read=None; self.thread_read=threading.Thread(target=self.FirstReader); self.thread_read.setDaemon(1); self.thread_read.start(); self.thread_write=None; self.thread_write=threading.Thread(target=self.FirstWriter); self.thread_write.setDaemon(1); self.thread_write.start(); #TCP部分 self.thread_TcpServer=None; self.thread_TcpServer=threading.Thread(target=self.TcpServer); self.thread_TcpServer.setDaemon(1); self.thread_TcpServer.start(); self.thread_TcpSend=None; self.thread_TcpSend=threading.Thread(target=self.TcpSend); self.thread_TcpSend.setDaemon(1); self.thread_TcpSend.start(); return True; else: return False; def FirstReader(self): while self.alive: #接收間隔 time.sleep(0.1); try: data=''; n=self.l_serial.inWaiting(); if n: data=data+self.l_serial.read(n); #for l in xrange(len(data)): #print'%02X'%ord(data[l]), #發(fā)送數(shù)據(jù) print'serial recv:' print data; mylock.acquire() self.snddata=data mylock.release() #print_hex(data); #判斷結束 except Exception,ex: print str(ex); self.waitEnd.set(); self.alive=False; def FirstWriter(self): while self.alive: #接收間隔 time.sleep(0.1); try: #snddata=raw_input('\nenter data send:\n') if self.rcvdata!='': self.l_serial.write(self.rcvdata); print'serial send:' print self.rcvdata; mylock.acquire() self.rcvdata=''; mylock.release() #print_hex(snddata); except Exception,ex: print str(ex); self.waitEnd.set(); self.alive=False; def TcpServer(self): self.sockobj.bind((self.myHost,self.myPort)) self.sockobj.listen(10) print'TcpServer listen at 5050 oK!\n' print'Waiting for connect...\n' while True: #接收間隔 time.sleep(0.1); self.connection,address=self.sockobj.accept() print'Server connected by',address self.snddata='' self.rcvdata='' try: while True: #讀取客戶端套接字的下一行 data=self.connection.recv(1024) #如果沒有數(shù)量的話,那么跳出循環(huán) if not data:break #發(fā)送一個回復至客戶端 mylock.acquire() self.snddata='' self.rcvdata=data mylock.release() #connection.send('Echo=>'+data) self.connection.close() except Exception,ex: self.connection.close() self.waitEnd.set(); self.alive=False; def TcpSend(self): while True: #接收間隔 time.sleep(0.1); while True: time.sleep(0.1); try: if not self.connection is None: if self.snddata!='': self.connection.send(self.snddata) mylock.acquire() self.rcvdata='' self.snddata='' mylock.release() except Exception,ex: pass def stop(self): self.alive=False; self.thread_read.join(); if self.l_serial.isOpen(): self.l_serial.close(); #測試用部分 if __name__=='__main__': print'CardTest TcpServer-Simple Test Card Tool 1.00\n' print'Author:yangyongzhen\n' print'QQ:534117529\n' print'Copyright(c)****2015-2016.\n' com=raw_input('please enter com port(1-9):') rt=ComThread(int(com)-1); try: if rt.start(): rt.waiting(); rt.stop(); else: pass; except Exception,se: print str(se); if rt.alive: rt.stop(); os.system("pause") print''; print'End OK.'; del rt; 黑客rtcp反向鏈接 #-*-coding:utf-8-*- ''' filename:rtcp.py desc: 利用python的socket端口轉發(fā),用于遠程維護 如果連接不到遠程,會sleep 36s,最多嘗試200(即兩小時) usage: ./rtcp.py stream1 stream2 stream為:l:port或c:host:port l:port表示監(jiān)聽指定的本地端口 c:host:port表示監(jiān)聽遠程指定的端口 author:watercloud,zd,knownsec team web:www.knownsec.com,blog.knownsec.com date:2009-7 ''' import socket import sys import threading import time streams=[None,None]#存放需要進行數(shù)據(jù)轉發(fā)的兩個數(shù)據(jù)流(都是SocketObj對象) debug=1#調試狀態(tài)0 or 1 def print_hex(s): for c in s: print'%02x'%(ord(c)), print def _usage(): print'Usage:./rtcp.py stream1 stream2\nstream:L:port or C:host:port' def _get_another_stream(num): ''' 從streams獲取另外一個流對象,如果當前為空,則等待 ''' if num==0: num=1 elif num==1: num=0 else: raise"ERROR" while True: if streams[num]=='quit': print("can't connect to the target,quit now!") sys.exit(1) if streams[num]!=None: return streams[num] else: time.sleep(1) def _xstream(num,s1,s2): ''' 交換兩個流的數(shù)據(jù) num為當前流編號,主要用于調試目的,區(qū)分兩個回路狀態(tài)用。 ''' try: while True: #注意,recv函數(shù)會阻塞,直到對端完全關閉(close后還需要一定時間才能關閉,最快關閉方法是shutdow) buff=s1.recv(1024) if debug>0: print num,"recv" if len(buff)==0:#對端關閉連接,讀不到數(shù)據(jù) print num,"one closed" break s2.sendall(buff) if debug>0: print num,"sendall" print_hex(buff) except: print num,"one connect closed." try: s1.shutdown(socket.SHUT_RDWR) s1.close() except: pass try: s2.shutdown(socket.SHUT_RDWR) s2.close() except: pass streams[0]=None streams[1]=None print num,"CLOSED" def _server(port,num): ''' 處理服務情況,num為流編號(第0號還是第1號) ''' srv=socket.socket(socket.AF_INET,socket.SOCK_STREAM) srv.bind(('0.0.0.0',port)) srv.listen(1) #print'local listening at port%d'(%(port)) while True: conn,addr=srv.accept() print"connected from:",addr streams[num]=conn#放入本端流對象 s2=_get_another_stream(num)#獲取另一端流對象 _xstream(num,conn,s2) def _connect(host,port,num): '''處理連接,num為流編號(第0號還是第1號) note:如果連接不到遠程,會sleep 36s,最多嘗試200(即兩小時) ''' not_connet_time=0 wait_time=36 try_cnt=199 while True: if not_connet_time>try_cnt: streams[num]='quit' print('not connected') return None conn=socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: conn.connect((host,port)) except Exception,e: print('can not connect%s:%s!'%(host,port)) not_connet_time+=1 time.sleep(wait_time) continue print"connected to%s:%i"%(host,port) streams[num]=conn#放入本端流對象 s2=_get_another_stream(num)#獲取另一端流對象 _xstream(num,conn,s2) if __name__=='__main__': print'Tcp to Tcp Tool 1.00\n' print'Author:yangyongzhen\n' print'QQ:534117529\n' print'Copyright(c)Newcapec 2015-2016.\n' Server_IP=raw_input('please enter Server IP:') print'Server_IP:%s'%(Server_IP) Server_Port=raw_input('please enter Server Port:') print'Server_Port:%s'%(Server_Port) com=raw_input('please enter Local Port:') tlist=[]#線程列表,最終存放兩個線程對象 #targv=[sys.argv[1],sys.argv[2]] t=threading.Thread(target=_server,args=(int(com),0)) tlist.append(t) t=threading.Thread(target=_connect,args=(Server_IP,int(Server_Port),1)) tlist.append(t) for t in tlist: t.start() for t in tlist: t.join() sys.exit(0) 調用c的動態(tài)庫示例 #-*-coding:utf8-*- from ctypes import* from binascii import unhexlify as unhex import os dll=cdll.LoadLibrary('mydll.dll'); print'begin load mydll..' #key #str1='\x9B\xED\x98\x89\x15\x80\xC3\xB2' str1=unhex('0000556677222238') #data str2=unhex('002d2000000100015566772222383CD881604D0D286A556677222238000020141214181427') #output str3='\x12\x34\x56\x78\x12\x34\x56\x78' pstr1=c_char_p() pstr2=c_char_p() pstr3=c_char_p() pstr1.value=str1 pstr2.value=str2 pstr3.value=str3 dll.CurCalc_DES_MAC64(805306481,pstr1,0,pstr2,13,pstr3) print pstr1 print pstr2 print pstr3 stro=pstr3.value print stro strtemp='' for c in stro: print"%02x"%(ord(c)) strtemp+="{0:02x}".format(ord(c)) print strtemp os.execlp("E:\\RSA.exe",'') s=raw_input('press any key to continue...') tcp的socket連接報文測試工具 #-*-coding:utf-8-*- import socket from myutil import* from binascii import unhexlify as unhex from ctypes import* dll=cdll.LoadLibrary('mydll.dll') print'begin load mydll..' HOST,PORT="192.168.51.28",5800 sd="1234567812345678" #Create a socket(SOCK_STREAM means a TCP socket) sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: #Connect to server and send data sock.connect((HOST,int(PORT)) print"Sent1 OK:" print sd #Receive data from the server and shut down received=sock.recv(1024) print"Received:" print_hex(received) print'received len is 0x%02x'%(len(received)) print'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print"Sent2 OK:" print sd2 #Receive data from the server and shut down received1=sock.recv(1024) print"Received1:" print_hex(received1) print'received1 len is 0x%02x'%(len(received1)) finally: sock.close() s=raw_input('press any key to continue...') 報文拼接與加解密測試 #-*-coding:gb2312-*- import socket from myutil import* from binascii import unhexlify as unhex from ctypes import* dll=cdll.LoadLibrary('mydll.dll') print'begin load mydll..' #key key='\xF1\xE2\xD3\xC4\xF1\xE2\xD3\xC4' #output MAC mac='\x00'*8 data='\x00'*8 pkey=c_char_p() pdata=c_char_p() pmac=c_char_p() pkey.value=key pdata.value=data pmac.value=mac #pack1 class pack: pass pk=pack() pk.len='00000032' pk.ID='0001' pk.slnum='00000004' pk.poscode='123456781234' pk.rand='1122334455667788' pk.psam='313233343536' pk.kind='0000' pk.ver='000001' pk.time='20140805135601' pk.mac='06cc571e6d96e12d' data=unhex(pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time) #print_hex(data) pdata.value=data #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,42,pmac) stro=pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk.mac=strtemp #data to send sd=pk.len+pk.ID+pk.slnum+pk.poscode+pk.rand+pk.psam+pk.kind+pk.ver+pk.time+pk.mac print'send1 len is 0x%02x'%(len(sd)/2) print sd #pack2 class pack2: pass pk2=pack2() pk2.len='0000006E' pk2.ID='0012' pk2.slnum='00000005' pk2.fatCode='00' pk2.cardASN='0000000000000000' pk2.cardType='00' pk2.userNO='0000000000000000' pk2.fileName1='00000000000000000000000000000015' pk2.dataLen1='00' pk2.dataArea1='00000000000000319999990800FB2014080620240806FFFFFFFFFFFFFFFFFFFF' pk2.fileName2='00000000000000000000000000000016' pk2.dataLen2='00' pk2.dataArea2='000003E800FFFF16' pk2.mac='06cc571e6d96e12d' data2=unhex(pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2) pdata.value=data2 #cacl MAC dll.CurCalc_DES_MAC64(805306481,pkey,0,pdata,102,pmac) stro=pmac.value strtemp='' for c in stro: strtemp+="{0:02x}".format(ord(c)) #print strtemp pk2.mac=strtemp #data to send sd2=pk2.len+pk2.ID+pk2.slnum+pk2.fatCode+pk2.cardASN+pk2.cardType+pk2.userNO+pk2.fileName1+pk2.dataLen1+pk2.dataArea1+pk2.fileName2+pk2.dataLen2+pk2.dataArea2+pk2.mac print'send2 len is 0x%02x'%(len(sd2)/2) print sd2 #PORT="192.168.60.37" #PORT="localhost" HOST,PORT="192.168.51.28",5800 #Create a socket(SOCK_STREAM means a TCP socket) sock=socket.socket(socket.AF_INET,socket.SOCK_STREAM) try: #Connect to server and send data sock.connect((HOST,int(PORT)) #data="123456789" #s=struct.pack('bbb',1,2,3) sock.send(sd.decode('hex')) print"Sent1 OK:" print sd #Receive data from the server and shut down received=sock.recv(1024) print"Received:" print_hex(received) print'received len is 0x%02x'%(len(received)) print'received data analysis...' re1=received[0:4] print_hex(re1) re1=received[4:6] print_hex(re1) re1=received[6:10] print_hex(re1) re1=received[10:16] print_hex(re1) #pack2 send sock.send(sd2.decode('hex')) print"Sent2 OK:" print sd2 #Receive data from the server and shut down received1=sock.recv(1024) print"Received1:" print_hex(received1) print'received1 len is 0x%02x'%(len(received1)) finally: sock.close() s=raw_input('press any key to continue...') 二進制文件解析工具 #-*-coding:utf-8-*- from myutil import* from binascii import unhexlify as unhex import os path=os.getcwd() path+='\\rec04.bin' #print path print"begin ans......" f1=open(path,'rb') for i in range(1,35): s=f1.read(280) print"data:",i print_hex(s) print'read data is:' print_hex(s) recstatadd=187 print"終端編號:" print_hex(s[recstatadd:recstatadd+10]) print"卡號長度:" print_hex(s[10]) print"卡號:" print_hex(s[11:11+10]) print"持卡序號1+所屬地城市代碼2+交易地城市代碼2" print_hex(s[recstatadd+22:recstatadd+22+5]) print"應用交易計數(shù)器" print_hex(s[92:92+2]) print"交易前余額4,交易金額3" print_hex(s[recstatadd+29:recstatadd+29+7]) print"交易日期:" print_hex(s[99:99+3]) print"交易時間:" print_hex(s[44:44+3]) print"終端編號" print_hex(s[21:21+8]) print"商戶編號" print_hex(s[21+8:21+8+15]) print"批次號" print_hex(s[5:5+3]) print"應用密文" print_hex(s[47:47+8]) print"授權金額" print_hex(s[103:103+6]) print"其他金額" print_hex(s[115:115+6]) print"終端驗證結果" print_hex(s[94:5+94]) print"應用交易計數(shù)器" print_hex(s[92:92+4]) print"卡片驗證結果" print_hex(s[56:56+32]) print"卡片序列號:" print_hex(s[131]) f1.close() 抓取動漫圖片 #-*-coding:utf8-*- #2013.12.36 19:41 #抓取dbmei.com的圖片。 from bs4 import BeautifulSoup import os,sys,urllib2,time,random #創(chuàng)建文件夾 path=os.getcwd()#獲取此腳本所在目錄 new_path=os.path.join(path,u'暴走漫畫') if not os.path.isdir(new_path): os.mkdir(new_path) def page_loop(page=1): url='http://baozoumanhua.com/all/hot/page/%s?sv=1389537379'%page content=urllib2.urlopen(url) soup=BeautifulSoup(content) my_girl=soup.find_all('div',class_='img-wrap') for girl in my_girl: jokes=girl.find('img') link=jokes.get('src') flink=link print flink content2=urllib2.urlopen(flink).read() #with open(u'暴走漫畫'+'/'+time.strftime('%H-%M-%S')+random.choice('qwertyuiopasdfghjklzxcvbnm')+flink[-5:],'wb')as code:#在OSC上現(xiàn)學的 with open(u'暴走漫畫'+'/'+flink[-11:],'wb')as code: code.write(content2) page=int(page)+1 print u'開始抓取下一頁' print'the%s page'%page page_loop(page) page_loop() 抓取網(wǎng)站模板 #!/usr/bin/env python #-*-coding:utf-8-*- #by yangyongzhen #2016-12-06 from bs4 import BeautifulSoup import urllib,urllib2,os,time import re rootpath=os.getcwd()+u'/抓取的模板/' def makedir(path): if not os.path.isdir(path): os.makedirs(path) #創(chuàng)建抓取的根目錄 makedir(rootpath) #顯示下載進度 def Schedule(a,b,c): ''''' a:已經(jīng)下載的數(shù)據(jù)塊 b:數(shù)據(jù)塊的大小 c:遠程文件的大小 ''' per=100.0*a*b/c if per>100: per=100 print'%.2f%%'%per def grabHref(url,listhref,localfile): html=urllib2.urlopen(url).read() html=unicode(html,'gb2312','ignore').encode('utf-8','ignore') content=BeautifulSoup(html).findAll('link') myfile=open(localfile,'w') pat=re.compile(r'href="([^"]*)"') pat2=re.compile(r'http') for item in content: h=pat.search(str(item)) href=h.group(1) if pat2.search(href): ans=href else: ans=url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans content=BeautifulSoup(html).findAll('script') pat=re.compile(r'src="([^"]*)"') pat2=re.compile(r'http') for item in content: h=pat.search(str(item)) if h: href=h.group(1) if pat2.search(href): ans=href else: ans=url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans content=BeautifulSoup(html).findAll('a') pat=re.compile(r'href="([^"]*)"') pat2=re.compile(r'http') for item in content: h=pat.search(str(item)) if h: href=h.group(1) if pat2.search(href): ans=href else: ans=url+href listhref.append(ans) myfile.write(ans) myfile.write('\r\n') print ans myfile.close() def main(): url="http://192.168.72.140/qdkj/"#采集網(wǎng)頁的地址 listhref=[]#鏈接地址 localfile='ahref.txt'#保存鏈接地址為本地文件,文件名 grabHref(url,listhref,localfile) listhref=list(set(listhref))#去除鏈接中的重復地址 curpath=rootpath start=time.clock() for item in listhref: curpath=rootpath name=item.split('/')[-1] fdir=item.split('/')[3:-1] for i in fdir: curpath+=i curpath+='/' print curpath makedir(curpath) local=curpath+name urllib.urlretrieve(item,local,Schedule)#遠程保存函數(shù) end=time.clock() print u'模板抓取完成!' print u'一共用時:',end-start,u'秒' if __name__=="__main__": main()
到此為止,關于這篇文章的內容,小編就給大家介紹到這里了,希望可以給大家?guī)砀鄮椭?/p>
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://systransis.cn/yun/128240.html
摘要:做這一領域的工作,有很多網(wǎng)站能夠起到輔助性的作用。再加上爬蟲相對于其他熱門方向來說,更容易學。也促使更多人會優(yōu)先選擇學習爬蟲。能夠代替手工完成手工無法完成的測試任務,并且可以記錄相關數(shù)據(jù)及報告。 ...
摘要:簡單介紹自帶庫,使用調試程序還是很方便的。比如下圖就是展示斷點進入到內部之后,打印的參數(shù),打印某個變量退出調試,直接退出調試或者使用的方式退出最后說一句上面展示的使用調試的過程其實是很簡單的,文章中主要通過截圖展示運行的效果。 簡單介紹 Python自帶 Pdb庫,使用 Pdb調試 Python程序還是很方便的。但是遠程調試、多線程,Pdb是搞不定的 本文參考的相關文章如下: 《指針...
摘要:內存池機制提供了對內存的垃圾收集機制,但是它將不用的內存放到內存池而不是返回給操作系統(tǒng)。為了加速的執(zhí)行效率,引入了一個內存池機制,用于管理對小塊內存的申請和釋放。 注:答案一般在網(wǎng)上都能夠找到。1.對if __name__ == main的理解陳述2.python是如何進行內存管理的?3.請寫出一段Python代碼實現(xiàn)刪除一個list里面的重復元素4.Python里面如何拷貝一個對象?...
摘要:入門,第一個這是一門很新的語言,年前后正式公布,算起來是比較年輕的編程語言了,更重要的是它是面向程序員的函數(shù)式編程語言,它的代碼運行在之上。它通過編輯類工具,帶來了先進的編輯體驗,增強了語言服務。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不覺已經(jīng)到來了,總結過去的 2017,相信小伙們一定有很多收獲...
摘要:入門,第一個這是一門很新的語言,年前后正式公布,算起來是比較年輕的編程語言了,更重要的是它是面向程序員的函數(shù)式編程語言,它的代碼運行在之上。它通過編輯類工具,帶來了先進的編輯體驗,增強了語言服務。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不覺已經(jīng)到來了,總結過去的 2017,相信小伙們一定有很多收獲...
摘要:入門,第一個這是一門很新的語言,年前后正式公布,算起來是比較年輕的編程語言了,更重要的是它是面向程序員的函數(shù)式編程語言,它的代碼運行在之上。它通過編輯類工具,帶來了先進的編輯體驗,增強了語言服務。 showImg(https://segmentfault.com/img/bV1xdq?w=900&h=385); 新的一年不知不覺已經(jīng)到來了,總結過去的 2017,相信小伙們一定有很多收獲...
閱讀 923·2023-01-14 11:38
閱讀 896·2023-01-14 11:04
閱讀 756·2023-01-14 10:48
閱讀 2056·2023-01-14 10:34
閱讀 961·2023-01-14 10:24
閱讀 840·2023-01-14 10:18
閱讀 510·2023-01-14 10:09
閱讀 588·2023-01-14 10:02