Processing TCP/UDP flows in pcap files with dpkt

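This is a hands-on assignment for an Introduction to Network Behavior course. Packets are captured with CommView and analyzed with dpkt. What sets it apart from an ordinary capture is that traffic to servers outside China is captured through a transparent proxy.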

Part 1: Packet capture

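A virtual machine that can get through the Great Firewall serves as a transparent proxy. First, switch the VM's network adapter to bridged mode so the VM is visible on the LAN. Then manually set the Android phone's gateway to the VM's IP address. On the phone, Brevent (黑域) was used to keep background apps quiet and obtain cleaner traffic.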

CommView on the host machine captures the packets, using two kinds of filtering: advanced rules and IP-address filters.


Part 2: Analysis

The pcap files are read with [dpkt](https://dpkt.readthedocs.io/en/latest/):

```shell
pip install dpkt
```

The selected features are:

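flow start time (integer), duration (integer), client IP (string), client port (host byte order), server IP (string), server port (host byte order), application name, application action, total packets in the flow, total bytes in the flow, and average bytes per packet.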

```python
["StartTime", "DurationTime", "ClientIP", "ClientPort", "ServerIP", "ServerPort", "AppName", "AppAction", "FlowPackets", "FlowBytes", "PacketsBytesAvg"]
```
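As a minimal sketch of how one flow record could be accumulated and turned into a row in the column order above: the `Flow` class and its field names are hypothetical. DurationTime is assumed to be last-packet time minus first-packet time, and PacketsBytesAvg is FlowBytes / FlowPackets.

```python
from dataclasses import dataclass
from typing import Optional

# Column order copied from the feature list above.
COLUMNS = ["StartTime", "DurationTime", "ClientIP", "ClientPort", "ServerIP",
           "ServerPort", "AppName", "AppAction", "FlowPackets", "FlowBytes",
           "PacketsBytesAvg"]


@dataclass
class Flow:
    """Hypothetical per-flow accumulator; fields map onto the columns above."""
    start: int                       # StartTime: first packet, whole seconds
    client_ip: str
    client_port: int
    server_ip: str
    server_port: int
    app_name: str
    app_action: str
    packets: int = 1                 # FlowPackets (the first packet counts)
    nbytes: int = 0                  # FlowBytes
    last_seen: Optional[int] = None  # time of the latest packet, None if only one

    def add_packet(self, ts: int, length: int) -> None:
        """Account for one more packet of this flow."""
        self.packets += 1
        self.nbytes += length
        self.last_seen = ts

    def to_row(self) -> list:
        """Compute the derived features and return values in COLUMNS order."""
        end = self.last_seen if self.last_seen is not None else self.start
        duration = end - self.start            # DurationTime
        avg = self.nbytes / self.packets       # PacketsBytesAvg
        return [self.start, duration, self.client_ip, self.client_port,
                self.server_ip, self.server_port, self.app_name,
                self.app_action, self.packets, self.nbytes, avg]
```

For example, `Flow(100, "10.0.0.2", 51000, "1.2.3.4", 443, "linkedin", "browse", nbytes=74)` followed by `add_packet(103, 1500)` gives a DurationTime of 3 and a PacketsBytesAvg of 787.0 (the addresses and names are made up).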

AppName and AppAction are taken from the pcap file name. All other features are obtained by parsing the packets, step by step, as follows.

Reading the pcap file

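```python
import dpkt

with open(pcap_path, 'rb') as f:  # pcap_path: path to the .pcap file
    pcap = dpkt.pcap.Reader(f)
    for timestamp, buf in pcap:   # timestamp: capture time in seconds (float); buf: raw frame bytes
        pass
```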
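`dpkt.pcap.Reader` hides the file format. To illustrate what such a reader has to do (this is not dpkt's actual code), here is a stdlib-only sketch that parses the classic libpcap layout: a 24-byte global header followed by one 16-byte record header per packet. It assumes microsecond-resolution files and does not handle pcapng; `iter_pcap` is a hypothetical name.

```python
import struct
from typing import BinaryIO, Iterator, Tuple

PCAP_MAGIC_USEC = 0xA1B2C3D4  # classic libpcap magic, microsecond timestamps


def iter_pcap(f: BinaryIO) -> Iterator[Tuple[float, bytes]]:
    """Yield (timestamp, frame_bytes) for each record of a classic pcap file."""
    header = f.read(24)                       # global header is 24 bytes
    if len(header) < 24:
        raise ValueError("truncated pcap global header")
    # The byte order of the magic number gives the byte order of the whole file.
    if struct.unpack("<I", header[:4])[0] == PCAP_MAGIC_USEC:
        endian = "<"
    elif struct.unpack(">I", header[:4])[0] == PCAP_MAGIC_USEC:
        endian = ">"
    else:
        raise ValueError("not a microsecond-resolution classic pcap file")
    while True:
        rec = f.read(16)                      # per-packet record header
        if len(rec) < 16:
            break                             # end of file
        ts_sec, ts_usec, incl_len, _orig_len = struct.unpack(endian + "IIII", rec)
        data = f.read(incl_len)               # captured bytes of this frame
        if len(data) < incl_len:
            break                             # truncated last record
        yield ts_sec + ts_usec / 1_000_000, data
```

Like dpkt's reader, it yields a float timestamp and the raw frame, which is why the original code converts with `int(timestamp)` when whole seconds are wanted.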

Decoding the Ethernet frame and dropping packets without an IP layer

```python
for timestamp, buf in pcap:
    eth = dpkt.ethernet.Ethernet(buf)          # decode the Ethernet frame
    if eth.type != dpkt.ethernet.ETH_TYPE_IP:  # drop frames without an IPv4 packet (IPv6 included)
        continue
    ip = eth.data
```
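To show what the `eth.type` check and `ip.src`/`ip.dst` correspond to on the wire, here is a stdlib-only sketch (a hypothetical helper, not part of dpkt) that reads the EtherType at bytes 12-13 of the frame and then the fixed part of the IPv4 header. It assumes untagged Ethernet II frames; VLAN tags and IP options beyond skipping them are not handled.

```python
import socket
import struct
from typing import Optional, Tuple

ETH_TYPE_IP = 0x0800  # EtherType for IPv4, same value as dpkt.ethernet.ETH_TYPE_IP


def parse_ipv4_frame(frame: bytes) -> Optional[Tuple[str, str, int, bytes]]:
    """Return (src_ip, dst_ip, ip_proto, ip_payload), or None for non-IPv4 frames."""
    if len(frame) < 14:
        return None
    (eth_type,) = struct.unpack("!H", frame[12:14])  # EtherType after the two MACs
    if eth_type != ETH_TYPE_IP:
        return None                                  # ARP, IPv6, ... are skipped
    ip = frame[14:]
    if len(ip) < 20:
        return None
    ihl = (ip[0] & 0x0F) * 4                         # IP header length in bytes
    (total_len,) = struct.unpack("!H", ip[2:4])      # header + payload, excludes padding
    proto = ip[9]                                    # 6 = TCP, 17 = UDP
    src = socket.inet_ntoa(ip[12:16])
    dst = socket.inet_ntoa(ip[16:20])
    return src, dst, proto, ip[ihl:total_len]
```

Using the IP total length to slice the payload also strips the zero padding that short Ethernet frames carry.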

Keeping only packets whose transport-layer protocol is TCP

```python
if isinstance(ip.data, dpkt.tcp.TCP):  # was the IP payload decoded as TCP?
    this_tcp = ip.data
```
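The fields the flow code reads from `this_tcp` (`sport`, `dport`, `flags`) sit at fixed offsets in the TCP header. The stdlib sketch below (hypothetical names) decodes them. `struct`'s `!` format turns the big-endian network-order ports into ordinary Python ints, which is why the ports in the feature list are already in host order. Byte 13 holds the eight low flag bits.

```python
import struct
from typing import NamedTuple


class TcpHeader(NamedTuple):
    sport: int
    dport: int
    flags: int
    payload: bytes


def parse_tcp(segment: bytes) -> TcpHeader:
    """Extract ports, flags and payload from a raw TCP segment."""
    sport, dport = struct.unpack("!HH", segment[:4])  # network byte order -> int
    data_offset = (segment[12] >> 4) * 4              # TCP header length in bytes
    flags = segment[13]                               # FIN=0x01 ... CWR=0x80
    return TcpHeader(sport, dport, flags, segment[data_offset:])
```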

Flow handling

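A heuristic is used: for each pair of IP addresses, the TCP flags decide whether a packet belongs to the handshake, to the teardown, or to data transfer.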

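```python
def JudgeTcpState(flags):
    # aim: bit position counted from the right, starting at 1
    aim = 1
    fin = flags & (1 << (aim - 1)) > 0  # FIN = 0x01; in Python & binds tighter than >

    aim = 2
    syn = flags & (1 << (aim - 1)) > 0  # SYN = 0x02

    aim = 5
    ack = flags & (1 << (aim - 1)) > 0  # ACK = 0x10

    if syn and not ack:
        # first handshake: only SYN is set (a SYN-ACK also has ACK set)
        return 'fshake'
    if fin:
        # to really confirm a teardown, the flags of more than one packet are needed
        return 'wave'
    return 'Transdata'
```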

Below is the dpkt source for the TCP packet class:

[Screenshot: dpkt TCP class source code]
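When debugging the heuristic it helps to print flags by name instead of as numbers. The bit values below mirror the `TH_*` constants in `dpkt.tcp` (dpkt calls the 0x08 bit `TH_PUSH`); `flag_names` is a hypothetical helper.

```python
# Bit values of the TCP flags, mirroring the TH_* constants in dpkt.tcp.
TCP_FLAG_NAMES = [
    (0x01, "FIN"), (0x02, "SYN"), (0x04, "RST"), (0x08, "PSH"),
    (0x10, "ACK"), (0x20, "URG"), (0x40, "ECE"), (0x80, "CWR"),
]


def flag_names(flags: int) -> list:
    """Turn a flags value into readable names, lowest bit first."""
    return [name for bit, name in TCP_FLAG_NAMES if flags & bit]
```

For example, `flag_names(0x12)` returns `["SYN", "ACK"]`, the second packet of the handshake, and `flag_names(0x11)` returns `["FIN", "ACK"]`, a typical first teardown packet.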

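To detect the end of a TCP flow, a dict stores the teardown ("wave") state of each IP pair. After the first wave (a packet with FIN=1), the teardown is considered complete once the same IP pair has produced four packets in the pattern FIN, any packet, FIN, any packet. This has a serious weakness: an abnormal teardown sometimes takes more than four packets.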

```python
# Runs inside the per-packet loop; ip_set, this_flags, timestamp,
# TCP_dic and TCP_wave_dic are defined there (see the complete code)
if ip_set in TCP_wave_dic:
    if JudgeTcpState(this_flags) == 'wave' and TCP_wave_dic[ip_set][0] == 0:
        TCP_wave_dic[ip_set][0] = 1  # first wave: FIN seen
    elif TCP_wave_dic[ip_set][1] != 1 and TCP_wave_dic[ip_set][0] == 1:
        TCP_wave_dic[ip_set][1] = 1  # second wave: any packet after the first FIN
    elif JudgeTcpState(this_flags) == 'wave' and TCP_wave_dic[ip_set][2] != 1 and \
            TCP_wave_dic[ip_set][0] == 1 and TCP_wave_dic[ip_set][1] == 1:
        TCP_wave_dic[ip_set][2] = 1  # third wave: another FIN
    elif TCP_wave_dic[ip_set][0] == 1 and TCP_wave_dic[ip_set][1] == 1 and \
            TCP_wave_dic[ip_set][2] == 1 and TCP_wave_dic[ip_set][3] != 1:
        TCP_wave_dic[ip_set][3] = 1  # fourth wave: record the end time and the average
        TCP_dic[ip_set][1] = int(timestamp)
        TCP_dic[ip_set][10] = TCP_dic[ip_set][9] / TCP_dic[ip_set][8]
```
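To test the teardown heuristic in isolation, the same four-step logic can be pulled out into a small state machine. `WaveTracker` is a hypothetical name; it keeps the same four 0/1 slots as `TCP_wave_dic[ip_set]` and follows the same order: FIN, any packet, FIN, any packet.

```python
class WaveTracker:
    """Standalone version of the four-step teardown heuristic.

    steps[i] == 1 once step i+1 has been observed, like TCP_wave_dic[ip_set].
    """

    FIN = 0x01

    def __init__(self) -> None:
        self.steps = [0, 0, 0, 0]

    @property
    def closed(self) -> bool:
        return self.steps[3] == 1

    def feed(self, flags: int) -> bool:
        """Update the state with one packet's flags; return True when the flow closes."""
        fin = bool(flags & self.FIN)
        s = self.steps
        if s[0] == 0:
            if fin:
                s[0] = 1            # 1st wave: first FIN
        elif s[1] == 0:
            s[1] = 1                # 2nd wave: any packet after the first FIN
        elif s[2] == 0:
            if fin:
                s[2] = 1            # 3rd wave: another FIN
        elif s[3] == 0:
            s[3] = 1                # 4th wave: any packet after that
            return True
        return False
```

Feeding the flags `[0x18, 0x11, 0x10, 0x11, 0x10]` (data, FIN-ACK, ACK, FIN-ACK, ACK) closes the flow on the last packet, while a sequence in which the second FIN never arrives leaves it open forever. That is the weakness noted above.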

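For IP pairs that completed the handshake but never a full teardown, the flow is saved after the whole pcap has been read, using the timestamp of the last packet seen for that IP pair as its end time.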

For UDP

UDP has no handshake or teardown, so the span from the first to the last appearance of an IP pair is counted as one "flow".
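A minimal sketch of this rule, assuming packets have already been decoded into hypothetical `(ts, src_ip, sport, dst_ip, dport, length)` tuples. The pair is treated as unordered here, so replies are merged with requests. The record layout `[first_ts, last_ts, client_ip, client_port, server_ip, server_port, packets, bytes]` is illustrative only.

```python
from typing import Dict, Iterable, List, Tuple

Packet = Tuple[float, str, int, str, int, int]  # (ts, src_ip, sport, dst_ip, dport, length)


def udp_flows(packets: Iterable[Packet]) -> Dict[Tuple[str, ...], List]:
    """Group UDP packets by IP pair: first and last sighting bound the 'flow'."""
    flows: Dict[Tuple[str, ...], List] = {}
    for ts, src, sport, dst, dport, length in packets:
        key = tuple(sorted((src, dst)))  # assumption: unordered pair, both directions merged
        if key not in flows:
            # first sighting: the sender is recorded as the client
            flows[key] = [ts, ts, src, sport, dst, dport, 1, length]
        else:
            rec = flows[key]
            rec[1] = ts            # last sighting so far
            rec[6] += 1            # packet count
            rec[7] += length       # byte count
    return flows
```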

The complete code:

```python
# -*- coding: UTF-8 -*-
import csv
import socket
from os import listdir

import dpkt


def inet_to_str(inet):
    """Convert a packed 4-byte IPv4 address to dotted-quad text; False on failure."""
    try:
        return socket.inet_ntop(socket.AF_INET, inet)
    except (ValueError, OSError):
        return False


# Known shortcomings:
# 1. Flows are keyed by the unordered IP pair, so direction is not tracked and
#    the teardown detection can be wrong.
# 2. After four waves the flow is assumed closed; a new handshake between the
#    same IPs is not counted as a new flow.
# 3. Only flows whose first handshake (SYN) was captured are counted.
def JudgeTcpState(flags):
    aim = 1  # bit position counted from the right, starting at 1
    fin = flags & (1 << (aim - 1)) > 0  # FIN = 0x01; & binds tighter than > in Python

    aim = 2
    syn = flags & (1 << (aim - 1)) > 0  # SYN = 0x02

    aim = 5
    ack = flags & (1 << (aim - 1)) > 0  # ACK = 0x10

    if syn and not ack:  # first handshake: only SYN is set
        return 'fshake'
    if fin:  # to really confirm a teardown, more than one packet's flags are needed
        return 'wave'
    return 'Transdata'


def tcp_udp_process(PcapPath):
    """TCP flow: counted from the first handshake; packets accumulate while the IP pair matches."""
    # Fields 3 and 5 of the '_'-separated path; the indices depend on how the paths are named
    name_list = PcapPath.split('_')
    AppName, AppAction = name_list[3], name_list[5]
    print(AppName, AppAction)

    TCP_dic = {}
    TCP_wave_dic = {}  # teardown ("wave") state per IP pair
    UDP_dic = {}

    with open(PcapPath, 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        for timestamp, buf in pcap:
            eth = dpkt.ethernet.Ethernet(buf)          # decode the Ethernet frame
            if eth.type != dpkt.ethernet.ETH_TYPE_IP:  # drop frames without an IPv4 packet
                continue
            ip = eth.data

            ip_src = inet_to_str(ip.src)  # source and destination IP
            ip_dst = inet_to_str(ip.dst)
            # Unordered IP pair: both directions of a conversation share one key
            ip_set = tuple(sorted((ip_src, ip_dst)))

            if isinstance(ip.data, dpkt.tcp.TCP):  # transport layer is TCP
                this_tcp = ip.data
                this_flags = this_tcp.flags

                # Pair not seen yet and in its first handshake: start a new flow
                if ip_set not in TCP_dic and JudgeTcpState(this_flags) == 'fshake':
                    TCP_dic[ip_set] = [
                        int(timestamp),  # 0 StartTime
                        0,               # 1 end time; 0 until a later packet, turned into a duration at the end
                        ip_src,          # 2 ClientIP (sender of the SYN)
                        this_tcp.sport,  # 3 ClientPort
                        ip_dst,          # 4 ServerIP
                        this_tcp.dport,  # 5 ServerPort
                        AppName,         # 6
                        AppAction,       # 7
                        1,               # 8 FlowPackets, updated continuously
                        len(eth),        # 9 FlowBytes, updated continuously
                        0,               # 10 PacketsBytesAvg, set at teardown or at the end
                        'TCP',           # 11 Protocol
                    ]
                    TCP_wave_dic[ip_set] = [0, 0, 0, 0]
                elif ip_set in TCP_dic and TCP_wave_dic[ip_set][3] != 1:
                    # Open flow: update packet and byte counts
                    TCP_dic[ip_set][8] += 1
                    TCP_dic[ip_set][9] += len(eth)
                    # Keep the latest timestamp in case the teardown is never seen
                    TCP_dic[ip_set][1] = int(timestamp)

                # Separate from the if above, because teardown packets are counted too
                if ip_set in TCP_wave_dic:
                    wave = TCP_wave_dic[ip_set]
                    state = JudgeTcpState(this_flags)
                    if state == 'wave' and wave[0] == 0:
                        wave[0] = 1  # first wave: FIN
                    elif wave[1] != 1 and wave[0] == 1:
                        wave[1] = 1  # second wave: any packet
                    elif state == 'wave' and wave[2] != 1 and wave[0] == 1 and wave[1] == 1:
                        wave[2] = 1  # third wave: FIN
                    elif wave[0] == 1 and wave[1] == 1 and wave[2] == 1 and wave[3] != 1:
                        wave[3] = 1  # fourth wave: record end time and average
                        TCP_dic[ip_set][1] = int(timestamp)
                        TCP_dic[ip_set][10] = TCP_dic[ip_set][9] / TCP_dic[ip_set][8]

            elif isinstance(ip.data, dpkt.udp.UDP):  # transport layer is UDP
                this_udp = ip.data
                if ip_set not in UDP_dic:
                    UDP_dic[ip_set] = [int(timestamp), 0, ip_src, this_udp.sport,
                                       ip_dst, this_udp.dport, AppName, AppAction,
                                       1, len(eth), 0, 'UDP']
                else:
                    UDP_dic[ip_set][8] += 1
                    UDP_dic[ip_set][9] += len(eth)
                    UDP_dic[ip_set][1] = int(timestamp)

    # After reading: turn index 1 into DurationTime, and fill in PacketsBytesAvg (index 10)
    # for TCP flows without a complete teardown and for all UDP flows
    for flow in list(TCP_dic.values()) + list(UDP_dic.values()):
        end_time = flow[1] or flow[0]  # 0 means only one packet was seen
        flow[1] = end_time - flow[0]
        if flow[10] == 0:
            flow[10] = flow[9] / flow[8]
    return TCP_dic, UDP_dic


def create_csv(path='flow_output.csv'):
    with open(path, 'w', newline='') as f:
        csv_write = csv.writer(f)
        csv_head = ["StartTime", "DurationTime", "ClientIP", "ClientPort", "ServerIP", "ServerPort",
                    "AppName", "AppAction", "FlowPackets", "FlowBytes", "PacketsBytesAvg",
                    "Protocol"]  # every row also carries 'TCP' or 'UDP' as a 12th field
        csv_write.writerow(csv_head)


# Append the TCP and UDP flows to the csv file
def write_csv(WritePath, tcp_dic, udp_dic):
    with open(WritePath, 'a', newline='') as f:
        csv_write = csv.writer(f)
        for tcp_value in tcp_dic.values():
            csv_write.writerow(tcp_value)
        for udp_value in udp_dic.values():
            csv_write.writerow(udp_value)
    print('Write successful!')


if __name__ == '__main__':
    # To debug a single file:
    # pcap_path = '29404_feishu/mac_feishu_V3.17.3_NA_1.pcap'
    # tcp_dic, udp_dic = tcp_udp_process(pcap_path)

    path = 'flow_output.csv'
    create_csv(path)

    for data_dir in ('pcap_data/camsurf', 'pcap_data/linkedin'):
        files = listdir(data_dir)
        print(files)
        for each_file in files:
            if each_file == '.DS_Store':  # skip macOS metadata files
                continue
            tcp_dic, udp_dic = tcp_udp_process(data_dir + '/' + each_file)
            write_csv(path, tcp_dic, udp_dic)
```

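Room for improvement
  • IP pairs are stored as a set (unordered), so upstream and downstream direction cannot be told apart.

  • The code assumes no new connection arrives after the four-way teardown, although the same hosts may handshake again and transfer more data. Also, a faulty teardown sometimes takes more than four packets.

  • Only flows whose first handshake was captured are counted; other TCP flows are missed.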
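One possible way to address the first two points, sketched under the assumption that flows are keyed by `(ip, port)` endpoints instead of the IP pair alone: an order-independent key still merges both directions of one connection, the endpoint that sent the first SYN is remembered as the client so each packet's direction can be recovered, and a reconnection from a new source port gets its own key. The function names are hypothetical.

```python
from typing import Tuple

Endpoint = Tuple[str, int]  # (ip, port)


def conversation_key(src: Endpoint, dst: Endpoint) -> Tuple[Endpoint, Endpoint]:
    """Direction-independent key: both directions of one connection map here."""
    return (src, dst) if src <= dst else (dst, src)


def direction(src: Endpoint, client: Endpoint) -> str:
    """Recover direction by comparing with the endpoint that sent the first SYN."""
    return "up" if src == client else "down"
```

Storing `client` alongside each flow record is what turns the unordered key back into direction-aware statistics; the key alone still cannot tell upload from download.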
