class MainHandler(tornado.web.RequestHandler): def get(self): self.render("index.html")
# глобальный словарь для хранения текущих соединений
ws_clients = {} class WebsocketHandler(tornado.websocket.WebSocketHandler):
# подписка на канал (асинхронно) @tornado.gen.coroutine def listen_redis(self, channel_id): self.client = tornadoredis.Client() self.client.connect() yield tornado.gen.Task(self.client.subscribe, channel_id) self.client.listen(self.redis_message)
# обработчик поступления сообщений из редиса def redis_message(self,message): if(message.kind != 'subscribe'): data = message.body print(data)
# обработчик открытия соединения def open(self): print('Open connection') # генерируем уникальный идентификатор клиента из таймстампа sign = hashlib.md5(str(datetime.datetime.now()).encode('utf-8')).hexdigest() self.client_id = sign # добавление нового соединения в глобальный словарь ws_clients[sign] = {} ws_clients[sign]['connection'] = self ws_clients[sign]['username'] = 'undefined' # подписка на канал Redis self.listen_redis(self.client_id)
# обработчик поступления сообщения из клиента по вебсокету def on_message(self, message): print('got message %s' % message)
# обработчик закрытия соединения клиентом def on_close(self): print('close connection') # удаление соединения из глобального словаря del ws_clients[self.client_id]
Опишем 3 основные сигнальные функции серверной части:
offer – запрос на подключение
answer – ответ на запрос удаленной точкой
candidate – обмен ICE кандидатами
if message['action'] == 'offer': print("Sending offer to: %s" % message['destination']) message['initiator_id'] = self.client_id redis_client.publish(\ message['destination'],\ json.dumps(message)\ )
if message['action'] == 'answer': print("Sending answer to: %s" % message['destination']) redis_client.publish(\ message['destination'],\ json.dumps(message)\ )
if message['action'] == 'candidate': print("Sending ICE candidate to: %s" % message['destination']) redis_client.publish(\ message['destination'],\ json.dumps(message)\ )
Все функции однотипны и просто посылают сообщение в канал Redis с идентификатором, указанном в параметре destination сообщения.
Осталось отработать эти функции на стороне клиента. Но прежде, необходимо получить на клиенте список онлайн пользователей чтобы иметь возможность выбрать одного для соединения.
Добавим следующую функцию на сервере, которая формирует этот список и отдает его всем онлайн-соединениям:
def send_cliens_to_all(): clnts = [] # сформируем список соединений for key, value in ws_clients.items(): clnts.append({ 'id': key, 'username': value['username'] })
# отправим список в каждое соединение for key, value in ws_clients.items(): ws_clients[key]['connection'].write_message(\ {\ 'action': 'update_clients',\ 'message': json.dumps(clnts)\ }\ )
Теперь вызовем эту функцию при логине и завершении соединения.
def on_message(self, message): ... send_cliens_to_all() ... # обработчик закрытия соединения клиентом def on_close(self): print('close connection') # удаление соединения из глобального словаря del ws_clients[self.client_id] send_cliens_to_all()
Добавим элемент ul в код шаблона где будет наш список пользователей онлайн.
Будем его обновлять при поступлении сообщения с параметром action равным ’update_clients’:
ws.onmessage = function (evt) { let jdata = JSON.parse(evt.data); ... if( jdata['action'] == 'update_clients'){ console.log(`Updating clients $`); clientList.empty(); let data = JSON.parse(jdata['message']); data.forEach(function (item, index) { clientList.find(`#$`).on('click',call); });
Мы проходим циклом по списку активных пользователей и для каждого элемента добавляем ссылку для вызова, на которую вешаем функцию-обработчик call.
Разберем процесс взаимодействия между пользователями для установления передачи одного видео-аудио потока на примере следующей UML диаграммы последовательностей.
В результате такого взаимодействия, мы получаем два объекта RTCPeerConnection, по одному на каждую точку peer. В каждом соединении установлено два SDP объекта в Local и Remote параметрах, каждое их которых содержит список ICE кандидатов от удаленного и локального хоста. Данные SDP и ICE передаются между клиентами при помощи сигнального сервера в указанной на диаграмме последовательности.
Наш код будет разделен на 4 части:
Инициализация.
Описание логики обработчиков вебсокет-соединения.
Описание логики клиентского приложения.
Сервисная часть.
Инициализация
В ходе этого процесса, мы находим все элементы на странице и определяем их в коде как переменные. Затем «подвешиваем» на нужные элементы обработчики. Создаем RTCPeerConnection объекты ps1 и ps2 в каждом из приложений и тоже «цепляем» на них обработчики. Однако, в нашем примере мы будем использовать по одному RTCPeerConnection объекту из каждого приложения. Пока мы хотим передать только один поток и в одном направлении.
В дополнение инициализации, создается ряд конфигурационных переменных и объект веб-сокет соединения.
var localStream; let pc1; let pc2; const offerOptions = { offerToReceiveAudio: 1, offerToReceiveVideo: 1 };
// ws var ws = new WebSocket("ws://localhost:8888/websocket"); const configuration = {};
// Создаем объекты RTCPeerConnection c пустой конфигурацией console.log('RTCPeerConnection configuration:', configuration); pc1 = new RTCPeerConnection(configuration); console.log('Created local peer connection object pc1'); pc2 = new RTCPeerConnection(configuration); console.log('Created remote peer connection object pc2');
// Добавляем обработчики на событие добавления ICE кандидата pc1.addEventListener('icecandidate', e => onIceCandidate(pc1, e, 'pc1')); pc2.addEventListener('icecandidate', e => onIceCandidate(pc2, e, 'pc2'));
// Начало соенинение с удаленным хостом в ответ на его запрос offer if( jdata['action'] == 'offer'){ console.log('Geting offer'); // Обработчик добавления потока на второе соединение pc2.addEventListener('track', gotRemoteStream);
// устанавливаем initiator_id в input для того чтоб знать // его при передаче ICE кандидата abonentId.val(jdata['initiator_id']);
// установка RemoteDescription pc2.setRemoteDescription(jdata['offer']).then(function(){ console.log('pc2.setRemoteDescription'); // генерация ответного SDP offer pc2.createAnswer().then(function(answer){ // в onCreateAnswerSuccess мы отправим answer в сигнальный сервер // и установим pc2.setLocalDescription onCreateAnswerSuccess(answer,jdata['initiator_id']); },onCreateSessionDescriptionError); },onSetSessionDescriptionError); }
// обмен ICE кандидатами if( jdata['action'] == 'candidate'){ console.log('ICE candidate'); if (jdata['candidate'] != null){ let candidate = new RTCIceCandidate(jdata['candidate']); // мы используем параметр pc для того, чтобы определить для какого соединения // приходящий кандидат предназначается if(jdata['pc'] == 'pc1'){ console.log(`Addinng ICE to pc1`); pc1.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError); } else { console.log('Addinng ICE to pc2'); pc2.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError); } } } }
Описание логики клиентского приложения
function login(){ console.log('Login button clecked'); let message = {'action': 'login', 'message': username.val()} ws.send(JSON.stringify(message)); }
// Точка входа в инициировании соединения (клик на ссылке пользователя онлайн). async function call(evt) { // заберем идентификатор абонента let con_id = $(evt.target).attr('id'); // устанавливаем его в input для того чтоб знать // destination при передаче ICE кандидата abonentId.val(con_id); // Установим состояние кнопок callButton.disabled = true; hangupButton.disabled = false;
// Получаем и выводим информацию о медиа-потоках const videoTracks = localStream.getVideoTracks(); const audioTracks = localStream.getAudioTracks(); if (videoTracks.length > 0) { console.log(`Using video device: $`); } if (audioTracks.length > 0) { console.log(`Using audio device: $`); }
// Достаем медиа-дорожки из текущего stream объекта и передаем их в объект RTCPeerConnection localStream.getTracks().forEach(track => pc1.addTrack(track, localStream));
// Формируем offer из pc1 try { console.log('pc1 createOffer start'); const offer = await pc1.createOffer(offerOptions); // в onCreateOfferSuccess мы отправим offer сигнальному серверу // и установим pc1.setLocalDescription await onCreateOfferSuccess(offer,con_id); } catch (e) { console.log(`$`); } }
function stop() { console.log('Stop') // тут будем все обнулять console.log('Ending call'); localStream.getTracks().forEach(function(track) { track.stop(); }); pc1.close(); pc2.close(); pc1 = null; pc2 = null; hangupButton.hide(); startButton.show(); };
// Функция формирования offer async function onCreateOfferSuccess(desc, conn_id) { console.log('pc1 setLocalDescription start'); // отправляем запрос offer на сервер let message = {'action': 'offer', 'offer': desc, 'destination': conn_id} ws.send(JSON.stringify(message)); try { await pc1.setLocalDescription(desc); onSetLocalSuccess(pc1); } catch (e) { console.log(`error setting description to pc1 $`); } }
// Функция добавление потока к элементу remoteVideo function gotRemoteStream(e) { if (remoteVideo.srcObject !== e.streams[0]) { remoteVideo.srcObject = e.streams[0]; console.log('pc2 received remote stream'); } }
function onSetLocalSuccess(pc) { console.log(`$ setLocalDescription complete`); }
function onSetRemoteSuccess(pc) { console.log(`$ setRemoteDescription complete`); }
function onCreateSessionDescriptionError(error) { console.log(`Failed to create session description: $`); }
function onAddIceCandidateSuccess(pc) { console.log(`$ addIceCandidate success`); }
function onAddIceCandidateError(error) { console.log(` failed to add ICE Candidate: $`); }
function onSetSessionDescriptionError(error) { console.log(`Error setting SESSION description: $`); }
localVideo.addEventListener('loadedmetadata', function() { console.log(`Local video videoWidth: $px, videoHeight: $px`); });
remoteVideo.addEventListener('loadedmetadata', function() { console.log(`Remote video videoWidth: $px, videoHeight: $px`); });
Код шаблона HTML
Результат работы нашего приложения изображен на рисунке ниже.
В заключение
Это была последняя статья из цикла, где мы рассмотрели основные стадии установки соединения и передачи информации между его участниками, а также алгоритмы взаимодействия между программными компонентами приложения. Подписывайтесь на наш блог, чтобы получать больше полезных материалов.
Если вас заинтересовали возможности, которые предоставляет WebRTC-приложение – оставьте заявку на сайте. Специалисты Wezom создадут действительно эффективное решение для вашего бизнеса.