Оглавление
Описание
Я привожу пример клиента и сервера, написанных без учёта раздела DDOS. Реализована модель, когда сервер использует несколько клиентов для распределённых вычислений. Сервер отправляет клиентам запросы на вычисление, а клиенты результаты. Этот код использует протокол UDP, параллельные вычисления и может быть использован для создания простых распределённых систем. Для защиты от ошибок введена процедура регистрации. Клиент должен отправить серверу запрос, содержащий:
- Код запроса (1 байт)
- Фиксированную строку (8 байт) - пароль
- Порт, на который он будет принимать сообщения (2 байта)
- Нефиксированную строку с указанием размера - имя пользователя, для отладки, и возможно полноценной регистрации в ваших реализациях (2 и более байта)
Любой запрос имеет следующую структуру:
- Код запроса (1 байт)
- Не более 254 байт - дополнительная информация
Код запроса это целое число определяющее действие, выполняемое при получении запроса.
Этот пример вычисляет простоту чисел из диапазона, распределяя вычисления между клиентами.
Процедура взаимодействия следующая: клиент пытается зарегистрироваться на сервере. Сервер подтверждает регистрацию, затем отправляет клиенту запрос на вычисление. Клиент вычисляет и отвечает.
Запрос на вычисление выглядит следующим образом:
- Код запроса (1 байт)
- Число для проверки (4 байта)
Ответ на запрос:
- Код запроса (1 байт)
- Результат (0 или 1 равносильны False или True) (1 байт)
- Исходное число (4 байта)
В случае своей остановки, клиент отправляет запрос на удаление содержащий 1 байт - код запроса.
Общий код
Все модули используют следующий код:
- import socket
- import multiprocessing
- import sys
- import ctypes
- import threading
- import time
- class Number:
- '''Хранит число и сколько байт оно занимает.'''
- def __init__(self, data, size):
- self.data = data
- self.size = size;
- def append_number(data: bytearray, number: int, size: int):
- for i in range(size):
- data.append(number & 255)
- number = number >> 8
- def append_string(data: bytearray, string: str):
- data.extend(string.encode('utf8'))
- def append_data(data: bytearray, *args):
- for arg in args: #Перебираем аргументы
- if type(arg) == str: #Если встретилась строка, добавляем соотвествующей процедурой
- append_string(data, arg)
- elif type(arg) == Number: #Специальный объект для чисел произвольного размера
- append_number(data, arg.data, arg.size)
- elif type(arg) == list: #Если встречается список, используем оператор распаковки, т.е. append_data(data, [100, 20]) тоже самое, что append_data(data, 100, 20)
- append_data(data, *arg)
- elif type(arg) == tuple: #Если встречается кортеж, используем оператор распаковки, т.е. append_data(data, [100, 20]) тоже самое, что append_data(data, 10
- append_data(data, *arg)
- else: #Если не встретился указанный тип, считаем, что arg - число от 0 до 255 и записываем
- data.append(arg)
- def make_data(*args):
- '''Сформировать bytearray из данных.
- Преобразует любую последовательность аргументов в bytearray'''
- data = bytearray()
- append_data(data, args)
- return data
- def read_number(data, size, offset = 0):
- '''Прочитать число из двоичных данных. Операция обратная append_number'''
- number = 0;
- for i in range(offset, offset + size):
- number += data[i] << (8*(i - offset))
- return number;
- def check(input, check_data):
- '''Проверить что check_data содержится в начале input, т.е. check(make_data(10, 20), make_data(10)) = True'''
- if len(input) < len(check_data):
- return False
- for i in range(len(check_data)):
- if input[i] != check_data[i]:
- return False
- return True
- class ServerRequests:
- '''Описывает все запросы от клиента к серверу.'''
- REGISTER = 0 #Регистрация
- REMOVE = 1 #Полное удаление из списка
- CALC = 2 #Результат вычисления
- class ServerResponse:
- '''Описывает все запросы от сервера к клиенту.'''
- REGISTER = 0 #Успешная регистрация
- RESPONCE = 1 #Выдача задачи
Сервер
- import queue as queue_module
- def process_function(ps, queue: multiprocessing.Queue):
- INPUT_PORT = 3333 #Порт для приёма сообщений
- INPUT_BUFFER = 255 #Максимальный размер пакета, который может прилететь
- INPUT_TIMEOUT = 0.25 #Таймаут для сообщений
- udp_socket_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для приёма сообщений от сервера
- udp_socket_receive.bind(('', INPUT_PORT)) #Указываем, что слушаем порт INPUT_PORT
- udp_socket_receive.settimeout(INPUT_TIMEOUT)
- reg = set()
- while True:
- try:
- server_input, sender = udp_socket_receive.recvfrom(INPUT_BUFFER)
- sender = sender[0]
- if len(server_input) < 1:
- continue
- if server_input[0] == ServerRequests.REGISTER:
- print('Регистрация: ', server_input)
- try:
- if len(server_input) < (1 + 8 + 2 + 1 + 1):
- print('Ошибка регистрации')
- continue
- password = server_input[1:1 + 8].decode('utf8')
- if password != ps:
- print('Неверный пароль:', password)
- port = read_number(server_input, 2, 1 + 8)
- name_size = read_number(server_input, 1, 1+8 + 2)
- name = server_input[1+8 + 2 + 1:1+8 + 2 + 1 + name_size].decode('utf8')
- if not sender in reg:
- reg.add(sender)
- queue.put(['client', Client(sender, name, port)])
- except queue_module.Full:
- pass
- elif sender in reg:
- if (server_input[0] == ServerRequests.REMOVE):
- reg.remove(sender)
- queue.put(['remove', sender])
- elif (server_input[0] == ServerRequests.CALC):
- print('Расчитано число: ', read_number(server_input, 4, 2))
- queue.put(['result', server_input[1], read_number(server_input, 4, 2)])
- else:
- print('Неизвестный отправитель: ', sender)
- except socket.timeout as tm:
- pass
- except socket.error as ex:
- pass
- except BaseException as ex:
- pass
- class Client:
- def __init__(self, ip, name, port):
- self.ip = ip
- self.name = name
- self.port = port
- self.working = 0
- self.target = -1
- def reset(self):
- global result
- self.working = 0
- if self.target != -1:
- result[self.target] = None
- self.target = -1
- def stop_channel():
- global main_loop
- value = input('Для остановки нажите enter\n')
- main_loop = False
- if __name__ == '__main__':
- SERVER_PASSWORD = 'valid cd'
- WORKING_TIME = 0.5
- client_table = dict()
- udp_socket_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Для отправки сообщений клиентам
- result = []
- for i in range(5000):
- result.append([i, None])
- print('Запуск сервера')
- main_loop = True;
- queue = multiprocessing.Queue(1000)
- thread = threading.Thread(target = stop_channel);
- thread.start();
- for i in range(10):
- print('# ', end = '')
- time.sleep(0.5)
- print()
- print('Запуск потока чтения')
- process = multiprocessing.Process(target = process_function, args=(SERVER_PASSWORD, queue))
- process.start()
- while main_loop: #Бесконечный цикл
- try:
- qln = queue.qsize()
- for i in range(qln):
- try:
- req = queue.get(False)
- if req[0] == 'remove':
- client = client_table[client]
- client.reset()
- client_table.remove(req[1][0])
- elif req[0] == 'result':
- result[req[2]][1] = req[1]
- elif req[0] == 'client':
- client = req[1]
- if not client.ip in client_table:
- client_table[client.ip] = client
- udp_socket_send.sendto(make_data(ServerResponse.REGISTER), (client.ip, client.port))
- except queue_module.Empty as em:
- pass
- if len(client_table) > 0:
- current_time = time.time()
- for client_id in client_table:
- client = client_table[client_id]
- if (current_time - client.working) > WORKING_TIME:
- number = 0
- if client.target != -1:
- number = client.target
- else:
- all = True
- for i in range(len(result)):
- if result[i][1] == None:
- result[i][1] = -1
- number = i
- all = False
- break
- if all:
- main_loop = False
- break
- udp_socket_send.sendto(make_data(ServerResponse.RESPONCE, Number(number, 4)), (client.ip, client.port))
- except socket.timeout as tm:
- pass
- except socket.error as ex:
- pass
- except BaseException as ex:
- pass
- udp_socket_send.close()
- process.terminate()
- print('Обработка завершена, вывод результата')
- print(result)
- queue.close()
- input()
Клиент
- #Объявление констант
- SERVER_IP = '127.0.0.1' #IP сервера
- SERVER_PORT = 3333 #Порт сервера
- SERVER_ADDRESS = (SERVER_IP, SERVER_PORT)
- INPUT_PORT = 4444 #Порт для приёма сообщений от сервера
- INPUT_BUFFER = 255 #Максимальный размер пакета, который может быть получен от сервера
- INPUT_TIMEOUT = 0.25 #Таймаут для сообщений от сервера
- SERVER_PASSWORD = 'valid cd'
- USERNAME = 'Demonorium@yandex.ru'
- udp_socket_send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для отправки серверу
- udp_socket_receive = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для приёма сообщений от сервера
- udp_socket_receive.bind(('', INPUT_PORT)) #Указываем, что слушаем порт INPUT_PORT
- udp_socket_receive.settimeout(INPUT_TIMEOUT)
- REGISTER_REQUEST = make_data(
- ServerRequests.REGISTER, #Код запроса
- SERVER_PASSWORD, #Пароль
- Number(INPUT_PORT, 2), #Порт для приёма сообщений
- len(USERNAME), #Длина имени
- USERNAME #Имя
- )
- #Объявление глобальных переменных
- data_received = False #Если был получен хоть один ответ от сервера - true
- def isPrime(n): #Проверка числа на простоту
- if n % 2 == 0:
- return n == 2
- d = 3
- while d * d <= n and n % d != 0:
- d += 2
- return d * d > n
- def send_to_server_raw(bin_data: bytearray):
- '''Отправляет указанные данные серверу'''
- global udp_socket_send, SERVER_ADDRESS
- print('Отправка запроса к главному серверу:', bin_data)
- udp_socket_send.sendto(bin_data, SERVER_ADDRESS)
- def send_to_server(*data):
- '''Переводит данные в двоичный вид и отправляет главному серверу'''
- send_to_server_raw(make_data(*data))
- print('Запуск клиента')
- main_loop = True;
- def stop_channel():
- global main_loop
- value = input('Для остановки нажите enter\n')
- main_loop = False;
- thread = threading.Thread(target = stop_channel);
- thread.start();
- for i in range(10):
- print('# ', end = '')
- time.sleep(0.5)
- print()
- send_to_server_raw(REGISTER_REQUEST)
- table = dict()
- while main_loop: #Бесконечный цикл
- try:
- server_input, sender = udp_socket_receive.recvfrom(INPUT_BUFFER)
- print('Получена датаграмма:', server_input)
- if not main_loop:
- send_to_server(ServerRequests.REMOVE)
- break
- print('Запрос от ', sender, ' = ', server_input)
- if len(server_input) < 1: #Проверка на корректность
- continue
- #Помечаем, что мы получили какие-то данные от сервера
- if not data_received:
- data_received = True
- if server_input[0] == ServerResponse.RESPONCE:
- number = read_number(server_input, 4, 1)
- res = False
- if number in table:
- res = table[number]
- else:
- res = isPrime(number)
- table[number] = res
- if res:
- send_to_server(ServerRequests.CALC, 1, Number(number, 4))
- else:
- send_to_server(ServerRequests.CALC, 0, Number(number, 4))
- else:
- print('Ошибка: запрос не распознан, неизвестный код')
- except socket.timeout as tm: #Игоририруем ошибки сокета
- pass
- except socket.error as ex:
- pass
- except BaseException as ex: #Выводим в лог сообщения об остальном
- print(ex)
- if not data_received: #Если ни разу не прилетел ответ от сервера
- print('Запрос регистрации')
- send_to_server_raw(REGISTER_REQUEST)
- send_to_server(ServerRequests.REMOVE)
- udp_socket_receive.close()
- udp_socket_send.close()