Дмитрий
Дмитрий
Developer Python, PHP, Javascript
14.06.2019

Пишем сигнальный сервер Tornado

Дмитрий
Дмитрий
Developer Python, PHP, Javascript
14.06.2019
14.06.2019
4.5
9439
0

Сервер будем писать на python, используя фреймворк Tornado. В качестве транспорта для сообщений будет использован сервер Redis.

Установка сервера Redis:

sudo apt-get install redis-server

Установка Tornado:

pip install tornado==4.5.3 redis tornado-redis

Заготовка для сервера, файл server.py.

 #!/usr/bin/env python
   import tornado.ioloop
   import tornado.web
   # шаблонизатор
   loader = tornado.template.Loader(".")
   # утилита перегрузки сервера при изменениях кода
   from tornado import autoreload
   # обработчик вебсокетов
   from tornado.websocket import WebSocketHandler
   import json
   import datetime
   import time
   import hashlib
   import tornadoredis

   # Создаем клиента Redis в глобальной переменной
   # для посылки сообщений напрямую в канал из всех мест

   import redis
   redis_client = redis.Redis(host='localhost', port=6379, db=0)

   # отдает главную страницу

   class MainHandler(tornado.web.RequestHandler):
       def get(self):
           self.render("index.html")

   # глобальный словарь для хранения текущих соединений

   ws_clients = {}
   class WebsocketHandler(tornado.websocket.WebSocketHandler):

        # подписка на канал (асинхронно)
       @tornado.gen.coroutine
       def listen_redis(self, channel_id):
           self.client = tornadoredis.Client()
           self.client.connect()
           yield tornado.gen.Task(self.client.subscribe, channel_id)
           self.client.listen(self.redis_message)

       # обработчик поступления сообщений из редиса
       def redis_message(self,message):
           if(message.kind != 'subscribe'):
               data = message.body
               print(data)

       # обработчик открытия соединения
       def open(self):
           print('Open connection')
           # генерируем уникальный идентификатор клиента из таймстампа
           sign = hashlib.md5(str(datetime.datetime.now()).encode('utf-8')).hexdigest()
           self.client_id = sign
           # добавление нового соединения в глобальный словарь
        ws_clients[sign] = {}
ws_clients[sign]['connection'] = self
          ws_clients[sign]['username'] = 'undefined'
          # подписка на канал Redis
          self.listen_redis(self.client_id)

       # обработчик поступления сообщения из клиента по вебсокету
       def on_message(self, message):
           print('got message %s' % message)

       # обработчик закрытия соединения клиентом
       def on_close(self):
           print('close connection')
           # удаление соединения из глобального словаря   
           del ws_clients[self.client_id]

   # конфигурируем приложение роутингом
   def make_app():
       return tornado.web.Application([
           # главная страница
           (r"/", MainHandler),
           # отдача статики
           (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': 'static'}),
           # запросы по веб-сокету
           (r"/websocket", WebsocketHandler),
       ])

   if __name__ == "__main__":
       print('Starting server on 8888 port')
       autoreload.start()
       autoreload.watch('.')
       autoreload.watch('index.html')
       app = make_app()
       app.listen(8888)
       tornado.ioloop.IOLoop.current().start()

HTML форма логина.

В элементе socketId будем хранить идентификатор соединения на сервере.

Авторизационный запрос

В нем мы заберем имя пользователя и передадим его на сервер по веб-сокету.

 // ws
   var ws = new WebSocket("ws://localhost:8888/websocket");

   // Login
   const loginButton = document.getElementById('loginButton');
   const username = document.getElementById('username');
   loginButton.addEventListener('click', login);

   function login(){
     console.log('Logining');
     let message = {'action': 'login', 'message': username.value}
     ws.send(JSON.stringify(message));      
   }

Отработаем на сервере процесс авторизации пользователя в функции on_message.

     def on_message(self, message):
       print('got message %s' % message)
       message = json.loads(message)
       if message['action'] == 'login':
           print('Login with name %s' % message['message'])
           ws_clients[self.client_id]['username'] = message['message']
           # отправляем клиенту его идентификатор соединения на сервере
           self.write_message(\
               {\
                   'action': 'set_connection_id',\
                   'message': self.client_id\
               }\
           )
           print(ws_clients[self.client_id])

Мы добавили логин пользователя в словарь, в котором находятся все «живые» соединения и отдали клиенту его внутренний идентификатор сервера.

Добавим этот идентификатор на форму страницы, определив обработчик onmessage объекта ws.

  ws.onmessage = function (evt) {
     let jdata = JSON.parse(evt.data);    
     if( jdata['action'] == 'set_connection_id'){
         console.log(`Set sign $`);
         socketId.value = jdata['message'];
     }
   }

Результат работы:

Опишем 3 основные сигнальные функции серверной части:

  • offer – запрос на подключение
  • answer – ответ на запрос удаленной точкой
  • candidate – обмен ICE кандидатами
        if message['action'] == 'offer':
           print("Sending offer to: %s" % message['destination'])
           message['initiator_id'] = self.client_id
           redis_client.publish(\
               message['destination'],\
               json.dumps(message)\
               )
           
       if message['action'] == 'answer':
           print("Sending answer to: %s" % message['destination'])
           redis_client.publish(\
               message['destination'],\
               json.dumps(message)\
               )

       if message['action'] == 'candidate':
           print("Sending ICE candidate to: %s" % message['destination'])
           redis_client.publish(\
               message['destination'],\
               json.dumps(message)\
               )

Все функции однотипны и просто посылают сообщение в канал Redis с идентификатором, указанном в параметре destination сообщения.

Осталось отработать эти функции на стороне клиента. Но прежде, необходимо получить на клиенте список онлайн пользователей чтобы иметь возможность выбрать одного для соединения.

Добавим следующую функцию на сервере, которая формирует этот список и отдает его всем онлайн-соединениям:

  def send_cliens_to_all():
       clnts = []
       # сформируем список соединений
       for key, value in ws_clients.items():
           clnts.append({
               'id': key,
               'username': value['username']
           })

       # отправим список в каждое соединение
       for key, value in ws_clients.items():
           ws_clients[key]['connection'].write_message(\
               {\
                   'action': 'update_clients',\
                   'message': json.dumps(clnts)\
               }\
           )

Теперь вызовем эту функцию при логине и завершении соединения.

    def on_message(self, message):
       ...
           send_cliens_to_all()
   ...
   # обработчик закрытия соединения клиентом
   def on_close(self):
       print('close connection')
       # удаление соединения из глобального словаря   
       del ws_clients[self.client_id]
       send_cliens_to_all()

Добавим элемент ul в код шаблона где будет наш список пользователей онлайн.

Будем его обновлять при поступлении сообщения с параметром action равным ’update_clients’:

     ws.onmessage = function (evt) {
     let jdata = JSON.parse(evt.data);    
    ...
            if( jdata['action'] == 'update_clients'){
       console.log(`Updating clients $`);
       clientList.empty();
       let data = JSON.parse(jdata['message']);
       data.forEach(function (item, index) {

clientList.find(`#$`).on('click',call);
});

Мы проходим циклом по списку активных пользователей и для каждого элемента добавляем ссылку для вызова, на которую вешаем функцию-обработчик call.

Разберем процесс взаимодействия между пользователями для установления передачи одного видео-аудио потока на примере следующей UML диаграммы последовательностей.

В результате такого взаимодействия, мы получаем два объекта RTCPeerConnection, по одному на каждую точку peer. В каждом соединении установлено два SDP объекта в Local и Remote параметрах, каждое их которых содержит список ICE кандидатов от удаленного и локального хоста. Данные SDP и ICE передаются между клиентами при помощи сигнального сервера в указанной на диаграмме последовательности.

Наш код будет разделен на 4 части:

  • Инициализация.
  • Описание логики обработчиков вебсокет-соединения.
  • Описание логики клиентского приложения.
  • Сервисная часть.

Инициализация

В ходе этого процесса, мы находим все элементы на странице и определяем их в коде как переменные. Затем «подвешиваем» на нужные элементы обработчики. Создаем RTCPeerConnection объекты ps1 и ps2 в каждом из приложений и тоже «цепляем» на них обработчики. Однако, в нашем примере мы будем использовать по одному RTCPeerConnection объекту из каждого приложения. Пока мы хотим передать только один поток и в одном направлении.

В дополнение инициализации, создается ряд конфигурационных переменных и объект веб-сокет соединения.

 //// Инициализация /////
   const startButton = $('#startButton');
   const callButton = $('#callButton');
   const hangupButton = $('#stopButton');
   startButton.on('click', start);
   callButton.on('click', call);
   hangupButton.on('click', stop);
   const localVideo = document.getElementById('localVideo');
   const remoteVideo = document.getElementById('remoteVideo');
   
   var localStream;
   let pc1;
   let pc2;
   const offerOptions = {
     offerToReceiveAudio: 1,
     offerToReceiveVideo: 1
   };

   // ws
   var ws = new WebSocket("ws://localhost:8888/websocket");
   const configuration = {};

   // Создаем объекты RTCPeerConnection c пустой конфигурацией
   console.log('RTCPeerConnection configuration:', configuration);
   pc1 = new RTCPeerConnection(configuration);
   console.log('Created local peer connection object pc1');
   pc2 = new RTCPeerConnection(configuration);
   console.log('Created remote peer connection object pc2');

   // Добавляем обработчики на событие добавления ICE кандидата
   pc1.addEventListener('icecandidate', e => onIceCandidate(pc1, e, 'pc1'));
   pc2.addEventListener('icecandidate', e => onIceCandidate(pc2, e, 'pc2'));

   // Login
   const loginButton = $('#loginButton');
   const username = $('#username');
   const socketId = $('#socketId');
   const abonentId = $('#abonentId');
   const clientList = $('#clientList');
   loginButton.on('click', login);

Описание логики обработчиков веб-сокет соединения

Приведенный далее код снабжен подробными комментариями и не нуждается в пояснении.

 /// Обработчик onmessage socket соединения ////
   ws.onmessage = function (evt) {
     let jdata = JSON.parse(evt.data);  
     // Установка текущего идентификатора сокет-соединения,
     // сгенерированного сервером Tornado
     if( jdata['action'] == 'set_connection_id'){
         console.log(`Set sign $`);
         socketId.val(jdata['message']);
     }

     // обновление списка пользователей онлайн
     if( jdata['action'] == 'update_clients'){
       console.log(`Updating clients $`);
       clientList.empty();
       let data = JSON.parse(jdata['message']);
       data.forEach(function (item, index) {
         console.log(item);

clientList.find(`#$`).on('click',call);
       });
     }

     // Начало соенинение с удаленным хостом в ответ на его запрос offer
     if( jdata['action'] == 'offer'){
       console.log('Geting offer');
       // Обработчик добавления потока на второе соединение
       pc2.addEventListener('track', gotRemoteStream);

       // устанавливаем initiator_id в input для того чтоб знать
       // его при передаче ICE кандидата
       abonentId.val(jdata['initiator_id']);

       // установка RemoteDescription
       pc2.setRemoteDescription(jdata['offer']).then(function(){
         console.log('pc2.setRemoteDescription');
         // генерация ответного SDP offer
         pc2.createAnswer().then(function(answer){
           // в onCreateAnswerSuccess мы отправим answer в сигнальный сервер
           // и установим pc2.setLocalDescription
           onCreateAnswerSuccess(answer,jdata['initiator_id']);
         },onCreateSessionDescriptionError);
       },onSetSessionDescriptionError);
     }

     // реакция на запрос answer
     if( jdata['action'] == 'answer'){
       console.log('Geting answer');
       pc1.setRemoteDescription(jdata['offer']).then(onSetRemoteSuccess,onSetSessionDescriptionError)
     }

     // обмен ICE кандидатами
     if( jdata['action'] == 'candidate'){
       console.log('ICE candidate');
       if (jdata['candidate'] != null){
         let candidate = new RTCIceCandidate(jdata['candidate']);
         // мы используем параметр pc для того, чтобы определить для какого соединения
         // приходящий кандидат предназначается
         if(jdata['pc'] == 'pc1'){
           console.log(`Addinng ICE to pc1`);
           pc1.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
         } else {
           console.log('Addinng ICE to pc2');
           pc2.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
         }
       }
     }
   }

Описание логики клиентского приложения

     function login(){
     console.log('Login button clecked');
     let message = {'action': 'login', 'message': username.val()}
     ws.send(JSON.stringify(message));      
   }

   // Включение камеры и получение медиа-потоков.
   async function start() {
       console.log('Requesting local stream');
       startButton.hide();
       hangupButton.show();
     try {
         const stream = await navigator.mediaDevices.getUserMedia();
         console.log('Received local stream');
         localVideo.srcObject = stream;
         localStream = stream;
         callButton.show();
     } catch (e) {
         alert(`getUserMedia() error: $`);
     }
   };

   // Точка входа в инициировании соединения (клик на ссылке пользователя онлайн).
   async function call(evt) {
     // заберем идентификатор абонента
     let con_id = $(evt.target).attr('id');
     // устанавливаем его в input для того чтоб знать
     // destination при передаче ICE кандидата
     abonentId.val(con_id);
     // Установим состояние кнопок
     callButton.disabled = true;
     hangupButton.disabled = false;

     // Получаем и выводим информацию о медиа-потоках
     const videoTracks = localStream.getVideoTracks();
     const audioTracks = localStream.getAudioTracks();
     if (videoTracks.length > 0) {
       console.log(`Using video device: $`);
     }
     if (audioTracks.length > 0) {
       console.log(`Using audio device: $`);
     }
     
     // Достаем медиа-дорожки из текущего stream объекта и передаем их в объект RTCPeerConnection
     localStream.getTracks().forEach(track => pc1.addTrack(track, localStream));

     // Формируем offer из pc1
     try {
       console.log('pc1 createOffer start');
       const offer = await pc1.createOffer(offerOptions);
       // в onCreateOfferSuccess мы отправим offer сигнальному серверу
       // и установим pc1.setLocalDescription
       await onCreateOfferSuccess(offer,con_id);
     } catch (e) {
        console.log(`$`);
     }
   }

   function stop() {
       console.log('Stop')
       // тут будем все обнулять
       console.log('Ending call');
       localStream.getTracks().forEach(function(track) {
         track.stop();
       });
       pc1.close();
       pc2.close();
       pc1 = null;
       pc2 = null;
       hangupButton.hide();
       startButton.show();
   };
   
   // Функция формирования offer
   async function onCreateOfferSuccess(desc, conn_id) {
     console.log('pc1 setLocalDescription start');
     // отправляем запрос offer на сервер
     let message = {'action': 'offer', 'offer': desc, 'destination': conn_id}
     ws.send(JSON.stringify(message));
     try {
       await pc1.setLocalDescription(desc);
       onSetLocalSuccess(pc1);
     } catch (e) {
       console.log(`error setting description to pc1 $`);
     }
   }   

   // Добавление ICE кандидатов.
   function onIceCandidate(pc, event, dest) {
       // отправляем запрос candidate на сервер
       if(dest=='pc1') {
           var message = {
                        'action': 'candidate',
                        'pc': 'pc2',
                        'candidate': event.candidate,
                        'destination': abonentId.val()
                        }
       } else {
           var message = {
                       'action': 'candidate',
                       'pc': 'pc1',
                       'candidate': event.candidate,
                       'destination': abonentId.val()
                       }
       }
       ws.send(JSON.stringify(message));
   }

   // Функция формирования ответного SDP offer
   async function onCreateAnswerSuccess(desc,initiator_id) {
     console.log(`Answer from pc2`);

     // посылаем ответ answer на сервер
     let message = {'action': 'answer', 'offer': desc, 'destination': initiator_id}
     ws.send(JSON.stringify(message));  
  
     try {
       await pc2.setLocalDescription(desc);
       onSetLocalSuccess(pc2);
     } catch (e) {
       onSetSessionDescriptionError(e);
     }
   }

   // Функция добавление потока к элементу remoteVideo    
   function gotRemoteStream(e) {
     if (remoteVideo.srcObject !== e.streams[0]) {
       remoteVideo.srcObject = e.streams[0];
       console.log('pc2 received remote stream');
     }
   }

Сервисные функции для отладки

   function getName(pc) {
     return (pc === pc1) ? 'pc1' : 'pc2';
   }

   function getOtherPc(pc) {
     return (pc === pc1) ? pc2 : pc1;
   }

   function onSetLocalSuccess(pc) {
     console.log(`$ setLocalDescription complete`);
   }

   function onSetRemoteSuccess(pc) {
     console.log(`$ setRemoteDescription complete`);
   }

   function onCreateSessionDescriptionError(error) {
     console.log(`Failed to create session description: $`);
   }

   function onAddIceCandidateSuccess(pc) {
     console.log(`$ addIceCandidate success`);
   }

   function onAddIceCandidateError(error) {
     console.log(` failed to add ICE Candidate: $`);
   }

   function onSetSessionDescriptionError(error) {
     console.log(`Error setting SESSION description: $`);
   }

   localVideo.addEventListener('loadedmetadata', function() {
     console.log(`Local video videoWidth: $px,  videoHeight: $px`);
   });

   remoteVideo.addEventListener('loadedmetadata', function() {
     console.log(`Remote video videoWidth: $px,  videoHeight: $px`);
   });

Код шаблона HTML

Результат работы нашего приложения изображен на рисунке ниже.

В заключение

Это была последняя статья из цикла, где мы рассмотрели основные стадии установки соединения и передачи информации между его участниками, а также алгоритмы взаимодействия между программными компонентами приложения. Подписывайтесь на наш блог, чтобы получать больше полезных материалов.

Если вас заинтересовали возможности, которые предоставляет WebRTC-приложение – оставьте заявку на сайте. Специалисты Wezom создадут действительно эффективное решение для вашего бизнеса.

Как вам статья?
4.5
Проголосовало: 8
Давайте обсудим Ваш проект
Нажимая на кнопку “Отправить”, вы даете согласие на обработку личных данных. Подробнее
Комментарии
(0)
Будьте первыми, кто оставит комментарий
wezom logo
Остались вопросы?
Оставьте ваши контактные данные. Наш менеджер свяжется и проконсультирует вас.
Подписывайтесь на рассылку Айтыжблог
blog subscriber decor image
Хотите получать интересные статьи?
Нажимая на кнопку “Отправить”, вы даете согласие на обработку личных данных. Подробнее
Следите за нами в социальных сетях
Этот сайт использует cookie-файлы для более комфортной работы пользователя. Продолжая просматривать сайт, Вы соглашаетесь на использование cookie.