Простой пример сервера и клиента

Оглавление

Описание

Я привожу пример клиента и сервера, написанных без учёта раздела DDOS. Реализована модель, когда сервер использует несколько клиентов для распределённых вычислений. Сервер отправляет клиентам запросы на вычисление, а клиенты результаты. Этот код использует протокол UDP, параллельные вычисления и может быть использован для создания простых распределённых систем. Для защиты от ошибок введена процедура регистрации. Клиент должен отправить серверу запрос, содержащий:

Любой запрос имеет следующую структуру:

Код запроса это целое число определяющее действие, выполняемое при получении запроса.

Этот пример вычисляет простоту чисел из диапазона, распределяя вычисления между клиентами.

Процедура взаимодействия следующая: клиент пытается зарегистрироваться на сервере. Сервер подтверждает регистрацию, затем отправляет клиенту запрос на вычисление. Клиент вычисляет и отвечает.

Запрос на вычисление выглядит следующим образом:

Ответ на запрос:

В случае своей остановки, клиент отправляет запрос на удаление содержащий 1 байт - код запроса.

Общий код

Все модули используют следующий код:

  1. import socket
  2. import multiprocessing
  3. import sys
  4. import ctypes
  5. import threading
  6. import time
  7.  
  8. class Number:
  9.     '''Хранит число и сколько байт оно занимает.'''
  10.     def __init__(self, data, size):
  11.         self.data = data
  12.         self.size = size;
  13.  
  14. def append_number(data: bytearray, number: int, size: int):
  15.     for i in range(size):
  16.         data.append(number & 255)
  17.         number = number >> 8
  18.  
  19. def append_string(data: bytearray, stringstr):
  20.     data.extend(string.encode('utf8'))
  21.  
  22. def append_data(data: bytearray, *args):
  23.     for arg in args: #Перебираем аргументы
  24.         if type(arg) == str#Если встретилась строка, добавляем соотвествующей процедурой
  25.            append_string(data, arg)
  26.         elif type(arg) == Number: #Специальный объект для чисел произвольного размера
  27.             append_number(data, arg.data, arg.size)
  28.         elif type(arg) == list#Если встречается список, используем оператор распаковки, т.е. append_data(data, [100, 20]) тоже самое, что append_data(data, 100, 20)
  29.             append_data(data, *arg)
  30.         elif type(arg) == tuple#Если встречается кортеж, используем оператор распаковки, т.е. append_data(data, [100, 20]) тоже самое, что append_data(data, 10
  31.             append_data(data, *arg)
  32.         else#Если не встретился указанный тип, считаем, что arg - число от 0 до 255 и записываем
  33.             data.append(arg)
  34.  
  35. def make_data(*args):
  36.     '''Сформировать bytearray из данных.
  37.     Преобразует любую последовательность аргументов в bytearray'''
  38.     data = bytearray()
  39.     append_data(data, args)
  40.     return data
  41.  
  42. def read_number(data, size, offset = 0):
  43.     '''Прочитать число из двоичных данных. Операция обратная append_number'''
  44.     number = 0;
  45.     for i in range(offset, offset + size):
  46.         number += data[i] << (8*(i - offset))
  47.     return number;
  48.  
  49. def check(input, check_data):
  50.     '''Проверить что check_data содержится в начале input, т.е. check(make_data(10, 20), make_data(10)) = True'''
  51.     if len(input) < len(check_data):
  52.         return False
  53.     for i in range(len(check_data)):
  54.         if input[i] != check_data[i]:
  55.             return False
  56.     return True
  57.  
  58.  
  59. class ServerRequests:
  60.     '''Описывает все запросы от клиента к серверу.'''
  61.     REGISTER     = 0 #Регистрация 
  62.     REMOVE       = 1 #Полное удаление из списка 
  63.     CALC         = 2 #Результат вычисления
  64.  
  65. class ServerResponse:
  66.     '''Описывает все запросы от сервера к клиенту.'''
  67.     REGISTER   = 0 #Успешная регистрация
  68.     RESPONCE   = 1 #Выдача задачи

Сервер

  1. import queue as queue_module
  2.  
  3. def process_function(ps, queue: multiprocessing.Queue):
  4.     INPUT_PORT = 3333       #Порт для приёма сообщений
  5.     INPUT_BUFFER = 255      #Максимальный размер пакета, который может прилететь 
  6.     INPUT_TIMEOUT = 0.25    #Таймаут для сообщений
  7.     udp_socket_receive  = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для приёма сообщений от сервера
  8.     udp_socket_receive.bind(('', INPUT_PORT))   #Указываем, что слушаем порт INPUT_PORT
  9.     udp_socket_receive.settimeout(INPUT_TIMEOUT)
  10.     reg = set()
  11.     while True:
  12.         try:
  13.             server_input, sender = udp_socket_receive.recvfrom(INPUT_BUFFER)
  14.             sender = sender[0]
  15.             if len(server_input) < 1:
  16.                 continue
  17.  
  18.  
  19.             if server_input[0] == ServerRequests.REGISTER:
  20.                 print('Регистрация: ', server_input)
  21.                 try:
  22.                     if len(server_input) < (1 + 8 + 2 + 1 + 1):
  23.                         print('Ошибка регистрации')
  24.                         continue
  25.                     password = server_input[1:1 + 8].decode('utf8')
  26.                     if password != ps:
  27.                         print('Неверный пароль:', password)
  28.                     port = read_number(server_input, 2, 1 + 8)
  29.                     name_size = read_number(server_input, 1, 1+8 + 2)
  30.                     name = server_input[1+8 + 2 + 1:1+8 + 2 + 1 + name_size].decode('utf8')
  31.                     if not sender in reg:
  32.                         reg.add(sender)
  33.                     queue.put(['client', Client(sender, name, port)])
  34.                 except queue_module.Full:
  35.                     pass
  36.  
  37.             elif sender in reg:
  38.                 if (server_input[0] == ServerRequests.REMOVE):
  39.                     reg.remove(sender)
  40.                     queue.put(['remove', sender])
  41.                 elif (server_input[0] == ServerRequests.CALC):
  42.                     print('Расчитано число: ', read_number(server_input, 4, 2))
  43.                     queue.put(['result', server_input[1], read_number(server_input, 4, 2)])
  44.             else:
  45.                 print('Неизвестный отправитель: ', sender)
  46.  
  47.         except socket.timeout as tm:
  48.             pass
  49.         except socket.error as ex:
  50.             pass
  51.         except BaseException as ex: 
  52.             pass
  53. class Client:
  54.     def __init__(self, ip, name, port):
  55.         self.ip = ip
  56.         self.name = name
  57.         self.port = port
  58.         self.working = 0
  59.         self.target = -1
  60.     def reset(self):
  61.         global result
  62.         self.working = 0
  63.         if self.target != -1:
  64.             result[self.target] = None
  65.         self.target = -1
  66.  
  67. def stop_channel():
  68.     global main_loop
  69.     value = input('Для остановки нажите enter\n')
  70.     main_loop = False
  71.  
  72. if __name__ == '__main__':
  73.     SERVER_PASSWORD = 'valid cd'
  74.     WORKING_TIME = 0.5
  75.  
  76.     client_table = dict()
  77.     udp_socket_send     = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Для отправки сообщений клиентам
  78.  
  79.     result = []
  80.     for i in range(5000):
  81.         result.append([i, None])
  82.  
  83.     print('Запуск сервера')
  84.     main_loop = True;
  85.  
  86.     queue = multiprocessing.Queue(1000)
  87.  
  88.     thread = threading.Thread(target = stop_channel);
  89.     thread.start();
  90.  
  91.     for i in range(10):
  92.         print('# ', end = '')
  93.         time.sleep(0.5)
  94.     print()
  95.  
  96.     print('Запуск потока чтения')
  97.     process = multiprocessing.Process(target = process_function, args=(SERVER_PASSWORD, queue))
  98.     process.start()
  99.  
  100.     while main_loop: #Бесконечный цикл
  101.         try:
  102.             qln = queue.qsize()
  103.             for i in range(qln):
  104.                 try:
  105.                     req = queue.get(False)
  106.                     if req[0] == 'remove':
  107.                         client = client_table[client]
  108.                         client.reset()
  109.                         client_table.remove(req[1][0])
  110.                     elif req[0] == 'result':
  111.                         result[req[2]][1] = req[1]
  112.                     elif req[0] == 'client':
  113.                         client = req[1]
  114.                         if not client.ip in client_table:
  115.                             client_table[client.ip] = client
  116.                             udp_socket_send.sendto(make_data(ServerResponse.REGISTER), (client.ip, client.port))
  117.                 except queue_module.Empty as em:
  118.                     pass
  119.  
  120.             if len(client_table) > 0:
  121.                 current_time = time.time()
  122.                 for client_id in client_table:
  123.                     client = client_table[client_id]
  124.                     if (current_time - client.working) > WORKING_TIME:
  125.                         number = 0
  126.                         if client.target != -1:
  127.                             number = client.target
  128.                         else:
  129.                             all = True
  130.                             for i in range(len(result)):
  131.                                 if result[i][1] == None:
  132.                                     result[i][1] = -1
  133.                                     number = i
  134.                                     all = False
  135.                                     break
  136.                             if all
  137.                                 main_loop = False
  138.                                 break
  139.                         udp_socket_send.sendto(make_data(ServerResponse.RESPONCE, Number(number, 4)), (client.ip, client.port))
  140.         except socket.timeout as tm:
  141.             pass
  142.         except socket.error as ex:
  143.             pass
  144.         except BaseException as ex:
  145.             pass
  146.  
  147.     udp_socket_send.close()
  148.     process.terminate()
  149.  
  150.     print('Обработка завершена, вывод результата')
  151.     print(result)
  152.     queue.close()
  153.     input()

Клиент

  1. #Объявление констант
  2. SERVER_IP = '127.0.0.1'  #IP сервера
  3. SERVER_PORT = 3333       #Порт сервера
  4. SERVER_ADDRESS = (SERVER_IP, SERVER_PORT)
  5.  
  6. INPUT_PORT = 4444       #Порт для приёма сообщений от сервера
  7. INPUT_BUFFER = 255      #Максимальный размер пакета, который может быть получен от сервера
  8. INPUT_TIMEOUT = 0.25    #Таймаут для сообщений от сервера
  9.  
  10. SERVER_PASSWORD = 'valid cd'
  11. USERNAME = 'Demonorium@yandex.ru'
  12.  
  13. udp_socket_send     = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для отправки серверу
  14. udp_socket_receive  = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) #Сокет для приёма сообщений от сервера
  15. udp_socket_receive.bind(('', INPUT_PORT))   #Указываем, что слушаем порт INPUT_PORT
  16. udp_socket_receive.settimeout(INPUT_TIMEOUT)
  17.  
  18. REGISTER_REQUEST = make_data(
  19.     ServerRequests.REGISTER,    #Код запроса
  20.     SERVER_PASSWORD,            #Пароль
  21.     Number(INPUT_PORT, 2),      #Порт для приёма сообщений
  22.     len(USERNAME),              #Длина имени
  23.     USERNAME                    #Имя
  24.     )
  25.  
  26. #Объявление глобальных переменных
  27. data_received = False       #Если был получен хоть один ответ от сервера - true
  28.  
  29. def isPrime(n)#Проверка числа на простоту
  30.     if n % 2 == 0:
  31.         return n == 2
  32.     d = 3
  33.     while d * d <= n and n % d != 0:
  34.         d += 2
  35.     return d * d > n
  36.  
  37. def send_to_server_raw(bin_data: bytearray):
  38.     '''Отправляет указанные данные серверу'''
  39.     global udp_socket_send, SERVER_ADDRESS
  40.     print('Отправка запроса к главному серверу:', bin_data)
  41.     udp_socket_send.sendto(bin_data, SERVER_ADDRESS)
  42.  
  43. def send_to_server(*data):
  44.     '''Переводит данные в двоичный вид и отправляет главному серверу'''
  45.     send_to_server_raw(make_data(*data))
  46.  
  47. print('Запуск клиента')
  48. main_loop = True;
  49.  
  50. def stop_channel():
  51.     global main_loop
  52.     value = input('Для остановки нажите enter\n')
  53.     main_loop = False;
  54.  
  55.  
  56. thread = threading.Thread(target = stop_channel);
  57. thread.start();
  58.  
  59. for i in range(10):
  60.     print('# ', end = '')
  61.     time.sleep(0.5)
  62. print()
  63.  
  64. send_to_server_raw(REGISTER_REQUEST)
  65. table = dict()
  66. while main_loop: #Бесконечный цикл
  67.     try:
  68.         server_input, sender = udp_socket_receive.recvfrom(INPUT_BUFFER)
  69.         print('Получена датаграмма:', server_input)
  70.  
  71.         if not main_loop: 
  72.             send_to_server(ServerRequests.REMOVE)
  73.             break
  74.  
  75.         print('Запрос от ', sender, ' = ', server_input)
  76.  
  77.         if len(server_input) < 1#Проверка на корректность
  78.             continue
  79.  
  80.         #Помечаем, что мы получили какие-то данные от сервера
  81.         if not data_received:
  82.             data_received = True
  83.  
  84.         if server_input[0] == ServerResponse.RESPONCE:
  85.             number = read_number(server_input, 4, 1)
  86.             res = False
  87.             if number in table:
  88.                res = table[number]
  89.             else:
  90.                 res = isPrime(number)
  91.                 table[number] = res
  92.  
  93.             if res:
  94.                 send_to_server(ServerRequests.CALC, 1, Number(number, 4))
  95.             else:
  96.                 send_to_server(ServerRequests.CALC, 0, Number(number, 4))
  97.         else:
  98.             print('Ошибка: запрос не распознан, неизвестный код')
  99.     except socket.timeout as tm: #Игоририруем ошибки сокета
  100.         pass
  101.     except socket.error as ex:
  102.         pass
  103.     except BaseException as ex: #Выводим в лог сообщения об остальном
  104.         print(ex)
  105.  
  106.     if not data_received: #Если ни разу не прилетел ответ от сервера
  107.         print('Запрос регистрации')
  108.         send_to_server_raw(REGISTER_REQUEST)
  109.  
  110. send_to_server(ServerRequests.REMOVE)
  111. udp_socket_receive.close()
  112. udp_socket_send.close()