13.网络编程

予早 2024-10-05 11:38:27
Categories: Tags:

socket,套接字

BSD Socket

# BSD(Berkeley Software Distribution) Socket是C语言对socket的具体实现,BSD Socket的API抽象为事实上的socket标准,C语言之外大多语言会实现与之相似的BSD Socket API

# class socket.socket(family=AF_INET, type=SOCK_STREAM, proto=0, fileno=None)
# address_family,地址族,默认取AF_INET
# AF_INET,IPV4,用于网络通信
# AF_INET6,IPV6,用于网络通信
# AF_UNIX,UNIX域套接字,用于本地通信
# AF_CAN,用于嵌入式系统网络通信
# AF_PACKET,用于底层网络通信,直接在数据链路层(以太网)对原始数据操进行操作。网络监控和分析工具中有所体现
# AF_RDS,用于高性能网络通信

# 套接字类型
# SOCK_STREAM 默认值
# SOCK_STREAM,流式套接字,基于TCP,实现可靠的、有序的、有连接的字节流通信
# SOCK_DGRAM, 数据报套接字,基于UDP,实现不可靠的、无序的、无连接的字节流通信
# SOCK_RAW, 原始套接字。基于自定义协议


# 协议号
# 协议号通常为零并且可以省略,使用默认协议
# 地址族为AF_CAN时可选:
# CAN_RAW,原始CAN协议
# CAN_BCM,基于基于广播管理器(Broadcast Manager)的CAN
# CAN_ISOTP,基于ISO-TP(ISO Transport Protocol)的CAN
# CAN_J1939,基于SAE J1939的CAN,用于车辆系统

# 粘包问题
#
# 要发送的数据切分后成为多个数据包,经由发送缓冲区和接收缓冲区存储后不同包之间可能会粘连从而模糊包之间的数据边界,应用层使用时又需要知道数据边界从而正确展示信息,此时就导致粘包问题
#
# 1.粘包问题由传输层发送和接收时模糊数据边界与应用层需要区分数据边界的矛盾产生
# 2.问题只会出在传输层和应用层之间,网络层和传输层也分片,但此时无需关心数据边界
# 3.TCP基于数据流,会出现粘包问题,UDP基于数据报,不会出现粘包问题
# 4.由于TCP不区分数据边界,通常会由发送端和接收端约定在处理的内容中写明数据长度以明确数据边界

socket 库

import socket

sk = socket.socket()
sk.bind(('127.0.0.1', 8898))  # 把地址绑定到套接字
sk.listen()  # 监听链接
conn, addr = sk.accept()  # 接受客户端链接
ret = conn.recv(1024)  # 接收客户端信息
print(ret)  # 打印客户端信息
conn.send(b'hi')  # 向客户端发送信息
conn.close()  # 关闭客户端套接字
sk.close()  # 关闭服务器套接字(可选)
import socket

udp_sk = socket.socket(type=socket.SOCK_DGRAM)  # 创建一个服务器的套接字
udp_sk.bind(('127.0.0.1', 9000))  # 绑定服务器套接字
msg, addr = udp_sk.recvfrom(1024)
print(msg)
udp_sk.sendto(b'hi', addr)  # 对话(接收与发送)
udp_sk.close()  # 关闭服务器套接字
import socket

ip_port = ('127.0.0.1', 9000)
udp_sk = socket.socket(type=socket.SOCK_DGRAM)
udp_sk.sendto(b'hello', ip_port)
back_msg, addr = udp_sk.recvfrom(1024)
print(back_msg.decode('utf-8'), addr)
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

sock.bind(("127.0.0.1", 8023))
sock.listen()

print(sock.getsockname())
# print(sock.getpeername())

print(sock.fileno())
sock.makefile()

sock.setblocking(True)

# sock.send() 发送 TCP 数据
# sock.sendall() 完整发送 TCP 数据
# sock.recv() 接收 TCP 数据

# sock.recvfrom() 接收 UDP 数据
# sock.sendto() 发送 UDP 数据


sock.close()

缓冲区分析

# 接收信息
import socket

tcp_socket_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_socket_server.bind(('127.0.0.1', 8080))
tcp_socket_server.listen()

conn, addr = tcp_socket_server.accept()

data1 = conn.recv(10)
data2 = conn.recv(10)

print('----->', data1.decode('utf-8'))
print('----->', data2.decode('utf-8'))

conn.close()
# 发送信息
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 3)
buffer_size = s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
print(f"发送缓冲区大小为: {buffer_size} 字节")
res = s.connect_ex(('127.0.0.1', 8080))

print(res)

s.send('hello'.encode('utf-8'))
s.send('world'.encode('utf-8'))
# 发送信息
import socket

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1024)
buffer_size = s.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
print(f"发送缓冲区大小为: {buffer_size} 字节")
res = s.connect_ex(('127.0.0.1', 8080))

print(res)

s.send(('hello' * 100).encode('UTF-8'))
s.send(('world' * 100).encode('UTF-8'))

socket 多路复用

select、poll、epoll均用于监控多个描述符

select

import socket
import select
import queue

# 创建服务端套接字,该套接字基于TCP协议
ser_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 设置socket为非阻塞io,即不会在第一阶段阻塞
ser_socket.setblocking(False)

# socket绑定到本地地址,此时端口被占用
server_address = ('localhost', 8023)
ser_socket.bind(server_address)

# 开始监听连接,最大等待连接数为5,其余连接直接被拒绝
ser_socket.listen(5)
print("log: server is listening on %s:%s." % ser_socket.getsockname())
# print(f"log: {ser_socket.getsockname()} to {ser_socket.getpeername()}.")  # 服务端套接字并未建立连接,调用getpeername会报错

rlist = [ser_socket]  # wait until ready for reading
wlist = []  # wait until ready for writing
xlist = []  # wait for an "exceptional condition"
msg_queues = {}  # 每个连接socket有一个发送消息的队列

while rlist:
    # io多路复用
    # r_ready_list, w_ready_list, x_ready_list分别为rlist、wlist、xlist的子集
    r_ready_list, w_ready_list, x_ready_list = select.select(rlist, wlist, xlist)
    print(len(rlist), len(wlist), len(xlist), len(rlist) + len(wlist) + len(xlist))

    # 处理可以读数据的socket(即要读的数据已经由内核线程准备好)
    for sock in r_ready_list:
        # 若sock为server socket,则说明有客户端连接到服务端
        if sock is ser_socket:
            # client连接server即是向server发送数据,server socket会读取数据
            # 接收客户端连接,并在客户端创建连接套接字,此后client socket会与这个连接套接字通信而不是服务端套接字
            con_socket, addr = sock.accept()
            # 连接套接字设置为非阻塞IO
            con_socket.setblocking(False)
            # 将连接套接字加入rlist
            rlist.append(con_socket)
            # 为socket分配消息队列
            msg_queues[con_socket] = queue.Queue()
            print(f"log: server accepts a conn {con_socket.getsockname()} to {con_socket.getpeername()}.")

        # 若sock为connection socket,则说明客户端发送数据过来了
        else:
            # 读取客户端数据,最多读取1024byte
            data = sock.recv(1024)

            # 若数据不为空,假定回复收到,需将该连接socket添加至wlist交由select多路复用监控
            if data:
                print(f"client{sock.getpeername()}: {data.decode('utf-8')}.")
                print(f"server{ser_socket.getsockname()}: Gotten.")
                msg_queues[sock].put("Gotten.".encode("UTF-8"))
                if sock not in wlist:
                    wlist.append(sock)
            # 若数据为空,说明已断开连接,释放资源
            else:
                print(f"log: server closes a conn {sock.getsockname()} to {sock.getpeername()}.")
                if sock in wlist:
                    wlist.remove(sock)
                rlist.remove(sock)
                sock.close()
                del msg_queues[sock]

    # 处理可以写数据的socket(即要写的数据已经由应用线程准备好)
    for sock in w_ready_list:
        # 有数据就发送数据
        if not msg_queues[sock].empty():
            sock.send(msg_queues[sock].get_nowait())
        # 没数据就退出wlist,不再由select监控写数据
        if msg_queues[sock].empty():
            wlist.remove(sock)

    # 处理出现异常的socket,退出select多路复用,释放资源
    for sock in x_ready_list:
        rlist.remove(sock)
        if sock in wlist:
            wlist.remove(sock)
        sock.close()
        del msg_queues[sock]

epoll

import socket
import select
import queue


ser_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

ser_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

server_address = ("127.0.0.1", 8023)
ser_socket.bind(server_address)
ser_socket.listen(10)
print("log: server is listening on %s:%s." % ser_socket.getsockname())
ser_socket.setblocking(False)
timeout = 10
# 创建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
# 注册服务器监听fd到等待读事件集合
epoll.register(ser_socket.fileno(), select.EPOLLIN)
# 保存连接客户端消息的字典,格式为{}
message_queues = {}
# 文件句柄到所对应对象的字典,格式为{句柄:对象}
fd_to_socket = {ser_socket.fileno(): ser_socket, }

while True:
    # 轮询注册的事件集合,返回值为[(文件句柄,对应的事件),(...),....]
    events = epoll.poll(timeout)
    for fd, event in events:
        socket = fd_to_socket[fd]
        # 如果活动socket为当前服务器socket,表示有新连接
        if socket == ser_socket:
            con_socket, address = ser_socket.accept()
            # 新连接socket设置为非阻塞
            con_socket.setblocking(False)
            # 注册新连接fd到待读事件集合
            epoll.register(con_socket.fileno(), select.EPOLLIN)
            # 把新连接的文件句柄以及对象保存到字典
            fd_to_socket[con_socket.fileno()] = con_socket
            # 以新连接的对象为键值,值存储在队列中,保存每个连接的信息
            message_queues[con_socket] = queue.Queue()
            print(f"log: server accepts a conn {con_socket.getsockname()} to {con_socket.getpeername()}.")
        # 关闭事件
        elif event & select.EPOLLHUP:
            print(f"log: server closes a conn {socket.getsockname()} to {socket.getpeername()}.")
            # 在epoll中注销客户端的文件句柄
            epoll.unregister(fd)
            # 关闭客户端的文件句柄
            socket.close()

            # 在字典中删除与已关闭客户端相关的信息
            del fd_to_socket[fd]
        # 可读事件
        elif event & select.EPOLLIN:
            # 接收数据
            data = socket.recv(1024)
            if data:
                print(f"client{socket.getpeername()}: {data.decode('utf-8')}.")
                # 将数据放入对应客户端的字典
                message_queues[socket].put(data)
                # 修改读取到消息的连接到等待写事件集合(即对应客户端收到消息后,再将其fd修改并加入写事件集合)
                epoll.modify(fd, select.EPOLLOUT)
        # 可写事件
        elif event & select.EPOLLOUT:
            try:
                # 从字典中获取对应客户端的信息
                msg = message_queues[socket].get_nowait()
            except queue.Empty:
                print(socket.getpeername(), " queue empty")
                # 修改文件句柄为读事件
                epoll.modify(fd, select.EPOLLIN)
            else:
                print(f"server{ser_socket.getsockname()}: Gotten.")
                socket.send("Gotten.".encode("UTF-8"))

# 在epoll中注销服务端文件句柄
epoll.unregister(ser_socket.fileno())
# 关闭epoll
epoll.close()
# 关闭服务器socket
ser_socket.close()

单客户端

import socket

ip_port = ('localhost', 8023)

conn_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
conn_socket.connect(ip_port)
while True:
    inp = input(f"client{conn_socket.getsockname()}: ").strip()
    if not inp:
        continue
    conn_socket.send(inp.encode('utf-8'))  # 数据发给服务端,先进行编码
    data = conn_socket.recv(1024)
    print(f"server{conn_socket.getpeername()}: {data.decode('utf-8')}")  # 接收到服务端返回的数据,进行解码

conn_socket.close()

多客户端测试连接数

import socket

ip_port = ('localhost', 8023)

for i in range(510):
    globals()[f'socket{i}'] = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    globals()[f'socket{i}'].connect(ip_port)
    print(i)

socketserver

https://www.cnblogs.com/vipchenwei/p/7413523.html

MySockServer

import socketserver


class MySockServer(socketserver.BaseRequestHandler):

    def handle(self):  # handle(self)方法是必须要定义的,可以看上面的说明
        print('Got a new connection from', self.client_address)
        while True:
            data = self.request.recv(1024)  # 需要通过self的方法调用数据接收函数
            if not data: break
            print('recv:', data)

            self.request.send(data.upper())  # 需要通过self的方法调用数据接收函数


def main():
    HOST = '127.0.0.1'  # 定义侦听本地地址口(多个IP地址情况下),这里表示侦听所有
    PORT = 50007  # Server端开放的服务端口
    s = socketserver.ThreadingTCPServer((HOST, PORT), MySockServer)
    # 调用SocketServer模块的多线程并发函数
    s.serve_forever()  # 持续接受客户端的连接


if __name__ == '__main__':
    main()
import socket

HOST = '127.0.0.1'
PORT = 50007
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))

try:
    while True:
        user_input = input('msg to send:').strip()
        s.sendall(user_input.encode("UTF-8"))
        data = s.recv(1024)
        print('Received', repr(data.decode("UTF-8")))

finally:
    s.close()

FtpServer

import json
import os
import socketserver
import struct


class FtpServer(socketserver.BaseRequestHandler):
    coding = 'utf-8'
    server_dir = 'file_upload'
    max_packet_size = 1024
    BASE_DIR = os.path.dirname(os.path.abspath(__file__))

    def handle(self):
        print(self.request)
        while True:
            data = self.request.recv(4)
            data_len = struct.unpack('i', data)[0]
            head_json = self.request.recv(data_len).decode(self.coding)
            head_dic = json.loads(head_json)
            # print(head_dic)
            cmd = head_dic['cmd']
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(head_dic)

    def put(self, args):
        file_path = os.path.normpath(os.path.join(
            self.BASE_DIR,
            self.server_dir,
            args['filename']
        ))

        filesize = args['filesize']
        recv_size = 0
        print('----->', file_path)
        with open(file_path, 'wb') as f:
            while recv_size < filesize:
                recv_data = self.request.recv(self.max_packet_size)
                f.write(recv_data)
                recv_size += len(recv_data)
                print('recvsize:%s filesize:%s' % (recv_size, filesize))


ftpserver = socketserver.ThreadingTCPServer(('127.0.0.1', 8080), FtpServer)
ftpserver.serve_forever()
import json
import os
import socket
import struct


class MYTCPClient:
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding = 'utf-8'

    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        self.server_address = server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    def client_connect(self):
        self.socket.connect(self.server_address)

    def client_close(self):
        self.socket.close()

    def run(self):
        while True:
            inp = input(">>: ").strip()
            if not inp: continue
            l = inp.split()
            cmd = l[0]
            if hasattr(self, cmd):
                func = getattr(self, cmd)
                func(l)

    def put(self, args):
        cmd = args[0]
        filename = args[1]
        if not os.path.isfile(filename):
            print('file:%s is not exists' % filename)
            return
        else:
            filesize = os.path.getsize(filename)

        head_dic = {'cmd': cmd, 'filename': os.path.basename(filename), 'filesize': filesize}
        print(head_dic)
        head_json = json.dumps(head_dic)
        head_json_bytes = bytes(head_json, encoding=self.coding)

        head_struct = struct.pack('i', len(head_json_bytes))
        self.socket.send(head_struct)
        self.socket.send(head_json_bytes)
        send_size = 0
        with open(filename, 'rb') as f:
            for line in f:
                self.socket.send(line)
                send_size += len(line)
                print(send_size)
            else:
                print('upload successful')


client = MYTCPClient(('127.0.0.1', 8080))

client.run()

TCP粘包

TCP粘包指,发送方发出若干包数据,到达接收方时这些原本单独的数据包粘成了一个数据包。

由于TCP粘包现象的存在,不能直接以数据包本身作为信息边界,而是需要额外先判断信息边界,然后再处理信息。

粘包情况有两种,一种是粘在一起的包都是完整的数据包,另一种情况是粘在一起的包有不完整的包。

TCP粘包现象的原因可能来自发送方,也可能来自接收方。

发送方

发送方使用 Nagle 算法,Nagle 算法用于减少网络中报文的数量,会将多个报文打包发送给接收端。

接收端

接收端会先将数据包保存在缓冲区,然后应用程序从缓冲区读取数据,若接收数据包到缓存的速度大于应用程序从缓存中读取数据包的速度,多个包就会被缓存,应用程序就有可能读取到多个首尾相接粘到一起的包。

粘包现象何时需要处理

当需要区分消息边界然后再处理消息时就需要处理粘包现象

短链接:一次报文发收,无需考虑粘包现象

长连接:

  1. 若发送数据无结构(例如文件传输),发送方只需发送,接收方只需接收处理,无需考虑数据边界,自然不用处理粘包现象
  2. 若发送数据有结构,且发送不同结构的数据,此时需要考虑数据边界以区分不同的数据结构,需要考虑粘包现象

粘包现象处理方法

发送方传输层

对于发送方传输层造成的粘包问题,可以通过关闭Nagle算法来解决,使用TCP_NODELAY选项来关闭算法。

接收方传输层

接收方传输层无法处理粘包现象,粘包现象TCP本身的特点。

接收方应用层

由于消息边界取决于应用层应用程序发送的消息,故接收方一定是应用层处理该问题,而且在应用层处理,发送方传输层可以开启Nagle算法而提升效率。

应用层解决粘包现象导致的信息边界模糊问题

解决办法:循环处理,应用程序从接收缓存中读取分组时,读完一条数据,就应该循环读取下一条数据,直到所有数据都被处理完成,但是如何判断每条数据的长度呢?
格式化数据:每条数据有固定的格式(开始符,结束符),这种方法简单易行,但是选择开始符和结束符时一定要确保每条数据的内部不包含开始符和结束符。
发送长度:发送每条数据时,将数据的长度一并发送,例如规定数据的前4位是数据的长度,应用层在处理时可以根据长度来判断每个分组的开始和结束位置。

对比一下TCP和UDP

TCP为了保证可靠传输并减少额外的开销(每次发包都要验证),采用了基于流的传输,基于流的传输不认为消息是一条一条的,是无保护消息边界的(保护消息边界:指传输协议把数据当做一条独立的消息在网上传输,接收端一次只能接受一条独立的消息)。

UDP则是面向消息传输的,是有保护消息边界的,接收方一次只接受一条独立的信息,所以不存在粘包问题。

举个例子:有三个数据包,大小分别为2k、4k、6k,如果采用UDP发送的话,不管接受方的接收缓存有多大,我们必须要进行至少三次以上的发送才能把数据包发送完,但是使用TCP协议发送的话,我们只需要接受方的接收缓存有12k的大小,就可以一次把这3个数据包全部发送完毕。

可以使用struck模块来复现粘包现象

requests

import requests

response = requests.request(method='GET', url="https://www.baidu.com", params=None, data=None, headers=None, cookies=None)

print(response.status_code)
print(response.text)
print(response.url)
print(response.raw)
print(response.history)
print(response.ok)
print(response.reason)
print(response.request.body)
print(response.request.hooks)