Сервер будем писать на python, используя фреймворк Tornado. В качестве транспорта для сообщений будет использован сервер Redis.
Установка сервера Redis:
sudo apt-get install redis-server
Установка Tornado:
pip install tornado==4.5.3 redis tornado-redis
Заготовка для сервера, файл server.py.
#!/usr/bin/env python
import tornado.ioloop
import tornado.web
# шаблонизатор
loader = tornado.template.Loader(".")
# утилита перегрузки сервера при изменениях кода
from tornado import autoreload
# обработчик вебсокетов
from tornado.websocket import WebSocketHandler
import json
import datetime
import time
import hashlib
import tornadoredis
# Создаем клиента Redis в глобальной переменной
# для посылки сообщений напрямую в канал из всех мест
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# отдает главную страницу
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html")
# глобальный словарь для хранения текущих соединений
ws_clients = {}
class WebsocketHandler(tornado.websocket.WebSocketHandler):
# подписка на канал (асинхронно)
@tornado.gen.coroutine
def listen_redis(self, channel_id):
self.client = tornadoredis.Client()
self.client.connect()
yield tornado.gen.Task(self.client.subscribe, channel_id)
self.client.listen(self.redis_message)
# обработчик поступления сообщений из редиса
def redis_message(self,message):
if(message.kind != 'subscribe'):
data = message.body
print(data)
# обработчик открытия соединения
def open(self):
print('Open connection')
# генерируем уникальный идентификатор клиента из таймстампа
sign = hashlib.md5(str(datetime.datetime.now()).encode('utf-8')).hexdigest()
self.client_id = sign
# добавление нового соединения в глобальный словарь
ws_clients[sign] = {}
ws_clients[sign]['connection'] = self
ws_clients[sign]['username'] = 'undefined'
# подписка на канал Redis
self.listen_redis(self.client_id)
# обработчик поступления сообщения из клиента по вебсокету
def on_message(self, message):
print('got message %s' % message)
# обработчик закрытия соединения клиентом
def on_close(self):
print('close connection')
# удаление соединения из глобального словаря
del ws_clients[self.client_id]
# конфигурируем приложение роутингом
def make_app():
return tornado.web.Application([
# главная страница
(r"/", MainHandler),
# отдача статики
(r'/static/(.*)', tornado.web.StaticFileHandler, {'path': 'static'}),
# запросы по веб-сокету
(r"/websocket", WebsocketHandler),
])
if __name__ == "__main__":
print('Starting server on 8888 port')
autoreload.start()
autoreload.watch('.')
autoreload.watch('index.html')
app = make_app()
app.listen(8888)
tornado.ioloop.IOLoop.current().start()
HTML форма логина.
В элементе socketId будем хранить идентификатор соединения на сервере.
Авторизационный запрос
В нем мы заберем имя пользователя и передадим его на сервер по веб-сокету.
// ws
var ws = new WebSocket("ws://localhost:8888/websocket");
// Login
const loginButton = document.getElementById('loginButton');
const username = document.getElementById('username');
loginButton.addEventListener('click', login);
function login(){
console.log('Logining');
let message = {'action': 'login', 'message': username.value}
ws.send(JSON.stringify(message));
}
Отработаем на сервере процесс авторизации пользователя в функции on_message.
def on_message(self, message):
print('got message %s' % message)
message = json.loads(message)
if message['action'] == 'login':
print('Login with name %s' % message['message'])
ws_clients[self.client_id]['username'] = message['message']
# отправляем клиенту его идентификатор соединения на сервере
self.write_message(\
{\
'action': 'set_connection_id',\
'message': self.client_id\
}\
)
print(ws_clients[self.client_id])
Мы добавили логин пользователя в словарь, в котором находятся все «живые» соединения и отдали клиенту его внутренний идентификатор сервера.
Добавим этот идентификатор на форму страницы, определив обработчик onmessage объекта ws.
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
if( jdata['action'] == 'set_connection_id'){
console.log(`Set sign $`);
socketId.value = jdata['message'];
}
}
Результат работы:
Опишем 3 основные сигнальные функции серверной части:
- offer – запрос на подключение
- answer – ответ на запрос удаленной точкой
- candidate – обмен ICE кандидатами
if message['action'] == 'offer':
print("Sending offer to: %s" % message['destination'])
message['initiator_id'] = self.client_id
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
if message['action'] == 'answer':
print("Sending answer to: %s" % message['destination'])
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
if message['action'] == 'candidate':
print("Sending ICE candidate to: %s" % message['destination'])
redis_client.publish(\
message['destination'],\
json.dumps(message)\
)
Все функции однотипны и просто посылают сообщение в канал Redis с идентификатором, указанном в параметре destination сообщения.
Осталось отработать эти функции на стороне клиента. Но прежде, необходимо получить на клиенте список онлайн пользователей чтобы иметь возможность выбрать одного для соединения.
Добавим следующую функцию на сервере, которая формирует этот список и отдает его всем онлайн-соединениям:
def send_cliens_to_all():
clnts = []
# сформируем список соединений
for key, value in ws_clients.items():
clnts.append({
'id': key,
'username': value['username']
})
# отправим список в каждое соединение
for key, value in ws_clients.items():
ws_clients[key]['connection'].write_message(\
{\
'action': 'update_clients',\
'message': json.dumps(clnts)\
}\
)
Теперь вызовем эту функцию при логине и завершении соединения.
def on_message(self, message):
...
send_cliens_to_all()
...
# обработчик закрытия соединения клиентом
def on_close(self):
print('close connection')
# удаление соединения из глобального словаря
del ws_clients[self.client_id]
send_cliens_to_all()
Добавим элемент ul в код шаблона где будет наш список пользователей онлайн.
Будем его обновлять при поступлении сообщения с параметром action равным ’update_clients’:
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
...
if( jdata['action'] == 'update_clients'){
console.log(`Updating clients $`);
clientList.empty();
let data = JSON.parse(jdata['message']);
data.forEach(function (item, index) {
clientList.find(`#$`).on('click',call);
});
Мы проходим циклом по списку активных пользователей и для каждого элемента добавляем ссылку для вызова, на которую вешаем функцию-обработчик call.
Разберем процесс взаимодействия между пользователями для установления передачи одного видео-аудио потока на примере следующей UML диаграммы последовательностей.
В результате такого взаимодействия, мы получаем два объекта RTCPeerConnection, по одному на каждую точку peer. В каждом соединении установлено два SDP объекта в Local и Remote параметрах, каждое их которых содержит список ICE кандидатов от удаленного и локального хоста. Данные SDP и ICE передаются между клиентами при помощи сигнального сервера в указанной на диаграмме последовательности.
Наш код будет разделен на 4 части:
- Инициализация.
- Описание логики обработчиков вебсокет-соединения.
- Описание логики клиентского приложения.
- Сервисная часть.
Инициализация
В ходе этого процесса, мы находим все элементы на странице и определяем их в коде как переменные. Затем «подвешиваем» на нужные элементы обработчики. Создаем RTCPeerConnection объекты ps1 и ps2 в каждом из приложений и тоже «цепляем» на них обработчики. Однако, в нашем примере мы будем использовать по одному RTCPeerConnection объекту из каждого приложения. Пока мы хотим передать только один поток и в одном направлении.
В дополнение инициализации, создается ряд конфигурационных переменных и объект веб-сокет соединения.
//// Инициализация /////
const startButton = $('#startButton');
const callButton = $('#callButton');
const hangupButton = $('#stopButton');
startButton.on('click', start);
callButton.on('click', call);
hangupButton.on('click', stop);
const localVideo = document.getElementById('localVideo');
const remoteVideo = document.getElementById('remoteVideo');
var localStream;
let pc1;
let pc2;
const offerOptions = {
offerToReceiveAudio: 1,
offerToReceiveVideo: 1
};
// ws
var ws = new WebSocket("ws://localhost:8888/websocket");
const configuration = {};
// Создаем объекты RTCPeerConnection c пустой конфигурацией
console.log('RTCPeerConnection configuration:', configuration);
pc1 = new RTCPeerConnection(configuration);
console.log('Created local peer connection object pc1');
pc2 = new RTCPeerConnection(configuration);
console.log('Created remote peer connection object pc2');
// Добавляем обработчики на событие добавления ICE кандидата
pc1.addEventListener('icecandidate', e => onIceCandidate(pc1, e, 'pc1'));
pc2.addEventListener('icecandidate', e => onIceCandidate(pc2, e, 'pc2'));
// Login
const loginButton = $('#loginButton');
const username = $('#username');
const socketId = $('#socketId');
const abonentId = $('#abonentId');
const clientList = $('#clientList');
loginButton.on('click', login);
Описание логики обработчиков веб-сокет соединения
Приведенный далее код снабжен подробными комментариями и не нуждается в пояснении.
/// Обработчик onmessage socket соединения ////
ws.onmessage = function (evt) {
let jdata = JSON.parse(evt.data);
// Установка текущего идентификатора сокет-соединения,
// сгенерированного сервером Tornado
if( jdata['action'] == 'set_connection_id'){
console.log(`Set sign $`);
socketId.val(jdata['message']);
}
// обновление списка пользователей онлайн
if( jdata['action'] == 'update_clients'){
console.log(`Updating clients $`);
clientList.empty();
let data = JSON.parse(jdata['message']);
data.forEach(function (item, index) {
console.log(item);
clientList.find(`#$`).on('click',call);
});
}
// Начало соенинение с удаленным хостом в ответ на его запрос offer
if( jdata['action'] == 'offer'){
console.log('Geting offer');
// Обработчик добавления потока на второе соединение
pc2.addEventListener('track', gotRemoteStream);
// устанавливаем initiator_id в input для того чтоб знать
// его при передаче ICE кандидата
abonentId.val(jdata['initiator_id']);
// установка RemoteDescription
pc2.setRemoteDescription(jdata['offer']).then(function(){
console.log('pc2.setRemoteDescription');
// генерация ответного SDP offer
pc2.createAnswer().then(function(answer){
// в onCreateAnswerSuccess мы отправим answer в сигнальный сервер
// и установим pc2.setLocalDescription
onCreateAnswerSuccess(answer,jdata['initiator_id']);
},onCreateSessionDescriptionError);
},onSetSessionDescriptionError);
}
// реакция на запрос answer
if( jdata['action'] == 'answer'){
console.log('Geting answer');
pc1.setRemoteDescription(jdata['offer']).then(onSetRemoteSuccess,onSetSessionDescriptionError)
}
// обмен ICE кандидатами
if( jdata['action'] == 'candidate'){
console.log('ICE candidate');
if (jdata['candidate'] != null){
let candidate = new RTCIceCandidate(jdata['candidate']);
// мы используем параметр pc для того, чтобы определить для какого соединения
// приходящий кандидат предназначается
if(jdata['pc'] == 'pc1'){
console.log(`Addinng ICE to pc1`);
pc1.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
} else {
console.log('Addinng ICE to pc2');
pc2.addIceCandidate(candidate).then(onAddIceCandidateSuccess,onAddIceCandidateError);
}
}
}
}
Описание логики клиентского приложения
function login(){
console.log('Login button clecked');
let message = {'action': 'login', 'message': username.val()}
ws.send(JSON.stringify(message));
}
// Включение камеры и получение медиа-потоков.
async function start() {
console.log('Requesting local stream');
startButton.hide();
hangupButton.show();
try {
const stream = await navigator.mediaDevices.getUserMedia();
console.log('Received local stream');
localVideo.srcObject = stream;
localStream = stream;
callButton.show();
} catch (e) {
alert(`getUserMedia() error: $`);
}
};
// Точка входа в инициировании соединения (клик на ссылке пользователя онлайн).
async function call(evt) {
// заберем идентификатор абонента
let con_id = $(evt.target).attr('id');
// устанавливаем его в input для того чтоб знать
// destination при передаче ICE кандидата
abonentId.val(con_id);
// Установим состояние кнопок
callButton.disabled = true;
hangupButton.disabled = false;
// Получаем и выводим информацию о медиа-потоках
const videoTracks = localStream.getVideoTracks();
const audioTracks = localStream.getAudioTracks();
if (videoTracks.length > 0) {
console.log(`Using video device: $`);
}
if (audioTracks.length > 0) {
console.log(`Using audio device: $`);
}
// Достаем медиа-дорожки из текущего stream объекта и передаем их в объект RTCPeerConnection
localStream.getTracks().forEach(track => pc1.addTrack(track, localStream));
// Формируем offer из pc1
try {
console.log('pc1 createOffer start');
const offer = await pc1.createOffer(offerOptions);
// в onCreateOfferSuccess мы отправим offer сигнальному серверу
// и установим pc1.setLocalDescription
await onCreateOfferSuccess(offer,con_id);
} catch (e) {
console.log(`$`);
}
}
function stop() {
console.log('Stop')
// тут будем все обнулять
console.log('Ending call');
localStream.getTracks().forEach(function(track) {
track.stop();
});
pc1.close();
pc2.close();
pc1 = null;
pc2 = null;
hangupButton.hide();
startButton.show();
};
// Функция формирования offer
async function onCreateOfferSuccess(desc, conn_id) {
console.log('pc1 setLocalDescription start');
// отправляем запрос offer на сервер
let message = {'action': 'offer', 'offer': desc, 'destination': conn_id}
ws.send(JSON.stringify(message));
try {
await pc1.setLocalDescription(desc);
onSetLocalSuccess(pc1);
} catch (e) {
console.log(`error setting description to pc1 $`);
}
}
// Добавление ICE кандидатов.
function onIceCandidate(pc, event, dest) {
// отправляем запрос candidate на сервер
if(dest=='pc1') {
var message = {
'action': 'candidate',
'pc': 'pc2',
'candidate': event.candidate,
'destination': abonentId.val()
}
} else {
var message = {
'action': 'candidate',
'pc': 'pc1',
'candidate': event.candidate,
'destination': abonentId.val()
}
}
ws.send(JSON.stringify(message));
}
// Функция формирования ответного SDP offer
async function onCreateAnswerSuccess(desc,initiator_id) {
console.log(`Answer from pc2`);
// посылаем ответ answer на сервер
let message = {'action': 'answer', 'offer': desc, 'destination': initiator_id}
ws.send(JSON.stringify(message));
try {
await pc2.setLocalDescription(desc);
onSetLocalSuccess(pc2);
} catch (e) {
onSetSessionDescriptionError(e);
}
}
// Функция добавление потока к элементу remoteVideo
function gotRemoteStream(e) {
if (remoteVideo.srcObject !== e.streams[0]) {
remoteVideo.srcObject = e.streams[0];
console.log('pc2 received remote stream');
}
}
Сервисные функции для отладки
function getName(pc) {
return (pc === pc1) ? 'pc1' : 'pc2';
}
function getOtherPc(pc) {
return (pc === pc1) ? pc2 : pc1;
}
function onSetLocalSuccess(pc) {
console.log(`$ setLocalDescription complete`);
}
function onSetRemoteSuccess(pc) {
console.log(`$ setRemoteDescription complete`);
}
function onCreateSessionDescriptionError(error) {
console.log(`Failed to create session description: $`);
}
function onAddIceCandidateSuccess(pc) {
console.log(`$ addIceCandidate success`);
}
function onAddIceCandidateError(error) {
console.log(` failed to add ICE Candidate: $`);
}
function onSetSessionDescriptionError(error) {
console.log(`Error setting SESSION description: $`);
}
localVideo.addEventListener('loadedmetadata', function() {
console.log(`Local video videoWidth: $px, videoHeight: $px`);
});
remoteVideo.addEventListener('loadedmetadata', function() {
console.log(`Remote video videoWidth: $px, videoHeight: $px`);
});
Код шаблона HTML
Результат работы нашего приложения изображен на рисунке ниже.
В заключение
Это была последняя статья из цикла, где мы рассмотрели основные стадии установки соединения и передачи информации между его участниками, а также алгоритмы взаимодействия между программными компонентами приложения. Подписывайтесь на наш блог, чтобы получать больше полезных материалов.
Если вас заинтересовали возможности, которые предоставляет WebRTC-приложение – оставьте заявку на сайте. Специалисты Wezom создадут действительно эффективное решение для вашего бизнеса.