本文共 2823 字,大约阅读时间需要 9 分钟。
运行环境:Python 2.7
需要安装 dpkt-1.8.6.2 和 win_inet_pton-1.0.1 两个库
#!/usr/bin/env python
import dpkt
import datetime
import socket
import binascii
import struct
from ctypes import *  # noqa: F401,F403 - import kept from the original file

try:
    # Only needed on Windows + Python 2.7, where it back-ports
    # socket.inet_ntop/inet_pton; everywhere else the socket module has them.
    import win_inet_pton  # noqa: F401
except ImportError:
    win_inet_pton = None


# Modbus/TCP frames start with a 7-byte MBAP header, so the function code is
# the byte at index 7 (see the Modbus messaging-on-TCP/IP guide).
_FUNCTION_CODE_INDEX = 7
# Function code 0x06 (write single register), spelled the way byte lists
# produced by _split_hex_bytes spell it.
_WRITE_SINGLE_REGISTER = '0x06'
# A write-single-register frame needs bytes 0..11 (header, function code,
# 2-byte register address, 2-byte value).
_WRITE_SINGLE_FRAME_LEN = 12
# Register offsets below are relative to byte 9 of the frame.
# NOTE(review): presumably MBAP header (7) + function code (1) + byte count (1)
# of a read-registers response - confirm against the device protocol.
_REGISTER_DATA_START = 9

# Byte offsets (relative to _REGISTER_DATA_START) of the values to print.
SHORT_VALUE_OFFSETS = (4, 14, 16, 18, 20, 22, 24, 26)
FLOAT_VALUE_OFFSETS = (0, 76, 108, 220, 224, 228)


def inet_to_str(inet):
    """Return the printable form of a packed IP address.

    The address is first interpreted as IPv4; if ``socket.inet_ntop`` rejects
    it with ``ValueError`` it is interpreted as IPv6 instead.
    """
    try:
        return socket.inet_ntop(socket.AF_INET, inet)
    except ValueError:
        return socket.inet_ntop(socket.AF_INET6, inet)


def _split_hex_bytes(data):
    """Split a hex dump into a list of ``'0xNN'`` strings, one per byte.

    ``data`` may be text or the ``bytes`` that ``binascii.hexlify`` returns on
    Python 3. A trailing unpaired hex digit is dropped.
    """
    if not isinstance(data, str):
        data = data.decode('ascii')
    return ['0x' + data[i:i + 2] for i in range(0, len(data) - 1, 2)]


def _read_u16(byte_list, index):
    """Return the big-endian unsigned 16-bit value at ``index``."""
    return int(byte_list[index], 16) * 256 + int(byte_list[index + 1], 16)


def process_modbus_ctrl(data):
    """Print the register address and value of a write-single-register frame.

    ``data`` is the hex dump of one TCP payload. Frames with a function code
    other than 0x06, and frames too short to hold the address and value, are
    ignored.
    """
    byte_list = _split_hex_bytes(data)
    if len(byte_list) < _WRITE_SINGLE_FRAME_LEN:
        return
    if byte_list[_FUNCTION_CODE_INDEX] != _WRITE_SINGLE_REGISTER:
        return
    addr = byte_list[8] + ' ' + byte_list[9]
    value = _read_u16(byte_list, 10)
    print('Addr:%s Set value:%d' % (addr, value))


def process_short_value(byte_list):
    """Print the signed 16-bit values found at ``SHORT_VALUE_OFFSETS``.

    ``byte_list`` is a list of ``'0xNN'`` strings covering the whole frame.
    Offsets that fall outside the frame are skipped; nothing is printed when
    no value could be decoded.
    """
    fields = []
    for index in SHORT_VALUE_OFFSETS:
        real_index = index + _REGISTER_DATA_START
        if real_index + 2 > len(byte_list):
            continue
        value = _read_u16(byte_list, real_index)
        if value > 32767:
            # Reinterpret the unsigned word as two's-complement.
            value -= 65536
        # Two bytes per register, hence the floor division.
        fields.append('Addr Index:%d Value:%d' % (index // 2, value))
    if fields:
        print(','.join(fields))


def convert(s):
    """Interpret a hex string as the bit pattern of a 32-bit float.

    Only the low 32 bits of the parsed integer are used.
    """
    bits = int(s, 16) & 0xFFFFFFFF
    return struct.unpack('>f', struct.pack('>I', bits))[0]


def process_float_value(byte_list):
    """Print the 32-bit floats found at ``FLOAT_VALUE_OFFSETS``.

    Each value is built from four consecutive bytes, most significant first.
    Offsets that fall outside the frame are skipped; nothing is printed when
    no value could be decoded.
    """
    fields = []
    for index in FLOAT_VALUE_OFFSETS:
        real_index = index + _REGISTER_DATA_START
        if real_index + 4 > len(byte_list):
            continue
        # Strip the '0x' prefixes and glue the four bytes back together.
        value_buf = ''.join(
            item[2:] for item in byte_list[real_index:real_index + 4])
        fields.append(
            'Addr Index:%d Value:%s' % (index // 2, convert(value_buf)))
    if fields:
        print(','.join(fields))


def process_modbus_data(data, data_type):
    """Decode one data frame and print its values.

    ``data`` is the hex dump of one TCP payload. Write-single-register frames
    (function code 0x06) and frames too short to carry a function code are
    ignored. ``data_type`` 0 prints 16-bit values, anything else prints floats.
    """
    byte_list = _split_hex_bytes(data)
    if len(byte_list) <= _FUNCTION_CODE_INDEX:
        return
    if byte_list[_FUNCTION_CODE_INDEX] == _WRITE_SINGLE_REGISTER:
        return
    if data_type == 0:
        process_short_value(byte_list)
    else:
        process_float_value(byte_list)


def analyze_packet(pcap, hostIp, port, flag, data_type):
    """Walk a capture and print the Modbus frames exchanged with ``hostIp``.

    pcap      -- iterable of ``(timestamp, raw_frame)`` pairs, e.g. a
                 ``dpkt.pcap.Reader``.
    hostIp    -- printable address of the host of interest.
    port      -- TCP port to filter on (source or destination).
    flag      -- 0: decode control frames sent *by* ``hostIp``;
                 1: decode data frames sent *to* ``hostIp``;
                 any other value decodes nothing.
    data_type -- forwarded to ``process_modbus_data``.
    """
    for _timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)
        ip = eth.data
        if not isinstance(ip, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        tcp = ip.data
        # ICMP, fragments etc. have no ports/payload attributes; skip them
        # instead of failing with AttributeError.
        if not isinstance(tcp, dpkt.tcp.TCP):
            continue
        if (tcp.dport != port and tcp.sport != port) or len(tcp.data) <= 0:
            continue

        if flag == 0:
            if inet_to_str(ip.src) == hostIp:
                process_modbus_ctrl(binascii.hexlify(tcp.data))
        elif flag == 1:
            if inet_to_str(ip.dst) == hostIp:
                process_modbus_data(binascii.hexlify(tcp.data), data_type)


def main():
    """Decode ``device_tcpdump.cap`` with the settings hard-coded below."""
    hostIp = '192.168.201.23'
    port = 502
    flag = 1  # flag=0: ctrl, flag=1: data
    data_type = 1  # data_type=0: short, data_type=1: sw_float
    with open('device_tcpdump.cap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        analyze_packet(pcap, hostIp, port, flag, data_type)


if __name__ == '__main__':
    main()

# Reposted from: http://dpqvi.baihongyu.com/